package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.TxnStatus;
import io.pravega.controller.store.stream.tables.ActiveTxnRecord;
import io.pravega.controller.store.stream.tables.HistoryRecord;
import io.pravega.controller.store.stream.tables.State;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.task.Stream.StreamTransactionMetadataTasks;
import io.pravega.shared.controller.event.CommitEvent;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/CommitRequestHandler.class */
/**
 * Request handler that commits the transactions of a stream epoch that are in
 * {@link TxnStatus#COMMITTING} state.
 *
 * <p>Processing of a {@link CommitEvent} proceeds as follows:
 * <ol>
 *   <li>Read the stream state and obtain the list of transactions to commit, either from an
 *       existing committing-transactions record or by creating a new one.</li>
 *   <li>If there is nothing to commit, conditionally reset the stream state.</li>
 *   <li>Otherwise move the stream to {@link State#COMMITTING_TXN} (unless it is
 *       {@link State#SEALING}), then either commit the transactions directly on the active
 *       epoch's segments or perform a "rolling transaction" when the transaction epoch and the
 *       active epoch differ.</li>
 *   <li>Delete the committing-transactions record and conditionally reset the stream state.</li>
 * </ol>
 */
public class CommitRequestHandler extends AbstractRequestProcessor<CommitEvent> implements StreamTask<CommitEvent> {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(CommitRequestHandler.class);

    private final StreamMetadataTasks streamMetadataTasks;
    private final StreamTransactionMetadataTasks streamTransactionMetadataTasks;
    private final ScheduledExecutorService executor;
    /** Optional sink for successfully processed events; {@code null} outside of tests. */
    private final BlockingQueue<CommitEvent> processedEvents;

    /**
     * Creates a handler that does not record processed events.
     *
     * @param streamMetadataStore            stream metadata store; must not be null
     * @param streamMetadataTasks            stream metadata tasks; must not be null
     * @param streamTransactionMetadataTasks transaction metadata tasks, used by {@link #writeBack}
     * @param scheduledExecutorService       executor passed to all store operations; must not be null
     */
    public CommitRequestHandler(StreamMetadataStore streamMetadataStore, StreamMetadataTasks streamMetadataTasks, StreamTransactionMetadataTasks streamTransactionMetadataTasks, ScheduledExecutorService scheduledExecutorService) {
        this(streamMetadataStore, streamMetadataTasks, streamTransactionMetadataTasks, scheduledExecutorService, null);
    }

    /**
     * Creates a handler that additionally offers every successfully processed event to
     * {@code blockingQueue} (when it is non-null).
     *
     * @param streamMetadataStore            stream metadata store; must not be null
     * @param streamMetadataTasks            stream metadata tasks; must not be null
     * @param streamTransactionMetadataTasks transaction metadata tasks, used by {@link #writeBack}
     * @param scheduledExecutorService       executor passed to all store operations; must not be null
     * @param blockingQueue                  queue receiving processed events, or {@code null}
     */
    @VisibleForTesting
    public CommitRequestHandler(StreamMetadataStore streamMetadataStore, StreamMetadataTasks streamMetadataTasks, StreamTransactionMetadataTasks streamTransactionMetadataTasks, ScheduledExecutorService scheduledExecutorService, BlockingQueue<CommitEvent> blockingQueue) {
        super(streamMetadataStore, scheduledExecutorService);
        Preconditions.checkNotNull(streamMetadataStore);
        Preconditions.checkNotNull(streamMetadataTasks);
        Preconditions.checkNotNull(scheduledExecutorService);
        this.streamMetadataTasks = streamMetadataTasks;
        this.streamTransactionMetadataTasks = streamTransactionMetadataTasks;
        this.executor = scheduledExecutorService;
        this.processedEvents = blockingQueue;
    }

