package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.BitConverter;
import io.pravega.controller.store.stream.CreateStreamResponse;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.Version;
import io.pravega.controller.store.stream.records.ActiveTxnRecord;
import io.pravega.controller.store.stream.records.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.records.CompletedTxnRecord;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.HistoryTimeSeries;
import io.pravega.controller.store.stream.records.RetentionSet;
import io.pravega.controller.store.stream.records.SealedSegmentsMapShard;
import io.pravega.controller.store.stream.records.StateRecord;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.store.stream.records.StreamCutRecord;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.controller.store.stream.records.WriterMark;
import io.pravega.shared.NameUtils;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/controller/store/stream/PravegaTablesStream.class */
public class PravegaTablesStream extends PersistentStreamBase {
    // Table-name templates. The trailing "%s" is filled with the stream id by the get*TableName helpers;
    // TRANSACTIONS_IN_EPOCH_TABLE_FORMAT additionally takes the epoch number as its "%d".
    private static final String METADATA_TABLE = "metadata.#.%s";
    private static final String EPOCHS_WITH_TRANSACTIONS_TABLE = "epochsWithTransactions.#.%s";
    private static final String WRITERS_POSITIONS_TABLE = "writersPositions.#.%s";
    private static final String TRANSACTIONS_IN_EPOCH_TABLE_FORMAT = "transactionsInEpoch-%d.#.%s";
    // Keys (and key templates) of entries stored in the stream's metadata table.
    // NOTE(review): TRUNCATION_KEY, COMMITTING_TRANSACTIONS_RECORD_KEY and WAITING_REQUEST_PROCESSOR_PATH are not
    // referenced in the visible part of this file; presumably they are metadata-table keys as well - confirm.
    private static final String CREATION_TIME_KEY = "creationTime";
    private static final String CONFIGURATION_KEY = "configuration";
    private static final String TRUNCATION_KEY = "truncation";
    private static final String STATE_KEY = "state";
    private static final String EPOCH_TRANSITION_KEY = "epochTransition";
    private static final String RETENTION_SET_KEY = "retention";
    // Formatted with the long argument of the *StreamCutRecordData methods.
    private static final String RETENTION_STREAM_CUT_RECORD_KEY_FORMAT = "retentionCuts-%s";
    // The value stored under this key is the 4-byte epoch number, not the epoch record itself.
    private static final String CURRENT_EPOCH_KEY = "currentEpochRecord";
    private static final String EPOCH_RECORD_KEY_FORMAT = "epochRecord-%d";
    private static final String HISTORY_TIMESERES_CHUNK_FORMAT = "historyTimeSeriesChunk-%d";
    private static final String SEGMENTS_SEALED_SIZE_MAP_SHARD_FORMAT = "segmentsSealedSizeMapShard-%d";
    private static final String SEGMENT_SEALED_EPOCH_KEY_FORMAT = "segmentSealedEpochPath-%d";
    private static final String COMMITTING_TRANSACTIONS_RECORD_KEY = "committingTxns";
    private static final String SEGMENT_MARKER_PATH_FORMAT = "markers-%d";
    private static final String WAITING_REQUEST_PROCESSOR_PATH = "waitingRequestProcessor";
    // NOTE(review): the two templates below are not used in the visible part of this file; presumably they build
    // keys for completed-transaction entries - confirm against their usages.
    private static final String STREAM_KEY_PREFIX = "Key.#.%s.#.%s.#.";
    private static final String COMPLETED_TRANSACTIONS_KEY_FORMAT = "Key.#.%s.#.%s.#./%s";
    // Helper through which every table read/write in this class is issued.
    private final PravegaTablesStoreHelper storeHelper;
    // NOTE(review): not used in the visible part of this file.
    private final Supplier<Integer> currentBatchSupplier;
    // Supplies the name of the table in which getId() looks up this stream's id by stream name.
    private final Supplier<CompletableFuture<String>> streamsInScopeTableNameSupplier;
    // Caches the stream id once getId() has resolved it; holds null until then.
    private final AtomicReference<String> idRef;
    // Passed to the base-class helpers that return committing transactions in order.
    private final ZkOrderedStore txnCommitOrderer;
    // Executor handed to AsyncIterator.forEachRemaining and to the ordered-commit helper.
    private final ScheduledExecutorService executor;

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(PravegaTablesStream.class);
    // Placeholder passed to storeHelper.getEntries as its "non existent" argument in getTransactionRecords.
    private static final VersionedMetadata<ActiveTxnRecord> NON_EXISTENT_TXN = new VersionedMetadata<>(ActiveTxnRecord.EMPTY, new Version.LongVersion(Long.MIN_VALUE));

    /**
     * Convenience constructor that delegates to the nine-argument constructor, passing {@code 1000} for both of
     * its {@code int} arguments.
     * <p>
     * NOTE: the decompiler reports that this constructor was originally package-private.
     *
     * @param str                      first argument forwarded to the base class (presumably the scope name - confirm)
     * @param str2                     second argument forwarded to the base class (presumably the stream name - confirm)
     * @param pravegaTablesStoreHelper helper used for all table operations
     * @param zkOrderedStore           store handed to the ordered-commit helpers of the base class
     * @param supplier                 supplier stored as {@code currentBatchSupplier}
     * @param supplier2                supplier of the table name used by {@code getId()} to resolve the stream id
     * @param scheduledExecutorService executor used for asynchronous iteration
     */
    @VisibleForTesting
    public PravegaTablesStream(String str, String str2, PravegaTablesStoreHelper pravegaTablesStoreHelper, ZkOrderedStore zkOrderedStore, Supplier<Integer> supplier, Supplier<CompletableFuture<String>> supplier2, ScheduledExecutorService scheduledExecutorService) {
        this(str, str2, pravegaTablesStoreHelper, zkOrderedStore, supplier, 1000, 1000, supplier2, scheduledExecutorService);
    }

    /**
     * Full constructor: forwards {@code str}, {@code str2}, {@code i} and {@code i2} to the base class and stores
     * the collaborators. The cached stream id starts out unset ({@code null}).
     *
     * @param str                      first base-class argument (presumably the scope name - confirm)
     * @param str2                     second base-class argument (presumably the stream name - confirm)
     * @param pravegaTablesStoreHelper helper used for all table operations
     * @param zkOrderedStore           store handed to the ordered-commit helpers of the base class
     * @param supplier                 supplier stored as {@code currentBatchSupplier}
     * @param i                        third base-class argument
     * @param i2                       fourth base-class argument
     * @param supplier2                supplier of the table name used by {@code getId()} to resolve the stream id
     * @param scheduledExecutorService executor used for asynchronous iteration
     */
    @VisibleForTesting
    PravegaTablesStream(String str, String str2, PravegaTablesStoreHelper pravegaTablesStoreHelper, ZkOrderedStore zkOrderedStore, Supplier<Integer> supplier, int i, int i2, Supplier<CompletableFuture<String>> supplier2, ScheduledExecutorService scheduledExecutorService) {
        super(str, str2, i, i2);
        // No id is known yet; getId() fills this in lazily.
        this.idRef = new AtomicReference<>();
        this.storeHelper = pravegaTablesStoreHelper;
        this.executor = scheduledExecutorService;
        this.txnCommitOrderer = zkOrderedStore;
        this.streamsInScopeTableNameSupplier = supplier2;
        this.currentBatchSupplier = supplier;
    }

    /**
     * Resolves the id of this stream, caching it in {@code idRef} after the first successful lookup.
     * <p>
     * If a non-empty id is already cached it is returned immediately. Otherwise the id is read (as a UUID) from
     * the table named by {@code streamsInScopeTableNameSupplier}, under the key {@code getName()}, stored in the
     * cache if the cache is still empty, and the method re-enters itself so that the cached value is returned.
     *
     * @return a future holding the stream id as a string
     */
    private CompletableFuture<String> getId() {
        final String cachedId = this.idRef.get();
        if (!Strings.isNullOrEmpty(cachedId)) {
            return CompletableFuture.completedFuture(cachedId);
        }
        return this.streamsInScopeTableNameSupplier.get()
                .thenCompose(streamsInScopeTable -> this.storeHelper.getEntry(streamsInScopeTable, getName(),
                        bytes -> BitConverter.readUUID(bytes, 0)))
                .thenComposeAsync(idEntry -> {
                    // compareAndSet keeps whichever id was cached first if several lookups race.
                    this.idRef.compareAndSet(null, idEntry.getObject().toString());
                    return getId();
                });
    }

    /**
     * @return a future holding the fully qualified name of this stream's metadata table
     */
    private CompletableFuture<String> getMetadataTable() {
        return getId().thenApply(id -> getMetadataTableName(id));
    }

    /**
     * Builds the qualified metadata table name from the {@code "_system"} scope, this stream's scope and name,
     * and {@code METADATA_TABLE} formatted with the given id.
     *
     * @param str the stream id
     * @return the qualified table name
     */
    private String getMetadataTableName(String str) {
        return NameUtils.getQualifiedTableName("_system", getScope(), getName(), String.format(METADATA_TABLE, str));
    }

    /**
     * @return a future holding the fully qualified name of this stream's epochs-with-transactions table
     */
    private CompletableFuture<String> getEpochsWithTransactionsTable() {
        return getId().thenApply(id -> getEpochsWithTransactionsTableName(id));
    }

