package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.pravega.common.Exceptions;
import io.pravega.common.tracing.TagLogger;
import io.pravega.common.util.RetriesExhaustedException;
import io.pravega.controller.server.eventProcessor.requesthandlers.TaskExceptions;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.VersionedMetadata;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.shared.controller.event.ScaleOpEvent;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/ScaleOperationTask.class */
/**
 * {@link StreamTask} that processes a {@link ScaleOpEvent}.
 *
 * <p>The task reads the stream's versioned state and epoch transition record from the
 * {@link StreamMetadataStore}, submits the scale if no transition is recorded yet (or fails with a
 * {@link TaskExceptions.StartException} when the event may only run an already started scale), and
 * then drives the scale through the store and {@link StreamMetadataTasks} until the stream state is
 * set back to {@link State#ACTIVE}.
 */
public class ScaleOperationTask implements StreamTask<ScaleOpEvent> {
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(ScaleOperationTask.class));
    private final StreamMetadataTasks streamMetadataTasks;
    private final StreamMetadataStore streamMetadataStore;
    private final ScheduledExecutorService executor;

    /**
     * Creates the task.
     *
     * @param streamMetadataTasks      helper used to notify segments and to write events back; must not be null.
     * @param streamMetadataStore      metadata store holding stream state and epoch transitions; must not be null.
     * @param scheduledExecutorService executor on which all asynchronous stages are run; must not be null.
     * @throws NullPointerException if any argument is null.
     */
    public ScaleOperationTask(StreamMetadataTasks streamMetadataTasks, StreamMetadataStore streamMetadataStore, ScheduledExecutorService scheduledExecutorService) {
        Preconditions.checkNotNull(streamMetadataStore);
        Preconditions.checkNotNull(streamMetadataTasks);
        Preconditions.checkNotNull(scheduledExecutorService);
        this.streamMetadataTasks = streamMetadataTasks;
        this.streamMetadataStore = streamMetadataStore;
        this.executor = scheduledExecutorService;
    }

    /**
     * Runs the scale described by the event and reports the outcome.
     *
     * @param scaleOpEvent the scale request to process.
     * @return a future that completes normally when the scale completes, or exceptionally with the
     *         unwrapped failure (the cause of a {@link RetriesExhaustedException} when one is present).
     */
    @Override
    public CompletableFuture<Void> execute(ScaleOpEvent scaleOpEvent) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        OperationContext context = this.streamMetadataStore.createContext(scaleOpEvent.getScope(), scaleOpEvent.getStream());
        log.info(scaleOpEvent.getRequestId(), "starting scale request for {}/{} segments {} to new ranges {}",
                scaleOpEvent.getScope(), scaleOpEvent.getStream(), scaleOpEvent.getSegmentsToSeal(), scaleOpEvent.getNewRanges());

        runScale(scaleOpEvent, scaleOpEvent.isRunOnlyIfStarted(), context, this.streamMetadataTasks.retrieveDelegationToken())
                .whenCompleteAsync((ignored, failure) -> {
                    if (failure == null) {
                        log.info(scaleOpEvent.getRequestId(), "scale request for {}/{} segments {} to new ranges {} completed successfully.",
                                scaleOpEvent.getScope(), scaleOpEvent.getStream(), scaleOpEvent.getSegmentsToSeal(), scaleOpEvent.getNewRanges());
                        result.complete(null);
                        return;
                    }
                    Throwable cause = Exceptions.unwrap(failure);
                    // Only replace the exception with its cause when there is one:
                    // completeExceptionally(null) would throw and leave 'result' incomplete forever.
                    if (cause instanceof RetriesExhaustedException && cause.getCause() != null) {
                        cause = cause.getCause();
                    }
                    log.warn(scaleOpEvent.getRequestId(), "processing scale request for {}/{} segments {} failed {}",
                            scaleOpEvent.getScope(), scaleOpEvent.getStream(), scaleOpEvent.getSegmentsToSeal(), cause);
                    result.completeExceptionally(cause);
                }, this.executor);
        return result;
    }

    /**
     * Writes the event back by delegating to {@link StreamMetadataTasks#writeEvent}.
     *
     * @param scaleOpEvent the event to write.
     * @return the future returned by the write.
     */
    @Override
    public CompletableFuture<Void> writeBack(ScaleOpEvent scaleOpEvent) {
        return this.streamMetadataTasks.writeEvent(scaleOpEvent);
    }

    /**
     * Fetches the current state and epoch transition record, prepares the transition if it is
     * empty, and then processes the scale.
     *
     * <p>When the epoch transition record equals {@link EpochTransitionRecord#EMPTY}:
     * <ul>
     *   <li>a state of {@link State#SCALING} is first reset to {@link State#ACTIVE};</li>
     *   <li>if {@code runOnlyIfStarted} is set the returned future fails with
     *       {@link TaskExceptions.StartException}; otherwise the scale is submitted to the store.</li>
     * </ul>
     *
     * @param scaleOpEvent     the scale request.
     * @param runOnlyIfStarted when true, do not submit a new scale if none has been recorded.
     * @param operationContext store operation context for the stream.
     * @param delegationToken  token passed to the segment notification calls.
     * @return a future that completes when scale processing finishes.
     */
    @VisibleForTesting
    public CompletableFuture<Void> runScale(ScaleOpEvent scaleOpEvent, boolean runOnlyIfStarted, OperationContext operationContext, String delegationToken) {
        String scope = scaleOpEvent.getScope();
        String stream = scaleOpEvent.getStream();
        long requestId = scaleOpEvent.getRequestId();

        return this.streamMetadataStore.getVersionedState(scope, stream, operationContext, this.executor)
                .thenCompose(state -> this.streamMetadataStore.getEpochTransition(scope, stream, operationContext, this.executor)
                        .thenCompose(record -> {
                            // Holds the most recent versioned state so processScale updates against the right version.
                            AtomicReference<VersionedMetadata<State>> stateRef = new AtomicReference<>(state);
                            CompletableFuture<VersionedMetadata<EpochTransitionRecord>> future = CompletableFuture.completedFuture(record);

                            if (record.getObject().equals(EpochTransitionRecord.EMPTY)) {
                                // No transition recorded: a leftover SCALING state is reset to ACTIVE first.
                                if (state.getObject().equals(State.SCALING)) {
                                    future = this.streamMetadataStore
                                            .updateVersionedState(scope, stream, State.ACTIVE, state, operationContext, this.executor)
                                            .thenApply(updatedState -> {
                                                stateRef.set(updatedState);
                                                return record;
                                            });
                                }

                                if (runOnlyIfStarted) {
                                    future = future.thenApply(ignored -> {
                                        throw new TaskExceptions.StartException("Scale Stream not started yet.");
                                    });
                                } else {
                                    future = future.thenCompose(ignored -> this.streamMetadataStore.submitScale(scope, stream,
                                            scaleOpEvent.getSegmentsToSeal(), new ArrayList<>(scaleOpEvent.getNewRanges()),
                                            scaleOpEvent.getScaleTime(), record, operationContext, this.executor));
                                }
                            }

                            return future.thenCompose(transition -> processScale(scope, stream, runOnlyIfStarted, transition,
                                    stateRef.get(), operationContext, delegationToken, requestId));
                        }));
    }

    /**
     * Drives a scale to completion: sets the state to {@link State#SCALING}, starts the scale,
     * notifies new segments, creates the new epochs, notifies sealed segments, records their sizes,
     * completes the scale and finally sets the state to {@link State#ACTIVE}.
     *
     * @param scope            stream scope.
     * @param stream           stream name.
     * @param runOnlyIfStarted flag forwarded to {@code startScale}.
     * @param transition       versioned epoch transition record to start from.
     * @param state            versioned state to update from.
     * @param operationContext store operation context for the stream.
     * @param delegationToken  token passed to the segment notification calls.
     * @param requestId        request id used for tagged logging and notifications.
     * @return a future that completes once the state has been set back to {@code ACTIVE}.
     */
    private CompletableFuture<Void> processScale(String scope, String stream, boolean runOnlyIfStarted,
                                                 VersionedMetadata<EpochTransitionRecord> transition,
                                                 VersionedMetadata<State> state, OperationContext operationContext,
                                                 String delegationToken, long requestId) {
        return this.streamMetadataStore.updateVersionedState(scope, stream, State.SCALING, state, operationContext, this.executor)
                .thenCompose(scalingState -> this.streamMetadataStore
                        .startScale(scope, stream, runOnlyIfStarted, transition, scalingState, operationContext, this.executor)
                        .thenCompose(record -> {
                            ArrayList<Long> newSegments = new ArrayList<>(record.getObject().getNewSegmentsWithRange().keySet());
                            ArrayList<Long> segmentsToSeal = new ArrayList<>(record.getObject().getSegmentsToSeal());
                            return this.streamMetadataTasks.notifyNewSegments(scope, stream, newSegments, operationContext, delegationToken, requestId)
                                    .thenCompose(ignored -> this.streamMetadataStore.scaleCreateNewEpochs(scope, stream, record, operationContext, this.executor))
                                    .thenCompose(ignored -> this.streamMetadataTasks.notifySealedSegments(scope, stream, segmentsToSeal, delegationToken, requestId))
                                    .thenCompose(ignored -> this.streamMetadataTasks.getSealedSegmentsSize(scope, stream, segmentsToSeal, delegationToken))
                                    .thenCompose(sealedSizes -> this.streamMetadataStore.scaleSegmentsSealed(scope, stream, sealedSizes, record, operationContext, this.executor))
                                    .thenCompose(ignored -> this.streamMetadataStore.completeScale(scope, stream, record, operationContext, this.executor))
                                    // The state is updated from the SCALING version written at the start of this method.
                                    .thenCompose(ignored -> this.streamMetadataStore.updateVersionedState(scope, stream, State.ACTIVE, scalingState, operationContext, this.executor))
                                    .thenAccept(ignored -> log.info(requestId, "scale processing for {}/{} epoch {} completed.",
                                            scope, stream, record.getObject().getActiveEpoch()));
                        }));
    }
}
