package org.microbean.kubernetes.controller;

import io.fabric8.kubernetes.api.model.HasMetadata;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.Immutable;
import net.jcip.annotations.ThreadSafe;

@Immutable
@ThreadSafe
/**
 * Distributes every event it receives to a set of registered {@link Consumer}s.
 *
 * <p>Each registered consumer is wrapped in its own {@link Pump}, which owns a
 * queue and a dedicated executor, so that one slow consumer does not hold up
 * delivery to the others.  Two collections are maintained: {@code distributors}
 * holds every pump and receives ordinary {@link Event}s;
 * {@code synchronizingDistributors} holds the subset of pumps that should
 * receive {@link SynchronizationEvent}s and is recomputed by
 * {@link #shouldSynchronize()}.</p>
 *
 * <p>Both collections are protected by a single
 * {@link ReentrantReadWriteLock}: dispatching takes the read lock, every
 * structural change takes the write lock.</p>
 *
 * @param <T> the type of resource the distributed events describe
 */
public final class EventDistributor<T extends HasMetadata> extends ResourceTrackingEventQueueConsumer<T> implements AutoCloseable {

    /** Every registered pump; the targets of ordinary events. */
    @GuardedBy("readLock && writeLock")
    private final Collection<Pump<T>> distributors;

    /** The pumps that receive the next synchronization event. */
    @GuardedBy("readLock && writeLock")
    private final Collection<Pump<T>> synchronizingDistributors;

    /** Interval handed to every newly created pump; may be {@code null}. */
    private final Duration synchronizationInterval;

    private final Lock readLock;

    private final Lock writeLock;

    /**
     * Queues events for a single consumer and delivers them on a dedicated
     * executor thread.
     *
     * <p>A pump runs two tasks on a two-thread scheduled executor: the pump
     * task, which blocks on the queue and hands each event to the consumer,
     * and an error-handling task, which waits on the pump task's
     * {@link Future} and reports an abnormal termination to the error
     * handler.</p>
     *
     * @param <T> the type of resource the pumped events describe
     */
    public static final class Pump<T extends HasMetadata> implements Consumer<AbstractEvent<? extends T>>, AutoCloseable {

        private final Logger logger;

        /** Set by {@link #close()}; makes {@link #accept(AbstractEvent)} reject further events. */
        private volatile boolean closing;

        /** Earliest instant at which this pump is due for synchronization; set in the constructor. */
        private volatile Instant nextSynchronizationInstant;

        private volatile Duration synchronizationInterval;

        final BlockingQueue<AbstractEvent<? extends T>> queue;

        private final ScheduledExecutorService executor;

        /** The pump task that drains {@link #queue}. */
        private final Future<?> task;

        /** The task that watches {@link #task} for abnormal termination. */
        private volatile Future<?> errorHandlingTask;

        private final Consumer<? super AbstractEvent<? extends T>> eventConsumer;

        /**
         * Creates the non-daemon, normal-priority threads used by a pump's
         * executor, named {@code event-pump-thread-N}.
         */
        public static final class PumpThreadFactory implements ThreadFactory {

            private final ThreadGroup group;

            private final AtomicInteger threadNumber;

            private PumpThreadFactory() {
                this.threadNumber = new AtomicInteger(1);
                final SecurityManager securityManager = System.getSecurityManager();
                // Use the security manager's thread group when one is installed,
                // otherwise the creating thread's group.
                this.group = securityManager == null
                    ? Thread.currentThread().getThreadGroup()
                    : securityManager.getThreadGroup();
            }

            /**
             * Creates a new non-daemon thread of normal priority.
             *
             * @param runnable the work the new thread will run
             * @return a new, unstarted thread
             */
            @Override
            public final Thread newThread(final Runnable runnable) {
                final Thread thread =
                    new Thread(this.group, runnable, "event-pump-thread-" + this.threadNumber.getAndIncrement(), 0L);
                if (thread.isDaemon()) {
                    thread.setDaemon(false);
                }
                if (thread.getPriority() != Thread.NORM_PRIORITY) {
                    thread.setPriority(Thread.NORM_PRIORITY);
                }
                return thread;
            }
        }

        private Pump(final Duration synchronizationInterval, final Consumer<? super AbstractEvent<? extends T>> eventConsumer) {
            this(synchronizationInterval, eventConsumer, null);
        }

