package io.pravega.controller.task.Stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.control.impl.ModelHelper;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.RetentionPolicy;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.Timer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.RequestTracker;
import io.pravega.common.tracing.TagLogger;
import io.pravega.common.util.RetriesExhaustedException;
import io.pravega.controller.metrics.StreamMetrics;
import io.pravega.controller.metrics.TransactionMetrics;
import io.pravega.controller.server.SegmentHelper;
import io.pravega.controller.server.eventProcessor.ControllerEventProcessors;
import io.pravega.controller.server.rest.generated.api.ApiResponseMessage;
import io.pravega.controller.server.rpc.auth.GrpcAuthHelper;
import io.pravega.controller.store.stream.AbstractStreamMetadataStore;
import io.pravega.controller.store.stream.BucketStore;
import io.pravega.controller.store.stream.CreateStreamResponse;
import io.pravega.controller.store.stream.EpochTransitionOperationExceptions;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.RetentionSet;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.store.stream.records.StreamCutRecord;
import io.pravega.controller.store.stream.records.StreamCutReferenceRecord;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.controller.store.task.LockFailedException;
import io.pravega.controller.store.task.Resource;
import io.pravega.controller.store.task.TaskMetadataStore;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.controller.task.EventHelper;
import io.pravega.controller.task.Task;
import io.pravega.controller.task.TaskBase;
import io.pravega.controller.util.Config;
import io.pravega.controller.util.RetryHelper;
import io.pravega.shared.NameUtils;
import io.pravega.shared.controller.event.ControllerEvent;
import io.pravega.shared.controller.event.DeleteStreamEvent;
import io.pravega.shared.controller.event.ScaleOpEvent;
import io.pravega.shared.controller.event.SealStreamEvent;
import io.pravega.shared.controller.event.TruncateStreamEvent;
import io.pravega.shared.controller.event.UpdateStreamEvent;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/task/Stream/StreamMetadataTasks.class */
public class StreamMetadataTasks extends TaskBase {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private final Object $lock;
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(StreamMetadataTasks.class));
    private final AtomicLong retentionFrequencyMillis;
    private final StreamMetadataStore streamMetadataStore;
    private final BucketStore bucketStore;
    private final SegmentHelper segmentHelper;
    private final GrpcAuthHelper authHelper;
    private final RequestTracker requestTracker;
    private final ScheduledExecutorService eventExecutor;
    private EventHelper eventHelper;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.pravega.controller.task.Stream.StreamMetadataTasks$1, reason: invalid class name */
    /* loaded from: input_file:io/pravega/controller/task/Stream/StreamMetadataTasks$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$pravega$client$stream$RetentionPolicy$RetentionType;
        static final /* synthetic */ int[] $SwitchMap$io$pravega$controller$store$stream$CreateStreamResponse$CreateStatus = new int[CreateStreamResponse.CreateStatus.values().length];

        static {
            try {
                $SwitchMap$io$pravega$controller$store$stream$CreateStreamResponse$CreateStatus[CreateStreamResponse.CreateStatus.NEW.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$CreateStreamResponse$CreateStatus[CreateStreamResponse.CreateStatus.EXISTS_ACTIVE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$CreateStreamResponse$CreateStatus[CreateStreamResponse.CreateStatus.EXISTS_CREATING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$CreateStreamResponse$CreateStatus[CreateStreamResponse.CreateStatus.FAILED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            $SwitchMap$io$pravega$client$stream$RetentionPolicy$RetentionType = new int[RetentionPolicy.RetentionType.values().length];
            try {
                $SwitchMap$io$pravega$client$stream$RetentionPolicy$RetentionType[RetentionPolicy.RetentionType.TIME.ordinal()] = 1;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$pravega$client$stream$RetentionPolicy$RetentionType[RetentionPolicy.RetentionType.SIZE.ordinal()] = 2;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    public StreamMetadataTasks(StreamMetadataStore streamMetadataStore, BucketStore bucketStore, TaskMetadataStore taskMetadataStore, SegmentHelper segmentHelper, ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService2, String str, GrpcAuthHelper grpcAuthHelper, RequestTracker requestTracker, long j) {
        this(streamMetadataStore, bucketStore, taskMetadataStore, segmentHelper, scheduledExecutorService, scheduledExecutorService2, new TaskBase.Context(str), grpcAuthHelper, requestTracker);
        this.retentionFrequencyMillis.set(j);
    }

    @VisibleForTesting
    public StreamMetadataTasks(StreamMetadataStore streamMetadataStore, BucketStore bucketStore, TaskMetadataStore taskMetadataStore, SegmentHelper segmentHelper, ScheduledExecutorService scheduledExecutorService, String str, GrpcAuthHelper grpcAuthHelper, RequestTracker requestTracker) {
        this(streamMetadataStore, bucketStore, taskMetadataStore, segmentHelper, scheduledExecutorService, scheduledExecutorService, new TaskBase.Context(str), grpcAuthHelper, requestTracker);
    }

    @VisibleForTesting
    public StreamMetadataTasks(StreamMetadataStore streamMetadataStore, BucketStore bucketStore, TaskMetadataStore taskMetadataStore, SegmentHelper segmentHelper, ScheduledExecutorService scheduledExecutorService, String str, GrpcAuthHelper grpcAuthHelper, RequestTracker requestTracker, EventHelper eventHelper) {
        this(streamMetadataStore, bucketStore, taskMetadataStore, segmentHelper, scheduledExecutorService, scheduledExecutorService, new TaskBase.Context(str), grpcAuthHelper, requestTracker);
        this.eventHelper = eventHelper;
    }

    private StreamMetadataTasks(StreamMetadataStore streamMetadataStore, BucketStore bucketStore, TaskMetadataStore taskMetadataStore, SegmentHelper segmentHelper, ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService2, TaskBase.Context context, GrpcAuthHelper grpcAuthHelper, RequestTracker requestTracker) {
        super(taskMetadataStore, scheduledExecutorService, context);
        this.$lock = new Object[0];
        this.eventExecutor = scheduledExecutorService2;
        this.streamMetadataStore = streamMetadataStore;
        this.bucketStore = bucketStore;
        this.segmentHelper = segmentHelper;
        this.authHelper = grpcAuthHelper;
        this.requestTracker = requestTracker;
        this.retentionFrequencyMillis = new AtomicLong(Duration.ofMinutes(Config.MINIMUM_RETENTION_FREQUENCY_IN_MINUTES).toMillis());
        setReady();
    }

    public void initializeStreamWriters(EventStreamClientFactory eventStreamClientFactory, String str) {
        synchronized (this.$lock) {
            this.eventHelper = new EventHelper(eventStreamClientFactory.createEventWriter(str, ControllerEventProcessors.CONTROLLER_EVENT_SERIALIZER, EventWriterConfig.builder().build()), this.executor, this.eventExecutor, this.context.getHostId(), ((AbstractStreamMetadataStore) this.streamMetadataStore).getHostTaskIndex());
        }
    }

    public CompletableFuture<Controller.CreateStreamStatus.Status> createStreamRetryOnLockFailure(String str, String str2, StreamConfiguration streamConfiguration, long j, int i) {
        return RetryHelper.withRetriesAsync(() -> {
            return createStream(str, str2, streamConfiguration, j);
        }, th -> {
            return Exceptions.unwrap(th) instanceof LockFailedException;
        }, i, this.executor).exceptionally(th2 -> {
            Throwable unwrap = Exceptions.unwrap(th2);
            if (unwrap instanceof RetriesExhaustedException) {
                throw new CompletionException(unwrap.getCause());
            }
            throw new CompletionException(unwrap);
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Task(name = "createStream", version = "1.0", resource = "{scope}/{stream}")
    public CompletableFuture<Controller.CreateStreamStatus.Status> createStream(String str, String str2, StreamConfiguration streamConfiguration, long j) {
        return execute(new Resource(str, str2), new Serializable[]{str, str2, streamConfiguration, Long.valueOf(j)}, () -> {
            return createStreamBody(str, str2, streamConfiguration, j);
        });
    }

    public CompletableFuture<Controller.UpdateStreamStatus.Status> updateStream(String str, String str2, StreamConfiguration streamConfiguration, OperationContext operationContext) {
        OperationContext createContext = operationContext == null ? this.streamMetadataStore.createContext(str, str2) : operationContext;
        long requestIdFor = this.requestTracker.getRequestIdFor(new String[]{"updateStream", str, str2});
        return this.streamMetadataStore.getConfigurationRecord(str, str2, createContext, this.executor).thenCompose(versionedMetadata -> {
            if (!((StreamConfigurationRecord) versionedMetadata.getObject()).isUpdating()) {
                return this.eventHelper.addIndexAndSubmitTask(new UpdateStreamEvent(str, str2, requestIdFor), () -> {
                    return this.streamMetadataStore.startUpdateConfiguration(str, str2, streamConfiguration, createContext, this.executor);
                }).thenCompose(r12 -> {
                    return this.eventHelper.checkDone(() -> {
                        return isUpdated(str, str2, streamConfiguration, createContext);
                    }).thenApply(r2 -> {
                        return Controller.UpdateStreamStatus.Status.SUCCESS;
                    });
                });
            }
            log.warn(requestIdFor, "Another update in progress for {}/{}", new Object[]{str, str2});
            return CompletableFuture.completedFuture(Controller.UpdateStreamStatus.Status.FAILURE);
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            log.warn(requestIdFor, "Exception thrown in trying to update stream configuration {}", new Object[]{th.getMessage()});
            return handleUpdateStreamError(th, requestIdFor);
        });
    }

    @VisibleForTesting
    CompletableFuture<Boolean> isUpdated(String str, String str2, StreamConfiguration streamConfiguration, OperationContext operationContext) {
        CompletableFuture<State> state = this.streamMetadataStore.getState(str, str2, true, operationContext, this.executor);
        CompletableFuture thenApply = this.streamMetadataStore.getConfigurationRecord(str, str2, operationContext, this.executor).thenApply((v0) -> {
            return v0.getObject();
        });
        return CompletableFuture.allOf(state, thenApply).thenApply(r6 -> {
            State state2 = (State) state.join();
            StreamConfigurationRecord streamConfigurationRecord = (StreamConfigurationRecord) thenApply.join();
            if (streamConfigurationRecord.isUpdating()) {
                return Boolean.valueOf(!streamConfigurationRecord.getStreamConfiguration().equals(streamConfiguration));
            }
            return Boolean.valueOf((streamConfigurationRecord.getStreamConfiguration().equals(streamConfiguration) && state2.equals(State.UPDATING)) ? false : true);
        });
    }

    public CompletableFuture<Void> retention(String str, String str2, RetentionPolicy retentionPolicy, long j, OperationContext operationContext, String str3) {
        Preconditions.checkNotNull(retentionPolicy);
        OperationContext createContext = operationContext == null ? this.streamMetadataStore.createContext(str, str2) : operationContext;
        long requestIdFor = this.requestTracker.getRequestIdFor(new String[]{"truncateStream", str, str2});
        return this.streamMetadataStore.getRetentionSet(str, str2, createContext, this.executor).thenCompose(retentionSet -> {
            return generateStreamCutIfRequired(str, str2, retentionSet.getLatest(), j, createContext, str3).thenCompose(streamCutRecord -> {
                return truncate(str, str2, retentionPolicy, createContext, retentionSet, streamCutRecord, j, requestIdFor);
            });
        }).thenAccept((Consumer<? super U>) r5 -> {
            StreamMetrics.reportRetentionEvent(str, str2);
        });
    }

    private CompletableFuture<StreamCutRecord> generateStreamCutIfRequired(String str, String str2, StreamCutReferenceRecord streamCutReferenceRecord, long j, OperationContext operationContext, String str3) {
        if (streamCutReferenceRecord == null || j - streamCutReferenceRecord.getRecordingTime() > this.retentionFrequencyMillis.get()) {
            return Futures.exceptionallyComposeExpecting(streamCutReferenceRecord == null ? CompletableFuture.completedFuture(null) : this.streamMetadataStore.getStreamCutRecord(str, str2, streamCutReferenceRecord, operationContext, this.executor), th -> {
                return th instanceof StoreException.DataNotFoundException;
            }, () -> {
                return null;
            }).thenCompose(streamCutRecord -> {
                return generateStreamCut(str, str2, streamCutRecord, operationContext, str3).thenCompose(streamCutRecord -> {
                    return this.streamMetadataStore.addStreamCutToRetentionSet(str, str2, streamCutRecord, operationContext, this.executor).thenApply(r8 -> {
                        log.debug("New streamCut generated for stream {}/{}", str, str2);
                        return streamCutRecord;
                    });
                });
            });
        }
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> truncate(String str, String str2, RetentionPolicy retentionPolicy, OperationContext operationContext, RetentionSet retentionSet, StreamCutRecord streamCutRecord, long j, long j2) {
        Optional<StreamCutReferenceRecord> findTruncationRecord = findTruncationRecord(retentionPolicy, retentionSet, streamCutRecord, j);
        if (findTruncationRecord.isPresent()) {
            log.info("Found truncation record for stream {}/{} truncationRecord time/size: {}/{}", new Object[]{str, str2, Long.valueOf(findTruncationRecord.get().getRecordingTime()), Long.valueOf(findTruncationRecord.get().getRecordingSize())});
            return this.streamMetadataStore.getStreamCutRecord(str, str2, findTruncationRecord.get(), operationContext, this.executor).thenCompose(streamCutRecord2 -> {
                return startTruncation(str, str2, streamCutRecord2.getStreamCut(), operationContext, j2);
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) bool -> {
                if (bool.booleanValue()) {
                    return this.streamMetadataStore.deleteStreamCutBefore(str, str2, (StreamCutReferenceRecord) findTruncationRecord.get(), operationContext, this.executor);
                }
                throw new RuntimeException("Could not start truncation");
            }).exceptionally(th -> {
                if (!(Exceptions.unwrap(th) instanceof IllegalArgumentException)) {
                    throw new CompletionException(th);
                }
                log.debug(j2, "Cannot truncate at given streamCut because it intersects with existing truncation point", new Object[0]);
                return null;
            });
        }
        log.info("No suitable truncation record found, per retention policy for stream {}/{}", str, str2);
        return CompletableFuture.completedFuture(null);
    }

    private Optional<StreamCutReferenceRecord> findTruncationRecord(RetentionPolicy retentionPolicy, RetentionSet retentionSet, StreamCutRecord streamCutRecord, long j) {
        switch (AnonymousClass1.$SwitchMap$io$pravega$client$stream$RetentionPolicy$RetentionType[retentionPolicy.getRetentionType().ordinal()]) {
            case ApiResponseMessage.ERROR /* 1 */:
                return retentionSet.getRetentionRecords().stream().filter(streamCutReferenceRecord -> {
                    return streamCutReferenceRecord.getRecordingTime() < j - retentionPolicy.getRetentionParam();
                }).max(Comparator.comparingLong((v0) -> {
                    return v0.getRecordingTime();
                }));
            case ApiResponseMessage.WARNING /* 2 */:
                return Optional.ofNullable(streamCutRecord).flatMap(streamCutRecord2 -> {
                    return retentionSet.getRetentionRecords().stream().filter(streamCutReferenceRecord2 -> {
                        return streamCutRecord2.getRecordingSize() - streamCutReferenceRecord2.getRecordingSize() > retentionPolicy.getRetentionParam();
                    }).max(Comparator.comparingLong((v0) -> {
                        return v0.getRecordingTime();
                    }));
                });
            default:
                throw new NotImplementedException(retentionPolicy.getRetentionType().toString());
        }
    }

    public CompletableFuture<StreamCutRecord> generateStreamCut(String str, String str2, StreamCutRecord streamCutRecord, OperationContext operationContext, String str3) {
        OperationContext createContext = operationContext == null ? this.streamMetadataStore.createContext(str, str2) : operationContext;
        return this.streamMetadataStore.getActiveSegments(str, str2, createContext, this.executor).thenCompose(list -> {
            return Futures.allOfWithResults((Map) ((Stream) list.stream().parallel()).collect(Collectors.toMap(streamSegmentRecord -> {
                return streamSegmentRecord;
            }, streamSegmentRecord2 -> {
                return getSegmentOffset(str, str2, streamSegmentRecord2.segmentId(), str3);
            })));
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) map -> {
            long currentTimeMillis = System.currentTimeMillis();
            ImmutableMap.Builder builder = ImmutableMap.builder();
            map.forEach((streamSegmentRecord, l) -> {
                builder.put(Long.valueOf(streamSegmentRecord.segmentId()), l);
            });
            Map<Long, Long> build = builder.build();
            return this.streamMetadataStore.getSizeTillStreamCut(str, str2, build, Optional.ofNullable(streamCutRecord), createContext, this.executor).thenApply(l2 -> {
                return new StreamCutRecord(currentTimeMillis, l2.longValue(), build);
            });
        });
    }

    public CompletableFuture<Controller.UpdateStreamStatus.Status> truncateStream(String str, String str2, Map<Long, Long> map, OperationContext operationContext) {
        OperationContext createContext = operationContext == null ? this.streamMetadataStore.createContext(str, str2) : operationContext;
        long requestIdFor = this.requestTracker.getRequestIdFor(new String[]{"truncateStream", str, str2});
        return startTruncation(str, str2, map, createContext, requestIdFor).thenCompose(bool -> {
            if (bool.booleanValue()) {
                return this.eventHelper.checkDone(() -> {
                    return isTruncated(str, str2, map, createContext);
                }, 1000L).thenApply(r2 -> {
                    return Controller.UpdateStreamStatus.Status.SUCCESS;
                });
            }
            log.warn(requestIdFor, "Unable to start truncation for {}/{}", new Object[]{str, str2});
            return CompletableFuture.completedFuture(Controller.UpdateStreamStatus.Status.FAILURE);
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            log.warn(requestIdFor, "Exception thrown in trying to update stream configuration", new Object[]{th});
            return handleUpdateStreamError(th, requestIdFor);
        });
    }

    public CompletableFuture<Boolean> startTruncation(String str, String str2, Map<Long, Long> map, OperationContext operationContext, long j) {
        OperationContext createContext = operationContext == null ? this.streamMetadataStore.createContext(str, str2) : operationContext;
        return this.streamMetadataStore.getTruncationRecord(str, str2, createContext, this.executor).thenCompose(versionedMetadata -> {
            if (!((StreamTruncationRecord) versionedMetadata.getObject()).isUpdating()) {
                return this.eventHelper.addIndexAndSubmitTask(new TruncateStreamEvent(str, str2, j), () -> {
                    return this.streamMetadataStore.startTruncation(str, str2, map, createContext, this.executor);
                }).thenApply(r13 -> {
                    log.debug(j, "Started truncation request for stream {}/{}", new Object[]{str, str2});
                    return true;
                });
            }
            log.warn(j, "Another truncation in progress for {}/{}", new Object[]{str, str2});
            return CompletableFuture.completedFuture(false);
        });
    }

    @VisibleForTesting
    CompletableFuture<Boolean> isTruncated(String str, String str2, Map<Long, Long> map, OperationContext operationContext) {
        CompletableFuture<State> state = this.streamMetadataStore.getState(str, str2, true, operationContext, this.executor);
        CompletableFuture thenApply = this.streamMetadataStore.getTruncationRecord(str, str2, operationContext, this.executor).thenApply((v0) -> {
            return v0.getObject();
        });
        return CompletableFuture.allOf(state, thenApply).thenApply(r6 -> {
            State state2 = (State) state.join();
            StreamTruncationRecord streamTruncationRecord = (StreamTruncationRecord) thenApply.join();
            if (streamTruncationRecord.isUpdating()) {
                return Boolean.valueOf(!streamTruncationRecord.getStreamCut().equals(map));
            }
            return Boolean.valueOf((streamTruncationRecord.getStreamCut().equals(map) && state2.equals(State.TRUNCATING)) ? false : true);
        });
    }

    public CompletableFuture<Controller.UpdateStreamStatus.Status> sealStream(String str, String str2, OperationContext operationContext) {
        OperationContext createContext = operationContext == null ? this.streamMetadataStore.createContext(str, str2) : operationContext;
        long requestIdFor = this.requestTracker.getRequestIdFor(new String[]{"sealStream", str, str2});
        return this.eventHelper.addIndexAndSubmitTask(new SealStreamEvent(str, str2, requestIdFor), () -> {
            return this.streamMetadataStore.getVersionedState(str, str2, createContext, this.executor).thenCompose(versionedMetadata -> {
                return ((State) versionedMetadata.getObject()).equals(State.SEALED) ? CompletableFuture.completedFuture(versionedMetadata) : this.streamMetadataStore.updateVersionedState(str, str2, State.SEALING, versionedMetadata, createContext, this.executor);
            });
        }).thenCompose(versionedMetadata -> {
            return (((State) versionedMetadata.getObject()).equals(State.SEALED) || ((State) versionedMetadata.getObject()).equals(State.SEALING)) ? this.eventHelper.checkDone(() -> {
                return isSealed(str, str2, createContext);
            }).thenApply(r2 -> {
                return Controller.UpdateStreamStatus.Status.SUCCESS;
            }) : CompletableFuture.completedFuture(Controller.UpdateStreamStatus.Status.FAILURE);
        }).exceptionally(th -> {
            log.warn(requestIdFor, "Exception thrown in trying to notify sealed segments {}", new Object[]{th.getMessage()});
            return handleUpdateStreamError(th, requestIdFor);
        });
    }

    private CompletableFuture<Boolean> isSealed(String str, String str2, OperationContext operationContext) {
        return this.streamMetadataStore.getState(str, str2, true, operationContext, this.executor).thenApply(state -> {
            return Boolean.valueOf(state.equals(State.SEALED));
        });
    }

    public CompletableFuture<Controller.DeleteStreamStatus.Status> deleteStream(String str, String str2, OperationContext operationContext) {
        OperationContext createContext = operationContext == null ? this.streamMetadataStore.createContext(str, str2) : operationContext;
        long requestIdFor = this.requestTracker.getRequestIdFor(new String[]{"deleteStream", str, str2});
        return Futures.exceptionallyExpecting(this.streamMetadataStore.getState(str, str2, false, createContext, this.executor), th -> {
            return Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException;
        }, State.UNKNOWN).thenCompose(state -> {
            return (State.SEALED.equals(state) || State.CREATING.equals(state)) ? this.streamMetadataStore.getCreationTime(str, str2, createContext, this.executor).thenApply(l -> {
                return new DeleteStreamEvent(str, str2, requestIdFor, l.longValue());
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) deleteStreamEvent -> {
                return this.eventHelper.writeEvent(deleteStreamEvent);
            }).thenApply(r2 -> {
                return true;
            }) : State.UNKNOWN.equals(state) ? this.streamMetadataStore.deleteStream(str, str2, createContext, this.executor).exceptionally(th2 -> {
                throw new CompletionException(th2);
            }).thenApply(r22 -> {
                return true;
            }) : CompletableFuture.completedFuture(false);
        }).thenCompose(bool -> {
            return bool.booleanValue() ? this.eventHelper.checkDone(() -> {
                return isDeleted(str, str2);
            }).thenApply(r2 -> {
                return Controller.DeleteStreamStatus.Status.SUCCESS;
            }) : CompletableFuture.completedFuture(Controller.DeleteStreamStatus.Status.STREAM_NOT_SEALED);
        }).exceptionally(th2 -> {
            log.warn(requestIdFor, "Exception thrown while deleting stream {}", new Object[]{th2.getMessage()});
            return handleDeleteStreamError(th2, requestIdFor);
        });
    }

    private CompletableFuture<Boolean> isDeleted(String str, String str2) {
        return this.streamMetadataStore.checkStreamExists(str, str2).thenApply(bool -> {
            return Boolean.valueOf(!bool.booleanValue());
        });
    }

    public CompletableFuture<Controller.ScaleResponse> manualScale(String str, String str2, List<Long> list, List<Map.Entry<Double, Double>> list2, long j, OperationContext operationContext) {
        long requestIdFor = this.requestTracker.getRequestIdFor(new String[]{"scaleStream", str, str2, String.valueOf(j)});
        return this.eventHelper.addIndexAndSubmitTask(new ScaleOpEvent(str, str2, list, list2, true, j, requestIdFor), () -> {
            return this.streamMetadataStore.submitScale(str, str2, list, new ArrayList(list2), j, null, operationContext, this.executor);
        }).handle((versionedMetadata, th) -> {
            Controller.ScaleResponse.Builder newBuilder = Controller.ScaleResponse.newBuilder();
            if (th != null) {
                Throwable unwrap = Exceptions.unwrap(th);
                if (unwrap instanceof EpochTransitionOperationExceptions.PreConditionFailureException) {
                    newBuilder.setStatus(Controller.ScaleResponse.ScaleStreamStatus.PRECONDITION_FAILED);
                } else {
                    log.warn(requestIdFor, "Scale for stream {}/{} failed with exception {}", new Object[]{str, str2, unwrap});
                    newBuilder.setStatus(Controller.ScaleResponse.ScaleStreamStatus.FAILURE);
                }
            } else {
                log.info(requestIdFor, "scale for stream {}/{} started successfully", new Object[]{str, str2});
                newBuilder.setStatus(Controller.ScaleResponse.ScaleStreamStatus.STARTED);
                newBuilder.addAllSegments((Iterable) ((EpochTransitionRecord) versionedMetadata.getObject()).getNewSegmentsWithRange().entrySet().stream().map(entry -> {
                    return convert(str, str2, entry);
                }).collect(Collectors.toList()));
                newBuilder.setEpoch(((EpochTransitionRecord) versionedMetadata.getObject()).getActiveEpoch());
            }
            return newBuilder.build();
        });
    }

    public CompletableFuture<Controller.ScaleStatusResponse> checkScale(String str, String str2, int i, OperationContext operationContext) {
        CompletableFuture<EpochRecord> activeEpoch = this.streamMetadataStore.getActiveEpoch(str, str2, operationContext, true, this.executor);
        CompletableFuture<State> state = this.streamMetadataStore.getState(str, str2, true, operationContext, this.executor);
        CompletableFuture<U> thenApply = this.streamMetadataStore.getEpochTransition(str, str2, operationContext, this.executor).thenApply((v0) -> {
            return v0.getObject();
        });
        return CompletableFuture.allOf(state, activeEpoch).handle((r7, th) -> {
            Controller.ScaleStatusResponse.Builder newBuilder = Controller.ScaleStatusResponse.newBuilder();
            if (th == null) {
                EpochRecord epochRecord = (EpochRecord) activeEpoch.join();
                State state2 = (State) state.join();
                EpochTransitionRecord epochTransitionRecord = (EpochTransitionRecord) thenApply.join();
                if (i > epochRecord.getEpoch()) {
                    newBuilder.setStatus(Controller.ScaleStatusResponse.ScaleStatus.INVALID_INPUT);
                } else if (epochRecord.getEpoch() == i || epochRecord.getReferenceEpoch() == i) {
                    newBuilder.setStatus(Controller.ScaleStatusResponse.ScaleStatus.IN_PROGRESS);
                } else if (i + 1 == epochRecord.getReferenceEpoch() && state2.equals(State.SCALING) && (epochTransitionRecord.equals(EpochTransitionRecord.EMPTY) || epochTransitionRecord.getNewEpoch() == epochRecord.getEpoch())) {
                    newBuilder.setStatus(Controller.ScaleStatusResponse.ScaleStatus.IN_PROGRESS);
                } else {
                    newBuilder.setStatus(Controller.ScaleStatusResponse.ScaleStatus.SUCCESS);
                }
            } else if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
                newBuilder.setStatus(Controller.ScaleStatusResponse.ScaleStatus.INVALID_INPUT);
            } else {
                newBuilder.setStatus(Controller.ScaleStatusResponse.ScaleStatus.INTERNAL_ERROR);
            }
            return newBuilder.build();
        });
    }

    @VisibleForTesting
    <T> CompletableFuture<T> addIndexAndSubmitTask(ControllerEvent controllerEvent, Supplier<CompletableFuture<T>> supplier) {
        return this.eventHelper.addIndexAndSubmitTask(controllerEvent, supplier);
    }

    public CompletableFuture<Void> writeEvent(ControllerEvent controllerEvent) {
        return this.eventHelper.writeEvent(controllerEvent);
    }

    @VisibleForTesting
    public void setRequestEventWriter(EventStreamWriter<ControllerEvent> eventStreamWriter) {
        this.eventHelper.setRequestEventWriter(eventStreamWriter);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> removeTaskFromIndex(String str, String str2) {
        return this.eventHelper.removeTaskFromIndex(str, str2);
    }

    @VisibleForTesting
    CompletableFuture<Controller.CreateStreamStatus.Status> createStreamBody(String str, String str2, StreamConfiguration streamConfiguration, long j) {
        long requestIdFor = this.requestTracker.getRequestIdFor(new String[]{"createStream", str, str2});
        return this.streamMetadataStore.createStream(str, str2, streamConfiguration, j, null, this.executor).thenComposeAsync(createStreamResponse -> {
            log.info(requestIdFor, "{}/{} created in metadata store", new Object[]{str, str2});
            Controller.CreateStreamStatus.Status translate = translate(createStreamResponse.getStatus());
            if (!createStreamResponse.getStatus().equals(CreateStreamResponse.CreateStatus.NEW) && !createStreamResponse.getStatus().equals(CreateStreamResponse.CreateStatus.EXISTS_CREATING)) {
                return CompletableFuture.completedFuture(translate);
            }
            int startingSegmentNumber = createStreamResponse.getStartingSegmentNumber();
            return notifyNewSegments(str, str2, createStreamResponse.getConfiguration(), (List<Long>) IntStream.range(startingSegmentNumber, startingSegmentNumber + createStreamResponse.getConfiguration().getScalingPolicy().getMinNumSegments()).boxed().map(num -> {
                return Long.valueOf(NameUtils.computeSegmentId(num.intValue(), 0));
            }).collect(Collectors.toList()), retrieveDelegationToken(), requestIdFor).thenCompose(r15 -> {
                return createMarkStream(str, str2, j, requestIdFor);
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r11 -> {
                OperationContext createContext = this.streamMetadataStore.createContext(str, str2);
                return TaskStepsRetryHelper.withRetries(() -> {
                    return (streamConfiguration.getRetentionPolicy() != null ? this.bucketStore.addStreamToBucketStore(BucketStore.ServiceType.RetentionService, str, str2, this.executor) : CompletableFuture.completedFuture(null)).thenCompose((Function<? super Void, ? extends CompletionStage<U>>) r10 -> {
                        return this.streamMetadataStore.getVersionedState(str, str2, createContext, this.executor).thenCompose(versionedMetadata -> {
                            return ((State) versionedMetadata.getObject()).equals(State.CREATING) ? this.streamMetadataStore.updateVersionedState(str, str2, State.ACTIVE, versionedMetadata, createContext, this.executor) : CompletableFuture.completedFuture(versionedMetadata);
                        });
                    });
                }, this.executor).thenApply(versionedMetadata -> {
                    return translate;
                });
            });
        }, (Executor) this.executor).handle((BiFunction<? super U, Throwable, ? extends U>) (status, th) -> {
            if (th == null) {
                return status;
            }
            if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
                return Controller.CreateStreamStatus.Status.SCOPE_NOT_FOUND;
            }
            log.warn(requestIdFor, "Create stream failed due to ", new Object[]{th});
            return Controller.CreateStreamStatus.Status.FAILURE;
        });
    }

    private CompletableFuture<Void> createMarkStream(String str, String str2, long j, long j2) {
        String markStreamForStream = NameUtils.getMarkStreamForStream(str2);
        return this.streamMetadataStore.createStream(str, markStreamForStream, StreamConfiguration.builder().scalingPolicy(ScalingPolicy.fixed(1)).build(), j, null, this.executor).thenCompose(createStreamResponse -> {
            return notifyNewSegment(str, markStreamForStream, NameUtils.computeSegmentId(createStreamResponse.getStartingSegmentNumber(), 0), createStreamResponse.getConfiguration().getScalingPolicy(), retrieveDelegationToken(), j2);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r9 -> {
            OperationContext createContext = this.streamMetadataStore.createContext(str, markStreamForStream);
            return this.streamMetadataStore.getVersionedState(str, markStreamForStream, createContext, this.executor).thenCompose(versionedMetadata -> {
                return Futures.toVoid(this.streamMetadataStore.updateVersionedState(str, markStreamForStream, State.ACTIVE, versionedMetadata, createContext, this.executor));
            });
        });
    }

    private Controller.CreateStreamStatus.Status translate(CreateStreamResponse.CreateStatus createStatus) {
        Controller.CreateStreamStatus.Status status;
        switch (AnonymousClass1.$SwitchMap$io$pravega$controller$store$stream$CreateStreamResponse$CreateStatus[createStatus.ordinal()]) {
            case ApiResponseMessage.ERROR /* 1 */:
                status = Controller.CreateStreamStatus.Status.SUCCESS;
                break;
            case ApiResponseMessage.WARNING /* 2 */:
            case ApiResponseMessage.INFO /* 3 */:
                status = Controller.CreateStreamStatus.Status.STREAM_EXISTS;
                break;
            case ApiResponseMessage.OK /* 4 */:
            default:
                status = Controller.CreateStreamStatus.Status.FAILURE;
                break;
        }
        return status;
    }

    public CompletableFuture<Void> notifyNewSegments(String str, String str2, List<Long> list, OperationContext operationContext, String str3) {
        return notifyNewSegments(str, str2, list, operationContext, str3, 0L);
    }

    public CompletableFuture<Void> notifyNewSegments(String str, String str2, List<Long> list, OperationContext operationContext, String str3, long j) {
        return TaskStepsRetryHelper.withRetries(() -> {
            return this.streamMetadataStore.getConfiguration(str, str2, operationContext, this.executor);
        }, this.executor).thenCompose(streamConfiguration -> {
            return notifyNewSegments(str, str2, streamConfiguration, (List<Long>) list, str3, j);
        });
    }

    public CompletableFuture<Void> notifyNewSegments(String str, String str2, StreamConfiguration streamConfiguration, List<Long> list, String str3, long j) {
        return Futures.toVoid(Futures.allOfWithResults((List) ((Stream) list.stream().parallel()).map(l -> {
            return notifyNewSegment(str, str2, l.longValue(), streamConfiguration.getScalingPolicy(), str3, j);
        }).collect(Collectors.toList())));
    }

    public CompletableFuture<Void> notifyNewSegment(String str, String str2, long j, ScalingPolicy scalingPolicy, String str3) {
        return Futures.toVoid(TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.createSegment(str, str2, j, scalingPolicy, str3, 0L);
        }, this.executor));
    }

    public CompletableFuture<Void> notifyNewSegment(String str, String str2, long j, ScalingPolicy scalingPolicy, String str3, long j2) {
        return Futures.toVoid(TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.createSegment(str, str2, j, scalingPolicy, str3, j2);
        }, this.executor));
    }

    public CompletableFuture<Void> notifyDeleteSegments(String str, String str2, Set<Long> set, String str3, long j) {
        return Futures.allOf((Collection) ((Stream) set.stream().parallel()).map(l -> {
            return notifyDeleteSegment(str, str2, l.longValue(), str3, j);
        }).collect(Collectors.toList()));
    }

    public CompletableFuture<Void> notifyDeleteSegment(String str, String str2, long j, String str3, long j2) {
        return Futures.toVoid(TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.deleteSegment(str, str2, j, str3, j2);
        }, this.executor));
    }

    public CompletableFuture<Void> notifyTruncateSegment(String str, String str2, Map.Entry<Long, Long> entry, String str3, long j) {
        return Futures.toVoid(TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.truncateSegment(str, str2, ((Long) entry.getKey()).longValue(), ((Long) entry.getValue()).longValue(), str3, j);
        }, this.executor));
    }

    public CompletableFuture<Map<Long, Long>> getSealedSegmentsSize(String str, String str2, List<Long> list, String str3) {
        return Futures.allOfWithResults((Map) ((Stream) list.stream().parallel()).collect(Collectors.toMap(l -> {
            return l;
        }, l2 -> {
            return getSegmentOffset(str, str2, l2.longValue(), str3);
        })));
    }

    public CompletableFuture<Void> notifySealedSegments(String str, String str2, List<Long> list, String str3) {
        return notifySealedSegments(str, str2, list, str3, 0L);
    }

    public CompletableFuture<Void> notifySealedSegments(String str, String str2, List<Long> list, String str3, long j) {
        return Futures.allOf((Collection) ((Stream) list.stream().parallel()).map(l -> {
            return notifySealedSegment(str, str2, l.longValue(), str3, j);
        }).collect(Collectors.toList()));
    }

    private CompletableFuture<Void> notifySealedSegment(String str, String str2, long j, String str3, long j2) {
        return Futures.toVoid(TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.sealSegment(str, str2, j, str3, j2);
        }, this.executor));
    }

    public CompletableFuture<Void> notifyPolicyUpdates(String str, String str2, List<StreamSegmentRecord> list, ScalingPolicy scalingPolicy, String str3, long j) {
        return Futures.toVoid(Futures.allOfWithResults((List) ((Stream) list.stream().parallel()).map(streamSegmentRecord -> {
            return notifyPolicyUpdate(str, str2, scalingPolicy, streamSegmentRecord.segmentId(), str3, j);
        }).collect(Collectors.toList())));
    }

    private CompletableFuture<Long> getSegmentOffset(String str, String str2, long j, String str3) {
        return TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.getSegmentInfo(str, str2, j, str3);
        }, this.executor).thenApply((v0) -> {
            return v0.getWriteOffset();
        });
    }

    private CompletableFuture<Void> notifyPolicyUpdate(String str, String str2, ScalingPolicy scalingPolicy, long j, String str3, long j2) {
        return TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.updatePolicy(str, str2, scalingPolicy, j, str3, j2);
        }, this.executor);
    }

    private Controller.SegmentRange convert(String str, String str2, Map.Entry<Long, Map.Entry<Double, Double>> entry) {
        return ModelHelper.createSegmentRange(str, str2, entry.getKey().longValue(), entry.getValue().getKey().doubleValue(), entry.getValue().getValue().doubleValue());
    }

    private Controller.UpdateStreamStatus.Status handleUpdateStreamError(Throwable th, long j) {
        Throwable unwrap = Exceptions.unwrap(th);
        if (unwrap instanceof StoreException.DataNotFoundException) {
            return Controller.UpdateStreamStatus.Status.STREAM_NOT_FOUND;
        }
        if (unwrap instanceof TimeoutException) {
            throw new CompletionException(unwrap);
        }
        log.warn(j, "Update stream failed due to ", new Object[]{unwrap});
        return Controller.UpdateStreamStatus.Status.FAILURE;
    }

    private Controller.DeleteStreamStatus.Status handleDeleteStreamError(Throwable th, long j) {
        Throwable unwrap = Exceptions.unwrap(th);
        if (unwrap instanceof StoreException.DataNotFoundException) {
            return Controller.DeleteStreamStatus.Status.STREAM_NOT_FOUND;
        }
        if (unwrap instanceof TimeoutException) {
            throw new CompletionException(unwrap);
        }
        log.warn(j, "Delete stream failed.", new Object[]{th});
        return Controller.DeleteStreamStatus.Status.FAILURE;
    }

    public CompletableFuture<Void> notifyTxnCommit(String str, String str2, List<Long> list, UUID uuid) {
        Timer timer = new Timer();
        return Futures.allOf((Collection) ((Stream) list.stream().parallel()).map(l -> {
            return notifyTxnCommit(str, str2, l.longValue(), uuid);
        }).collect(Collectors.toList())).thenRun(() -> {
            TransactionMetrics.getInstance().commitTransactionSegments(timer.getElapsed());
        });
    }

    private CompletableFuture<Controller.TxnStatus> notifyTxnCommit(String str, String str2, long j, UUID uuid) {
        return TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.commitTransaction(str, str2, j, j, uuid, retrieveDelegationToken());
        }, this.executor);
    }

    public CompletableFuture<Void> notifyTxnAbort(String str, String str2, List<Long> list, UUID uuid) {
        Timer timer = new Timer();
        return Futures.allOf((Collection) ((Stream) list.stream().parallel()).map(l -> {
            return notifyTxnAbort(str, str2, l.longValue(), uuid);
        }).collect(Collectors.toList())).thenRun(() -> {
            TransactionMetrics.getInstance().abortTransactionSegments(timer.getElapsed());
        });
    }

    private CompletableFuture<Controller.TxnStatus> notifyTxnAbort(String str, String str2, long j, UUID uuid) {
        return TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.abortTransaction(str, str2, j, uuid, retrieveDelegationToken());
        }, this.executor);
    }

    public CompletableFuture<Map<Long, Long>> getCurrentSegmentSizes(String str, String str2, List<Long> list) {
        return Futures.allOfWithResults((Map) list.stream().collect(Collectors.toMap(l -> {
            return l;
        }, l2 -> {
            return getSegmentOffset(str, str2, l2.longValue(), retrieveDelegationToken());
        })));
    }

    @Override // io.pravega.controller.task.TaskBase
    public TaskBase copyWithContext(TaskBase.Context context) {
        return new StreamMetadataTasks(this.streamMetadataStore, this.bucketStore, this.taskMetadataStore, this.segmentHelper, this.executor, this.eventExecutor, context, this.authHelper, this.requestTracker);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.eventHelper != null) {
            this.eventHelper.close();
        }
    }

    public String retrieveDelegationToken() {
        return this.authHelper.retrieveMasterToken();
    }

    @VisibleForTesting
    public void setCompletionTimeoutMillis(long j) {
        this.eventHelper.setCompletionTimeoutMillis(j);
    }
}
