package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.Retry;
import io.pravega.controller.eventProcessor.impl.EventProcessorHelper;
import io.pravega.controller.eventProcessor.impl.SerializedRequestHandler;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.shared.controller.event.AbortEvent;
import io.pravega.shared.controller.event.AutoScaleEvent;
import io.pravega.shared.controller.event.CommitEvent;
import io.pravega.shared.controller.event.ControllerEvent;
import io.pravega.shared.controller.event.CreateReaderGroupEvent;
import io.pravega.shared.controller.event.DeleteReaderGroupEvent;
import io.pravega.shared.controller.event.DeleteScopeEvent;
import io.pravega.shared.controller.event.DeleteStreamEvent;
import io.pravega.shared.controller.event.ScaleOpEvent;
import io.pravega.shared.controller.event.SealStreamEvent;
import io.pravega.shared.controller.event.StreamRequestProcessor;
import io.pravega.shared.controller.event.TruncateStreamEvent;
import io.pravega.shared.controller.event.UpdateReaderGroupEvent;
import io.pravega.shared.controller.event.UpdateStreamEvent;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/AbstractRequestProcessor.class */
/**
 * Base class for controller request processors.
 *
 * <p>{@link #processEvent(ControllerEvent)} hands the event back to itself via
 * {@code controllerEvent.process(this)}, which is expected to call one of the
 * {@code processXxx} handlers below. Every handler defined here fails with a
 * {@link RequestUnsupportedException}; concrete processors presumably override the
 * handlers for the event types they actually support.
 *
 * <p>{@link #withCompletion} is a shared helper that runs a {@link StreamTask} while
 * coordinating with other processors through the "waiting request processor" record kept
 * in the {@link StreamMetadataStore}.
 *
 * @param <T> the type of controller event handled by the underlying serialized handler
 */