    /**
     * Builds the qualified epochs-with-transactions table name from the {@code "_system"} scope, this stream's
     * scope and name, and {@code EPOCHS_WITH_TRANSACTIONS_TABLE} formatted with the given id.
     *
     * @param str the stream id
     * @return the qualified table name
     */
    private String getEpochsWithTransactionsTableName(String str) {
        return NameUtils.getQualifiedTableName("_system", getScope(), getName(),
                String.format(EPOCHS_WITH_TRANSACTIONS_TABLE, str));
    }

    /**
     * @param i the epoch number
     * @return a future holding the fully qualified name of the transactions-in-epoch table for that epoch
     */
    private CompletableFuture<String> getTransactionsInEpochTable(int i) {
        return getId().thenApply(id -> getTransactionsInEpochTableName(i, id));
    }

    /**
     * Builds the qualified transactions-in-epoch table name from the {@code "_system"} scope, this stream's scope
     * and name, and {@code TRANSACTIONS_IN_EPOCH_TABLE_FORMAT} formatted with the epoch and the id.
     *
     * @param i   the epoch number
     * @param str the stream id
     * @return the qualified table name
     */
    private String getTransactionsInEpochTableName(int i, String str) {
        return NameUtils.getQualifiedTableName("_system", getScope(), getName(),
                String.format(TRANSACTIONS_IN_EPOCH_TABLE_FORMAT, i, str));
    }

    /**
     * @return a future holding the fully qualified name of this stream's writers-positions table
     */
    private CompletableFuture<String> getWritersTable() {
        return getId().thenApply(id -> getWritersTableName(id));
    }

    /**
     * Builds the qualified writers-positions table name from the {@code "_system"} scope, this stream's scope and
     * name, and {@code WRITERS_POSITIONS_TABLE} formatted with the given id.
     *
     * @param str the stream id
     * @return the qualified table name
     */
    private String getWritersTableName(String str) {
        return NameUtils.getQualifiedTableName("_system", getScope(), getName(),
                String.format(WRITERS_POSITIONS_TABLE, str));
    }

    /**
     * Finishes the commit of every transaction listed in the supplied committing-transactions record.
     * <p>
     * When the record lists at least one transaction, the following steps run in order: marks are generated for
     * the transactions, a {@code COMMITTED} completed-transaction entry is created for each id (all stamped with
     * the same wall-clock time), those ids are removed from the transactions-in-epoch table of the record's
     * epoch, and transactions-in-epoch tables of epochs lower than the record's epoch are offered for removal.
     * In every case the committing-transactions record is finally reset to
     * {@code CommittingTransactionsRecord.EMPTY} using the version carried by {@code versionedMetadata}.
     *
     * @param versionedMetadata the committing-transactions record together with its version
     * @return a future that completes once the record has been reset
     */
    @Override
    public CompletableFuture<Void> completeCommittingTransactions(VersionedMetadata<CommittingTransactionsRecord> versionedMetadata) {
        final long commitTime = System.currentTimeMillis();
        final CommittingTransactionsRecord committing = versionedMetadata.getObject();
        final int committingEpoch = committing.getEpoch();
        // Keyed by transaction id; every value is the serialized COMMITTED record with the same timestamp.
        final Map<String, byte[]> completedRecords = committing.getTransactionsToCommit().stream()
                .collect(Collectors.toMap(UUID::toString,
                        txnId -> new CompletedTxnRecord(commitTime, TxnStatus.COMMITTED).toBytes()));

        final CompletableFuture<Void> transactionsCompleted;
        if (committing.getTransactionsToCommit().isEmpty()) {
            // Nothing to commit: only the record reset below is needed.
            transactionsCompleted = CompletableFuture.completedFuture(null);
        } else {
            transactionsCompleted = generateMarksForTransactions(committing)
                    .thenCompose(ignored -> createCompletedTxEntries(completedRecords))
                    .thenCompose(ignored -> getTransactionsInEpochTable(committingEpoch))
                    .thenCompose(epochTxnTable -> this.storeHelper.removeEntries(epochTxnTable, completedRecords.keySet()))
                    .thenCompose(ignored -> tryRemoveOlderTransactionsInEpochTables(epoch -> epoch < committingEpoch));
        }
        return transactionsCompleted.thenCompose(ignored -> Futures.toVoid(updateCommittingTxnRecord(
                new VersionedMetadata<>(CommittingTransactionsRecord.EMPTY, versionedMetadata.getVersion()))));
    }

    /**
     * Creates the three per-stream tables (metadata, epochs-with-transactions and writers-positions) and logs
     * their names at debug level once all three creations have completed.
     *
     * @return a future that completes when all three tables have been created
     */
    @Override
    CompletableFuture<Void> createStreamMetadata() {
        return getId().thenCompose(id -> {
            final String metadataTable = getMetadataTableName(id);
            final String epochsWithTxnTable = getEpochsWithTransactionsTableName(id);
            final String writersTable = getWritersTableName(id);
            final CompletableFuture<Void> metadataCreated = this.storeHelper.createTable(metadataTable);
            final CompletableFuture<Void> epochsCreated = this.storeHelper.createTable(epochsWithTxnTable);
            final CompletableFuture<Void> writersCreated = this.storeHelper.createTable(writersTable);
            return CompletableFuture.allOf(metadataCreated, epochsCreated, writersCreated)
                    .thenAccept(ignored -> log.debug("stream {}/{} metadata tables {}, {} & {} created",
                            getScope(), getName(), metadataTable, epochsWithTxnTable, writersTable));
        });
    }

    /**
     * Determines the creation status of this stream from its stored creation time and configuration.
     * <ul>
     *   <li>No stored creation time: {@code NEW} with the supplied configuration and creation time.</li>
     *   <li>Creation time stored but no configuration: {@code NEW} with the supplied configuration and the
     *       stored creation time.</li>
     *   <li>Both stored: the result of {@code handleConfigExists}, which is told whether the stored creation
     *       time equals {@code j}.</li>
     * </ul>
     *
     * @param streamConfiguration configuration requested by the caller
     * @param j                   creation time requested by the caller
     * @param i                   starting segment number, passed through to the response
     * @return a future holding the create-stream response
     */
    @Override
    public CompletableFuture<CreateStreamResponse> checkStreamExists(StreamConfiguration streamConfiguration, long j, int i) {
        return this.storeHelper.expectingDataNotFound(getCreationTime(), null).thenCompose(storedCreationTime -> {
            if (storedCreationTime == null) {
                return CompletableFuture.completedFuture(
                        new CreateStreamResponse(CreateStreamResponse.CreateStatus.NEW, streamConfiguration, j, i));
            }
            final long createdAt = storedCreationTime;
            return this.storeHelper.expectingDataNotFound(getConfiguration(), null).thenCompose(storedConfiguration -> {
                if (storedConfiguration == null) {
                    return CompletableFuture.completedFuture(new CreateStreamResponse(
                            CreateStreamResponse.CreateStatus.NEW, streamConfiguration, createdAt, i));
                }
                return handleConfigExists(createdAt, storedConfiguration, i, createdAt == j);
            });
        });
    }

    /**
     * Builds the create-stream response for a stream whose configuration is already stored.
     * <p>
     * While the stored state is missing, {@code UNKNOWN} or {@code CREATING}, the status is {@code NEW} when
     * {@code z} is true and {@code EXISTS_CREATING} otherwise; for any other state it is {@code EXISTS_ACTIVE}.
     *
     * @param j                   stored creation time, echoed in the response
     * @param streamConfiguration stored configuration, echoed in the response
     * @param i                   starting segment number, echoed in the response
     * @param z                   whether the stored creation time matches the requested one
     * @return a future holding the create-stream response
     */
    private CompletableFuture<CreateStreamResponse> handleConfigExists(long j, StreamConfiguration streamConfiguration, int i, boolean z) {
        final CreateStreamResponse.CreateStatus statusWhileCreating;
        if (z) {
            statusWhileCreating = CreateStreamResponse.CreateStatus.NEW;
        } else {
            statusWhileCreating = CreateStreamResponse.CreateStatus.EXISTS_CREATING;
        }
        return this.storeHelper.expectingDataNotFound(getState(true), null).thenApply(currentState -> {
            final boolean stillCreating = currentState == null
                    || currentState.equals(State.UNKNOWN)
                    || currentState.equals(State.CREATING);
            final CreateStreamResponse.CreateStatus status =
                    stillCreating ? statusWhileCreating : CreateStreamResponse.CreateStatus.EXISTS_ACTIVE;
            return new CreateStreamResponse(status, streamConfiguration, j, i);
        });
    }

    /**
     * Reads the stream's creation time from the metadata table through the store helper's cached read and
     * decodes it as a {@code long} from the first eight bytes of the stored value.
     *
     * @return a future holding the creation time
     */
    @Override
    public CompletableFuture<Long> getCreationTime() {
        return getMetadataTable()
                .thenCompose(metadataTable -> this.storeHelper.getCachedData(metadataTable, CREATION_TIME_KEY,
                        bytes -> BitConverter.readLong(bytes, 0)))
                .thenApply(VersionedMetadata::getObject);
    }

