package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.common.Exceptions;
import io.pravega.common.Timer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.TagLogger;
import io.pravega.controller.metrics.TransactionMetrics;
import io.pravega.controller.store.VersionedMetadata;
import io.pravega.controller.store.stream.BucketStore;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.TxnWriterMark;
import io.pravega.controller.store.stream.records.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.task.Stream.StreamTransactionMetadataTasks;
import io.pravega.shared.NameUtils;
import io.pravega.shared.controller.event.CommitEvent;
import java.beans.ConstructorProperties;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.curator.shaded.com.google.common.base.Strings;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/CommitRequestHandler.class */
public class CommitRequestHandler extends AbstractRequestProcessor<CommitEvent> implements StreamTask<CommitEvent> {
    // Request-id aware logger for this handler.
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(CommitRequestHandler.class));
    // Intended upper bound on the number of transactions picked up per commit batch.
    private static final int MAX_TRANSACTION_COMMIT_BATCH_SIZE = 100;
    // Used for request ids, delegation tokens, segment notifications and merging transaction segments.
    private final StreamMetadataTasks streamMetadataTasks;
    // Used by writeBack to re-post a commit event.
    private final StreamTransactionMetadataTasks streamTransactionMetadataTasks;
    // The stream is registered here for the watermarking service once writer marks have been produced.
    private final BucketStore bucketStore;
    // Executor on which store calls and asynchronous continuations are run.
    private final ScheduledExecutorService executor;
    // Optional sink to which successfully processed events are offered; null when the
    // five-argument constructor is used.
    private final BlockingQueue<CommitEvent> processedEvents;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/CommitRequestHandler$CommitTxnContext.class */
    /**
     * Immutable bundle of the values that are passed around while committing one batch of
     * transactions on a stream. The two maps are held by reference, not copied, so updates made
     * to them through this holder are visible to whoever supplied them.
     */
    public static class CommitTxnContext {
        /** Scope of the stream being processed. */
        private final String scope;
        /** Name of the stream being processed. */
        private final String stream;
        /** Operation context used for all store calls of this commit round. */
        private final OperationContext context;
        /** Transaction id to the id of the writer that created it. */
        private final Map<UUID, String> txnIdToWriterId;
        /** Writer id to the mark recorded for that writer. */
        private final Map<String, TxnWriterMark> writerMarks;

        /**
         * Creates a context holding the supplied references as-is.
         *
         * @param scope           stream scope
         * @param stream          stream name
         * @param context         operation context
         * @param txnIdToWriterId transaction id to writer id mapping
         * @param writerMarks     writer id to writer mark mapping
         */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @ConstructorProperties({"scope", "stream", "context", "txnIdToWriterId", "writerMarks"})
        public CommitTxnContext(String scope, String stream, OperationContext context,
                                Map<UUID, String> txnIdToWriterId, Map<String, TxnWriterMark> writerMarks) {
            this.scope = scope;
            this.stream = stream;
            this.context = context;
            this.txnIdToWriterId = txnIdToWriterId;
            this.writerMarks = writerMarks;
        }
    }

    /**
     * Creates a handler that does not record processed events.
     *
     * @param metadataStore   stream metadata store
     * @param metadataTasks   stream metadata tasks
     * @param txnMetadataTasks transaction metadata tasks
     * @param buckets         bucket store
     * @param executorService executor for asynchronous work
     */
    public CommitRequestHandler(StreamMetadataStore metadataStore,
                                StreamMetadataTasks metadataTasks,
                                StreamTransactionMetadataTasks txnMetadataTasks,
                                BucketStore buckets,
                                ScheduledExecutorService executorService) {
        this(metadataStore, metadataTasks, txnMetadataTasks, buckets, executorService, null);
    }

    /**
     * Creates a handler, optionally wiring in a queue that receives every event whose
     * processing completed successfully.
     *
     * @param metadataStore    stream metadata store; must not be null
     * @param metadataTasks    stream metadata tasks; must not be null
     * @param txnMetadataTasks transaction metadata tasks
     * @param buckets          bucket store
     * @param executorService  executor for asynchronous work; must not be null
     * @param processedSink    queue to offer processed events to, or null to disable
     */
    @VisibleForTesting
    public CommitRequestHandler(StreamMetadataStore metadataStore,
                                StreamMetadataTasks metadataTasks,
                                StreamTransactionMetadataTasks txnMetadataTasks,
                                BucketStore buckets,
                                ScheduledExecutorService executorService,
                                BlockingQueue<CommitEvent> processedSink) {
        super(metadataStore, executorService);
        this.bucketStore = buckets;
        // Null checks run in the order: store, tasks, executor.
        Preconditions.checkNotNull(metadataStore);
        this.streamMetadataTasks = Preconditions.checkNotNull(metadataTasks);
        this.executor = Preconditions.checkNotNull(executorService);
        this.streamTransactionMetadataTasks = txnMetadataTasks;
        this.processedEvents = processedSink;
    }

    /**
     * Entry point for a commit event: delegates to {@code withCompletion}, passing this handler
     * as the task together with the event's scope and stream and
     * {@code OPERATION_NOT_ALLOWED_PREDICATE}.
     *
     * @param commitEvent the commit event to process
     * @return the future returned by {@code withCompletion}
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.AbstractRequestProcessor
    public CompletableFuture<Void> processCommitTxnRequest(CommitEvent commitEvent) {
        final String scopeName = commitEvent.getScope();
        final String streamName = commitEvent.getStream();
        return withCompletion(this, commitEvent, scopeName, streamName, OPERATION_NOT_ALLOWED_PREDICATE);
    }

    /**
     * Attempts to commit the transactions that are ready on the event's stream.
     *
     * <p>The returned future completes normally once the attempt finished, whether or not any
     * transaction was found, and completes exceptionally with the unwrapped cause otherwise.
     * An {@code OperationNotAllowedException} is only logged at debug level; any other failure
     * is logged as a warning and counted as a failed commit in the transaction metrics.
     *
     * @param commitEvent the commit event identifying scope and stream
     * @return future signalling the outcome of the commit attempt
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> execute(CommitEvent commitEvent) {
        final String scopeName = commitEvent.getScope();
        final String streamName = commitEvent.getStream();
        final long requestId = this.streamMetadataTasks.getRequestId(null);
        final OperationContext opContext = this.streamMetadataStore.createStreamContext(scopeName, streamName, requestId);
        log.debug(requestId, "Attempting to commit available transactions on stream {}/{}",
                commitEvent.getScope(), commitEvent.getStream());

        final CompletableFuture<Void> outcome = new CompletableFuture<>();
        tryCommitTransactions(scopeName, streamName, opContext).whenComplete((committedEpoch, failure) -> {
            if (failure == null) {
                // A negative epoch signals that there was nothing to commit.
                if (committedEpoch.intValue() < 0) {
                    log.info(requestId, "No transactions found in committing state on stream {}/{}",
                            scopeName, streamName);
                } else {
                    log.info(requestId, "Successfully committed transactions on epoch {} on stream {}/{}",
                            committedEpoch, scopeName, streamName);
                }
                if (this.processedEvents != null) {
                    try {
                        this.processedEvents.offer(commitEvent);
                    } catch (Exception ignored) {
                        // Best effort: the queue is only supplied through the @VisibleForTesting
                        // constructor, so a failure to record the event must not fail the commit.
                    }
                }
                outcome.complete(null);
                return;
            }

            final Throwable cause = Exceptions.unwrap(failure);
            if (cause instanceof StoreException.OperationNotAllowedException) {
                log.debug(requestId, "Cannot commit transaction on stream {}/{}. Postponing",
                        scopeName, streamName);
            } else {
                log.warn(requestId, "Exception while attempting to commit transaction on stream {}/{}",
                        scopeName, streamName, failure);
                TransactionMetrics.getInstance().commitTransactionFailed(scopeName, streamName);
            }
            outcome.completeExceptionally(cause);
        });
        return outcome;
    }

    /**
     * Picks up the next batch of transactions to commit on the stream and commits them.
     *
     * <p>Steps:
     * <ol>
     *   <li>Read the versioned stream state.</li>
     *   <li>Ask the store to start committing up to {@link #MAX_TRANSACTION_COMMIT_BATCH_SIZE}
     *       transactions. If the returned record is {@code CommittingTransactionsRecord.EMPTY}
     *       nothing is merged.</li>
     *   <li>Unless the stream state is {@code SEALING}, move the state to {@code COMMITTING_TXN}
     *       and remember the updated state record.</li>
     *   <li>Collect, per writer, the mark with the latest commit time among the batch.</li>
     *   <li>Commit directly when the active epoch is the transaction epoch or shares its
     *       reference epoch; otherwise perform a rolling transaction.</li>
     *   <li>Complete the commit in the store, reset the state if it is {@code COMMITTING_TXN},
     *       and report the elapsed time to the metrics.</li>
     * </ol>
     *
     * @param scope   stream scope
     * @param stream  stream name
     * @param context operation context for store calls
     * @return future with the epoch of the committing-transactions record that was processed
     */
    private CompletableFuture<Integer> tryCommitTransactions(String scope, String stream, OperationContext context) {
        final Timer timer = new Timer();
        final Map<String, TxnWriterMark> writerMarks = new HashMap<>();
        final Map<UUID, String> txnIdToWriterId = new HashMap<>();

        return this.streamMetadataStore.getVersionedState(scope, stream, context, this.executor).thenComposeAsync(state -> {
            // Tracks the latest state record so the final conditional reset uses the right version.
            final AtomicReference<VersionedMetadata<State>> stateRecord = new AtomicReference<>(state);

            final CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> commitFuture =
                    this.streamMetadataStore.startCommitTransactions(scope, stream, MAX_TRANSACTION_COMMIT_BATCH_SIZE, context, this.executor)
                            .thenComposeAsync(txnsTuple -> {
                                final VersionedMetadata<CommittingTransactionsRecord> committingRecord = txnsTuple.getKey();
                                if (committingRecord.getObject().equals(CommittingTransactionsRecord.EMPTY)) {
                                    // Nothing to merge; fall through to completion and conditional state reset.
                                    return CompletableFuture.completedFuture(committingRecord);
                                }

                                final int txnEpoch = committingRecord.getObject().getEpoch();
                                log.info(context.getRequestId(), "Committing {} transactions on epoch {} on stream {}/{}",
                                        committingRecord.getObject().getTransactionsToCommit(), txnEpoch, scope, stream);

                                // The decision is based on the stream STATE record, not on the committing record.
                                final CompletableFuture<Void> stateUpdated;
                                if (state.getObject().equals(State.SEALING)) {
                                    // A sealing stream still has to drain its committing transactions.
                                    stateUpdated = CompletableFuture.completedFuture(null);
                                } else {
                                    stateUpdated = this.streamMetadataStore
                                            .updateVersionedState(scope, stream, State.COMMITTING_TXN, state, context, this.executor)
                                            .thenAccept(stateRecord::set);
                                }

                                // Keep, for every writer, the mark with the greatest commit time in this batch.
                                txnsTuple.getValue().forEach(txn -> {
                                    final String writerId = txn.getWriterId();
                                    if (Strings.isNullOrEmpty(writerId)) {
                                        return;
                                    }
                                    txnIdToWriterId.put(txn.getId(), writerId);
                                    final TxnWriterMark known = writerMarks.get(writerId);
                                    if (known == null || known.getTimestamp() < txn.getCommitTime().longValue()) {
                                        writerMarks.put(writerId,
                                                new TxnWriterMark(txn.getCommitTime().longValue(), ImmutableMap.of(), txn.getId()));
                                    }
                                });

                                return stateUpdated.thenCompose(stateSet -> getEpochRecords(scope, stream, txnEpoch, context)
                                        .thenCompose(records -> {
                                            final EpochRecord txnEpochRecord = records.get(0);
                                            final EpochRecord activeEpochRecord = records.get(1);
                                            final CommitTxnContext commitContext =
                                                    new CommitTxnContext(scope, stream, context, txnIdToWriterId, writerMarks);
                                            if (activeEpochRecord.getEpoch() == txnEpoch
                                                    || activeEpochRecord.getReferenceEpoch() == txnEpochRecord.getReferenceEpoch()) {
                                                // Same segments as the transaction epoch: merge straight into the active epoch.
                                                final List<Long> activeSegments =
                                                        activeEpochRecord.getSegmentIds().stream().collect(Collectors.toList());
                                                return commitTransactions(commitContext, committingRecord, activeSegments)
                                                        .thenApply(merged -> committingRecord);
                                            }
                                            return rollTransactions(commitContext, committingRecord, txnEpochRecord, activeEpochRecord);
                                        }));
                            }, this.executor);

            return commitFuture.thenCompose(finalRecord ->
                    this.streamMetadataStore.completeCommitTransactions(scope, stream, finalRecord, context, this.executor, writerMarks)
                            .thenCompose(completed -> resetStateConditionally(scope, stream, stateRecord.get(), context))
                            .thenRun(() -> TransactionMetrics.getInstance().commitTransaction(scope, stream, timer.getElapsed()))
                            .thenApply(done -> Integer.valueOf(finalRecord.getObject().getEpoch())));
        }, this.executor);
    }

    /**
     * Commits transactions through a rolling transaction.
     *
     * <p>If the committing record is not yet a rolling-transaction record, the store is first
     * asked to start a rolling transaction against the active epoch. The rolling work itself is
     * skipped when the active epoch is greater than the record's current epoch.
     *
     * @param commitTxnContext  commit context (scope, stream, operation context, writer data)
     * @param versionedMetadata committing-transactions record to roll
     * @param epochRecord       epoch record of the transaction epoch
     * @param epochRecord2      epoch record of the active epoch
     * @return future with the (possibly updated) committing-transactions record
     */
    private CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> rollTransactions(CommitTxnContext commitTxnContext, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata, EpochRecord epochRecord, EpochRecord epochRecord2) {
        CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> rollingRecord =
                CompletableFuture.completedFuture(versionedMetadata);
        if (!versionedMetadata.getObject().isRollingTxnRecord()) {
            rollingRecord = rollingRecord.thenCompose(current ->
                    this.streamMetadataStore.startRollingTxn(commitTxnContext.scope, commitTxnContext.stream,
                            epochRecord2.getEpoch(), current, commitTxnContext.context, this.executor));
        }
        return rollingRecord.thenCompose(record -> {
            if (epochRecord2.getEpoch() > record.getObject().getCurrentEpoch()) {
                return CompletableFuture.completedFuture(record);
            }
            return runRollingTxn(commitTxnContext, record, epochRecord, epochRecord2).thenApply(rolled -> record);
        });
    }

    /**
     * Executes the rolling-transaction sequence.
     *
     * <p>Order of operations:
     * <ol>
     *   <li>Create duplicates of the transaction-epoch segments, commit the transactions into
     *       them and seal them (see {@code copyTxnEpochSegmentsAndCommitTxns}).</li>
     *   <li>Create duplicates of the active-epoch segments.</li>
     *   <li>Fetch the sealed sizes of the transaction-epoch duplicates and record the duplicate
     *       epochs in the store.</li>
     *   <li>Seal the active-epoch segments, fetch their sealed sizes and complete the rolling
     *       transaction in the store.</li>
     * </ol>
     *
     * @param commitTxnContext  commit context (scope, stream, operation context, writer data)
     * @param versionedMetadata rolling committing-transactions record
     * @param epochRecord       epoch record of the transaction epoch
     * @param epochRecord2      epoch record of the active epoch
     * @return future completing when the rolling transaction has been completed in the store
     */
    private CompletableFuture<Void> runRollingTxn(CommitTxnContext commitTxnContext, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata, EpochRecord epochRecord, EpochRecord epochRecord2) {
        final String scope = commitTxnContext.scope;
        final String stream = commitTxnContext.stream;
        final OperationContext context = commitTxnContext.context;
        final String delegationToken = this.streamMetadataTasks.retrieveDelegationToken();
        final long timestamp = System.currentTimeMillis();
        final int newTxnEpoch = versionedMetadata.getObject().getNewTxnEpoch();
        final int newActiveEpoch = versionedMetadata.getObject().getNewActiveEpoch();

        // Ids of the duplicate segments: same segment numbers, re-homed to the new epochs.
        final List<Long> txnEpochDuplicate = epochRecord.getSegments().stream()
                .map(segment -> Long.valueOf(NameUtils.computeSegmentId(segment.getSegmentNumber(), newTxnEpoch)))
                .collect(Collectors.toList());
        final List<Long> activeEpochSegments = new ArrayList<>(epochRecord2.getSegmentIds());
        final List<Long> activeEpochDuplicate = epochRecord2.getSegments().stream()
                .map(segment -> Long.valueOf(NameUtils.computeSegmentId(segment.getSegmentNumber(), newActiveEpoch)))
                .collect(Collectors.toList());

        return copyTxnEpochSegmentsAndCommitTxns(commitTxnContext, versionedMetadata, txnEpochDuplicate)
                .thenCompose(committed -> this.streamMetadataTasks.notifyNewSegments(scope, stream, activeEpochDuplicate,
                        context, delegationToken, context.getRequestId()))
                .thenCompose(created -> this.streamMetadataTasks.getSealedSegmentsSize(scope, stream, txnEpochDuplicate,
                        delegationToken, context.getRequestId()))
                .thenCompose(txnEpochSizes -> {
                    log.info(context.getRequestId(), "Rolling transaction, created duplicate of active epoch {} for stream {}/{}",
                            epochRecord2, scope, stream);
                    return this.streamMetadataStore.rollingTxnCreateDuplicateEpochs(scope, stream, txnEpochSizes, timestamp,
                            versionedMetadata, context, this.executor);
                })
                .thenCompose(duplicated -> this.streamMetadataTasks
                        .notifySealedSegments(scope, stream, activeEpochSegments, delegationToken, context.getRequestId())
                        .thenCompose(sealed -> this.streamMetadataTasks.getSealedSegmentsSize(scope, stream, activeEpochSegments,
                                delegationToken, context.getRequestId()))
                        .thenCompose(activeEpochSizes -> {
                            log.info(context.getRequestId(), "Rolling transaction, sealed active epoch {} for stream {}/{}",
                                    epochRecord2, scope, stream);
                            return this.streamMetadataStore.completeRollingTxn(scope, stream, activeEpochSizes,
                                    versionedMetadata, context, this.executor);
                        }));
    }

    /**
     * Creates the duplicate transaction-epoch segments, commits the transactions into them and
     * then seals them.
     *
     * <p>The returned future completes only after the seal notification has completed, and a
     * seal failure fails the returned future. Previously the seal was started but its future
     * was discarded, so callers could proceed before the segments were sealed.
     *
     * @param commitTxnContext  commit context (scope, stream, operation context, writer data)
     * @param versionedMetadata committing-transactions record holding the transactions to commit
     * @param list              ids of the duplicate transaction-epoch segments to create
     * @return future completing once the segments are created, merged into and sealed
     */
    private CompletableFuture<Void> copyTxnEpochSegmentsAndCommitTxns(CommitTxnContext commitTxnContext, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata, List<Long> list) {
        final String delegationToken = this.streamMetadataTasks.retrieveDelegationToken();
        final long requestId = commitTxnContext.context.getRequestId();

        // The duplicates are created with a fixed(1) scaling policy and the stream's rollover size.
        return Futures.allOf(list.stream()
                        .map(segmentId -> this.streamMetadataStore
                                .getConfiguration(commitTxnContext.scope, commitTxnContext.stream, commitTxnContext.context, this.executor)
                                .thenCompose(config -> this.streamMetadataTasks.notifyNewSegment(commitTxnContext.scope,
                                        commitTxnContext.stream, segmentId.longValue(), ScalingPolicy.fixed(1), delegationToken,
                                        requestId, config.getRolloverSizeBytes())))
                        .collect(Collectors.toList()))
                .thenCompose(created -> {
                    log.info(requestId, "Rolling transaction, successfully created duplicate txn epoch {} for stream {}/{}",
                            list, commitTxnContext.scope, commitTxnContext.stream);
                    return commitTransactions(commitTxnContext, versionedMetadata, list);
                })
                // thenCompose (not thenAccept) so that sealing is awaited and its failure propagates.
                .thenCompose(committed -> this.streamMetadataTasks.notifySealedSegments(commitTxnContext.scope,
                        commitTxnContext.stream, list, delegationToken, requestId));
    }

    /**
     * Merges the transaction segments of every transaction in the record into the given stream
     * segments and, when writer marks are being tracked, fills in the per-segment offsets of
     * each writer's mark.
     *
     * <p>The merge result maps a segment id to a list of offsets indexed in the same order as
     * the transactions to commit; the offsets at index {@code i} become the position of the mark
     * whose transaction id is the {@code i}-th transaction. After the marks are updated the
     * stream is added to the bucket store for the watermarking service.
     *
     * @param commitTxnContext  commit context; its {@code writerMarks} map is updated in place
     * @param versionedMetadata committing-transactions record holding the transactions to commit
     * @param list              ids of the segments to merge into
     * @return future completing when merging (and, if applicable, bucket registration) is done
     */
    private CompletableFuture<Void> commitTransactions(CommitTxnContext commitTxnContext, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata, List<Long> list) {
        final List<UUID> transactionsToCommit = versionedMetadata.getObject().getTransactionsToCommit();
        final boolean generateMarks = !commitTxnContext.writerMarks.isEmpty();
        final Timer mergeTimer = new Timer();
        return this.streamMetadataTasks.mergeTxnSegmentsIntoStreamSegments(commitTxnContext.scope, commitTxnContext.stream, list, transactionsToCommit, commitTxnContext.context.getRequestId()).thenCompose(segmentOffsets -> {
            TransactionMetrics.getInstance().commitTransactionSegments(mergeTimer.getElapsed());
            if (!generateMarks) {
                return CompletableFuture.completedFuture(null);
            }
            for (int i = 0; i < transactionsToCommit.size(); i++) {
                final int index = i;
                final UUID txnId = transactionsToCommit.get(i);
                final String writerId = commitTxnContext.txnIdToWriterId.get(txnId);
                if (Strings.isNullOrEmpty(writerId)) {
                    continue;
                }
                final TxnWriterMark mark = commitTxnContext.writerMarks.get(writerId);
                if (!mark.getTransactionId().equals(txnId)) {
                    // Only the transaction that defines the writer's mark contributes its offsets.
                    continue;
                }
                final Map<Long, Long> txnOffsets = segmentOffsets.entrySet().stream()
                        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().get(index)));
                commitTxnContext.writerMarks.put(writerId,
                        new TxnWriterMark(mark.getTimestamp(), txnOffsets, mark.getTransactionId()));
            }
            return this.bucketStore.addStreamToBucketStore(BucketStore.ServiceType.WatermarkingService, commitTxnContext.scope, commitTxnContext.stream, this.executor);
        });
    }

    /**
     * Fetches the epoch record for the given epoch and the active epoch record.
     *
     * @param str              stream scope
     * @param str2             stream name
     * @param i                epoch whose record is requested
     * @param operationContext operation context for store calls
     * @return future with a two-element list: index 0 is the record of epoch {@code i},
     *         index 1 is the active epoch record
     */
    private CompletableFuture<List<EpochRecord>> getEpochRecords(String str, String str2, int i, OperationContext operationContext) {
        final List<CompletableFuture<EpochRecord>> lookups = new ArrayList<>(2);
        lookups.add(this.streamMetadataStore.getEpoch(str, str2, i, operationContext, this.executor));
        lookups.add(this.streamMetadataStore.getActiveEpoch(str, str2, operationContext, true, this.executor));
        return Futures.allOfWithResults(lookups);
    }

    /**
     * Re-posts the commit event by handing it to the transaction metadata tasks.
     *
     * @param commitEvent the event to write back
     * @return the future returned by {@code writeCommitEvent}
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> writeBack(CommitEvent commitEvent) {
        final CompletableFuture<Void> posted = this.streamTransactionMetadataTasks.writeCommitEvent(commitEvent);
        return posted;
    }

    /**
     * Moves the stream state to {@code ACTIVE}, but only when the supplied state record is
     * {@code COMMITTING_TXN}; any other state is left untouched.
     *
     * @param str               stream scope
     * @param str2              stream name
     * @param versionedMetadata state record to test and to use as the update's base version
     * @param operationContext  operation context for store calls
     * @return future completing when the update is done, or immediately if no update is needed
     */
    private CompletableFuture<Void> resetStateConditionally(String str, String str2, VersionedMetadata<State> versionedMetadata, OperationContext operationContext) {
        if (!versionedMetadata.getObject().equals(State.COMMITTING_TXN)) {
            return CompletableFuture.completedFuture(null);
        }
        return Futures.toVoid(
                this.streamMetadataStore.updateVersionedState(str, str2, State.ACTIVE, versionedMetadata, operationContext, this.executor));
    }

    /**
     * Reports whether commit processing is already under way for the event's stream, judged by
     * the stream state being {@code COMMITTING_TXN} or {@code SEALING}.
     *
     * @param commitEvent the event identifying scope and stream
     * @return future with {@code true} when the state is {@code COMMITTING_TXN} or {@code SEALING}
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Boolean> hasTaskStarted(CommitEvent commitEvent) {
        final CompletableFuture<State> currentState =
                this.streamMetadataStore.getState(commitEvent.getScope(), commitEvent.getStream(), true, null, this.executor);
        return currentState.thenApply(observed -> {
            if (observed.equals(State.COMMITTING_TXN)) {
                return Boolean.TRUE;
            }
            return Boolean.valueOf(observed.equals(State.SEALING));
        });
    }
}
