package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.netty.impl.ConnectionFactory;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.Retry;
import io.pravega.controller.eventProcessor.impl.SerializedRequestHandler;
import io.pravega.controller.retryable.RetryableException;
import io.pravega.controller.server.SegmentHelper;
import io.pravega.controller.store.host.HostControllerStore;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.shared.controller.event.AbortEvent;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/AbortRequestHandler.class */
/**
 * Request handler that processes {@link AbortEvent}s one at a time (per the
 * {@link SerializedRequestHandler} base class) and aborts the referenced transaction.
 *
 * <p>For each event the handler:
 * <ol>
 *   <li>looks up the segment ids that were active in the event's epoch,</li>
 *   <li>asks the segment helper to abort the transaction on every one of those segments,</li>
 *   <li>records the abort in the stream metadata store, and</li>
 *   <li>asks the stream metadata tasks to try completing a scale for that epoch.</li>
 * </ol>
 */
public class AbortRequestHandler extends SerializedRequestHandler<AbortEvent> {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(AbortRequestHandler.class);
    private final StreamMetadataStore streamMetadataStore;
    private final StreamMetadataTasks streamMetadataTasks;
    private final HostControllerStore hostControllerStore;
    private final ScheduledExecutorService executor;
    private final SegmentHelper segmentHelper;
    private final ConnectionFactory connectionFactory;
    /** Optional sink for successfully processed events; {@code null} when not supplied. */
    private final BlockingQueue<AbortEvent> processedEvents;

    /**
     * Creates a handler that additionally publishes every successfully processed event
     * to {@code blockingQueue}.
     *
     * @param streamMetadataStore      store used to resolve segments and record the abort
     * @param streamMetadataTasks      tasks used to attempt scale completion after the abort
     * @param hostControllerStore      passed through to the segment helper when aborting on a segment
     * @param scheduledExecutorService executor used for all asynchronous work of this handler
     * @param segmentHelper            helper that performs the per-segment abort call
     * @param connectionFactory        passed through to the segment helper when aborting on a segment
     * @param blockingQueue            queue that receives successfully processed events; may be {@code null}
     */
    @VisibleForTesting
    public AbortRequestHandler(StreamMetadataStore streamMetadataStore, StreamMetadataTasks streamMetadataTasks, HostControllerStore hostControllerStore, ScheduledExecutorService scheduledExecutorService, SegmentHelper segmentHelper, ConnectionFactory connectionFactory, BlockingQueue<AbortEvent> blockingQueue) {
        super(scheduledExecutorService);
        this.streamMetadataStore = streamMetadataStore;
        this.streamMetadataTasks = streamMetadataTasks;
        this.hostControllerStore = hostControllerStore;
        this.segmentHelper = segmentHelper;
        this.executor = scheduledExecutorService;
        this.connectionFactory = connectionFactory;
        this.processedEvents = blockingQueue;
    }

    /**
     * Creates a handler that does not publish processed events anywhere.
     *
     * @param streamMetadataStore      store used to resolve segments and record the abort
     * @param streamMetadataTasks      tasks used to attempt scale completion after the abort
     * @param hostControllerStore      passed through to the segment helper when aborting on a segment
     * @param scheduledExecutorService executor used for all asynchronous work of this handler
     * @param segmentHelper            helper that performs the per-segment abort call
     * @param connectionFactory        passed through to the segment helper when aborting on a segment
     */
    public AbortRequestHandler(StreamMetadataStore streamMetadataStore, StreamMetadataTasks streamMetadataTasks, HostControllerStore hostControllerStore, ScheduledExecutorService scheduledExecutorService, SegmentHelper segmentHelper, ConnectionFactory connectionFactory) {
        this(streamMetadataStore, streamMetadataTasks, hostControllerStore, scheduledExecutorService, segmentHelper, connectionFactory, null);
    }

    /**
     * Aborts the transaction described by {@code abortEvent}.
     *
     * @param abortEvent event carrying the scope, stream, epoch and transaction id to abort
     * @return a future that completes once the abort has been sent to every active segment of the
     *         epoch, recorded in the metadata store, and scale completion has been attempted; it
     *         completes exceptionally if any of those steps fails
     */
    @Override
    public CompletableFuture<Void> processEvent(AbortEvent abortEvent) {
        String scope = abortEvent.getScope();
        String stream = abortEvent.getStream();
        int epoch = abortEvent.getEpoch();
        UUID txid = abortEvent.getTxid();
        OperationContext context = this.streamMetadataStore.createContext(scope, stream);
        log.debug("Aborting transaction {} on stream {}/{}", txid, scope, stream);

        return this.streamMetadataStore.getActiveSegmentIds(scope, stream, epoch, context, this.executor)
                .thenCompose(segmentIds -> Futures.allOfWithResults(
                        segmentIds.stream()
                                .parallel()
                                .map(segmentId -> notifyAbortToHost(scope, stream, segmentId.intValue(), txid))
                                .collect(Collectors.toList())))
                // The per-segment results are not inspected; only their successful completion matters here.
                .thenCompose(segmentResults -> this.streamMetadataStore.abortTransaction(scope, stream, epoch, txid, context, this.executor))
                .thenCompose(txnStatus -> Futures.toVoid(this.streamMetadataTasks.tryCompleteScale(scope, stream, epoch, context)))
                .whenComplete((result, error) -> {
                    if (error != null) {
                        // Trailing Throwable is logged by SLF4J with its stack trace, so the cause is not lost.
                        log.error("Failed aborting transaction {} on stream {}/{}", txid, scope, stream, error);
                    } else {
                        log.debug("Successfully aborted transaction {} on stream {}/{}", txid, scope, stream);
                        if (this.processedEvents != null) {
                            this.processedEvents.offer(abortEvent);
                        }
                    }
                });
    }

    /**
     * Asks the segment helper to abort the transaction on a single segment, retrying with
     * exponential backoff while the failure is classified as retryable by
     * {@link RetryableException#isRetryable}.
     *
     * @param scope         scope of the stream
     * @param stream        name of the stream
     * @param segmentNumber segment on which the transaction is aborted
     * @param txid          id of the transaction to abort
     * @return future holding the status returned by the segment helper
     */
    private CompletableFuture<Controller.TxnStatus> notifyAbortToHost(String scope, String stream, int segmentNumber, UUID txid) {
        // NOTE(review): arguments are presumably (initial delay ms, multiplier, max attempts, max delay ms)
        // -- confirm against Retry.withExpBackoff before tuning.
        return Retry.withExpBackoff(100L, 10, 100, 100000L)
                .retryWhen(RetryableException::isRetryable)
                .throwingOn(RuntimeException.class)
                .runAsync(() -> this.segmentHelper.abortTransaction(scope, stream, segmentNumber, txid, this.hostControllerStore, this.connectionFactory), this.executor);
    }
}