    /**
     * Entry point for commit events: delegates to the inherited {@code withCompletion} helper,
     * passing this handler as the task to run for the event's scope and stream.
     *
     * @param commitEvent the event to process
     * @return future produced by {@code withCompletion}
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.AbstractRequestProcessor
    public CompletableFuture<Void> processCommitTxnRequest(CommitEvent commitEvent) {
        return withCompletion(this, commitEvent, commitEvent.getScope(), commitEvent.getStream(), OPERATION_NOT_ALLOWED_PREDICATE);
    }

    /**
     * Attempts to commit all committing transactions of the event's epoch.
     *
     * <p>On success the event is offered to the processed-events queue (if one was supplied).
     * On failure the returned future is completed exceptionally with the <em>unwrapped</em>
     * cause, so callers see e.g. {@link StoreException.OperationNotAllowedException} directly.
     *
     * @param commitEvent the event identifying scope, stream and epoch
     * @return future that completes when the commit attempt has finished
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> execute(CommitEvent commitEvent) {
        String scope = commitEvent.getScope();
        String stream = commitEvent.getStream();
        int epoch = commitEvent.getEpoch();
        OperationContext context = this.streamMetadataStore.createContext(scope, stream);
        log.debug("Attempting to commit available transactions on epoch {} on stream {}/{}", epoch, scope, stream);

        CompletableFuture<Void> result = new CompletableFuture<>();
        tryCommitTransactions(scope, stream, epoch, context).whenComplete((ignored, error) -> {
            if (error != null) {
                Throwable cause = Exceptions.unwrap(error);
                if (cause instanceof StoreException.OperationNotAllowedException) {
                    log.debug("Cannot commit transaction on epoch {} on stream {}/{}. Postponing", epoch, scope, stream);
                } else {
                    // The trailing Throwable has no placeholder, so SLF4J logs it as the exception.
                    log.error("Exception while attempting to commit transaction on epoch {} on stream {}/{}", epoch, scope, stream, error);
                }
                result.completeExceptionally(cause);
                return;
            }
            log.debug("Successfully committed transactions on epoch {} on stream {}/{}", epoch, scope, stream);
            if (this.processedEvents != null) {
                this.processedEvents.offer(commitEvent);
            }
            result.complete(null);
        });
        return result;
    }

    /**
     * Runs the complete commit workflow for one epoch: collect the transactions, commit (or roll)
     * them, delete the committing-transactions record and conditionally reset the stream state.
     */
    private CompletableFuture<Void> tryCommitTransactions(String scope, String stream, int txnEpoch, OperationContext context) {
        return this.streamMetadataStore.getState(scope, stream, true, context, this.executor).thenCompose(state ->
                Futures.toVoid(createRecordAndGetCommitTxnList(scope, stream, txnEpoch, context)
                        .thenCompose(txnIds -> {
                            // A null list is the "nothing to commit" signal from createNewTxnCommitList; an
                            // empty list is treated the same way so rollTransactions never indexes into it.
                            if (txnIds == null || txnIds.isEmpty()) {
                                return this.streamMetadataStore.resetStateConditionally(scope, stream, State.COMMITTING_TXN, context, this.executor);
                            }
                            return commitCollectedTransactions(scope, stream, txnEpoch, state, txnIds, context);
                        })
                        .thenCompose(ignored -> this.streamMetadataStore.deleteCommittingTransactionsRecord(scope, stream, context, this.executor))
                        .thenCompose(ignored -> this.streamMetadataStore.resetStateConditionally(scope, stream, State.COMMITTING_TXN, context, this.executor))));
    }

    /**
     * Moves the stream into {@link State#COMMITTING_TXN} (skipped when it is already
     * {@link State#SEALING}) and then commits {@code txnIds}, either directly on the active
     * epoch's segments or through a rolling transaction.
     */
    private CompletableFuture<Void> commitCollectedTransactions(String scope, String stream, int txnEpoch, State state, List<UUID> txnIds, OperationContext context) {
        // When sealing, the state is left untouched and the commit proceeds immediately. This must be
        // an already-completed future: a bare `new CompletableFuture<>()` would never complete and
        // would stall the whole chain.
        CompletableFuture<Void> stateReady = state.equals(State.SEALING)
                ? CompletableFuture.completedFuture(null)
                : Futures.toVoid(this.streamMetadataStore.setState(scope, stream, State.COMMITTING_TXN, context, this.executor));

        return stateReady.thenCompose(ignored -> getEpochRecords(scope, stream, txnEpoch, context).thenCompose(epochRecords -> {
            HistoryRecord txnEpochRecord = epochRecords.get(0);
            HistoryRecord activeEpochRecord = epochRecords.get(1);
            boolean commitOnActiveEpoch = activeEpochRecord.getEpoch() == txnEpoch
                    || activeEpochRecord.getReferenceEpoch() == txnEpochRecord.getReferenceEpoch();
            if (commitOnActiveEpoch) {
                return commitTransactions(scope, stream, activeEpochRecord.getSegments(), txnIds, context);
            }
            return rollTransactions(scope, stream, txnEpochRecord, activeEpochRecord, txnIds, context);
        }));
    }

