package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.store.stream.BucketStore;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.VersionedMetadata;
import io.pravega.controller.store.stream.records.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.task.Stream.StreamTransactionMetadataTasks;
import io.pravega.shared.controller.event.CommitEvent;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/CommitRequestHandler.class */
/**
 * Request handler for {@link CommitEvent}s.
 *
 * <p>For the scope/stream named in the event it asks the metadata store to start committing
 * transactions, commits every transaction listed in the resulting
 * {@link CommittingTransactionsRecord} (running a rolling transaction when the active epoch does not
 * match the transactions' epoch), then completes the committing record and conditionally resets the
 * stream state to {@link State#ACTIVE}.
 */
public class CommitRequestHandler extends AbstractRequestProcessor<CommitEvent> implements StreamTask<CommitEvent> {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(CommitRequestHandler.class);
    private final StreamMetadataTasks streamMetadataTasks;
    private final StreamTransactionMetadataTasks streamTransactionMetadataTasks;
    private final BucketStore bucketStore;
    private final ScheduledExecutorService executor;
    /** Optional sink for successfully processed events; {@code null} unless the test constructor supplies one. */
    private final BlockingQueue<CommitEvent> processedEvents;

    /**
     * Creates a handler that does not record processed events.
     *
     * @param streamMetadataStore            stream metadata store; must not be null
     * @param streamMetadataTasks            stream metadata tasks; must not be null
     * @param streamTransactionMetadataTasks transaction metadata tasks, used by {@link #writeBack}
     * @param bucketStore                    bucket store the stream is added to after transactions are committed
     * @param scheduledExecutorService       executor for asynchronous stages; must not be null
     */
    public CommitRequestHandler(StreamMetadataStore streamMetadataStore, StreamMetadataTasks streamMetadataTasks, StreamTransactionMetadataTasks streamTransactionMetadataTasks, BucketStore bucketStore, ScheduledExecutorService scheduledExecutorService) {
        this(streamMetadataStore, streamMetadataTasks, streamTransactionMetadataTasks, bucketStore, scheduledExecutorService, null);
    }

    /**
     * Creates a handler that additionally offers every successfully processed event to
     * {@code blockingQueue} (when it is not null).
     *
     * @param streamMetadataStore            stream metadata store; must not be null
     * @param streamMetadataTasks            stream metadata tasks; must not be null
     * @param streamTransactionMetadataTasks transaction metadata tasks, used by {@link #writeBack}
     * @param bucketStore                    bucket store the stream is added to after transactions are committed
     * @param scheduledExecutorService       executor for asynchronous stages; must not be null
     * @param blockingQueue                  queue receiving processed events, or null to disable
     */
    @VisibleForTesting
    public CommitRequestHandler(StreamMetadataStore streamMetadataStore, StreamMetadataTasks streamMetadataTasks, StreamTransactionMetadataTasks streamTransactionMetadataTasks, BucketStore bucketStore, ScheduledExecutorService scheduledExecutorService, BlockingQueue<CommitEvent> blockingQueue) {
        super(streamMetadataStore, scheduledExecutorService);
        Preconditions.checkNotNull(streamMetadataStore);
        Preconditions.checkNotNull(streamMetadataTasks);
        Preconditions.checkNotNull(scheduledExecutorService);
        this.streamMetadataTasks = streamMetadataTasks;
        this.streamTransactionMetadataTasks = streamTransactionMetadataTasks;
        this.bucketStore = bucketStore;
        this.executor = scheduledExecutorService;
        this.processedEvents = blockingQueue;
    }

