package org.microbean.kubernetes.controller;

import io.fabric8.kubernetes.api.model.HasMetadata;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.Immutable;
import net.jcip.annotations.ThreadSafe;

@Immutable
@ThreadSafe
/* loaded from: input_file:org/microbean/kubernetes/controller/EventDistributor.class */
public final class EventDistributor<T extends HasMetadata> extends ResourceTrackingEventQueueConsumer<T> implements AutoCloseable {

    @GuardedBy("readLock && writeLock")
    private final Collection<Pump<T>> distributors;

    @GuardedBy("readLock && writeLock")
    private final Collection<Pump<T>> synchronizingDistributors;
    private final Duration synchronizationInterval;
    private final Lock readLock;
    private final Lock writeLock;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/microbean/kubernetes/controller/EventDistributor$Pump.class */
    public static final class Pump<T extends HasMetadata> implements Consumer<AbstractEvent<? extends T>>, AutoCloseable {
        private volatile boolean closing;
        private volatile Instant nextSynchronizationInstant;
        private volatile Duration synchronizationInterval;
        final BlockingQueue<AbstractEvent<? extends T>> queue;
        private final ScheduledExecutorService executor;
        private final Future<?> task;
        private final Consumer<? super AbstractEvent<? extends T>> eventConsumer;
        static final /* synthetic */ boolean $assertionsDisabled;

        private Pump(Duration duration, Consumer<? super AbstractEvent<? extends T>> consumer) {
            Objects.requireNonNull(consumer);
            this.eventConsumer = consumer;
            this.executor = createScheduledThreadPoolExecutor();
            if (this.executor == null) {
                throw new IllegalStateException("createScheduledThreadPoolExecutor() == null");
            }
            this.queue = new LinkedBlockingQueue();
            this.task = this.executor.scheduleWithFixedDelay(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        this.eventConsumer.accept(this.queue.take());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }, 0L, 1L, TimeUnit.SECONDS);
            setSynchronizationInterval(duration);
        }

