package io.pravega.controller.server.eventProcessor;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.netty.impl.ConnectionFactory;
import io.pravega.client.stream.Position;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.Retry;
import io.pravega.controller.eventProcessor.impl.EventProcessor;
import io.pravega.controller.server.SegmentHelper;
import io.pravega.controller.store.host.HostControllerStore;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.task.Stream.WriteFailedException;
import io.pravega.shared.controller.event.CommitEvent;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/CommitEventProcessor.class */
/**
 * Event processor that handles {@link CommitEvent}s read from the commit stream.
 *
 * <p>For every event the processor compares the transaction's epoch with the stream's active
 * epoch and then either ignores the event, commits the transaction, or writes the event back
 * to the commit stream so that it is processed again later.
 */
public class CommitEventProcessor extends EventProcessor<CommitEvent> {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(CommitEventProcessor.class);
    private final StreamMetadataStore streamMetadataStore;
    private final StreamMetadataTasks streamMetadataTasks;
    private final HostControllerStore hostControllerStore;
    private final ConnectionFactory connectionFactory;
    private final ScheduledExecutorService executor;
    private final SegmentHelper segmentHelper;
    /** Optional sink for successfully processed events; {@code null} when not supplied. */
    private final BlockingQueue<CommitEvent> processedEvents;

    /**
     * Creates a processor that additionally offers every successfully processed event to
     * {@code blockingQueue}.
     *
     * @param streamMetadataStore      store used to read the active epoch / segments and to commit transactions.
     * @param streamMetadataTasks      tasks used to attempt scale completion after a commit.
     * @param hostControllerStore      host store handed to {@link SegmentHelper#commitTransaction}.
     * @param scheduledExecutorService executor on which the asynchronous stages and retries run.
     * @param segmentHelper            helper used to send the commit notification for each segment.
     * @param connectionFactory        connection factory handed to {@link SegmentHelper#commitTransaction}.
     * @param blockingQueue            queue receiving processed events; may be {@code null}.
     */
    @VisibleForTesting
    public CommitEventProcessor(StreamMetadataStore streamMetadataStore, StreamMetadataTasks streamMetadataTasks, HostControllerStore hostControllerStore, ScheduledExecutorService scheduledExecutorService, SegmentHelper segmentHelper, ConnectionFactory connectionFactory, BlockingQueue<CommitEvent> blockingQueue) {
        this.streamMetadataStore = streamMetadataStore;
        this.streamMetadataTasks = streamMetadataTasks;
        this.hostControllerStore = hostControllerStore;
        this.segmentHelper = segmentHelper;
        this.executor = scheduledExecutorService;
        this.connectionFactory = connectionFactory;
        this.processedEvents = blockingQueue;
    }

    /**
     * Creates a processor that does not record processed events.
     *
     * @param streamMetadataStore      store used to read the active epoch / segments and to commit transactions.
     * @param streamMetadataTasks      tasks used to attempt scale completion after a commit.
     * @param hostControllerStore      host store handed to {@link SegmentHelper#commitTransaction}.
     * @param scheduledExecutorService executor on which the asynchronous stages and retries run.
     * @param segmentHelper            helper used to send the commit notification for each segment.
     * @param connectionFactory        connection factory handed to {@link SegmentHelper#commitTransaction}.
     */
    public CommitEventProcessor(StreamMetadataStore streamMetadataStore, StreamMetadataTasks streamMetadataTasks, HostControllerStore hostControllerStore, ScheduledExecutorService scheduledExecutorService, SegmentHelper segmentHelper, ConnectionFactory connectionFactory) {
        this(streamMetadataStore, streamMetadataTasks, hostControllerStore, scheduledExecutorService, segmentHelper, connectionFactory, null);
    }

    /**
     * Processes a single commit event and blocks until its asynchronous handling has finished.
     *
     * <ul>
     *   <li>event epoch &lt; active epoch: nothing is done;</li>
     *   <li>event epoch == active epoch: the transaction is committed;</li>
     *   <li>event epoch &gt; active epoch: the event is written back to the commit stream.</li>
     * </ul>
     *
     * <p>On success the event is offered to the optional processed-events queue. On failure the
     * error is logged together with its cause, and the trailing {@code join()} rethrows it to
     * the caller wrapped in a {@link java.util.concurrent.CompletionException}.
     *
     * @param commitEvent the event describing the transaction to commit.
     * @param position    position of the event in the commit stream; not used by this method.
     */
    @Override // io.pravega.controller.eventProcessor.impl.EventProcessor
    public void process(CommitEvent commitEvent, Position position) {
        final String scope = commitEvent.getScope();
        final String stream = commitEvent.getStream();
        final int epoch = commitEvent.getEpoch();
        final UUID txid = commitEvent.getTxid();
        final OperationContext context = this.streamMetadataStore.createContext(scope, stream);
        log.debug("Committing transaction {} on stream {}/{}", txid, scope, stream);

        this.streamMetadataStore.getActiveEpoch(scope, stream, context, false, this.executor)
                .thenComposeAsync(pair -> {
                    // NOTE(review): assumes the pair's key is the active epoch as an Integer,
                    // as implied by the original cast - confirm against StreamMetadataStore.
                    final int activeEpoch = pair.getKey();
                    if (epoch < activeEpoch) {
                        return CompletableFuture.<Void>completedFuture(null);
                    } else if (epoch == activeEpoch) {
                        return completeCommit(scope, stream, epoch, txid, context);
                    } else {
                        return postponeCommitEvent(commitEvent);
                    }
                }, this.executor)
                .whenCompleteAsync((ignored, error) -> {
                    if (error != null) {
                        // The trailing Throwable makes SLF4J log the cause and its stack trace.
                        log.error("Failed committing transaction {} on stream {}/{}", txid, scope, stream, error);
                        return;
                    }
                    log.debug("Successfully committed transaction {} on stream {}/{}", txid, scope, stream);
                    if (this.processedEvents != null) {
                        this.processedEvents.offer(commitEvent);
                    }
                }, this.executor)
                .join();
    }