    /**
     * Processes a commit request by delegating to {@code withCompletion} with this handler as the task.
     *
     * @param commitEvent the commit event to process
     * @return future returned by {@code withCompletion}
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.AbstractRequestProcessor
    public CompletableFuture<Void> processCommitTxnRequest(CommitEvent commitEvent) {
        return withCompletion(this, commitEvent, commitEvent.getScope(), commitEvent.getStream(), OPERATION_NOT_ALLOWED_PREDICATE);
    }

    /**
     * Attempts to commit the transactions currently selected for commit on the event's stream.
     *
     * @param commitEvent event naming the scope and stream to work on
     * @return future that completes normally once the commit attempt has finished, or exceptionally
     *         with the unwrapped cause of the failure
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> execute(CommitEvent commitEvent) {
        String scope = commitEvent.getScope();
        String stream = commitEvent.getStream();
        OperationContext context = this.streamMetadataStore.createContext(scope, stream);
        log.debug("Attempting to commit available transactions on stream {}/{}", scope, stream);

        CompletableFuture<Void> result = new CompletableFuture<>();
        tryCommitTransactions(scope, stream, context).whenComplete((epoch, error) -> {
            if (error != null) {
                Throwable cause = Exceptions.unwrap(error);
                if (cause instanceof StoreException.OperationNotAllowedException) {
                    log.debug("Cannot commit transaction on stream {}/{}. Postponing", scope, stream);
                } else {
                    // Trailing throwable is logged with its stack trace by SLF4J.
                    log.error("Exception while attempting to commit transaction on stream {}/{}", scope, stream, error);
                }
                result.completeExceptionally(cause);
                return;
            }

            if (epoch >= 0) {
                log.info("Successfully committed transactions on epoch {} on stream {}/{}", epoch, scope, stream);
            } else {
                // Only two placeholders here: passing the epoch as well shifted the arguments and
                // printed "<epoch>/<scope>" instead of "<scope>/<stream>".
                log.info("No transactions found in committing state on stream {}/{}", scope, stream);
            }

            if (this.processedEvents != null) {
                try {
                    this.processedEvents.offer(commitEvent);
                } catch (Exception ignored) {
                    // Best effort: the queue is only supplied through the @VisibleForTesting
                    // constructor, so a failure to record the event must not fail the commit.
                }
            }
            result.complete(null);
        });
        return result;
    }

    /**
     * Runs one commit cycle for the stream.
     *
     * <p>Reads the versioned stream state, starts committing transactions in the store, commits them
     * (directly or through a rolling transaction), completes the committing record and finally resets
     * the stream state if this method had moved it to {@link State#COMMITTING_TXN}.
     *
     * @param scope   scope of the stream
     * @param stream  name of the stream
     * @param context operation context for store calls
     * @return future holding the epoch of the committing-transactions record that was processed
     *         (the epoch of {@link CommittingTransactionsRecord#EMPTY} when there was nothing to commit)
     */
    private CompletableFuture<Integer> tryCommitTransactions(String scope, String stream, OperationContext context) {
        return this.streamMetadataStore.getVersionedState(scope, stream, context, this.executor).thenComposeAsync(state -> {
            // Tracks the most recent versioned state so the final reset uses the right version.
            AtomicReference<VersionedMetadata<State>> stateRecord = new AtomicReference<>(state);

            CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> commitFuture =
                    this.streamMetadataStore.startCommitTransactions(scope, stream, context, this.executor)
                            .thenComposeAsync(record -> {
                                if (record.getObject().equals(CommittingTransactionsRecord.EMPTY)) {
                                    // Nothing to commit; fall through to completion and conditional state reset.
                                    return CompletableFuture.completedFuture(record);
                                }

                                int txnEpoch = record.getObject().getEpoch();
                                List<UUID> txnList = record.getObject().getTransactionsToCommit();
                                log.info("Committing {} transactions on epoch {} on stream {}/{}", txnList, txnEpoch, scope, stream);

                                CompletableFuture<Void> stateUpdated;
                                if (state.getObject().equals(State.SEALING)) {
                                    // Stream is already SEALING: leave the state untouched and keep committing.
                                    stateUpdated = CompletableFuture.completedFuture(null);
                                } else {
                                    // Otherwise move the stream to COMMITTING_TXN first and remember the new
                                    // versioned state for the reset that follows the commit.
                                    stateUpdated = this.streamMetadataStore
                                            .updateVersionedState(scope, stream, State.COMMITTING_TXN, state, context, this.executor)
                                            .thenAccept(stateRecord::set);
                                }

                                return stateUpdated.thenCompose(v -> getEpochRecords(scope, stream, txnEpoch, context)
                                        .thenCompose(records -> {
                                            EpochRecord txnEpochRecord = records.get(0);
                                            EpochRecord activeEpochRecord = records.get(1);
                                            boolean sameEpoch = activeEpochRecord.getEpoch() == txnEpoch
                                                    || activeEpochRecord.getReferenceEpoch() == txnEpochRecord.getReferenceEpoch();
                                            if (sameEpoch) {
                                                // Active epoch matches the transactions' epoch (or shares its
                                                // reference epoch): commit straight into the active segments.
                                                return commitTransactions(scope, stream,
                                                        new ArrayList<>(activeEpochRecord.getSegmentIds()), txnList, context)
                                                        .thenApply(x -> record);
                                            }
                                            return rollTransactions(scope, stream, txnEpochRecord, activeEpochRecord, record, context);
                                        }));
                            }, this.executor);

            return commitFuture.thenCompose(record ->
                    this.streamMetadataStore.completeCommitTransactions(scope, stream, record, context, this.executor)
                            .thenCompose(v -> resetStateConditionally(scope, stream, stateRecord.get(), context))
                            .thenApply(v -> record.getObject().getEpoch()));
        }, this.executor);
    }

