package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.BitConverter;
import io.pravega.controller.store.stream.CreateStreamResponse;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.Version;
import io.pravega.controller.store.stream.records.ActiveTxnRecord;
import io.pravega.controller.store.stream.records.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.records.CompletedTxnRecord;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.HistoryTimeSeries;
import io.pravega.controller.store.stream.records.RetentionSet;
import io.pravega.controller.store.stream.records.SealedSegmentsMapShard;
import io.pravega.controller.store.stream.records.StateRecord;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.store.stream.records.StreamCutRecord;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.controller.store.stream.records.WriterMark;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/controller/store/stream/ZKStream.class */
public class ZKStream extends PersistentStreamBase {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(ZKStream.class);

    // ZooKeeper path templates. In the constructor the two %s placeholders are filled
    // with the constructor's first and second String arguments, in that order.
    private static final String SCOPE_PATH = "/store/%s";
    private static final String STREAM_PATH = "/store/%s/%s";
    private static final String CREATION_TIME_PATH = "/store/%s/%s/creationTime";
    private static final String CONFIGURATION_PATH = "/store/%s/%s/configuration";
    private static final String TRUNCATION_PATH = "/store/%s/%s/truncation";
    private static final String STATE_PATH = "/store/%s/%s/state";
    private static final String EPOCH_TRANSITION_PATH = "/store/%s/%s/epochTransition";
    private static final String RETENTION_SET_PATH = "/store/%s/%s/retention";
    private static final String RETENTION_STREAM_CUT_RECORD_PATH = "/store/%s/%s/retentionCuts";
    private static final String CURRENT_EPOCH_RECORD = "/store/%s/%s/currentEpochRecord";
    private static final String EPOCH_RECORD = "/store/%s/%s/epochRecords";
    private static final String HISTORY_TIMESERIES_CHUNK_PATH = "/store/%s/%s/historyTimeSeriesChunks";
    private static final String SEGMENTS_SEALED_SIZE_MAP_SHARD_PATH = "/store/%s/%s/segmentsSealedSizeMapShardPath";
    private static final String SEGMENT_SEALED_EPOCH_PATH = "/store/%s/%s/segmentSealedEpochPath";
    private static final String COMMITTING_TXNS_PATH = "/store/%s/%s/committingTxns";
    private static final String WRITER_POSITIONS_PATH = "/store/%s/%s/writerPositions";
    private static final String WAITING_REQUEST_PROCESSOR_PATH = "/store/%s/%s/waitingRequestProcessor";
    private static final String MARKER_PATH = "/store/%s/%s/markers";
    private static final String ID_PATH = "/store/%s/%s/id";
    // NOTE(review): the second placeholder is %S, which java.util.Formatter renders in
    // upper case. Confirm whether this is intentional before changing it, because it
    // determines the znode path under which active transactions are looked up.
    private static final String STREAM_ACTIVE_TX_PATH = "/transactions/activeTx/%s/%S";
    // %d is a batch number; the two %s are filled from getScope() and getName().
    private static final String STREAM_COMPLETED_TX_BATCH_PATH = "/transactions/completedTx/batches/%d/%s/%s";
    // Helper through which every znode read, write and cache invalidation in this class goes.
    private final ZKStoreHelper store;

    // Per-stream paths, resolved once in the constructor from the templates above.
    @VisibleForTesting
    private final String creationPath;
    private final String configurationPath;
    private final String truncationPath;
    private final String statePath;
    private final String epochTransitionPath;
    private final String committingTxnsPath;
    private final String waitingRequestProcessorPath;
    private final String activeTxRoot;
    private final String markerPath;
    private final String idPath;
    private final String streamPath;
    private final String retentionSetPath;
    // Fields named *Format keep a trailing "/%d" that is filled in per record at call time.
    private final String retentionStreamCutRecordPathFormat;
    private final String currentEpochRecordPath;
    private final String epochRecordPathFormat;
    private final String historyTimeSeriesChunkPathFormat;
    private final String segmentSealedEpochPathFormat;
    private final String segmentsSealedSizeMapShardPathFormat;
    private final String writerPositionsPath;
    // Supplies the batch number used when building the completed-transaction path.
    private final Supplier<Integer> currentBatchSupplier;
    private final Executor executor;
    // Ordered store used to add and remove transactions from the commit order.
    private final ZkOrderedStore txnCommitOrderer;
    // Initialised empty in the constructor; presumably caches the value behind getId() - usage is not visible here.
    private final AtomicReference<String> idRef;

    /**
     * Convenience constructor that delegates to the supplier-based constructor
     * with a batch supplier that always yields {@code 0}.
     */
    @VisibleForTesting
    ZKStream(String str, String str2, ZKStoreHelper zKStoreHelper, Executor executor, ZkOrderedStore zkOrderedStore) {
        this(str, str2, zKStoreHelper, () -> 0, executor, zkOrderedStore);
    }

    /**
     * Convenience constructor that forwards the two int arguments to the full
     * constructor and uses a batch supplier that always yields {@code 0}.
     */
    @VisibleForTesting
    ZKStream(String str, String str2, ZKStoreHelper zKStoreHelper, int i, int i2, Executor executor, ZkOrderedStore zkOrderedStore) {
        this(str, str2, zKStoreHelper, () -> 0, i, i2, executor, zkOrderedStore);
    }

    /**
     * Convenience constructor that delegates to the full constructor, passing
     * {@code 1000} for both of its int parameters.
     *
     * @param supplier supplies the batch number used for completed-transaction paths.
     */
    @VisibleForTesting
    public ZKStream(String str, String str2, ZKStoreHelper zKStoreHelper, Supplier<Integer> supplier, Executor executor,
                    ZkOrderedStore zkOrderedStore) {
        this(str, str2, zKStoreHelper, supplier, 1000, 1000, executor, zkOrderedStore);
    }

    @VisibleForTesting
    ZKStream(String str, String str2, ZKStoreHelper zKStoreHelper, Supplier<Integer> supplier, int i, int i2, Executor executor, ZkOrderedStore zkOrderedStore) {
        super(str, str2, i, i2);
        this.store = zKStoreHelper;
        this.streamPath = String.format(STREAM_PATH, str, str2);
        this.creationPath = String.format(CREATION_TIME_PATH, str, str2);
        this.configurationPath = String.format(CONFIGURATION_PATH, str, str2);
        this.truncationPath = String.format(TRUNCATION_PATH, str, str2);
        this.statePath = String.format(STATE_PATH, str, str2);
        this.retentionSetPath = String.format(RETENTION_SET_PATH, str, str2);
        this.retentionStreamCutRecordPathFormat = String.format(RETENTION_STREAM_CUT_RECORD_PATH, str, str2) + "/%d";
        this.epochTransitionPath = String.format(EPOCH_TRANSITION_PATH, str, str2);
        this.activeTxRoot = String.format(STREAM_ACTIVE_TX_PATH, str, str2);
        this.committingTxnsPath = String.format(COMMITTING_TXNS_PATH, str, str2);
        this.waitingRequestProcessorPath = String.format(WAITING_REQUEST_PROCESSOR_PATH, str, str2);
        this.markerPath = String.format(MARKER_PATH, str, str2);
        this.idPath = String.format(ID_PATH, str, str2);
        this.currentEpochRecordPath = String.format(CURRENT_EPOCH_RECORD, str, str2);
        this.epochRecordPathFormat = String.format(EPOCH_RECORD, str, str2) + "/%d";
        this.historyTimeSeriesChunkPathFormat = String.format(HISTORY_TIMESERIES_CHUNK_PATH, str, str2) + "/%d";
        this.segmentSealedEpochPathFormat = String.format(SEGMENT_SEALED_EPOCH_PATH, str, str2) + "/%d";
        this.segmentsSealedSizeMapShardPathFormat = String.format(SEGMENTS_SEALED_SIZE_MAP_SHARD_PATH, str, str2) + "/%d";
        this.writerPositionsPath = String.format(WRITER_POSITIONS_PATH, str, str2);
        this.idRef = new AtomicReference<>();
        this.currentBatchSupplier = supplier;
        this.executor = executor;
        this.txnCommitOrderer = zkOrderedStore;
    }