    /**
     * Commits a transaction: notifies every active segment of the epoch, records the commit in
     * the metadata store and finally asks {@link StreamMetadataTasks} to try completing a scale.
     *
     * @param scope   scope of the stream.
     * @param stream  name of the stream.
     * @param epoch   epoch of the transaction.
     * @param txid    id of the transaction.
     * @param context operation context created for this scope/stream.
     * @return a future that completes once all three steps have finished.
     */
    private CompletableFuture<Void> completeCommit(String scope, String stream, int epoch, UUID txid, OperationContext context) {
        return this.streamMetadataStore.getActiveSegmentIds(scope, stream, epoch, context, this.executor)
                .thenComposeAsync(segments -> notifyCommitToHost(scope, stream, segments, txid)
                        .thenComposeAsync(ignored -> this.streamMetadataStore.commitTransaction(scope, stream, epoch, txid, context, this.executor), this.executor)
                        .<Void>thenApply(status -> null), this.executor)
                .thenCompose(ignored -> Futures.toVoid(this.streamMetadataTasks.tryCompleteScale(scope, stream, epoch, context)));
    }

    /**
     * Writes the event back to the commit stream, retrying through
     * {@link Retry#indefinitelyWithExpBackoff} whenever the write fails.
     *
     * @param commitEvent the event to re-enqueue.
     * @return a future that completes once the event has been written.
     */
    private CompletableFuture<Void> postponeCommitEvent(CommitEvent commitEvent) {
        return Retry.indefinitelyWithExpBackoff("Error writing event back into CommitStream")
                .runAsync(() -> writeEvent(commitEvent), this.executor);
    }

    /**
     * Performs one attempt at writing the event through the processor's self writer.
     *
     * @param commitEvent the event to write.
     * @return a future that completes normally on success, or exceptionally with a
     *         {@link WriteFailedException} wrapping the unwrapped cause on failure.
     */
    private CompletableFuture<Void> writeEvent(CommitEvent commitEvent) {
        final UUID txid = commitEvent.getTxid();
        log.debug("Transaction {}, pushing back CommitEvent to commitStream", txid);
        return getSelfWriter().write(commitEvent).handleAsync((ignored, error) -> {
            if (error == null) {
                log.debug("Transaction {}, sent request to commitStream", txid);
                return null;
            }
            final Throwable cause = Exceptions.unwrap(error);
            log.warn("Transaction {}, failed sending event to commitStream. Exception: {} Retrying...", txid, cause.getClass().getSimpleName());
            throw new WriteFailedException(cause);
        }, this.executor);
    }

    /**
     * Sends the commit notification for every given segment.
     *
     * @param scope    scope of the stream.
     * @param stream   name of the stream.
     * @param segments segment numbers to notify.
     * @param txid     id of the transaction being committed.
     * @return a future that completes when all per-segment notifications have completed.
     */
    private CompletableFuture<Void> notifyCommitToHost(String scope, String stream, List<Integer> segments, UUID txid) {
        return Futures.allOf(segments.stream()
                .parallel()
                .map(segment -> notifyCommitToHost(scope, stream, segment.intValue(), txid))
                .collect(Collectors.toList()));
    }

    /**
     * Sends the commit notification for a single segment, retrying through
     * {@link Retry#indefinitelyWithExpBackoff} on failure.
     *
     * @param scope   scope of the stream.
     * @param stream  name of the stream.
     * @param segment segment number to notify.
     * @param txid    id of the transaction being committed.
     * @return a future holding the status returned by {@link SegmentHelper#commitTransaction}.
     */
    private CompletableFuture<Controller.TxnStatus> notifyCommitToHost(String scope, String stream, int segment, UUID txid) {
        final String retryMessage = String.format("Transaction = %s, error sending commit notification for segment %d", txid, segment);
        return Retry.indefinitelyWithExpBackoff(retryMessage)
                .runAsync(() -> this.segmentHelper.commitTransaction(scope, stream, segment, txid, this.hostControllerStore, this.connectionFactory), this.executor);
    }
}