    /**
     * Commits transactions through a rolling transaction.
     *
     * <p>Starts the rolling transaction in the store unless {@code existing} is already a rolling
     * transaction record, then runs it unless the active epoch is greater than the record's current epoch.
     *
     * @param scope       scope of the stream
     * @param stream      name of the stream
     * @param txnEpoch    epoch record of the transactions being committed
     * @param activeEpoch currently active epoch record
     * @param existing    versioned committing-transactions record
     * @param context     operation context for store calls
     * @return future holding the (possibly updated) versioned committing-transactions record
     */
    private CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> rollTransactions(String scope, String stream, EpochRecord txnEpoch, EpochRecord activeEpoch, VersionedMetadata<CommittingTransactionsRecord> existing, OperationContext context) {
        CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> future = CompletableFuture.completedFuture(existing);
        if (!existing.getObject().isRollingTxnRecord()) {
            future = future.thenCompose(x ->
                    this.streamMetadataStore.startRollingTxn(scope, stream, activeEpoch.getEpoch(), existing, context, this.executor));
        }
        return future.thenCompose(record -> {
            if (activeEpoch.getEpoch() > record.getObject().getCurrentEpoch()) {
                // Active epoch is already past the epoch captured in the record: skip the rolling work.
                return CompletableFuture.completedFuture(record);
            }
            return runRollingTxn(scope, stream, txnEpoch, activeEpoch, record, context).thenApply(v -> record);
        });
    }

    /**
     * Executes the rolling transaction steps in order: create duplicate transaction-epoch segments and
     * commit the transactions into them, create duplicates of the active epoch's segments, record the
     * duplicate epochs in the store, seal the active epoch's segments and complete the rolling transaction.
     *
     * @param scope       scope of the stream
     * @param stream      name of the stream
     * @param txnEpoch    epoch record of the transactions being committed
     * @param activeEpoch currently active epoch record
     * @param existing    versioned rolling-transaction record
     * @param context     operation context for store calls
     * @return future that completes when the rolling transaction has been completed in the store
     */
    private CompletableFuture<Void> runRollingTxn(String scope, String stream, EpochRecord txnEpoch, EpochRecord activeEpoch, VersionedMetadata<CommittingTransactionsRecord> existing, OperationContext context) {
        String delegationToken = this.streamMetadataTasks.retrieveDelegationToken();
        long timestamp = System.currentTimeMillis();
        int newTxnEpoch = existing.getObject().getNewTxnEpoch();
        int newActiveEpoch = existing.getObject().getNewActiveEpoch();

        // Segment ids for the duplicates: same segment numbers, recomputed under the new epochs.
        List<Long> txnEpochDuplicate = txnEpoch.getSegments().stream()
                .map(segment -> StreamSegmentNameUtils.computeSegmentId(segment.getSegmentNumber(), newTxnEpoch))
                .collect(Collectors.toList());
        List<Long> activeEpochSegmentIds = new ArrayList<>(activeEpoch.getSegmentIds());
        List<Long> activeEpochDuplicate = activeEpoch.getSegments().stream()
                .map(segment -> StreamSegmentNameUtils.computeSegmentId(segment.getSegmentNumber(), newActiveEpoch))
                .collect(Collectors.toList());
        List<UUID> transactionsToCommit = existing.getObject().getTransactionsToCommit();

        return copyTxnEpochSegmentsAndCommitTxns(scope, stream, transactionsToCommit, txnEpochDuplicate, context)
                .thenCompose(v -> this.streamMetadataTasks.notifyNewSegments(scope, stream, activeEpochDuplicate, context, delegationToken))
                .thenCompose(v -> this.streamMetadataTasks.getSealedSegmentsSize(scope, stream, txnEpochDuplicate, delegationToken))
                .thenCompose(sealedSegmentsMap -> {
                    log.info("Rolling transaction, created duplicate of active epoch {} for stream {}/{}", activeEpoch, scope, stream);
                    return this.streamMetadataStore.rollingTxnCreateDuplicateEpochs(scope, stream, sealedSegmentsMap, timestamp, existing, context, this.executor);
                })
                .thenCompose(v -> this.streamMetadataTasks.notifySealedSegments(scope, stream, activeEpochSegmentIds, delegationToken)
                        .thenCompose(x -> this.streamMetadataTasks.getSealedSegmentsSize(scope, stream, activeEpochSegmentIds, delegationToken))
                        .thenCompose(sealedSegmentsMap -> {
                            log.info("Rolling transaction, sealed active epoch {} for stream {}/{}", activeEpoch, scope, stream);
                            return this.streamMetadataStore.completeRollingTxn(scope, stream, sealedSegmentsMap, existing, context, this.executor);
                        }));
    }