    /**
     * Deletes the tables that back this stream.
     * <p>
     * Order of operations:
     * <ol>
     *   <li>offer every transactions-in-epoch table for removal (the predicate accepts all epochs);</li>
     *   <li>delete the epochs-with-transactions table;</li>
     *   <li>delete the writers-positions table, which {@code createStreamMetadata} creates alongside the other
     *       two and which would otherwise be left behind;</li>
     *   <li>delete the metadata table.</li>
     * </ol>
     * Steps 1-3 are wrapped in {@code storeHelper.expectingDataNotFound} with a {@code null} fallback, so missing
     * data there does not prevent the metadata table from being deleted. All deletions pass {@code false} as the
     * second argument of {@code deleteTable}.
     *
     * @return a future that completes when the metadata table has been deleted
     */
    @Override
    public CompletableFuture<Void> deleteStream() {
        return getId().thenCompose(id -> this.storeHelper
                .expectingDataNotFound(tryRemoveOlderTransactionsInEpochTables(epoch -> true), null)
                .thenCompose(ignored -> getEpochsWithTransactionsTable())
                .thenCompose(epochsWithTxnTable -> this.storeHelper.expectingDataNotFound(
                        this.storeHelper.deleteTable(epochsWithTxnTable, false), null))
                .thenCompose(ignored -> this.storeHelper.expectingDataNotFound(
                        this.storeHelper.deleteTable(getWritersTableName(id), false), null))
                // The metadata table goes last; its deletion is not tolerant of missing data.
                .thenCompose(ignored -> this.storeHelper.deleteTable(getMetadataTableName(id), false)));
    }