    /**
     * Returns the transactions to commit for {@code epoch}.
     *
     * <p>If no committing-transactions record exists, a new list is built via
     * {@link #createNewTxnCommitList}. If a record exists for the same epoch its transactions are
     * returned. If a record exists for a different epoch the future fails with an
     * {@code OPERATION_NOT_ALLOWED} store exception so the request is postponed.
     *
     * @return future with the transaction ids, or with {@code null} when there is nothing to commit
     */
    private CompletableFuture<List<UUID>> createRecordAndGetCommitTxnList(String scope, String stream, int epoch, OperationContext context) {
        return this.streamMetadataStore.getCommittingTransactionsRecord(scope, stream, context, this.executor).thenCompose(record -> {
            if (record == null) {
                return createNewTxnCommitList(scope, stream, epoch, context, this.executor);
            }
            if (record.getEpoch() == epoch) {
                return CompletableFuture.completedFuture(record.getTransactionsToCommit());
            }
            log.debug("Postponing commit on epoch {} as transactions on different epoch {} are being committed for stream {}/{}", epoch, record.getEpoch(), scope, stream);
            throw StoreException.create(StoreException.Type.OPERATION_NOT_ALLOWED, "Transactions on different epoch are being committed");
        });
    }

    /**
     * Performs a rolling transaction only if the last transaction in {@code txnIds} is still
     * {@link TxnStatus#COMMITTING}; otherwise completes immediately.
     * NOTE(review): presumably a non-COMMITTING status means an earlier run already finished the
     * roll - confirm against the store's contract.
     */
    private CompletableFuture<Void> rollTransactions(String scope, String stream, HistoryRecord txnEpoch, HistoryRecord activeEpoch, List<UUID> txnIds, OperationContext context) {
        UUID lastTxn = txnIds.get(txnIds.size() - 1);
        return this.streamMetadataStore.transactionStatus(scope, stream, lastTxn, context, this.executor).thenCompose(status -> {
            if (status.equals(TxnStatus.COMMITTING)) {
                return runRollingTxn(scope, stream, txnEpoch, activeEpoch, txnIds, context);
            }
            return CompletableFuture.<Void>completedFuture(null);
        });
    }

    /**
     * Executes the rolling transaction: duplicates the transaction epoch's segments into epoch
     * {@code activeEpoch + 1}, commits the transactions there and seals those duplicates, creates
     * duplicates of the active epoch's segments in epoch {@code activeEpoch + 2}, records the new
     * segments in the store, then seals the current active epoch's segments and records that too.
     */
    private CompletionStage<Void> runRollingTxn(String scope, String stream, HistoryRecord txnEpoch, HistoryRecord activeEpoch, List<UUID> txnIds, OperationContext context) {
        String delegationToken = this.streamMetadataTasks.retrieveDelegationToken();
        long timestamp = System.currentTimeMillis();
        // NOTE(review): the result of this call is discarded. It is kept because it may have side
        // effects in the store/context that are not visible from this file - confirm whether it is needed.
        this.streamMetadataStore.getActiveEpoch(scope, stream, context, true, this.executor);

        int newTxnEpoch = activeEpoch.getEpoch() + 1;
        int newActiveEpoch = newTxnEpoch + 1;
        List<Long> txnEpochDuplicate = txnEpoch.getSegments().stream()
                .map(segment -> StreamSegmentNameUtils.computeSegmentId(StreamSegmentNameUtils.getSegmentNumber(segment), newTxnEpoch))
                .collect(Collectors.toList());
        List<Long> activeEpochDuplicate = activeEpoch.getSegments().stream()
                .map(segment -> StreamSegmentNameUtils.computeSegmentId(StreamSegmentNameUtils.getSegmentNumber(segment), newActiveEpoch))
                .collect(Collectors.toList());

        return copyTxnEpochSegmentsAndCommitTxns(scope, stream, txnIds, txnEpochDuplicate, context)
                .thenCompose(ignored -> this.streamMetadataTasks.notifyNewSegments(scope, stream, activeEpochDuplicate, context, delegationToken))
                .thenCompose(ignored -> this.streamMetadataTasks.getSealedSegmentsSize(scope, stream, txnEpochDuplicate, delegationToken))
                .thenCompose(sealedSizes -> {
                    log.debug("Rolling transaction, created duplicate of active epoch {} for stream {}/{}", activeEpoch, scope, stream);
                    return this.streamMetadataStore.rollingTxnNewSegmentsCreated(scope, stream, sealedSizes, txnEpoch.getEpoch(), timestamp, context, this.executor);
                })
                .thenCompose(ignored -> this.streamMetadataTasks.notifySealedSegments(scope, stream, activeEpoch.getSegments(), delegationToken))
                .thenCompose(ignored -> this.streamMetadataTasks.getSealedSegmentsSize(scope, stream, activeEpoch.getSegments(), delegationToken))
                .thenCompose(sealedSizes -> {
                    log.debug("Rolling transaction, sealed active epoch {} for stream {}/{}", activeEpoch, scope, stream);
                    return this.streamMetadataStore.rollingTxnActiveEpochSealed(scope, stream, sealedSizes, activeEpoch.getEpoch(), timestamp, context, this.executor);
                });
    }