public abstract class AbstractRequestProcessor<T extends ControllerEvent> extends SerializedRequestHandler<T> implements StreamRequestProcessor {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractRequestProcessor.class);

    /** Matches failures whose unwrapped cause is a {@link StoreException.OperationNotAllowedException}. */
    protected static final Predicate<Throwable> OPERATION_NOT_ALLOWED_PREDICATE =
            th -> Exceptions.unwrap(th) instanceof StoreException.OperationNotAllowedException;

    protected final StreamMetadataStore streamMetadataStore;

    /**
     * Creates the processor.
     *
     * @param streamMetadataStore      metadata store used for waiting-request bookkeeping; must not be null
     * @param scheduledExecutorService executor passed to the parent handler
     */
    public AbstractRequestProcessor(StreamMetadataStore streamMetadataStore, ScheduledExecutorService scheduledExecutorService) {
        super(scheduledExecutorService);
        Preconditions.checkNotNull(streamMetadataStore);
        this.streamMetadataStore = streamMetadataStore;
    }

    /**
     * Returns the name under which this processor registers itself as a waiting processor.
     *
     * @return the simple class name of the concrete processor
     */
    public String getProcessorName() {
        return getClass().getSimpleName();
    }

    /**
     * Dispatches the event to the handler matching its type by letting the event call back
     * into this processor.
     */
    @Override // io.pravega.controller.eventProcessor.impl.SerializedRequestHandler
    public CompletableFuture<Void> processEvent(ControllerEvent controllerEvent) {
        return controllerEvent.process(this);
    }

    /** Default handler: fails with {@link RequestUnsupportedException}. */
    public CompletableFuture<Void> processAbortTxnRequest(AbortEvent abortEvent) {
        return unsupported();
    }

    /** Default handler: fails with {@link RequestUnsupportedException}. */
    public CompletableFuture<Void> processCommitTxnRequest(CommitEvent commitEvent) {
        return unsupported();
    }

    /** Default handler: fails with {@link RequestUnsupportedException}. */
    public CompletableFuture<Void> processAutoScaleRequest(AutoScaleEvent autoScaleEvent) {
        return unsupported();
    }

    /** Default handler: fails with {@link RequestUnsupportedException}. */
    public CompletableFuture<Void> processScaleOpRequest(ScaleOpEvent scaleOpEvent) {
        return unsupported();
    }

    /** Default handler: fails with {@link RequestUnsupportedException}. */
    public CompletableFuture<Void> processUpdateStream(UpdateStreamEvent updateStreamEvent) {
        return unsupported();
    }

    /** Default handler: fails with {@link RequestUnsupportedException}. */
    public CompletableFuture<Void> processTruncateStream(TruncateStreamEvent truncateStreamEvent) {
        return unsupported();
    }

    /** Default handler: fails with {@link RequestUnsupportedException}. */
    public CompletableFuture<Void> processSealStream(SealStreamEvent sealStreamEvent) {
        return unsupported();
    }

    /** Default handler: fails with {@link RequestUnsupportedException}. */
    public CompletableFuture<Void> processDeleteStream(DeleteStreamEvent deleteStreamEvent) {
        return unsupported();
    }

    /** Default handler: fails with {@link RequestUnsupportedException}. */
    public CompletableFuture<Void> processCreateReaderGroup(CreateReaderGroupEvent createReaderGroupEvent) {
        return unsupported();
    }

    /** Default handler: fails with {@link RequestUnsupportedException}. */
    public CompletableFuture<Void> processDeleteReaderGroup(DeleteReaderGroupEvent deleteReaderGroupEvent) {
        return unsupported();
    }

    /** Default handler: fails with {@link RequestUnsupportedException}. */
    public CompletableFuture<Void> processUpdateReaderGroup(UpdateReaderGroupEvent updateReaderGroupEvent) {
        return unsupported();
    }

    /** Default handler: fails with {@link RequestUnsupportedException}. */
    public CompletableFuture<Void> processDeleteScopeRecursive(DeleteScopeEvent deleteScopeEvent) {
        return unsupported();
    }

    /**
     * Runs {@code streamTask} for {@code event} and completes the returned future only after
     * the follow-up bookkeeping (write-back or waiting-request cleanup) has finished.
     *
     * <p>Flow:
     * <ol>
     *   <li>Look up the currently registered waiting processor (lookup failures are logged and
     *       treated as "none") and ask the task whether it has already started.</li>
     *   <li>If the task has started, or nobody is waiting, or this processor is the one waiting,
     *       execute the task via {@code EventProcessorHelper.withRetries}.
     *       <ul>
     *         <li>If execution failed with an error accepted by {@code writeBackPredicate},
     *             register this processor as waiting (best effort), write the event back, and
     *             fail the returned future with that error.</li>
     *         <li>Otherwise conditionally delete this processor's waiting request, then complete
     *             the returned future with the task's outcome (success or the failure).</li>
     *       </ul>
     *   </li>
     *   <li>Otherwise another processor is waiting: write the event back and fail the returned
     *       future with an {@code OPERATION_NOT_ALLOWED} store exception.</li>
     * </ol>
     *
     * @param streamTask         task to execute; must not be null
     * @param event              event to process; must not be null
     * @param scope              first identifier handed to the store calls (presumably the scope
     *                           name - confirm against {@code StreamMetadataStore}); must not be null
     * @param stream             second identifier handed to the store calls (presumably the stream
     *                           name - confirm against {@code StreamMetadataStore}); must not be null
     * @param writeBackPredicate decides whether a task failure should cause the event to be
     *                           written back; must not be null
     * @param <E>                the event type handled by the task
     * @return a future that completes once processing and the follow-up bookkeeping are done
     */
    public <E extends ControllerEvent> CompletableFuture<Void> withCompletion(StreamTask<E> streamTask, E event, String scope, String stream, Predicate<Throwable> writeBackPredicate) {
        Preconditions.checkNotNull(streamTask);
        Preconditions.checkNotNull(event);
        Preconditions.checkNotNull(scope);
        Preconditions.checkNotNull(stream);
        Preconditions.checkNotNull(writeBackPredicate);

        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
        OperationContext context = this.streamMetadataStore.createStreamContext(scope, stream, 0L);
        CompletableFuture<String> waitingProcessorFuture = suppressException(
                this.streamMetadataStore.getWaitingRequestProcessor(scope, stream, context, this.executor),
                null,
                "Exception while trying to fetch waiting request. Logged and ignored.");
        CompletableFuture<Boolean> hasTaskStarted = streamTask.hasTaskStarted(event);

        CompletableFuture.allOf(waitingProcessorFuture, hasTaskStarted).thenAccept(ignored -> {
            // Both futures are complete here, so join() does not block.
            boolean started = hasTaskStarted.join();
            String waitingProcessor = waitingProcessorFuture.join();

            if (started || waitingProcessor == null || waitingProcessor.equals(getProcessorName())) {
                EventProcessorHelper.withRetries(() -> streamTask.execute(event), this.executor)
                        .whenComplete((result, th) -> {
                            if (th != null && writeBackPredicate.test(th)) {
                                // Task could not run now: register as waiting (best effort) and re-post the event.
                                suppressException(
                                        this.streamMetadataStore.createWaitingRequestIfAbsent(scope, stream, getProcessorName(), context, this.executor),
                                        null,
                                        "Exception while trying to create waiting request. Logged and ignored.")
                                        .thenCompose(v -> retryIndefinitelyThenComplete(() -> streamTask.writeBack(event), resultFuture, th));
                            } else {
                                // Task finished (successfully or not): drop our waiting request, retrying until it is gone.
                                retryIndefinitelyThenComplete(
                                        () -> this.streamMetadataStore.deleteWaitingRequestConditionally(scope, stream, getProcessorName(), context, this.executor),
                                        resultFuture,
                                        th);
                            }
                        });
            } else {
                log.debug("Found another processing requested by a different processor {}. Will postpone the event {}.", waitingProcessor, event);
                // A different processor is waiting: yield to it by re-posting this event.
                retryIndefinitelyThenComplete(
                        () -> streamTask.writeBack(event),
                        resultFuture,
                        StoreException.create(StoreException.Type.OPERATION_NOT_ALLOWED, "Postponed " + event + " so that waiting processor" + waitingProcessor + " can work. "));
            }
        }).exceptionally(th -> {
            // Covers a failed hasTaskStarted future and anything thrown synchronously in the stage above.
            resultFuture.completeExceptionally(th);
            return null;
        });
        return resultFuture;
    }

    /**
     * Builds the failed future shared by all default handlers. A new exception instance is
     * created on every call.
     */
    private static CompletableFuture<Void> unsupported() {
        return Futures.failedFuture(new RequestUnsupportedException("Request Unsupported"));
    }

    /**
     * Wraps {@code future} so that any failure is logged at WARN level and replaced by
     * {@code fallback} (the predicate passed to {@code Futures.exceptionallyExpecting}
     * accepts every throwable).
     */
    private <R> CompletableFuture<R> suppressException(CompletableFuture<R> future, R fallback, String message) {
        return Futures.exceptionallyExpecting(future, th -> {
            log.warn("{}. Exception {}", message, Exceptions.unwrap(th).toString());
            return true;
        }, fallback);
    }

    /**
     * Runs {@code action} through {@code Retry.indefinitelyWithExpBackoff} and, once it has
     * succeeded, completes {@code toComplete}: exceptionally with {@code outcome} when it is
     * non-null, normally otherwise.
     */
    private CompletableFuture<Void> retryIndefinitelyThenComplete(Supplier<CompletableFuture<Void>> action, CompletableFuture<Void> toComplete, Throwable outcome) {
        String retryDescription = String.format("Error writing event back into stream from processor %s", getProcessorName());
        return Retry.indefinitelyWithExpBackoff(retryDescription)
                .runAsync(action, this.executor)
                .thenRun(() -> {
                    if (outcome != null) {
                        toComplete.completeExceptionally(outcome);
                    } else {
                        toComplete.complete(null);
                    }
                });
    }
}