    /**
     * Counts the active transactions of this stream across all epochs.
     *
     * <p>Lists the children of the active-transaction root, parses each child
     * name as an epoch number, counts the children under each epoch path and
     * sums the per-epoch counts.
     *
     * @return a future holding the total number of ongoing transactions.
     */
    @Override
    public CompletableFuture<Integer> getNumberOfOngoingTransactions() {
        return store.getChildren(activeTxRoot)
                .thenCompose(epochs -> Futures.allOfWithResults(epochs.stream()
                        .map(epoch -> getNumberOfOngoingTransactions(Integer.parseInt(epoch)))
                        .collect(Collectors.toList())))
                .thenApply(counts -> counts.stream().reduce(0, Integer::sum));
    }

    /**
     * Counts the children stored under the path of the given epoch.
     *
     * @param i epoch number.
     * @return a future holding the number of children of the epoch path.
     */
    private CompletableFuture<Integer> getNumberOfOngoingTransactions(int i) {
        return store.getChildren(getEpochPath(i)).thenApply(List::size);
    }

    /**
     * Deletes the znode tree rooted at this stream's path.
     *
     * @return a future that completes when the store's {@code deleteTree} call completes.
     */
    @Override
    public CompletableFuture<Void> deleteStream() {
        return store.deleteTree(streamPath);
    }

    /**
     * Determines the creation status of this stream from the metadata already in the store.
     *
     * <ul>
     *   <li>No creation-time node: responds {@code NEW} with the supplied configuration and timestamp.</li>
     *   <li>Creation-time node but no configuration node: responds {@code NEW} with the supplied
     *       configuration and the stored creation time.</li>
     *   <li>Both present: delegates to {@code handleConfigExists}, telling it whether the stored
     *       creation time equals {@code j}.</li>
     * </ul>
     *
     * @param streamConfiguration configuration supplied by the caller.
     * @param j                   creation timestamp supplied by the caller.
     * @param i                   value passed through unchanged into the response.
     * @return a future holding the computed response.
     */
    @Override
    public CompletableFuture<CreateStreamResponse> checkStreamExists(StreamConfiguration streamConfiguration, long j, int i) {
        return store.checkExists(creationPath).thenCompose(creationTimeExists -> {
            if (!creationTimeExists) {
                return CompletableFuture.completedFuture(
                        new CreateStreamResponse(CreateStreamResponse.CreateStatus.NEW, streamConfiguration, j, i));
            }
            return getCreationTime().thenCompose(creationTime -> {
                final long storedTime = creationTime;
                return store.checkExists(configurationPath).thenCompose(configExists -> {
                    if (configExists) {
                        return handleConfigExists(storedTime, i, storedTime == j);
                    }
                    return CompletableFuture.completedFuture(
                            new CreateStreamResponse(CreateStreamResponse.CreateStatus.NEW, streamConfiguration, storedTime, i));
                });
            });
        });
    }

    /**
     * Calls the store's {@code createZNodeIfNotExist} for the path returned by
     * {@code getStreamPath()} and discards the result.
     */
    @Override
    CompletableFuture<Void> createStreamMetadata() {
        return Futures.toVoid(store.createZNodeIfNotExist(getStreamPath()));
    }

    /**
     * Builds the response for a stream whose configuration node already exists.
     *
     * <p>If the state node is missing, or the state is {@code UNKNOWN} or {@code CREATING},
     * the status is {@code NEW} when {@code z} is true and {@code EXISTS_CREATING} otherwise.
     * For any other state the status is {@code EXISTS_ACTIVE}.
     *
     * @param j timestamp placed in the response.
     * @param i value passed through unchanged into the response.
     * @param z selects between {@code NEW} and {@code EXISTS_CREATING} as described above.
     * @return a future holding the response, carrying the stored configuration.
     */
    private CompletableFuture<CreateStreamResponse> handleConfigExists(long j, int i, boolean z) {
        final CreateStreamResponse.CreateStatus pendingStatus;
        if (z) {
            pendingStatus = CreateStreamResponse.CreateStatus.NEW;
        } else {
            pendingStatus = CreateStreamResponse.CreateStatus.EXISTS_CREATING;
        }
        return getConfiguration().thenCompose(config -> store.checkExists(statePath).thenCompose(stateExists -> {
            if (!stateExists) {
                return CompletableFuture.completedFuture(new CreateStreamResponse(pendingStatus, config, j, i));
            }
            return getState(false).thenApply(state -> {
                final boolean stillCreating = state.equals(State.UNKNOWN) || state.equals(State.CREATING);
                final CreateStreamResponse.CreateStatus status =
                        stillCreating ? pendingStatus : CreateStreamResponse.CreateStatus.EXISTS_ACTIVE;
                return new CreateStreamResponse(status, config, j, i);
            });
        }));
    }

    /**
     * Reads the creation time through the store's cache, decoding it as a long
     * at offset 0 of the node's bytes.
     *
     * @return a future holding the stored creation time.
     */
    @Override
    public CompletableFuture<Long> getCreationTime() {
        return getId().thenCompose(id -> store
                .getCachedData(creationPath, id, bytes -> BitConverter.readLong(bytes, 0))
                .thenApply(VersionedMetadata::getObject));
    }

    /**
     * Passes the serialized retention set to the store's {@code createZNodeIfNotExist}
     * for the retention-set path and discards the result.
     *
     * @param retentionSet retention set to serialize.
     */
    @Override
    CompletableFuture<Void> createRetentionSetDataIfAbsent(RetentionSet retentionSet) {
        final byte[] serialized = retentionSet.toBytes();
        return Futures.toVoid(store.createZNodeIfNotExist(retentionSetPath, serialized));
    }

    /**
     * Reads the retention set with {@code store.getData} (not the cached variant)
     * and deserializes it with {@code RetentionSet.fromBytes}.
     */
    @Override
    CompletableFuture<VersionedMetadata<RetentionSet>> getRetentionSetData() {
        return store.getData(retentionSetPath, RetentionSet::fromBytes);
    }

