package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.TagLogger;
import io.pravega.controller.eventProcessor.impl.EventProcessorHelper;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.shared.NameUtils;
import io.pravega.shared.controller.event.AutoScaleEvent;
import io.pravega.shared.controller.event.ScaleOpEvent;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/AutoScaleTask.class */
/**
 * Handles {@link AutoScaleEvent}s for a stream by translating them into {@link ScaleOpEvent}s.
 *
 * <p>A scale-up event splits the reported segment's key range into equal parts. A scale-down event
 * marks the reported segment as cold and, when at least one adjacent active segment is also cold
 * and the stream has more active segments than its policy minimum, requests a merge of those
 * segments. Streams whose policy is {@link ScalingPolicy.ScaleType#FIXED_NUM_SEGMENTS} are never
 * scaled by this task.
 */
public class AutoScaleTask {
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(AutoScaleTask.class));

    /** How long (in milliseconds) after its timestamp an auto-scale event is still acted upon. */
    private static final long REQUEST_VALIDITY_PERIOD = Duration.ofMinutes(10L).toMillis();

    private final StreamMetadataTasks streamMetadataTasks;
    private final StreamMetadataStore streamMetadataStore;
    private final ScheduledExecutorService executor;

    /**
     * Creates the task.
     *
     * @param streamMetadataTasks      used to post the resulting scale operation event; must not be null.
     * @param streamMetadataStore      used to read stream configuration and segment metadata; must not be null.
     * @param scheduledExecutorService executor on which asynchronous stages run; must not be null.
     */
    public AutoScaleTask(StreamMetadataTasks streamMetadataTasks, StreamMetadataStore streamMetadataStore, ScheduledExecutorService scheduledExecutorService) {
        Preconditions.checkNotNull(streamMetadataStore);
        Preconditions.checkNotNull(streamMetadataTasks);
        Preconditions.checkNotNull(scheduledExecutorService);
        this.streamMetadataTasks = streamMetadataTasks;
        this.streamMetadataStore = streamMetadataStore;
        this.executor = scheduledExecutorService;
    }

    /**
     * Processes a single auto-scale event.
     *
     * <p>Events whose timestamp is older than the validity period are logged and dropped. Otherwise
     * the stream's scaling policy is fetched and the event is dispatched to scale-up or scale-down
     * handling, wrapped in {@link EventProcessorHelper#withRetries}.
     *
     * @param autoScaleEvent the event to process.
     * @return a future that completes when processing of the event has finished.
     */
    public CompletableFuture<Void> execute(AutoScaleEvent autoScaleEvent) {
        if (autoScaleEvent.getTimestamp() + REQUEST_VALIDITY_PERIOD <= System.currentTimeMillis()) {
            // The request outlived its validity window before we got to it; acting on stale load data is pointless.
            log.info(autoScaleEvent.getRequestId(), "Scale Request for stream {}/{} expired",
                    autoScaleEvent.getScope(), autoScaleEvent.getStream());
            return CompletableFuture.completedFuture(null);
        }

        final OperationContext context = this.streamMetadataStore.createStreamContext(
                autoScaleEvent.getScope(), autoScaleEvent.getStream(), autoScaleEvent.getRequestId());

        return EventProcessorHelper.withRetries(() -> {
            final CompletableFuture<ScalingPolicy> policyFuture = this.streamMetadataStore
                    .getConfiguration(autoScaleEvent.getScope(), autoScaleEvent.getStream(), context, this.executor)
                    .thenApply(configuration -> configuration.getScalingPolicy());

            if (autoScaleEvent.getDirection() == AutoScaleEvent.UP) {
                return policyFuture.thenComposeAsync(
                        policy -> processScaleUp(autoScaleEvent, policy, context), this.executor);
            } else {
                return policyFuture.thenComposeAsync(
                        policy -> processScaleDown(autoScaleEvent, policy, context), this.executor);
            }
        }, this.executor);
    }

    /**
     * Handles a scale-up event by splitting the reported segment into equally sized key ranges.
     *
     * <p>The number of splits is {@code min(max(2, event.numOfSplits), max(2, policy.scaleFactor))}.
     *
     * @return a future that completes once the scale request has been posted, or immediately for
     *         fixed-size streams.
     */
    private CompletableFuture<Void> processScaleUp(AutoScaleEvent autoScaleEvent, ScalingPolicy scalingPolicy, OperationContext operationContext) {
        log.info(autoScaleEvent.getRequestId(), "Scale up request received for stream segment {}",
                NameUtils.getQualifiedStreamSegmentName(autoScaleEvent.getScope(), autoScaleEvent.getStream(), autoScaleEvent.getSegmentId()));

        if (scalingPolicy.getScaleType().equals(ScalingPolicy.ScaleType.FIXED_NUM_SEGMENTS)) {
            return CompletableFuture.completedFuture(null);
        }

        return this.streamMetadataStore
                .getSegment(autoScaleEvent.getScope(), autoScaleEvent.getStream(), autoScaleEvent.getSegmentId(), operationContext, this.executor)
                .thenComposeAsync(segment -> {
                    final int numOfSplits = Math.min(Math.max(2, autoScaleEvent.getNumOfSplits()),
                            Math.max(2, scalingPolicy.getScaleFactor()));
                    final List<Map.Entry<Double, Double>> newRanges =
                            splitKeyRange(segment.getKeyStart(), segment.getKeyEnd(), numOfSplits);
                    final List<Long> segmentsToSeal = Lists.newArrayList(autoScaleEvent.getSegmentId());
                    return postScaleRequest(autoScaleEvent, segmentsToSeal, newRanges, autoScaleEvent.getRequestId());
                }, this.executor);
    }

    /**
     * Splits {@code [keyStart, keyEnd)} into {@code numOfSplits} contiguous ranges of equal width.
     * The final range always ends exactly at {@code keyEnd} so rounding cannot leave a gap.
     */
    private static List<Map.Entry<Double, Double>> splitKeyRange(double keyStart, double keyEnd, int numOfSplits) {
        final double delta = (keyEnd - keyStart) / numOfSplits;
        final List<Map.Entry<Double, Double>> ranges = new ArrayList<>(numOfSplits);
        for (int i = 0; i < numOfSplits - 1; i++) {
            ranges.add(new AbstractMap.SimpleEntry<>(keyStart + (delta * i), keyStart + (delta * (i + 1))));
        }
        ranges.add(new AbstractMap.SimpleEntry<>(keyStart + (delta * (numOfSplits - 1)), keyEnd));
        return ranges;
    }

    /**
     * Handles a scale-down event: marks the reported segment cold, then merges it with its cold
     * neighbours if the stream is above its minimum segment count.
     *
     * <p>For a silent event the cold marker is written with {@link Long#MAX_VALUE} as its timestamp
     * argument; otherwise with the event timestamp plus the validity period.
     *
     * @return a future that completes once a merge request has been posted, or once it has been
     *         determined that no merge is possible.
     */
    private CompletableFuture<Void> processScaleDown(AutoScaleEvent autoScaleEvent, ScalingPolicy scalingPolicy, OperationContext operationContext) {
        log.info(autoScaleEvent.getRequestId(), "Scale down request received for stream segment {}",
                NameUtils.getQualifiedStreamSegmentName(autoScaleEvent.getScope(), autoScaleEvent.getStream(), autoScaleEvent.getSegmentId()));

        if (scalingPolicy.getScaleType().equals(ScalingPolicy.ScaleType.FIXED_NUM_SEGMENTS)) {
            return CompletableFuture.completedFuture(null);
        }

        final long coldMarkerTimestamp = autoScaleEvent.isSilent()
                ? Long.MAX_VALUE
                : autoScaleEvent.getTimestamp() + REQUEST_VALIDITY_PERIOD;

        return this.streamMetadataStore
                .markCold(autoScaleEvent.getScope(), autoScaleEvent.getStream(), autoScaleEvent.getSegmentId(),
                        coldMarkerTimestamp, operationContext, this.executor)
                .thenCompose(ignored -> this.streamMetadataStore.getActiveSegments(
                        autoScaleEvent.getScope(), autoScaleEvent.getStream(), operationContext, this.executor))
                .thenApply(activeSegments -> selectMergeCandidates(autoScaleEvent, scalingPolicy, activeSegments))
                .thenCompose(selection -> retainColdCandidates(autoScaleEvent, selection, operationContext))
                .thenCompose(segmentsToMerge -> postMergeRequest(autoScaleEvent, segmentsToMerge));
    }

    /**
     * Picks the reported segment and its immediately adjacent active segments, sorted by key start.
     *
     * @return the candidates paired with the number of segments the stream currently has above its
     *         policy minimum, or {@code null} when the reported segment is no longer active or the
     *         stream is already at (or below) its minimum segment count.
     */
    private ImmutablePair<List<StreamSegmentRecord>, Integer> selectMergeCandidates(
            AutoScaleEvent autoScaleEvent, ScalingPolicy scalingPolicy, List<StreamSegmentRecord> activeSegments) {
        assert activeSegments != null;

        final long segmentId = autoScaleEvent.getSegmentId();
        final Optional<StreamSegmentRecord> current = activeSegments.stream()
                .filter(segment -> segment.segmentId() == segmentId)
                .findAny();

        // "<=" rather than "==": if the stream somehow has fewer active segments than the policy
        // minimum, merging would only move it further below that minimum.
        if (!current.isPresent() || activeSegments.size() <= scalingPolicy.getMinNumSegments()) {
            return null;
        }

        final double currentKeyStart = current.get().getKeyStart();
        final double currentKeyEnd = current.get().getKeyEnd();
        final List<StreamSegmentRecord> candidates = activeSegments.stream()
                .filter(segment -> segment.getKeyEnd() == currentKeyStart
                        || segment.getKeyStart() == currentKeyEnd
                        || segment.segmentId() == segmentId)
                .sorted(Comparator.comparingDouble(StreamSegmentRecord::getKeyStart))
                .collect(Collectors.toList());

        return new ImmutablePair<>(candidates, activeSegments.size() - scalingPolicy.getMinNumSegments());
    }

    /**
     * Keeps only those candidates the metadata store reports as cold.
     *
     * <p>If only one segment may be removed and all three candidates are cold, only the first two
     * (in key-start order) are kept so that the merge reduces the segment count by exactly one.
     *
     * @return the cold candidates, or a future holding {@code null} when there is nothing to merge.
     */
    private CompletableFuture<List<StreamSegmentRecord>> retainColdCandidates(
            AutoScaleEvent autoScaleEvent, ImmutablePair<List<StreamSegmentRecord>, Integer> selection, OperationContext operationContext) {
        if (selection == null || selection.getLeft().size() <= 1) {
            return CompletableFuture.completedFuture(null);
        }

        final List<StreamSegmentRecord> candidates = selection.getLeft();
        final int removableSegments = selection.getRight();

        return Futures.filter(candidates, candidate -> this.streamMetadataStore.isCold(
                        autoScaleEvent.getScope(), autoScaleEvent.getStream(), candidate.segmentId(), operationContext, this.executor))
                .thenApply(coldSegments -> {
                    if (removableSegments == 1 && coldSegments.size() == 3) {
                        // Merging three segments into one would drop two segments; only one may go.
                        return Lists.newArrayList(coldSegments.get(0), coldSegments.get(1));
                    }
                    return coldSegments;
                });
    }

    /**
     * Posts a scale request that replaces {@code segmentsToMerge} with one segment spanning from the
     * smallest key start to the largest key end among them. Does nothing when fewer than two
     * segments are supplied.
     */
    private CompletableFuture<Void> postMergeRequest(AutoScaleEvent autoScaleEvent, List<StreamSegmentRecord> segmentsToMerge) {
        if (segmentsToMerge == null || segmentsToMerge.size() <= 1) {
            return CompletableFuture.completedFuture(null);
        }

        segmentsToMerge.forEach(segment -> log.debug(autoScaleEvent.getRequestId(), "Merging stream segment {} ",
                NameUtils.getQualifiedStreamSegmentName(autoScaleEvent.getScope(), autoScaleEvent.getStream(), segment.segmentId())));

        final double mergedKeyStart = segmentsToMerge.stream()
                .mapToDouble(StreamSegmentRecord::getKeyStart).min().getAsDouble();
        final double mergedKeyEnd = segmentsToMerge.stream()
                .mapToDouble(StreamSegmentRecord::getKeyEnd).max().getAsDouble();

        final List<Map.Entry<Double, Double>> newRanges = new ArrayList<>(1);
        newRanges.add(new AbstractMap.SimpleEntry<>(mergedKeyStart, mergedKeyEnd));

        final List<Long> segmentsToSeal = segmentsToMerge.stream()
                .map(StreamSegmentRecord::segmentId)
                .collect(Collectors.toList());

        return postScaleRequest(autoScaleEvent, segmentsToSeal, newRanges, autoScaleEvent.getRequestId());
    }

    /**
     * Writes a {@link ScaleOpEvent} for the event's stream.
     *
     * @param autoScaleEvent the event that triggered the scale; supplies scope and stream.
     * @param list           ids of the segments to seal.
     * @param list2          key ranges of the segments to create.
     * @param j              request id carried on the posted event.
     * @return a future that completes when the event has been written.
     */
    private CompletableFuture<Void> postScaleRequest(AutoScaleEvent autoScaleEvent, List<Long> list, List<Map.Entry<Double, Double>> list2, long j) {
        return this.streamMetadataTasks.writeEvent(new ScaleOpEvent(
                autoScaleEvent.getScope(), autoScaleEvent.getStream(), list, list2, false, System.currentTimeMillis(), j));
    }
}
