package io.pravega.controller.task.Stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.RetentionPolicy;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.StreamCut;
import io.pravega.common.Exceptions;
import io.pravega.common.Timer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.TagLogger;
import io.pravega.common.util.RetriesExhaustedException;
import io.pravega.controller.metrics.StreamMetrics;
import io.pravega.controller.metrics.TransactionMetrics;
import io.pravega.controller.retryable.RetryableException;
import io.pravega.controller.server.ControllerService;
import io.pravega.controller.server.SegmentHelper;
import io.pravega.controller.server.WireCommandFailedException;
import io.pravega.controller.server.eventProcessor.ControllerEventProcessors;
import io.pravega.controller.server.rest.ModelHelper;
import io.pravega.controller.server.rest.generated.api.ApiResponseMessage;
import io.pravega.controller.server.security.auth.GrpcAuthHelper;
import io.pravega.controller.store.VersionedMetadata;
import io.pravega.controller.store.stream.AbstractStreamMetadataStore;
import io.pravega.controller.store.stream.BucketStore;
import io.pravega.controller.store.stream.CreateStreamResponse;
import io.pravega.controller.store.stream.EpochTransitionOperationExceptions;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.ReaderGroupState;
import io.pravega.controller.store.stream.State;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.StreamCutComparison;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.ReaderGroupConfigRecord;
import io.pravega.controller.store.stream.records.RetentionSet;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.store.stream.records.StreamCutRecord;
import io.pravega.controller.store.stream.records.StreamCutReferenceRecord;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.store.stream.records.StreamSubscriber;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.controller.store.task.LockFailedException;
import io.pravega.controller.store.task.Resource;
import io.pravega.controller.store.task.TaskMetadataStore;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.controller.task.EventHelper;
import io.pravega.controller.task.Task;
import io.pravega.controller.task.TaskBase;
import io.pravega.controller.util.Config;
import io.pravega.controller.util.RetryHelper;
import io.pravega.shared.NameUtils;
import io.pravega.shared.controller.event.ControllerEvent;
import io.pravega.shared.controller.event.CreateReaderGroupEvent;
import io.pravega.shared.controller.event.DeleteReaderGroupEvent;
import io.pravega.shared.controller.event.DeleteScopeEvent;
import io.pravega.shared.controller.event.DeleteStreamEvent;
import io.pravega.shared.controller.event.RGStreamCutRecord;
import io.pravega.shared.controller.event.ScaleOpEvent;
import io.pravega.shared.controller.event.SealStreamEvent;
import io.pravega.shared.controller.event.TruncateStreamEvent;
import io.pravega.shared.controller.event.UpdateReaderGroupEvent;
import io.pravega.shared.controller.event.UpdateStreamEvent;
import java.io.Serializable;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/task/Stream/StreamMetadataTasks.class */
public class StreamMetadataTasks extends TaskBase {
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(StreamMetadataTasks.class));
    private static final int SUBSCRIBER_OPERATION_RETRIES = 10;
    private static final int READER_GROUP_OPERATION_MAX_RETRIES = 10;
    private static final int SCOPE_DELETION_MAX_RETRIES = 10;
    private static final long READER_GROUP_SEGMENT_ROLLOVER_SIZE_BYTES = 4194304;
    private final AtomicLong retentionFrequencyMillis;
    private final StreamMetadataStore streamMetadataStore;
    private final BucketStore bucketStore;
    private final SegmentHelper segmentHelper;
    private final GrpcAuthHelper authHelper;
    private final ScheduledExecutorService eventExecutor;
    private final CompletableFuture<EventHelper> eventHelperFuture;
    private final AtomicReference<Supplier<Long>> retentionClock;

    @GuardedBy("lock")
    private EventHelper eventHelper;

    @GuardedBy("lock")
    private boolean toSetEventHelper;
    private final Object lock;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.pravega.controller.task.Stream.StreamMetadataTasks$1, reason: invalid class name */
    /* loaded from: input_file:io/pravega/controller/task/Stream/StreamMetadataTasks$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$pravega$controller$store$stream$CreateStreamResponse$CreateStatus;
        static final /* synthetic */ int[] $SwitchMap$io$pravega$controller$store$stream$StreamCutComparison = new int[StreamCutComparison.values().length];

        static {
            try {
                $SwitchMap$io$pravega$controller$store$stream$StreamCutComparison[StreamCutComparison.EqualOrAfter.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$StreamCutComparison[StreamCutComparison.Before.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$StreamCutComparison[StreamCutComparison.Overlaps.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$io$pravega$controller$store$stream$CreateStreamResponse$CreateStatus = new int[CreateStreamResponse.CreateStatus.values().length];
            try {
                $SwitchMap$io$pravega$controller$store$stream$CreateStreamResponse$CreateStatus[CreateStreamResponse.CreateStatus.NEW.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$CreateStreamResponse$CreateStatus[CreateStreamResponse.CreateStatus.EXISTS_ACTIVE.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$CreateStreamResponse$CreateStatus[CreateStreamResponse.CreateStatus.EXISTS_CREATING.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$CreateStreamResponse$CreateStatus[CreateStreamResponse.CreateStatus.FAILED.ordinal()] = 4;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    public StreamMetadataTasks(StreamMetadataStore streamMetadataStore, BucketStore bucketStore, TaskMetadataStore taskMetadataStore, SegmentHelper segmentHelper, ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService2, String str, GrpcAuthHelper grpcAuthHelper, long j) {
        this(streamMetadataStore, bucketStore, taskMetadataStore, segmentHelper, scheduledExecutorService, scheduledExecutorService2, new TaskBase.Context(str), grpcAuthHelper);
        this.retentionFrequencyMillis.set(j);
    }

    @VisibleForTesting
    public StreamMetadataTasks(StreamMetadataStore streamMetadataStore, BucketStore bucketStore, TaskMetadataStore taskMetadataStore, SegmentHelper segmentHelper, ScheduledExecutorService scheduledExecutorService, String str, GrpcAuthHelper grpcAuthHelper) {
        this(streamMetadataStore, bucketStore, taskMetadataStore, segmentHelper, scheduledExecutorService, scheduledExecutorService, new TaskBase.Context(str), grpcAuthHelper);
    }

    @VisibleForTesting
    public StreamMetadataTasks(StreamMetadataStore streamMetadataStore, BucketStore bucketStore, TaskMetadataStore taskMetadataStore, SegmentHelper segmentHelper, ScheduledExecutorService scheduledExecutorService, String str, GrpcAuthHelper grpcAuthHelper, EventHelper eventHelper) {
        this(streamMetadataStore, bucketStore, taskMetadataStore, segmentHelper, scheduledExecutorService, scheduledExecutorService, new TaskBase.Context(str), grpcAuthHelper);
        this.eventHelperFuture.complete(eventHelper);
    }

    private StreamMetadataTasks(StreamMetadataStore streamMetadataStore, BucketStore bucketStore, TaskMetadataStore taskMetadataStore, SegmentHelper segmentHelper, ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService2, TaskBase.Context context, GrpcAuthHelper grpcAuthHelper) {
        super(taskMetadataStore, scheduledExecutorService, context);
        this.eventHelper = null;
        this.toSetEventHelper = true;
        this.lock = new Object();
        this.eventExecutor = scheduledExecutorService2;
        this.streamMetadataStore = streamMetadataStore;
        this.bucketStore = bucketStore;
        this.segmentHelper = segmentHelper;
        this.authHelper = grpcAuthHelper;
        this.retentionFrequencyMillis = new AtomicLong(Duration.ofMinutes(Config.MINIMUM_RETENTION_FREQUENCY_IN_MINUTES).toMillis());
        this.retentionClock = new AtomicReference<>(System::currentTimeMillis);
        this.eventHelperFuture = new CompletableFuture<>();
        setReady();
    }

    public void initializeStreamWriters(EventStreamClientFactory eventStreamClientFactory, String str) {
        EventHelper eventHelper = null;
        synchronized (this.lock) {
            if (this.toSetEventHelper) {
                this.eventHelper = new EventHelper(eventStreamClientFactory.createEventWriter(str, ControllerEventProcessors.CONTROLLER_EVENT_SERIALIZER, EventWriterConfig.builder().enableConnectionPooling(true).retryAttempts(Integer.MAX_VALUE).build()), this.executor, this.eventExecutor, this.context.getHostId(), ((AbstractStreamMetadataStore) this.streamMetadataStore).getHostTaskIndex());
                this.toSetEventHelper = false;
                eventHelper = this.eventHelper;
            }
        }
        if (eventHelper != null) {
            this.eventHelperFuture.complete(eventHelper);
        }
    }

    public CompletableFuture<Controller.CreateStreamStatus.Status> createRGStream(String str, String str2, StreamConfiguration streamConfiguration, long j, int i, long j2) {
        Preconditions.checkNotNull(streamConfiguration, "streamConfig");
        Preconditions.checkArgument(j >= 0);
        NameUtils.validateStreamName(str2);
        return Futures.exceptionallyExpecting(this.streamMetadataStore.getState(str, str2, true, this.streamMetadataStore.createStreamContext(str, str2, j2), this.executor), th -> {
            return Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException;
        }, State.UNKNOWN).thenCompose(state -> {
            if (!state.equals(State.UNKNOWN) && !state.equals(State.CREATING)) {
                return CompletableFuture.completedFuture(Controller.CreateStreamStatus.Status.STREAM_EXISTS);
            }
            log.debug(j2, "Creating StateSynchronizer Stream {}", new Object[]{str2});
            return createStreamRetryOnLockFailure(str, str2, streamConfiguration, j, i, j2);
        });
    }

    public CompletableFuture<Controller.CreateStreamStatus.Status> createStreamRetryOnLockFailure(String str, String str2, StreamConfiguration streamConfiguration, long j, int i, long j2) {
        return RetryHelper.withRetriesAsync(() -> {
            return createStream(str, str2, streamConfiguration, j, j2);
        }, th -> {
            return Exceptions.unwrap(th) instanceof LockFailedException;
        }, i, this.executor).exceptionally(th2 -> {
            log.warn(j2, "createStream threw exception {}", new Object[]{th2.getCause().getMessage()});
            Throwable unwrap = Exceptions.unwrap(th2);
            if (unwrap instanceof RetriesExhaustedException) {
                throw new CompletionException(unwrap.getCause());
            }
            throw new CompletionException(unwrap);
        });
    }

    public CompletableFuture<Controller.ReaderGroupConfigResponse> getReaderGroupConfig(String str, String str2, long j) {
        OperationContext createRGContext = this.streamMetadataStore.createRGContext(str, str2, j);
        return RetryHelper.withRetriesAsync(() -> {
            return this.streamMetadataStore.checkReaderGroupExists(str, str2, createRGContext, this.executor).thenCompose(bool -> {
                return !bool.booleanValue() ? CompletableFuture.completedFuture(Controller.ReaderGroupConfigResponse.newBuilder().setConfig(Controller.ReaderGroupConfiguration.getDefaultInstance()).setStatus(Controller.ReaderGroupConfigResponse.Status.RG_NOT_FOUND).build()) : this.streamMetadataStore.getReaderGroupId(str, str2, createRGContext, this.executor).thenCompose(uuid -> {
                    return this.streamMetadataStore.getReaderGroupConfigRecord(str, str2, createRGContext, this.executor).thenApply(versionedMetadata -> {
                        return Controller.ReaderGroupConfigResponse.newBuilder().setConfig(getRGConfigurationFromRecord(str, str2, (ReaderGroupConfigRecord) versionedMetadata.getObject(), uuid.toString())).setStatus(Controller.ReaderGroupConfigResponse.Status.SUCCESS).build();
                    });
                });
            });
        }, th -> {
            return Exceptions.unwrap(th) instanceof RetryableException;
        }, 10, this.executor);
    }

    private Controller.ReaderGroupConfiguration getRGConfigurationFromRecord(String str, String str2, ReaderGroupConfigRecord readerGroupConfigRecord, String str3) {
        List list = (List) readerGroupConfigRecord.getStartingStreamCuts().entrySet().stream().map(entry -> {
            return Controller.StreamCut.newBuilder().setStreamInfo(Controller.StreamInfo.newBuilder().setStream(Stream.of((String) entry.getKey()).getStreamName()).setScope(Stream.of((String) entry.getKey()).getScope()).build()).putAllCut(((RGStreamCutRecord) entry.getValue()).getStreamCut()).build();
        }).collect(Collectors.toList());
        return Controller.ReaderGroupConfiguration.newBuilder().setScope(str).setReaderGroupName(str2).setGroupRefreshTimeMillis(readerGroupConfigRecord.getGroupRefreshTimeMillis()).setAutomaticCheckpointIntervalMillis(readerGroupConfigRecord.getAutomaticCheckpointIntervalMillis()).setMaxOutstandingCheckpointRequest(readerGroupConfigRecord.getMaxOutstandingCheckpointRequest()).setRetentionType(readerGroupConfigRecord.getRetentionTypeOrdinal()).setGeneration(readerGroupConfigRecord.getGeneration()).addAllStartingStreamCuts(list).addAllEndingStreamCuts((List) readerGroupConfigRecord.getEndingStreamCuts().entrySet().stream().map(entry2 -> {
            return Controller.StreamCut.newBuilder().setStreamInfo(Controller.StreamInfo.newBuilder().setStream(Stream.of((String) entry2.getKey()).getStreamName()).setScope(Stream.of((String) entry2.getKey()).getScope()).build()).putAllCut(((RGStreamCutRecord) entry2.getValue()).getStreamCut()).build();
        }).collect(Collectors.toList())).setReaderGroupId(str3).build();
    }

    public CompletableFuture<Controller.CreateReaderGroupResponse> createReaderGroup(String str, String str2, ReaderGroupConfig readerGroupConfig, long j, long j2) {
        OperationContext createRGContext = this.streamMetadataStore.createRGContext(str, str2, j2);
        return RetryHelper.withRetriesAsync(() -> {
            return this.streamMetadataStore.checkScopeExists(str, createRGContext, this.executor).thenCompose(bool -> {
                return !bool.booleanValue() ? CompletableFuture.completedFuture(Controller.CreateReaderGroupResponse.newBuilder().setStatus(Controller.CreateReaderGroupResponse.Status.SCOPE_NOT_FOUND).build()) : isRGCreationComplete(str, str2, createRGContext).thenCompose(bool -> {
                    if (!bool.booleanValue()) {
                        return validateReaderGroupId(readerGroupConfig).thenCompose(readerGroupConfig2 -> {
                            return this.eventHelperFuture.thenCompose(eventHelper -> {
                                return eventHelper.addIndexAndSubmitTask(buildCreateRGEvent(str, str2, readerGroupConfig2, j2, j), () -> {
                                    return this.streamMetadataStore.addReaderGroupToScope(str, str2, readerGroupConfig2.getReaderGroupId(), createRGContext, this.executor);
                                }).thenCompose(r11 -> {
                                    return eventHelper.checkDone(() -> {
                                        return isRGCreated(str, str2, createRGContext);
                                    }).thenCompose(r9 -> {
                                        return buildCreateSuccessResponse(str, str2, createRGContext);
                                    });
                                });
                            });
                        });
                    }
                    log.info(j2, "Reader Group {} already exists", new Object[]{NameUtils.getScopedReaderGroupName(str, str2)});
                    return buildCreateSuccessResponse(str, str2, createRGContext);
                });
            });
        }, th -> {
            return Exceptions.unwrap(th) instanceof RetryableException;
        }, 10, this.executor);
    }

    private CompletableFuture<ReaderGroupConfig> validateReaderGroupId(ReaderGroupConfig readerGroupConfig) {
        return ReaderGroupConfig.DEFAULT_UUID.equals(readerGroupConfig.getReaderGroupId()) ? CompletableFuture.completedFuture(ReaderGroupConfig.cloneConfig(readerGroupConfig, UUID.randomUUID(), 0L)) : CompletableFuture.completedFuture(readerGroupConfig);
    }

    public CompletableFuture<Boolean> isRGCreationComplete(String str, String str2, OperationContext operationContext) {
        return Futures.exceptionallyExpecting(this.streamMetadataStore.getReaderGroupState(str, str2, true, operationContext, this.executor), th -> {
            return Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException;
        }, ReaderGroupState.UNKNOWN).thenCompose(readerGroupState -> {
            return (readerGroupState.equals(ReaderGroupState.UNKNOWN) || readerGroupState.equals(ReaderGroupState.CREATING)) ? CompletableFuture.completedFuture(Boolean.FALSE) : CompletableFuture.completedFuture(Boolean.TRUE);
        });
    }

    private CompletableFuture<Controller.CreateReaderGroupResponse> buildCreateSuccessResponse(String str, String str2, OperationContext operationContext) {
        return this.streamMetadataStore.getReaderGroupId(str, str2, operationContext, this.executor).thenCompose(uuid -> {
            return this.streamMetadataStore.getReaderGroupConfigRecord(str, str2, operationContext, this.executor).thenApply(versionedMetadata -> {
                return Controller.CreateReaderGroupResponse.newBuilder().setConfig(ModelHelper.encodeReaderGroupConfigRecord(str, str2, (ReaderGroupConfigRecord) versionedMetadata.getObject(), uuid)).setStatus(Controller.CreateReaderGroupResponse.Status.SUCCESS).build();
            });
        });
    }

    private CreateReaderGroupEvent buildCreateRGEvent(String str, String str2, ReaderGroupConfig readerGroupConfig, long j, long j2) {
        return new CreateReaderGroupEvent(j, str, str2, readerGroupConfig.getGroupRefreshTimeMillis(), readerGroupConfig.getAutomaticCheckpointIntervalMillis(), readerGroupConfig.getMaxOutstandingCheckpointRequest(), readerGroupConfig.getRetentionType().ordinal(), readerGroupConfig.getGeneration(), readerGroupConfig.getReaderGroupId(), (Map) readerGroupConfig.getStartingStreamCuts().entrySet().stream().collect(Collectors.toMap(entry -> {
            return ((Stream) entry.getKey()).getScopedName();
        }, entry2 -> {
            return new RGStreamCutRecord(ImmutableMap.copyOf(io.pravega.client.control.impl.ModelHelper.getStreamCutMap((StreamCut) entry2.getValue())));
        })), (Map) readerGroupConfig.getEndingStreamCuts().entrySet().stream().collect(Collectors.toMap(entry3 -> {
            return ((Stream) entry3.getKey()).getScopedName();
        }, entry4 -> {
            return new RGStreamCutRecord(ImmutableMap.copyOf(io.pravega.client.control.impl.ModelHelper.getStreamCutMap((StreamCut) entry4.getValue())));
        })), j2);
    }

    public CompletableFuture<Controller.CreateReaderGroupResponse.Status> createReaderGroupTasks(String str, String str2, ReaderGroupConfig readerGroupConfig, long j, long j2) {
        return createReaderGroupTasks(str, str2, readerGroupConfig, j, this.streamMetadataStore.createRGContext(str, str2, j2));
    }

    public CompletableFuture<Controller.CreateReaderGroupResponse.Status> createReaderGroupTasks(String str, String str2, ReaderGroupConfig readerGroupConfig, long j, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context not null");
        return this.streamMetadataStore.createReaderGroup(str, str2, readerGroupConfig, j, operationContext, this.executor).thenCompose(r12 -> {
            if (ReaderGroupConfig.StreamDataRetention.NONE.equals(readerGroupConfig.getRetentionType())) {
                return CompletableFuture.completedFuture(null);
            }
            String scopedReaderGroupName = NameUtils.getScopedReaderGroupName(str, str2);
            Iterator it = readerGroupConfig.getStartingStreamCuts().keySet().stream().map((v0) -> {
                return v0.getScopedName();
            }).iterator();
            Objects.requireNonNull(it);
            return Futures.loop(it::hasNext, () -> {
                Stream of = Stream.of((String) it.next());
                return this.streamMetadataStore.addSubscriber(of.getScope(), of.getStreamName(), scopedReaderGroupName, readerGroupConfig.getGeneration(), operationContext, this.executor);
            }, this.executor);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r14 -> {
            return createRGStream(str, NameUtils.getStreamForReaderGroup(str2), StreamConfiguration.builder().scalingPolicy(ScalingPolicy.fixed(1)).rolloverSizeBytes(READER_GROUP_SEGMENT_ROLLOVER_SIZE_BYTES).build(), System.currentTimeMillis(), 10, getRequestId(operationContext)).thenCompose(status -> {
                return (status.equals(Controller.CreateStreamStatus.Status.STREAM_EXISTS) || status.equals(Controller.CreateStreamStatus.Status.SUCCESS)) ? this.streamMetadataStore.getVersionedReaderGroupState(str, str2, true, operationContext, this.executor).thenCompose(versionedMetadata -> {
                    return this.streamMetadataStore.updateReaderGroupVersionedState(str, str2, ReaderGroupState.ACTIVE, versionedMetadata, operationContext, this.executor);
                }).thenApply((Function<? super U, ? extends U>) versionedMetadata2 -> {
                    return Controller.CreateReaderGroupResponse.Status.SUCCESS;
                }) : Futures.failedFuture(new IllegalStateException(String.format("Error creating StateSynchronizer Stream for Reader Group %s: %s", str2, status.toString())));
            });
        }).exceptionally(th -> {
            log.warn(getRequestId(operationContext), "Error creating StateSynchronizer Stream:{} for Reader Group: {}. Exception: {} ", new Object[]{NameUtils.getStreamForReaderGroup(str2), str2, th.getMessage()});
            throw new CompletionException(Exceptions.unwrap(th));
        });
    }

    public CompletableFuture<Controller.CreateReaderGroupResponse> createReaderGroupInternal(String str, String str2, ReaderGroupConfig readerGroupConfig, long j, long j2) {
        Preconditions.checkNotNull(str, "ReaderGroup scope is null");
        Preconditions.checkNotNull(str2, "ReaderGroup name is null");
        Preconditions.checkNotNull(readerGroupConfig, "ReaderGroup config is null");
        Preconditions.checkArgument(j >= 0);
        try {
            NameUtils.validateReaderGroupName(str2);
            OperationContext createRGContext = this.streamMetadataStore.createRGContext(str, str2, j2);
            return RetryHelper.withRetriesAsync(() -> {
                return this.streamMetadataStore.checkScopeExists(str, createRGContext, this.executor).thenCompose(bool -> {
                    return !bool.booleanValue() ? CompletableFuture.completedFuture(Controller.CreateReaderGroupResponse.newBuilder().setStatus(Controller.CreateReaderGroupResponse.Status.SCOPE_NOT_FOUND).build()) : isRGCreationComplete(str, str2, createRGContext).thenCompose(bool -> {
                        return !bool.booleanValue() ? validateReaderGroupId(readerGroupConfig).thenCompose(readerGroupConfig2 -> {
                            return this.streamMetadataStore.addReaderGroupToScope(str, str2, readerGroupConfig2.getReaderGroupId(), createRGContext, this.executor).thenCompose(r15 -> {
                                return createReaderGroupTasks(str, str2, readerGroupConfig2, j, createRGContext);
                            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) status -> {
                                return Controller.CreateReaderGroupResponse.Status.SUCCESS.equals(status) ? buildCreateSuccessResponse(str, str2, createRGContext) : CompletableFuture.completedFuture(Controller.CreateReaderGroupResponse.newBuilder().setStatus(status).build());
                            });
                        }) : buildCreateSuccessResponse(str, str2, createRGContext);
                    });
                });
            }, th -> {
                return Exceptions.unwrap(th) instanceof RetryableException;
            }, 10, this.executor);
        } catch (IllegalArgumentException | NullPointerException e) {
            return CompletableFuture.completedFuture(Controller.CreateReaderGroupResponse.newBuilder().setStatus(Controller.CreateReaderGroupResponse.Status.INVALID_RG_NAME).build());
        }
    }

    private CompletableFuture<Boolean> isRGCreated(String str, String str2, OperationContext operationContext) {
        return Futures.exceptionallyExpecting(this.streamMetadataStore.getReaderGroupState(str, str2, true, operationContext, this.executor), th -> {
            return Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException;
        }, ReaderGroupState.UNKNOWN).thenApply(readerGroupState -> {
            log.debug(operationContext.getRequestId(), "ReaderGroup State is {}", new Object[]{readerGroupState.toString()});
            return Boolean.valueOf(ReaderGroupState.ACTIVE.equals(readerGroupState));
        });
    }

    public CompletableFuture<Controller.UpdateReaderGroupResponse> updateReaderGroup(String str, String str2, ReaderGroupConfig readerGroupConfig, long j) {
        OperationContext createRGContext = this.streamMetadataStore.createRGContext(str, str2, j);
        return RetryHelper.withRetriesAsync(() -> {
            return this.streamMetadataStore.checkReaderGroupExists(str, str2, createRGContext, this.executor).thenCompose(bool -> {
                return !bool.booleanValue() ? CompletableFuture.completedFuture(Controller.UpdateReaderGroupResponse.newBuilder().setStatus(Controller.UpdateReaderGroupResponse.Status.RG_NOT_FOUND).setGeneration(readerGroupConfig.getGeneration()).build()) : this.streamMetadataStore.getReaderGroupConfigRecord(str, str2, createRGContext, this.executor).thenCompose(versionedMetadata -> {
                    if (((ReaderGroupConfigRecord) versionedMetadata.getObject()).getGeneration() != readerGroupConfig.getGeneration()) {
                        return CompletableFuture.completedFuture(Controller.UpdateReaderGroupResponse.newBuilder().setStatus(Controller.UpdateReaderGroupResponse.Status.INVALID_CONFIG).setGeneration(readerGroupConfig.getGeneration()).build());
                    }
                    if (!((ReaderGroupConfigRecord) versionedMetadata.getObject()).isUpdating()) {
                        return this.streamMetadataStore.getReaderGroupId(str, str2, createRGContext, this.executor).thenCompose(uuid -> {
                            if (!readerGroupConfig.getReaderGroupId().equals(uuid)) {
                                return CompletableFuture.completedFuture(Controller.UpdateReaderGroupResponse.newBuilder().setStatus(Controller.UpdateReaderGroupResponse.Status.INVALID_CONFIG).setGeneration(readerGroupConfig.getGeneration()).build());
                            }
                            ImmutableSet<String> streamsToBeUnsubscribed = getStreamsToBeUnsubscribed((ReaderGroupConfigRecord) versionedMetadata.getObject(), readerGroupConfig);
                            UpdateReaderGroupEvent updateReaderGroupEvent = new UpdateReaderGroupEvent(str, str2, j, uuid, ((ReaderGroupConfigRecord) versionedMetadata.getObject()).getGeneration() + 1, isTransitionToOrFromSubscriber((ReaderGroupConfigRecord) versionedMetadata.getObject(), readerGroupConfig), streamsToBeUnsubscribed);
                            return this.eventHelperFuture.thenCompose(eventHelper -> {
                                return eventHelper.addIndexAndSubmitTask(updateReaderGroupEvent, () -> {
                                    return this.streamMetadataStore.startRGConfigUpdate(str, str2, readerGroupConfig, createRGContext, this.executor);
                                }).thenCompose(r11 -> {
                                    return eventHelper.checkDone(() -> {
                                        return isRGUpdated(str, str2, this.executor, createRGContext);
                                    }).thenCompose(r10 -> {
                                        return this.streamMetadataStore.getReaderGroupConfigRecord(str, str2, createRGContext, this.executor).thenApply(versionedMetadata -> {
                                            return Controller.UpdateReaderGroupResponse.newBuilder().setStatus(Controller.UpdateReaderGroupResponse.Status.SUCCESS).setGeneration(((ReaderGroupConfigRecord) versionedMetadata.getObject()).getGeneration()).build();
                                        });
                                    });
                                });
                            });
                        });
                    }
                    log.error(j, "Reader group config update failed as another update was in progress.", new Object[0]);
                    return CompletableFuture.completedFuture(Controller.UpdateReaderGroupResponse.newBuilder().setStatus(Controller.UpdateReaderGroupResponse.Status.FAILURE).setGeneration(readerGroupConfig.getGeneration()).build());
                });
            });
        }, th -> {
            return Exceptions.unwrap(th) instanceof RetryableException;
        }, 10, this.executor);
    }

    private boolean isTransitionToOrFromSubscriber(ReaderGroupConfigRecord readerGroupConfigRecord, ReaderGroupConfig readerGroupConfig) {
        return (ReaderGroupConfig.StreamDataRetention.NONE.equals(ReaderGroupConfig.StreamDataRetention.values()[readerGroupConfigRecord.getRetentionTypeOrdinal()]) && ReaderGroupConfig.StreamDataRetention.NONE.equals(readerGroupConfig.getRetentionType())) ? false : true;
    }

    private ImmutableSet<String> getStreamsToBeUnsubscribed(ReaderGroupConfigRecord readerGroupConfigRecord, ReaderGroupConfig readerGroupConfig) {
        if (isNonSubscriberToSubscriberTransition(readerGroupConfigRecord, readerGroupConfig)) {
            return ImmutableSet.of();
        }
        if (isSubscriberToNonSubscriberTransition(readerGroupConfigRecord, readerGroupConfig)) {
            ImmutableSet.Builder builder = ImmutableSet.builder();
            Set<String> keySet = readerGroupConfigRecord.getStartingStreamCuts().keySet();
            Objects.requireNonNull(builder);
            keySet.forEach((v1) -> {
                r1.add(v1);
            });
            return builder.build();
        }
        Set<String> keySet2 = readerGroupConfigRecord.getStartingStreamCuts().keySet();
        Set set = (Set) readerGroupConfig.getStartingStreamCuts().keySet().stream().map((v0) -> {
            return v0.getScopedName();
        }).collect(Collectors.toSet());
        ImmutableSet.Builder builder2 = ImmutableSet.builder();
        java.util.stream.Stream<String> filter = keySet2.stream().filter(str -> {
            return !set.contains(str);
        });
        Objects.requireNonNull(builder2);
        filter.forEach((v1) -> {
            r1.add(v1);
        });
        return builder2.build();
    }

    private boolean isNonSubscriberToSubscriberTransition(ReaderGroupConfigRecord readerGroupConfigRecord, ReaderGroupConfig readerGroupConfig) {
        return ReaderGroupConfig.StreamDataRetention.NONE.equals(ReaderGroupConfig.StreamDataRetention.values()[readerGroupConfigRecord.getRetentionTypeOrdinal()]) && !ReaderGroupConfig.StreamDataRetention.NONE.equals(readerGroupConfig.getRetentionType());
    }

    private boolean isSubscriberToNonSubscriberTransition(ReaderGroupConfigRecord readerGroupConfigRecord, ReaderGroupConfig readerGroupConfig) {
        return !ReaderGroupConfig.StreamDataRetention.NONE.equals(ReaderGroupConfig.StreamDataRetention.values()[readerGroupConfigRecord.getRetentionTypeOrdinal()]) && ReaderGroupConfig.StreamDataRetention.NONE.equals(readerGroupConfig.getRetentionType());
    }

    private CompletableFuture<Boolean> isRGUpdated(String str, String str2, Executor executor, OperationContext operationContext) {
        return this.streamMetadataStore.getReaderGroupConfigRecord(str, str2, operationContext, executor).thenCompose(versionedMetadata -> {
            log.debug(operationContext.getRequestId(), "Is ReaderGroup Config update complete ? {}", new Object[]{Boolean.valueOf(((ReaderGroupConfigRecord) versionedMetadata.getObject()).isUpdating())});
            return CompletableFuture.completedFuture(Boolean.valueOf(!((ReaderGroupConfigRecord) versionedMetadata.getObject()).isUpdating()));
        });
    }

    public CompletableFuture<Controller.DeleteReaderGroupStatus.Status> deleteReaderGroup(String str, String str2, String str3, long j) {
        OperationContext createRGContext = this.streamMetadataStore.createRGContext(str, str2, j);
        return RetryHelper.withRetriesAsync(() -> {
            return this.streamMetadataStore.checkReaderGroupExists(str, str2, createRGContext, this.executor).thenCompose(bool -> {
                return !bool.booleanValue() ? CompletableFuture.completedFuture(Controller.DeleteReaderGroupStatus.Status.RG_NOT_FOUND) : this.streamMetadataStore.getReaderGroupId(str, str2, createRGContext, this.executor).thenCompose(uuid -> {
                    return !uuid.equals(UUID.fromString(str3)) ? CompletableFuture.completedFuture(Controller.DeleteReaderGroupStatus.Status.RG_NOT_FOUND) : this.eventHelperFuture.thenCompose(eventHelper -> {
                        return this.streamMetadataStore.getReaderGroupConfigRecord(str, str2, createRGContext, this.executor).thenCompose(versionedMetadata -> {
                            return this.streamMetadataStore.getVersionedReaderGroupState(str, str2, true, createRGContext, this.executor).thenCompose(versionedMetadata -> {
                                return eventHelper.addIndexAndSubmitTask(new DeleteReaderGroupEvent(str, str2, j, uuid), () -> {
                                    return startReaderGroupDelete(str, str2, versionedMetadata, createRGContext);
                                }).thenCompose(r11 -> {
                                    return eventHelper.checkDone(() -> {
                                        return isRGDeleted(str, str2, createRGContext);
                                    }).thenApply(r2 -> {
                                        return Controller.DeleteReaderGroupStatus.Status.SUCCESS;
                                    });
                                });
                            });
                        });
                    });
                });
            });
        }, th -> {
            return Exceptions.unwrap(th) instanceof RetryableException;
        }, 10, this.executor);
    }

    private CompletableFuture<Boolean> isRGDeleted(String str, String str2, OperationContext operationContext) {
        return this.streamMetadataStore.checkReaderGroupExists(str, str2, operationContext, this.executor).thenApply(bool -> {
            return Boolean.valueOf(!bool.booleanValue());
        });
    }

    private CompletableFuture<Void> startReaderGroupDelete(String str, String str2, VersionedMetadata<ReaderGroupState> versionedMetadata, OperationContext operationContext) {
        return Futures.toVoid(this.streamMetadataStore.updateReaderGroupVersionedState(str, str2, ReaderGroupState.DELETING, versionedMetadata, operationContext, this.executor));
    }

    public CompletableFuture<Controller.DeleteScopeStatus.Status> deleteScopeRecursive(String str, long j) {
        OperationContext createScopeContext = this.streamMetadataStore.createScopeContext(str, j);
        return RetryHelper.withRetriesAsync(() -> {
            return this.streamMetadataStore.isScopeSealed(str, createScopeContext, this.executor).thenCompose(bool -> {
                if (!bool.booleanValue()) {
                    return this.streamMetadataStore.checkScopeExists(str, createScopeContext, this.executor).thenCompose(bool -> {
                        if (bool.booleanValue()) {
                            return this.streamMetadataStore.getScopeId(str, createScopeContext, this.executor).thenCompose(uuid -> {
                                return this.eventHelper.addIndexAndSubmitTask(new DeleteScopeEvent(str, j, uuid), () -> {
                                    return this.streamMetadataStore.sealScope(str, createScopeContext, this.executor);
                                }).thenCompose(r8 -> {
                                    return this.eventHelper.checkDone(() -> {
                                        return isScopeDeletionComplete(str, createScopeContext);
                                    });
                                }).thenApply(r2 -> {
                                    return Controller.DeleteScopeStatus.Status.SUCCESS;
                                });
                            });
                        }
                        log.info(j, "Requested Scope {} doesn't exist", new Object[]{str});
                        return CompletableFuture.completedFuture(Controller.DeleteScopeStatus.Status.SUCCESS);
                    });
                }
                log.debug(j, "Another Scope deletion API call for {} is already in progress", new Object[]{str});
                return CompletableFuture.completedFuture(Controller.DeleteScopeStatus.Status.SUCCESS);
            });
        }, th -> {
            return Exceptions.unwrap(th) instanceof RetryableException;
        }, 10, this.executor).exceptionally(th2 -> {
            return handleDeleteScopeError(th2, j, str);
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Task(name = "createStream", version = "1.0", resource = "{scope}/{stream}")
    public CompletableFuture<Controller.CreateStreamStatus.Status> createStream(String str, String str2, StreamConfiguration streamConfiguration, long j, long j2) {
        log.debug(j2, "createStream with resource called.", new Object[0]);
        OperationContext createStreamContext = this.streamMetadataStore.createStreamContext(str, str2, j2);
        return execute(new Resource(str, str2), new Serializable[]{str, str2, streamConfiguration, Long.valueOf(j), Long.valueOf(j2)}, () -> {
            return createStreamBody(str, str2, streamConfiguration, j, createStreamContext);
        });
    }

    public CompletableFuture<Controller.UpdateStreamStatus.Status> updateStream(String str, String str2, StreamConfiguration streamConfiguration, long j) {
        OperationContext createStreamContext = this.streamMetadataStore.createStreamContext(str, str2, j);
        return this.streamMetadataStore.getState(str, str2, true, createStreamContext, this.executor).thenCompose(state -> {
            if (!state.equals(State.SEALED)) {
                return this.streamMetadataStore.getConfigurationRecord(str, str2, createStreamContext, this.executor).thenCompose(versionedMetadata -> {
                    if (!((StreamConfigurationRecord) versionedMetadata.getObject()).isUpdating()) {
                        return this.eventHelperFuture.thenCompose(eventHelper -> {
                            return eventHelper.addIndexAndSubmitTask(new UpdateStreamEvent(str, str2, j), () -> {
                                return this.streamMetadataStore.startUpdateConfiguration(str, str2, streamConfiguration, createStreamContext, this.executor);
                            }).thenCompose(r13 -> {
                                return eventHelper.checkDone(() -> {
                                    return isUpdated(str, str2, streamConfiguration, createStreamContext);
                                }).thenApply(r2 -> {
                                    return Controller.UpdateStreamStatus.Status.SUCCESS;
                                });
                            });
                        });
                    }
                    log.error(j, "Another update in progress for {}/{}", new Object[]{str, str2});
                    return CompletableFuture.completedFuture(Controller.UpdateStreamStatus.Status.FAILURE);
                });
            }
            log.error(j, "Cannot update a sealed stream {}/{}", new Object[]{str, str2});
            return CompletableFuture.completedFuture(Controller.UpdateStreamStatus.Status.STREAM_SEALED);
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            return handleUpdateStreamError(th, j, "Exception updating stream configuration {}", NameUtils.getScopedStreamName(str, str2));
        });
    }

    @VisibleForTesting
    CompletableFuture<Boolean> isUpdated(String str, String str2, StreamConfiguration streamConfiguration, OperationContext operationContext) {
        CompletableFuture<State> state = this.streamMetadataStore.getState(str, str2, true, operationContext, this.executor);
        CompletableFuture thenApply = this.streamMetadataStore.getConfigurationRecord(str, str2, operationContext, this.executor).thenApply((v0) -> {
            return v0.getObject();
        });
        return CompletableFuture.allOf(state, thenApply).thenApply(r10 -> {
            State state2 = (State) state.join();
            StreamConfigurationRecord streamConfigurationRecord = (StreamConfigurationRecord) thenApply.join();
            if (streamConfigurationRecord.isUpdating()) {
                return Boolean.valueOf(!streamConfigurationRecord.getStreamConfiguration().equals(streamConfiguration));
            }
            if (!state2.equals(State.SEALED)) {
                return Boolean.valueOf((streamConfigurationRecord.getStreamConfiguration().equals(streamConfiguration) && state2.equals(State.UPDATING)) ? false : true);
            }
            log.error("Cannot update a sealed stream {}/{}", str, str2);
            throw new UnsupportedOperationException("Cannot update a sealed stream: " + NameUtils.getScopedStreamName(str, str2));
        });
    }

    public CompletableFuture<Controller.SubscribersResponse> listSubscribers(String str, String str2, long j) {
        OperationContext createStreamContext = this.streamMetadataStore.createStreamContext(str, str2, j);
        return this.streamMetadataStore.checkStreamExists(str, str2, createStreamContext, this.executor).thenCompose(bool -> {
            return !bool.booleanValue() ? CompletableFuture.completedFuture(Controller.SubscribersResponse.newBuilder().setStatus(Controller.SubscribersResponse.Status.STREAM_NOT_FOUND).build()) : this.streamMetadataStore.listSubscribers(str, str2, createStreamContext, this.executor).thenApply(list -> {
                return Controller.SubscribersResponse.newBuilder().setStatus(Controller.SubscribersResponse.Status.SUCCESS).addAllSubscribers(list).build();
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                log.error(j, "Exception trying to get list of subscribers for Stream {}. Cause {}", new Object[]{NameUtils.getScopedStreamName(str, str2), th});
                Throwable unwrap = Exceptions.unwrap(th);
                if (unwrap instanceof TimeoutException) {
                    throw new CompletionException(unwrap);
                }
                return Controller.SubscribersResponse.newBuilder().setStatus(Controller.SubscribersResponse.Status.FAILURE).build();
            });
        });
    }

    public CompletableFuture<Controller.UpdateSubscriberStatus.Status> updateSubscriberStreamCut(String str, String str2, String str3, String str4, long j, ImmutableMap<Long, Long> immutableMap, long j2) {
        OperationContext createStreamContext = this.streamMetadataStore.createStreamContext(str, str2, j2);
        return RetryHelper.withRetriesAsync(() -> {
            return this.streamMetadataStore.checkStreamExists(str, str2, createStreamContext, this.executor).thenCompose(bool -> {
                return !bool.booleanValue() ? CompletableFuture.completedFuture(Controller.UpdateSubscriberStatus.Status.STREAM_NOT_FOUND) : Futures.exceptionallyExpecting(this.streamMetadataStore.getSubscriber(str, str2, str3, createStreamContext, this.executor), th -> {
                    return Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException;
                }, (Object) null).thenCompose(versionedMetadata -> {
                    if (versionedMetadata == null) {
                        return CompletableFuture.completedFuture(Controller.UpdateSubscriberStatus.Status.SUBSCRIBER_NOT_FOUND);
                    }
                    List extractScopedNameTokens = NameUtils.extractScopedNameTokens(str3);
                    return this.streamMetadataStore.getReaderGroupId((String) extractScopedNameTokens.get(0), (String) extractScopedNameTokens.get(1), createStreamContext, this.executor).thenCompose(uuid -> {
                        return !uuid.equals(UUID.fromString(str4)) ? CompletableFuture.completedFuture(Controller.UpdateSubscriberStatus.Status.SUBSCRIBER_NOT_FOUND) : ((StreamSubscriber) versionedMetadata.getObject()).getGeneration() != j ? CompletableFuture.completedFuture(Controller.UpdateSubscriberStatus.Status.GENERATION_MISMATCH) : this.streamMetadataStore.updateSubscriberStreamCut(str, str2, str3, j, immutableMap, versionedMetadata, createStreamContext, this.executor).thenApply(r2 -> {
                            return Controller.UpdateSubscriberStatus.Status.SUCCESS;
                        }).exceptionally((Function<Throwable, ? extends U>) th2 -> {
                            log.error(j2, "Exception updating StreamCut for Subscriber {} on Stream {}. Cause:{}", new Object[]{str3, NameUtils.getScopedStreamName(str, str2), th2});
                            Throwable unwrap = Exceptions.unwrap(th2);
                            if (unwrap instanceof StoreException.OperationNotAllowedException) {
                                return Controller.UpdateSubscriberStatus.Status.STREAM_CUT_NOT_VALID;
                            }
                            if (unwrap instanceof TimeoutException) {
                                throw new CompletionException(unwrap);
                            }
                            return Controller.UpdateSubscriberStatus.Status.FAILURE;
                        });
                    });
                });
            });
        }, th -> {
            return Exceptions.unwrap(th) instanceof RetryableException;
        }, 10, this.executor);
    }

    public CompletableFuture<Void> retention(String str, String str2, RetentionPolicy retentionPolicy, long j, OperationContext operationContext, String str3) {
        Preconditions.checkNotNull(retentionPolicy);
        OperationContext createStreamContext = operationContext != null ? operationContext : this.streamMetadataStore.createStreamContext(str, str2, ControllerService.nextRequestId());
        return this.streamMetadataStore.getRetentionSet(str, str2, createStreamContext, this.executor).thenCompose(retentionSet -> {
            return generateStreamCutIfRequired(str, str2, retentionSet.getLatest(), j, createStreamContext, str3).thenCompose(streamCutRecord -> {
                return truncate(str, str2, retentionPolicy, createStreamContext, retentionSet, streamCutRecord);
            });
        }).thenAccept((Consumer<? super U>) r5 -> {
            StreamMetrics.reportRetentionEvent(str, str2);
        });
    }

    private CompletableFuture<StreamCutRecord> generateStreamCutIfRequired(String str, String str2, StreamCutReferenceRecord streamCutReferenceRecord, long j, OperationContext operationContext, String str3) {
        if (streamCutReferenceRecord == null || j - streamCutReferenceRecord.getRecordingTime() > this.retentionFrequencyMillis.get()) {
            return Futures.exceptionallyComposeExpecting(streamCutReferenceRecord == null ? CompletableFuture.completedFuture(null) : this.streamMetadataStore.getStreamCutRecord(str, str2, streamCutReferenceRecord, operationContext, this.executor), th -> {
                return th instanceof StoreException.DataNotFoundException;
            }, () -> {
                return null;
            }).thenCompose(streamCutRecord -> {
                return generateStreamCut(str, str2, streamCutRecord, operationContext, str3).thenCompose(streamCutRecord -> {
                    return this.streamMetadataStore.addStreamCutToRetentionSet(str, str2, streamCutRecord, operationContext, this.executor).thenApply(r13 -> {
                        log.debug(operationContext.getRequestId(), "New streamCut generated for stream {}/{}", new Object[]{str, str2});
                        return streamCutRecord;
                    });
                });
            });
        }
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> truncate(String str, String str2, RetentionPolicy retentionPolicy, OperationContext operationContext, RetentionSet retentionSet, StreamCutRecord streamCutRecord) {
        return truncateInternal(str, str2, operationContext, retentionPolicy, streamCutRecord == null ? retentionSet : RetentionSet.addReferenceToStreamCutIfLatest(retentionSet, streamCutRecord));
    }

    private CompletableFuture<Void> truncateInternal(String str, String str2, OperationContext operationContext, RetentionPolicy retentionPolicy, RetentionSet retentionSet) {
        long requestId = operationContext.getRequestId();
        return this.streamMetadataStore.listSubscribers(str, str2, operationContext, this.executor).thenCompose(list -> {
            return Futures.allOfWithResults((List) list.stream().map(str3 -> {
                return this.streamMetadataStore.getSubscriber(str, str2, str3, operationContext, this.executor);
            }).collect(Collectors.toList()));
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) list2 -> {
            return Futures.allOfWithResults((List) list2.stream().map(versionedMetadata -> {
                return Futures.keysAllOfWithResults((Map) ((StreamSubscriber) versionedMetadata.getObject()).getTruncationStreamCut().entrySet().stream().collect(Collectors.toMap(entry -> {
                    return this.streamMetadataStore.getSegment(str, str2, ((Long) entry.getKey()).longValue(), operationContext, this.executor);
                }, (v0) -> {
                    return v0.getValue();
                })));
            }).collect(Collectors.toList()));
        }).thenApply(this::computeSubscribersLowerBound).thenCompose(map -> {
            return (retentionPolicy.getRetentionType().equals(RetentionPolicy.RetentionType.SIZE) ? getTruncationStreamCutBySizeLimit(str, str2, operationContext, retentionPolicy, retentionSet, map) : getTruncationStreamCutByTimeLimit(str, str2, operationContext, retentionPolicy, retentionSet, map)).thenCompose(map -> {
                if (map != null && !map.isEmpty()) {
                    return startTruncation(str, str2, map, operationContext).thenCompose(bool -> {
                        if (bool.booleanValue()) {
                            return this.streamMetadataStore.findStreamCutReferenceRecordBefore(str, str2, map, retentionSet, operationContext, this.executor).thenCompose(streamCutReferenceRecord -> {
                                return streamCutReferenceRecord != null ? this.streamMetadataStore.deleteStreamCutBefore(str, str2, streamCutReferenceRecord, operationContext, this.executor) : CompletableFuture.completedFuture(null);
                            });
                        }
                        throw new RuntimeException("Could not start truncation");
                    }).exceptionally((Function<Throwable, ? extends U>) th -> {
                        if (!(Exceptions.unwrap(th) instanceof IllegalArgumentException)) {
                            throw new CompletionException(th);
                        }
                        log.debug(requestId, "Cannot truncate at given streamCut because it intersects with existing truncation point", new Object[0]);
                        return null;
                    });
                }
                log.debug(operationContext.getRequestId(), "no truncation record could be compute that satisfied retention policy", new Object[0]);
                return CompletableFuture.completedFuture(null);
            });
        });
    }

    private CompletableFuture<Map<Long, Long>> getTruncationStreamCutBySizeLimit(String str, String str2, OperationContext operationContext, RetentionPolicy retentionPolicy, RetentionSet retentionSet, Map<Long, Long> map) {
        long recordingSize = retentionSet.getLatest().getRecordingSize();
        Map.Entry<StreamCutReferenceRecord, StreamCutReferenceRecord> boundStreamCuts = getBoundStreamCuts(retentionPolicy, retentionSet, streamCutReferenceRecord -> {
            return Long.valueOf(recordingSize - streamCutReferenceRecord.getRecordingSize());
        });
        return (map == null || map.isEmpty()) ? (CompletableFuture) Optional.ofNullable(boundStreamCuts.getValue()).map(streamCutReferenceRecord2 -> {
            return this.streamMetadataStore.getStreamCutRecord(str, str2, streamCutReferenceRecord2, operationContext, this.executor).thenApply((v0) -> {
                return v0.getStreamCut();
            });
        }).orElse(CompletableFuture.completedFuture(null)) : this.streamMetadataStore.getSizeTillStreamCut(str, str2, map, Optional.empty(), operationContext, this.executor).thenCompose(l -> {
            long longValue = recordingSize - l.longValue();
            return longValue < retentionPolicy.getRetentionParam() ? (CompletionStage) Optional.ofNullable((StreamCutReferenceRecord) boundStreamCuts.getValue()).map(streamCutReferenceRecord3 -> {
                return this.streamMetadataStore.getStreamCutRecord(str, str2, (StreamCutReferenceRecord) boundStreamCuts.getValue(), operationContext, this.executor).thenCompose(streamCutRecord -> {
                    return this.streamMetadataStore.compareStreamCut(str, str2, streamCutRecord.getStreamCut(), map, operationContext, this.executor).thenCompose(streamCutComparison -> {
                        switch (AnonymousClass1.$SwitchMap$io$pravega$controller$store$stream$StreamCutComparison[streamCutComparison.ordinal()]) {
                            case ApiResponseMessage.WARNING /* 2 */:
                                return CompletableFuture.completedFuture(streamCutRecord.getStreamCut());
                            default:
                                return getStreamcutBeforeLowerbound(str, str2, operationContext, retentionSet, map);
                        }
                    });
                });
            }).orElse(CompletableFuture.completedFuture(null)) : longValue < retentionPolicy.getRetentionMax() ? CompletableFuture.completedFuture(map) : (CompletionStage) Optional.ofNullable((StreamCutReferenceRecord) boundStreamCuts.getKey()).filter(streamCutReferenceRecord4 -> {
                return recordingSize - streamCutReferenceRecord4.getRecordingSize() < longValue;
            }).map(streamCutReferenceRecord5 -> {
                return this.streamMetadataStore.getStreamCutRecord(str, str2, streamCutReferenceRecord5, operationContext, this.executor).thenApply((v0) -> {
                    return v0.getStreamCut();
                });
            }).orElse(CompletableFuture.completedFuture(map));
        });
    }

    private CompletableFuture<Map<Long, Long>> getTruncationStreamCutByTimeLimit(String str, String str2, OperationContext operationContext, RetentionPolicy retentionPolicy, RetentionSet retentionSet, Map<Long, Long> map) {
        long longValue = this.retentionClock.get().get().longValue();
        Map.Entry<StreamCutReferenceRecord, StreamCutReferenceRecord> boundStreamCuts = getBoundStreamCuts(retentionPolicy, retentionSet, streamCutReferenceRecord -> {
            return Long.valueOf(longValue - streamCutReferenceRecord.getRecordingTime());
        });
        CompletableFuture<StreamCutRecord> completedFuture = boundStreamCuts.getValue() == null ? CompletableFuture.completedFuture(null) : this.streamMetadataStore.getStreamCutRecord(str, str2, boundStreamCuts.getValue(), operationContext, this.executor);
        if (map == null || map.isEmpty()) {
            return completedFuture.thenApply(streamCutRecord -> {
                return (Map) Optional.ofNullable(streamCutRecord).map((v0) -> {
                    return v0.getStreamCut();
                }).orElse(null);
            });
        }
        Optional max = retentionSet.getRetentionRecords().stream().filter(streamCutReferenceRecord2 -> {
            return longValue - streamCutReferenceRecord2.getRecordingTime() >= retentionPolicy.getRetentionMax();
        }).max(Comparator.comparingLong((v0) -> {
            return v0.getRecordingTime();
        }));
        CompletableFuture<StreamCutRecord> completedFuture2 = boundStreamCuts.getKey() == null ? CompletableFuture.completedFuture(null) : this.streamMetadataStore.getStreamCutRecord(str, str2, boundStreamCuts.getKey(), operationContext, this.executor);
        CompletableFuture completableFuture = (CompletableFuture) max.map(streamCutReferenceRecord3 -> {
            return this.streamMetadataStore.getStreamCutRecord(str, str2, streamCutReferenceRecord3, operationContext, this.executor);
        }).orElse(CompletableFuture.completedFuture(null));
        return CompletableFuture.allOf(completedFuture2, completedFuture, completableFuture).thenCompose(r20 -> {
            StreamCutRecord streamCutRecord2 = (StreamCutRecord) completedFuture2.join();
            StreamCutRecord streamCutRecord3 = (StreamCutRecord) completedFuture.join();
            StreamCutRecord streamCutRecord4 = (StreamCutRecord) completableFuture.join();
            return streamCutRecord3 != null ? this.streamMetadataStore.compareStreamCut(str, str2, streamCutRecord3.getStreamCut(), map, operationContext, this.executor).thenCompose(streamCutComparison -> {
                switch (AnonymousClass1.$SwitchMap$io$pravega$controller$store$stream$StreamCutComparison[streamCutComparison.ordinal()]) {
                    case ApiResponseMessage.ERROR /* 1 */:
                        return truncateAtLowerBoundOrMax(str, str2, operationContext, map, streamCutRecord2, streamCutRecord4);
                    case ApiResponseMessage.WARNING /* 2 */:
                        return CompletableFuture.completedFuture(streamCutRecord3.getStreamCut());
                    case ApiResponseMessage.INFO /* 3 */:
                        return getStreamcutBeforeLowerbound(str, str2, operationContext, retentionSet, map);
                    default:
                        throw new IllegalArgumentException("Invalid Compare streamcut response");
                }
            }) : CompletableFuture.completedFuture(null);
        });
    }

    private CompletableFuture<Map<Long, Long>> truncateAtLowerBoundOrMax(String str, String str2, OperationContext operationContext, Map<Long, Long> map, StreamCutRecord streamCutRecord, StreamCutRecord streamCutRecord2) {
        return streamCutRecord2 == null ? CompletableFuture.completedFuture(map) : this.streamMetadataStore.compareStreamCut(str, str2, map, streamCutRecord2.getStreamCut(), operationContext, this.executor).thenCompose(streamCutComparison -> {
            switch (AnonymousClass1.$SwitchMap$io$pravega$controller$store$stream$StreamCutComparison[streamCutComparison.ordinal()]) {
                case ApiResponseMessage.ERROR /* 1 */:
                    return CompletableFuture.completedFuture(map);
                case ApiResponseMessage.WARNING /* 2 */:
                    return CompletableFuture.completedFuture(streamCutRecord.getStreamCut());
                default:
                    return CompletableFuture.completedFuture(streamCutRecord2.getStreamCut());
            }
        });
    }

    private CompletableFuture<Map<Long, Long>> getStreamcutBeforeLowerbound(String str, String str2, OperationContext operationContext, RetentionSet retentionSet, Map<Long, Long> map) {
        return this.streamMetadataStore.findStreamCutReferenceRecordBefore(str, str2, map, retentionSet, operationContext, this.executor).thenCompose(streamCutReferenceRecord -> {
            return (CompletionStage) Optional.ofNullable(streamCutReferenceRecord).map(streamCutReferenceRecord -> {
                return this.streamMetadataStore.getStreamCutRecord(str, str2, streamCutReferenceRecord, operationContext, this.executor).thenApply((v0) -> {
                    return v0.getStreamCut();
                });
            }).orElse(CompletableFuture.completedFuture(null));
        });
    }

    private Map<Long, Long> computeSubscribersLowerBound(List<Map<StreamSegmentRecord, Long>> list) {
        HashMap hashMap = new HashMap();
        list.forEach(map -> {
            map.forEach((streamSegmentRecord, l) -> {
                if (hashMap.containsKey(streamSegmentRecord)) {
                    if (((Long) hashMap.get(streamSegmentRecord)).longValue() > l.longValue()) {
                        hashMap.put(streamSegmentRecord, l);
                        return;
                    }
                    return;
                }
                HashMap hashMap2 = new HashMap();
                HashMap hashMap3 = new HashMap();
                hashMap.forEach((streamSegmentRecord, l) -> {
                    if (streamSegmentRecord.overlaps(streamSegmentRecord)) {
                        if (streamSegmentRecord.segmentId() < streamSegmentRecord.segmentId()) {
                            hashMap2.put(streamSegmentRecord, l);
                        } else {
                            hashMap3.put(streamSegmentRecord, l);
                        }
                    }
                });
                if (hashMap3.isEmpty() && hashMap2.isEmpty()) {
                    hashMap.put(streamSegmentRecord, l);
                    return;
                }
                if (l.longValue() >= 0) {
                    if (hashMap2.isEmpty()) {
                        hashMap.put(streamSegmentRecord, l);
                        hashMap3.forEach((streamSegmentRecord2, l2) -> {
                            if (l2.longValue() >= 0) {
                                hashMap.remove(streamSegmentRecord2);
                                return;
                            }
                            TreeMap<Double, Double> treeMap = new TreeMap<>();
                            hashMap.keySet().forEach(streamSegmentRecord2 -> {
                                if (!streamSegmentRecord2.overlaps(streamSegmentRecord2) || streamSegmentRecord2.segmentId() >= streamSegmentRecord2.segmentId()) {
                                    return;
                                }
                                treeMap.put(Double.valueOf(streamSegmentRecord2.getKeyStart()), Double.valueOf(streamSegmentRecord2.getKeyEnd()));
                            });
                            if (checkCoverage(streamSegmentRecord2.getKeyStart(), streamSegmentRecord2.getKeyEnd(), treeMap)) {
                                hashMap.remove(streamSegmentRecord2);
                            }
                        });
                        return;
                    }
                    return;
                }
                TreeMap<Double, Double> treeMap = new TreeMap<>();
                hashMap2.keySet().forEach(streamSegmentRecord3 -> {
                    treeMap.put(Double.valueOf(streamSegmentRecord3.getKeyStart()), Double.valueOf(streamSegmentRecord3.getKeyEnd()));
                });
                if (checkCoverage(streamSegmentRecord.getKeyStart(), streamSegmentRecord.getKeyEnd(), treeMap)) {
                    return;
                }
                hashMap.put(streamSegmentRecord, l);
            });
        });
        return (Map) hashMap.entrySet().stream().collect(Collectors.toMap(entry -> {
            return Long.valueOf(((StreamSegmentRecord) entry.getKey()).segmentId());
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    private boolean checkCoverage(double d, double d2, TreeMap<Double, Double> treeMap) {
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference(Double.valueOf(d));
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        Iterator<Map.Entry<Double, Double>> it = treeMap.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Map.Entry<Double, Double> next = it.next();
            if (((Double) atomicReference2.get()).doubleValue() >= d2) {
                atomicBoolean.set(true);
                break;
            }
            if (atomicReference.get() == null) {
                atomicReference.set(next.getValue());
                if (d < next.getKey().doubleValue()) {
                    atomicBoolean.set(false);
                    break;
                }
                atomicReference2.set(next.getValue());
            } else if (((Double) atomicReference.get()).doubleValue() >= next.getKey().doubleValue()) {
                atomicReference2.set(Double.valueOf(Math.max(((Double) atomicReference.get()).doubleValue(), next.getValue().doubleValue())));
            } else if (StreamSegmentRecord.overlaps(new AbstractMap.SimpleEntry((Double) atomicReference.get(), next.getKey()), new AbstractMap.SimpleEntry((Double) atomicReference2.get(), Double.valueOf(d2)))) {
                atomicBoolean.set(false);
                break;
            }
        }
        atomicBoolean.compareAndSet(true, ((Double) atomicReference2.get()).doubleValue() >= d2);
        return atomicBoolean.get();
    }

    private Map.Entry<StreamCutReferenceRecord, StreamCutReferenceRecord> getBoundStreamCuts(RetentionPolicy retentionPolicy, RetentionSet retentionSet, Function<StreamCutReferenceRecord, Long> function) {
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        AtomicLong atomicLong = new AtomicLong(Long.MIN_VALUE);
        AtomicLong atomicLong2 = new AtomicLong(Long.MAX_VALUE);
        retentionSet.getRetentionRecords().forEach(streamCutReferenceRecord -> {
            long longValue = ((Long) function.apply(streamCutReferenceRecord)).longValue();
            if (longValue >= retentionPolicy.getRetentionParam() && longValue <= retentionPolicy.getRetentionMax() && longValue > atomicLong.get()) {
                atomicReference.set(streamCutReferenceRecord);
                atomicLong.set(longValue);
            }
            if (longValue < retentionPolicy.getRetentionParam() || longValue >= atomicLong2.get()) {
                return;
            }
            atomicReference2.set(streamCutReferenceRecord);
            atomicLong2.set(longValue);
        });
        if (atomicReference.get() == null) {
            atomicReference.set((StreamCutReferenceRecord) atomicReference2.get());
        }
        return new AbstractMap.SimpleEntry((StreamCutReferenceRecord) atomicReference.get(), (StreamCutReferenceRecord) atomicReference2.get());
    }

    public CompletableFuture<StreamCutRecord> generateStreamCut(String str, String str2, StreamCutRecord streamCutRecord, OperationContext operationContext, String str3) {
        OperationContext createStreamContext = operationContext != null ? operationContext : this.streamMetadataStore.createStreamContext(str, str2, ControllerService.nextRequestId());
        return this.streamMetadataStore.getActiveSegments(str, str2, createStreamContext, this.executor).thenCompose(list -> {
            return Futures.allOfWithResults((Map) ((java.util.stream.Stream) list.stream().parallel()).collect(Collectors.toMap(streamSegmentRecord -> {
                return streamSegmentRecord;
            }, streamSegmentRecord2 -> {
                return getSegmentOffset(str, str2, streamSegmentRecord2.segmentId(), str3, createStreamContext.getRequestId());
            })));
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) map -> {
            long longValue = this.retentionClock.get().get().longValue();
            ImmutableMap.Builder builder = ImmutableMap.builder();
            map.forEach((streamSegmentRecord, l) -> {
                builder.put(Long.valueOf(streamSegmentRecord.segmentId()), l);
            });
            Map<Long, Long> build = builder.build();
            return this.streamMetadataStore.getSizeTillStreamCut(str, str2, build, Optional.ofNullable(streamCutRecord), createStreamContext, this.executor).thenApply(l2 -> {
                return new StreamCutRecord(longValue, l2.longValue(), build);
            });
        });
    }

    public CompletableFuture<Controller.UpdateStreamStatus.Status> truncateStream(String str, String str2, Map<Long, Long> map, long j) {
        OperationContext createStreamContext = this.streamMetadataStore.createStreamContext(str, str2, j);
        return this.streamMetadataStore.getState(str, str2, true, createStreamContext, this.executor).thenCompose(state -> {
            if (!state.equals(State.SEALED)) {
                return this.eventHelperFuture.thenCompose(eventHelper -> {
                    return startTruncation(str, str2, map, createStreamContext).thenCompose(bool -> {
                        if (bool.booleanValue()) {
                            return eventHelper.checkDone(() -> {
                                return isTruncated(str, str2, map, createStreamContext);
                            }, 1000L).thenApply(r2 -> {
                                return Controller.UpdateStreamStatus.Status.SUCCESS;
                            });
                        }
                        log.error(j, "Unable to start truncation for {}/{}", new Object[]{str, str2});
                        return CompletableFuture.completedFuture(Controller.UpdateStreamStatus.Status.FAILURE);
                    });
                });
            }
            log.error(j, "Cannot truncate a sealed stream {}/{}", new Object[]{str, str2});
            return CompletableFuture.completedFuture(Controller.UpdateStreamStatus.Status.STREAM_SEALED);
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            return handleUpdateStreamError(th, j, "Exception thrown in trying to truncate stream", NameUtils.getScopedStreamName(str, str2));
        });
    }

    public CompletableFuture<Boolean> startTruncation(String str, String str2, Map<Long, Long> map, OperationContext operationContext) {
        OperationContext createStreamContext = operationContext != null ? operationContext : this.streamMetadataStore.createStreamContext(str, str2, ControllerService.nextRequestId());
        long requestId = createStreamContext.getRequestId();
        return this.streamMetadataStore.getTruncationRecord(str, str2, createStreamContext, this.executor).thenCompose(versionedMetadata -> {
            if (!((StreamTruncationRecord) versionedMetadata.getObject()).isUpdating()) {
                return this.eventHelperFuture.thenCompose(eventHelper -> {
                    return eventHelper.addIndexAndSubmitTask(new TruncateStreamEvent(str, str2, requestId), () -> {
                        return this.streamMetadataStore.startTruncation(str, str2, map, createStreamContext, this.executor);
                    }).thenApply(r13 -> {
                        log.debug(requestId, "Started truncation request for stream {}/{}", new Object[]{str, str2});
                        return true;
                    });
                });
            }
            log.error(requestId, "Another truncation in progress for {}/{}", new Object[]{str, str2});
            return CompletableFuture.completedFuture(false);
        });
    }

    @VisibleForTesting
    CompletableFuture<Boolean> isTruncated(String str, String str2, Map<Long, Long> map, OperationContext operationContext) {
        CompletableFuture<State> state = this.streamMetadataStore.getState(str, str2, true, operationContext, this.executor);
        CompletableFuture thenApply = this.streamMetadataStore.getTruncationRecord(str, str2, operationContext, this.executor).thenApply((v0) -> {
            return v0.getObject();
        });
        return CompletableFuture.allOf(state, thenApply).thenApply(r10 -> {
            State state2 = (State) state.join();
            StreamTruncationRecord streamTruncationRecord = (StreamTruncationRecord) thenApply.join();
            if (streamTruncationRecord.isUpdating()) {
                return Boolean.valueOf(!streamTruncationRecord.getStreamCut().equals(map));
            }
            if (!state2.equals(State.SEALED)) {
                return Boolean.valueOf((streamTruncationRecord.getStreamCut().equals(map) && state2.equals(State.TRUNCATING)) ? false : true);
            }
            log.error("Cannot truncate a sealed stream {}/{}", str, str2);
            throw new UnsupportedOperationException("Cannot truncate a sealed stream: " + NameUtils.getScopedStreamName(str, str2));
        });
    }

    public CompletableFuture<Controller.UpdateStreamStatus.Status> sealStream(String str, String str2, long j) {
        return sealStream(str, str2, this.streamMetadataStore.createStreamContext(str, str2, j));
    }

    public CompletableFuture<Controller.UpdateStreamStatus.Status> sealStream(String str, String str2, OperationContext operationContext) {
        return sealStream(str, str2, operationContext, 10);
    }

    @VisibleForTesting
    CompletableFuture<Controller.UpdateStreamStatus.Status> sealStream(String str, String str2, OperationContext operationContext, int i) {
        long requestId = operationContext.getRequestId();
        SealStreamEvent sealStreamEvent = new SealStreamEvent(str, str2, requestId);
        return this.eventHelperFuture.thenCompose(eventHelper -> {
            return eventHelper.addIndexAndSubmitTask(sealStreamEvent, () -> {
                return RetryHelper.withRetriesAsync(() -> {
                    return this.streamMetadataStore.getVersionedState(str, str2, operationContext, this.executor).thenCompose(versionedMetadata -> {
                        return ((State) versionedMetadata.getObject()).equals(State.SEALED) ? CompletableFuture.completedFuture(versionedMetadata) : this.streamMetadataStore.updateVersionedState(str, str2, State.SEALING, versionedMetadata, operationContext, this.executor);
                    });
                }, RetryHelper.RETRYABLE_PREDICATE.or(th -> {
                    return Exceptions.unwrap(th) instanceof StoreException.OperationNotAllowedException;
                }), i, this.executor);
            }).thenCompose(versionedMetadata -> {
                return (((State) versionedMetadata.getObject()).equals(State.SEALED) || ((State) versionedMetadata.getObject()).equals(State.SEALING)) ? eventHelper.checkDone(() -> {
                    return isSealed(str, str2, operationContext);
                }).thenApply(r2 -> {
                    return Controller.UpdateStreamStatus.Status.SUCCESS;
                }) : CompletableFuture.completedFuture(Controller.UpdateStreamStatus.Status.FAILURE);
            });
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            return handleUpdateStreamError(th, requestId, "Exception thrown in trying to notify sealed segments.", NameUtils.getScopedStreamName(str, str2));
        });
    }

    private CompletableFuture<Boolean> isSealed(String str, String str2, OperationContext operationContext) {
        return this.streamMetadataStore.getState(str, str2, true, operationContext, this.executor).thenApply(state -> {
            return Boolean.valueOf(state.equals(State.SEALED));
        });
    }

    public CompletableFuture<Controller.DeleteStreamStatus.Status> deleteStream(String str, String str2, long j) {
        return deleteStream(str, str2, this.streamMetadataStore.createStreamContext(str, str2, j));
    }

    public CompletableFuture<Controller.DeleteStreamStatus.Status> deleteStream(String str, String str2, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation Context is null");
        long requestId = operationContext.getRequestId();
        return this.eventHelperFuture.thenCompose(eventHelper -> {
            return Futures.exceptionallyExpecting(this.streamMetadataStore.getState(str, str2, false, operationContext, this.executor), th -> {
                return Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException;
            }, State.UNKNOWN).thenCompose(state -> {
                if (!State.SEALED.equals(state) && !State.CREATING.equals(state)) {
                    return State.UNKNOWN.equals(state) ? this.streamMetadataStore.deleteStream(str, str2, operationContext, this.executor).exceptionally(th2 -> {
                        throw new CompletionException(th2);
                    }).thenApply(r2 -> {
                        return true;
                    }) : CompletableFuture.completedFuture(false);
                }
                CompletableFuture<U> thenApply = this.streamMetadataStore.getCreationTime(str, str2, operationContext, this.executor).thenApply(l -> {
                    return new DeleteStreamEvent(str, str2, requestId, l.longValue());
                });
                Objects.requireNonNull(eventHelper);
                return thenApply.thenCompose((Function<? super U, ? extends CompletionStage<U>>) (v1) -> {
                    return r1.writeEvent(v1);
                }).thenApply(r22 -> {
                    return true;
                });
            }).thenCompose(bool -> {
                return bool.booleanValue() ? eventHelper.checkDone(() -> {
                    return isDeleted(str, str2, operationContext);
                }).thenApply(r2 -> {
                    return Controller.DeleteStreamStatus.Status.SUCCESS;
                }) : CompletableFuture.completedFuture(Controller.DeleteStreamStatus.Status.STREAM_NOT_SEALED);
            });
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            return handleDeleteStreamError(th, requestId, NameUtils.getScopedStreamName(str, str2));
        });
    }

    private CompletableFuture<Boolean> isDeleted(String str, String str2, OperationContext operationContext) {
        return this.streamMetadataStore.checkStreamExists(str, str2, operationContext, this.executor).thenApply(bool -> {
            return Boolean.valueOf(!bool.booleanValue());
        });
    }

    private CompletableFuture<Boolean> isScopeDeletionComplete(String str, OperationContext operationContext) {
        return this.streamMetadataStore.isScopeSealed(str, operationContext, this.executor).thenApply(bool -> {
            return Boolean.valueOf(!bool.booleanValue());
        });
    }

    public CompletableFuture<Controller.ScaleResponse> manualScale(String str, String str2, List<Long> list, List<Map.Entry<Double, Double>> list2, long j, long j2) {
        OperationContext createStreamContext = this.streamMetadataStore.createStreamContext(str, str2, j2);
        ScaleOpEvent scaleOpEvent = new ScaleOpEvent(str, str2, list, list2, true, j, j2);
        return this.eventHelperFuture.thenCompose(eventHelper -> {
            return eventHelper.addIndexAndSubmitTask(scaleOpEvent, () -> {
                return this.streamMetadataStore.submitScale(str, str2, list, new ArrayList(list2), j, null, createStreamContext, this.executor);
            }).handle((versionedMetadata, th) -> {
                Controller.ScaleResponse.Builder newBuilder = Controller.ScaleResponse.newBuilder();
                if (th != null) {
                    Throwable unwrap = Exceptions.unwrap(th);
                    if (unwrap instanceof EpochTransitionOperationExceptions.PreConditionFailureException) {
                        newBuilder.setStatus(Controller.ScaleResponse.ScaleStreamStatus.PRECONDITION_FAILED);
                    } else {
                        log.error(j2, "Scale for stream {}/{} failed with exception {}", new Object[]{str, str2, unwrap});
                        newBuilder.setStatus(Controller.ScaleResponse.ScaleStreamStatus.FAILURE);
                    }
                } else {
                    log.info(j2, "scale for stream {}/{} started successfully", new Object[]{str, str2});
                    newBuilder.setStatus(Controller.ScaleResponse.ScaleStreamStatus.STARTED);
                    newBuilder.addAllSegments((Iterable) ((EpochTransitionRecord) versionedMetadata.getObject()).getNewSegmentsWithRange().entrySet().stream().map(entry -> {
                        return convert(str, str2, entry);
                    }).collect(Collectors.toList()));
                    newBuilder.setEpoch(((EpochTransitionRecord) versionedMetadata.getObject()).getActiveEpoch());
                }
                return newBuilder.build();
            });
        });
    }

    public CompletableFuture<Controller.ScaleStatusResponse> checkScale(String str, String str2, int i, long j) {
        OperationContext createStreamContext = this.streamMetadataStore.createStreamContext(str, str2, j);
        CompletableFuture<EpochRecord> activeEpoch = this.streamMetadataStore.getActiveEpoch(str, str2, createStreamContext, true, this.executor);
        CompletableFuture<State> state = this.streamMetadataStore.getState(str, str2, true, createStreamContext, this.executor);
        CompletableFuture thenApply = this.streamMetadataStore.getEpochTransition(str, str2, createStreamContext, this.executor).thenApply((v0) -> {
            return v0.getObject();
        });
        return CompletableFuture.allOf(state, activeEpoch, thenApply).handle((r7, th) -> {
            Controller.ScaleStatusResponse.Builder newBuilder = Controller.ScaleStatusResponse.newBuilder();
            if (th == null) {
                EpochRecord epochRecord = (EpochRecord) activeEpoch.join();
                State state2 = (State) state.join();
                EpochTransitionRecord epochTransitionRecord = (EpochTransitionRecord) thenApply.join();
                if (i > epochRecord.getEpoch()) {
                    newBuilder.setStatus(Controller.ScaleStatusResponse.ScaleStatus.INVALID_INPUT);
                } else if (epochRecord.getEpoch() == i || epochRecord.getReferenceEpoch() == i) {
                    newBuilder.setStatus(Controller.ScaleStatusResponse.ScaleStatus.IN_PROGRESS);
                } else if (i + 1 == epochRecord.getReferenceEpoch() && state2.equals(State.SCALING) && (epochTransitionRecord.equals(EpochTransitionRecord.EMPTY) || epochTransitionRecord.getNewEpoch() == epochRecord.getEpoch())) {
                    newBuilder.setStatus(Controller.ScaleStatusResponse.ScaleStatus.IN_PROGRESS);
                } else {
                    newBuilder.setStatus(Controller.ScaleStatusResponse.ScaleStatus.SUCCESS);
                }
            } else if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
                newBuilder.setStatus(Controller.ScaleStatusResponse.ScaleStatus.INVALID_INPUT);
            } else {
                newBuilder.setStatus(Controller.ScaleStatusResponse.ScaleStatus.INTERNAL_ERROR);
            }
            return newBuilder.build();
        });
    }

    @VisibleForTesting
    <T> CompletableFuture<T> addIndexAndSubmitTask(ControllerEvent controllerEvent, Supplier<CompletableFuture<T>> supplier) {
        return (CompletableFuture<T>) this.eventHelperFuture.thenCompose(eventHelper -> {
            return eventHelper.addIndexAndSubmitTask(controllerEvent, supplier);
        });
    }

    public CompletableFuture<Void> writeEvent(ControllerEvent controllerEvent) {
        return this.eventHelperFuture.thenCompose(eventHelper -> {
            return eventHelper.writeEvent(controllerEvent);
        });
    }

    @VisibleForTesting
    public void setRequestEventWriter(EventStreamWriter<ControllerEvent> eventStreamWriter) {
        this.eventHelperFuture.thenAccept(eventHelper -> {
            eventHelper.setRequestEventWriter(eventStreamWriter);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> removeTaskFromIndex(String str, String str2) {
        return this.eventHelperFuture.thenCompose(eventHelper -> {
            return eventHelper.removeTaskFromIndex(str, str2);
        });
    }

    @VisibleForTesting
    CompletableFuture<Controller.CreateStreamStatus.Status> createStreamBody(String str, String str2, StreamConfiguration streamConfiguration, long j, OperationContext operationContext) {
        long requestId = getRequestId(operationContext);
        return this.streamMetadataStore.isScopeSealed(str, operationContext, this.executor).thenCompose(bool -> {
            if (!bool.booleanValue()) {
                return this.streamMetadataStore.createStream(str, str2, streamConfiguration, j, operationContext, this.executor).thenComposeAsync(createStreamResponse -> {
                    log.debug(requestId, "{}/{} created in metadata store", new Object[]{str, str2});
                    Controller.CreateStreamStatus.Status translate = translate(createStreamResponse.getStatus());
                    if (!createStreamResponse.getStatus().equals(CreateStreamResponse.CreateStatus.NEW) && !createStreamResponse.getStatus().equals(CreateStreamResponse.CreateStatus.EXISTS_CREATING)) {
                        return CompletableFuture.completedFuture(translate);
                    }
                    int startingSegmentNumber = createStreamResponse.getStartingSegmentNumber();
                    return notifyNewSegments(str, str2, createStreamResponse.getConfiguration(), (List<Long>) IntStream.range(startingSegmentNumber, startingSegmentNumber + createStreamResponse.getConfiguration().getScalingPolicy().getMinNumSegments()).boxed().map(num -> {
                        return Long.valueOf(NameUtils.computeSegmentId(num.intValue(), 0));
                    }).collect(Collectors.toList()), retrieveDelegationToken(), requestId).thenCompose(r15 -> {
                        return createMarkStream(str, str2, j, requestId);
                    }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r12 -> {
                        return TaskStepsRetryHelper.withRetries(() -> {
                            return (streamConfiguration.getRetentionPolicy() != null ? this.bucketStore.addStreamToBucketStore(BucketStore.ServiceType.RetentionService, str, str2, this.executor) : CompletableFuture.completedFuture(null)).thenCompose((Function<? super Void, ? extends CompletionStage<U>>) r12 -> {
                                return this.streamMetadataStore.addStreamTagsToIndex(str, str2, streamConfiguration, operationContext, this.executor);
                            }).thenCompose(r10 -> {
                                return this.streamMetadataStore.getVersionedState(str, str2, operationContext, this.executor).thenCompose(versionedMetadata -> {
                                    return ((State) versionedMetadata.getObject()).equals(State.CREATING) ? this.streamMetadataStore.updateVersionedState(str, str2, State.ACTIVE, versionedMetadata, operationContext, this.executor) : CompletableFuture.completedFuture(versionedMetadata);
                                });
                            });
                        }, this.executor).thenApply(versionedMetadata -> {
                            return translate;
                        });
                    });
                }, (Executor) this.executor).handle((BiFunction<? super U, Throwable, ? extends U>) (status, th) -> {
                    if (th == null) {
                        return status;
                    }
                    Throwable unwrap = Exceptions.unwrap(th);
                    log.warn(requestId, "Create stream failed due to ", new Object[]{th});
                    return unwrap instanceof StoreException.DataNotFoundException ? Controller.CreateStreamStatus.Status.SCOPE_NOT_FOUND : Controller.CreateStreamStatus.Status.FAILURE;
                });
            }
            log.warn(requestId, "Create stream failed due to scope in sealed state", new Object[0]);
            return CompletableFuture.completedFuture(Controller.CreateStreamStatus.Status.SCOPE_NOT_FOUND);
        });
    }

    private CompletableFuture<Void> createMarkStream(String str, String str2, long j, long j2) {
        String markStreamForStream = NameUtils.getMarkStreamForStream(str2);
        StreamConfiguration build = StreamConfiguration.builder().scalingPolicy(ScalingPolicy.fixed(1)).build();
        OperationContext createStreamContext = this.streamMetadataStore.createStreamContext(str, markStreamForStream, j2);
        return this.streamMetadataStore.createStream(str, markStreamForStream, build, j, createStreamContext, this.executor).thenCompose(createStreamResponse -> {
            return notifyNewSegment(str, markStreamForStream, NameUtils.computeSegmentId(createStreamResponse.getStartingSegmentNumber(), 0), createStreamResponse.getConfiguration().getScalingPolicy(), retrieveDelegationToken(), j2, build.getRolloverSizeBytes());
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r10 -> {
            return this.streamMetadataStore.getVersionedState(str, markStreamForStream, createStreamContext, this.executor).thenCompose(versionedMetadata -> {
                return Futures.toVoid(this.streamMetadataStore.updateVersionedState(str, markStreamForStream, State.ACTIVE, versionedMetadata, createStreamContext, this.executor));
            });
        });
    }

    private Controller.CreateStreamStatus.Status translate(CreateStreamResponse.CreateStatus createStatus) {
        Controller.CreateStreamStatus.Status status;
        switch (AnonymousClass1.$SwitchMap$io$pravega$controller$store$stream$CreateStreamResponse$CreateStatus[createStatus.ordinal()]) {
            case ApiResponseMessage.ERROR /* 1 */:
                status = Controller.CreateStreamStatus.Status.SUCCESS;
                break;
            case ApiResponseMessage.WARNING /* 2 */:
            case ApiResponseMessage.INFO /* 3 */:
                status = Controller.CreateStreamStatus.Status.STREAM_EXISTS;
                break;
            case ApiResponseMessage.OK /* 4 */:
            default:
                status = Controller.CreateStreamStatus.Status.FAILURE;
                break;
        }
        return status;
    }

    public CompletableFuture<Void> notifyNewSegments(String str, String str2, List<Long> list, OperationContext operationContext, String str3, long j) {
        return TaskStepsRetryHelper.withRetries(() -> {
            return this.streamMetadataStore.getConfiguration(str, str2, operationContext, this.executor);
        }, this.executor).thenCompose(streamConfiguration -> {
            return notifyNewSegments(str, str2, streamConfiguration, (List<Long>) list, str3, j);
        });
    }

    public CompletableFuture<Void> notifyNewSegments(String str, String str2, StreamConfiguration streamConfiguration, List<Long> list, String str3, long j) {
        return Futures.toVoid(Futures.allOfWithResults((List) ((java.util.stream.Stream) list.stream().parallel()).map(l -> {
            return notifyNewSegment(str, str2, l.longValue(), streamConfiguration.getScalingPolicy(), str3, j, streamConfiguration.getRolloverSizeBytes());
        }).collect(Collectors.toList())));
    }

    public CompletableFuture<Void> notifyNewSegment(String str, String str2, long j, ScalingPolicy scalingPolicy, String str3, long j2, long j3) {
        return Futures.toVoid(TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.createSegment(str, str2, j, scalingPolicy, str3, j2, j3);
        }, this.executor));
    }

    public CompletableFuture<Void> notifyDeleteSegments(String str, String str2, Set<Long> set, String str3, long j) {
        return Futures.allOf((Collection) ((java.util.stream.Stream) set.stream().parallel()).map(l -> {
            return notifyDeleteSegment(str, str2, l.longValue(), str3, j);
        }).collect(Collectors.toList()));
    }

    public CompletableFuture<Void> notifyDeleteSegment(String str, String str2, long j, String str3, long j2) {
        return Futures.toVoid(TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.deleteSegment(str, str2, j, str3, j2);
        }, this.executor));
    }

    public CompletableFuture<Void> notifyTruncateSegment(String str, String str2, Map.Entry<Long, Long> entry, String str3, long j) {
        return Futures.toVoid(TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.truncateSegment(str, str2, ((Long) entry.getKey()).longValue(), ((Long) entry.getValue()).longValue(), str3, j);
        }, this.executor));
    }

    public CompletableFuture<Map<Long, Long>> getSealedSegmentsSize(String str, String str2, List<Long> list, String str3, long j) {
        return Futures.allOfWithResults((Map) ((java.util.stream.Stream) list.stream().parallel()).collect(Collectors.toMap(l -> {
            return l;
        }, l2 -> {
            return getSegmentOffset(str, str2, l2.longValue(), str3, j);
        })));
    }

    public CompletableFuture<Void> notifySealedSegments(String str, String str2, List<Long> list, String str3, long j) {
        return Futures.allOf((Collection) ((java.util.stream.Stream) list.stream().parallel()).map(l -> {
            return notifySealedSegment(str, str2, l.longValue(), str3, j);
        }).collect(Collectors.toList()));
    }

    private CompletableFuture<Void> notifySealedSegment(String str, String str2, long j, String str3, long j2) {
        return Futures.toVoid(TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.sealSegment(str, str2, j, str3, j2);
        }, this.executor));
    }

    public CompletableFuture<Void> notifyPolicyUpdates(String str, String str2, List<StreamSegmentRecord> list, ScalingPolicy scalingPolicy, String str3, long j) {
        return Futures.toVoid(Futures.allOfWithResults((List) ((java.util.stream.Stream) list.stream().parallel()).map(streamSegmentRecord -> {
            return notifyPolicyUpdate(str, str2, scalingPolicy, streamSegmentRecord.segmentId(), str3, j);
        }).collect(Collectors.toList())));
    }

    private CompletableFuture<Long> getSegmentOffset(String str, String str2, long j, String str3, long j2) {
        return TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.getSegmentInfo(str, str2, j, str3, j2);
        }, this.executor).thenApply((v0) -> {
            return v0.getWriteOffset();
        });
    }

    private CompletableFuture<Void> notifyPolicyUpdate(String str, String str2, ScalingPolicy scalingPolicy, long j, String str3, long j2) {
        return TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.updatePolicy(str, str2, scalingPolicy, j, str3, j2);
        }, this.executor);
    }

    private Controller.SegmentRange convert(String str, String str2, Map.Entry<Long, Map.Entry<Double, Double>> entry) {
        return io.pravega.client.control.impl.ModelHelper.createSegmentRange(str, str2, entry.getKey().longValue(), entry.getValue().getKey().doubleValue(), entry.getValue().getValue().doubleValue());
    }

    private Controller.UpdateStreamStatus.Status handleUpdateStreamError(Throwable th, long j, String str, String str2) {
        Throwable unwrap = Exceptions.unwrap(th);
        log.error(j, "Exception updating Stream {}. Cause: {}.", new Object[]{str2, str, unwrap});
        if (unwrap instanceof StoreException.DataNotFoundException) {
            return Controller.UpdateStreamStatus.Status.STREAM_NOT_FOUND;
        }
        if (unwrap instanceof UnsupportedOperationException) {
            return Controller.UpdateStreamStatus.Status.STREAM_SEALED;
        }
        if (unwrap instanceof TimeoutException) {
            throw new CompletionException(unwrap);
        }
        return Controller.UpdateStreamStatus.Status.FAILURE;
    }

    private Controller.DeleteStreamStatus.Status handleDeleteStreamError(Throwable th, long j, String str) {
        Throwable unwrap = Exceptions.unwrap(th);
        log.error(j, "Exception deleting stream {}. Cause: {}", new Object[]{str, th});
        if (unwrap instanceof StoreException.DataNotFoundException) {
            return Controller.DeleteStreamStatus.Status.STREAM_NOT_FOUND;
        }
        if (unwrap instanceof TimeoutException) {
            throw new CompletionException(unwrap);
        }
        return Controller.DeleteStreamStatus.Status.FAILURE;
    }

    private Controller.DeleteScopeStatus.Status handleDeleteScopeError(Throwable th, long j, String str) {
        Throwable unwrap = Exceptions.unwrap(th);
        log.error(j, "Exception deleting scope {}. Cause: {}", new Object[]{str, th});
        if (unwrap instanceof StoreException.DataNotFoundException) {
            return Controller.DeleteScopeStatus.Status.SCOPE_NOT_FOUND;
        }
        if (unwrap instanceof TimeoutException) {
            throw new CompletionException(unwrap);
        }
        return Controller.DeleteScopeStatus.Status.FAILURE;
    }

    public CompletableFuture<Map<Long, List<Long>>> mergeTxnSegmentsIntoStreamSegments(String str, String str2, List<Long> list, List<UUID> list2, long j) {
        return Futures.allOfWithResults((Map) list.stream().collect(Collectors.toMap(l -> {
            return l;
        }, l2 -> {
            return mergeTxnSegments(str, str2, l2.longValue(), list2, j);
        })));
    }

    private CompletableFuture<List<Long>> mergeTxnSegments(String str, String str2, long j, List<UUID> list, long j2) {
        return TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.mergeTxnSegments(str, str2, j, j, list, retrieveDelegationToken(), j2).exceptionally(th -> {
                if ((th instanceof WireCommandFailedException) && WireCommandFailedException.Reason.SegmentDoesNotExist.equals(((WireCommandFailedException) th).getReason())) {
                    throw new IllegalStateException(String.format("Segment Merge failed as target segment [%s] was not found.", NameUtils.getQualifiedStreamSegmentName(str, str2, j)));
                }
                throw new CompletionException(th);
            });
        }, this.executor);
    }

    public CompletableFuture<Void> notifyTxnAbort(String str, String str2, List<Long> list, UUID uuid, long j) {
        Timer timer = new Timer();
        return Futures.allOf((Collection) ((java.util.stream.Stream) list.stream().parallel()).map(l -> {
            return notifyTxnAbort(str, str2, l.longValue(), uuid, j);
        }).collect(Collectors.toList())).thenRun(() -> {
            TransactionMetrics.getInstance().abortTransactionSegments(timer.getElapsed());
        });
    }

    private CompletableFuture<Controller.TxnStatus> notifyTxnAbort(String str, String str2, long j, UUID uuid, long j2) {
        return TaskStepsRetryHelper.withRetries(() -> {
            return this.segmentHelper.abortTransaction(str, str2, j, uuid, retrieveDelegationToken(), j2);
        }, this.executor);
    }

    public CompletableFuture<Map<Long, Long>> getCurrentSegmentSizes(String str, String str2, List<Long> list, long j) {
        return Futures.allOfWithResults((Map) list.stream().collect(Collectors.toMap(l -> {
            return l;
        }, l2 -> {
            return getSegmentOffset(str, str2, l2.longValue(), retrieveDelegationToken(), j);
        })));
    }

    public CompletableFuture<Void> processScale(String str, String str2, VersionedMetadata<EpochTransitionRecord> versionedMetadata, OperationContext operationContext, long j, StreamMetadataStore streamMetadataStore) {
        ArrayList arrayList = new ArrayList((Collection) versionedMetadata.getObject().getNewSegmentsWithRange().keySet());
        ArrayList arrayList2 = new ArrayList((Collection) versionedMetadata.getObject().getSegmentsToSeal());
        String retrieveDelegationToken = retrieveDelegationToken();
        return notifyNewSegments(str, str2, arrayList, operationContext, retrieveDelegationToken, j).thenCompose(r13 -> {
            return streamMetadataStore.scaleCreateNewEpochs(str, str2, versionedMetadata, operationContext, this.executor);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) versionedMetadata2 -> {
            return notifySealedSegments(str, str2, arrayList2, retrieveDelegationToken, j);
        }).thenCompose(r15 -> {
            return getSealedSegmentsSize(str, str2, arrayList2, retrieveDelegationToken, j);
        }).thenCompose(map -> {
            return streamMetadataStore.scaleSegmentsSealed(str, str2, map, versionedMetadata, operationContext, this.executor);
        }).thenCompose(r132 -> {
            return streamMetadataStore.completeScale(str, str2, versionedMetadata, operationContext, this.executor);
        });
    }

    @Override // io.pravega.controller.task.TaskBase
    public TaskBase copyWithContext(TaskBase.Context context) {
        return new StreamMetadataTasks(this.streamMetadataStore, this.bucketStore, this.taskMetadataStore, this.segmentHelper, this.executor, this.eventExecutor, context, this.authHelper);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        synchronized (this.lock) {
            this.toSetEventHelper = false;
            if (this.eventHelper != null) {
                this.eventHelper.close();
            }
        }
        this.eventHelperFuture.cancel(true);
    }

    public String retrieveDelegationToken() {
        return this.authHelper.retrieveMasterToken();
    }

    @VisibleForTesting
    public void setCompletionTimeoutMillis(long j) {
        this.eventHelperFuture.thenAccept(eventHelper -> {
            eventHelper.setCompletionTimeoutMillis(j);
        });
    }

    @VisibleForTesting
    void setRetentionFrequencyMillis(long j) {
        this.retentionFrequencyMillis.set(j);
    }

    @VisibleForTesting
    void setRetentionClock(Supplier<Long> supplier) {
        this.retentionClock.set(supplier);
    }

    public long getRequestId(OperationContext operationContext) {
        return operationContext != null ? operationContext.getRequestId() : ControllerService.nextRequestId();
    }
}
