package io.pravega.controller.server.eventProcessor.requesthandlers;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.Retry;
import io.pravega.controller.eventProcessor.impl.EventProcessorHelper;
import io.pravega.controller.eventProcessor.impl.SerializedRequestHandler;
import io.pravega.controller.server.eventProcessor.requesthandlers.TaskExceptions;
import io.pravega.controller.store.stream.ScaleOperationExceptions;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.shared.controller.event.AbortEvent;
import io.pravega.shared.controller.event.AutoScaleEvent;
import io.pravega.shared.controller.event.CommitEvent;
import io.pravega.shared.controller.event.ControllerEvent;
import io.pravega.shared.controller.event.DeleteStreamEvent;
import io.pravega.shared.controller.event.RequestProcessor;
import io.pravega.shared.controller.event.ScaleOpEvent;
import io.pravega.shared.controller.event.SealStreamEvent;
import io.pravega.shared.controller.event.TruncateStreamEvent;
import io.pravega.shared.controller.event.UpdateStreamEvent;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/StreamRequestHandler.class */
public class StreamRequestHandler extends SerializedRequestHandler<ControllerEvent> implements RequestProcessor {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(StreamRequestHandler.class);
    private static final Predicate<Throwable> OPERATION_NOT_ALLOWED_PREDICATE = th -> {
        return th instanceof StoreException.OperationNotAllowedException;
    };
    private final AutoScaleTask autoScaleTask;
    private final ScaleOperationTask scaleOperationTask;
    private final UpdateStreamTask updateStreamTask;
    private final SealStreamTask sealStreamTask;
    private final DeleteStreamTask deleteStreamTask;
    private final TruncateStreamTask truncateStreamTask;

    public StreamRequestHandler(AutoScaleTask autoScaleTask, ScaleOperationTask scaleOperationTask, UpdateStreamTask updateStreamTask, SealStreamTask sealStreamTask, DeleteStreamTask deleteStreamTask, TruncateStreamTask truncateStreamTask, ScheduledExecutorService scheduledExecutorService) {
        super(scheduledExecutorService);
        this.autoScaleTask = autoScaleTask;
        this.scaleOperationTask = scaleOperationTask;
        this.updateStreamTask = updateStreamTask;
        this.sealStreamTask = sealStreamTask;
        this.deleteStreamTask = deleteStreamTask;
        this.truncateStreamTask = truncateStreamTask;
    }

    @Override // io.pravega.controller.eventProcessor.impl.SerializedRequestHandler
    public CompletableFuture<Void> processEvent(ControllerEvent controllerEvent) {
        return controllerEvent.process(this);
    }

    @Override // io.pravega.controller.eventProcessor.impl.SerializedRequestHandler
    public boolean toPostpone(ControllerEvent controllerEvent, long j, Throwable th) {
        return (Exceptions.unwrap(th) instanceof TaskExceptions.StartException) && System.currentTimeMillis() - j < Duration.ofMinutes(2L).toMillis();
    }

    public CompletableFuture<Void> processAbortTxnRequest(AbortEvent abortEvent) {
        return Futures.failedFuture(new RequestUnsupportedException("StreamRequestHandler: abort txn received on Stream Request Multiplexer"));
    }

    public CompletableFuture<Void> processCommitTxnRequest(CommitEvent commitEvent) {
        return Futures.failedFuture(new RequestUnsupportedException("StreamRequestHandler: commit txn received on Stream Request Multiplexer"));
    }

    public CompletableFuture<Void> processAutoScaleRequest(AutoScaleEvent autoScaleEvent) {
        return this.autoScaleTask.execute(autoScaleEvent);
    }

    public CompletableFuture<Void> processScaleOpRequest(ScaleOpEvent scaleOpEvent) {
        return withCompletion(this.scaleOperationTask, scaleOpEvent, OPERATION_NOT_ALLOWED_PREDICATE.or(th -> {
            return th instanceof ScaleOperationExceptions.ScaleConflictException;
        }));
    }

    public CompletableFuture<Void> processUpdateStream(UpdateStreamEvent updateStreamEvent) {
        return withCompletion(this.updateStreamTask, updateStreamEvent, OPERATION_NOT_ALLOWED_PREDICATE);
    }

    public CompletableFuture<Void> processTruncateStream(TruncateStreamEvent truncateStreamEvent) {
        return withCompletion(this.truncateStreamTask, truncateStreamEvent, OPERATION_NOT_ALLOWED_PREDICATE);
    }

    public CompletableFuture<Void> processSealStream(SealStreamEvent sealStreamEvent) {
        return withCompletion(this.sealStreamTask, sealStreamEvent, OPERATION_NOT_ALLOWED_PREDICATE);
    }

    public CompletableFuture<Void> processDeleteStream(DeleteStreamEvent deleteStreamEvent) {
        return withCompletion(this.deleteStreamTask, deleteStreamEvent, OPERATION_NOT_ALLOWED_PREDICATE);
    }

    private <T extends ControllerEvent> CompletableFuture<Void> withCompletion(StreamTask<T> streamTask, T t, Predicate<Throwable> predicate) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        EventProcessorHelper.withRetries(() -> {
            return streamTask.execute(t);
        }, this.executor).whenCompleteAsync((r9, th) -> {
            if (th == null) {
                completableFuture.complete(r9);
                return;
            }
            Throwable unwrap = Exceptions.unwrap(th);
            if (predicate.test(unwrap)) {
                Retry.indefinitelyWithExpBackoff("Error writing event back into requeststream").runAsync(() -> {
                    return streamTask.writeBack(t);
                }, this.executor).thenAccept(r5 -> {
                    completableFuture.completeExceptionally(unwrap);
                });
            } else {
                completableFuture.completeExceptionally(unwrap);
            }
        }, (Executor) this.executor);
        return completableFuture;
    }
}
