package io.pravega.controller.eventProcessor.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.Position;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.RetriesExhaustedException;
import io.pravega.controller.eventProcessor.RequestHandler;
import io.pravega.controller.eventProcessor.impl.EventProcessor;
import io.pravega.controller.retryable.RetryableException;
import io.pravega.shared.controller.event.ControllerEvent;
import java.beans.ConstructorProperties;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/eventProcessor/impl/ConcurrentEventProcessor.class */
public class ConcurrentEventProcessor<R extends ControllerEvent, H extends RequestHandler<R>> extends EventProcessor<R> {

    // Monitor object guarding the running/completed/checkpoint bookkeeping in checkpoint().
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private final Object $lock;
    // Named constant for the concurrency limit; the public constructor passes the same value (1000).
    private static final int MAX_CONCURRENT = 1000;
    // Events whose processing has been started by process() and that have not yet been checkpointed,
    // ordered by their sequence counter.
    private final ConcurrentSkipListSet<PositionCounter> running;
    // Events that finished processing but have not yet been folded into the checkpoint,
    // ordered by their sequence counter.
    private final ConcurrentSkipListSet<PositionCounter> completed;
    // Most recent entry selected by checkpoint(); read by periodicCheckpoint() when persisting.
    private final AtomicReference<PositionCounter> checkpoint;
    // Executor used for retries, completion callbacks and the periodic checkpoint task.
    private final ScheduledExecutorService executor;
    // Handler that performs the actual processing of each event.
    private final H requestHandler;
    // Source of the sequence numbers assigned to events in process().
    private final AtomicLong counter;
    // Set by afterStop(); once true, process() rejects new events.
    private final AtomicBoolean stop;
    // Orders PositionCounter instances by their counter value.
    private final Comparator<PositionCounter> positionCounterComparator;
    // Bounds the number of in-flight events: acquired in process(), released when the event completes.
    private final Semaphore semaphore;
    // Handle of the scheduled periodicCheckpoint() task; cancelled in afterStop().
    private final ScheduledFuture<?> periodicCheckpoint;
    // Optional checkpointer override; when null, periodicCheckpoint() falls back to getCheckpointer().
    private final EventProcessor.Checkpointer checkpointer;
    // Optional writer override; when null, handleProcessingError() falls back to getSelfWriter().
    private final EventProcessor.Writer<R> internalWriter;
    // Tracks in-flight events so afterStop() can wait for them to finish.
    private final Phaser phaser;

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ConcurrentEventProcessor.class);
    // Sentinel with the largest possible counter, used in checkpoint() when nothing is running.
    private static final PositionCounter MAX = new PositionCounter(null, Long.MAX_VALUE);

    /**
     * Immutable pairing of a stream {@link Position} with the sequence number that was
     * assigned to the event read at that position.
     *
     * <p>Decompiler note: the access modifier of this class was changed from {@code private}.
     */
    public static class PositionCounter {
        /** Position associated with the event; {@code null} for the {@code MAX} sentinel. */
        private final Position position;
        /** Sequence number used to order instances. */
        private final long counter;

        /**
         * Creates a new pairing.
         *
         * @param position position associated with the event (may be {@code null})
         * @param counter  sequence number of the event
         */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @ConstructorProperties({"position", "counter"})
        public PositionCounter(Position position, long counter) {
            this.position = position;
            this.counter = counter;
        }
    }

    /**
     * Creates a processor with the default concurrency limit ({@code MAX_CONCURRENT}),
     * no checkpointer or writer override, and a checkpoint period of one minute.
     *
     * @param requestHandler handler that processes each event; must not be {@code null}
     * @param executor       executor used for processing callbacks and periodic checkpoints;
     *                       must not be {@code null}
     */
    public ConcurrentEventProcessor(H requestHandler, ScheduledExecutorService executor) {
        this(requestHandler, MAX_CONCURRENT, executor, null, null, 1L, TimeUnit.MINUTES);
    }

    /**
     * Fully parameterised constructor, visible for tests.
     *
     * @param requestHandler   handler that processes each event; must not be {@code null}
     * @param maxConcurrent    number of permits, i.e. the maximum number of in-flight events
     * @param executor         executor for callbacks and the periodic checkpoint task;
     *                         must not be {@code null}
     * @param checkpointer     checkpointer override, or {@code null} to use {@code getCheckpointer()}
     * @param writer           writer override, or {@code null} to use {@code getSelfWriter()}
     * @param checkpointPeriod period between two runs of {@link #periodicCheckpoint()}
     * @param timeUnit         unit of {@code checkpointPeriod}
     */
    @VisibleForTesting
    ConcurrentEventProcessor(H requestHandler, int maxConcurrent, ScheduledExecutorService executor, EventProcessor.Checkpointer checkpointer, EventProcessor.Writer<R> writer, long checkpointPeriod, TimeUnit timeUnit) {
        this.$lock = new Object[0];
        this.counter = new AtomicLong();
        this.stop = new AtomicBoolean();
        this.positionCounterComparator = Comparator.comparingLong(pc -> pc.counter);

        Preconditions.checkNotNull(requestHandler);
        Preconditions.checkNotNull(executor);

        this.requestHandler = requestHandler;
        this.running = new ConcurrentSkipListSet<>(this.positionCounterComparator);
        this.completed = new ConcurrentSkipListSet<>(this.positionCounterComparator);
        this.checkpointer = checkpointer;
        this.checkpoint = new AtomicReference<>();
        this.internalWriter = writer;
        this.executor = executor;
        // First run is scheduled immediately (initial delay 0), then every checkpointPeriod.
        this.periodicCheckpoint = executor.scheduleAtFixedRate(this::periodicCheckpoint, 0L, checkpointPeriod, timeUnit);
        this.semaphore = new Semaphore(maxConcurrent);
        // One party is registered up front for the processor itself; see afterStop().
        this.phaser = new Phaser(1);
    }

    /**
     * Starts asynchronous processing of one event.
     *
     * <p>If the processor has been stopped the event is ignored. Otherwise the call blocks until
     * a concurrency permit is available, records the event as running, and hands it to the
     * request handler (with retries). When processing and any error handling have finished,
     * the event is checkpointed and the permit is released.
     *
     * <p>Decompiler note: the access modifier of this method was changed from {@code protected}.
     *
     * @param r        event to process
     * @param position stream position associated with the event
     */
    @Override // io.pravega.controller.eventProcessor.impl.EventProcessor
    public void process(R r, Position position) {
        if (this.stop.get()) {
            log.warn("processing requested after processor is stopped.");
            return;
        }
        // Bound the number of in-flight events; the permit is released in the completion callback.
        this.semaphore.acquireUninterruptibly();
        // Register with the phaser so that afterStop() waits for this event to finish.
        this.phaser.register();
        PositionCounter positionCounter = new PositionCounter(position, this.counter.incrementAndGet());
        this.running.add(positionCounter);
        EventProcessorHelper.withRetries(() -> this.requestHandler.process(r, this.stop::get), this.executor)
                .whenCompleteAsync((result, processingError) -> {
                    CompletableFuture<Void> outcome;
                    if (processingError != null) {
                        log.warn("ConcurrentEventProcessor Processing failed {}", processingError.getClass().getName());
                        outcome = handleProcessingError(r, processingError);
                    } else {
                        log.debug("ConcurrentEventProcessor Processing complete");
                        outcome = CompletableFuture.completedFuture(null);
                    }
                    outcome.whenCompleteAsync((ignored, handlingError) -> {
                        try {
                            // Do not advance the checkpoint if the request was cancelled because of stop.
                            boolean cancelledByStop = this.stop.get()
                                    && handlingError != null
                                    && Exceptions.unwrap(handlingError) instanceof CancellationException;
                            if (!cancelledByStop) {
                                checkpoint(positionCounter);
                            }
                        } finally {
                            // Always release, even if checkpoint() throws; otherwise the permit
                            // leaks and afterStop() would wait forever on the phaser.
                            this.phaser.arriveAndDeregister();
                            this.semaphore.release();
                        }
                    }, this.executor);
                }, this.executor);
    }

    /**
     * Decides what to do with an event whose processing failed.
     *
     * <p>If the failure (or, for a {@link RetriesExhaustedException}, its cause) is retryable,
     * the event is written back using the internal writer override when present, otherwise the
     * writer returned by {@code getSelfWriter()}; the write is retried indefinitely. Any other
     * failure is returned as a failed future carrying the unwrapped exception.
     *
     * @param r  event whose processing failed
     * @param th failure reported by the processing future
     * @return future that completes when the event has been written back, or a failed future
     *         for non-retryable errors
     */
    private CompletableFuture<Void> handleProcessingError(R r, Throwable th) {
        Throwable actual = Exceptions.unwrap(th);
        // Retries-exhausted wraps the real failure; classify by the wrapped cause.
        Throwable cause = actual instanceof RetriesExhaustedException ? actual.getCause() : actual;

        if (RetryableException.isRetryable(cause)) {
            log.warn("ConcurrentEventProcessor Processing failed, Retryable Exception {}. Putting the event back.", cause.getClass().getName());
            EventProcessor.Writer<R> writer = this.internalWriter != null ? this.internalWriter : getSelfWriter();
            return EventProcessorHelper.indefiniteRetries(() -> EventProcessorHelper.writeBack(r, writer), this.executor);
        }

        // Fail with the actual (unwrapped) failure, not the retries-exhausted cause.
        log.warn("ConcurrentEventProcessor Processing failed, {} {}", actual.getClass(), actual.getMessage());
        return Futures.failedFuture(actual);
    }

    /**
     * Stops the processor and waits for in-flight events to finish.
     *
     * <p>Decompiler note: the access modifier of this method was changed from {@code protected}.
     */
    @Override // io.pravega.controller.eventProcessor.impl.EventProcessor
    public void afterStop() {
        // From now on process() rejects new events.
        this.stop.set(true);
        // Arrive as the processor's own party and block until every event registered in
        // process() has arrived (each one calls arriveAndDeregister() on completion).
        this.phaser.arriveAndAwaitAdvance();
        // Remove the processor's own registration from the phaser.
        this.phaser.arriveAndDeregister();
        // Stop the periodic checkpoint task, interrupting it if it is currently running.
        this.periodicCheckpoint.cancel(true);
    }

    /**
     * Reports whether the stop flag has been set.
     *
     * @return {@code true} once {@link #afterStop()} has set the flag
     */
    @VisibleForTesting
    boolean isStopFlagSet() {
        final boolean stopped = stop.get();
        return stopped;
    }

    /**
     * Marks an event as completed and advances the in-memory checkpoint as far as possible.
     *
     * <p>The checkpoint may only move to a completed event whose counter is lower than that of
     * every event still running. All completed events below the oldest running event are
     * therefore removed from the completed set, and the one with the highest counter among them
     * becomes the new checkpoint. When nothing is running, the {@code MAX} sentinel makes every
     * completed event eligible.
     *
     * @param positionCounter the event that has just finished processing
     */
    private void checkpoint(PositionCounter positionCounter) {
        synchronized (this.$lock) {
            this.running.remove(positionCounter);
            this.completed.add(positionCounter);

            PositionCounter oldestRunning = this.running.isEmpty() ? MAX : this.running.first();
            // View of the completed events strictly below the oldest running one; changes to
            // the view write through to the backing set.
            NavigableSet<PositionCounter> checkpointable = this.completed.headSet(oldestRunning);
            if (!checkpointable.isEmpty()) {
                this.checkpoint.set(checkpointable.last());
                checkpointable.clear();
            }
        }
    }

    /**
     * Persists the current in-memory checkpoint, if there is one with a non-null position.
     *
     * <p>The checkpointer override is used when present, otherwise the one returned by
     * {@code getCheckpointer()}; if neither is available nothing is stored. Failures are
     * logged and swallowed so that a scheduled run never terminates the periodic task.
     */
    @VisibleForTesting
    void periodicCheckpoint() {
        try {
            // Read the reference once so the null checks and the stored position refer to the
            // same PositionCounter even if checkpoint() advances it concurrently.
            PositionCounter latest = this.checkpoint.get();
            if (latest == null || latest.position == null) {
                return;
            }
            if (this.checkpointer != null) {
                this.checkpointer.store(latest.position);
            } else if (getCheckpointer() != null) {
                getCheckpointer().store(latest.position);
            }
        } catch (Exception e) {
            // The throwable is passed as the final argument so SLF4J logs it with its stack trace.
            log.warn("error while trying to store checkpoint in the store", e);
        }
    }
}