        /**
         * Creates a pump and immediately starts its pump task and its
         * error-handling task.
         *
         * @param synchronizationInterval the interval between synchronizations;
         * may be {@code null}
         * @param eventConsumer the consumer events are delivered to; must not
         * be {@code null}
         * @param errorHandler invoked with any {@link RuntimeException} or
         * {@link Error} thrown by the consumer; returning {@code true} keeps
         * the pump running, returning {@code false} rethrows.  If
         * {@code null}, a handler that logs at {@code SEVERE} and returns
         * {@code true} is used.
         * @throws NullPointerException if {@code eventConsumer} is {@code null}
         */
        private Pump(final Duration synchronizationInterval,
                     final Consumer<? super AbstractEvent<? extends T>> eventConsumer,
                     final Function<? super Throwable, Boolean> errorHandler) {
            final String cn = getClass().getName();
            this.logger = Logger.getLogger(cn);
            assert this.logger != null;
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.entering(cn, "<init>", new Object[]{synchronizationInterval, eventConsumer, errorHandler});
            }
            Objects.requireNonNull(eventConsumer);
            this.eventConsumer = eventConsumer;
            this.executor = createScheduledThreadPoolExecutor();
            this.queue = new LinkedBlockingQueue<>();

            final Function<? super Throwable, Boolean> handler;
            if (errorHandler == null) {
                handler = throwable -> {
                    if (this.logger.isLoggable(Level.SEVERE)) {
                        this.logger.logp(Level.SEVERE, getClass().getName(), "<pumpTask>", throwable.getMessage(), throwable);
                    }
                    return true;
                };
            } else {
                handler = errorHandler;
            }

