package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.base.Preconditions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.TagLogger;
import io.pravega.controller.store.VersionedMetadata;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.shared.MetricsTags;
import io.pravega.shared.NameUtils;
import io.pravega.shared.controller.event.TruncateStreamEvent;
import io.pravega.shared.metrics.DynamicLogger;
import io.pravega.shared.metrics.MetricsProvider;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/TruncateStreamTask.class */
public class TruncateStreamTask implements StreamTask<TruncateStreamEvent> {
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(TruncateStreamTask.class));
    private static final DynamicLogger DYNAMIC_LOGGER = MetricsProvider.getDynamicLogger();
    private final StreamMetadataTasks streamMetadataTasks;
    private final StreamMetadataStore streamMetadataStore;
    private final ScheduledExecutorService executor;

    public TruncateStreamTask(StreamMetadataTasks streamMetadataTasks, StreamMetadataStore streamMetadataStore, ScheduledExecutorService scheduledExecutorService) {
        Preconditions.checkNotNull(streamMetadataStore);
        Preconditions.checkNotNull(streamMetadataTasks);
        Preconditions.checkNotNull(scheduledExecutorService);
        this.streamMetadataTasks = streamMetadataTasks;
        this.streamMetadataStore = streamMetadataStore;
        this.executor = scheduledExecutorService;
    }

    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> execute(TruncateStreamEvent truncateStreamEvent) {
        String scope = truncateStreamEvent.getScope();
        String stream = truncateStreamEvent.getStream();
        long requestId = truncateStreamEvent.getRequestId();
        OperationContext createStreamContext = this.streamMetadataStore.createStreamContext(scope, stream, requestId);
        return this.streamMetadataStore.getVersionedState(scope, stream, createStreamContext, this.executor).thenCompose(versionedMetadata -> {
            return ((State) versionedMetadata.getObject()).equals(State.SEALED) ? this.streamMetadataStore.getTruncationRecord(scope, stream, createStreamContext, this.executor).thenCompose(versionedMetadata -> {
                return this.streamMetadataStore.completeTruncation(scope, stream, versionedMetadata, createStreamContext, this.executor).thenAccept(r7 -> {
                    throw new UnsupportedOperationException("Cannot truncate a sealed stream: " + NameUtils.getScopedStreamName(scope, stream));
                });
            }) : this.streamMetadataStore.getTruncationRecord(scope, stream, createStreamContext, this.executor).thenCompose(versionedMetadata2 -> {
                return !((StreamTruncationRecord) versionedMetadata2.getObject()).isUpdating() ? ((State) versionedMetadata.getObject()).equals(State.TRUNCATING) ? Futures.toVoid(this.streamMetadataStore.updateVersionedState(scope, stream, State.ACTIVE, versionedMetadata, createStreamContext, this.executor)) : CompletableFuture.completedFuture(null) : processTruncate(scope, stream, versionedMetadata2, versionedMetadata, createStreamContext, requestId);
            });
        });
    }

    private CompletableFuture<Void> processTruncate(String str, String str2, VersionedMetadata<StreamTruncationRecord> versionedMetadata, VersionedMetadata<State> versionedMetadata2, OperationContext operationContext, long j) {
        String retrieveDelegationToken = this.streamMetadataTasks.retrieveDelegationToken();
        StreamTruncationRecord object = versionedMetadata.getObject();
        log.info(j, "Truncating stream {}/{} at stream cut: {}", new Object[]{str, str2, object.getStreamCut()});
        return Futures.toVoid(this.streamMetadataStore.updateVersionedState(str, str2, State.TRUNCATING, versionedMetadata2, operationContext, this.executor).thenCompose(versionedMetadata3 -> {
            return notifyTruncateSegments(str, str2, object.getStreamCut(), retrieveDelegationToken, j).thenCompose(r15 -> {
                return notifyDeleteSegments(str, str2, object.getToDelete(), retrieveDelegationToken, j);
            }).thenAccept((Consumer<? super U>) r9 -> {
                DYNAMIC_LOGGER.reportGaugeValue("pravega.controller.retention.truncated_size", Long.valueOf(((StreamTruncationRecord) versionedMetadata.getObject()).getSizeTill()), MetricsTags.streamTags(str, str2));
            }).thenCompose(r12 -> {
                return this.streamMetadataStore.completeTruncation(str, str2, versionedMetadata, operationContext, this.executor);
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r13 -> {
                return this.streamMetadataStore.updateVersionedState(str, str2, State.ACTIVE, versionedMetadata3, operationContext, this.executor);
            });
        }));
    }

    private CompletableFuture<Void> notifyDeleteSegments(String str, String str2, Set<Long> set, String str3, long j) {
        log.debug(j, "{}/{} deleting segments {}", new Object[]{str, str2, set});
        return Futures.allOf((Collection) ((Stream) set.stream().parallel()).map(l -> {
            return this.streamMetadataTasks.notifyDeleteSegment(str, str2, l.longValue(), str3, j);
        }).collect(Collectors.toList()));
    }

    private CompletableFuture<Void> notifyTruncateSegments(String str, String str2, Map<Long, Long> map, String str3, long j) {
        log.debug(j, "{}/{} truncating segments", new Object[]{str, str2});
        return Futures.allOf((Collection) ((Stream) map.entrySet().stream().parallel()).map(entry -> {
            return this.streamMetadataTasks.notifyTruncateSegment(str, str2, entry, str3, j);
        }).collect(Collectors.toList()));
    }

    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> writeBack(TruncateStreamEvent truncateStreamEvent) {
        return this.streamMetadataTasks.writeEvent(truncateStreamEvent);
    }

    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Boolean> hasTaskStarted(TruncateStreamEvent truncateStreamEvent) {
        return this.streamMetadataStore.getState(truncateStreamEvent.getScope(), truncateStreamEvent.getStream(), true, null, this.executor).thenApply(state -> {
            return Boolean.valueOf(state.equals(State.TRUNCATING));
        });
    }
}
