package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.base.Preconditions;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.TagLogger;
import io.pravega.controller.store.VersionedMetadata;
import io.pravega.controller.store.stream.BucketStore;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.shared.NameUtils;
import io.pravega.shared.controller.event.UpdateStreamEvent;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/UpdateStreamTask.class */
/**
 * {@link StreamTask} that handles {@link UpdateStreamEvent}s by driving a stream configuration update,
 * as recorded in the {@link StreamMetadataStore}, through to completion.
 *
 * <p>All store and bucket-store calls issued by this class run on the executor supplied at construction.
 */
public class UpdateStreamTask implements StreamTask<UpdateStreamEvent> {
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(UpdateStreamTask.class));

    private final StreamMetadataTasks streamMetadataTasks;
    private final StreamMetadataStore streamMetadataStore;
    private final BucketStore bucketStore;
    private final ScheduledExecutorService executor;

    /**
     * Creates the task.
     *
     * @param streamMetadataTasks      helper used to notify policy updates, process scale requests and write events back.
     * @param streamMetadataStore      store holding stream state, configuration and epoch metadata.
     * @param bucketStore              store used to (de)register the stream with the retention service.
     * @param scheduledExecutorService executor passed to every asynchronous store call.
     * @throws NullPointerException if any argument is {@code null}.
     */
    public UpdateStreamTask(StreamMetadataTasks streamMetadataTasks, StreamMetadataStore streamMetadataStore, BucketStore bucketStore, ScheduledExecutorService scheduledExecutorService) {
        // Same check order as before; the message now names the offending argument.
        Preconditions.checkNotNull(streamMetadataStore, "streamMetadataStore");
        Preconditions.checkNotNull(streamMetadataTasks, "streamMetadataTasks");
        Preconditions.checkNotNull(bucketStore, "bucketStore");
        Preconditions.checkNotNull(scheduledExecutorService, "scheduledExecutorService");
        this.streamMetadataTasks = streamMetadataTasks;
        this.streamMetadataStore = streamMetadataStore;
        this.bucketStore = bucketStore;
        this.executor = scheduledExecutorService;
    }

    /**
     * Processes an update-stream request.
     *
     * <ul>
     *   <li>If the stream state is {@link State#SEALED}, the configuration update is completed in the store and the
     *       returned future then fails with {@link UnsupportedOperationException}.</li>
     *   <li>If the configuration record reports {@code isUpdating()}, the full update workflow is run.</li>
     *   <li>Otherwise there is nothing to apply; if the state is still {@link State#UPDATING} it is set back to
     *       {@link State#ACTIVE}, else the future completes immediately.</li>
     * </ul>
     *
     * @param updateStreamEvent event carrying the scope, stream name and request id.
     * @return a future that completes when the processing described above is done.
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> execute(UpdateStreamEvent updateStreamEvent) {
        String scope = updateStreamEvent.getScope();
        String stream = updateStreamEvent.getStream();
        long requestId = updateStreamEvent.getRequestId();
        OperationContext context = this.streamMetadataStore.createStreamContext(scope, stream, requestId);

        return this.streamMetadataStore.getVersionedState(scope, stream, context, this.executor)
                .thenCompose(versionedState -> {
                    if (versionedState.getObject().equals(State.SEALED)) {
                        return failUpdateOfSealedStream(scope, stream, context);
                    }
                    return this.streamMetadataStore.getConfigurationRecord(scope, stream, context, this.executor)
                            .thenCompose(configRecord -> {
                                if (configRecord.getObject().isUpdating()) {
                                    return processUpdate(scope, stream, configRecord, versionedState, context, requestId);
                                }
                                if (versionedState.getObject().equals(State.UPDATING)) {
                                    // No update is pending but the state still says UPDATING: put it back to ACTIVE.
                                    return Futures.toVoid(this.streamMetadataStore.updateVersionedState(
                                            scope, stream, State.ACTIVE, versionedState, context, this.executor));
                                }
                                return CompletableFuture.completedFuture(null);
                            });
                });
    }

    /**
     * Completes the configuration update in the store for a sealed stream and then fails the returned future.
     *
     * <p>NOTE(review): completing the update first presumably clears the pending-update marker so the request is
     * not picked up again - confirm against the store implementation.
     *
     * @return a future that completes exceptionally with {@link UnsupportedOperationException} once the store call
     *         succeeds (or with the store's failure if it does not).
     */
    private CompletableFuture<Void> failUpdateOfSealedStream(String scope, String stream, OperationContext context) {
        return this.streamMetadataStore.getConfigurationRecord(scope, stream, context, this.executor)
                .thenCompose(configRecord -> this.streamMetadataStore
                        .completeUpdateConfiguration(scope, stream, configRecord, context, this.executor)
                        .thenAccept(ignored -> {
                            throw new UnsupportedOperationException("Cannot update a sealed stream: " + NameUtils.getScopedStreamName(scope, stream));
                        }));
    }

    /**
     * Runs the update workflow. In order: fetch the epoch transition record, set the state to
     * {@link State#UPDATING}, add/remove the stream from the retention bucket, notify the active segments of the
     * scaling policy, scale the stream if the segment count requires it, remove and add stream tags in the index,
     * complete the configuration update, and finally set the state to {@link State#ACTIVE}.
     *
     * @param scope        scope of the stream.
     * @param stream       name of the stream.
     * @param configRecord versioned configuration record being applied.
     * @param state        versioned state read at the start of processing.
     * @param context      operation context for store calls.
     * @param requestId    id of the originating request.
     * @return a future that completes when the stream is back in {@link State#ACTIVE}.
     */
    private CompletableFuture<Void> processUpdate(String scope, String stream, VersionedMetadata<StreamConfigurationRecord> configRecord, VersionedMetadata<State> state, OperationContext context, long requestId) {
        StreamConfigurationRecord config = configRecord.getObject();

        return Futures.toVoid(this.streamMetadataStore.getEpochTransition(scope, stream, context, this.executor)
                .thenCompose(epochTransition -> this.streamMetadataStore
                        .updateVersionedState(scope, stream, State.UPDATING, state, context, this.executor)
                        .thenCompose(updatingState -> updateStreamForAutoStreamCut(scope, stream, config, updatingState)
                                .thenCompose(ignored -> notifyPolicyUpdate(context, scope, stream, config.getStreamConfiguration(), requestId))
                                .thenCompose(ignored -> handleSegmentCountUpdates(scope, stream, config, epochTransition, context, this.executor, requestId))
                                .thenCompose(ignored -> this.streamMetadataStore.removeTagsFromIndex(scope, stream, config.getRemoveTags(), context, this.executor))
                                .thenCompose(ignored -> this.streamMetadataStore.addStreamTagsToIndex(scope, stream, config.getStreamConfiguration(), context, this.executor))
                                .thenCompose(ignored -> this.streamMetadataStore.completeUpdateConfiguration(scope, stream, configRecord, context, this.executor))
                                // The ACTIVE transition is applied on top of the UPDATING version written above.
                                .thenCompose(ignored -> this.streamMetadataStore.updateVersionedState(scope, stream, State.ACTIVE, updatingState, context, this.executor)))));
    }

    /**
     * Scales the stream when the active epoch's segment count does not satisfy the new scaling policy.
     *
     * <p>A scale is triggered when the policy is {@code FIXED_NUM_SEGMENTS} and the current segment count differs
     * from the policy's minimum, or (for any policy) when the current count is below that minimum.
     *
     * @param scope           scope of the stream.
     * @param stream          name of the stream.
     * @param config          configuration record holding the new scaling policy.
     * @param epochTransition versioned epoch transition record handed on to the scale workflow.
     * @param context         operation context for store calls.
     * @param executor        executor used for the active-epoch lookup.
     * @param requestId       id of the originating request.
     * @return a future that completes when scaling is done, or immediately if none is needed.
     */
    private CompletableFuture<Void> handleSegmentCountUpdates(String scope, String stream, StreamConfigurationRecord config, VersionedMetadata<EpochTransitionRecord> epochTransition, OperationContext context, ScheduledExecutorService executor, long requestId) {
        // NOTE(review): the boolean flag's meaning is defined by StreamMetadataStore.getActiveEpoch - confirm there.
        return this.streamMetadataStore.getActiveEpoch(scope, stream, context, true, executor)
                .thenCompose(activeEpoch -> {
                    ScalingPolicy scalingPolicy = config.getStreamConfiguration().getScalingPolicy();
                    int minNumSegments = scalingPolicy.getMinNumSegments();
                    int currentSegments = activeEpoch.getSegments().size();

                    boolean fixedCountMismatch = scalingPolicy.getScaleType() == ScalingPolicy.ScaleType.FIXED_NUM_SEGMENTS
                            && currentSegments != minNumSegments;
                    if (fixedCountMismatch || currentSegments < minNumSegments) {
                        return processScale(scope, stream, minNumSegments, epochTransition, activeEpoch, context, requestId);
                    }
                    return CompletableFuture.completedFuture(null);
                });
    }

    /**
     * Replaces the segments of {@code activeEpoch} with {@code numSegments} new ones.
     *
     * <p>The new ranges are {@code [i * delta, (i + 1) * delta)} for {@code i} in {@code [0, numSegments)} with
     * {@code delta = 1.0 / numSegments}. The epoch transition is reset, a scale is submitted with all segment ids
     * of {@code activeEpoch} and the new ranges, and the scale is then processed.
     *
     * @param scope           scope of the stream.
     * @param stream          name of the stream.
     * @param numSegments     number of segments to create.
     * @param epochTransition versioned epoch transition record to reset.
     * @param activeEpoch     currently active epoch whose segment ids are passed to the scale request.
     * @param context         operation context for store calls.
     * @param requestId       id of the originating request, used in log messages.
     * @return a future that completes when the scale has been processed.
     */
    private CompletableFuture<Void> processScale(String scope, String stream, int numSegments, VersionedMetadata<EpochTransitionRecord> epochTransition, EpochRecord activeEpoch, OperationContext context, long requestId) {
        double delta = 1.0d / numSegments;
        List<Map.Entry<Double, Double>> newRanges = IntStream.range(0, numSegments)
                .mapToObj(i -> new AbstractMap.SimpleEntry<Double, Double>(i * delta, (i + 1) * delta))
                .collect(Collectors.toList());
        log.debug("{} Scaling stream to update minimum number of segments to {}", requestId, numSegments);

        return this.streamMetadataStore.resetEpochTransition(scope, stream, epochTransition, context, this.executor)
                .thenCompose(resetTransition -> this.streamMetadataStore.submitScale(scope, stream,
                        new ArrayList<>(activeEpoch.getSegmentIds()), newRanges, System.currentTimeMillis(),
                        resetTransition, context, this.executor))
                .thenCompose(submittedTransition -> this.streamMetadataTasks
                        .processScale(scope, stream, submittedTransition, context, requestId, this.streamMetadataStore)
                        .thenAccept(ignored -> log.info("{} Stream scaled to epoch {} to update minimum number of segments to {}",
                                requestId, submittedTransition.getObject().getActiveEpoch(), numSegments)));
    }

    /**
     * Registers the stream with the retention service bucket when the new configuration has a retention policy,
     * and removes it from that bucket otherwise.
     *
     * @param scope  scope of the stream.
     * @param stream name of the stream.
     * @param config configuration record being applied.
     * @param state  current versioned state; not used by this method.
     * @return a future that completes when the bucket store has been updated.
     */
    private CompletableFuture<Void> updateStreamForAutoStreamCut(String scope, String stream, StreamConfigurationRecord config, VersionedMetadata<State> state) {
        if (config.getStreamConfiguration().getRetentionPolicy() != null) {
            return this.bucketStore.addStreamToBucketStore(BucketStore.ServiceType.RetentionService, scope, stream, this.executor);
        }
        return this.bucketStore.removeStreamFromBucketStore(BucketStore.ServiceType.RetentionService, scope, stream, this.executor);
    }

    /**
     * Notifies the stream's active segments of the scaling policy in {@code streamConfiguration}.
     *
     * @param context             operation context for store calls.
     * @param scope               scope of the stream.
     * @param stream              name of the stream.
     * @param streamConfiguration configuration whose scaling policy is sent.
     * @param requestId           id of the originating request.
     * @return a future holding {@code true} on success; on failure it completes exceptionally with a
     *         {@link CompletionException} wrapping the cause.
     */
    private CompletableFuture<Boolean> notifyPolicyUpdate(OperationContext context, String scope, String stream, StreamConfiguration streamConfiguration, long requestId) {
        return this.streamMetadataStore.getActiveSegments(scope, stream, context, this.executor)
                .thenCompose(activeSegments -> this.streamMetadataTasks.notifyPolicyUpdates(scope, stream, activeSegments,
                        streamConfiguration.getScalingPolicy(), this.streamMetadataTasks.retrieveDelegationToken(), requestId))
                .handle((result, failure) -> {
                    if (failure == null) {
                        return true;
                    }
                    throw new CompletionException(failure);
                });
    }

    /**
     * Writes the event back through {@link StreamMetadataTasks#writeEvent} so it can be processed again later.
     *
     * @param updateStreamEvent event to write back.
     * @return a future that completes when the event has been written.
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Void> writeBack(UpdateStreamEvent updateStreamEvent) {
        return this.streamMetadataTasks.writeEvent(updateStreamEvent);
    }

    /**
     * Reports whether the stream named in the event is currently in {@link State#UPDATING}.
     *
     * @param updateStreamEvent event identifying the scope and stream.
     * @return a future holding {@code true} if the stored state equals {@link State#UPDATING}.
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.StreamTask
    public CompletableFuture<Boolean> hasTaskStarted(UpdateStreamEvent updateStreamEvent) {
        // NOTE(review): the `true` flag and the null operation context are interpreted by
        // StreamMetadataStore.getState - confirm their meaning there.
        return this.streamMetadataStore
                .getState(updateStreamEvent.getScope(), updateStreamEvent.getStream(), true, null, this.executor)
                .thenApply(state -> state.equals(State.UPDATING));
    }
}