            // The pump task: block on the queue and deliver until interrupted.
            // If it ever returns normally the executor reschedules it after
            // one second.
            this.task = this.executor.scheduleWithFixedDelay(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        this.eventConsumer.accept(this.queue.take());
                    } catch (final InterruptedException interrupted) {
                        // Restore the flag so the loop condition ends the task.
                        Thread.currentThread().interrupt();
                    } catch (final RuntimeException | Error problem) {
                        if (!handler.apply(problem)) {
                            throw problem;
                        }
                    }
                }
            }, 0L, 1L, TimeUnit.SECONDS);
            assert this.task != null;

            // The error-handling task: get() on a periodic task only ever
            // completes exceptionally, so each catch below is a terminal state.
            this.errorHandlingTask = this.executor.submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        this.task.get();
                    } catch (final InterruptedException interrupted) {
                        this.task.cancel(true);
                        final Future<?> self = this.errorHandlingTask;
                        if (self != null) {
                            self.cancel(true);
                        }
                        Thread.currentThread().interrupt();
                        return;
                    } catch (final CancellationException cancelled) {
                        this.task.cancel(true);
                        return;
                    } catch (final ExecutionException failed) {
                        this.task.cancel(true);
                        final Future<?> self = this.errorHandlingTask;
                        if (self != null) {
                            self.cancel(true);
                        }
                        handler.apply(failed.getCause());
                        return;
                    }
                }
            });

            setSynchronizationInterval(synchronizationInterval);
            // Schedule the first synchronization now so that
            // shouldSynchronize(Instant) never compares against null.
            determineNextSynchronizationInterval(Instant.now());

            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.exiting(cn, "<init>");
            }
        }

        /**
         * Creates the two-thread scheduled executor that runs this pump's
         * tasks, configured to drop cancelled tasks from its work queue.
         *
         * @return a new executor; never {@code null}
         */
        private final ScheduledExecutorService createScheduledThreadPoolExecutor() {
            final String cn = getClass().getName();
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.entering(cn, "createScheduledThreadPoolExecutor");
            }
            final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(2, new PumpThreadFactory());
            pool.setRemoveOnCancelPolicy(true);
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.exiting(cn, "createScheduledThreadPoolExecutor", pool);
            }
            return pool;
        }

        /**
         * Returns the consumer this pump delivers events to.
         *
         * @return the consumer supplied at construction; never {@code null}
         */
        public final Consumer<? super AbstractEvent<? extends T>> getEventConsumer() {
            return this.eventConsumer;
        }

        /**
         * Enqueues the supplied event for asynchronous delivery.
         *
         * @param event the event to enqueue; {@code null} is ignored
         * @throws IllegalStateException if {@link #close()} has been called
         */
        @Override
        public final void accept(final AbstractEvent<? extends T> event) {
            final String cn = getClass().getName();
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.entering(cn, "accept", event);
            }
            if (this.closing) {
                throw new IllegalStateException();
            }
            if (event != null) {
                final boolean added = this.queue.add(event);
                assert added;
            }
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.exiting(cn, "accept");
            }
        }

        /**
         * Stops accepting events, cancels both tasks and shuts the executor
         * down, waiting up to 60 seconds for an orderly shutdown and up to a
         * further 60 seconds after a forced one.
         *
         * <p>If the calling thread is interrupted while waiting, the executor
         * is shut down forcibly and the interrupt status is restored.</p>
         */
        @Override
        public final void close() {
            final String cn = getClass().getName();
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.entering(cn, "close");
            }
            this.closing = true;
            this.executor.shutdown();
            this.task.cancel(true);
            final Future<?> errorTask = this.errorHandlingTask;
            if (errorTask != null) {
                errorTask.cancel(true);
            }
            try {
                if (!this.executor.awaitTermination(60L, TimeUnit.SECONDS)) {
                    this.executor.shutdownNow();
                    if (!this.executor.awaitTermination(60L, TimeUnit.SECONDS)
                        && this.logger.isLoggable(Level.WARNING)) {
                        // Nothing more can be done; make the stuck executor visible.
                        this.logger.logp(Level.WARNING, cn, "close", "this.executor.awaitTermination() failed");
                    }
                }
            } catch (final InterruptedException interrupted) {
                this.executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.exiting(cn, "close");
            }
        }

        /**
         * Reports whether this pump is due for synchronization.
         *
         * @param now the instant to test against; if {@code null} the current
         * time is used
         * @return {@code false} if the synchronization interval is
         * {@code null} or zero; otherwise {@code true} if {@code now} is at or
         * after the next scheduled synchronization instant
         */
        public final boolean shouldSynchronize(final Instant now) {
            final String cn = getClass().getName();
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.entering(cn, "shouldSynchronize", now);
            }
            final boolean due;
            final Duration interval = getSynchronizationInterval();
            if (interval == null || interval.isZero()) {
                due = false;
            } else {
                final Instant reference = now == null ? Instant.now() : now;
                due = reference.compareTo(this.nextSynchronizationInstant) >= 0;
            }
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.exiting(cn, "shouldSynchronize", Boolean.valueOf(due));
            }
            return due;
        }

        /**
         * Schedules the next synchronization instant as {@code now} plus the
         * current synchronization interval, or just {@code now} when the
         * interval is {@code null}.
         *
         * @param now the base instant; if {@code null} the current time is used
         */
        public final void determineNextSynchronizationInterval(final Instant now) {
            final String cn = getClass().getName();
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.entering(cn, "determineNextSynchronizationInterval", now);
            }
            final Duration interval = getSynchronizationInterval();
            final Instant base = now == null ? Instant.now() : now;
            this.nextSynchronizationInstant = interval == null ? base : base.plus(interval);
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.exiting(cn, "determineNextSynchronizationInterval");
            }
        }

        /**
         * Sets the synchronization interval.  The already-scheduled next
         * synchronization instant is not recomputed.
         *
         * @param duration the new interval; may be {@code null}
         */
        public final void setSynchronizationInterval(final Duration duration) {
            this.synchronizationInterval = duration;
        }

        /**
         * Returns the synchronization interval.
         *
         * @return the interval; may be {@code null}
         */
        public final Duration getSynchronizationInterval() {
            return this.synchronizationInterval;
        }
    }

    /**
     * Creates a distributor whose pumps have no synchronization interval.
     *
     * @param map passed unchanged to the superclass constructor
     */
    public EventDistributor(final Map<Object, T> map) {
        this(map, null);
    }

    /**
     * Creates a distributor.
     *
     * @param map passed unchanged to the superclass constructor
     * @param duration the synchronization interval given to every pump this
     * distributor creates; may be {@code null}
     */
    public EventDistributor(final Map<Object, T> map, final Duration duration) {
        super(map);
        final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        this.readLock = lock.readLock();
        this.writeLock = lock.writeLock();
        this.distributors = new ArrayList<>();
        this.synchronizingDistributors = new ArrayList<>();
        this.synchronizationInterval = duration;
    }

    /**
     * Registers a consumer that uses the default error handler.
     *
     * @param consumer the consumer to register; {@code null} is ignored
     */
    public final void addConsumer(final Consumer<? super AbstractEvent<? extends T>> consumer) {
        addConsumer(consumer, null);
    }

    /**
     * Registers a consumer, wrapping it in a new {@link Pump} that receives
     * both ordinary events and the next synchronization event.
     *
     * @param consumer the consumer to register; {@code null} is ignored
     * @param function the error handler for the new pump; may be
     * {@code null}, in which case the pump's default handler is used
     */
    public final void addConsumer(final Consumer<? super AbstractEvent<? extends T>> consumer,
                                  final Function<? super Throwable, Boolean> function) {
        if (consumer == null) {
            return;
        }
        this.writeLock.lock();
        try {
            final Pump<T> pump = new Pump<>(this.synchronizationInterval, consumer, function);
            this.distributors.add(pump);
            this.synchronizingDistributors.add(pump);
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Unregisters the first pump whose consumer {@linkplain Object#equals(Object)
     * equals} the supplied one, and closes that pump so its threads are
     * released.
     *
     * <p>The pump is closed after the write lock has been released, because
     * closing can block while its executor terminates.</p>
     *
     * @param consumer the consumer to unregister; {@code null} is ignored
     */
    public final void removeConsumer(final Consumer<? super AbstractEvent<? extends T>> consumer) {
        if (consumer == null) {
            return;
        }
        Pump<T> removed = null;
        this.writeLock.lock();
        try {
            final Iterator<Pump<T>> iterator = this.distributors.iterator();
            while (iterator.hasNext()) {
                final Pump<T> candidate = iterator.next();
                if (candidate != null && consumer.equals(candidate.getEventConsumer())) {
                    iterator.remove();
                    removed = candidate;
                    break;
                }
            }
            if (removed != null) {
                // Pump does not override equals, so this removes exactly the
                // instance found above.
                this.synchronizingDistributors.remove(removed);
            }
        } finally {
            this.writeLock.unlock();
        }
        if (removed != null) {
            removed.close();
        }
    }

    /**
     * Closes every registered pump and forgets all of them.
     */
    @Override
    public final void close() {
        this.writeLock.lock();
        try {
            // Each Pump.close() may block while its executor terminates, so
            // the pumps are closed in parallel rather than one after another.
            this.distributors.parallelStream().forEach(Pump::close);
            this.synchronizingDistributors.clear();
            this.distributors.clear();
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Recomputes which pumps are due for synchronization and schedules the
     * next synchronization instant for each pump that is.
     *
     * @return {@code true} if at least one pump is due for synchronization
     */
    public final boolean shouldSynchronize() {
        this.writeLock.lock();
        try {
            this.synchronizingDistributors.clear();
            final Instant now = Instant.now();
            // Deliberately sequential: synchronizingDistributors is a plain
            // ArrayList and must not be appended to from several threads.
            for (final Pump<T> pump : this.distributors) {
                if (pump.shouldSynchronize(now)) {
                    this.synchronizingDistributors.add(pump);
                    pump.determineNextSynchronizationInterval(now);
                }
            }
            return !this.synchronizingDistributors.isEmpty();
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Routes the supplied event to the pumps appropriate for its type:
     * synchronization events go to the pumps currently due for
     * synchronization, ordinary events go to every pump.
     *
     * @param abstractEvent the event to distribute; {@code null} is ignored
     */
    @Override
    protected final void accept(final AbstractEvent<? extends T> abstractEvent) {
        if (abstractEvent == null) {
            return;
        }
        if (abstractEvent instanceof SynchronizationEvent) {
            accept((SynchronizationEvent<? extends T>) abstractEvent);
        } else if (abstractEvent instanceof Event) {
            accept((Event<? extends T>) abstractEvent);
        } else {
            assert false : "Unexpected event type: " + abstractEvent.getClass();
        }
    }

    /**
     * Enqueues a synchronization event on every pump currently due for
     * synchronization.
     *
     * @param synchronizationEvent the event to enqueue
     */
    private final void accept(final SynchronizationEvent<? extends T> synchronizationEvent) {
        this.readLock.lock();
        try {
            this.synchronizingDistributors.forEach(pump -> pump.accept(synchronizationEvent));
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Enqueues an ordinary event on every registered pump.
     *
     * @param event the event to enqueue
     */
    private final void accept(final Event<? extends T> event) {
        this.readLock.lock();
        try {
            this.distributors.forEach(pump -> pump.accept(event));
        } finally {
            this.readLock.unlock();
        }
    }
}