    /**
     * Creates the given segments (each with a fixed scaling policy of one segment), commits the
     * transactions into them and then seals them.
     *
     * @param scope      scope of the stream
     * @param stream     name of the stream
     * @param txnIds     transactions to commit, in commit order
     * @param segmentIds ids of the duplicate transaction-epoch segments to create, commit into and seal
     * @param context    operation context for store calls
     * @return future that completes once the segments have been sealed
     */
    private CompletableFuture<Void> copyTxnEpochSegmentsAndCommitTxns(String scope, String stream, List<UUID> txnIds, List<Long> segmentIds, OperationContext context) {
        String delegationToken = this.streamMetadataTasks.retrieveDelegationToken();
        CompletableFuture<Void> createSegments = Futures.allOf(segmentIds.stream()
                .map(segmentId -> this.streamMetadataTasks.notifyNewSegment(scope, stream, segmentId, ScalingPolicy.fixed(1), delegationToken))
                .collect(Collectors.toList()));
        return createSegments
                .thenCompose(v -> {
                    log.info("Rolling transaction, successfully created duplicate txn epoch {} for stream {}/{}", segmentIds, scope, stream);
                    return commitTransactions(scope, stream, segmentIds, txnIds, context);
                })
                .thenCompose(v -> this.streamMetadataTasks.notifySealedSegments(scope, stream, segmentIds, delegationToken));
    }

    /**
     * Commits the given transactions one after another into the given segments, recording the segment
     * sizes after each commit, and finally adds the stream to the watermarking bucket store.
     *
     * @param scope                scope of the stream
     * @param stream               name of the stream
     * @param segments             ids of the segments the transactions are committed into
     * @param transactionsToCommit transactions to commit; processed in list order
     * @param context              operation context for store calls
     * @return future that completes after the stream has been added to the bucket store
     */
    private CompletableFuture<Void> commitTransactions(String scope, String stream, List<Long> segments, List<UUID> transactionsToCommit, OperationContext context) {
        // Chain the commits so each transaction starts only after the previous one has completed.
        CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
        for (UUID txnId : transactionsToCommit) {
            log.info("Committing transaction {} on stream {}/{}", txnId, scope, stream);
            future = future
                    .thenCompose(v -> this.streamMetadataTasks.notifyTxnCommit(scope, stream, segments, txnId))
                    .thenCompose(v -> this.streamMetadataTasks.getCurrentSegmentSizes(scope, stream, segments))
                    .thenCompose(sizes -> this.streamMetadataStore.recordCommitOffsets(scope, stream, txnId, sizes, context, this.executor));
        }
        return future.thenCompose(v ->
                this.bucketStore.addStreamToBucketStore(BucketStore.ServiceType.WatermarkingService, scope, stream, this.executor));
    }

    /**
     * Fetches the epoch record for {@code epoch} and the active epoch record.
     *
     * @param scope   scope of the stream
     * @param stream  name of the stream
     * @param epoch   epoch whose record is requested
     * @param context operation context for store calls
     * @return future holding a two-element list: index 0 is the record for {@code epoch}, index 1 the
     *         active epoch record
     */
    private CompletableFuture<List<EpochRecord>> getEpochRecords(String scope, String stream, int epoch, OperationContext context) {
        List<CompletableFuture<EpochRecord>> futures = new ArrayList<>(2);
        futures.add(this.streamMetadataStore.getEpoch(scope, stream, epoch, context, this.executor));
        futures.add(this.streamMetadataStore.getActiveEpoch(scope, stream, context, true, this.executor));
        return Futures.allOfWithResults(futures);
    }

    /**
     * Writes the event back through the transaction metadata tasks.
     *
     * @param commitEvent event to write back
     * @return future returned by {@code writeCommitEvent}
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> writeBack(CommitEvent commitEvent) {
        return this.streamTransactionMetadataTasks.writeCommitEvent(commitEvent);
    }

    /**
     * Sets the stream state to {@link State#ACTIVE} when the supplied state is
     * {@link State#COMMITTING_TXN}; otherwise does nothing.
     *
     * @param scope   scope of the stream
     * @param stream  name of the stream
     * @param state   versioned state to compare against and update from
     * @param context operation context for store calls
     * @return future that completes when the update (if any) is done
     */
    private CompletableFuture<Void> resetStateConditionally(String scope, String stream, VersionedMetadata<State> state, OperationContext context) {
        if (state.getObject().equals(State.COMMITTING_TXN)) {
            return Futures.toVoid(this.streamMetadataStore.updateVersionedState(scope, stream, State.ACTIVE, state, context, this.executor));
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Reports whether work for this event is already in progress, i.e. whether the stream state is
     * {@link State#COMMITTING_TXN} or {@link State#SEALING}.
     *
     * @param commitEvent event naming the scope and stream to check
     * @return future holding {@code true} when the stream is in one of those two states
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Boolean> hasTaskStarted(CommitEvent commitEvent) {
        return this.streamMetadataStore.getState(commitEvent.getScope(), commitEvent.getStream(), true, null, this.executor)
                .thenApply(state -> state.equals(State.COMMITTING_TXN) || state.equals(State.SEALING));
    }
}
