package io.pravega.controller.store.stream;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.store.index.HostIndex;
import io.pravega.controller.store.stream.CreateStreamResponse;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.tables.ActiveTxnRecord;
import io.pravega.controller.store.stream.tables.State;
import io.pravega.controller.store.stream.tables.StreamTruncationRecord;
import io.pravega.controller.store.task.TxnResource;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.shared.MetricsNames;
import io.pravega.shared.metrics.DynamicLogger;
import io.pravega.shared.metrics.MetricsProvider;
import io.pravega.shared.metrics.OpStatsLogger;
import io.pravega.shared.metrics.StatsLogger;
import io.pravega.shared.metrics.StatsProvider;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/store/stream/AbstractStreamMetadataStore.class */
public abstract class AbstractStreamMetadataStore implements StreamMetadataStore {

    // SLF4J logger for this class; assigned in the static initializer at the bottom of the class.
    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    // Metrics provider; assigned from MetricsProvider.getMetricsProvider() in the static initializer.
    protected static final StatsProvider METRICS_PROVIDER;
    // Dynamic logger used for the per-stream gauges and counters reported by this class.
    private static final DynamicLogger DYNAMIC_LOGGER;
    // Stats logger created from METRICS_PROVIDER under the name "controller".
    private static final StatsLogger STATS_LOGGER;
    // Operation stats reported by createStream ("stream_created").
    private static final OpStatsLogger CREATE_STREAM;
    // Operation stats reported by setSealed ("stream_sealed").
    private static final OpStatsLogger SEAL_STREAM;
    // Operation stats reported by deleteStream ("stream_deleted").
    private static final OpStatsLogger DELETE_STREAM;
    // Separator handed to TxnResource.toString/parse when converting transaction resources to and from index keys.
    private static final String RESOURCE_PART_SEPARATOR = "_%_";
    // Divisor used by getBucket when mapping a scoped stream name to a bucket number.
    protected final int bucketCount;
    // Host index backing the transaction-index operations (addTxnToIndex, removeTxnFromIndex, ...).
    private final HostIndex hostIndex;
    // Decompiler-exposed flag behind the assertion checks in getStream; true when assertions are disabled.
    static final /* synthetic */ boolean $assertionsDisabled;
    /**
     * Cache of {@link Stream} handles keyed by a (scope, stream name) pair. It holds at most 10000 entries, and
     * entries are refreshed and expired 10 minutes after being written. Missing entries are built through
     * {@link #newStream(String, String)}; any exception raised while loading is rethrown wrapped in a
     * {@link RuntimeException}.
     */
    private final LoadingCache<Pair<String, String>, Stream> cache = CacheBuilder.newBuilder()
            .maximumSize(10000)
            .refreshAfterWrite(10, TimeUnit.MINUTES)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build(new CacheLoader<Pair<String, String>, Stream>() {
                @Override
                @ParametersAreNonnullByDefault
                public Stream load(Pair<String, String> pair) {
                    try {
                        // Key holds the scope, value holds the stream name.
                        return AbstractStreamMetadataStore.this.newStream(pair.getKey(), pair.getValue());
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            });
    /**
     * Cache of {@link Scope} handles keyed by scope name. It holds at most 1000 entries, and entries are refreshed
     * and expired 10 minutes after being written. Missing entries are built through {@link #newScope(String)}; any
     * exception raised while loading is rethrown wrapped in a {@link RuntimeException}.
     */
    private final LoadingCache<String, Scope> scopeCache = CacheBuilder.newBuilder()
            .maximumSize(1000)
            .refreshAfterWrite(10, TimeUnit.MINUTES)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build(new CacheLoader<String, Scope>() {
                @Override
                @ParametersAreNonnullByDefault
                public Scope load(String str) {
                    try {
                        return AbstractStreamMetadataStore.this.newScope(str);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            });

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the store.
     *
     * @param hostIndex index used by the transaction-index operations of this store
     * @param i         bucket count; used as the divisor in {@link #getBucket(String, String)}
     */
    public AbstractStreamMetadataStore(HostIndex hostIndex, int i) {
        this.bucketCount = i;
        this.hostIndex = hostIndex;
    }

    /**
     * Builds the {@link Scope} handle for the given scope name. Called by the scope cache loader when the scope is
     * not cached yet.
     *
     * @param str scope name
     * @return the scope handle supplied by the concrete store
     */
    abstract Scope newScope(String str);

    /**
     * Creates an operation context around the {@link Stream} handle that the stream cache returns for the given
     * scope and stream name.
     *
     * @param str  scope of the stream
     * @param str2 name of the stream
     * @return a new {@link OperationContextImpl} holding the resolved stream handle
     */
    @Override
    public OperationContext createContext(String str, String str2) {
        Stream stream = getStream(str, str2, null);
        return new OperationContextImpl(stream);
    }

    /**
     * Creates the stream and, when the response status is {@code NEW}, reports the creation metric and initialises
     * the per-stream gauges and counters.
     *
     * @param str                 scope of the stream
     * @param str2                name of the stream
     * @param streamConfiguration configuration handed to {@code Stream.create}
     * @param j                   value forwarded to {@code Stream.create} (presumably the creation timestamp — confirm)
     * @param operationContext    context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor            executor on which the store result is completed
     * @return future with the response returned by the stream
     */
    @Override
    public CompletableFuture<CreateStreamResponse> createStream(String str, String str2, StreamConfiguration streamConfiguration, long j, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        CompletableFuture<CreateStreamResponse> created = withCompletion(stream.create(streamConfiguration, j), executor);
        return created.thenApply(response -> {
            boolean isNew = response.getStatus().equals(CreateStreamResponse.CreateStatus.NEW);
            if (!isNew) {
                return response;
            }
            CREATE_STREAM.reportSuccessValue(1L);
            DYNAMIC_LOGGER.reportGaugeValue(MetricsNames.nameFromStream("controller.transactions_opened", str, str2), 0);
            DYNAMIC_LOGGER.reportGaugeValue(MetricsNames.nameFromStream("controller.segments_count", str, str2), Integer.valueOf(streamConfiguration.getScalingPolicy().getMinNumSegments()));
            DYNAMIC_LOGGER.incCounterValue(MetricsNames.nameFromStream("controller.segment_splits", str, str2), 0L);
            DYNAMIC_LOGGER.incCounterValue(MetricsNames.nameFromStream("controller.segment_merges", str, str2), 0L);
            return response;
        });
    }

    /**
     * Deletes the stream, evicts its handle from the stream cache, then reports the deletion metric and freezes the
     * per-stream counters and gauge.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the store result is completed
     * @return future that completes once the cache eviction and metric updates have run
     */
    @Override
    public CompletableFuture<Void> deleteStream(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        CompletableFuture<Void> deleted = withCompletion(stream.delete(), executor);
        CompletableFuture<Void> evicted = deleted.thenAccept(ignored -> {
            this.cache.invalidate(new ImmutablePair<>(str, str2));
        });
        return evicted.thenApply(nothing -> {
            DELETE_STREAM.reportSuccessValue(1L);
            DYNAMIC_LOGGER.freezeCounter(MetricsNames.nameFromStream("controller.transactions_committed", str, str2));
            DYNAMIC_LOGGER.freezeGaugeValue(MetricsNames.nameFromStream("controller.transactions_opened", str, str2));
            DYNAMIC_LOGGER.freezeCounter(MetricsNames.nameFromStream("controller.segment_splits", str, str2));
            DYNAMIC_LOGGER.freezeCounter(MetricsNames.nameFromStream("controller.segment_merges", str, str2));
            return nothing;
        });
    }

    /**
     * Updates the state of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param state            state handed to {@code Stream.updateState}
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the boolean returned by {@code Stream.updateState}
     */
    @Override
    public CompletableFuture<Boolean> setState(String str, String str2, State state, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.updateState(state), executor);
    }

    /**
     * Reads the state of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param z                flag forwarded to {@code Stream.getState} (presumably "ignore cached value" — confirm)
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the stream state
     */
    @Override
    public CompletableFuture<State> getState(String str, String str2, boolean z, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getState(z), executor);
    }

    /**
     * Creates the scope and maps the outcome onto a {@code CreateScopeStatus}: {@code SUCCESS} when no error
     * occurred, {@code SCOPE_EXISTS} when the error (or its cause) is a {@code DataExistsException}, and
     * {@code FAILURE} otherwise. The returned future does not complete exceptionally for a failed creation.
     *
     * @param str scope name
     * @return future with the status message
     */
    @Override
    public CompletableFuture<Controller.CreateScopeStatus> createScope(String str) {
        return getScope(str).createScope().handle((ignored, failure) -> {
            Controller.CreateScopeStatus.Status status;
            if (failure == null) {
                status = Controller.CreateScopeStatus.Status.SUCCESS;
            } else if ((failure instanceof StoreException.DataExistsException) || (failure.getCause() instanceof StoreException.DataExistsException)) {
                status = Controller.CreateScopeStatus.Status.SCOPE_EXISTS;
            } else {
                log.debug("Create scope failed due to ", failure);
                status = Controller.CreateScopeStatus.Status.FAILURE;
            }
            return Controller.CreateScopeStatus.newBuilder().setStatus(status).build();
        });
    }

    /**
     * Deletes the scope and maps the outcome onto a {@code DeleteScopeStatus}: {@code SUCCESS} when no error
     * occurred, {@code SCOPE_NOT_FOUND} for a {@code DataNotFoundException}, {@code SCOPE_NOT_EMPTY} for a
     * {@code DataNotEmptyException} (each checked on the error and on its cause), and {@code FAILURE} otherwise.
     * The returned future does not complete exceptionally for a failed deletion.
     *
     * @param str scope name
     * @return future with the status message
     */
    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Controller.DeleteScopeStatus> deleteScope(String str) {
        return getScope(str).deleteScope().handle((r4, th) -> {
            if (th == null) {
                return Controller.DeleteScopeStatus.newBuilder().setStatus(Controller.DeleteScopeStatus.Status.SUCCESS).build();
            }
            if ((th.getCause() instanceof StoreException.DataNotFoundException) || (th instanceof StoreException.DataNotFoundException)) {
                return Controller.DeleteScopeStatus.newBuilder().setStatus(Controller.DeleteScopeStatus.Status.SCOPE_NOT_FOUND).build();
            }
            if ((th.getCause() instanceof StoreException.DataNotEmptyException) || (th instanceof StoreException.DataNotEmptyException)) {
                return Controller.DeleteScopeStatus.newBuilder().setStatus(Controller.DeleteScopeStatus.Status.SCOPE_NOT_EMPTY).build();
            }
            // The throwable is passed as the exception argument, so the message must not contain a "{}"
            // placeholder: debug(String, Throwable) does not format it and would print the braces literally.
            log.debug("DeleteScope failed due to ", th);
            return Controller.DeleteScopeStatus.newBuilder().setStatus(Controller.DeleteScopeStatus.Status.FAILURE).build();
        });
    }

    /**
     * Lists the streams of a scope and fetches the configuration of each one.
     *
     * @param str scope name
     * @return future with one configuration per stream name returned by the scope
     */
    @Override
    public CompletableFuture<List<StreamConfiguration>> listStreamsInScope(String str) {
        return getScope(str).listStreamsInScope().thenCompose(streamNames -> {
            // Each stream handle is resolved through the cache (no operation context).
            List<CompletableFuture<StreamConfiguration>> configurations = streamNames.stream()
                    .map(streamName -> getStream(str, streamName, null).getConfiguration())
                    .collect(Collectors.toList());
            return Futures.allOfWithResults(configurations);
        });
    }

    /**
     * Starts truncation of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param map              map handed to {@code Stream.startTruncation} (presumably segment number to offset — confirm)
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future that completes when the stream operation completes
     */
    @Override
    public CompletableFuture<Void> startTruncation(String str, String str2, Map<Integer, Long> map, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.startTruncation(map), executor);
    }

    /**
     * Completes a truncation of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future that completes when the stream operation completes
     */
    @Override
    public CompletableFuture<Void> completeTruncation(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.completeTruncation(), executor);
    }

    /**
     * Reads the truncation record of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the truncation record
     */
    @Override
    public CompletableFuture<StreamTruncationRecord> getTruncationRecord(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getTruncationRecord(), executor);
    }