    /**
     * Creates the given duplicate segments (each with a fixed scaling policy of 1), commits all
     * transactions into them and then seals them.
     */
    private CompletableFuture<Void> copyTxnEpochSegmentsAndCommitTxns(String scope, String stream, List<UUID> txnIds, List<Long> segments, OperationContext context) {
        String delegationToken = this.streamMetadataTasks.retrieveDelegationToken();
        return Futures.allOf(segments.stream()
                        .map(segment -> this.streamMetadataTasks.notifyNewSegment(scope, stream, segment, ScalingPolicy.fixed(1), delegationToken))
                        .collect(Collectors.toList()))
                .thenCompose(ignored -> {
                    log.debug("Rolling transaction, successfully created duplicate txn epoch {} for stream {}/{}", segments, scope, stream);
                    return commitTransactions(scope, stream, segments, txnIds, context);
                })
                .thenCompose(ignored -> this.streamMetadataTasks.notifySealedSegments(scope, stream, segments, delegationToken));
    }

    /**
     * Commits the transactions strictly one after another, in list order: for each transaction
     * the commit is first notified for {@code segments} and then recorded in the metadata store.
     */
    private CompletableFuture<Void> commitTransactions(String scope, String stream, List<Long> segments, List<UUID> txnIds, OperationContext context) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (UUID txnId : txnIds) {
            // Logged while the chain is being assembled, i.e. possibly before the commit actually runs.
            log.debug("Committing transaction {} on stream {}/{}", txnId, scope, stream);
            chain = chain
                    .thenCompose(ignored -> this.streamMetadataTasks.notifyTxnCommit(scope, stream, segments, txnId))
                    .thenCompose(ignored -> this.streamMetadataStore.commitTransaction(scope, stream, txnId, context, this.executor)
                            .thenAccept(status -> log.debug("transaction {} on stream {}/{} committed successfully", txnId, scope, stream)));
        }
        return chain;
    }

    /**
     * Collects the ids of all transactions in {@code epoch} whose status is
     * {@link TxnStatus#COMMITTING} and, if there are any, persists them as the
     * committing-transactions record.
     *
     * @return future with the recorded ids, or with {@code null} when no transaction is committing
     */
    private CompletableFuture<List<UUID>> createNewTxnCommitList(String scope, String stream, int epoch, OperationContext context, ScheduledExecutorService scheduledExecutorService) {
        return this.streamMetadataStore.getTransactionsInEpoch(scope, stream, epoch, context, scheduledExecutorService)
                .thenApply(transactions -> transactions.entrySet().stream()
                        .filter(entry -> entry.getValue().getTxnStatus().equals(TxnStatus.COMMITTING))
                        .map(entry -> entry.getKey())
                        .collect(Collectors.toList()))
                .thenCompose(txnIds -> {
                    if (txnIds.isEmpty()) {
                        // null (rather than an empty list) is the "nothing to commit" signal for callers.
                        return CompletableFuture.<List<UUID>>completedFuture(null);
                    }
                    return this.streamMetadataStore.createCommittingTransactionsRecord(scope, stream, epoch, txnIds, context, scheduledExecutorService)
                            .thenApply(ignored -> {
                                log.debug("Transactions {} added to commit list for epoch {} stream {}/{}", txnIds, epoch, scope, stream);
                                return txnIds;
                            });
                });
    }

    /**
     * Fetches two epoch records concurrently.
     *
     * @return future with a two-element list: index 0 is the record for {@code epoch}, index 1 is
     *         the current active epoch record
     */
    private CompletableFuture<List<HistoryRecord>> getEpochRecords(String scope, String stream, int epoch, OperationContext context) {
        List<CompletableFuture<HistoryRecord>> lookups = new ArrayList<>();
        lookups.add(this.streamMetadataStore.getEpoch(scope, stream, epoch, context, this.executor));
        lookups.add(this.streamMetadataStore.getActiveEpoch(scope, stream, context, true, this.executor));
        return Futures.allOfWithResults(lookups);
    }

    /**
     * Hands the event to the transaction metadata tasks to be written as a commit event again.
     *
     * @param commitEvent the event to write back
     * @return future returned by {@code writeCommitEvent}
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> writeBack(CommitEvent commitEvent) {
        return this.streamTransactionMetadataTasks.writeCommitEvent(commitEvent);
    }
}
