package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.hash.RandomFactory;
import io.pravega.common.lang.Int96;
import io.pravega.common.tracing.TagLogger;
import io.pravega.controller.metrics.StreamMetrics;
import io.pravega.controller.metrics.TransactionMetrics;
import io.pravega.controller.server.ControllerService;
import io.pravega.controller.store.Scope;
import io.pravega.controller.store.Version;
import io.pravega.controller.store.VersionedMetadata;
import io.pravega.controller.store.index.HostIndex;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.records.ActiveTxnRecord;
import io.pravega.controller.store.stream.records.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.HistoryTimeSeries;
import io.pravega.controller.store.stream.records.ReaderGroupConfigRecord;
import io.pravega.controller.store.stream.records.RetentionSet;
import io.pravega.controller.store.stream.records.SealedSegmentsMapShard;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.store.stream.records.StreamCutRecord;
import io.pravega.controller.store.stream.records.StreamCutReferenceRecord;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.store.stream.records.StreamSubscriber;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.controller.store.stream.records.WriterMark;
import io.pravega.controller.store.task.TxnResource;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.shared.controller.event.ControllerEvent;
import io.pravega.shared.controller.event.ControllerEventSerializer;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.ParametersAreNonnullByDefault;
import lombok.Generated;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/store/stream/AbstractStreamMetadataStore.class */
public abstract class AbstractStreamMetadataStore implements StreamMetadataStore {
    public static final Predicate<Throwable> DATA_NOT_FOUND_PREDICATE = th -> {
        return Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException;
    };
    public static final Predicate<Throwable> DATA_NOT_EMPTY_PREDICATE = th -> {
        return Exceptions.unwrap(th) instanceof StoreException.DataNotEmptyException;
    };
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(AbstractStreamMetadataStore.class));
    private static final String RESOURCE_PART_SEPARATOR = "_%_";
    private final HostIndex hostTxnIndex;
    private final HostIndex hostTaskIndex;
    private final ControllerEventSerializer controllerEventSerializer = new ControllerEventSerializer();
    private final LoadingCache<Pair<String, String>, Stream> cache = CacheBuilder.newBuilder().maximumSize(10000).refreshAfterWrite(10, TimeUnit.MINUTES).expireAfterWrite(10, TimeUnit.MINUTES).build(new CacheLoader<Pair<String, String>, Stream>() { // from class: io.pravega.controller.store.stream.AbstractStreamMetadataStore.1
        @ParametersAreNonnullByDefault
        public Stream load(Pair<String, String> pair) {
            try {
                return AbstractStreamMetadataStore.this.newStream((String) pair.getKey(), (String) pair.getValue());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    });
    private final LoadingCache<Pair<String, String>, ReaderGroup> rgCache = CacheBuilder.newBuilder().maximumSize(10000).refreshAfterWrite(10, TimeUnit.MINUTES).expireAfterWrite(10, TimeUnit.MINUTES).build(new CacheLoader<Pair<String, String>, ReaderGroup>() { // from class: io.pravega.controller.store.stream.AbstractStreamMetadataStore.2
        @ParametersAreNonnullByDefault
        public ReaderGroup load(Pair<String, String> pair) {
            try {
                return AbstractStreamMetadataStore.this.newReaderGroup((String) pair.getKey(), (String) pair.getValue());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    });
    private final LoadingCache<String, Scope> scopeCache = CacheBuilder.newBuilder().maximumSize(1000).refreshAfterWrite(10, TimeUnit.MINUTES).expireAfterWrite(10, TimeUnit.MINUTES).build(new CacheLoader<String, Scope>() { // from class: io.pravega.controller.store.stream.AbstractStreamMetadataStore.3
        @ParametersAreNonnullByDefault
        public Scope load(String str) {
            try {
                return AbstractStreamMetadataStore.this.newScope(str);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    });

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractStreamMetadataStore(HostIndex hostIndex, HostIndex hostIndex2) {
        this.hostTxnIndex = hostIndex;
        this.hostTaskIndex = hostIndex2;
    }

    abstract Scope newScope(String str);

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public OperationContext createScopeContext(String str, long j) {
        return new ScopeOperationContext(getScope(str, null), j);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public OperationContext createStreamContext(String str, String str2, long j) {
        return new StreamOperationContext(getScope(str, null), getStream(str, str2, null), j);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<CreateStreamResponse> createStream(String str, String str2, StreamConfiguration streamConfiguration, long j, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return getSafeStartingSegmentNumberFor(str, str2, operationContext2, executor).thenCompose(num -> {
            return Futures.completeOn(checkScopeExists(str, operationContext2, executor).thenCompose(bool -> {
                return bool.booleanValue() ? getStream(str, str2, operationContext2).create(streamConfiguration, j, num.intValue(), operationContext2) : Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "scope does not exist"));
            }), executor);
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> deleteStream(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        Stream stream = getStream(str, str2, operationContext2);
        return Futures.exceptionallyExpecting(stream.getActiveEpoch(true, operationContext2).thenApply(epochRecord -> {
            return (Integer) epochRecord.getSegments().stream().map((v0) -> {
                return v0.getSegmentNumber();
            }).reduce((v0, v1) -> {
                return Integer.max(v0, v1);
            }).get();
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) num -> {
            return recordLastStreamSegment(str, str2, num.intValue(), operationContext2, executor);
        }), DATA_NOT_FOUND_PREDICATE, (Object) null).thenCompose(r6 -> {
            return Futures.completeOn(stream.delete(operationContext2), executor);
        }).thenAccept(r9 -> {
            this.cache.invalidate(new ImmutablePair(str, str2));
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Long> getCreationTime(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getCreationTime(operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> setState(String str, String str2, State state, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).updateState(state, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<State> getState(String str, String str2, boolean z, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getState(z, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<State>> updateVersionedState(String str, String str2, State state, VersionedMetadata<State> versionedMetadata, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).updateVersionedState(versionedMetadata, state, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<State>> getVersionedState(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getVersionedState(operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Controller.CreateScopeStatus> createScope(String str, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        Scope scope = getScope(str, operationContext2);
        return Futures.completeOn(scope.isScopeSealed(str, operationContext2).thenCompose(bool -> {
            return bool.booleanValue() ? CompletableFuture.completedFuture(Controller.CreateScopeStatus.newBuilder().setStatus(Controller.CreateScopeStatus.Status.SCOPE_EXISTS).build()) : scope.createScope(operationContext2).handle((r11, th) -> {
                if (th == null) {
                    return Controller.CreateScopeStatus.newBuilder().setStatus(Controller.CreateScopeStatus.Status.SUCCESS).build();
                }
                if (Exceptions.unwrap(th) instanceof StoreException.DataExistsException) {
                    return Controller.CreateScopeStatus.newBuilder().setStatus(Controller.CreateScopeStatus.Status.SCOPE_EXISTS).build();
                }
                log.error(operationContext2.getRequestId(), "Create scope failed for scope {} due to ", new Object[]{str, th});
                return Controller.CreateScopeStatus.newBuilder().setStatus(Controller.CreateScopeStatus.Status.FAILURE).build();
            });
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Controller.DeleteScopeStatus> deleteScope(String str, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getScope(str, operationContext2).deleteScope(operationContext2).handle((r9, th) -> {
            return getDeleteScopeStatus(str, operationContext2, Exceptions.unwrap(th), "DeleteScope failed for scope");
        }), executor);
    }

    private Controller.DeleteScopeStatus getDeleteScopeStatus(String str, OperationContext operationContext, Throwable th, String str2) {
        if (th == null) {
            return Controller.DeleteScopeStatus.newBuilder().setStatus(Controller.DeleteScopeStatus.Status.SUCCESS).build();
        }
        if (th instanceof StoreException.DataNotFoundException) {
            return Controller.DeleteScopeStatus.newBuilder().setStatus(Controller.DeleteScopeStatus.Status.SCOPE_NOT_FOUND).build();
        }
        if (th instanceof StoreException.DataNotEmptyException) {
            return Controller.DeleteScopeStatus.newBuilder().setStatus(Controller.DeleteScopeStatus.Status.SCOPE_NOT_EMPTY).build();
        }
        log.error(operationContext.getRequestId(), str2 + " {} due to {} ", new Object[]{str, th});
        return Controller.DeleteScopeStatus.newBuilder().setStatus(Controller.DeleteScopeStatus.Status.FAILURE).build();
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Controller.DeleteScopeStatus> deleteScopeRecursive(String str, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getScope(str, operationContext2).deleteScopeRecursive(operationContext2).handle((r9, th) -> {
            return getDeleteScopeStatus(str, operationContext2, Exceptions.unwrap(th), "DeleteScopeRecursive failed for scope");
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Map<String, StreamConfiguration>> listStreamsInScope(String str, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getScope(str, operationContext2).listStreamsInScope(operationContext2).thenCompose(list -> {
            HashMap hashMap = new HashMap();
            Iterator it = list.iterator();
            while (it.hasNext()) {
                Stream stream = getStream(str, (String) it.next(), null);
                CompletableFuture<StreamConfiguration> configuration = stream.getConfiguration(operationContext2);
                CompletableFuture<State> state = stream.getState(true, operationContext2);
                hashMap.put(stream.getName(), Futures.exceptionallyExpecting(CompletableFuture.allOf(configuration, state).thenApply(r5 -> {
                    State state2 = (State) state.join();
                    if (state2.equals(State.CREATING) || state2.equals(State.UNKNOWN)) {
                        throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Partially created stream");
                    }
                    return (StreamConfiguration) configuration.join();
                }), th -> {
                    return th instanceof StoreException.DataNotFoundException;
                }, (Object) null).thenApply((v0) -> {
                    return Optional.ofNullable(v0);
                }));
            }
            return Futures.allOfWithResults(hashMap).thenApply(map -> {
                return (Map) map.entrySet().stream().filter(entry -> {
                    return ((Optional) entry.getValue()).isPresent();
                }).collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, entry2 -> {
                    return (StreamConfiguration) ((Optional) entry2.getValue()).get();
                }));
            });
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Pair<List<String>, String>> listStream(String str, String str2, int i, Executor executor, OperationContext operationContext) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getScope(str, operationContext2).listStreams(i, str2, executor, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Pair<List<String>, String>> listStreamsForTag(String str, String str2, String str3, Executor executor, OperationContext operationContext) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getScope(str, operationContext2).listStreamsForTag(str2, str3, executor, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> startTruncation(String str, String str2, Map<Long, Long> map, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).startTruncation(map, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> completeTruncation(String str, String str2, VersionedMetadata<StreamTruncationRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).completeTruncation(versionedMetadata, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<StreamTruncationRecord>> getTruncationRecord(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getTruncationRecord(operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> startUpdateConfiguration(String str, String str2, StreamConfiguration streamConfiguration, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).startUpdateConfiguration(streamConfiguration, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> completeUpdateConfiguration(String str, String str2, VersionedMetadata<StreamConfigurationRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).completeUpdateConfiguration(versionedMetadata, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<StreamConfiguration> getConfiguration(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getConfiguration(operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<StreamConfigurationRecord>> getConfigurationRecord(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getVersionedConfigurationRecord(operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> isSealed(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getState(true, operationContext2).thenApply(state -> {
            return Boolean.valueOf(state.equals(State.SEALED));
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> setSealed(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).updateState(State.SEALED, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<StreamSegmentRecord> getSegment(String str, String str2, long j, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getSegment(j, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Set<Long>> getAllSegmentIds(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getAllSegmentIds(operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Map<StreamSegmentRecord, Long>> getSegmentsAtHead(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getSegmentsAtHead(operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<StreamSegmentRecord>> getActiveSegments(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        Stream stream = getStream(str, str2, operationContext2);
        return Futures.completeOn(stream.getState(true, operationContext2).thenComposeAsync(state -> {
            return State.SEALED.equals(state) ? CompletableFuture.completedFuture(Collections.emptyList()) : stream.getActiveSegments(operationContext2);
        }, executor), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<StreamSegmentRecord>> getSegmentsInEpoch(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getSegmentsInEpoch(i, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Map<StreamSegmentRecord, List<Long>>> getSuccessors(String str, String str2, long j, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getSuccessorsWithPredecessors(j, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<StreamSegmentRecord>> getSegmentsBetweenStreamCuts(String str, String str2, Map<Long, Long> map, Map<Long, Long> map2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getSegmentsBetweenStreamCuts(map, map2, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> isStreamCutValid(String str, String str2, Map<Long, Long> map, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).isStreamCutValid(map, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> submitScale(String str, String str2, List<Long> list, List<Map.Entry<Double, Double>> list2, long j, VersionedMetadata<EpochTransitionRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).submitScale(list, list2, j, versionedMetadata, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> getEpochTransition(String str, String str2, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getEpochTransition(operationContext2), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> resetEpochTransition(String str, String str2, VersionedMetadata<EpochTransitionRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        return Futures.completeOn(getStream(str, str2, operationContext).resetEpochTransition(versionedMetadata, getOperationContext(operationContext)), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> startScale(String str, String str2, boolean z, VersionedMetadata<EpochTransitionRecord> versionedMetadata, VersionedMetadata<State> versionedMetadata2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).startScale(z, versionedMetadata, versionedMetadata2, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> scaleCreateNewEpochs(String str, String str2, VersionedMetadata<EpochTransitionRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).scaleCreateNewEpoch(versionedMetadata, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> scaleSegmentsSealed(String str, String str2, Map<Long, Long> map, VersionedMetadata<EpochTransitionRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        CompletableFuture<Void> completeOn = Futures.completeOn(getStream(str, str2, operationContext2).scaleOldSegmentsSealed(map, versionedMetadata, operationContext2), executor);
        completeOn.thenCompose(r14 -> {
            return CompletableFuture.allOf(getActiveSegments(str, str2, operationContext2, executor).thenAccept(list -> {
                StreamMetrics.reportActiveSegments(str, str2, list.size());
            }), findNumSplitsMerges(str, str2, operationContext2, executor).thenAccept(simpleEntry -> {
                StreamMetrics.reportSegmentSplitsAndMerges(str, str2, ((Long) simpleEntry.getKey()).longValue(), ((Long) simpleEntry.getValue()).longValue());
            }));
        });
        return completeOn;
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> completeScale(String str, String str2, VersionedMetadata<EpochTransitionRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).completeScale(versionedMetadata, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> startRollingTxn(String str, String str2, int i, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).startRollingTxn(i, versionedMetadata, operationContext2), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> rollingTxnCreateDuplicateEpochs(String str, String str2, Map<Long, Long> map, long j, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).rollingTxnCreateDuplicateEpochs(map, j, versionedMetadata, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> completeRollingTxn(String str, String str2, Map<Long, Long> map, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        CompletableFuture<Void> completeOn = Futures.completeOn(getStream(str, str2, operationContext2).completeRollingTxn(map, versionedMetadata, operationContext2), executor);
        completeOn.thenCompose(r11 -> {
            return findNumSplitsMerges(str, str2, operationContext2, executor).thenAccept(simpleEntry -> {
                StreamMetrics.reportSegmentSplitsAndMerges(str, str2, ((Long) simpleEntry.getKey()).longValue(), ((Long) simpleEntry.getValue()).longValue());
            });
        });
        return completeOn;
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> addSubscriber(String str, String str2, String str3, long j, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).addSubscriber(str3, j, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> deleteSubscriber(String str, String str2, String str3, long j, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).deleteSubscriber(str3, j, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> updateSubscriberStreamCut(String str, String str2, String str3, long j, ImmutableMap<Long, Long> immutableMap, VersionedMetadata<StreamSubscriber> versionedMetadata, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).updateSubscriberStreamCut(versionedMetadata, str3, j, immutableMap, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<StreamSubscriber>> getSubscriber(String str, String str2, String str3, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getSubscriberRecord(str3, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<String>> listSubscribers(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).listSubscribers(operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> addStreamCutToRetentionSet(String str, String str2, StreamCutRecord streamCutRecord, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).addStreamCutToRetentionSet(streamCutRecord, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<RetentionSet> getRetentionSet(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getRetentionSet(operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<StreamCutRecord> getStreamCutRecord(String str, String str2, StreamCutReferenceRecord streamCutReferenceRecord, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getStreamCutRecord(streamCutReferenceRecord, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> deleteStreamCutBefore(String str, String str2, StreamCutReferenceRecord streamCutReferenceRecord, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).deleteStreamCutBefore(streamCutReferenceRecord, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Long> getSizeTillStreamCut(String str, String str2, Map<Long, Long> map, Optional<StreamCutRecord> optional, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getSizeTillStreamCut(map, optional, operationContext2), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<UUID> generateTransactionId(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        Stream stream = getStream(str, str2, operationContext2);
        return Futures.completeOn(getNextCounter().thenCompose(int96 -> {
            return stream.generateNewTxnId(int96.getMsb(), int96.getLsb(), operationContext2);
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedTransactionData> createTransaction(String str, String str2, UUID uuid, long j, long j2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).createTransaction(uuid, j, j2, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedTransactionData> pingTransaction(String str, String str2, VersionedTransactionData versionedTransactionData, long j, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).pingTransaction(versionedTransactionData, j, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedTransactionData> getTransactionData(String str, String str2, UUID uuid, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getTransactionData(uuid, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<TxnStatus> transactionStatus(String str, String str2, UUID uuid, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).checkTransactionStatus(uuid, operationContext2), executor);
    }

    @VisibleForTesting
    public CompletableFuture<TxnStatus> commitTransaction(String str, String str2, UUID uuid, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(((PersistentStreamBase) getStream(str, str2, operationContext2)).commitTransaction(uuid, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<AbstractMap.SimpleEntry<TxnStatus, Integer>> sealTransaction(String str, String str2, UUID uuid, boolean z, Optional<Version> optional, String str3, long j, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).sealTransaction(uuid, z, optional, str3, j, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<TxnStatus> abortTransaction(String str, String str2, UUID uuid, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).abortTransaction(uuid, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> addTxnToIndex(String str, TxnResource txnResource, Version version) {
        return this.hostTxnIndex.addEntity(str, getTxnResourceString(txnResource), ((Version) Optional.ofNullable(version).orElse(getEmptyVersion())).toBytes());
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> removeTxnFromIndex(String str, TxnResource txnResource, boolean z) {
        return this.hostTxnIndex.removeEntity(str, getTxnResourceString(txnResource), z);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Optional<TxnResource>> getRandomTxnFromIndex(String str) {
        return this.hostTxnIndex.getEntities(str).thenApply(list -> {
            return (list == null || list.size() <= 0) ? Optional.empty() : Optional.of(getTxnResource((String) list.get(RandomFactory.create().nextInt(list.size()))));
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Version> getTxnVersionFromIndex(String str, TxnResource txnResource) {
        return this.hostTxnIndex.getEntityData(str, getTxnResourceString(txnResource)).thenApply(bArr -> {
            return (Version) Optional.ofNullable(bArr).map(this::parseVersionData).filter(version -> {
                return !version.equals(getEmptyVersion());
            }).orElse(null);
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> removeHostFromIndex(String str) {
        return this.hostTxnIndex.removeHost(str);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Set<String>> listHostsOwningTxn() {
        return this.hostTxnIndex.getHosts();
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Map<String, ControllerEvent>> getPendingsTaskForHost(String str, int i) {
        return this.hostTaskIndex.getEntities(str).thenCompose(list -> {
            return Futures.allOfWithResults((Map) list.stream().limit(i).collect(Collectors.toMap(str2 -> {
                return str2;
            }, str3 -> {
                return getControllerTask(str, str3);
            })));
        });
    }

    private CompletableFuture<ControllerEvent> getControllerTask(String str, String str2) {
        return this.hostTaskIndex.getEntityData(str, str2).thenApply(bArr -> {
            return this.controllerEventSerializer.fromByteBuffer(ByteBuffer.wrap(bArr));
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> removeHostFromTaskIndex(String str) {
        return this.hostTaskIndex.removeHost(str);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Set<String>> listHostsWithPendingTask() {
        return this.hostTaskIndex.getHosts();
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> markCold(String str, String str2, long j, long j2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).setColdMarker(j, j2, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> isCold(String str, String str2, long j, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getColdMarker(j, operationContext2).thenApply(l -> {
            return Boolean.valueOf(l != null && l.longValue() > System.currentTimeMillis());
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> removeMarker(String str, String str2, long j, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).removeColdMarker(j, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getActiveTxns(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getActiveTxns(operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Map<UUID, TxnStatus>> listCompletedTxns(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).listCompletedTxns(operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<ScaleMetadata>> getScaleMetadata(String str, String str2, long j, long j2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getScaleMetadata(j, j2, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<EpochRecord> getActiveEpoch(String str, String str2, OperationContext operationContext, boolean z, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getActiveEpoch(z, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<EpochRecord> getEpoch(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getEpochRecord(i, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Map.Entry<VersionedMetadata<CommittingTransactionsRecord>, List<VersionedTransactionData>>> startCommitTransactions(String str, String str2, int i, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).startCommittingTransactions(i, operationContext2), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> getVersionedCommittingTransactionsRecord(String str, String str2, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getVersionedCommitTransactionsRecord(operationContext2), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> completeCommitTransactions(String str, String str2, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService, Map<String, TxnWriterMark> map) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        Stream stream = getStream(str, str2, operationContext2);
        return Futures.completeOn(stream.completeCommittingTransactions(versionedMetadata, operationContext2, map), scheduledExecutorService).thenAcceptAsync(r8 -> {
            stream.getNumberOfOngoingTransactions(operationContext2).thenAccept(l -> {
                TransactionMetrics.reportOpenTransactions(str, str2, l.intValue());
            });
        }, (Executor) scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> createWaitingRequestIfAbsent(String str, String str2, String str3, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).createWaitingRequestIfAbsent(str3, operationContext2), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<String> getWaitingRequestProcessor(String str, String str2, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getWaitingRequestProcessor(operationContext2), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> deleteWaitingRequestConditionally(String str, String str2, String str3, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).deleteWaitingRequestConditionally(str3, operationContext2), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<WriterTimestampResponse> noteWriterMark(String str, String str2, String str3, long j, Map<Long, Long> map, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).noteWriterMark(str3, j, map, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> shutdownWriter(String str, String str2, String str3, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).shutdownWriter(str3, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> removeWriter(String str, String str2, String str3, WriterMark writerMark, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).removeWriter(str3, writerMark, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<WriterMark> getWriterMark(String str, String str2, String str3, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getWriterMark(str3, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Map<String, WriterMark>> getAllWriterMarks(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getAllWriterMarks(operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<HistoryTimeSeries> getHistoryTimeSeriesChunk(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getHistoryTimeSeriesChunk(i, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<SealedSegmentsMapShard> getSealedSegmentSizeMapShard(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getSealedSegmentSizeMapShard(i, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Integer> getSegmentSealedEpoch(String str, String str2, long j, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).getSegmentSealedEpoch(j, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<StreamCutComparison> compareStreamCut(String str, String str2, Map<Long, Long> map, Map<Long, Long> map2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).compareStreamCuts(map, map2, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<StreamCutReferenceRecord> findStreamCutReferenceRecordBefore(String str, String str2, Map<Long, Long> map, RetentionSet retentionSet, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getStream(str, str2, operationContext2).findStreamCutReferenceRecordBefore(map, retentionSet, operationContext2), executor);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Stream getStream(String str, String str2, OperationContext operationContext) {
        if (operationContext instanceof StreamOperationContext) {
            return ((StreamOperationContext) operationContext).getStream();
        }
        Stream stream = (Stream) this.cache.getUnchecked(new ImmutablePair(str, str2));
        stream.refresh();
        return stream;
    }

    @VisibleForTesting
    void setStream(Stream stream) {
        this.cache.put(new ImmutablePair(stream.getScope(), stream.getName()), stream);
    }

    public Scope getScope(String str, OperationContext operationContext) {
        if (operationContext != null) {
            if (operationContext instanceof StreamOperationContext) {
                return ((StreamOperationContext) operationContext).getScope();
            }
            if (operationContext instanceof RGOperationContext) {
                return ((RGOperationContext) operationContext).getScope();
            }
        }
        Scope scope = (Scope) this.scopeCache.getUnchecked(str);
        scope.refresh();
        return scope;
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<UUID> getScopeId(String str, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getScope(str, operationContext2).getScopeId(str, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<UUID> getReaderGroupId(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getScope(str, operationContext2).getReaderGroupId(str2, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> startRGConfigUpdate(String str, String str2, ReaderGroupConfig readerGroupConfig, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getReaderGroup(str, str2, operationContext2).startUpdateConfiguration(readerGroupConfig, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> completeRGConfigUpdate(String str, String str2, VersionedMetadata<ReaderGroupConfigRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getReaderGroup(str, str2, operationContext2).completeUpdateConfiguration(versionedMetadata, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<ReaderGroupState>> getVersionedReaderGroupState(String str, String str2, boolean z, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getReaderGroup(str, str2, operationContext2).getVersionedState(operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<ReaderGroupState> getReaderGroupState(String str, String str2, boolean z, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getReaderGroup(str, str2, operationContext2).getState(z, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<ReaderGroupConfigRecord>> getReaderGroupConfigRecord(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getReaderGroup(str, str2, operationContext2).getVersionedConfigurationRecord(operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public RGOperationContext createRGContext(String str, String str2, long j) {
        return new RGOperationContext(getScope(str, null), getReaderGroup(str, str2, null), j);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ReaderGroup getReaderGroup(String str, String str2, OperationContext operationContext) {
        if (operationContext instanceof RGOperationContext) {
            return ((RGOperationContext) operationContext).getReaderGroup();
        }
        ReaderGroup readerGroup = (ReaderGroup) this.rgCache.getUnchecked(new ImmutablePair(str, str2));
        readerGroup.refresh();
        return readerGroup;
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> createReaderGroup(String str, String str2, ReaderGroupConfig readerGroupConfig, long j, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(checkScopeExists(str, operationContext2, executor).thenCompose(bool -> {
            return bool.booleanValue() ? getReaderGroup(str, str2, operationContext2).create(readerGroupConfig, j, operationContext2) : Futures.toVoid(Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "scope does not exist")));
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> deleteReaderGroup(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return getReaderGroup(str, str2, operationContext2).delete(operationContext2);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<ReaderGroupState>> updateReaderGroupVersionedState(String str, String str2, ReaderGroupState readerGroupState, VersionedMetadata<ReaderGroupState> versionedMetadata, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(getReaderGroup(str, str2, operationContext2).updateVersionedState(versionedMetadata, readerGroupState, operationContext2), executor);
    }

    private CompletableFuture<AbstractMap.SimpleEntry<Long, Long>> findNumSplitsMerges(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        Stream stream = getStream(str, str2, operationContext2);
        return Futures.completeOn(stream.getActiveEpoch(true, operationContext2).thenCompose(epochRecord -> {
            return stream.getSplitMergeCountsTillEpoch(epochRecord, operationContext2);
        }), executor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OperationContext getOperationContext(OperationContext operationContext) {
        return operationContext != null ? operationContext : new OperationContext() { // from class: io.pravega.controller.store.stream.AbstractStreamMetadataStore.4
            private final long requestId = ControllerService.nextRequestId();
            private final long operationStartTime = System.currentTimeMillis();

            @Override // io.pravega.controller.store.stream.OperationContext
            public long getOperationStartTime() {
                return this.operationStartTime;
            }

            @Override // io.pravega.controller.store.stream.OperationContext
            public long getRequestId() {
                return this.requestId;
            }
        };
    }

    abstract CompletableFuture<Integer> getSafeStartingSegmentNumberFor(String str, String str2, OperationContext operationContext, Executor executor);

    abstract CompletableFuture<Void> recordLastStreamSegment(String str, String str2, int i, OperationContext operationContext, Executor executor);

    abstract Stream newStream(String str, String str2);

    abstract CompletableFuture<Int96> getNextCounter();

    private String getTxnResourceString(TxnResource txnResource) {
        return txnResource.toString(RESOURCE_PART_SEPARATOR);
    }

    private TxnResource getTxnResource(String str) {
        return TxnResource.parse(str, RESOURCE_PART_SEPARATOR);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getScopedStreamName(String str, String str2) {
        return String.format("%s/%s", str, str2);
    }

    abstract Version getEmptyVersion();

    abstract Version parseVersionData(byte[] bArr);

    abstract ReaderGroup newReaderGroup(String str, String str2);

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public HostIndex getHostTaskIndex() {
        return this.hostTaskIndex;
    }
}