    /**
     * Reads the truncation property of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param z                flag forwarded to {@code Stream.getTruncationProperty} (presumably "ignore cached value" — confirm)
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the truncation property
     */
    @Override
    public CompletableFuture<StreamProperty<StreamTruncationRecord>> getTruncationProperty(String str, String str2, boolean z, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getTruncationProperty(z), executor);
    }

    /**
     * Starts a configuration update of the stream.
     *
     * @param str                 scope of the stream
     * @param str2                name of the stream
     * @param streamConfiguration configuration handed to {@code Stream.startUpdateConfiguration}
     * @param operationContext    context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor            executor on which the returned future is completed
     * @return future that completes when the stream operation completes
     */
    @Override
    public CompletableFuture<Void> startUpdateConfiguration(String str, String str2, StreamConfiguration streamConfiguration, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.startUpdateConfiguration(streamConfiguration), executor);
    }

    /**
     * Completes a configuration update of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future that completes when the stream operation completes
     */
    @Override
    public CompletableFuture<Void> completeUpdateConfiguration(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.completeUpdateConfiguration(), executor);
    }

    /**
     * Reads the configuration of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the stream configuration
     */
    @Override
    public CompletableFuture<StreamConfiguration> getConfiguration(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getConfiguration(), executor);
    }

    /**
     * Reads the configuration property of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param z                flag forwarded to {@code Stream.getConfigurationProperty} (presumably "ignore cached value" — confirm)
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the configuration property
     */
    @Override
    public CompletableFuture<StreamProperty<StreamConfiguration>> getConfigurationProperty(String str, String str2, boolean z, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getConfigurationProperty(z), executor);
    }

    /**
     * Tells whether the stream state, read with {@code getState(true)}, equals {@code State.SEALED}.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with {@code true} when the stream is sealed
     */
    @Override
    public CompletableFuture<Boolean> isSealed(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        CompletableFuture<Boolean> sealed = stream.getState(true)
                .thenApply(current -> Boolean.valueOf(current.equals(State.SEALED)));
        return withCompletion(sealed, executor);
    }

    /**
     * Moves the stream to {@code State.SEALED}, then reports the seal metric and resets the open-transactions gauge
     * to 0. The metrics are reported whatever boolean the state update returns.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the store result is completed
     * @return future with the boolean returned by {@code Stream.updateState}
     */
    @Override
    public CompletableFuture<Boolean> setSealed(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        CompletableFuture<Boolean> updated = withCompletion(stream.updateState(State.SEALED), executor);
        return updated.thenApply(outcome -> {
            SEAL_STREAM.reportSuccessValue(1L);
            DYNAMIC_LOGGER.reportGaugeValue(MetricsNames.nameFromStream("controller.transactions_opened", str, str2), 0);
            return outcome;
        });
    }

    /**
     * Fetches one segment of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param i                segment number handed to {@code Stream.getSegment}
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the segment
     */
    @Override
    public CompletableFuture<Segment> getSegment(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getSegment(i), executor);
    }

    /**
     * Fetches the segment count of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the value returned by {@code Stream.getSegmentCount}
     */
    @Override
    public CompletableFuture<Integer> getSegmentCount(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getSegmentCount(), executor);
    }

    /**
     * Fetches the currently active segments of the stream. The stream state is read first with
     * {@code getState(true)}; a sealed stream yields an empty list, otherwise the active segment numbers are
     * resolved into {@link Segment} objects.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor used for the asynchronous stages and for completing the returned future
     * @return future with the active segments
     */
    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<Segment>> getActiveSegments(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        CompletableFuture<List<Integer>> segmentNumbers = stream.getState(true).thenComposeAsync(state -> {
            if (State.SEALED.equals(state)) {
                // A sealed stream has no active segments.
                return CompletableFuture.completedFuture(Collections.<Integer>emptyList());
            }
            return stream.getActiveSegments();
        }, executor);
        CompletableFuture<List<Segment>> segments = segmentNumbers.thenComposeAsync(
                numbers -> Futures.allOfWithResults(numbers.stream()
                        .map(stream::getSegment)
                        .collect(Collectors.toList())),
                executor);
        return withCompletion(segments, executor);
    }

    /**
     * Fetches the segment numbers returned by {@code Stream.getActiveSegments(long)}.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param j                value handed to {@code Stream.getActiveSegments} (presumably a timestamp — confirm)
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the segment numbers
     */
    @Override
    public CompletableFuture<List<Integer>> getActiveSegments(String str, String str2, long j, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getActiveSegments(j), executor);
    }

    /**
     * Fetches the segment numbers returned by {@code Stream.getActiveSegments(int)} and resolves each of them into
     * a {@link Segment}.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param i                value handed to {@code Stream.getActiveSegments} (presumably an epoch — confirm)
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor used for the asynchronous stage and for completing the returned future
     * @return future with the segments
     */
    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<Segment>> getActiveSegments(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        CompletableFuture<List<Segment>> segments = stream.getActiveSegments(i).thenComposeAsync(
                numbers -> Futures.allOfWithResults(numbers.stream()
                        .map(stream::getSegment)
                        .collect(Collectors.toList())),
                executor);
        return withCompletion(segments, executor);
    }

    /**
     * Fetches the segment numbers returned by {@code Stream.getActiveSegments(int)} without resolving them into
     * segment objects.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param i                value handed to {@code Stream.getActiveSegments} (presumably an epoch — confirm)
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the segment numbers
     */
    @Override
    public CompletableFuture<List<Integer>> getActiveSegmentIds(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getActiveSegments(i), executor);
    }

    /**
     * Fetches the map returned by {@code Stream.getSuccessorsWithPredecessors}.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param i                segment number handed to the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the map (presumably successor segment to its predecessors — confirm)
     */
    @Override
    public CompletableFuture<Map<Integer, List<Integer>>> getSuccessors(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getSuccessorsWithPredecessors(i), executor);
    }

    /**
     * Starts a scale operation on the stream by forwarding all arguments to {@code Stream.startScale}.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param list             segment numbers handed to the stream (presumably the segments to seal — confirm)
     * @param list2            pairs of doubles handed to the stream (presumably new key ranges — confirm)
     * @param j                value handed to the stream (presumably the scale timestamp — confirm)
     * @param z                flag handed to the stream unchanged
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the response of the stream
     */
    @Override
    public CompletableFuture<StartScaleResponse> startScale(String str, String str2, List<Integer> list, List<AbstractMap.SimpleEntry<Double, Double>> list2, long j, boolean z, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.startScale(list, list2, j, z), executor);
    }

    /**
     * Records that the new segments of a scale operation were created. The given segments are reduced to their
     * numbers before being handed to {@code Stream.scaleNewSegmentsCreated}.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param list             segment numbers handed to the stream unchanged
     * @param list2            segments whose numbers are handed to the stream
     * @param i                value handed to the stream (presumably the active epoch — confirm)
     * @param j                value handed to the stream (presumably the scale timestamp — confirm)
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future that completes when the stream operation completes
     */
    @Override
    public CompletableFuture<Void> scaleNewSegmentsCreated(String str, String str2, List<Integer> list, List<Segment> list2, int i, long j, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(
                stream.scaleNewSegmentsCreated(list, list2.stream().map(Segment::getNumber).collect(Collectors.toList()), i, j),
                executor);
    }

    /**
     * Records that the old segments of a scale operation were sealed. Once that succeeds, the segment-count gauge
     * and the split/merge counters of the stream are refreshed in the background. The metric refresh is not part
     * of the returned future, so its outcome does not affect the caller.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param list             segment numbers handed to the stream unchanged
     * @param list2            segments whose numbers are handed to the stream
     * @param i                value handed to the stream (presumably the active epoch — confirm)
     * @param j                value handed to the stream (presumably the scale timestamp — confirm)
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed and the metric queries run
     * @return future that completes when the stream operation completes
     */
    @Override
    public CompletableFuture<Void> scaleSegmentsSealed(String str, String str2, List<Integer> list, List<Segment> list2, int i, long j, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        CompletableFuture<Void> sealed = withCompletion(
                stream.scaleOldSegmentsSealed(list, list2.stream().map(Segment::getNumber).collect(Collectors.toList()), i, j),
                executor);
        // Fire-and-forget metric refresh: the composed future is intentionally not returned.
        sealed.thenCompose(ignored -> {
            CompletableFuture<Void> segmentCount = getActiveSegments(str, str2, System.currentTimeMillis(), (OperationContext) null, executor)
                    .thenAccept(active -> {
                        DYNAMIC_LOGGER.reportGaugeValue(MetricsNames.nameFromStream("controller.segments_count", str, str2), Integer.valueOf(active.size()));
                    });
            CompletableFuture<Void> splitsAndMerges = findNumSplitsMerges(str, str2, executor)
                    .thenAccept(totals -> {
                        DYNAMIC_LOGGER.updateCounterValue(MetricsNames.nameFromStream("controller.segment_splits", str, str2), totals.getKey().longValue());
                        DYNAMIC_LOGGER.updateCounterValue(MetricsNames.nameFromStream("controller.segment_merges", str, str2), totals.getValue().longValue());
                    });
            return CompletableFuture.allOf(segmentCount, splitsAndMerges);
        });
        return sealed;
    }

    /**
     * Tries to delete the given epoch. When {@code Stream.scaleTryDeleteEpoch} reports {@code true}, the latest
     * scale data is loaded, its right-hand segment numbers are resolved into {@link Segment} objects, and a
     * {@code DeleteEpochResponse(true, left list, segments)} is returned. Otherwise the response is
     * {@code DeleteEpochResponse(false, null, null)}.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param i                epoch handed to {@code Stream.scaleTryDeleteEpoch}
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the result of the delete attempt is completed
     * @return future with the delete response
     */
    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<DeleteEpochResponse> tryDeleteEpochIfScaling(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.scaleTryDeleteEpoch(i), executor).thenCompose(deleted -> {
            if (!deleted.booleanValue()) {
                return CompletableFuture.completedFuture(new DeleteEpochResponse(false, null, null));
            }
            return stream.latestScaleData().thenCompose(pair -> {
                // Left side is passed through as numbers; right side is resolved into Segment objects.
                List<Integer> leftSegments = pair.getLeft();
                return Futures.allOfWithResults(pair.getRight().stream()
                                .map(stream::getSegment)
                                .collect(Collectors.toList()))
                        .thenApply(rightSegments -> new DeleteEpochResponse(true, leftSegments, rightSegments));
            });
        });
    }

    /**
     * Adds a stream cut to the retention set of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param streamCutRecord  record handed to {@code Stream.addStreamCutToRetentionSet}
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future that completes when the stream operation completes
     */
    @Override
    public CompletableFuture<Void> addStreamCutToRetentionSet(String str, String str2, StreamCutRecord streamCutRecord, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.addStreamCutToRetentionSet(streamCutRecord), executor);
    }

    /**
     * Fetches the stream cuts returned by {@code Stream.getRetentionStreamCuts}.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the stream cut records
     */
    @Override
    public CompletableFuture<List<StreamCutRecord>> getStreamCutsFromRetentionSet(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getRetentionStreamCuts(), executor);
    }

    /**
     * Forwards the given record to {@code Stream.deleteStreamCutBefore}.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param streamCutRecord  record handed to the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future that completes when the stream operation completes
     */
    @Override
    public CompletableFuture<Void> deleteStreamCutBefore(String str, String str2, StreamCutRecord streamCutRecord, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.deleteStreamCutBefore(streamCutRecord), executor);
    }

    /**
     * Creates a transaction on the stream. After creation, the created-transactions counter and the
     * open-transactions gauge are updated in the background; the returned future does not wait for that update.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param uuid             transaction id
     * @param j                first long handed to {@code Stream.createTransaction} (presumably the lease — confirm)
     * @param j2               second long handed to {@code Stream.createTransaction} (presumably the max execution time — confirm)
     * @param j3               third long handed to {@code Stream.createTransaction} (presumably the scale grace period — confirm)
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the store result is completed
     * @return future with the transaction data returned by the stream
     */
    @Override
    public CompletableFuture<VersionedTransactionData> createTransaction(String str, String str2, UUID uuid, long j, long j2, long j3, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        CompletableFuture<VersionedTransactionData> created = withCompletion(stream.createTransaction(uuid, j, j2, j3), executor);
        return created.thenApply(txnData -> {
            // Metric update is not awaited.
            stream.getNumberOfOngoingTransactions().thenAccept(openCount -> {
                DYNAMIC_LOGGER.incCounterValue(MetricsNames.nameFromStream("controller.transactions_created", str, str2), 1L);
                DYNAMIC_LOGGER.reportGaugeValue(MetricsNames.nameFromStream("controller.transactions_opened", str, str2), openCount);
            });
            return txnData;
        });
    }

    /**
     * Pings a transaction of the stream.
     *
     * @param str                      scope of the stream
     * @param str2                     name of the stream
     * @param versionedTransactionData transaction data handed to {@code Stream.pingTransaction}
     * @param j                        value handed to {@code Stream.pingTransaction} (presumably the lease — confirm)
     * @param operationContext         context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor                 executor on which the returned future is completed
     * @return future with the transaction data returned by the stream
     */
    @Override
    public CompletableFuture<VersionedTransactionData> pingTransaction(String str, String str2, VersionedTransactionData versionedTransactionData, long j, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.pingTransaction(versionedTransactionData, j), executor);
    }

    /**
     * Fetches the data of a transaction of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param uuid             transaction id
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the transaction data
     */
    @Override
    public CompletableFuture<VersionedTransactionData> getTransactionData(String str, String str2, UUID uuid, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getTransactionData(uuid), executor);
    }

    /**
     * Fetches the status of a transaction through {@code Stream.checkTransactionStatus}.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param uuid             transaction id
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the transaction status
     */
    @Override
    public CompletableFuture<TxnStatus> transactionStatus(String str, String str2, UUID uuid, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.checkTransactionStatus(uuid), executor);
    }

    /**
     * Commits a transaction. After a successful commit, the committed-transactions counter and the
     * open-transactions gauge are updated in the background; the returned future does not wait for that update.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param i                value handed to {@code Stream.commitTransaction} (presumably the epoch — confirm)
     * @param uuid             transaction id
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the status returned by the stream
     */
    @Override
    public CompletableFuture<TxnStatus> commitTransaction(String str, String str2, int i, UUID uuid, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        CompletableFuture<TxnStatus> committed = withCompletion(stream.commitTransaction(i, uuid), executor);
        // Fire-and-forget metric update: the composed future is intentionally not returned.
        committed.thenCompose(status -> stream.getNumberOfOngoingTransactions().thenAccept(openCount -> {
            DYNAMIC_LOGGER.incCounterValue(MetricsNames.nameFromStream("controller.transactions_committed", str, str2), 1L);
            DYNAMIC_LOGGER.reportGaugeValue(MetricsNames.nameFromStream("controller.transactions_opened", str, str2), openCount);
        }));
        return committed;
    }

    /**
     * Seals a transaction by forwarding the arguments to {@code Stream.sealTransaction}.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param uuid             transaction id
     * @param z                flag handed to the stream (presumably commit vs. abort — confirm)
     * @param optional         optional integer handed to the stream (presumably the expected version — confirm)
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the (status, integer) entry returned by the stream
     */
    @Override
    public CompletableFuture<AbstractMap.SimpleEntry<TxnStatus, Integer>> sealTransaction(String str, String str2, UUID uuid, boolean z, Optional<Integer> optional, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.sealTransaction(uuid, z, optional), executor);
    }

    /**
     * Aborts a transaction. After a successful abort, the aborted-transactions counter and the open-transactions
     * gauge are updated in the background; the returned future does not wait for that update.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param i                value handed to {@code Stream.abortTransaction} (presumably the epoch — confirm)
     * @param uuid             transaction id
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the status returned by the stream
     */
    @Override
    public CompletableFuture<TxnStatus> abortTransaction(String str, String str2, int i, UUID uuid, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        CompletableFuture<TxnStatus> aborted = withCompletion(stream.abortTransaction(i, uuid), executor);
        // Fire-and-forget metric update: the dependent future is intentionally not returned.
        aborted.thenApply(status -> {
            stream.getNumberOfOngoingTransactions().thenAccept(openCount -> {
                DYNAMIC_LOGGER.incCounterValue(MetricsNames.nameFromStream("controller.transactions_aborted", str, str2), 1L);
                DYNAMIC_LOGGER.reportGaugeValue(MetricsNames.nameFromStream("controller.transactions_opened", str, str2), openCount);
            });
            return status;
        });
        return aborted;
    }

    /**
     * Tells whether the stream reports more than zero ongoing transactions.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the transaction count is completed
     * @return future with {@code true} when the ongoing-transaction count is positive
     */
    @Override
    public CompletableFuture<Boolean> isTransactionOngoing(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        CompletableFuture<Integer> ongoing = withCompletion(stream.getNumberOfOngoingTransactions(), executor);
        return ongoing.thenApply(count -> Boolean.valueOf(count.intValue() > 0));
    }

    /**
     * Adds a transaction resource to the host index. The entry is keyed by the resource's string form, and its
     * data is the 4-byte encoding of the given integer as written by {@link ByteBuffer#putInt(int)}.
     *
     * @param str         host id
     * @param txnResource transaction resource to index
     * @param i           integer stored as the entry data (read back by {@code getTxnVersionFromIndex})
     * @return future that completes when the index operation completes
     */
    @Override
    public CompletableFuture<Void> addTxnToIndex(String str, TxnResource txnResource, int i) {
        String entity = getTxnResourceString(txnResource);
        byte[] encodedVersion = ByteBuffer.allocate(Integer.BYTES).putInt(i).array();
        return hostIndex.addEntity(str, entity, encodedVersion);
    }

    /**
     * Removes a transaction resource from the host index.
     *
     * @param str         host id
     * @param txnResource transaction resource whose string form identifies the entry
     * @param z           flag forwarded to {@code HostIndex.removeEntity} (presumably "delete empty host" — confirm)
     * @return future that completes when the index operation completes
     */
    @Override
    public CompletableFuture<Void> removeTxnFromIndex(String str, TxnResource txnResource, boolean z) {
        String entity = getTxnResourceString(txnResource);
        return hostIndex.removeEntity(str, entity, z);
    }

    /**
     * Picks one of the host's indexed entities at random and parses it into a transaction resource.
     *
     * @param str host id
     * @return future with the chosen resource, or an empty optional when the host has no (or a {@code null})
     *         entity list
     */
    @Override
    public CompletableFuture<Optional<TxnResource>> getRandomTxnFromIndex(String str) {
        return hostIndex.getEntities(str).thenApply(entities -> {
            if (entities == null || entities.size() <= 0) {
                return Optional.<TxnResource>empty();
            }
            int choice = new Random().nextInt(entities.size());
            return Optional.of(getTxnResource(entities.get(choice)));
        });
    }

    /**
     * Reads the integer stored with a transaction resource in the host index.
     *
     * @param str         host id
     * @param txnResource transaction resource whose string form identifies the entry
     * @return future with the integer decoded from the first four bytes of the entry data, or {@code null} when
     *         the index returns no data
     */
    @Override
    public CompletableFuture<Integer> getTxnVersionFromIndex(String str, TxnResource txnResource) {
        String entity = getTxnResourceString(txnResource);
        return hostIndex.getEntityData(str, entity).thenApply(data -> {
            if (data == null) {
                return null;
            }
            return Integer.valueOf(ByteBuffer.wrap(data).getInt());
        });
    }

    /**
     * Removes a host from the host index.
     *
     * @param str host id
     * @return the future returned by {@code HostIndex.removeHost}
     */
    @Override
    public CompletableFuture<Void> removeHostFromIndex(String str) {
        CompletableFuture<Void> removal = hostIndex.removeHost(str);
        return removal;
    }

    /**
     * Lists the hosts known to the host index.
     *
     * @return the future returned by {@code HostIndex.getHosts}
     */
    @Override
    public CompletableFuture<Set<String>> listHostsOwningTxn() {
        CompletableFuture<Set<String>> hosts = hostIndex.getHosts();
        return hosts;
    }

    /**
     * Sets the cold marker of a segment.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param i                segment number
     * @param j                marker value; {@code isCold} compares the stored marker against the current time in
     *                         milliseconds
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future that completes when the stream operation completes
     */
    @Override
    public CompletableFuture<Void> markCold(String str, String str2, int i, long j, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.setColdMarker(i, j), executor);
    }

    /**
     * Tells whether a segment is cold: its cold marker exists and is greater than the current time in milliseconds.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param i                segment number
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with {@code true} when the marker is present and still in the future
     */
    @Override
    public CompletableFuture<Boolean> isCold(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        CompletableFuture<Boolean> cold = stream.getColdMarker(i).thenApply(marker -> {
            if (marker == null) {
                return Boolean.valueOf(false);
            }
            return Boolean.valueOf(marker.longValue() > System.currentTimeMillis());
        });
        return withCompletion(cold, executor);
    }

    /**
     * Removes the cold marker of a segment.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param i                segment number
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future that completes when the stream operation completes
     */
    @Override
    public CompletableFuture<Void> removeMarker(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.removeColdMarker(i), executor);
    }

    /**
     * Fetches the active transactions of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the active transaction records keyed by transaction id
     */
    @Override
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getActiveTxns(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getActiveTxns(), executor);
    }

    /**
     * Fetches the scale metadata of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param executor         executor on which the returned future is completed
     * @return future with the scale metadata entries returned by the stream
     */
    @Override
    public CompletableFuture<List<ScaleMetadata>> getScaleMetadata(String str, String str2, OperationContext operationContext, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getScaleMetadata(), executor);
    }

    /**
     * Fetches the active epoch of the stream.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param operationContext context carrying the stream handle, or {@code null} to resolve it through the cache
     * @param z                flag forwarded to {@code Stream.getActiveEpoch} (presumably "ignore cached value" — confirm)
     * @param executor         executor on which the returned future is completed
     * @return future with the pair returned by the stream (presumably epoch and its segment numbers — confirm)
     */
    @Override
    public CompletableFuture<Pair<Integer, List<Integer>>> getActiveEpoch(String str, String str2, OperationContext operationContext, boolean z, Executor executor) {
        Stream stream = getStream(str, str2, operationContext);
        return withCompletion(stream.getActiveEpoch(z), executor);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resolves the {@link Stream} handle for an operation.
     *
     * <p>Without a context, the handle comes from the stream cache and is refreshed before being returned. With a
     * context, the handle carried by the context is returned as is; when assertions are enabled it is checked to
     * belong to the requested scope and stream name.
     *
     * @param str              scope of the stream
     * @param str2             name of the stream
     * @param operationContext context carrying the stream handle, or {@code null}
     * @return the stream handle
     */
    public Stream getStream(String str, String str2, OperationContext operationContext) {
        if (operationContext == null) {
            Stream cached = this.cache.getUnchecked(new ImmutablePair<>(str, str2));
            cached.refresh();
            return cached;
        }
        Stream fromContext = operationContext.getStream();
        // Kept as explicit checks against the declared $assertionsDisabled flag: an `assert` statement would make
        // the compiler synthesize a field of the same name.
        if (!$assertionsDisabled) {
            if (!fromContext.getScope().equals(str)) {
                throw new AssertionError();
            }
            if (!fromContext.getName().equals(str2)) {
                throw new AssertionError();
            }
        }
        return fromContext;
    }

    /**
     * Fetches the {@link Scope} handle for the given name from the scope cache and refreshes it before returning.
     *
     * @param str scope name
     * @return the refreshed scope handle
     */
    private Scope getScope(String str) {
        final Scope cached = this.scopeCache.getUnchecked(str);
        cached.refresh();
        return cached;
    }

    /**
     * Mirrors the outcome of a future onto a new future that is completed on the given executor, so that stages
     * chained by callers do not run on the thread that completed the source future.
     *
     * @param completableFuture source future
     * @param executor          executor on which the returned future is completed
     * @param <T>               result type
     * @return a new future that completes with the same value, or the same exception, as the source
     */
    private <T> CompletableFuture<T> withCompletion(CompletableFuture<T> completableFuture, Executor executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // No raw BiConsumer cast: the lambda parameters must keep their (T, Throwable) types to type-check.
        completableFuture.whenCompleteAsync((value, failure) -> {
            if (failure != null) {
                result.completeExceptionally(failure);
            } else {
                result.complete(value);
            }
        }, executor);
        return result;
    }

    /**
     * Counts segment splits and merges across the scale history of the stream.
     *
     * <p>Each pair of adjacent scale metadata entries is compared. Which entry of a pair is the newer one is
     * decided once, from the timestamps of the first two entries; with fewer than two entries there is nothing to
     * compare and both totals are 0. A split is counted for every older segment that overlaps more than one newer
     * segment, and a merge for every newer segment that overlaps more than one older segment.
     *
     * @param str      scope of the stream
     * @param str2     name of the stream
     * @param executor executor handed to {@code getScaleMetadata}
     * @return future with an entry whose key is the number of splits and whose value is the number of merges
     */
    @Override
    public CompletableFuture<AbstractMap.SimpleEntry<Long, Long>> findNumSplitsMerges(String str, String str2, Executor executor) {
        return getScaleMetadata(str, str2, null, executor).thenApply(history -> {
            final int count = history.size();
            final boolean newestFirst = count <= 1
                    || history.get(0).getTimestamp() > history.get(1).getTimestamp();
            long splits = 0;
            long merges = 0;
            for (int idx = 0; idx + 1 < count; idx++) {
                List<Segment> first = history.get(idx).getSegments();
                List<Segment> second = history.get(idx + 1).getSegments();
                List<Segment> older = newestFirst ? second : first;
                List<Segment> newer = newestFirst ? first : second;
                splits += findSegmentSplitsMerges(older, newer);
                merges += findSegmentSplitsMerges(newer, older);
            }
            return new AbstractMap.SimpleEntry<Long, Long>(splits, merges);
        });
    }

    /**
     * Counts the segments of the first list that overlap more than one segment of the second list.
     *
     * @param list  reference segments; each one is tested against the whole second list
     * @param list2 target segments
     * @return number of reference segments overlapped by at least two target segments
     */
    private long findSegmentSplitsMerges(List<Segment> list, List<Segment> list2) {
        // The two lambda parameters need distinct names: a nested lambda may not redeclare the outer parameter,
        // and the overlap test must compare a target against the reference, not a segment against itself.
        // NOTE(review): receiver order assumes Segment.overlaps is symmetric — confirm.
        return list.stream()
                .filter(reference -> list2.stream()
                        .filter(target -> target.overlaps(reference))
                        .count() > 1)
                .count();
    }

    /**
     * Builds the {@link Stream} handle for the given scope and stream name. Called by the stream cache loader when
     * the stream is not cached yet.
     *
     * @param str  scope of the stream
     * @param str2 name of the stream
     * @return the stream handle supplied by the concrete store
     */
    abstract Stream newStream(String str, String str2);

    /**
     * Renders a transaction resource as a host-index key, using {@code RESOURCE_PART_SEPARATOR} between its parts.
     *
     * @param txnResource resource to render
     * @return the string form produced by {@code TxnResource.toString(String)}
     */
    private String getTxnResourceString(TxnResource txnResource) {
        final String indexKey = txnResource.toString(RESOURCE_PART_SEPARATOR);
        return indexKey;
    }

    /**
     * Parses a host-index key back into a transaction resource, using {@code RESOURCE_PART_SEPARATOR} as the part
     * separator.
     *
     * @param str string form of the resource
     * @return the resource produced by {@code TxnResource.parse}
     */
    private TxnResource getTxnResource(String str) {
        final TxnResource parsed = TxnResource.parse(str, RESOURCE_PART_SEPARATOR);
        return parsed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Maps a stream to a bucket number derived from the hash code of its scoped name.
     *
     * <p>{@link Math#floorMod(int, int)} is used instead of {@code %} because {@link String#hashCode()} can be
     * negative; with {@code %} that would produce a negative bucket number. For a positive bucket count the
     * result is always in {@code [0, bucketCount)}.
     *
     * @param str  scope of the stream
     * @param str2 name of the stream
     * @return bucket number of the stream
     * @throws ArithmeticException if the bucket count is zero
     */
    public int getBucket(String str, String str2) {
        return Math.floorMod(getScopedStreamName(str, str2).hashCode(), this.bucketCount);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the scoped stream name in the form {@code scope/stream}.
     *
     * @param str  scope of the stream
     * @param str2 name of the stream
     * @return the two names joined by a single {@code /}
     */
    public String getScopedStreamName(String str, String str2) {
        return str + "/" + str2;
    }

    // Static initialisation of the blank final constants declared at the top of the class.
    // Order matters: METRICS_PROVIDER must be assigned before STATS_LOGGER is created from it, and STATS_LOGGER
    // before the three OpStatsLogger instances derived from it.
    static {
        // Mirrors the compiler-generated assertion flag: true when assertions are disabled for this class.
        $assertionsDisabled = !AbstractStreamMetadataStore.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(AbstractStreamMetadataStore.class);
        METRICS_PROVIDER = MetricsProvider.getMetricsProvider();
        DYNAMIC_LOGGER = MetricsProvider.getDynamicLogger();
        STATS_LOGGER = METRICS_PROVIDER.createStatsLogger("controller");
        CREATE_STREAM = STATS_LOGGER.createStats("stream_created");
        SEAL_STREAM = STATS_LOGGER.createStats("stream_sealed");
        DELETE_STREAM = STATS_LOGGER.createStats("stream_deleted");
    }
}
