package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.base.Preconditions;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.TagLogger;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.TxnStatus;
import io.pravega.controller.store.stream.records.ActiveTxnRecord;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.task.Stream.StreamTransactionMetadataTasks;
import io.pravega.shared.controller.event.SealStreamEvent;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/SealStreamTask.class */
/**
 * {@link StreamTask} that processes a {@link SealStreamEvent}.
 *
 * <p>Processing verifies that the stream is in {@code SEALING} or {@code SEALED} state, requests
 * an abort of every open/aborting transaction, notifies the segment store to seal the stream's
 * active segments and finally marks the stream as sealed in the metadata store.
 */
public class SealStreamTask implements StreamTask<SealStreamEvent> {
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(SealStreamTask.class));

    private final StreamMetadataTasks streamMetadataTasks;
    private final StreamTransactionMetadataTasks streamTransactionMetadataTasks;
    private final StreamMetadataStore streamMetadataStore;
    private final ScheduledExecutorService executor;

    /**
     * Creates the task.
     *
     * @param streamMetadataTasks            used to notify sealed segments, fetch the delegation token
     *                                       and write events back.
     * @param streamTransactionMetadataTasks used to abort transactions.
     * @param streamMetadataStore            metadata store queried and updated by this task.
     * @param scheduledExecutorService       executor handed to the metadata store calls.
     * @throws NullPointerException if any argument is {@code null}.
     */
    public SealStreamTask(StreamMetadataTasks streamMetadataTasks, StreamTransactionMetadataTasks streamTransactionMetadataTasks, StreamMetadataStore streamMetadataStore, ScheduledExecutorService scheduledExecutorService) {
        Preconditions.checkNotNull(streamMetadataStore);
        Preconditions.checkNotNull(streamMetadataTasks);
        Preconditions.checkNotNull(streamTransactionMetadataTasks);
        Preconditions.checkNotNull(scheduledExecutorService);
        this.streamMetadataTasks = streamMetadataTasks;
        this.streamTransactionMetadataTasks = streamTransactionMetadataTasks;
        this.streamMetadataStore = streamMetadataStore;
        this.executor = scheduledExecutorService;
    }

    /**
     * Runs the seal workflow for the stream named in the event.
     *
     * <p>The returned future completes exceptionally with an {@link IllegalStateException} cause
     * when the stream is neither {@code SEALING} nor {@code SEALED}, and with a
     * {@link StoreException} of type {@code OPERATION_NOT_ALLOWED} when active transactions were
     * found (aborts are requested for them first). When the stream has no active segments the
     * future completes without touching the stream state.
     *
     * @param sealStreamEvent event carrying scope, stream and request id.
     * @return future that completes when processing is done.
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> execute(SealStreamEvent sealStreamEvent) {
        final String scope = sealStreamEvent.getScope();
        final String stream = sealStreamEvent.getStream();
        final long requestId = sealStreamEvent.getRequestId();
        final OperationContext context = this.streamMetadataStore.createContext(scope, stream);

        return this.streamMetadataStore.getState(scope, stream, true, context, this.executor)
                .thenAccept(state -> {
                    if (!state.equals(State.SEALING) && !state.equals(State.SEALED)) {
                        throw new IllegalStateException("Seal stream not started.");
                    }
                })
                .thenCompose(ignored -> abortTransaction(context, scope, stream, requestId))
                .thenAccept(noActiveTransactions -> {
                    if (!noActiveTransactions) {
                        // Transactions were found on the stream: fail this attempt so that sealing is postponed.
                        log.debug(requestId, "Found open transactions on stream {}/{}. Postponing its sealing.", scope, stream);
                        throw StoreException.create(StoreException.Type.OPERATION_NOT_ALLOWED, "Found ongoing transactions. Abort transaction requested.Sealing stream segments should wait until transactions are aborted.");
                    }
                })
                .thenCompose(ignored -> this.streamMetadataStore.getActiveSegments(scope, stream, context, this.executor))
                .thenCompose(activeSegments -> {
                    if (activeSegments.isEmpty()) {
                        // Nothing left to seal; leave the stream state untouched.
                        return CompletableFuture.<Void>completedFuture(null);
                    }
                    return notifySealed(scope, stream, context, activeSegments, requestId);
                });
    }

    /**
     * Requests an abort for every active transaction that is {@code OPEN} or {@code ABORTING}.
     *
     * @param operationContext operation context for the stream.
     * @param scope            scope name.
     * @param stream           stream name.
     * @param requestId        request id used for tagged logging.
     * @return future holding {@code true} when the store reported no active transactions
     *         ({@code null} or empty map); otherwise {@code false} once all abort requests have
     *         completed, irrespective of the individual transaction statuses or abort outcomes.
     */
    private CompletableFuture<Boolean> abortTransaction(OperationContext operationContext, String scope, String stream, long requestId) {
        return this.streamMetadataStore.getActiveTxns(scope, stream, operationContext, this.executor)
                .thenCompose(activeTxns -> {
                    if (activeTxns == null || activeTxns.isEmpty()) {
                        return CompletableFuture.completedFuture(true);
                    }
                    List<CompletableFuture<Void>> aborts = activeTxns.entrySet().stream()
                            .map(entry -> abortIfPending(operationContext, scope, stream, entry.getKey(), entry.getValue(), requestId))
                            .collect(Collectors.toList());
                    return Futures.allOf(aborts).thenApply(ignored -> false);
                });
    }

    /**
     * Requests an abort of a single transaction if its status is {@code OPEN} or {@code ABORTING};
     * transactions in any other status are left alone. Failures of the abort call are logged and
     * swallowed, so the returned future never completes exceptionally because of them.
     */
    private CompletableFuture<Void> abortIfPending(OperationContext operationContext, String scope, String stream, UUID txnId, ActiveTxnRecord txnRecord, long requestId) {
        TxnStatus status = txnRecord.getTxnStatus();
        if (!status.equals(TxnStatus.OPEN) && !status.equals(TxnStatus.ABORTING)) {
            return CompletableFuture.completedFuture(null);
        }
        return Futures.toVoid(this.streamTransactionMetadataTasks.abortTxn(scope, stream, txnId, null, operationContext)
                .exceptionally(ex -> {
                    Throwable cause = Exceptions.unwrap(ex);
                    boolean known = cause instanceof StoreException.IllegalStateException
                            || cause instanceof StoreException.WriteConflictException
                            || cause instanceof StoreException.DataNotFoundException;
                    if (known) {
                        log.debug(requestId, "A known exception thrown during seal stream while trying to abort transaction on stream {}/{}", scope, stream, cause);
                    } else {
                        // Deliberately not rethrown: the caller fails the task anyway whenever
                        // active transactions were found.
                        log.warn(requestId, "Exception thrown during seal stream while trying to abort transaction on stream {}/{}", scope, stream, cause);
                    }
                    return null;
                }));
    }

    /**
     * Notifies the segment store that the given segments are to be sealed and then marks the
     * stream as sealed in the metadata store.
     *
     * @param scope            scope name.
     * @param stream           stream name.
     * @param operationContext operation context for the stream.
     * @param activeSegments   currently active segments of the stream.
     * @param requestId        request id used for tagged logging and passed to the notification.
     * @return stage that completes once the stream has been marked sealed.
     */
    private CompletionStage<Void> notifySealed(String scope, String stream, OperationContext operationContext, List<StreamSegmentRecord> activeSegments, long requestId) {
        List<Long> segmentIds = activeSegments.stream()
                .map(StreamSegmentRecord::segmentId)
                .collect(Collectors.toList());
        log.debug(requestId, "Sending notification to segment store to seal segments for stream {}/{}", scope, stream);
        return this.streamMetadataTasks
                .notifySealedSegments(scope, stream, segmentIds, this.streamMetadataTasks.retrieveDelegationToken(), requestId)
                .thenCompose(ignored -> setSealed(scope, stream, operationContext));
    }

    /** Marks the stream as sealed in the metadata store. */
    private CompletableFuture<Void> setSealed(String scope, String stream, OperationContext operationContext) {
        return this.streamMetadataStore.setSealed(scope, stream, operationContext, this.executor);
    }

    /**
     * Writes the event back through {@link StreamMetadataTasks#writeEvent}.
     *
     * @param sealStreamEvent event to write.
     * @return future that completes when the write completes.
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> writeBack(SealStreamEvent sealStreamEvent) {
        return this.streamMetadataTasks.writeEvent(sealStreamEvent);
    }

    /**
     * Reports whether the stream named in the event is currently in {@code SEALING} state.
     *
     * @param sealStreamEvent event carrying scope and stream.
     * @return future holding {@code true} only when the stored state equals {@code SEALING}.
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Boolean> hasTaskStarted(SealStreamEvent sealStreamEvent) {
        return this.streamMetadataStore
                .getState(sealStreamEvent.getScope(), sealStreamEvent.getStream(), true, null, this.executor)
                .thenApply(state -> state.equals(State.SEALING));
    }
}