        private final ScheduledExecutorService createScheduledThreadPoolExecutor() {
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
            scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
            return scheduledThreadPoolExecutor;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public final Consumer<? super AbstractEvent<? extends T>> getEventConsumer() {
            return this.eventConsumer;
        }

        @Override // java.util.function.Consumer
        public final void accept(AbstractEvent<? extends T> abstractEvent) {
            if (this.closing) {
                throw new IllegalStateException();
            }
            if (abstractEvent != null) {
                boolean add = this.queue.add(abstractEvent);
                if (!$assertionsDisabled && !add) {
                    throw new AssertionError();
                }
            }
        }

        @Override // java.lang.AutoCloseable
        public final void close() {
            this.closing = true;
            this.executor.shutdown();
            this.task.cancel(true);
            try {
                if (!this.executor.awaitTermination(60L, TimeUnit.SECONDS)) {
                    this.executor.shutdownNow();
                    if (!this.executor.awaitTermination(60L, TimeUnit.SECONDS)) {
                    }
                }
            } catch (InterruptedException e) {
                this.executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public final boolean shouldSynchronize(Instant instant) {
            boolean z;
            Duration synchronizationInterval = getSynchronizationInterval();
            if (synchronizationInterval == null || synchronizationInterval.isZero()) {
                z = false;
            } else if (instant == null) {
                z = Instant.now().compareTo(this.nextSynchronizationInstant) >= 0;
            } else {
                z = instant.compareTo(this.nextSynchronizationInstant) >= 0;
            }
            return z;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public final void determineNextSynchronizationInterval(Instant instant) {
            Duration synchronizationInterval = getSynchronizationInterval();
            if (synchronizationInterval == null) {
                if (instant == null) {
                    this.nextSynchronizationInstant = Instant.now();
                    return;
                } else {
                    this.nextSynchronizationInstant = instant;
                    return;
                }
            }
            if (instant == null) {
                this.nextSynchronizationInstant = Instant.now().plus((TemporalAmount) synchronizationInterval);
            } else {
                this.nextSynchronizationInstant = instant.plus((TemporalAmount) synchronizationInterval);
            }
        }

        public final void setSynchronizationInterval(Duration duration) {
            this.synchronizationInterval = duration;
        }

        public final Duration getSynchronizationInterval() {
            return this.synchronizationInterval;
        }

        static {
            $assertionsDisabled = !EventDistributor.class.desiredAssertionStatus();
        }
    }

    public EventDistributor(Map<Object, T> map) {
        this(map, null);
    }

    public EventDistributor(Map<Object, T> map, Duration duration) {
        super(map);
        ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        this.readLock = reentrantReadWriteLock.readLock();
        this.writeLock = reentrantReadWriteLock.writeLock();
        this.distributors = new ArrayList();
        this.synchronizingDistributors = new ArrayList();
        this.synchronizationInterval = duration;
    }

    public final void addConsumer(Consumer<? super AbstractEvent<? extends T>> consumer) {
        if (consumer != null) {
            this.writeLock.lock();
            try {
                Pump<T> pump = new Pump<>(this.synchronizationInterval, consumer);
                this.distributors.add(pump);
                this.synchronizingDistributors.add(pump);
            } finally {
                this.writeLock.unlock();
            }
        }
    }

    public final void removeConsumer(Consumer<? super AbstractEvent<? extends T>> consumer) {
        if (consumer != null) {
            this.writeLock.lock();
            try {
                Iterator<Pump<T>> it = this.distributors.iterator();
                if (!$assertionsDisabled && it == null) {
                    throw new AssertionError();
                }
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    Pump<T> next = it.next();
                    if (next != null && consumer.equals(next.getEventConsumer())) {
                        it.remove();
                        break;
                    }
                }
            } finally {
                this.writeLock.unlock();
            }
        }
    }

    @Override // java.lang.AutoCloseable
    public final void close() {
        this.writeLock.lock();
        try {
            this.distributors.parallelStream().forEach(pump -> {
                pump.close();
            });
            this.synchronizingDistributors.clear();
            this.distributors.clear();
        } finally {
            this.writeLock.unlock();
        }
    }

    public final boolean shouldSynchronize() {
        this.writeLock.lock();
        try {
            this.synchronizingDistributors.clear();
            Instant now = Instant.now();
            this.distributors.parallelStream().filter(pump -> {
                return pump.shouldSynchronize(now);
            }).forEach(pump2 -> {
                this.synchronizingDistributors.add(pump2);
                pump2.determineNextSynchronizationInterval(now);
            });
            return !this.synchronizingDistributors.isEmpty();
        } finally {
            this.writeLock.unlock();
        }
    }

    @Override // org.microbean.kubernetes.controller.ResourceTrackingEventQueueConsumer
    protected final void accept(AbstractEvent<? extends T> abstractEvent) {
        if (abstractEvent != null) {
            if (abstractEvent instanceof SynchronizationEvent) {
                accept((SynchronizationEvent) abstractEvent);
            } else if (abstractEvent instanceof Event) {
                accept((Event) abstractEvent);
            } else if (!$assertionsDisabled) {
                throw new AssertionError("Unexpected event type: " + abstractEvent.getClass());
            }
        }
    }

    private final void accept(SynchronizationEvent<? extends T> synchronizationEvent) {
        this.readLock.lock();
        try {
            if (!this.synchronizingDistributors.isEmpty()) {
                this.synchronizingDistributors.parallelStream().forEach(pump -> {
                    pump.accept((AbstractEvent) synchronizationEvent);
                });
            }
        } finally {
            this.readLock.unlock();
        }
    }

    private final void accept(Event<? extends T> event) {
        this.readLock.lock();
        try {
            if (!this.distributors.isEmpty()) {
                this.distributors.parallelStream().forEach(pump -> {
                    pump.accept((AbstractEvent) event);
                });
            }
        } finally {
            this.readLock.unlock();
        }
    }

    static {
        $assertionsDisabled = !EventDistributor.class.desiredAssertionStatus();
    }
}