    /**
     * Writes the serialized retention set under {@code RETENTION_SET_KEY} in the metadata table via
     * {@code storeHelper.addNewEntryIfAbsent}, then invalidates the cache entry for that key.
     *
     * @param retentionSet the retention set to store
     * @return a future that completes after the cache has been invalidated
     */
    @Override
    CompletableFuture<Void> createRetentionSetDataIfAbsent(RetentionSet retentionSet) {
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.addNewEntryIfAbsent(metadataTable, RETENTION_SET_KEY, retentionSet.toBytes())
                        .thenAccept(ignored -> this.storeHelper.invalidateCache(metadataTable, RETENTION_SET_KEY)));
    }

    /**
     * Reads the retention set stored under {@code RETENTION_SET_KEY} in the metadata table using the store
     * helper's uncached {@code getEntry}.
     *
     * @return a future holding the versioned retention set
     */
    @Override
    CompletableFuture<VersionedMetadata<RetentionSet>> getRetentionSetData() {
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.getEntry(metadataTable, RETENTION_SET_KEY, RetentionSet::fromBytes));
    }

    /**
     * Updates the retention set entry in the metadata table, passing the version carried by the argument to
     * {@code storeHelper.updateEntry}, and invalidates the cache entry for the key afterwards.
     *
     * @param versionedMetadata the new retention set and the version to update against
     * @return a future holding the version returned by the update
     */
    @Override
    CompletableFuture<Version> updateRetentionSetData(VersionedMetadata<RetentionSet> versionedMetadata) {
        return getMetadataTable().thenCompose(metadataTable -> {
            final byte[] serialized = versionedMetadata.getObject().toBytes();
            final CompletableFuture<Version> updated = this.storeHelper.updateEntry(
                    metadataTable, RETENTION_SET_KEY, serialized, versionedMetadata.getVersion());
            return updated.thenApply(newVersion -> {
                this.storeHelper.invalidateCache(metadataTable, RETENTION_SET_KEY);
                return newVersion;
            });
        });
    }

    /**
     * Writes a stream-cut record under the key {@code "retentionCuts-<j>"} in the metadata table via
     * {@code storeHelper.addNewEntryIfAbsent}, then invalidates the cache entry for that key.
     *
     * @param j               value used to build the entry key
     * @param streamCutRecord the record to store
     * @return a future that completes after the cache has been invalidated
     */
    @Override
    CompletableFuture<Void> createStreamCutRecordData(long j, StreamCutRecord streamCutRecord) {
        final String cutKey = String.format(RETENTION_STREAM_CUT_RECORD_KEY_FORMAT, j);
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.addNewEntryIfAbsent(metadataTable, cutKey, streamCutRecord.toBytes())
                        .thenAccept(ignored -> this.storeHelper.invalidateCache(metadataTable, cutKey)));
    }

    /**
     * Reads the stream-cut record stored under {@code "retentionCuts-<j>"} through the store helper's cached
     * read.
     *
     * @param j value used to build the entry key
     * @return a future holding the versioned stream-cut record
     */
    @Override
    CompletableFuture<VersionedMetadata<StreamCutRecord>> getStreamCutRecordData(long j) {
        final String cutKey = String.format(RETENTION_STREAM_CUT_RECORD_KEY_FORMAT, j);
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.getCachedData(metadataTable, cutKey, StreamCutRecord::fromBytes));
    }

    /**
     * Removes the stream-cut record stored under {@code "retentionCuts-<j>"} from the metadata table and then
     * invalidates the cache entry for that key.
     *
     * @param j value used to build the entry key
     * @return a future that completes after the cache has been invalidated
     */
    @Override
    CompletableFuture<Void> deleteStreamCutRecordData(long j) {
        final String cutKey = String.format(RETENTION_STREAM_CUT_RECORD_KEY_FORMAT, j);
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.removeEntry(metadataTable, cutKey)
                        .thenAccept(ignored -> this.storeHelper.invalidateCache(metadataTable, cutKey)));
    }

    /**
     * Writes a history time-series chunk under {@code "historyTimeSeriesChunk-<i>"} in the metadata table via
     * {@code storeHelper.addNewEntryIfAbsent}, then invalidates the cache entry for that key.
     *
     * @param i                 chunk number used to build the entry key
     * @param historyTimeSeries the chunk to store
     * @return a future that completes after the cache has been invalidated
     */
    @Override
    CompletableFuture<Void> createHistoryTimeSeriesChunkDataIfAbsent(int i, HistoryTimeSeries historyTimeSeries) {
        final String chunkKey = String.format(HISTORY_TIMESERES_CHUNK_FORMAT, i);
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.addNewEntryIfAbsent(metadataTable, chunkKey, historyTimeSeries.toBytes())
                        .thenAccept(ignored -> this.storeHelper.invalidateCache(metadataTable, chunkKey)));
    }

    /**
     * Reads the history time-series chunk stored under {@code "historyTimeSeriesChunk-<i>"}.
     *
     * @param i chunk number used to build the entry key
     * @param z when true the entry is read with {@code storeHelper.getEntry}; otherwise with
     *          {@code storeHelper.getCachedData}
     * @return a future holding the versioned chunk
     */
    @Override
    CompletableFuture<VersionedMetadata<HistoryTimeSeries>> getHistoryTimeSeriesChunkData(int i, boolean z) {
        final String chunkKey = String.format(HISTORY_TIMESERES_CHUNK_FORMAT, i);
        return getMetadataTable().thenCompose(metadataTable -> {
            if (z) {
                return this.storeHelper.getEntry(metadataTable, chunkKey, HistoryTimeSeries::fromBytes);
            }
            return this.storeHelper.getCachedData(metadataTable, chunkKey, HistoryTimeSeries::fromBytes);
        });
    }

    /**
     * Updates the history time-series chunk stored under {@code "historyTimeSeriesChunk-<i>"}, passing the
     * version carried by the argument to {@code storeHelper.updateEntry}, and invalidates the cache entry for
     * the key afterwards.
     *
     * @param i                 chunk number used to build the entry key
     * @param versionedMetadata the new chunk and the version to update against
     * @return a future holding the version returned by the update
     */
    @Override
    CompletableFuture<Version> updateHistoryTimeSeriesChunkData(int i, VersionedMetadata<HistoryTimeSeries> versionedMetadata) {
        final String chunkKey = String.format(HISTORY_TIMESERES_CHUNK_FORMAT, i);
        return getMetadataTable().thenCompose(metadataTable -> {
            final byte[] serialized = versionedMetadata.getObject().toBytes();
            final CompletableFuture<Version> updated = this.storeHelper.updateEntry(
                    metadataTable, chunkKey, serialized, versionedMetadata.getVersion());
            return updated.thenApply(newVersion -> {
                this.storeHelper.invalidateCache(metadataTable, chunkKey);
                return newVersion;
            });
        });
    }

    /**
     * Stores the epoch number of the given record (as four bytes) under {@code CURRENT_EPOCH_KEY} in the
     * metadata table via {@code storeHelper.addNewEntryIfAbsent}, then invalidates the cache entry for that key.
     * Only the epoch number is written here, not the record itself.
     *
     * @param epochRecord the record whose epoch number is stored
     * @return a future that completes after the cache has been invalidated
     */
    @Override
    CompletableFuture<Void> createCurrentEpochRecordDataIfAbsent(EpochRecord epochRecord) {
        final byte[] epochBytes = new byte[Integer.BYTES];
        BitConverter.writeInt(epochBytes, 0, epochRecord.getEpoch());
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.addNewEntryIfAbsent(metadataTable, CURRENT_EPOCH_KEY, epochBytes)
                        .thenAccept(ignored -> this.storeHelper.invalidateCache(metadataTable, CURRENT_EPOCH_KEY)));
    }

    /**
     * Overwrites the current-epoch entry with the epoch number of the given record (as four bytes), passing the
     * version carried by the argument to {@code storeHelper.updateEntry}, and invalidates the cache entry for
     * the key afterwards.
     *
     * @param versionedMetadata the new current epoch record and the version to update against
     * @return a future holding the version returned by the update
     */
    @Override
    CompletableFuture<Version> updateCurrentEpochRecordData(VersionedMetadata<EpochRecord> versionedMetadata) {
        final byte[] epochBytes = new byte[Integer.BYTES];
        BitConverter.writeInt(epochBytes, 0, versionedMetadata.getObject().getEpoch());
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.updateEntry(metadataTable, CURRENT_EPOCH_KEY, epochBytes, versionedMetadata.getVersion())
                        .thenApply(newVersion -> {
                            this.storeHelper.invalidateCache(metadataTable, CURRENT_EPOCH_KEY);
                            return newVersion;
                        }));
    }

    /**
     * Resolves the current epoch record in two steps: the epoch number is read from {@code CURRENT_EPOCH_KEY},
     * then the matching record is fetched with {@code getEpochRecord}. The returned metadata pairs that record
     * with the version of the current-epoch entry.
     *
     * @param z when true the epoch number is read with {@code storeHelper.getEntry}; otherwise with
     *          {@code storeHelper.getCachedData}
     * @return a future holding the current epoch record and the version of the current-epoch entry
     */
    @Override
    CompletableFuture<VersionedMetadata<EpochRecord>> getCurrentEpochRecordData(boolean z) {
        return getMetadataTable().thenCompose(metadataTable -> {
            final CompletableFuture<VersionedMetadata<Integer>> epochNumber;
            if (z) {
                epochNumber = this.storeHelper.getEntry(metadataTable, CURRENT_EPOCH_KEY,
                        bytes -> BitConverter.readInt(bytes, 0));
            } else {
                epochNumber = this.storeHelper.getCachedData(metadataTable, CURRENT_EPOCH_KEY,
                        bytes -> BitConverter.readInt(bytes, 0));
            }
            return epochNumber.thenCompose(versionedEpoch -> getEpochRecord(versionedEpoch.getObject())
                    .thenApply(currentEpoch -> new VersionedMetadata<>(currentEpoch, versionedEpoch.getVersion())));
        });
    }

    /**
     * Writes an epoch record under {@code "epochRecord-<i>"} in the metadata table via
     * {@code storeHelper.addNewEntryIfAbsent} and invalidates the cache entry for that key. If the record's
     * epoch equals its reference epoch, the transactions-in-epoch table for epoch {@code i} is created as well.
     *
     * @param i           epoch number used to build the entry key and, when applicable, the table name
     * @param epochRecord the record to store
     * @return a future that completes when the entry (and, when applicable, the table) has been created
     */
    @Override
    CompletableFuture<Void> createEpochRecordDataIfAbsent(int i, EpochRecord epochRecord) {
        final String epochKey = String.format(EPOCH_RECORD_KEY_FORMAT, i);
        return getMetadataTable()
                .thenCompose(metadataTable ->
                        this.storeHelper.addNewEntryIfAbsent(metadataTable, epochKey, epochRecord.toBytes())
                                .thenAccept(ignored -> this.storeHelper.invalidateCache(metadataTable, epochKey)))
                .thenCompose(ignored -> {
                    if (epochRecord.getEpoch() == epochRecord.getReferenceEpoch()) {
                        return createTransactionsInEpochTable(i);
                    }
                    return CompletableFuture.completedFuture(null);
                });
    }

    /**
     * Reads the epoch record stored under {@code "epochRecord-<i>"} through the store helper's cached read.
     *
     * @param i epoch number used to build the entry key
     * @return a future holding the versioned epoch record
     */
    @Override
    CompletableFuture<VersionedMetadata<EpochRecord>> getEpochRecordData(int i) {
        return getMetadataTable().thenCompose(metadataTable -> {
            final String epochKey = String.format(EPOCH_RECORD_KEY_FORMAT, i);
            return this.storeHelper.getCachedData(metadataTable, epochKey, EpochRecord::fromBytes);
        });
    }

    /**
     * Writes a sealed-segment-sizes map shard under {@code "segmentsSealedSizeMapShard-<i>"} in the metadata
     * table via {@code storeHelper.addNewEntryIfAbsent}, then invalidates the cache entry for that key.
     *
     * @param i                      shard number used to build the entry key
     * @param sealedSegmentsMapShard the shard to store
     * @return a future that completes after the cache has been invalidated
     */
    @Override
    CompletableFuture<Void> createSealedSegmentSizesMapShardDataIfAbsent(int i, SealedSegmentsMapShard sealedSegmentsMapShard) {
        final String shardKey = String.format(SEGMENTS_SEALED_SIZE_MAP_SHARD_FORMAT, i);
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.addNewEntryIfAbsent(metadataTable, shardKey, sealedSegmentsMapShard.toBytes())
                        .thenAccept(ignored -> this.storeHelper.invalidateCache(metadataTable, shardKey)));
    }

    /**
     * Reads the sealed-segment-sizes map shard stored under {@code "segmentsSealedSizeMapShard-<i>"} using the
     * store helper's uncached {@code getEntry}.
     *
     * @param i shard number used to build the entry key
     * @return a future holding the versioned shard
     */
    @Override
    CompletableFuture<VersionedMetadata<SealedSegmentsMapShard>> getSealedSegmentSizesMapShardData(int i) {
        final String shardKey = String.format(SEGMENTS_SEALED_SIZE_MAP_SHARD_FORMAT, i);
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.getEntry(metadataTable, shardKey, SealedSegmentsMapShard::fromBytes));
    }

    /**
     * Updates the sealed-segment-sizes map shard stored under {@code "segmentsSealedSizeMapShard-<i>"}, passing
     * the version carried by the argument to {@code storeHelper.updateEntry}, and invalidates the cache entry
     * for the key afterwards.
     *
     * @param i                 shard number used to build the entry key
     * @param versionedMetadata the new shard and the version to update against
     * @return a future holding the version returned by the update
     */
    @Override
    CompletableFuture<Version> updateSealedSegmentSizesMapShardData(int i, VersionedMetadata<SealedSegmentsMapShard> versionedMetadata) {
        final String shardKey = String.format(SEGMENTS_SEALED_SIZE_MAP_SHARD_FORMAT, i);
        return getMetadataTable().thenCompose(metadataTable -> {
            final byte[] serialized = versionedMetadata.getObject().toBytes();
            final CompletableFuture<Version> updated = this.storeHelper.updateEntry(
                    metadataTable, shardKey, serialized, versionedMetadata.getVersion());
            return updated.thenApply(newVersion -> {
                this.storeHelper.invalidateCache(metadataTable, shardKey);
                return newVersion;
            });
        });
    }

    /**
     * Records, for each given segment id, the epoch {@code i} (as four bytes) under the key
     * {@code "segmentSealedEpochPath-<segmentId>"} in the metadata table, using a single
     * {@code storeHelper.addNewEntriesIfAbsent} call. All entries share the same value array.
     *
     * @param collection ids of the segments
     * @param i          the epoch to record
     * @return a future that completes when the entries have been written
     */
    @Override
    CompletableFuture<Void> createSegmentSealedEpochRecords(Collection<Long> collection, int i) {
        final byte[] epochBytes = new byte[Integer.BYTES];
        BitConverter.writeInt(epochBytes, 0, i);
        final Map<String, byte[]> sealedEpochEntries = collection.stream().collect(Collectors.toMap(
                segmentId -> String.format(SEGMENT_SEALED_EPOCH_KEY_FORMAT, segmentId),
                segmentId -> epochBytes));
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.addNewEntriesIfAbsent(metadataTable, sealedEpochEntries));
    }

    /**
     * Reads the epoch stored under {@code "segmentSealedEpochPath-<j>"} through the store helper's cached read,
     * decoding the first four bytes of the value as an {@code int}.
     *
     * @param j the segment id
     * @return a future holding the versioned epoch number
     */
    @Override
    CompletableFuture<VersionedMetadata<Integer>> getSegmentSealedRecordData(long j) {
        final String sealedEpochKey = String.format(SEGMENT_SEALED_EPOCH_KEY_FORMAT, j);
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.getCachedData(metadataTable, sealedEpochKey, bytes -> BitConverter.readInt(bytes, 0)));
    }

    /**
     * Writes the serialized epoch-transition record under {@code EPOCH_TRANSITION_KEY} in the metadata table via
     * {@code storeHelper.addNewEntryIfAbsent}, then invalidates the cache entry for that key.
     *
     * @param epochTransitionRecord the record to store
     * @return a future that completes after the cache has been invalidated
     */
    @Override
    CompletableFuture<Void> createEpochTransitionIfAbsent(EpochTransitionRecord epochTransitionRecord) {
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.addNewEntryIfAbsent(metadataTable, EPOCH_TRANSITION_KEY, epochTransitionRecord.toBytes())
                        .thenAccept(ignored -> this.storeHelper.invalidateCache(metadataTable, EPOCH_TRANSITION_KEY)));
    }

    /**
     * Updates the epoch-transition entry in the metadata table, passing the version carried by the argument to
     * {@code storeHelper.updateEntry}, and invalidates the cache entry for the key afterwards.
     *
     * @param versionedMetadata the new epoch-transition record and the version to update against
     * @return a future holding the version returned by the update
     */
    @Override
    CompletableFuture<Version> updateEpochTransitionNode(VersionedMetadata<EpochTransitionRecord> versionedMetadata) {
        return getMetadataTable().thenCompose(metadataTable -> {
            final byte[] serialized = versionedMetadata.getObject().toBytes();
            final CompletableFuture<Version> updated = this.storeHelper.updateEntry(
                    metadataTable, EPOCH_TRANSITION_KEY, serialized, versionedMetadata.getVersion());
            return updated.thenApply(newVersion -> {
                this.storeHelper.invalidateCache(metadataTable, EPOCH_TRANSITION_KEY);
                return newVersion;
            });
        });
    }

    /**
     * Reads the epoch-transition record stored under {@code EPOCH_TRANSITION_KEY} using the store helper's
     * uncached {@code getEntry}.
     *
     * @return a future holding the versioned epoch-transition record
     */
    @Override
    CompletableFuture<VersionedMetadata<EpochTransitionRecord>> getEpochTransitionNode() {
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.getEntry(metadataTable, EPOCH_TRANSITION_KEY, EpochTransitionRecord::fromBytes));
    }

    /**
     * Stores the creation time (as eight bytes) under {@code CREATION_TIME_KEY} in the metadata table via
     * {@code storeHelper.addNewEntryIfAbsent}, then invalidates the cache entry for that key.
     *
     * @param j the creation time to store
     * @return a future that completes after the cache has been invalidated
     */
    @Override
    CompletableFuture<Void> storeCreationTimeIfAbsent(long j) {
        final byte[] creationTimeBytes = new byte[Long.BYTES];
        BitConverter.writeLong(creationTimeBytes, 0, j);
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.addNewEntryIfAbsent(metadataTable, CREATION_TIME_KEY, creationTimeBytes)
                        .thenAccept(ignored -> this.storeHelper.invalidateCache(metadataTable, CREATION_TIME_KEY)));
    }

    /**
     * Writes the serialized configuration record under {@code CONFIGURATION_KEY} in the metadata table via
     * {@code storeHelper.addNewEntryIfAbsent}, then invalidates the cache entry for that key.
     *
     * @param streamConfigurationRecord the configuration record to store
     * @return a future that completes after the cache has been invalidated
     */
    @Override
    public CompletableFuture<Void> createConfigurationIfAbsent(StreamConfigurationRecord streamConfigurationRecord) {
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.addNewEntryIfAbsent(metadataTable, CONFIGURATION_KEY, streamConfigurationRecord.toBytes())
                        .thenAccept(ignored -> this.storeHelper.invalidateCache(metadataTable, CONFIGURATION_KEY)));
    }

    /**
     * Writes the serialized state record under {@code STATE_KEY} in the metadata table via
     * {@code storeHelper.addNewEntryIfAbsent}. Unlike most sibling writers, no cache invalidation is performed.
     *
     * @param stateRecord the state record to store
     * @return a future that completes when the write has completed
     */
    @Override
    public CompletableFuture<Void> createStateIfAbsent(StateRecord stateRecord) {
        return getMetadataTable().thenCompose(metadataTable -> {
            final CompletableFuture<Version> added =
                    this.storeHelper.addNewEntryIfAbsent(metadataTable, STATE_KEY, stateRecord.toBytes());
            return Futures.toVoid(added);
        });
    }

    /**
     * Stores the value {@code j2} (as eight bytes) under {@code "markers-<j>"} in the metadata table via
     * {@code storeHelper.addNewEntryIfAbsent}.
     *
     * @param j  value used to build the marker key (presumably a segment id - confirm against callers)
     * @param j2 the marker value to store
     * @return a future that completes when the write has completed
     */
    @Override
    public CompletableFuture<Void> createMarkerData(long j, long j2) {
        final byte[] markerBytes = new byte[Long.BYTES];
        BitConverter.writeLong(markerBytes, 0, j2);
        final String markerKey = String.format(SEGMENT_MARKER_PATH_FORMAT, j);
        return getMetadataTable().thenCompose(metadataTable ->
                Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(metadataTable, markerKey, markerBytes)));
    }

    /**
     * Overwrites the marker stored under {@code "markers-<j>"} with the value carried by the argument (as eight
     * bytes), passing the carried version to {@code storeHelper.updateEntry}.
     *
     * @param j                 value used to build the marker key
     * @param versionedMetadata the new marker value and the version to update against
     * @return a future holding the version returned by the update
     */
    @Override
    CompletableFuture<Version> updateMarkerData(long j, VersionedMetadata<Long> versionedMetadata) {
        final String markerKey = String.format(SEGMENT_MARKER_PATH_FORMAT, j);
        final byte[] markerBytes = new byte[Long.BYTES];
        final long markerValue = versionedMetadata.getObject();
        BitConverter.writeLong(markerBytes, 0, markerValue);
        return getMetadataTable().thenCompose(metadataTable ->
                this.storeHelper.updateEntry(metadataTable, markerKey, markerBytes, versionedMetadata.getVersion()));
    }

    /**
     * Reads the marker stored under {@code "markers-<j>"} with the store helper's uncached {@code getEntry},
     * decoding the first eight bytes of the value as a {@code long}. The read is wrapped in
     * {@code storeHelper.expectingDataNotFound} with a {@code null} fallback.
     *
     * @param j value used to build the marker key
     * @return a future holding the versioned marker value, or the {@code null} fallback
     */
    @Override
    CompletableFuture<VersionedMetadata<Long>> getMarkerData(long j) {
        final String markerKey = String.format(SEGMENT_MARKER_PATH_FORMAT, j);
        return getMetadataTable().thenCompose(metadataTable -> {
            final CompletableFuture<VersionedMetadata<Long>> marker =
                    this.storeHelper.getEntry(metadataTable, markerKey, bytes -> BitConverter.readLong(bytes, 0));
            return this.storeHelper.expectingDataNotFound(marker, null);
        });
    }

    /**
     * Removes the marker stored under {@code "markers-<j>"} from the metadata table.
     *
     * @param j value used to build the marker key
     * @return a future that completes when the entry has been removed
     */
    @Override
    CompletableFuture<Void> removeMarkerData(long j) {
        final String markerKey = String.format(SEGMENT_MARKER_PATH_FORMAT, j);
        return getMetadataTable().thenCompose(metadataTable -> this.storeHelper.removeEntry(metadataTable, markerKey));
    }

    /**
     * Collects the active transactions of every epoch listed in the epochs-with-transactions table and merges
     * the per-epoch maps into one map.
     *
     * @return a future holding all active transactions keyed by transaction id
     */
    @Override
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getActiveTxns() {
        return getEpochsWithTransactions()
                .thenCompose(epochs -> Futures.allOfWithResults(
                        epochs.stream().map(this::getTxnInEpoch).collect(Collectors.toList())))
                .thenApply(perEpochTxns -> {
                    final Map<UUID, ActiveTxnRecord> allTxns = new HashMap<>();
                    perEpochTxns.forEach(allTxns::putAll);
                    return allTxns;
                });
    }

    /**
     * Lists the epochs recorded in the epochs-with-transactions table by parsing every key of that table as an
     * integer.
     *
     * @return a future holding the epoch numbers in iteration order
     */
    private CompletableFuture<List<Integer>> getEpochsWithTransactions() {
        return getEpochsWithTransactionsTable().thenCompose(epochsWithTxnTable -> {
            final List<Integer> epochs = new ArrayList<>();
            return this.storeHelper.getAllKeys(epochsWithTxnTable)
                    .collectRemaining(epochKey -> {
                        epochs.add(Integer.parseInt(epochKey));
                        // Returning true keeps the iteration going until all keys are consumed.
                        return true;
                    })
                    .thenApply(ignored -> epochs);
        });
    }

    /**
     * Counts the ongoing transactions of this stream by summing the per-epoch counts of every epoch listed in the
     * epochs-with-transactions table.
     *
     * @return a future holding the total number of ongoing transactions
     */
    @Override
    public CompletableFuture<Integer> getNumberOfOngoingTransactions() {
        final List<CompletableFuture<Integer>> perEpochCounts = new ArrayList<>();
        return getEpochsWithTransactionsTable().thenCompose(epochsWithTxnTable ->
                this.storeHelper.getAllKeys(epochsWithTxnTable)
                        // Each key is an epoch number; start counting that epoch's transactions right away.
                        .forEachRemaining(
                                epochKey -> perEpochCounts.add(getNumberOfOngoingTransactions(Integer.parseInt(epochKey))),
                                this.executor)
                        .thenCompose(ignored -> Futures.allOfWithResults(perEpochCounts))
                        .thenApply(counts -> counts.stream().reduce(0, Integer::sum)));
    }

    /**
     * Counts the keys of the transactions-in-epoch table for the given epoch.
     *
     * @param i the epoch number
     * @return a future holding the number of keys seen in that table
     */
    private CompletableFuture<Integer> getNumberOfOngoingTransactions(int i) {
        final AtomicInteger txnCount = new AtomicInteger(0);
        return getTransactionsInEpochTable(i).thenCompose(epochTxnTable ->
                this.storeHelper.getAllKeys(epochTxnTable)
                        .forEachRemaining(txnId -> txnCount.incrementAndGet(), this.executor)
                        .thenApply(ignored -> txnCount.get()));
    }

    /**
     * Delegates to the base-class helper {@code getOrderedCommittingTxnInLowestEpochHelper}, supplying this
     * stream's {@code txnCommitOrderer} and {@code executor}.
     *
     * @return the future produced by the base-class helper
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<List<Map.Entry<UUID, ActiveTxnRecord>>> getOrderedCommittingTxnInLowestEpoch() {
        return super.getOrderedCommittingTxnInLowestEpochHelper(this.txnCommitOrderer, this.executor);
    }

    /**
     * Delegates to the base-class helper {@code getAllOrderedCommittingTxnsHelper}, supplying this stream's
     * {@code txnCommitOrderer}.
     *
     * @return the future produced by the base-class helper
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    @VisibleForTesting
    CompletableFuture<Map<Long, UUID>> getAllOrderedCommittingTxns() {
        return super.getAllOrderedCommittingTxnsHelper(this.txnCommitOrderer);
    }

    /**
     * Fetches the active-transaction records for the given keys from the transactions-in-epoch table of epoch
     * {@code i}. {@code NON_EXISTENT_TXN} is handed to {@code storeHelper.getEntries} as its "non existent"
     * argument, and the version wrappers are stripped from the result.
     *
     * @param i    the epoch number
     * @param list the keys (transaction ids as strings) to look up
     * @return a future holding the records returned by the store helper, unwrapped
     */
    @Override
    CompletableFuture<List<ActiveTxnRecord>> getTransactionRecords(int i, List<String> list) {
        return getTransactionsInEpochTable(i)
                .thenCompose(epochTxnTable -> this.storeHelper.getEntries(
                        epochTxnTable, list, ActiveTxnRecord::fromBytes, NON_EXISTENT_TXN))
                .thenApply(versionedRecords -> versionedRecords.stream()
                        .map(VersionedMetadata::getObject)
                        .collect(Collectors.toList()));
    }

    /**
     * Reads every entry of the transactions-in-epoch table for epoch {@code i} into a map keyed by transaction
     * id (each key is parsed with {@code UUID.fromString}). The whole read is wrapped in
     * {@code storeHelper.expectingDataNotFound} with an empty map as the fallback.
     *
     * @param i the epoch number
     * @return a future holding the active transactions of that epoch, or the empty-map fallback
     */
    @Override
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getTxnInEpoch(int i) {
        final Map<UUID, ActiveTxnRecord> txnsInEpoch = new ConcurrentHashMap<>();
        return getTransactionsInEpochTable(i).thenCompose(epochTxnTable -> this.storeHelper.expectingDataNotFound(
                this.storeHelper.getAllEntries(epochTxnTable, ActiveTxnRecord::fromBytes)
                        .collectRemaining(entry -> {
                            txnsInEpoch.put(UUID.fromString(entry.getKey()), entry.getValue().getObject());
                            // Returning true keeps the iteration going until all entries are consumed.
                            return true;
                        })
                        .thenApply(ignored -> txnsInEpoch),
                Collections.emptyMap()));
    }

    /**
     * Adds the serialized transaction record to the given epoch's transactions table,
     * keyed by the transaction id string, using {@code addNewEntryIfAbsent}.
     *
     * @param i               epoch whose transactions table receives the entry.
     * @param uuid            transaction id used as the key.
     * @param activeTxnRecord record to serialize and store.
     * @return future with the version returned by the store helper.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Version> createNewTransaction(int i, UUID uuid, ActiveTxnRecord activeTxnRecord) {
        return getTransactionsInEpochTable(i).thenCompose(epochTable ->
                storeHelper.addNewEntryIfAbsent(epochTable, uuid.toString(), activeTxnRecord.toBytes()));
    }

    /**
     * Registers the epoch in the epochs-with-transactions table and then creates the
     * transactions table for that epoch.
     *
     * @param i epoch to register; its decimal string is used as the key, with an empty value.
     * @return future completing when the table creation call completes.
     */
    private CompletableFuture<Void> createTransactionsInEpochTable(int i) {
        return getEpochsWithTransactionsTable()
                .thenCompose(epochsTable ->
                        storeHelper.addNewEntryIfAbsent(epochsTable, Integer.toString(i), new byte[0]))
                // The epoch is registered first; only then is its transactions table created.
                .thenCompose(registered ->
                        getTransactionsInEpochTable(i).thenCompose(storeHelper::createTable));
    }

    /**
     * Reads the active transaction record stored under the transaction id string in the
     * given epoch's transactions table.
     *
     * @param i    epoch whose transactions table is read.
     * @param uuid transaction id used as the key.
     * @return future with the versioned record returned by the store helper.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<ActiveTxnRecord>> getActiveTx(int i, UUID uuid) {
        return getTransactionsInEpochTable(i).thenCompose(epochTable ->
                storeHelper.getEntry(epochTable, uuid.toString(), ActiveTxnRecord::fromBytes));
    }

    /**
     * Updates the active transaction entry in the given epoch's transactions table with the
     * serialized record, passing the supplied version to {@code updateEntry}.
     *
     * @param i                 epoch whose transactions table is updated.
     * @param uuid              transaction id used as the key.
     * @param versionedMetadata new record together with the version handed to the store helper.
     * @return future with the version returned by the store helper.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> updateActiveTx(int i, UUID uuid, VersionedMetadata<ActiveTxnRecord> versionedMetadata) {
        return getTransactionsInEpochTable(i).thenCompose(epochTable ->
                storeHelper.updateEntry(
                        epochTable,
                        uuid.toString(),
                        versionedMetadata.getObject().toBytes(),
                        versionedMetadata.getVersion()));
    }

    /**
     * Adds the transaction id string to the commit orderer under this stream's scope and name.
     *
     * @param uuid transaction id to add.
     * @return future with the Long returned by the orderer's {@code addEntity} call.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Long> addTxnToCommitOrder(UUID uuid) {
        return txnCommitOrderer.addEntity(getScope(), getName(), uuid.toString());
    }

    /**
     * Removes the given entities from the commit orderer under this stream's scope and name.
     *
     * @param list Long values passed to the orderer's {@code removeEntities} call.
     * @return future completing when the orderer's removal completes.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> removeTxnsFromCommitOrder(List<Long> list) {
        return txnCommitOrderer.removeEntities(getScope(), getName(), list);
    }

    /**
     * Removes the transaction's entry from the given epoch's transactions table and then
     * attempts to remove the transactions tables of all epochs lower than that epoch.
     *
     * @param i    epoch whose transactions table holds the entry.
     * @param uuid transaction id used as the key.
     * @return future completing after the follow-up removal attempt completes.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> removeActiveTxEntry(int i, UUID uuid) {
        return getTransactionsInEpochTable(i)
                .thenCompose(epochTable -> storeHelper.removeEntry(epochTable, uuid.toString()))
                // Only epochs strictly lower than i are candidates for table removal.
                .thenCompose(removed -> tryRemoveOlderTransactionsInEpochTables(epoch -> epoch < i));
    }

    /**
     * Attempts to remove the transactions table of every epoch (as listed by
     * {@code getEpochsWithTransactions}) that satisfies the predicate.
     *
     * @param predicate selects the epochs whose tables should be removed.
     * @return future completing when all individual removal attempts have completed.
     */
    private CompletableFuture<Void> tryRemoveOlderTransactionsInEpochTables(Predicate<Integer> predicate) {
        return getEpochsWithTransactions().thenCompose(epochs -> Futures.allOf(
                epochs.stream()
                        .filter(predicate)
                        .map(this::tryRemoveTransactionsInEpochTable)
                        .collect(Collectors.toList())));
    }

    /**
     * Attempts to delete the transactions table of the given epoch and, if the table is gone,
     * removes the epoch's entry from the epochs-with-transactions table.
     *
     * <p>Outcome of the delete call:
     * <ul>
     *   <li>success, or a failure matching {@code DATA_NOT_FOUND_PREDICATE}: the epoch entry is removed;</li>
     *   <li>a failure matching {@code DATA_NOT_EMPTY_PREDICATE}: nothing else is done;</li>
     *   <li>any other failure: rethrown wrapped in a {@link CompletionException}.</li>
     * </ul>
     *
     * @param i epoch whose transactions table should be deleted.
     * @return future completing when the attempt (and the optional entry removal) completes.
     */
    private CompletableFuture<Void> tryRemoveTransactionsInEpochTable(int i) {
        return getTransactionsInEpochTable(i).thenCompose(epochTable ->
                storeHelper.deleteTable(epochTable, true)
                        .handle((ignored, th) -> {
                            if (th == null || PravegaTablesStreamMetadataStore.DATA_NOT_FOUND_PREDICATE.test(th)) {
                                return true;
                            }
                            if (AbstractStreamMetadataStore.DATA_NOT_EMPTY_PREDICATE.test(th)) {
                                return false;
                            }
                            throw new CompletionException(th);
                        })
                        .thenCompose(deleted -> {
                            if (!deleted) {
                                return CompletableFuture.completedFuture(null);
                            }
                            // Distinct lambda parameter name: redeclaring the outer lambda's
                            // parameter inside a nested lambda does not compile.
                            return getEpochsWithTransactionsTable().thenCompose(epochsTable ->
                                    storeHelper.removeEntry(epochsTable, Integer.toString(i)));
                        }));
    }

    /**
     * Stores a single completed-transaction record by delegating to
     * {@code createCompletedTxEntries} with a one-element map.
     *
     * @param uuid               transaction id; its string form is the map key.
     * @param completedTxnRecord record to serialize as the map value.
     * @return future completing when the entry has been written.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createCompletedTxEntry(UUID uuid, CompletedTxnRecord completedTxnRecord) {
        Map<String, byte[]> single = Collections.singletonMap(uuid.toString(), completedTxnRecord.toBytes());
        return createCompletedTxEntries(single);
    }

    /**
     * Writes completed-transaction entries into the batch table selected by
     * {@code currentBatchSupplier}.
     *
     * <p>Each incoming key is rewritten with {@code getCompletedTransactionKey} using this
     * stream's scope and name. If the write fails with an error matching
     * {@code DATA_NOT_FOUND_PREDICATE}, the batch table is created via
     * {@code tryCreateBatchTable} and the write is issued again. Any remaining failure is
     * rethrown wrapped in a {@link CompletionException}.
     *
     * @param map transaction-id string to serialized record.
     * @return future completing when the entries have been written.
     */
    private CompletableFuture<Void> createCompletedTxEntries(Map<String, byte[]> map) {
        Integer batch = currentBatchSupplier.get();
        String batchTable = getCompletedTransactionsBatchTableName(batch);
        Map<String, byte[]> keyedEntries = map.entrySet().stream().collect(Collectors.toMap(
                entry -> getCompletedTransactionKey(getScope(), getName(), entry.getKey()),
                Map.Entry::getValue));
        return Futures.toVoid(Futures.exceptionallyComposeExpecting(
                        storeHelper.addNewEntriesIfAbsent(batchTable, keyedEntries),
                        PravegaTablesStreamMetadataStore.DATA_NOT_FOUND_PREDICATE,
                        () -> tryCreateBatchTable(batch)
                                .thenCompose(created -> storeHelper.addNewEntriesIfAbsent(batchTable, keyedEntries))))
                .exceptionally(th -> {
                    throw new CompletionException(th);
                });
    }

    /**
     * Builds the key of a completed-transaction entry by applying
     * {@code COMPLETED_TRANSACTIONS_KEY_FORMAT} to the three arguments in order.
     *
     * @param str  first format argument (callers in this class pass the scope).
     * @param str2 second format argument (callers in this class pass the stream name).
     * @param str3 third format argument (callers in this class pass the transaction id string).
     * @return the formatted key.
     */
    @VisibleForTesting
    static String getCompletedTransactionKey(String str, String str2, String str3) {
        final String key = String.format(COMPLETED_TRANSACTIONS_KEY_FORMAT, str, str2, str3);
        return key;
    }

    /**
     * Returns the qualified name of the completed-transactions batch table for the given
     * batch, built under the {@code "_system"} scope.
     *
     * @param i batch number embedded in the table name.
     * @return the name produced by {@code NameUtils.getQualifiedTableName}.
     */
    @VisibleForTesting
    static String getCompletedTransactionsBatchTableName(int i) {
        String batchToken = String.format("completedTransactionsBatch-%d", i);
        return NameUtils.getQualifiedTableName("_system", new String[]{batchToken});
    }

    /**
     * Creates the table structure needed for a completed-transactions batch, in three steps:
     * create the batches root table, register the batch number in it (empty value), then
     * create the batch's own table.
     *
     * @param i batch number.
     * @return future completing when the batch table creation call completes.
     */
    private CompletableFuture<Void> tryCreateBatchTable(int i) {
        String batchTable = getCompletedTransactionsBatchTableName(i);
        return storeHelper.createTable(PravegaTablesStreamMetadataStore.COMPLETED_TRANSACTIONS_BATCHES_TABLE)
                .thenAccept(created -> log.debug("batches root table {} created",
                        PravegaTablesStreamMetadataStore.COMPLETED_TRANSACTIONS_BATCHES_TABLE))
                .thenCompose(logged -> storeHelper.addNewEntryIfAbsent(
                        PravegaTablesStreamMetadataStore.COMPLETED_TRANSACTIONS_BATCHES_TABLE,
                        Integer.toString(i),
                        new byte[0]))
                .thenCompose(registered -> storeHelper.createTable(batchTable));
    }

    /**
     * Looks up the completed-transaction record for the given transaction id across all
     * batches listed in the completed-transactions batches table.
     *
     * <p>Every key of the batches table is parsed as an integer batch number; each batch table
     * is then queried through {@code getCachedData} (with {@code null} as the
     * {@code expectingDataNotFound} fallback), and the first non-null result is returned.
     *
     * @param uuid transaction id to look up.
     * @return future with the first record found; the lookup throws a
     *         {@code StoreException} of type {@code DATA_NOT_FOUND} when no batch has one.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<CompletedTxnRecord>> getCompletedTx(UUID uuid) {
        // Parameterized so batch numbers are typed as Integer without casts.
        List<Integer> batches = new ArrayList<>();
        return storeHelper.getAllKeys(PravegaTablesStreamMetadataStore.COMPLETED_TRANSACTIONS_BATCHES_TABLE)
                .collectRemaining(batchKey -> {
                    batches.add(Integer.parseInt(batchKey));
                    return true;
                })
                .thenCompose(done -> {
                    // The key is the same for every batch, so it is built once.
                    String key = getCompletedTransactionKey(getScope(), getName(), uuid.toString());
                    return Futures.allOfWithResults(batches.stream()
                            .map(batch -> storeHelper.expectingDataNotFound(
                                    storeHelper.getCachedData(
                                            getCompletedTransactionsBatchTableName(batch),
                                            key,
                                            CompletedTxnRecord::fromBytes),
                                    null))
                            .collect(Collectors.toList()));
                })
                .thenCompose(results -> {
                    Optional<VersionedMetadata<CompletedTxnRecord>> found =
                            results.stream().filter(Objects::nonNull).findFirst();
                    if (found.isPresent()) {
                        return CompletableFuture.completedFuture(found.get());
                    }
                    throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Completed Txn not found");
                });
    }

    /**
     * Stores the serialized truncation record under {@code TRUNCATION_KEY} in the metadata
     * table using {@code addNewEntryIfAbsent}.
     *
     * @param streamTruncationRecord record to serialize and store.
     * @return future completing when the store call completes; the returned version is discarded.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> createTruncationDataIfAbsent(StreamTruncationRecord streamTruncationRecord) {
        return getMetadataTable().thenCompose(metadataTable -> Futures.toVoid(
                storeHelper.addNewEntryIfAbsent(metadataTable, TRUNCATION_KEY, streamTruncationRecord.toBytes())));
    }

    /**
     * Updates the truncation record stored under {@code TRUNCATION_KEY} in the metadata table
     * and invalidates the cache entry for that key once the update has succeeded.
     *
     * @param versionedMetadata new record together with the version handed to {@code updateEntry}.
     * @return future with the version returned by the update.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> setTruncationData(VersionedMetadata<StreamTruncationRecord> versionedMetadata) {
        return getMetadataTable().thenCompose(metadataTable ->
                storeHelper.updateEntry(
                                metadataTable,
                                TRUNCATION_KEY,
                                versionedMetadata.getObject().toBytes(),
                                versionedMetadata.getVersion())
                        .thenApply(newVersion -> {
                            storeHelper.invalidateCache(metadataTable, TRUNCATION_KEY);
                            return newVersion;
                        }));
    }

    /**
     * Reads the truncation record stored under {@code TRUNCATION_KEY} in the metadata table.
     *
     * @param z when {@code true} the record is read with {@code getEntry}; otherwise with
     *          {@code getCachedData}.
     * @return future with the versioned truncation record.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<StreamTruncationRecord>> getTruncationData(boolean z) {
        return getMetadataTable().thenCompose(metadataTable -> {
            if (z) {
                return storeHelper.getEntry(metadataTable, TRUNCATION_KEY, StreamTruncationRecord::fromBytes);
            }
            return storeHelper.getCachedData(metadataTable, TRUNCATION_KEY, StreamTruncationRecord::fromBytes);
        });
    }

    /**
     * Updates the configuration record stored under {@code CONFIGURATION_KEY} in the metadata
     * table and invalidates the cache entry for that key once the update has succeeded.
     *
     * @param versionedMetadata new record together with the version handed to {@code updateEntry}.
     * @return future with the version returned by the update.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> setConfigurationData(VersionedMetadata<StreamConfigurationRecord> versionedMetadata) {
        return getMetadataTable().thenCompose(metadataTable ->
                storeHelper.updateEntry(
                                metadataTable,
                                CONFIGURATION_KEY,
                                versionedMetadata.getObject().toBytes(),
                                versionedMetadata.getVersion())
                        .thenApply(newVersion -> {
                            storeHelper.invalidateCache(metadataTable, CONFIGURATION_KEY);
                            return newVersion;
                        }));
    }

    /**
     * Reads the configuration record stored under {@code CONFIGURATION_KEY} in the metadata table.
     *
     * @param z when {@code true} the record is read with {@code getEntry}; otherwise with
     *          {@code getCachedData}.
     * @return future with the versioned configuration record.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<StreamConfigurationRecord>> getConfigurationData(boolean z) {
        return getMetadataTable().thenCompose(metadataTable -> {
            if (z) {
                return storeHelper.getEntry(metadataTable, CONFIGURATION_KEY, StreamConfigurationRecord::fromBytes);
            }
            return storeHelper.getCachedData(metadataTable, CONFIGURATION_KEY, StreamConfigurationRecord::fromBytes);
        });
    }

    /**
     * Updates the state record stored under {@code STATE_KEY} in the metadata table and
     * invalidates the cache entry for that key once the update has succeeded.
     *
     * @param versionedMetadata new record together with the version handed to {@code updateEntry}.
     * @return future with the version returned by the update.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> setStateData(VersionedMetadata<StateRecord> versionedMetadata) {
        return getMetadataTable().thenCompose(metadataTable ->
                storeHelper.updateEntry(
                                metadataTable,
                                STATE_KEY,
                                versionedMetadata.getObject().toBytes(),
                                versionedMetadata.getVersion())
                        .thenApply(newVersion -> {
                            storeHelper.invalidateCache(metadataTable, STATE_KEY);
                            return newVersion;
                        }));
    }

    /**
     * Reads the state record stored under {@code STATE_KEY} in the metadata table.
     *
     * @param z when {@code true} the record is read with {@code getEntry}; otherwise with
     *          {@code getCachedData}.
     * @return future with the versioned state record.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<StateRecord>> getStateData(boolean z) {
        return getMetadataTable().thenCompose(metadataTable -> {
            if (z) {
                return storeHelper.getEntry(metadataTable, STATE_KEY, StateRecord::fromBytes);
            }
            return storeHelper.getCachedData(metadataTable, STATE_KEY, StateRecord::fromBytes);
        });
    }

    /**
     * Stores the serialized committing-transactions record under
     * {@code COMMITTING_TRANSACTIONS_RECORD_KEY} in the metadata table using
     * {@code addNewEntryIfAbsent}.
     *
     * @param committingTransactionsRecord record to serialize and store.
     * @return future completing when the store call completes; the returned version is discarded.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createCommitTxnRecordIfAbsent(CommittingTransactionsRecord committingTransactionsRecord) {
        return getMetadataTable().thenCompose(metadataTable -> Futures.toVoid(
                storeHelper.addNewEntryIfAbsent(
                        metadataTable,
                        COMMITTING_TRANSACTIONS_RECORD_KEY,
                        committingTransactionsRecord.toBytes())));
    }

    /**
     * Reads the committing-transactions record stored under
     * {@code COMMITTING_TRANSACTIONS_RECORD_KEY} in the metadata table via {@code getEntry}.
     *
     * @return future with the versioned committing-transactions record.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> getCommitTxnRecord() {
        return getMetadataTable().thenCompose(metadataTable ->
                storeHelper.getEntry(
                        metadataTable,
                        COMMITTING_TRANSACTIONS_RECORD_KEY,
                        CommittingTransactionsRecord::fromBytes));
    }

    /**
     * Updates the committing-transactions record stored under
     * {@code COMMITTING_TRANSACTIONS_RECORD_KEY} in the metadata table.
     *
     * @param versionedMetadata new record together with the version handed to {@code updateEntry}.
     * @return future with the version returned by the update.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> updateCommittingTxnRecord(VersionedMetadata<CommittingTransactionsRecord> versionedMetadata) {
        return getMetadataTable().thenCompose(metadataTable ->
                storeHelper.updateEntry(
                        metadataTable,
                        COMMITTING_TRANSACTIONS_RECORD_KEY,
                        versionedMetadata.getObject().toBytes(),
                        versionedMetadata.getVersion()));
    }

    /**
     * Stores the given string, encoded as UTF-8, under {@code WAITING_REQUEST_PROCESSOR_PATH}
     * in the metadata table using {@code addNewEntryIfAbsent}.
     *
     * @param str value to store.
     * @return future completing when the store call completes; the returned version is discarded.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createWaitingRequestNodeIfAbsent(String str) {
        return getMetadataTable().thenCompose(metadataTable -> Futures.toVoid(
                storeHelper.addNewEntryIfAbsent(
                        metadataTable,
                        WAITING_REQUEST_PROCESSOR_PATH,
                        str.getBytes(StandardCharsets.UTF_8))));
    }

    /**
     * Reads the value stored under {@code WAITING_REQUEST_PROCESSOR_PATH} in the metadata
     * table and decodes it from UTF-8.
     *
     * @return future with the decoded string, unwrapped from its versioned container.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<String> getWaitingRequestNode() {
        // No explicit Function cast: the target type is inferred from the method's return type.
        return getMetadataTable()
                .thenCompose(metadataTable -> storeHelper.getEntry(
                        metadataTable,
                        WAITING_REQUEST_PROCESSOR_PATH,
                        bytes -> StandardCharsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString()))
                .thenApply(VersionedMetadata::getObject);
    }

    /**
     * Removes the entry stored under {@code WAITING_REQUEST_PROCESSOR_PATH} from the
     * metadata table.
     *
     * @return future completing when the removal call completes.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> deleteWaitingRequestNode() {
        return getMetadataTable().thenCompose(metadataTable ->
                storeHelper.removeEntry(metadataTable, WAITING_REQUEST_PROCESSOR_PATH));
    }

    /**
     * Builds a {@code WriterMark} from the given arguments and adds it to the writers table
     * under the given key using {@code addNewEntry}.
     *
     * @param str          key under which the mark is stored.
     * @param j            first constructor argument of the mark.
     * @param immutableMap second constructor argument of the mark.
     * @return future completing when the store call completes; its result is discarded.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createWriterMarkRecord(String str, long j, ImmutableMap<Long, Long> immutableMap) {
        WriterMark mark = new WriterMark(j, immutableMap);
        return Futures.toVoid(getWritersTable().thenCompose(writersTable ->
                storeHelper.addNewEntry(writersTable, str, mark.toBytes())));
    }

    /**
     * Removes the entry with the given key from the writers table, passing the supplied
     * version to {@code removeEntry}.
     *
     * @param str     key of the entry to remove.
     * @param version version handed to the store helper.
     * @return future completing when the removal call completes.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> removeWriterRecord(String str, Version version) {
        return getWritersTable().thenCompose(writersTable ->
                storeHelper.removeEntry(writersTable, str, version));
    }

    /**
     * Reads the writer mark stored under the given key in the writers table via {@code getEntry}.
     *
     * @param str key of the entry to read.
     * @return future with the versioned writer mark.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<WriterMark>> getWriterMarkRecord(String str) {
        return getWritersTable().thenCompose(writersTable ->
                storeHelper.getEntry(writersTable, str, WriterMark::fromBytes));
    }

    /**
     * Builds a {@code WriterMark} from the given arguments and writes it over the entry with
     * the given key in the writers table, passing the supplied version to {@code updateEntry}.
     *
     * @param str          key of the entry to update.
     * @param j            first constructor argument of the mark.
     * @param immutableMap second constructor argument of the mark.
     * @param z            third constructor argument of the mark.
     * @param version      version handed to the store helper.
     * @return future completing when the update call completes; its result is discarded.
     */
    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> updateWriterMarkRecord(String str, long j, ImmutableMap<Long, Long> immutableMap, boolean z, Version version) {
        WriterMark mark = new WriterMark(j, immutableMap, z);
        return Futures.toVoid(getWritersTable().thenCompose(writersTable ->
                storeHelper.updateEntry(writersTable, str, mark.toBytes(), version)));
    }

    /**
     * Collects every entry of the writers table into a map from entry key to writer mark.
     *
     * @return future with the map of all writer marks found in the table.
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Map<String, WriterMark>> getAllWriterMarks() {
        // Parameterized (instead of raw) so keys and values are type-checked without casts.
        Map<String, WriterMark> marks = new ConcurrentHashMap<>();
        return getWritersTable()
                .thenCompose(writersTable ->
                        storeHelper.getAllEntries(writersTable, WriterMark::fromBytes)
                                .collectRemaining(entry -> {
                                    marks.put(entry.getKey(), entry.getValue().getObject());
                                    return true;
                                }))
                .thenApply(done -> marks);
    }

    /**
     * Resets the stored id reference to {@code null} and, when the previous id was neither
     * null nor empty, invalidates the cache entries of that id's metadata table for the
     * state, configuration, truncation, epoch-transition, committing-transactions and
     * current-epoch keys.
     */
    @Override // io.pravega.controller.store.stream.Stream
    public void refresh() {
        String previousId = idRef.getAndSet(null);
        if (!Strings.isNullOrEmpty(previousId)) {
            storeHelper.invalidateCache(getMetadataTableName(previousId), STATE_KEY);
            storeHelper.invalidateCache(getMetadataTableName(previousId), CONFIGURATION_KEY);
            storeHelper.invalidateCache(getMetadataTableName(previousId), TRUNCATION_KEY);
            storeHelper.invalidateCache(getMetadataTableName(previousId), EPOCH_TRANSITION_KEY);
            storeHelper.invalidateCache(getMetadataTableName(previousId), COMMITTING_TRANSACTIONS_RECORD_KEY);
            storeHelper.invalidateCache(getMetadataTableName(previousId), CURRENT_EPOCH_KEY);
        }
    }
}
