package io.pravega.common.concurrent;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.AbstractTimer;
import io.pravega.common.Exceptions;
import io.pravega.common.Timer;
import io.pravega.common.concurrent.DelayedProcessor.Item;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import lombok.Generated;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/common/concurrent/DelayedProcessor.class */
/**
 * Queues items and hands each of them to a processing function once a fixed delay has elapsed since the item was
 * added. Items are identified by {@link Item#key()}; an item whose key is already queued is ignored, and a queued
 * item can be withdrawn via {@link #cancel(String)} before it is processed.
 *
 * <p>A background loop is started by the constructor and runs on the supplied executor until {@link #close()} is
 * invoked.
 *
 * @param <T> Type of the items to process.
 */
public class DelayedProcessor<T extends Item> implements AutoCloseable {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DelayedProcessor.class);
    /** Function invoked for every item whose delay has elapsed. */
    private final Function<T, CompletableFuture<Void>> itemProcessor;
    /** How long an item waits in the queue before it is processed; also the longest the loop waits per iteration. */
    private final Duration itemDelay;
    private final ScheduledExecutorService executor;
    /** The delay future of the iteration currently waiting (if any); cancelled by {@link #close()}. */
    private volatile CompletableFuture<Void> currentIterationDelayTask;
    /** Prefix used in every log message emitted by this instance. */
    private final String traceObjectId;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    @VisibleForTesting
    private final AbstractTimer timer = new Timer();
    private final DelayedProcessor<T>.DelayedQueue queue = new DelayedQueue();
    /**
     * The background loop. Deliberately NOT initialized inline: instance initializers run before the constructor
     * body, and {@link #start()} reads fields that are only assigned there.
     */
    private final CompletableFuture<Void> runTask;

    /**
     * FIFO queue of pending items that also tracks the keys currently queued so that duplicates can be rejected.
     * All operations synchronize on the queue instance.
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/common/concurrent/DelayedProcessor$DelayedQueue.class */
    public class DelayedQueue {

        @GuardedBy("this")
        private final Set<String> keys;

        @GuardedBy("this")
        private final Deque<DelayedProcessor<T>.QueueItem> queue;

        private DelayedQueue() {
            this.keys = new HashSet<>();
            this.queue = new ArrayDeque<>();
        }

        /** Removes all items and all tracked keys. */
        synchronized void clear() {
            this.keys.clear();
            this.queue.clear();
        }

        /** @return The number of queued items. */
        synchronized int size() {
            return this.queue.size();
        }

        /**
         * Appends the given item to the end of the queue, unless an item with the same key is already queued.
         *
         * @param t The item to add.
         */
        synchronized void add(T t) {
            if (this.keys.add(t.key())) {
                this.queue.add(new QueueItem(t));
            }
        }

        /**
         * Removes the item with the given key, if queued.
         *
         * @param str The key of the item to remove.
         */
        synchronized void remove(String str) {
            this.keys.remove(str);
            this.queue.removeIf(queueItem -> queueItem.getWrappedItem().key().equals(str));
        }

        /** @return The item at the head of the queue, or null if the queue is empty. The item is not removed. */
        synchronized DelayedProcessor<T>.QueueItem peekFirst() {
            return this.queue.peekFirst();
        }

        /**
         * Removes the head of the queue, but only if it is the very same instance as the given item. If the head is
         * a different item, or the queue is empty (for example because the item was cancelled or the queue was
         * cleared in the meantime), the queue is left untouched.
         *
         * @param queueItem The item that is expected to be at the head of the queue.
         */
        synchronized void removeFirstIf(DelayedProcessor<T>.QueueItem queueItem) {
            // Peek before removing: polling first and re-inserting on mismatch would attempt addFirst(null) on an
            // empty queue, which ArrayDeque rejects with a NullPointerException.
            DelayedProcessor<T>.QueueItem first = this.queue.peekFirst();
            if (first != null && first == queueItem) {
                this.queue.pollFirst();
                this.keys.remove(first.getWrappedItem().key());
            }
        }
    }

    /**
     * An item that can be queued in a {@link DelayedProcessor}.
     */
    /* loaded from: input_file:io/pravega/common/concurrent/DelayedProcessor$Item.class */
    public interface Item {
        /** @return The key that identifies this item for de-duplication and cancellation. */
        String key();
    }

    /**
     * Wraps a queued item together with the timer reading at which it was added and at which it becomes eligible
     * for processing.
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/common/concurrent/DelayedProcessor$QueueItem.class */
    public class QueueItem {
        private final T wrappedItem;
        private final long addedTime;
        private final long expirationTime;

        QueueItem(T t) {
            this.wrappedItem = t;
            this.addedTime = DelayedProcessor.this.getTimer().getElapsedMillis();
            this.expirationTime = this.addedTime + DelayedProcessor.this.itemDelay.toMillis();
        }

        /** @return Milliseconds left until this item may be processed; never negative. */
        long getRemainingMillis() {
            return Math.max(0L, this.expirationTime - DelayedProcessor.this.getTimer().getElapsedMillis());
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public T getWrappedItem() {
            return this.wrappedItem;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public long getAddedTime() {
            return this.addedTime;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public long getExpirationTime() {
            return this.expirationTime;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String toString() {
            return "DelayedProcessor.QueueItem(wrappedItem=" + getWrappedItem() + ", addedTime=" + getAddedTime() + ", expirationTime=" + getExpirationTime() + ")";
        }
    }

    /**
     * Creates a new instance and starts its background processing loop.
     *
     * @param function                 Invoked with each item once its delay has elapsed.
     * @param duration                 The delay to apply to every item.
     * @param scheduledExecutorService Executor used for delays and for running the loop.
     * @param str                      Identifier used as a prefix in log messages.
     */
    public DelayedProcessor(Function<T, CompletableFuture<Void>> function, Duration duration, ScheduledExecutorService scheduledExecutorService, String str) {
        this.itemProcessor = function;
        this.itemDelay = duration;
        this.executor = scheduledExecutorService;
        this.traceObjectId = str;
        // Must be last: start() reads the fields assigned above.
        this.runTask = start();
    }

    /**
     * Stops the background loop, cancels any pending delay and discards all queued items. Only the first
     * invocation has an effect.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.closed.getAndSet(true)) {
            return;
        }
        this.runTask.cancel(true);
        // Read the volatile field once; the delay callback may null it out concurrently.
        CompletableFuture<Void> completableFuture = this.currentIterationDelayTask;
        if (completableFuture != null) {
            completableFuture.cancel(true);
        }
        this.queue.clear();
        log.info("{}: Closed.", this.traceObjectId);
    }

    /**
     * Queues the given item for delayed processing. Has no effect if an item with the same key is already queued.
     *
     * @param t The item to process.
     * @throws NullPointerException If the item is null.
     */
    public void process(@NonNull T t) {
        if (t == null) {
            throw new NullPointerException("item is marked non-null but is null");
        }
        Exceptions.checkNotClosed(this.closed.get(), this);
        this.queue.add(t);
    }

    /**
     * Removes the queued item with the given key, if any.
     *
     * @param str The key of the item to cancel.
     * @throws NullPointerException If the key is null.
     */
    public void cancel(@NonNull String str) {
        if (str == null) {
            throw new NullPointerException("key is marked non-null but is null");
        }
        Exceptions.checkNotClosed(this.closed.get(), this);
        this.queue.remove(str);
    }

    /** @return The number of items currently queued. */
    @VisibleForTesting
    int size() {
        return this.queue.size();
    }

    /**
     * Starts the background loop: while not closed, wait (see {@link #delay()}) and then run one iteration.
     *
     * @return A future representing the loop.
     */
    private CompletableFuture<Void> start() {
        log.info("{}: Started. Iteration Delay = {} ms.", this.traceObjectId, Long.valueOf(this.itemDelay.toMillis()));
        return Futures.loop((Supplier<Boolean>) () -> {
            return Boolean.valueOf(!this.closed.get());
        }, (Supplier<CompletableFuture<Void>>) () -> {
            return delay().thenComposeAsync(r3 -> {
                return runOneIteration();
            }, (Executor) this.executor);
        }, this.executor);
    }

    /**
     * Processes the item at the head of the queue if its delay has elapsed. The item is removed from the queue
     * after the processor's future completes, whether normally or exceptionally; failures are logged.
     *
     * @return A future that completes when the iteration is done.
     */
    private CompletableFuture<Void> runOneIteration() {
        if (this.closed.get()) {
            log.debug("{}: Not running iteration due to shutting down.", this.traceObjectId);
            return CompletableFuture.completedFuture(null);
        }
        DelayedProcessor<T>.QueueItem first = this.queue.peekFirst();
        if (first == null || first.getRemainingMillis() > 0) {
            // Nothing queued, or the head item is not due yet.
            log.warn("{}: Not running iteration due premature wake-up.", this.traceObjectId);
            return CompletableFuture.completedFuture(null);
        }
        return this.itemProcessor.apply(first.getWrappedItem()).handle((r, th) -> {
            if (th != null) {
                log.error("{}: Unable to process {}.", new Object[]{this.traceObjectId, first, th});
            }
            // Only removes the head if it is still this very item (it may have been cancelled meanwhile).
            this.queue.removeFirstIf(first);
            return null;
        });
    }

    /**
     * Creates the delay future for the next iteration and records it in {@link #currentIterationDelayTask} so
     * that {@link #close()} can cancel it. The field is reset to null when the delay completes.
     *
     * @return A future that completes when the delay has elapsed.
     */
    private CompletableFuture<Void> delay() {
        Duration calculateDelay = calculateDelay();
        log.debug("{}: Iteration delay = {} ms. Queue size = {}.", new Object[]{this.traceObjectId, Long.valueOf(calculateDelay.toMillis()), Integer.valueOf(this.queue.size())});
        CompletableFuture<Void> whenCompleteAsync = createDelayedFuture(calculateDelay).whenCompleteAsync((r4, th) -> {
            this.currentIterationDelayTask = null;
        }, (Executor) this.executor);
        this.currentIterationDelayTask = whenCompleteAsync;
        return whenCompleteAsync;
    }

    /**
     * Creates a future that completes after the given duration.
     *
     * @param duration The duration to wait.
     * @return The delayed future.
     */
    @VisibleForTesting
    protected CompletableFuture<Void> createDelayedFuture(Duration duration) {
        return Futures.delayedFuture(duration, this.executor);
    }

    /**
     * Calculates how long to wait before the next iteration: the remaining time of the head item, capped at the
     * item delay; the full item delay if the queue is empty.
     */
    private Duration calculateDelay() {
        DelayedProcessor<T>.QueueItem first = this.queue.peekFirst();
        if (first == null) {
            return this.itemDelay;
        }
        Duration remaining = Duration.ofMillis(first.getRemainingMillis());
        return remaining.compareTo(this.itemDelay) < 0 ? remaining : this.itemDelay;
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    AbstractTimer getTimer() {
        return this.timer;
    }
}
