package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.util.RetriesExhaustedException;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.shared.controller.event.ScaleOpEvent;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/ScaleOperationTask.class */
public class ScaleOperationTask implements StreamTask<ScaleOpEvent> {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(ScaleOperationTask.class);
    private final StreamMetadataTasks streamMetadataTasks;
    private final StreamMetadataStore streamMetadataStore;
    private final ScheduledExecutorService executor;

    public ScaleOperationTask(StreamMetadataTasks streamMetadataTasks, StreamMetadataStore streamMetadataStore, ScheduledExecutorService scheduledExecutorService) {
        Preconditions.checkNotNull(streamMetadataStore);
        Preconditions.checkNotNull(streamMetadataTasks);
        Preconditions.checkNotNull(scheduledExecutorService);
        this.streamMetadataTasks = streamMetadataTasks;
        this.streamMetadataStore = streamMetadataStore;
        this.executor = scheduledExecutorService;
    }

    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> execute(ScaleOpEvent scaleOpEvent) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        OperationContext createContext = this.streamMetadataStore.createContext(scaleOpEvent.getScope(), scaleOpEvent.getStream());
        log.info("starting scale request for {}/{} segments {} to new ranges {}", new Object[]{scaleOpEvent.getScope(), scaleOpEvent.getStream(), scaleOpEvent.getSegmentsToSeal(), scaleOpEvent.getNewRanges()});
        this.streamMetadataTasks.startScale(scaleOpEvent, scaleOpEvent.isRunOnlyIfStarted(), createContext).whenCompleteAsync((list, th) -> {
            if (th == null) {
                log.info("scale request for {}/{} segments {} to new ranges {} completed successfully.", new Object[]{scaleOpEvent.getScope(), scaleOpEvent.getStream(), scaleOpEvent.getSegmentsToSeal(), scaleOpEvent.getNewRanges()});
                completableFuture.complete(null);
                return;
            }
            Throwable unwrap = Exceptions.unwrap(th);
            if (unwrap instanceof RetriesExhaustedException) {
                unwrap = unwrap.getCause();
            }
            log.warn("processing scale request for {}/{} segments {} failed {}", new Object[]{scaleOpEvent.getScope(), scaleOpEvent.getStream(), scaleOpEvent.getSegmentsToSeal(), unwrap});
            completableFuture.completeExceptionally(unwrap);
        }, (Executor) this.executor);
        return completableFuture;
    }

    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> writeBack(ScaleOpEvent scaleOpEvent) {
        return this.streamMetadataTasks.writeEvent(scaleOpEvent);
    }
}
