package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.server.eventProcessor.requesthandlers.TaskExceptions;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.tables.State;
import io.pravega.controller.store.stream.tables.StreamTruncationRecord;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.shared.controller.event.TruncateStreamEvent;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/TruncateStreamTask.class */
/**
 * {@link StreamTask} implementation that handles {@link TruncateStreamEvent}s.
 *
 * <p>For an event, the task reads the stream's truncation property from the
 * {@link StreamMetadataStore}. If that property is marked as updating, the task:
 * <ol>
 *   <li>sets the stream state to {@link State#TRUNCATING},</li>
 *   <li>calls {@code notifyTruncateSegment} for every entry of the record's stream cut,</li>
 *   <li>calls {@code notifyDeleteSegment} for every segment in the record's to-delete set,</li>
 *   <li>calls {@code completeTruncation} on the metadata store, and</li>
 *   <li>sets the stream state back to {@link State#ACTIVE}.</li>
 * </ol>
 * Each step is chained on the completion of the previous one.
 */
public class TruncateStreamTask implements StreamTask<TruncateStreamEvent> {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(TruncateStreamTask.class);
    private final StreamMetadataTasks streamMetadataTasks;
    private final StreamMetadataStore streamMetadataStore;
    private final ScheduledExecutorService executor;

    /**
     * Creates a new task.
     *
     * @param streamMetadataTasks      used to notify segment truncation/deletion and to write events back
     * @param streamMetadataStore      used to read the truncation property and to update stream state
     * @param scheduledExecutorService executor passed to every metadata store call
     * @throws NullPointerException if any argument is {@code null}
     */
    public TruncateStreamTask(StreamMetadataTasks streamMetadataTasks, StreamMetadataStore streamMetadataStore, ScheduledExecutorService scheduledExecutorService) {
        // Same check order as before; messages added so a failure names the offending argument.
        Preconditions.checkNotNull(streamMetadataStore, "streamMetadataStore");
        Preconditions.checkNotNull(streamMetadataTasks, "streamMetadataTasks");
        Preconditions.checkNotNull(scheduledExecutorService, "scheduledExecutorService");
        this.streamMetadataTasks = streamMetadataTasks;
        this.streamMetadataStore = streamMetadataStore;
        this.executor = scheduledExecutorService;
    }

    /**
     * Processes a truncate request for the scope/stream named by the event.
     *
     * @param truncateStreamEvent the event identifying the scope and stream
     * @return a future that completes when the truncation workflow has finished; it completes
     *         exceptionally with a {@link TaskExceptions.StartException} when the truncation
     *         property is not marked as updating
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> execute(TruncateStreamEvent truncateStreamEvent) {
        final String scope = truncateStreamEvent.getScope();
        final String stream = truncateStreamEvent.getStream();
        final OperationContext context = this.streamMetadataStore.createContext(scope, stream);

        return this.streamMetadataStore.getTruncationProperty(scope, stream, true, context, this.executor)
                .thenCompose(property -> {
                    if (!property.isUpdating()) {
                        // Thrown inside the stage, so it surfaces as an exceptional completion of the returned future.
                        throw new TaskExceptions.StartException("Truncate Stream not started yet.");
                    }
                    return processTruncate(scope, stream, (StreamTruncationRecord) property.getProperty(), context);
                });
    }

    /**
     * Runs the truncation workflow: TRUNCATING state, truncate notifications, delete notifications,
     * completion of the truncation record, then ACTIVE state.
     */
    private CompletableFuture<Void> processTruncate(String scope, String stream, StreamTruncationRecord truncationRecord, OperationContext context) {
        return Futures.toVoid(this.streamMetadataStore.setState(scope, stream, State.TRUNCATING, context, this.executor)
                .thenCompose(ignored -> notifyTruncateSegments(scope, stream, truncationRecord.getStreamCut()))
                .thenCompose(ignored -> notifyDeleteSegments(scope, stream, truncationRecord.getToDelete()))
                .thenCompose(ignored -> this.streamMetadataStore.completeTruncation(scope, stream, context, this.executor))
                .thenCompose(ignored -> this.streamMetadataStore.setState(scope, stream, State.ACTIVE, context, this.executor)));
    }

    /**
     * Calls {@code notifyDeleteSegment} for every segment number in {@code segmentsToDelete} and
     * returns a future combining all of the resulting futures.
     */
    private CompletableFuture<Void> notifyDeleteSegments(String scope, String stream, Set<Integer> segmentsToDelete) {
        log.info("{}/{} deleting segments {}", scope, stream, segmentsToDelete);
        // NOTE(review): parallel() is retained from the original; confirm it is still wanted,
        // since the notify calls already return futures.
        return Futures.allOf(segmentsToDelete.stream()
                .parallel()
                .map(segment -> this.streamMetadataTasks.notifyDeleteSegment(scope, stream, segment.intValue()))
                .collect(Collectors.toList()));
    }

    /**
     * Calls {@code notifyTruncateSegment} for every entry of {@code streamCut} and returns a future
     * combining all of the resulting futures.
     */
    private CompletableFuture<Void> notifyTruncateSegments(String scope, String stream, Map<Integer, Long> streamCut) {
        log.info("{}/{} truncating segments", scope, stream);
        // NOTE(review): parallel() is retained from the original; confirm it is still wanted.
        return Futures.allOf(streamCut.entrySet().stream()
                .parallel()
                .map(entry -> this.streamMetadataTasks.notifyTruncateSegment(scope, stream, entry))
                .collect(Collectors.toList()));
    }

    /**
     * Writes the event back by delegating to {@code StreamMetadataTasks.writeEvent}.
     *
     * @param truncateStreamEvent the event to write
     * @return the future returned by {@code writeEvent}
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> writeBack(TruncateStreamEvent truncateStreamEvent) {
        return this.streamMetadataTasks.writeEvent(truncateStreamEvent);
    }
}
