package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.hash.RandomFactory;
import io.pravega.common.lang.Int96;
import io.pravega.controller.metrics.StreamMetrics;
import io.pravega.controller.metrics.TransactionMetrics;
import io.pravega.controller.store.index.HostIndex;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.records.ActiveTxnRecord;
import io.pravega.controller.store.stream.records.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.RetentionSet;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.store.stream.records.StreamCutRecord;
import io.pravega.controller.store.stream.records.StreamCutReferenceRecord;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.controller.store.stream.records.WriterMark;
import io.pravega.controller.store.task.TxnResource;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.shared.controller.event.ControllerEvent;
import io.pravega.shared.controller.event.ControllerEventSerializer;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/store/stream/AbstractStreamMetadataStore.class */
public abstract class AbstractStreamMetadataStore implements StreamMetadataStore {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    public static final Predicate<Throwable> DATA_NOT_FOUND_PREDICATE;
    public static final Predicate<Throwable> DATA_NOT_EMPTY_PREDICATE;
    private static final String RESOURCE_PART_SEPARATOR = "_%_";
    private final HostIndex hostTxnIndex;
    private final HostIndex hostTaskIndex;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final LoadingCache<Pair<String, String>, Stream> cache = CacheBuilder.newBuilder().maximumSize(10000).refreshAfterWrite(10, TimeUnit.MINUTES).expireAfterWrite(10, TimeUnit.MINUTES).build(new CacheLoader<Pair<String, String>, Stream>() { // from class: io.pravega.controller.store.stream.AbstractStreamMetadataStore.1
        @ParametersAreNonnullByDefault
        public Stream load(Pair<String, String> pair) {
            try {
                return AbstractStreamMetadataStore.this.newStream((String) pair.getKey(), (String) pair.getValue());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    });
    private final LoadingCache<String, Scope> scopeCache = CacheBuilder.newBuilder().maximumSize(1000).refreshAfterWrite(10, TimeUnit.MINUTES).expireAfterWrite(10, TimeUnit.MINUTES).build(new CacheLoader<String, Scope>() { // from class: io.pravega.controller.store.stream.AbstractStreamMetadataStore.2
        @ParametersAreNonnullByDefault
        public Scope load(String str) {
            try {
                return AbstractStreamMetadataStore.this.newScope(str);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    });
    private final ControllerEventSerializer controllerEventSerializer = new ControllerEventSerializer();

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractStreamMetadataStore(HostIndex hostIndex, HostIndex hostIndex2) {
        this.hostTxnIndex = hostIndex;
        this.hostTaskIndex = hostIndex2;
    }

    abstract Scope newScope(String str);

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public OperationContext createContext(String str, String str2) {
        return new OperationContextImpl(getStream(str, str2, null));
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<CreateStreamResponse> createStream(String str, String str2, StreamConfiguration streamConfiguration, long j, OperationContext operationContext, Executor executor) {
        return getSafeStartingSegmentNumberFor(str, str2).thenCompose(num -> {
            return withCompletion(checkScopeExists(str).thenCompose(bool -> {
                return bool.booleanValue() ? getStream(str, str2, operationContext).create(streamConfiguration, j, num.intValue()) : Futures.failedFuture(StoreException.create(StoreException.Type.DATA_NOT_FOUND, "scope does not exist"));
            }), executor);
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> deleteStream(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return Futures.exceptionallyExpecting(stream.getActiveEpoch(true).thenApply(epochRecord -> {
            return (Integer) epochRecord.getSegments().stream().map((v0) -> {
                return v0.getSegmentNumber();
            }).reduce((v0, v1) -> {
                return Integer.max(v0, v1);
            }).get();
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) num -> {
            return recordLastStreamSegment(str, str2, num.intValue(), operationContext, executor);
        }), DATA_NOT_FOUND_PREDICATE, (Object) null).thenCompose(r7 -> {
            return withCompletion(stream.delete(), executor);
        }).thenAccept(r9 -> {
            this.cache.invalidate(new ImmutablePair(str, str2));
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Long> getCreationTime(String str, String str2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getCreationTime(), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> setState(String str, String str2, State state, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).updateState(state), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<State> getState(String str, String str2, boolean z, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getState(z), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<State>> updateVersionedState(String str, String str2, State state, VersionedMetadata<State> versionedMetadata, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).updateVersionedState(versionedMetadata, state), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<State>> getVersionedState(String str, String str2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getVersionedState(), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Controller.CreateScopeStatus> createScope(String str) {
        return getScope(str).createScope().handle((r4, th) -> {
            if (th == null) {
                return Controller.CreateScopeStatus.newBuilder().setStatus(Controller.CreateScopeStatus.Status.SUCCESS).build();
            }
            if (Exceptions.unwrap(th) instanceof StoreException.DataExistsException) {
                return Controller.CreateScopeStatus.newBuilder().setStatus(Controller.CreateScopeStatus.Status.SCOPE_EXISTS).build();
            }
            log.debug("Create scope failed due to ", th);
            return Controller.CreateScopeStatus.newBuilder().setStatus(Controller.CreateScopeStatus.Status.FAILURE).build();
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Controller.DeleteScopeStatus> deleteScope(String str) {
        return getScope(str).deleteScope().handle((r4, th) -> {
            Throwable unwrap = Exceptions.unwrap(th);
            if (unwrap == null) {
                return Controller.DeleteScopeStatus.newBuilder().setStatus(Controller.DeleteScopeStatus.Status.SUCCESS).build();
            }
            if (unwrap instanceof StoreException.DataNotFoundException) {
                return Controller.DeleteScopeStatus.newBuilder().setStatus(Controller.DeleteScopeStatus.Status.SCOPE_NOT_FOUND).build();
            }
            if (unwrap instanceof StoreException.DataNotEmptyException) {
                return Controller.DeleteScopeStatus.newBuilder().setStatus(Controller.DeleteScopeStatus.Status.SCOPE_NOT_EMPTY).build();
            }
            log.debug("DeleteScope failed due to {} ", unwrap);
            return Controller.DeleteScopeStatus.newBuilder().setStatus(Controller.DeleteScopeStatus.Status.FAILURE).build();
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Map<String, StreamConfiguration>> listStreamsInScope(String str) {
        return getScope(str).listStreamsInScope().thenCompose(list -> {
            HashMap hashMap = new HashMap();
            Iterator it = list.iterator();
            while (it.hasNext()) {
                Stream stream = getStream(str, (String) it.next(), null);
                hashMap.put(stream.getName(), Futures.exceptionallyExpecting(stream.getConfiguration(), th -> {
                    return th instanceof StoreException.DataNotFoundException;
                }, (Object) null).thenApply((v0) -> {
                    return Optional.ofNullable(v0);
                }));
            }
            return Futures.allOfWithResults(hashMap).thenApply(map -> {
                return (Map) map.entrySet().stream().filter(entry -> {
                    return ((Optional) entry.getValue()).isPresent();
                }).collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, entry2 -> {
                    return (StreamConfiguration) ((Optional) entry2.getValue()).get();
                }));
            });
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Pair<List<String>, String>> listStream(String str, String str2, int i, Executor executor) {
        return getScope(str).listStreams(i, str2, executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> startTruncation(String str, String str2, Map<Long, Long> map, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).startTruncation(map), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> completeTruncation(String str, String str2, VersionedMetadata<StreamTruncationRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).completeTruncation(versionedMetadata), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<StreamTruncationRecord>> getTruncationRecord(String str, String str2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getTruncationRecord(), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> startUpdateConfiguration(String str, String str2, StreamConfiguration streamConfiguration, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).startUpdateConfiguration(streamConfiguration), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> completeUpdateConfiguration(String str, String str2, VersionedMetadata<StreamConfigurationRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).completeUpdateConfiguration(versionedMetadata), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<StreamConfiguration> getConfiguration(String str, String str2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getConfiguration(), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<StreamConfigurationRecord>> getConfigurationRecord(String str, String str2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getVersionedConfigurationRecord(), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> isSealed(String str, String str2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getState(true).thenApply(state -> {
            return Boolean.valueOf(state.equals(State.SEALED));
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> setSealed(String str, String str2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).updateState(State.SEALED), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<StreamSegmentRecord> getSegment(String str, String str2, long j, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getSegment(j), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Set<Long>> getAllSegmentIds(String str, String str2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getAllSegmentIds(), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Map<StreamSegmentRecord, Long>> getSegmentsAtHead(String str, String str2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getSegmentsAtHead(), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<StreamSegmentRecord>> getActiveSegments(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getState(true).thenComposeAsync(state -> {
            return State.SEALED.equals(state) ? CompletableFuture.completedFuture(Collections.emptyList()) : stream.getActiveSegments();
        }, executor), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<StreamSegmentRecord>> getSegmentsInEpoch(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getSegmentsInEpoch(i), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Map<StreamSegmentRecord, List<Long>>> getSuccessors(String str, String str2, long j, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getSuccessorsWithPredecessors(j), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<StreamSegmentRecord>> getSegmentsBetweenStreamCuts(String str, String str2, Map<Long, Long> map, Map<Long, Long> map2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getSegmentsBetweenStreamCuts(map, map2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> isStreamCutValid(String str, String str2, Map<Long, Long> map, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).isStreamCutValid(map), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> submitScale(String str, String str2, List<Long> list, List<Map.Entry<Double, Double>> list2, long j, VersionedMetadata<EpochTransitionRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).submitScale(list, list2, j, versionedMetadata), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> getEpochTransition(String str, String str2, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        return withCompletion(getStream(str, str2, operationContext).getEpochTransition(), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> startScale(String str, String str2, boolean z, VersionedMetadata<EpochTransitionRecord> versionedMetadata, VersionedMetadata<State> versionedMetadata2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).startScale(z, versionedMetadata, versionedMetadata2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> scaleCreateNewEpochs(String str, String str2, VersionedMetadata<EpochTransitionRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).scaleCreateNewEpoch(versionedMetadata), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> scaleSegmentsSealed(String str, String str2, Map<Long, Long> map, VersionedMetadata<EpochTransitionRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        CompletableFuture<Void> withCompletion = withCompletion(getStream(str, str2, operationContext).scaleOldSegmentsSealed(map, versionedMetadata), executor);
        withCompletion.thenCompose(r14 -> {
            return CompletableFuture.allOf(getActiveSegments(str, str2, operationContext, executor).thenAccept(list -> {
                StreamMetrics.reportActiveSegments(str, str2, list.size());
            }), findNumSplitsMerges(str, str2, operationContext, executor).thenAccept(simpleEntry -> {
                StreamMetrics.reportSegmentSplitsAndMerges(str, str2, ((Long) simpleEntry.getKey()).longValue(), ((Long) simpleEntry.getValue()).longValue());
            }));
        });
        return withCompletion;
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> completeScale(String str, String str2, VersionedMetadata<EpochTransitionRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).completeScale(versionedMetadata), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> startRollingTxn(String str, String str2, int i, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        return withCompletion(getStream(str, str2, operationContext).startRollingTxn(i, versionedMetadata), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> rollingTxnCreateDuplicateEpochs(String str, String str2, Map<Long, Long> map, long j, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).rollingTxnCreateDuplicateEpochs(map, j, versionedMetadata), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> completeRollingTxn(String str, String str2, Map<Long, Long> map, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).completeRollingTxn(map, versionedMetadata), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> addStreamCutToRetentionSet(String str, String str2, StreamCutRecord streamCutRecord, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).addStreamCutToRetentionSet(streamCutRecord), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<RetentionSet> getRetentionSet(String str, String str2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getRetentionSet(), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<StreamCutRecord> getStreamCutRecord(String str, String str2, StreamCutReferenceRecord streamCutReferenceRecord, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getStreamCutRecord(streamCutReferenceRecord), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> deleteStreamCutBefore(String str, String str2, StreamCutReferenceRecord streamCutReferenceRecord, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).deleteStreamCutBefore(streamCutReferenceRecord), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Long> getSizeTillStreamCut(String str, String str2, Map<Long, Long> map, Optional<StreamCutRecord> optional, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        return withCompletion(getStream(str, str2, operationContext).getSizeTillStreamCut(map, optional), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<UUID> generateTransactionId(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(getNextCounter().thenCompose(int96 -> {
            return stream.generateNewTxnId(int96.getMsb(), int96.getLsb());
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedTransactionData> createTransaction(String str, String str2, UUID uuid, long j, long j2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).createTransaction(uuid, j, j2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedTransactionData> pingTransaction(String str, String str2, VersionedTransactionData versionedTransactionData, long j, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).pingTransaction(versionedTransactionData, j), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedTransactionData> getTransactionData(String str, String str2, UUID uuid, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getTransactionData(uuid), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<TxnStatus> transactionStatus(String str, String str2, UUID uuid, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).checkTransactionStatus(uuid), executor);
    }

    @VisibleForTesting
    public CompletableFuture<TxnStatus> commitTransaction(String str, String str2, UUID uuid, OperationContext operationContext, Executor executor) {
        return withCompletion(((PersistentStreamBase) getStream(str, str2, operationContext)).commitTransaction(uuid), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<AbstractMap.SimpleEntry<TxnStatus, Integer>> sealTransaction(String str, String str2, UUID uuid, boolean z, Optional<Version> optional, String str3, long j, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).sealTransaction(uuid, z, optional, str3, j), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<TxnStatus> abortTransaction(String str, String str2, UUID uuid, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).abortTransaction(uuid), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> addTxnToIndex(String str, TxnResource txnResource, Version version) {
        return this.hostTxnIndex.addEntity(str, getTxnResourceString(txnResource), ((Version) Optional.ofNullable(version).orElse(getEmptyVersion())).toBytes());
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> removeTxnFromIndex(String str, TxnResource txnResource, boolean z) {
        return this.hostTxnIndex.removeEntity(str, getTxnResourceString(txnResource), z);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Optional<TxnResource>> getRandomTxnFromIndex(String str) {
        return this.hostTxnIndex.getEntities(str).thenApply(list -> {
            return (list == null || list.size() <= 0) ? Optional.empty() : Optional.of(getTxnResource((String) list.get(RandomFactory.create().nextInt(list.size()))));
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Version> getTxnVersionFromIndex(String str, TxnResource txnResource) {
        return this.hostTxnIndex.getEntityData(str, getTxnResourceString(txnResource)).thenApply(bArr -> {
            return (Version) Optional.ofNullable(bArr).map(this::parseVersionData).filter(version -> {
                return !version.equals(getEmptyVersion());
            }).orElse(null);
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> removeHostFromIndex(String str) {
        return this.hostTxnIndex.removeHost(str);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Set<String>> listHostsOwningTxn() {
        return this.hostTxnIndex.getHosts();
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> addRequestToIndex(String str, String str2, ControllerEvent controllerEvent) {
        return this.hostTaskIndex.addEntity(str, str2, this.controllerEventSerializer.toByteBuffer(controllerEvent).array());
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> removeTaskFromIndex(String str, String str2) {
        return this.hostTaskIndex.removeEntity(str, str2, true);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Map<String, ControllerEvent>> getPendingsTaskForHost(String str, int i) {
        return this.hostTaskIndex.getEntities(str).thenCompose(list -> {
            return Futures.allOfWithResults((Map) list.stream().limit(i).collect(Collectors.toMap(str2 -> {
                return str2;
            }, str3 -> {
                return getControllerTask(str, str3);
            })));
        });
    }

    private CompletableFuture<ControllerEvent> getControllerTask(String str, String str2) {
        return this.hostTaskIndex.getEntityData(str, str2).thenApply(bArr -> {
            return this.controllerEventSerializer.fromByteBuffer(ByteBuffer.wrap(bArr));
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> removeHostFromTaskIndex(String str) {
        return this.hostTaskIndex.removeHost(str);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Set<String>> listHostsWithPendingTask() {
        return this.hostTaskIndex.getHosts();
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> markCold(String str, String str2, long j, long j2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).setColdMarker(j, j2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> isCold(String str, String str2, long j, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getColdMarker(j).thenApply(l -> {
            return Boolean.valueOf(l != null && l.longValue() > System.currentTimeMillis());
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> removeMarker(String str, String str2, long j, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).removeColdMarker(j), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getActiveTxns(String str, String str2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getActiveTxns(), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<ScaleMetadata>> getScaleMetadata(String str, String str2, long j, long j2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getScaleMetadata(j, j2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<EpochRecord> getActiveEpoch(String str, String str2, OperationContext operationContext, boolean z, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getActiveEpoch(z), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<EpochRecord> getEpoch(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getEpochRecord(i), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> startCommitTransactions(String str, String str2, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        return withCompletion(getStream(str, str2, operationContext).startCommittingTransactions(), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> getVersionedCommittingTransactionsRecord(String str, String str2, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        return withCompletion(getStream(str, str2, operationContext).getVersionedCommitTransactionsRecord(), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> recordCommitOffsets(String str, String str2, UUID uuid, Map<Long, Long> map, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        return withCompletion(getStream(str, str2, operationContext).recordCommitOffsets(uuid, map), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> completeCommitTransactions(String str, String str2, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.completeCommittingTransactions(versionedMetadata), scheduledExecutorService).thenAccept(r7 -> {
            stream.getNumberOfOngoingTransactions().thenAccept(num -> {
                TransactionMetrics.reportOpenTransactions(str, str2, num.intValue());
            });
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> createWaitingRequestIfAbsent(String str, String str2, String str3, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        return withCompletion(getStream(str, str2, operationContext).createWaitingRequestIfAbsent(str3), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<String> getWaitingRequestProcessor(String str, String str2, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        return withCompletion(getStream(str, str2, operationContext).getWaitingRequestProcessor(), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> deleteWaitingRequestConditionally(String str, String str2, String str3, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        return withCompletion(getStream(str, str2, operationContext).deleteWaitingRequestConditionally(str3), scheduledExecutorService);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<WriterTimestampResponse> noteWriterMark(String str, String str2, String str3, long j, Map<Long, Long> map, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).noteWriterMark(str3, j, map), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> shutdownWriter(String str, String str2, String str3, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).shutdownWriter(str3), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> removeWriter(String str, String str2, String str3, WriterMark writerMark, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).removeWriter(str3, writerMark), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<WriterMark> getWriterMark(String str, String str2, String str3, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getWriterMark(str3), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Map<String, WriterMark>> getAllWriterMarks(String str, String str2, OperationContext operationContext, Executor executor) {
        return withCompletion(getStream(str, str2, operationContext).getAllWriterMarks(), executor);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Stream getStream(String str, String str2, OperationContext operationContext) {
        Stream stream;
        if (operationContext != null) {
            stream = operationContext.getStream();
            if (!$assertionsDisabled && !stream.getScope().equals(str)) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && !stream.getName().equals(str2)) {
                throw new AssertionError();
            }
        } else {
            stream = (Stream) this.cache.getUnchecked(new ImmutablePair(str, str2));
            stream.refresh();
        }
        return stream;
    }

    @VisibleForTesting
    void setStream(Stream stream) {
        this.cache.put(new ImmutablePair(stream.getScope(), stream.getName()), stream);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Scope getScope(String str) {
        Scope scope = (Scope) this.scopeCache.getUnchecked(str);
        scope.refresh();
        return scope;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <T> CompletableFuture<T> withCompletion(CompletableFuture<T> completableFuture, Executor executor) {
        CompletableFuture<T> completableFuture2 = new CompletableFuture<>();
        completableFuture.whenCompleteAsync((BiConsumer) (obj, th) -> {
            if (th != null) {
                completableFuture2.completeExceptionally(th);
            } else {
                completableFuture2.complete(obj);
            }
        }, executor);
        return completableFuture2;
    }

    private CompletableFuture<AbstractMap.SimpleEntry<Long, Long>> findNumSplitsMerges(String str, String str2, OperationContext operationContext, Executor executor) {
        return getScaleMetadata(str, str2, 0L, Long.MAX_VALUE, operationContext, executor).thenApply(list -> {
            AtomicLong atomicLong = new AtomicLong(0L);
            AtomicLong atomicLong2 = new AtomicLong(0L);
            list.forEach(scaleMetadata -> {
                atomicLong2.addAndGet(scaleMetadata.getMerges());
                atomicLong.addAndGet(scaleMetadata.getSplits());
            });
            return new AbstractMap.SimpleEntry(Long.valueOf(atomicLong.get()), Long.valueOf(atomicLong2.get()));
        });
    }

    abstract CompletableFuture<Integer> getSafeStartingSegmentNumberFor(String str, String str2);

    abstract CompletableFuture<Void> recordLastStreamSegment(String str, String str2, int i, OperationContext operationContext, Executor executor);

    abstract Stream newStream(String str, String str2);

    abstract CompletableFuture<Int96> getNextCounter();

    abstract CompletableFuture<Boolean> checkScopeExists(String str);

    private String getTxnResourceString(TxnResource txnResource) {
        return txnResource.toString(RESOURCE_PART_SEPARATOR);
    }

    private TxnResource getTxnResource(String str) {
        return TxnResource.parse(str, RESOURCE_PART_SEPARATOR);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getScopedStreamName(String str, String str2) {
        return String.format("%s/%s", str, str2);
    }

    abstract Version getEmptyVersion();

    abstract Version parseVersionData(byte[] bArr);

    static {
        $assertionsDisabled = !AbstractStreamMetadataStore.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(AbstractStreamMetadataStore.class);
        DATA_NOT_FOUND_PREDICATE = th -> {
            return Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException;
        };
        DATA_NOT_EMPTY_PREDICATE = th2 -> {
            return Exceptions.unwrap(th2) instanceof StoreException.DataNotEmptyException;
        };
    }
}