    /**
     * Writes the retention set with {@code store.setData}, passing the version carried by the argument.
     *
     * @param versionedMetadata retention set together with the version passed to the store.
     * @return a future holding the store's result wrapped in an {@code IntVersion}.
     */
    @Override
    CompletableFuture<Version> updateRetentionSetData(VersionedMetadata<RetentionSet> versionedMetadata) {
        final byte[] serialized = versionedMetadata.getObject().toBytes();
        return store.setData(retentionSetPath, serialized, versionedMetadata.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Passes the serialized stream-cut record to the store's {@code createZNodeIfNotExist}
     * for the retention-cut path keyed by {@code j}.
     *
     * @param j               number substituted into the retention-cut path.
     * @param streamCutRecord record to serialize.
     */
    @Override
    CompletableFuture<Void> createStreamCutRecordData(long j, StreamCutRecord streamCutRecord) {
        final String recordPath = String.format(retentionStreamCutRecordPathFormat, j);
        return Futures.toVoid(store.createZNodeIfNotExist(recordPath, streamCutRecord.toBytes()));
    }

    /**
     * Reads the stream-cut record keyed by {@code j} through the store's cache.
     *
     * @param j number substituted into the retention-cut path.
     */
    @Override
    CompletableFuture<VersionedMetadata<StreamCutRecord>> getStreamCutRecordData(long j) {
        final String recordPath = String.format(retentionStreamCutRecordPathFormat, j);
        return getId().thenCompose(id -> store.getCachedData(recordPath, id, StreamCutRecord::fromBytes));
    }

    /**
     * Deletes the stream-cut record keyed by {@code j} and, once the delete has
     * succeeded, invalidates the corresponding cache entry.
     *
     * @param j number substituted into the retention-cut path.
     */
    @Override
    CompletableFuture<Void> deleteStreamCutRecordData(long j) {
        final String recordPath = String.format(retentionStreamCutRecordPathFormat, j);
        return getId().thenCompose(id -> store.deletePath(recordPath, false)
                .thenAccept(ignored -> store.invalidateCache(recordPath, id)));
    }

    /**
     * Passes the serialized history time-series chunk to the store's
     * {@code createZNodeIfNotExist} for the chunk path numbered {@code i}.
     *
     * @param i                 chunk number substituted into the path.
     * @param historyTimeSeries chunk contents to serialize.
     */
    @Override
    CompletableFuture<Void> createHistoryTimeSeriesChunkDataIfAbsent(int i, HistoryTimeSeries historyTimeSeries) {
        final String chunkPath = String.format(historyTimeSeriesChunkPathFormat, i);
        return Futures.toVoid(store.createZNodeIfNotExist(chunkPath, historyTimeSeries.toBytes()));
    }

    /**
     * Reads the history time-series chunk numbered {@code i}.
     *
     * @param i chunk number substituted into the path.
     * @param z when true the chunk is read with {@code store.getData}; otherwise with {@code store.getCachedData}.
     */
    @Override
    CompletableFuture<VersionedMetadata<HistoryTimeSeries>> getHistoryTimeSeriesChunkData(int i, boolean z) {
        final String chunkPath = String.format(historyTimeSeriesChunkPathFormat, i);
        return getId().thenCompose(id -> {
            if (z) {
                return store.getData(chunkPath, HistoryTimeSeries::fromBytes);
            }
            return store.getCachedData(chunkPath, id, HistoryTimeSeries::fromBytes);
        });
    }

    /**
     * Writes the history time-series chunk numbered {@code i} with {@code store.setData},
     * passing the version carried by the argument.
     *
     * @param i                 chunk number substituted into the path.
     * @param versionedMetadata chunk contents together with the version passed to the store.
     * @return a future holding the store's result wrapped in an {@code IntVersion}.
     */
    @Override
    CompletableFuture<Version> updateHistoryTimeSeriesChunkData(int i, VersionedMetadata<HistoryTimeSeries> versionedMetadata) {
        final String chunkPath = String.format(historyTimeSeriesChunkPathFormat, i);
        final byte[] serialized = versionedMetadata.getObject().toBytes();
        return store.setData(chunkPath, serialized, versionedMetadata.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Passes the epoch number of the given record, encoded as a 4-byte int, to the
     * store's {@code createZNodeIfNotExist} for the current-epoch path. Only the
     * epoch number is written here, not the record itself.
     *
     * @param epochRecord record whose epoch number is stored.
     */
    @Override
    CompletableFuture<Void> createCurrentEpochRecordDataIfAbsent(EpochRecord epochRecord) {
        final byte[] epochBytes = new byte[Integer.BYTES];
        BitConverter.writeInt(epochBytes, 0, epochRecord.getEpoch());
        return Futures.toVoid(store.createZNodeIfNotExist(currentEpochRecordPath, epochBytes));
    }

    /**
     * Writes the epoch number of the given record, encoded as a 4-byte int, to the
     * current-epoch path with {@code store.setData}.
     *
     * @param versionedMetadata record supplying the epoch number, together with the version passed to the store.
     * @return a future holding the store's result wrapped in an {@code IntVersion}.
     */
    @Override
    CompletableFuture<Version> updateCurrentEpochRecordData(VersionedMetadata<EpochRecord> versionedMetadata) {
        final byte[] epochBytes = new byte[Integer.BYTES];
        BitConverter.writeInt(epochBytes, 0, versionedMetadata.getObject().getEpoch());
        return store.setData(currentEpochRecordPath, epochBytes, versionedMetadata.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Resolves the current epoch record in two steps: reads the epoch number stored
     * at the current-epoch path, then loads that epoch's record with {@code getEpochRecord}.
     * The returned metadata carries the version of the current-epoch node.
     *
     * @param z when true the epoch number is read with {@code store.getData}; otherwise with {@code store.getCachedData}.
     */
    @Override
    CompletableFuture<VersionedMetadata<EpochRecord>> getCurrentEpochRecordData(boolean z) {
        return getId().thenCompose(id -> {
            final CompletableFuture<VersionedMetadata<Integer>> epochNumber;
            if (z) {
                epochNumber = store.getData(currentEpochRecordPath, bytes -> BitConverter.readInt(bytes, 0));
            } else {
                epochNumber = store.getCachedData(currentEpochRecordPath, id, bytes -> BitConverter.readInt(bytes, 0));
            }
            return epochNumber.thenCompose(pointer -> getEpochRecord(pointer.getObject())
                    .thenApply(record -> new VersionedMetadata<>(record, pointer.getVersion())));
        });
    }

    /**
     * Passes the serialized epoch record to the store's {@code createZNodeIfNotExist}
     * for the epoch-record path numbered {@code i}.
     *
     * @param i           epoch number substituted into the path.
     * @param epochRecord record to serialize.
     */
    @Override
    CompletableFuture<Void> createEpochRecordDataIfAbsent(int i, EpochRecord epochRecord) {
        final String recordPath = String.format(epochRecordPathFormat, i);
        return Futures.toVoid(store.createZNodeIfNotExist(recordPath, epochRecord.toBytes()));
    }

    /**
     * Reads the epoch record numbered {@code i} through the store's cache.
     *
     * @param i epoch number substituted into the path.
     */
    @Override
    CompletableFuture<VersionedMetadata<EpochRecord>> getEpochRecordData(int i) {
        final String recordPath = String.format(epochRecordPathFormat, i);
        return getId().thenCompose(id -> store.getCachedData(recordPath, id, EpochRecord::fromBytes));
    }

    /**
     * Passes the serialized sealed-segment size shard to the store's
     * {@code createZNodeIfNotExist} for the shard path numbered {@code i}.
     *
     * @param i                      shard number substituted into the path.
     * @param sealedSegmentsMapShard shard contents to serialize.
     */
    @Override
    CompletableFuture<Void> createSealedSegmentSizesMapShardDataIfAbsent(int i, SealedSegmentsMapShard sealedSegmentsMapShard) {
        final String shardPath = String.format(segmentsSealedSizeMapShardPathFormat, i);
        return Futures.toVoid(store.createZNodeIfNotExist(shardPath, sealedSegmentsMapShard.toBytes()));
    }

    /**
     * Reads the sealed-segment size shard numbered {@code i} with {@code store.getData}
     * (not the cached variant).
     *
     * @param i shard number substituted into the path.
     */
    @Override
    CompletableFuture<VersionedMetadata<SealedSegmentsMapShard>> getSealedSegmentSizesMapShardData(int i) {
        final String shardPath = String.format(segmentsSealedSizeMapShardPathFormat, i);
        return store.getData(shardPath, SealedSegmentsMapShard::fromBytes);
    }

    /**
     * Writes the sealed-segment size shard numbered {@code i} with {@code store.setData},
     * passing the version carried by the argument.
     *
     * @param i                 shard number substituted into the path.
     * @param versionedMetadata shard contents together with the version passed to the store.
     * @return a future holding the store's result wrapped in an {@code IntVersion}.
     */
    @Override
    CompletableFuture<Version> updateSealedSegmentSizesMapShardData(int i, VersionedMetadata<SealedSegmentsMapShard> versionedMetadata) {
        final String shardPath = String.format(segmentsSealedSizeMapShardPathFormat, i);
        final byte[] serialized = versionedMetadata.getObject().toBytes();
        return store.setData(shardPath, serialized, versionedMetadata.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Creates one sealed-epoch record per segment id in the collection, all for the same epoch.
     *
     * @param collection segment ids to record.
     * @param i          epoch number written for every segment.
     * @return a future that completes once every per-segment creation has completed.
     */
    @Override
    CompletableFuture<Void> createSegmentSealedEpochRecords(Collection<Long> collection, int i) {
        return Futures.allOf(collection.stream()
                .map(segmentId -> createSegmentSealedEpochRecordData(segmentId, i))
                .collect(Collectors.toList()));
    }

    /**
     * Passes the epoch number, encoded as a 4-byte int, to the store's
     * {@code createZNodeIfNotExist} for the sealed-epoch path of segment {@code j}.
     *
     * @param j segment id substituted into the path.
     * @param i epoch number written as the node's data.
     */
    CompletableFuture<Void> createSegmentSealedEpochRecordData(long j, int i) {
        final byte[] epochBytes = new byte[Integer.BYTES];
        BitConverter.writeInt(epochBytes, 0, i);
        final String segmentPath = String.format(segmentSealedEpochPathFormat, j);
        return Futures.toVoid(store.createZNodeIfNotExist(segmentPath, epochBytes));
    }

    /**
     * Reads, through the store's cache, the int stored at the sealed-epoch path of segment {@code j}.
     *
     * @param j segment id substituted into the path.
     */
    @Override
    CompletableFuture<VersionedMetadata<Integer>> getSegmentSealedRecordData(long j) {
        final String segmentPath = String.format(segmentSealedEpochPathFormat, j);
        return getId().thenCompose(id ->
                store.getCachedData(segmentPath, id, bytes -> BitConverter.readInt(bytes, 0)));
    }

    /**
     * Passes the serialized epoch-transition record to the store's
     * {@code createZNodeIfNotExist} for the epoch-transition path.
     *
     * @param epochTransitionRecord record to serialize.
     */
    @Override
    CompletableFuture<Void> createEpochTransitionIfAbsent(EpochTransitionRecord epochTransitionRecord) {
        final byte[] serialized = epochTransitionRecord.toBytes();
        return Futures.toVoid(store.createZNodeIfNotExist(epochTransitionPath, serialized));
    }

    /**
     * Writes the epoch-transition record with {@code store.setData}, passing the
     * version carried by the argument.
     *
     * @param versionedMetadata record together with the version passed to the store.
     * @return a future holding the store's result wrapped in an {@code IntVersion}.
     */
    @Override
    CompletableFuture<Version> updateEpochTransitionNode(VersionedMetadata<EpochTransitionRecord> versionedMetadata) {
        final byte[] serialized = versionedMetadata.getObject().toBytes();
        return store.setData(epochTransitionPath, serialized, versionedMetadata.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Reads the epoch-transition record with {@code store.getData} (not the cached variant).
     */
    @Override
    CompletableFuture<VersionedMetadata<EpochTransitionRecord>> getEpochTransitionNode() {
        return store.getData(epochTransitionPath, EpochTransitionRecord::fromBytes);
    }

    /**
     * Passes the creation time, encoded as an 8-byte long, to the store's
     * {@code createZNodeIfNotExist} for the creation-time path.
     *
     * @param j creation time to store.
     */
    @Override
    CompletableFuture<Void> storeCreationTimeIfAbsent(long j) {
        final byte[] timeBytes = new byte[Long.BYTES];
        BitConverter.writeLong(timeBytes, 0, j);
        return Futures.toVoid(store.createZNodeIfNotExist(creationPath, timeBytes));
    }

    /**
     * Passes the serialized configuration record to the store's
     * {@code createZNodeIfNotExist} for the configuration path.
     *
     * @param streamConfigurationRecord record to serialize.
     */
    @Override
    public CompletableFuture<Void> createConfigurationIfAbsent(StreamConfigurationRecord streamConfigurationRecord) {
        final byte[] serialized = streamConfigurationRecord.toBytes();
        return Futures.toVoid(store.createZNodeIfNotExist(configurationPath, serialized));
    }

    /**
     * Passes the serialized state record to the store's
     * {@code createZNodeIfNotExist} for the state path.
     *
     * @param stateRecord record to serialize.
     */
    @Override
    public CompletableFuture<Void> createStateIfAbsent(StateRecord stateRecord) {
        final byte[] serialized = stateRecord.toBytes();
        return Futures.toVoid(store.createZNodeIfNotExist(statePath, serialized));
    }

    /**
     * Passes {@code j2}, encoded as an 8-byte long, to the store's
     * {@code createZNodeIfNotExist} for the marker node keyed by {@code j}, then
     * invalidates the cache entry for the marker root path (not the individual marker path).
     *
     * @param j  number used as the marker node's name.
     * @param j2 value written as the node's data.
     */
    @Override
    public CompletableFuture<Void> createMarkerData(long j, long j2) {
        final byte[] valueBytes = new byte[Long.BYTES];
        BitConverter.writeLong(valueBytes, 0, j2);
        final String nodePath = ZKPaths.makePath(markerPath, String.format("%d", j));
        return getId().thenCompose(id -> store.createZNodeIfNotExist(nodePath, valueBytes)
                .thenAccept(ignored -> store.invalidateCache(markerPath, id)));
    }

    /**
     * Writes the marker value for {@code j} with {@code store.setData}, passing the
     * version carried by the argument.
     *
     * @param j                 number used as the marker node's name.
     * @param versionedMetadata value to write together with the version passed to the store.
     * @return a future holding the store's result wrapped in an {@code IntVersion}.
     */
    @Override
    CompletableFuture<Version> updateMarkerData(long j, VersionedMetadata<Long> versionedMetadata) {
        final byte[] valueBytes = new byte[Long.BYTES];
        BitConverter.writeLong(valueBytes, 0, versionedMetadata.getObject());
        final String nodePath = ZKPaths.makePath(markerPath, String.format("%d", j));
        return store.setData(nodePath, valueBytes, versionedMetadata.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Reads the marker value for {@code j} with {@code store.getData}.
     *
     * <p>A missing marker is not an error: when the read fails with a
     * {@code DataNotFoundException} the returned future completes with {@code null}.
     * Any other failure completes the future exceptionally with the unwrapped cause.
     *
     * @param j number used as the marker node's name.
     * @return a future holding the versioned marker value, or {@code null} if no marker exists.
     */
    @Override
    CompletableFuture<VersionedMetadata<Long>> getMarkerData(long j) {
        final String nodePath = ZKPaths.makePath(markerPath, String.format("%d", j));
        final CompletableFuture<VersionedMetadata<Long>> result = new CompletableFuture<>();
        store.getData(nodePath, bytes -> BitConverter.readLong(bytes, 0))
                .whenComplete((marker, failure) -> {
                    if (failure != null) {
                        final Throwable cause = Exceptions.unwrap(failure);
                        if (cause instanceof StoreException.DataNotFoundException) {
                            result.complete(null);
                        } else {
                            result.completeExceptionally(cause);
                        }
                    } else {
                        result.complete(marker);
                    }
                });
        return result;
    }

    /**
     * Deletes the marker node for {@code j} and invalidates its cache entry whether
     * or not the delete succeeded.
     *
     * @param j number used as the marker node's name.
     */
    @Override
    CompletableFuture<Void> removeMarkerData(long j) {
        final String nodePath = ZKPaths.makePath(markerPath, String.format("%d", j));
        return getId().thenCompose(id -> store.deletePath(nodePath, false)
                .whenComplete((ignored, failure) -> store.invalidateCache(nodePath, id)));
    }

    /**
     * Collects the active transactions of this stream across all epochs.
     *
     * <p>Lists the children of the active-transaction root, parses each child name
     * as an epoch number, fetches that epoch's transactions with {@code getTxnInEpoch}
     * and merges the per-epoch maps into one.
     *
     * @return a future holding a map from transaction id to its active record.
     */
    @Override
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getActiveTxns() {
        return store.getChildren(activeTxRoot)
                .thenCompose(epochs -> Futures.allOfWithResults(epochs.stream()
                        .map(epoch -> getTxnInEpoch(Integer.parseInt(epoch)))
                        .collect(Collectors.toList())))
                .thenApply(perEpoch -> {
                    final Map<UUID, ActiveTxnRecord> merged = new HashMap<>();
                    perEpoch.forEach(merged::putAll);
                    return merged;
                });
    }

    /**
     * Returns the active transactions recorded under the given epoch.
     *
     * <p>A missing epoch path is treated as "no transactions", and an individual
     * transaction node that cannot be found is left out of the result.
     *
     * @param i epoch number.
     * @return a future holding a map from transaction id to its active record.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getTxnInEpoch(int i) {
        // Sentinel substituted for transaction nodes that are not found; filtered out below.
        VersionedMetadata emptyData = getEmptyData();
        // A DataNotFoundException while listing the epoch's children yields an empty list.
        return Futures.exceptionallyExpecting(this.store.getChildren(getEpochPath(i)), th -> {
            return Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException;
        }, Collections.emptyList()).thenCompose(list -> {
            // Map each child name to a read of its record, keyed by the child name.
            return Futures.allOfWithResults((Map) list.stream().collect(Collectors.toMap(str -> {
                return str;
            }, str2 -> {
                // A DataNotFoundException on an individual read yields the sentinel instead of failing.
                return Futures.exceptionallyExpecting(this.store.getData(getActiveTxPath(i, str2), ActiveTxnRecord::fromBytes), th2 -> {
                    return Exceptions.unwrap(th2) instanceof StoreException.DataNotFoundException;
                }, emptyData);
            }))).thenApply(map -> {
                // Drop sentinel entries, then re-key by UUID and unwrap the records.
                return (Map) map.entrySet().stream().filter(entry -> {
                    return !((VersionedMetadata) entry.getValue()).equals(emptyData);
                }).collect(Collectors.toMap(entry2 -> {
                    return UUID.fromString((String) entry2.getKey());
                }, entry3 -> {
                    return (ActiveTxnRecord) ((VersionedMetadata) entry3.getValue()).getObject();
                }));
            });
        });
    }

    /**
     * Delegates to the superclass helper, supplying this stream's commit-order
     * store and executor.
     */
    @Override
    public CompletableFuture<List<Map.Entry<UUID, ActiveTxnRecord>>> getOrderedCommittingTxnInLowestEpoch() {
        return super.getOrderedCommittingTxnInLowestEpochHelper(txnCommitOrderer, executor);
    }

    /**
     * Delegates to the superclass helper, supplying this stream's commit-order store.
     */
    @Override
    @VisibleForTesting
    CompletableFuture<Map<Long, UUID>> getAllOrderedCommittingTxns() {
        return super.getAllOrderedCommittingTxnsHelper(txnCommitOrderer);
    }

    /**
     * Passes the serialized record to the store's three-argument
     * {@code createZNodeIfNotExist} (third argument {@code true}) for the active
     * transaction path of the given epoch and transaction id.
     *
     * @param i               epoch number.
     * @param uuid            transaction id.
     * @param activeTxnRecord record to serialize.
     * @return a future holding the store's result wrapped in an {@code IntVersion}.
     */
    @Override
    CompletableFuture<Version> createNewTransaction(int i, UUID uuid, ActiveTxnRecord activeTxnRecord) {
        final String txnPath = getActiveTxPath(i, uuid.toString());
        return store.createZNodeIfNotExist(txnPath, activeTxnRecord.toBytes(), true)
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Reads the active transaction record for the given epoch and transaction id
     * with {@code store.getData} (not the cached variant).
     *
     * @param i    epoch number.
     * @param uuid transaction id.
     */
    @Override
    CompletableFuture<VersionedMetadata<ActiveTxnRecord>> getActiveTx(int i, UUID uuid) {
        final String txnPath = getActiveTxPath(i, uuid.toString());
        return store.getData(txnPath, ActiveTxnRecord::fromBytes);
    }

    /**
     * Writes the active transaction record with {@code store.setData}, passing the
     * version carried by the argument.
     *
     * @param i                 epoch number.
     * @param uuid              transaction id.
     * @param versionedMetadata record together with the version passed to the store.
     * @return a future holding the store's result wrapped in an {@code IntVersion}.
     */
    @Override
    CompletableFuture<Version> updateActiveTx(int i, UUID uuid, VersionedMetadata<ActiveTxnRecord> versionedMetadata) {
        final String txnPath = getActiveTxPath(i, uuid.toString());
        final byte[] serialized = versionedMetadata.getObject().toBytes();
        return store.setData(txnPath, serialized, versionedMetadata.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Deletes the active transaction node for the given epoch and transaction id,
     * passing {@code true} as the second argument of {@code store.deletePath}.
     *
     * @param i    epoch number.
     * @param uuid transaction id.
     */
    @Override
    CompletableFuture<Void> removeActiveTxEntry(int i, UUID uuid) {
        final String txnPath = getActiveTxPath(i, uuid.toString());
        return store.deletePath(txnPath, true);
    }

    /**
     * Adds the transaction id, as a string, to the commit-order store under this
     * stream's scope and name.
     *
     * @param uuid transaction id.
     * @return the future returned by the commit-order store's {@code addEntity}.
     */
    @Override
    CompletableFuture<Long> addTxnToCommitOrder(UUID uuid) {
        final String txnId = uuid.toString();
        return txnCommitOrderer.addEntity(getScope(), getName(), txnId);
    }

    /**
     * Removes the given entries from the commit-order store under this stream's
     * scope and name.
     *
     * @param list values passed to the commit-order store's {@code removeEntities}.
     */
    @Override
    CompletableFuture<Void> removeTxnsFromCommitOrder(List<Long> list) {
        return txnCommitOrderer.removeEntities(getScope(), getName(), list);
    }

    /**
     * Passes the serialized completed-transaction record to the store's
     * {@code createZNodeIfNotExist}, under the batch currently reported by the
     * batch supplier.
     *
     * @param uuid               transaction id; used as the node name.
     * @param completedTxnRecord record to serialize.
     */
    @Override
    CompletableFuture<Void> createCompletedTxEntry(UUID uuid, CompletedTxnRecord completedTxnRecord) {
        final String batchRoot = String.format(STREAM_COMPLETED_TX_BATCH_PATH, currentBatchSupplier.get(), getScope(), getName());
        final String txnPath = ZKPaths.makePath(batchRoot, uuid.toString());
        return Futures.toVoid(store.createZNodeIfNotExist(txnPath, completedTxnRecord.toBytes()));
    }

    /**
     * Looks up the completed-transaction record for {@code uuid} in every batch.
     *
     * <p>Lists the children of the completed-transaction batch root, parses each
     * child name as a batch number and reads the transaction's node under that
     * batch through the store's cache, using the stream id from {@code getId()}
     * as the cache id. Batches that do not contain the transaction are skipped,
     * and the first record found is returned.
     *
     * @param uuid transaction id.
     * @return a future holding the record; it fails with a {@code DATA_NOT_FOUND}
     *         store exception when no batch contains the transaction.
     */
    @Override
    CompletableFuture<VersionedMetadata<CompletedTxnRecord>> getCompletedTx(UUID uuid) {
        return getId().thenCompose(id -> store.getChildren("/transactions/completedTx/batches")
                .thenCompose(batches -> Futures.allOfWithResults(batches.stream().map(batch -> {
                    final String batchRoot = String.format(STREAM_COMPLETED_TX_BATCH_PATH,
                            Long.parseLong(batch), getScope(), getName());
                    final String txnPath = ZKPaths.makePath(batchRoot, uuid.toString());
                    // The cache id is the stream id, not the batch name.
                    return store.getCachedData(txnPath, id, CompletedTxnRecord::fromBytes).exceptionally(e -> {
                        if (Exceptions.unwrap(e) instanceof StoreException.DataNotFoundException) {
                            // Not in this batch; the null is filtered out below.
                            return null;
                        }
                        log.error("Exception while trying to fetch completed transaction status", e);
                        throw new CompletionException(e);
                    });
                }).collect(Collectors.toList())))
                .thenCompose(results -> {
                    final Optional<VersionedMetadata<CompletedTxnRecord>> found =
                            results.stream().filter(Objects::nonNull).findFirst();
                    if (found.isPresent()) {
                        return CompletableFuture.completedFuture(found.get());
                    }
                    throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Completed Txn not found");
                }));
    }

    /**
     * Passes the serialized truncation record to the store's
     * {@code createZNodeIfNotExist} for the truncation path.
     *
     * @param streamTruncationRecord record to serialize.
     */
    @Override
    public CompletableFuture<Void> createTruncationDataIfAbsent(StreamTruncationRecord streamTruncationRecord) {
        final byte[] serialized = streamTruncationRecord.toBytes();
        return Futures.toVoid(store.createZNodeIfNotExist(truncationPath, serialized));
    }

    /**
     * Writes the truncation record with {@code store.setData} and, after a
     * successful write, invalidates the cached entry for the truncation path.
     *
     * @param versionedMetadata record together with the version passed to the store.
     * @return a future holding the store's result wrapped in an {@code IntVersion}.
     */
    @Override
    CompletableFuture<Version> setTruncationData(VersionedMetadata<StreamTruncationRecord> versionedMetadata) {
        return getId().thenCompose(id -> store
                .setData(truncationPath, versionedMetadata.getObject().toBytes(), versionedMetadata.getVersion())
                .thenApply(newVersion -> {
                    store.invalidateCache(truncationPath, id);
                    return new Version.IntVersion(newVersion);
                }));
    }

    /**
     * Reads the truncation record.
     *
     * @param z when true the record is read with {@code store.getData}; otherwise with {@code store.getCachedData}.
     */
    @Override
    CompletableFuture<VersionedMetadata<StreamTruncationRecord>> getTruncationData(boolean z) {
        return getId().thenCompose(id -> {
            if (z) {
                return store.getData(truncationPath, StreamTruncationRecord::fromBytes);
            }
            return store.getCachedData(truncationPath, id, StreamTruncationRecord::fromBytes);
        });
    }

    /**
     * Writes the configuration record to {@code configurationPath}, passing along the version
     * carried by the supplied metadata, and invalidates the cache entry for that path afterwards.
     *
     * @param versionedMetadata configuration record together with the version handed to {@code store.setData}.
     * @return a future holding the version reported by the store, wrapped as an {@code IntVersion}.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> setConfigurationData(VersionedMetadata<StreamConfigurationRecord> versionedMetadata) {
        return getId().thenCompose(id -> {
            final byte[] payload = versionedMetadata.getObject().toBytes();
            return this.store.setData(this.configurationPath, payload, versionedMetadata.getVersion())
                    .thenApply(updatedVersion -> {
                        // Runs only after the write has completed normally.
                        this.store.invalidateCache(this.configurationPath, id);
                        return new Version.IntVersion(updatedVersion.intValue());
                    });
        });
    }

    /**
     * Reads the configuration record stored at {@code configurationPath}.
     *
     * @param z when {@code true} the record is read with {@code store.getData}; otherwise
     *          {@code store.getCachedData} is used, keyed by the path and the stream id.
     * @return a future holding the deserialized record and its version.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<StreamConfigurationRecord>> getConfigurationData(boolean z) {
        return getId().thenCompose(id -> {
            if (z) {
                return this.store.getData(this.configurationPath, StreamConfigurationRecord::fromBytes);
            }
            return this.store.getCachedData(this.configurationPath, id, StreamConfigurationRecord::fromBytes);
        });
    }

    /**
     * Writes the state record to {@code statePath}, passing along the version carried by the
     * supplied metadata, and invalidates the cache entry for that path afterwards.
     *
     * @param versionedMetadata state record together with the version handed to {@code store.setData}.
     * @return a future holding the version reported by the store, wrapped as an {@code IntVersion}.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> setStateData(VersionedMetadata<StateRecord> versionedMetadata) {
        return getId().thenCompose(id -> {
            final byte[] payload = versionedMetadata.getObject().toBytes();
            return this.store.setData(this.statePath, payload, versionedMetadata.getVersion())
                    .thenApply(updatedVersion -> {
                        // Runs only after the write has completed normally.
                        this.store.invalidateCache(this.statePath, id);
                        return new Version.IntVersion(updatedVersion.intValue());
                    });
        });
    }

    /**
     * Reads the state record stored at {@code statePath}.
     *
     * @param z when {@code true} the record is read with {@code store.getData}; otherwise
     *          {@code store.getCachedData} is used, keyed by the path and the stream id.
     * @return a future holding the deserialized record and its version.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<StateRecord>> getStateData(boolean z) {
        return getId().thenCompose(id -> {
            if (z) {
                return this.store.getData(this.statePath, StateRecord::fromBytes);
            }
            return this.store.getCachedData(this.statePath, id, StateRecord::fromBytes);
        });
    }

    /**
     * Serializes the given committing-transactions record and hands it to the store's
     * {@code createZNodeIfNotExist} call for {@code committingTxnsPath}.
     *
     * @param committingTransactionsRecord record whose {@code toBytes()} output is used as the node payload.
     * @return a future that completes (with no value) when the store call completes.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createCommitTxnRecordIfAbsent(CommittingTransactionsRecord committingTransactionsRecord) {
        final byte[] payload = committingTransactionsRecord.toBytes();
        return Futures.toVoid(this.store.createZNodeIfNotExist(this.committingTxnsPath, payload));
    }

    /**
     * Reads the committing-transactions record from {@code committingTxnsPath} using
     * {@code store.getData} (the cached read path is not used here).
     *
     * @return a future holding the deserialized record and its version.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> getCommitTxnRecord() {
        return this.store.getData(
                this.committingTxnsPath,
                bytes -> CommittingTransactionsRecord.fromBytes(bytes));
    }

    /**
     * Writes the committing-transactions record to {@code committingTxnsPath}, passing along the
     * version carried by the supplied metadata.
     *
     * @param versionedMetadata record together with the version handed to {@code store.setData}.
     * @return a future holding the version reported by the store, wrapped as an {@code IntVersion}.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> updateCommittingTxnRecord(VersionedMetadata<CommittingTransactionsRecord> versionedMetadata) {
        final byte[] payload = versionedMetadata.getObject().toBytes();
        return this.store.setData(this.committingTxnsPath, payload, versionedMetadata.getVersion())
                .thenApply(Version.IntVersion::new);
    }

    /**
     * Encodes the given string as UTF-8 and hands it to the store's
     * {@code createZNodeIfNotExist} call for {@code waitingRequestProcessorPath}.
     *
     * @param str value stored as the node payload.
     * @return a future that completes (with no value) when the store call completes.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createWaitingRequestNodeIfAbsent(String str) {
        final byte[] encoded = str.getBytes(StandardCharsets.UTF_8);
        return Futures.toVoid(this.store.createZNodeIfNotExist(this.waitingRequestProcessorPath, encoded));
    }

    /**
     * Reads the payload stored at {@code waitingRequestProcessorPath} and decodes it as UTF-8 text.
     *
     * @return a future holding the decoded string (the version returned by the store is discarded).
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<String> getWaitingRequestNode() {
        return this.store
                .getData(this.waitingRequestProcessorPath,
                        payload -> StandardCharsets.UTF_8.decode(ByteBuffer.wrap(payload)).toString())
                .thenApply(VersionedMetadata::getObject);
    }

    /**
     * Deletes {@code waitingRequestProcessorPath} through {@code store.deletePath}, passing
     * {@code false} as the second argument.
     *
     * @return the future returned by the store's delete call.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> deleteWaitingRequestNode() {
        final String nodePath = this.waitingRequestProcessorPath;
        return this.store.deletePath(nodePath, false);
    }

    /**
     * Builds a {@code WriterMark} from the given values and creates a node for it at the
     * writer's path using {@code store.createZNode}.
     *
     * @param str          writer identifier; appended to {@code writerPositionsPath} to form the node path.
     * @param j            first argument passed to the {@code WriterMark} constructor.
     * @param immutableMap second argument passed to the {@code WriterMark} constructor.
     * @return a future that completes (with no value) when the store call completes.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createWriterMarkRecord(String str, long j, ImmutableMap<Long, Long> immutableMap) {
        final String writerPath = getWriterPath(str);
        final byte[] payload = new WriterMark(j, immutableMap).toBytes();
        return Futures.toVoid(this.store.createZNode(writerPath, payload));
    }

    /**
     * Deletes the node at the given writer's path through {@code store.deleteNode}, passing
     * along the supplied version.
     *
     * @param str     writer identifier; appended to {@code writerPositionsPath} to form the node path.
     * @param version version handed to the store's delete call.
     * @return the future returned by the store's delete call.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> removeWriterRecord(String str, Version version) {
        final String writerPath = getWriterPath(str);
        return this.store.deleteNode(writerPath, version);
    }

    /**
     * Reads the {@code WriterMark} stored at the given writer's path using {@code store.getData}.
     *
     * @param str writer identifier; appended to {@code writerPositionsPath} to form the node path.
     * @return a future holding the deserialized mark and its version.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<WriterMark>> getWriterMarkRecord(String str) {
        final String writerPath = getWriterPath(str);
        return this.store.getData(writerPath, bytes -> WriterMark.fromBytes(bytes));
    }

    /**
     * Builds a {@code WriterMark} from the given values and writes it to the writer's path with
     * {@code store.setData}, passing along the supplied version.
     *
     * @param str          writer identifier; appended to {@code writerPositionsPath} to form the node path.
     * @param j            first argument passed to the {@code WriterMark} constructor.
     * @param immutableMap second argument passed to the {@code WriterMark} constructor.
     * @param z            third argument passed to the {@code WriterMark} constructor.
     * @param version      version handed to the store's write call.
     * @return a future that completes (with no value) when the store call completes.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> updateWriterMarkRecord(String str, long j, ImmutableMap<Long, Long> immutableMap, boolean z, Version version) {
        final String writerPath = getWriterPath(str);
        final byte[] payload = new WriterMark(j, immutableMap, z).toBytes();
        return Futures.toVoid(this.store.setData(writerPath, payload, version));
    }

    /**
     * Builds the node path for a writer by joining {@code writerPositionsPath} and the writer
     * identifier with {@code ZKPaths.makePath}.
     *
     * @param str writer identifier used as the child node name.
     * @return the combined path.
     */
    private String getWriterPath(String str) {
        final String parentPath = this.writerPositionsPath;
        return ZKPaths.makePath(parentPath, str);
    }

    /**
     * Collects the marks of every writer listed as a child of {@code writerPositionsPath}.
     *
     * <p>Each child name is looked up with {@code getWriterMark}. A lookup that fails with an
     * exception matching {@code DATA_NOT_FOUND_PREDICATE} is replaced by {@code WriterMark.EMPTY},
     * and entries equal to {@code WriterMark.EMPTY} are then left out of the returned map.
     *
     * @return a future holding a map from writer name to its mark.
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Map<String, WriterMark>> getAllWriterMarks() {
        return this.store.getChildren(this.writerPositionsPath).thenCompose(writers -> {
            // Keep the map fully typed (no raw cast) so allOfWithResults retains its generic result type.
            final Map<String, CompletableFuture<WriterMark>> pendingMarks = writers.stream()
                    .collect(Collectors.toMap(
                            writer -> writer,
                            writer -> Futures.exceptionallyExpecting(getWriterMark(writer),
                                    AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, WriterMark.EMPTY)));
            return Futures.allOfWithResults(pendingMarks).thenApply(marks -> marks.entrySet().stream()
                    // EMPTY is the placeholder substituted above; such writers are not reported.
                    .filter(entry -> !entry.getValue().equals(WriterMark.EMPTY))
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
        });
    }

    /**
     * Clears the remembered stream id and invalidates the cache entries for the state,
     * configuration, truncation, epoch-transition, committing-transactions and current-epoch
     * paths.
     *
     * <p>The id that was held before the reset is used as the cache key; when none was held,
     * the empty string is used instead.
     */
    @Override // io.pravega.controller.store.stream.Stream
    public void refresh() {
        final String previousId = this.idRef.getAndSet(null);
        final String cacheId = (previousId != null) ? previousId : "";
        // Same paths, same order as a sequence of individual invalidateCache calls.
        java.util.Arrays.asList(
                this.statePath,
                this.configurationPath,
                this.truncationPath,
                this.epochTransitionPath,
                this.committingTxnsPath,
                this.currentEpochRecordPath)
                .forEach(path -> this.store.invalidateCache(path, cacheId));
    }

    /**
     * Returns the id of this stream, i.e. the string form of the stream position.
     *
     * <p>If {@code idRef} already holds a non-empty value it is returned immediately. Otherwise
     * the position is fetched with {@code getStreamPosition()}, converted to a string, and
     * recorded in {@code idRef} only if that reference is still {@code null}. A fetch failure
     * whose unwrapped cause is a {@code StoreException.DataNotFoundException} is passed to
     * {@code Futures.exceptionallyExpecting} with the empty string as the substitute value.
     *
     * @return a future holding the stream id, or {@code ""} in the data-not-found case.
     */
    private CompletableFuture<String> getId() {
        final String cachedId = this.idRef.get();
        if (!Strings.isNullOrEmpty(cachedId)) {
            return CompletableFuture.completedFuture(cachedId);
        }
        // The lambda parameter and the local must have different names: a local declared in a
        // lambda body may not redeclare the lambda's own parameter.
        final CompletableFuture<String> fetchedId = getStreamPosition().thenApply(position -> {
            final String id = position.toString();
            this.idRef.compareAndSet(null, id);
            return id;
        });
        return Futures.exceptionallyExpecting(
                fetchedId,
                th -> Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException,
                "");
    }

    /**
     * Builds the path of an active-transaction node: {@code activeTxRoot}, then the decimal
     * epoch number, then the given child name, joined with {@code ZKPaths.makePath}.
     *
     * @param i   epoch number used as the intermediate path segment.
     * @param str name of the leaf node under the epoch segment.
     * @return the combined path.
     */
    @VisibleForTesting
    String getActiveTxPath(int i, String str) {
        final String epochNode = ZKPaths.makePath(this.activeTxRoot, String.valueOf(i));
        return ZKPaths.makePath(epochNode, str);
    }

    /**
     * Builds the path of an epoch node by joining {@code activeTxRoot} and the decimal epoch
     * number with {@code ZKPaths.makePath}.
     *
     * @param i epoch number used as the child node name.
     * @return the combined path.
     */
    private String getEpochPath(int i) {
        final String epochSegment = String.valueOf(i);
        return ZKPaths.makePath(this.activeTxRoot, epochSegment);
    }

    /**
     * Encodes the given position as four bytes with {@code BitConverter.writeInt} and hands it
     * to the store's {@code createZNodeIfNotExist} call for {@code idPath}.
     *
     * <p>Decompiler note: access modifiers changed from package-private.
     *
     * @param i stream position to store.
     * @return a future that completes (with no value) when the store call completes.
     */
    public CompletableFuture<Void> createStreamPositionNodeIfAbsent(int i) {
        final byte[] encodedPosition = new byte[Integer.BYTES];
        BitConverter.writeInt(encodedPosition, 0, i);
        return Futures.toVoid(this.store.createZNodeIfNotExist(this.idPath, encodedPosition));
    }

    /**
     * Reads the payload stored at {@code idPath} and decodes an int from its first bytes with
     * {@code BitConverter.readInt}.
     *
     * <p>Decompiler note: access modifiers changed from package-private.
     *
     * @return a future holding the decoded position (the version returned by the store is discarded).
     */
    public CompletableFuture<Integer> getStreamPosition() {
        return this.store
                .getData(this.idPath, payload -> BitConverter.readInt(payload, 0))
                .thenApply(VersionedMetadata::getObject);
    }

    /**
     * Creates a placeholder metadata value: a {@code null} object paired with an
     * {@code IntVersion} of {@code Integer.MIN_VALUE}.
     *
     * @param <T> type of the (absent) metadata object.
     * @return a new {@code VersionedMetadata} carrying no object.
     */
    private static <T> VersionedMetadata<T> getEmptyData() {
        final Version.IntVersion placeholderVersion = new Version.IntVersion(Integer.MIN_VALUE);
        return new VersionedMetadata<T>(null, placeholderVersion);
    }

    /**
     * Accessor for the {@code creationPath} field.
     *
     * @return the current value of {@code creationPath}.
     */
    @SuppressFBWarnings(justification = "generated code")
    String getCreationPath() {
        return creationPath;
    }

    /**
     * Accessor for the {@code streamPath} field.
     *
     * <p>Decompiler note: access modifiers changed from package-private.
     *
     * @return the current value of {@code streamPath}.
     */
    @SuppressFBWarnings(justification = "generated code")
    public String getStreamPath() {
        return streamPath;
    }
}
