package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.TagLogger;
import io.pravega.controller.store.PravegaTablesStoreHelper;
import io.pravega.controller.store.Version;
import io.pravega.controller.store.VersionedMetadata;
import io.pravega.controller.store.stream.CreateStreamResponse;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.records.ActiveTxnRecord;
import io.pravega.controller.store.stream.records.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.records.CompletedTxnRecord;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.HistoryTimeSeries;
import io.pravega.controller.store.stream.records.RetentionSet;
import io.pravega.controller.store.stream.records.SealedSegmentsMapShard;
import io.pravega.controller.store.stream.records.StateRecord;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.store.stream.records.StreamCutRecord;
import io.pravega.controller.store.stream.records.StreamSubscriber;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.controller.store.stream.records.Subscribers;
import io.pravega.controller.store.stream.records.WriterMark;
import io.pravega.controller.util.Config;
import io.pravega.shared.NameUtils;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/controller/store/stream/PravegaTablesStream.class */
public class PravegaTablesStream extends PersistentStreamBase {
    private static final String STREAM_KEY_PREFIX = "Key.#.%s.#.%s.#.";
    private static final String COMPLETED_TRANSACTIONS_KEY_FORMAT = "Key.#.%s.#.%s.#./%s";
    private final PravegaTablesStoreHelper storeHelper;
    private final Supplier<Integer> currentBatchSupplier;
    private final BiFunction<Boolean, OperationContext, CompletableFuture<String>> streamsInScopeTableNameSupplier;
    private final AtomicReference<String> idRef;
    private final ZkOrderedStore txnCommitOrderer;
    private final ScheduledExecutorService executor;
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(PravegaTablesStream.class));
    private static final VersionedMetadata<ActiveTxnRecord> NON_EXISTENT_TXN = new VersionedMetadata<>(ActiveTxnRecord.EMPTY, new Version.LongVersion(Long.MIN_VALUE));

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public PravegaTablesStream(String str, String str2, PravegaTablesStoreHelper pravegaTablesStoreHelper, ZkOrderedStore zkOrderedStore, Supplier<Integer> supplier, BiFunction<Boolean, OperationContext, CompletableFuture<String>> biFunction, ScheduledExecutorService scheduledExecutorService) {
        this(str, str2, pravegaTablesStoreHelper, zkOrderedStore, supplier, 1000, 1000, biFunction, scheduledExecutorService);
    }

    @VisibleForTesting
    PravegaTablesStream(String str, String str2, PravegaTablesStoreHelper pravegaTablesStoreHelper, ZkOrderedStore zkOrderedStore, Supplier<Integer> supplier, int i, int i2, BiFunction<Boolean, OperationContext, CompletableFuture<String>> biFunction, ScheduledExecutorService scheduledExecutorService) {
        super(str, str2, i, i2);
        this.storeHelper = pravegaTablesStoreHelper;
        this.txnCommitOrderer = zkOrderedStore;
        this.currentBatchSupplier = supplier;
        this.streamsInScopeTableNameSupplier = biFunction;
        this.idRef = new AtomicReference<>(null);
        this.executor = scheduledExecutorService;
    }

    private CompletableFuture<String> getId(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String str = this.idRef.get();
        return !Strings.isNullOrEmpty(str) ? CompletableFuture.completedFuture(str) : this.storeHelper.loadFromTableHandleStaleTableName(this.streamsInScopeTableNameSupplier, getName(), PravegaTablesStoreHelper.BYTES_TO_UUID_FUNCTION, operationContext).thenComposeAsync(versionedMetadata -> {
            this.idRef.compareAndSet(null, ((UUID) versionedMetadata.getObject()).toString());
            return getId(operationContext);
        });
    }

    private CompletableFuture<String> getMetadataTable(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getId(operationContext).thenApply(this::getMetadataTableName);
    }

    private String getMetadataTableName(String str) {
        return NameUtils.getQualifiedTableName("_system", new String[]{getScope(), getName(), String.format("metadata.#.%s", str)});
    }

    private CompletableFuture<String> getEpochsWithTransactionsTable(OperationContext operationContext) {
        return getId(operationContext).thenApply(this::getEpochsWithTransactionsTableName);
    }

    private String getEpochsWithTransactionsTableName(String str) {
        return NameUtils.getQualifiedTableName("_system", new String[]{getScope(), getName(), String.format("epochsWithTransactions.#.%s", str)});
    }

    private CompletableFuture<String> getTransactionsInEpochTable(int i, OperationContext operationContext) {
        return getId(operationContext).thenApply(str -> {
            return getTransactionsInEpochTableName(i, str);
        });
    }

    private String getTransactionsInEpochTableName(int i, String str) {
        return NameUtils.getQualifiedTableName("_system", new String[]{getScope(), getName(), String.format("transactionsInEpoch-%s.#.%s", Integer.valueOf(i), str)});
    }

    private CompletableFuture<String> getWritersTable(OperationContext operationContext) {
        return getId(operationContext).thenApply(this::getWritersTableName);
    }

    private String getWritersTableName(String str) {
        return NameUtils.getQualifiedTableName("_system", new String[]{getScope(), getName(), String.format("writersPositions.#.%s", str)});
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase, io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> completeCommittingTransactions(VersionedMetadata<CommittingTransactionsRecord> versionedMetadata, OperationContext operationContext, Map<String, TxnWriterMark> map) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList(versionedMetadata.getObject().getTransactionsToCommit().size());
        ArrayList arrayList2 = new ArrayList(versionedMetadata.getObject().getTransactionsToCommit().size());
        versionedMetadata.getObject().getTransactionsToCommit().forEach(uuid -> {
            arrayList.add(new AbstractMap.SimpleEntry(getCompletedTransactionKey(getScope(), getName(), uuid.toString()), new CompletedTxnRecord(currentTimeMillis, TxnStatus.COMMITTED)));
            arrayList2.add(uuid.toString());
        });
        return (versionedMetadata.getObject().getTransactionsToCommit().size() == 0 ? CompletableFuture.completedFuture(null) : generateMarksForTransactions(operationContext, map).thenCompose(r7 -> {
            return createCompletedTxEntries(arrayList, operationContext);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r9 -> {
            return getTransactionsInEpochTable(((CommittingTransactionsRecord) versionedMetadata.getObject()).getEpoch(), operationContext).thenCompose(str -> {
                return this.storeHelper.removeEntries(str, arrayList2, operationContext.getRequestId());
            });
        }).thenCompose(r72 -> {
            return tryRemoveOlderTransactionsInEpochTables(num -> {
                return num.intValue() < ((CommittingTransactionsRecord) versionedMetadata.getObject()).getEpoch();
            }, operationContext);
        })).thenCompose(r92 -> {
            return Futures.toVoid(updateCommittingTxnRecord(new VersionedMetadata<>(CommittingTransactionsRecord.EMPTY, versionedMetadata.getVersion()), operationContext));
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createStreamMetadata(OperationContext operationContext) {
        return getId(operationContext).thenCompose(str -> {
            String metadataTableName = getMetadataTableName(str);
            String epochsWithTransactionsTableName = getEpochsWithTransactionsTableName(str);
            String writersTableName = getWritersTableName(str);
            return CompletableFuture.allOf(this.storeHelper.createTable(metadataTableName, operationContext.getRequestId()), this.storeHelper.createTable(epochsWithTransactionsTableName, operationContext.getRequestId()), this.storeHelper.createTable(writersTableName, operationContext.getRequestId())).thenAccept(r14 -> {
                log.debug(operationContext.getRequestId(), "stream {}/{} metadata tables {}, {} & {} created", new Object[]{getScope(), getName(), metadataTableName, epochsWithTransactionsTableName, writersTableName});
            });
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<CreateStreamResponse> checkStreamExists(StreamConfiguration streamConfiguration, long j, int i, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return this.storeHelper.expectingDataNotFound(getCreationTime(operationContext), null).thenCompose(l -> {
            return l == null ? CompletableFuture.completedFuture(new CreateStreamResponse(CreateStreamResponse.CreateStatus.NEW, streamConfiguration, j, i)) : this.storeHelper.expectingDataNotFound(getConfiguration(operationContext), null).thenCompose(streamConfiguration2 -> {
                if (streamConfiguration2 != null) {
                    return handleConfigExists(l.longValue(), streamConfiguration2, i, l.longValue() == j, operationContext);
                }
                return CompletableFuture.completedFuture(new CreateStreamResponse(CreateStreamResponse.CreateStatus.NEW, streamConfiguration, l.longValue(), i));
            });
        });
    }

    private CompletableFuture<CreateStreamResponse> handleConfigExists(long j, StreamConfiguration streamConfiguration, int i, boolean z, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        CreateStreamResponse.CreateStatus createStatus = z ? CreateStreamResponse.CreateStatus.NEW : CreateStreamResponse.CreateStatus.EXISTS_CREATING;
        return this.storeHelper.expectingDataNotFound(getState(true, operationContext), null).thenApply(state -> {
            return state == null ? new CreateStreamResponse(createStatus, streamConfiguration, j, i) : (state.equals(State.UNKNOWN) || state.equals(State.CREATING)) ? new CreateStreamResponse(createStatus, streamConfiguration, j, i) : new CreateStreamResponse(CreateStreamResponse.CreateStatus.EXISTS_ACTIVE, streamConfiguration, j, i);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Long> getCreationTime(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.getCachedOrLoad(str, "creationTime", PravegaTablesStoreHelper.BYTES_TO_LONG_FUNCTION, 0L, operationContext.getRequestId());
        }).thenApply((Function<? super U, ? extends U>) (v0) -> {
            return v0.getObject();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> addSubscriber(String str, long j, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return createSubscribersRecordIfAbsent(operationContext).thenCompose(r6 -> {
            return getSubscriberSetRecord(true, operationContext);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) versionedMetadata -> {
            return !((Subscribers) versionedMetadata.getObject()).getSubscribers().contains(str) ? getMetadataTable(operationContext).thenCompose(str2 -> {
                return this.storeHelper.updateEntry(str2, "subscriberset", Subscribers.add((Subscribers) versionedMetadata.getObject(), str), (v0) -> {
                    return v0.toBytes();
                }, versionedMetadata.getVersion(), operationContext.getRequestId());
            }) : CompletableFuture.completedFuture(null);
        }).thenCompose(version -> {
            return Futures.exceptionallyExpecting(getSubscriberRecord(str, operationContext), th -> {
                return Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException;
            }, (Object) null).thenCompose(versionedMetadata2 -> {
                if (versionedMetadata2 != null) {
                    return ((StreamSubscriber) versionedMetadata2.getObject()).getGeneration() < j ? Futures.toVoid(setSubscriberData(new VersionedMetadata<>(new StreamSubscriber(str, j, ((StreamSubscriber) versionedMetadata2.getObject()).getTruncationStreamCut(), System.currentTimeMillis()), versionedMetadata2.getVersion()), operationContext)) : CompletableFuture.completedFuture(null);
                }
                StreamSubscriber streamSubscriber = new StreamSubscriber(str, j, ImmutableMap.of(), System.currentTimeMillis());
                return Futures.toVoid(getMetadataTable(operationContext).thenApply(str2 -> {
                    return this.storeHelper.addNewEntryIfAbsent(str2, getKeyForSubscriber(str), streamSubscriber, (v0) -> {
                        return v0.toBytes();
                    }, operationContext.getRequestId());
                }));
            });
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> createSubscribersRecordIfAbsent(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return Futures.exceptionallyExpecting(getSubscriberSetRecord(true, operationContext), th -> {
            return Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException;
        }, (Object) null).thenCompose(versionedMetadata -> {
            return versionedMetadata == null ? Futures.toVoid(getMetadataTable(operationContext).thenCompose(str -> {
                return this.storeHelper.addNewEntryIfAbsent(str, "subscriberset", Subscribers.EMPTY_SET, (v0) -> {
                    return v0.toBytes();
                }, operationContext.getRequestId());
            })) : CompletableFuture.completedFuture(null);
        });
    }

    public CompletableFuture<VersionedMetadata<Subscribers>> getSubscriberSetRecord(boolean z, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            if (z) {
                unload(str, "subscriberset");
            }
            return this.storeHelper.getCachedOrLoad(str, "subscriberset", Subscribers::fromBytes, operationContext.getOperationStartTime(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> setSubscriberData(VersionedMetadata<StreamSubscriber> versionedMetadata, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.updateEntry(str, getKeyForSubscriber(((StreamSubscriber) versionedMetadata.getObject()).getSubscriber()), (StreamSubscriber) versionedMetadata.getObject(), (v0) -> {
                return v0.toBytes();
            }, versionedMetadata.getVersion(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> deleteSubscriber(String str, long j, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getSubscriberRecord(str, operationContext).thenCompose(versionedMetadata -> {
            if (j >= ((StreamSubscriber) versionedMetadata.getObject()).getGeneration()) {
                return getMetadataTable(operationContext).thenCompose(str2 -> {
                    return this.storeHelper.removeEntry(str2, getKeyForSubscriber(str), operationContext.getRequestId()).thenAccept(r8 -> {
                        this.storeHelper.invalidateCache(str2, getKeyForSubscriber(str));
                    }).thenCompose(r10 -> {
                        return getSubscriberSetRecord(true, operationContext).thenCompose(versionedMetadata -> {
                            if (!((Subscribers) versionedMetadata.getObject()).getSubscribers().contains(str)) {
                                return CompletableFuture.completedFuture(null);
                            }
                            return Futures.toVoid(this.storeHelper.updateEntry(str2, "subscriberset", Subscribers.remove((Subscribers) versionedMetadata.getObject(), str), (v0) -> {
                                return v0.toBytes();
                            }, versionedMetadata.getVersion(), operationContext.getRequestId()));
                        });
                    });
                });
            }
            log.warn(operationContext.getRequestId(), "skipped deleting subscriber {} due to generation mismatch", new Object[]{str});
            return CompletableFuture.completedFuture(null);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<StreamSubscriber>> getSubscriberRecord(String str, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str2 -> {
            return this.storeHelper.getCachedOrLoad(str2, getKeyForSubscriber(str), StreamSubscriber::fromBytes, operationContext.getOperationStartTime(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<List<String>> listSubscribers(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return getSubscriberSetRecord(true, operationContext).thenApply(versionedMetadata -> {
                return ((Subscribers) versionedMetadata.getObject()).getSubscribers().asList();
            });
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> deleteStream(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getId(operationContext).thenCompose(str -> {
            return this.storeHelper.expectingDataNotFound(tryRemoveOlderTransactionsInEpochTables(num -> {
                return true;
            }, operationContext), null).thenCompose(r8 -> {
                return getEpochsWithTransactionsTable(operationContext).thenCompose(str -> {
                    return this.storeHelper.expectingDataNotFound(this.storeHelper.deleteTable(str, false, operationContext.getRequestId()), null);
                }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r9 -> {
                    return this.storeHelper.deleteTable(getMetadataTableName(str), false, operationContext.getRequestId());
                });
            });
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createRetentionSetDataIfAbsent(RetentionSet retentionSet, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(str, "retention", retentionSet, (v0) -> {
                return v0.toBytes();
            }, operationContext.getRequestId()));
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<RetentionSet>> getRetentionSetData(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.getCachedOrLoad(str, "retention", RetentionSet::fromBytes, operationContext.getOperationStartTime(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> updateRetentionSetData(VersionedMetadata<RetentionSet> versionedMetadata, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.updateEntry(str, "retention", (RetentionSet) versionedMetadata.getObject(), (v0) -> {
                return v0.toBytes();
            }, versionedMetadata.getVersion(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createStreamCutRecordData(long j, StreamCutRecord streamCutRecord, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String format = String.format("retentionCuts-%s", Long.valueOf(j));
        return getMetadataTable(operationContext).thenCompose(str -> {
            return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(str, format, streamCutRecord, (v0) -> {
                return v0.toBytes();
            }, operationContext.getRequestId()));
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<StreamCutRecord>> getStreamCutRecordData(long j, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String format = String.format("retentionCuts-%s", Long.valueOf(j));
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.getCachedOrLoad(str, format, StreamCutRecord::fromBytes, 0L, operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> deleteStreamCutRecordData(long j, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String format = String.format("retentionCuts-%s", Long.valueOf(j));
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.removeEntry(str, format, operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createHistoryTimeSeriesChunkDataIfAbsent(int i, HistoryTimeSeries historyTimeSeries, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String format = String.format("historyTimeSeriesChunk-%s", Integer.valueOf(i));
        return getMetadataTable(operationContext).thenCompose(str -> {
            return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(str, format, historyTimeSeries, (v0) -> {
                return v0.toBytes();
            }, operationContext.getRequestId()));
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<HistoryTimeSeries>> getHistoryTimeSeriesChunkData(int i, boolean z, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String format = String.format("historyTimeSeriesChunk-%s", Integer.valueOf(i));
        return getMetadataTable(operationContext).thenCompose(str -> {
            if (z) {
                unload(str, format);
            }
            return this.storeHelper.getCachedOrLoad(str, format, HistoryTimeSeries::fromBytes, operationContext.getOperationStartTime(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> updateHistoryTimeSeriesChunkData(int i, VersionedMetadata<HistoryTimeSeries> versionedMetadata, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String format = String.format("historyTimeSeriesChunk-%s", Integer.valueOf(i));
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.updateEntry(str, format, (HistoryTimeSeries) versionedMetadata.getObject(), (v0) -> {
                return v0.toBytes();
            }, versionedMetadata.getVersion(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createCurrentEpochRecordDataIfAbsent(EpochRecord epochRecord, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(str, "currentEpochRecord", Integer.valueOf(epochRecord.getEpoch()), PravegaTablesStoreHelper.INTEGER_TO_BYTES_FUNCTION, operationContext.getRequestId()));
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> updateCurrentEpochRecordData(VersionedMetadata<EpochRecord> versionedMetadata, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.updateEntry(str, "currentEpochRecord", Integer.valueOf(((EpochRecord) versionedMetadata.getObject()).getEpoch()), PravegaTablesStoreHelper.INTEGER_TO_BYTES_FUNCTION, versionedMetadata.getVersion(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<EpochRecord>> getCurrentEpochRecordData(boolean z, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            if (z) {
                unload(str, "currentEpochRecord");
            }
            return this.storeHelper.getCachedOrLoad(str, "currentEpochRecord", PravegaTablesStoreHelper.BYTES_TO_INTEGER_FUNCTION, operationContext.getOperationStartTime(), operationContext.getRequestId()).thenCompose(versionedMetadata -> {
                return getEpochRecord(((Integer) versionedMetadata.getObject()).intValue(), operationContext).thenApply(epochRecord -> {
                    return new VersionedMetadata(epochRecord, versionedMetadata.getVersion());
                });
            });
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createEpochRecordDataIfAbsent(int i, EpochRecord epochRecord, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String format = String.format("epochRecord-%s", Integer.valueOf(i));
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.addNewEntryIfAbsent(str, format, epochRecord, (v0) -> {
                return v0.toBytes();
            }, operationContext.getRequestId());
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) version -> {
            return epochRecord.getEpoch() == epochRecord.getReferenceEpoch() ? createTransactionsInEpochTable(i, operationContext) : CompletableFuture.completedFuture(null);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<EpochRecord>> getEpochRecordData(int i, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.getCachedOrLoad(str, String.format("epochRecord-%s", Integer.valueOf(i)), EpochRecord::fromBytes, 0L, operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createSealedSegmentSizesMapShardDataIfAbsent(int i, SealedSegmentsMapShard sealedSegmentsMapShard, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String format = String.format("segmentsSealedSizeMapShard-%s", Integer.valueOf(i));
        return getMetadataTable(operationContext).thenCompose(str -> {
            return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(str, format, sealedSegmentsMapShard, (v0) -> {
                return v0.toBytes();
            }, operationContext.getRequestId()));
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<SealedSegmentsMapShard>> getSealedSegmentSizesMapShardData(int i, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String format = String.format("segmentsSealedSizeMapShard-%s", Integer.valueOf(i));
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.getCachedOrLoad(str, format, SealedSegmentsMapShard::fromBytes, operationContext.getOperationStartTime(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> updateSealedSegmentSizesMapShardData(int i, VersionedMetadata<SealedSegmentsMapShard> versionedMetadata, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String format = String.format("segmentsSealedSizeMapShard-%s", Integer.valueOf(i));
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.updateEntry(str, format, (SealedSegmentsMapShard) versionedMetadata.getObject(), (v0) -> {
                return v0.toBytes();
            }, versionedMetadata.getVersion(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createSegmentSealedEpochRecords(Collection<Long> collection, int i, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        ArrayList arrayList = new ArrayList(collection.size());
        collection.forEach(l -> {
            arrayList.add(new AbstractMap.SimpleEntry(String.format("segmentSealedEpochPath-%s", l), Integer.valueOf(i)));
        });
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.addNewEntriesIfAbsent(str, arrayList, PravegaTablesStoreHelper.INTEGER_TO_BYTES_FUNCTION, operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<Integer>> getSegmentSealedRecordData(long j, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String format = String.format("segmentSealedEpochPath-%s", Long.valueOf(j));
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.getCachedOrLoad(str, format, PravegaTablesStoreHelper.BYTES_TO_INTEGER_FUNCTION, 0L, operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createEpochTransitionIfAbsent(EpochTransitionRecord epochTransitionRecord, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(str, "epochTransition", epochTransitionRecord, (v0) -> {
                return v0.toBytes();
            }, operationContext.getRequestId()));
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> updateEpochTransitionNode(VersionedMetadata<EpochTransitionRecord> versionedMetadata, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.updateEntry(str, "epochTransition", (EpochTransitionRecord) versionedMetadata.getObject(), (v0) -> {
                return v0.toBytes();
            }, versionedMetadata.getVersion(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<EpochTransitionRecord>> getEpochTransitionNode(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.getCachedOrLoad(str, "epochTransition", EpochTransitionRecord::fromBytes, operationContext.getOperationStartTime(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> storeCreationTimeIfAbsent(long j, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(str, "creationTime", Long.valueOf(j), PravegaTablesStoreHelper.LONG_TO_BYTES_FUNCTION, operationContext.getRequestId()));
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> createConfigurationIfAbsent(StreamConfigurationRecord streamConfigurationRecord, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(str, "configuration", streamConfigurationRecord, (v0) -> {
                return v0.toBytes();
            }, operationContext.getRequestId()));
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> createStateIfAbsent(StateRecord stateRecord, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(str, "state", stateRecord, (v0) -> {
                return v0.toBytes();
            }, operationContext.getRequestId()));
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> createMarkerData(long j, long j2, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String format = String.format("markers-%d", Long.valueOf(j));
        return getMetadataTable(operationContext).thenCompose(str -> {
            return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(str, format, Long.valueOf(j2), PravegaTablesStoreHelper.LONG_TO_BYTES_FUNCTION, operationContext.getRequestId()));
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> updateMarkerData(long j, VersionedMetadata<Long> versionedMetadata, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String format = String.format("markers-%d", Long.valueOf(j));
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.updateEntry(str, format, (Long) versionedMetadata.getObject(), PravegaTablesStoreHelper.LONG_TO_BYTES_FUNCTION, versionedMetadata.getVersion(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<Long>> getMarkerData(long j, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String format = String.format("markers-%d", Long.valueOf(j));
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.expectingDataNotFound(this.storeHelper.getCachedOrLoad(str, format, PravegaTablesStoreHelper.BYTES_TO_LONG_FUNCTION, operationContext.getOperationStartTime(), operationContext.getRequestId()), null);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> removeMarkerData(long j, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        String format = String.format("markers-%d", Long.valueOf(j));
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.removeEntry(str, format, operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getActiveTxns(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getEpochsWithTransactions(operationContext).thenCompose(list -> {
            return Futures.allOfWithResults((List) list.stream().map(num -> {
                return getTxnInEpoch(num.intValue(), operationContext);
            }).collect(Collectors.toList()));
        }).thenApply((Function<? super U, ? extends U>) list2 -> {
            HashMap hashMap = new HashMap();
            Objects.requireNonNull(hashMap);
            list2.forEach(hashMap::putAll);
            return hashMap;
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Map<UUID, TxnStatus>> listCompletedTxns(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getTxnInBatch(0, operationContext);
    }

    private CompletableFuture<Map<UUID, TxnStatus>> getTxnInBatch(Integer num, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        return this.storeHelper.expectingDataNotFound(this.storeHelper.getAllEntries(getCompletedTransactionsBatchTableName(num.intValue()), CompletedTxnRecord::fromBytes, operationContext.getRequestId()).collectRemaining(entry -> {
            concurrentHashMap.put(((String) entry.getKey()).replace(getCompletedTransactionKey(getScope(), getName(), ""), ""), (VersionedMetadata) entry.getValue());
            return true;
        }).thenApply(r5 -> {
            return (Map) concurrentHashMap.entrySet().stream().sorted((entry2, entry3) -> {
                return Long.compare(((CompletedTxnRecord) ((VersionedMetadata) entry3.getValue()).getObject()).getCompleteTime(), ((CompletedTxnRecord) ((VersionedMetadata) entry2.getValue()).getObject()).getCompleteTime());
            }).limit(Config.LIST_COMPLETED_TXN_MAX_RECORDS).collect(Collectors.toMap(entry4 -> {
                return UUID.fromString((String) entry4.getKey());
            }, entry5 -> {
                return ((CompletedTxnRecord) ((VersionedMetadata) entry5.getValue()).getObject()).getCompletionStatus();
            }));
        }).exceptionally(th -> {
            return Collections.emptyMap();
        }), Collections.emptyMap());
    }

    private CompletableFuture<List<Integer>> getEpochsWithTransactions(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getEpochsWithTransactionsTable(operationContext).thenCompose(str -> {
            ArrayList arrayList = new ArrayList();
            return this.storeHelper.getAllKeys(str, operationContext.getRequestId()).collectRemaining(str -> {
                arrayList.add(Integer.valueOf(Integer.parseInt(str)));
                return true;
            }).thenApply(r3 -> {
                return arrayList;
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Long> getNumberOfOngoingTransactions(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        ArrayList arrayList = new ArrayList();
        return getEpochsWithTransactionsTable(operationContext).thenCompose(str -> {
            return this.storeHelper.getAllKeys(str, operationContext.getRequestId()).forEachRemaining(str -> {
                arrayList.add(getNumberOfOngoingTransactions(Integer.parseInt(str), operationContext));
            }, this.executor).thenCompose(r4 -> {
                return Futures.allOfWithResults(arrayList).thenApply(list -> {
                    return (Long) list.stream().reduce(0L, (v0, v1) -> {
                        return Long.sum(v0, v1);
                    });
                });
            });
        });
    }

    private CompletableFuture<Long> getNumberOfOngoingTransactions(int i, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        new AtomicInteger(0);
        return getTransactionsInEpochTable(i, operationContext).thenCompose(str -> {
            return this.storeHelper.getEntryCount(str, operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<List<VersionedTransactionData>> getOrderedCommittingTxnInLowestEpoch(int i, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return super.getOrderedCommittingTxnInLowestEpochHelper(this.txnCommitOrderer, i, this.executor, operationContext);
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    @VisibleForTesting
    CompletableFuture<Map<Long, UUID>> getAllOrderedCommittingTxns(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return super.getAllOrderedCommittingTxnsHelper(this.txnCommitOrderer, operationContext);
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<List<ActiveTxnRecord>> getTransactionRecords(int i, List<String> list, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getTransactionsInEpochTable(i, operationContext).thenCompose(str -> {
            return this.storeHelper.getEntries(str, list, ActiveTxnRecord::fromBytes, NON_EXISTENT_TXN, operationContext.getRequestId()).thenApply(list2 -> {
                ArrayList arrayList = new ArrayList();
                for (int i2 = 0; i2 < list.size(); i2++) {
                    arrayList.add((ActiveTxnRecord) ((VersionedMetadata) list2.get(i2)).getObject());
                }
                return arrayList;
            });
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<List<VersionedTransactionData>> getVersionedTransactionRecords(int i, List<String> list, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getTransactionsInEpochTable(i, operationContext).thenCompose(str -> {
            return this.storeHelper.getEntries(str, list, ActiveTxnRecord::fromBytes, NON_EXISTENT_TXN, operationContext.getRequestId()).thenApply(list2 -> {
                ArrayList arrayList = new ArrayList();
                for (int i2 = 0; i2 < list.size(); i2++) {
                    VersionedMetadata versionedMetadata = (VersionedMetadata) list2.get(i2);
                    ActiveTxnRecord activeTxnRecord = (ActiveTxnRecord) versionedMetadata.getObject();
                    if (!ActiveTxnRecord.EMPTY.equals(activeTxnRecord)) {
                        arrayList.add(new VersionedTransactionData(i, UUID.fromString((String) list.get(i2)), versionedMetadata.getVersion(), activeTxnRecord.getTxnStatus(), activeTxnRecord.getTxCreationTimestamp(), activeTxnRecord.getMaxExecutionExpiryTime(), activeTxnRecord.getWriterId(), Long.valueOf(activeTxnRecord.getCommitTime()), Long.valueOf(activeTxnRecord.getCommitOrder()), activeTxnRecord.getCommitOffsets()));
                    }
                }
                return arrayList;
            });
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getTxnInEpoch(int i, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        return getTransactionsInEpochTable(i, operationContext).thenCompose(str -> {
            return this.storeHelper.expectingDataNotFound(this.storeHelper.getAllEntries(str, ActiveTxnRecord::fromBytes, operationContext.getRequestId()).collectRemaining(entry -> {
                concurrentHashMap.put((String) entry.getKey(), (VersionedMetadata) entry.getValue());
                return true;
            }).thenApply(r5 -> {
                return (Map) concurrentHashMap.entrySet().stream().collect(Collectors.toMap(entry2 -> {
                    return UUID.fromString((String) entry2.getKey());
                }, entry3 -> {
                    return (ActiveTxnRecord) ((VersionedMetadata) entry3.getValue()).getObject();
                }));
            }), Collections.emptyMap());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Version> createNewTransaction(int i, UUID uuid, ActiveTxnRecord activeTxnRecord, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getTransactionsInEpochTable(i, operationContext).thenCompose(str -> {
            return this.storeHelper.addNewEntryIfAbsent(str, uuid.toString(), activeTxnRecord, (v0) -> {
                return v0.toBytes();
            }, operationContext.getRequestId());
        });
    }

    private CompletableFuture<Void> createTransactionsInEpochTable(int i, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getEpochsWithTransactionsTable(operationContext).thenCompose(str -> {
            return this.storeHelper.addNewEntryIfAbsent(str, Integer.toString(i), new byte[0], bArr -> {
                return bArr;
            }, operationContext.getRequestId());
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) version -> {
            return getTransactionsInEpochTable(i, operationContext).thenCompose(str2 -> {
                return this.storeHelper.createTable(str2, operationContext.getRequestId());
            });
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<ActiveTxnRecord>> getActiveTx(int i, UUID uuid, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getTransactionsInEpochTable(i, operationContext).thenCompose(str -> {
            return this.storeHelper.getCachedOrLoad(str, uuid.toString(), ActiveTxnRecord::fromBytes, operationContext.getOperationStartTime(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> updateActiveTx(int i, UUID uuid, VersionedMetadata<ActiveTxnRecord> versionedMetadata, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getTransactionsInEpochTable(i, operationContext).thenCompose(str -> {
            return this.storeHelper.updateEntry(str, uuid.toString(), (ActiveTxnRecord) versionedMetadata.getObject(), (v0) -> {
                return v0.toBytes();
            }, versionedMetadata.getVersion(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Long> addTxnToCommitOrder(UUID uuid, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return this.txnCommitOrderer.addEntity(getScope(), getName(), uuid.toString(), operationContext.getRequestId());
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> removeTxnsFromCommitOrder(List<Long> list, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return this.txnCommitOrderer.removeEntities(getScope(), getName(), list);
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> removeActiveTxEntry(int i, UUID uuid, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getTransactionsInEpochTable(i, operationContext).thenCompose(str -> {
            return this.storeHelper.removeEntry(str, uuid.toString(), operationContext.getRequestId());
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r7 -> {
            return tryRemoveOlderTransactionsInEpochTables(num -> {
                return num.intValue() < i;
            }, operationContext);
        });
    }

    private CompletableFuture<Void> tryRemoveOlderTransactionsInEpochTables(Predicate<Integer> predicate, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getEpochsWithTransactions(operationContext).thenCompose(list -> {
            return Futures.allOf((Collection) list.stream().filter(predicate).map(num -> {
                return tryRemoveTransactionsInEpochTable(num.intValue(), operationContext);
            }).collect(Collectors.toList()));
        });
    }

    private CompletableFuture<Void> tryRemoveTransactionsInEpochTable(int i, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getTransactionsInEpochTable(i, operationContext).thenCompose(str -> {
            return this.storeHelper.deleteTable(str, true, operationContext.getRequestId()).handle((r4, th) -> {
                if (th != null && !PravegaTablesStreamMetadataStore.DATA_NOT_FOUND_PREDICATE.test(th)) {
                    if (AbstractStreamMetadataStore.DATA_NOT_EMPTY_PREDICATE.test(th)) {
                        return false;
                    }
                    throw new CompletionException(th);
                }
                return true;
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) bool -> {
                return bool.booleanValue() ? getEpochsWithTransactionsTable(operationContext).thenCompose(str -> {
                    return this.storeHelper.removeEntry(str, Integer.toString(i), operationContext.getRequestId());
                }) : CompletableFuture.completedFuture(null);
            });
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createCompletedTxEntry(UUID uuid, CompletedTxnRecord completedTxnRecord, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return createCompletedTxEntries(Collections.singletonList(new AbstractMap.SimpleEntry(getCompletedTransactionKey(getScope(), getName(), uuid.toString()), completedTxnRecord)), operationContext);
    }

    private CompletableFuture<Void> createCompletedTxEntries(List<Map.Entry<String, CompletedTxnRecord>> list, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        Integer num = this.currentBatchSupplier.get();
        String completedTransactionsBatchTableName = getCompletedTransactionsBatchTableName(num.intValue());
        return Futures.toVoid(Futures.exceptionallyComposeExpecting(this.storeHelper.addNewEntriesIfAbsent(completedTransactionsBatchTableName, list, (v0) -> {
            return v0.toBytes();
        }, operationContext.getRequestId()), PravegaTablesStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, () -> {
            return tryCreateBatchTable(num.intValue(), operationContext).thenCompose(r11 -> {
                return this.storeHelper.addNewEntriesIfAbsent(completedTransactionsBatchTableName, list, (v0) -> {
                    return v0.toBytes();
                }, operationContext.getRequestId());
            });
        })).exceptionally(th -> {
            throw new CompletionException(th);
        });
    }

    @VisibleForTesting
    static String getCompletedTransactionKey(String str, String str2, String str3) {
        return String.format(COMPLETED_TRANSACTIONS_KEY_FORMAT, str, str2, str3);
    }

    @VisibleForTesting
    static String getCompletedTransactionsBatchTableName(int i) {
        return NameUtils.getQualifiedTableName("_system", new String[]{String.format("completedTransactionsBatch-%s", Integer.valueOf(i))});
    }

    private CompletableFuture<Void> tryCreateBatchTable(int i, OperationContext operationContext) {
        String completedTransactionsBatchTableName = getCompletedTransactionsBatchTableName(i);
        return Futures.exceptionallyComposeExpecting(this.storeHelper.addNewEntryIfAbsent(NameUtils.COMPLETED_TRANSACTIONS_BATCHES_TABLE, Integer.toString(i), new byte[0], bArr -> {
            return bArr;
        }, operationContext.getRequestId()), th -> {
            return Exceptions.unwrap(th) instanceof StoreException.DataContainerNotFoundException;
        }, () -> {
            return this.storeHelper.createTable(NameUtils.COMPLETED_TRANSACTIONS_BATCHES_TABLE, operationContext.getRequestId()).thenAccept(r10 -> {
                log.debug(operationContext.getRequestId(), "batches root table {} created", new Object[]{NameUtils.COMPLETED_TRANSACTIONS_BATCHES_TABLE});
            }).thenCompose(r11 -> {
                return this.storeHelper.addNewEntryIfAbsent(NameUtils.COMPLETED_TRANSACTIONS_BATCHES_TABLE, Integer.toString(i), new byte[0], bArr2 -> {
                    return bArr2;
                }, operationContext.getRequestId());
            });
        }).thenCompose(version -> {
            return this.storeHelper.createTable(completedTransactionsBatchTableName, operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<CompletedTxnRecord>> getCompletedTx(UUID uuid, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        ArrayList arrayList = new ArrayList();
        return this.storeHelper.getAllKeys(NameUtils.COMPLETED_TRANSACTIONS_BATCHES_TABLE, operationContext.getRequestId()).collectRemaining(str -> {
            arrayList.add(Integer.valueOf(Integer.parseInt(str)));
            return true;
        }).thenCompose(r9 -> {
            return Futures.allOfWithResults((List) arrayList.stream().map(num -> {
                return this.storeHelper.expectingDataNotFound(this.storeHelper.getCachedOrLoad(getCompletedTransactionsBatchTableName(num.intValue()), getCompletedTransactionKey(getScope(), getName(), uuid.toString()), CompletedTxnRecord::fromBytes, 0L, operationContext.getRequestId()), null);
            }).collect(Collectors.toList()));
        }).thenCompose(list -> {
            Optional findFirst = list.stream().filter((v0) -> {
                return Objects.nonNull(v0);
            }).findFirst();
            if (findFirst.isPresent()) {
                return CompletableFuture.completedFuture((VersionedMetadata) findFirst.get());
            }
            throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Completed Txn not found");
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> createTruncationDataIfAbsent(StreamTruncationRecord streamTruncationRecord, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(str, "truncation", streamTruncationRecord, (v0) -> {
                return v0.toBytes();
            }, operationContext.getRequestId()));
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> setTruncationData(VersionedMetadata<StreamTruncationRecord> versionedMetadata, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.updateEntry(str, "truncation", (StreamTruncationRecord) versionedMetadata.getObject(), (v0) -> {
                return v0.toBytes();
            }, versionedMetadata.getVersion(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<StreamTruncationRecord>> getTruncationData(boolean z, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            if (z) {
                unload(str, "truncation");
            }
            return this.storeHelper.getCachedOrLoad(str, "truncation", StreamTruncationRecord::fromBytes, operationContext.getOperationStartTime(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> setConfigurationData(VersionedMetadata<StreamConfigurationRecord> versionedMetadata, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.updateEntry(str, "configuration", (StreamConfigurationRecord) versionedMetadata.getObject(), (v0) -> {
                return v0.toBytes();
            }, versionedMetadata.getVersion(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<StreamConfigurationRecord>> getConfigurationData(boolean z, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            if (z) {
                unload(str, "configuration");
            }
            return this.storeHelper.getCachedOrLoad(str, "configuration", StreamConfigurationRecord::fromBytes, operationContext.getOperationStartTime(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> setStateData(VersionedMetadata<StateRecord> versionedMetadata, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.updateEntry(str, "state", (StateRecord) versionedMetadata.getObject(), (v0) -> {
                return v0.toBytes();
            }, versionedMetadata.getVersion(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<StateRecord>> getStateData(boolean z, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            if (z) {
                unload(str, "state");
            }
            return this.storeHelper.getCachedOrLoad(str, "state", StateRecord::fromBytes, operationContext.getOperationStartTime(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createCommitTxnRecordIfAbsent(CommittingTransactionsRecord committingTransactionsRecord, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(str, "committingTxns", committingTransactionsRecord, (v0) -> {
                return v0.toBytes();
            }, operationContext.getRequestId()));
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> getCommitTxnRecord(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.getCachedOrLoad(str, "committingTxns", CommittingTransactionsRecord::fromBytes, operationContext.getOperationStartTime(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Version> updateCommittingTxnRecord(VersionedMetadata<CommittingTransactionsRecord> versionedMetadata, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.updateEntry(str, "committingTxns", (CommittingTransactionsRecord) versionedMetadata.getObject(), (v0) -> {
                return v0.toBytes();
            }, versionedMetadata.getVersion(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createWaitingRequestNodeIfAbsent(String str, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str2 -> {
            return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(str2, "waitingRequestProcessor", str, str2 -> {
                return str2.getBytes(StandardCharsets.UTF_8);
            }, operationContext.getRequestId()));
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<String> getWaitingRequestNode(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.getCachedOrLoad(str, "waitingRequestProcessor", bArr -> {
                return StandardCharsets.UTF_8.decode(ByteBuffer.wrap(bArr)).toString();
            }, System.currentTimeMillis(), operationContext.getRequestId());
        }).thenApply((Function<? super U, ? extends U>) (v0) -> {
            return v0.getObject();
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> deleteWaitingRequestNode(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getMetadataTable(operationContext).thenCompose(str -> {
            return this.storeHelper.removeEntry(str, "waitingRequestProcessor", operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createWriterMarkRecord(String str, long j, ImmutableMap<Long, Long> immutableMap, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        WriterMark writerMark = new WriterMark(j, immutableMap);
        return Futures.toVoid(getWritersTable(operationContext).thenCompose(str2 -> {
            return this.storeHelper.addNewEntry(str2, str, writerMark, (v0) -> {
                return v0.toBytes();
            }, operationContext.getRequestId());
        }));
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> removeWriterRecord(String str, Version version, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getWritersTable(operationContext).thenCompose(str2 -> {
            return this.storeHelper.removeEntry(str2, str, version, operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<VersionedMetadata<WriterMark>> getWriterMarkRecord(String str, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        return getWritersTable(operationContext).thenCompose(str2 -> {
            return this.storeHelper.getCachedOrLoad(str2, str, WriterMark::fromBytes, operationContext.getOperationStartTime(), operationContext.getRequestId());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> updateWriterMarkRecord(String str, long j, ImmutableMap<Long, Long> immutableMap, boolean z, Version version, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        WriterMark writerMark = new WriterMark(j, immutableMap, z);
        return Futures.toVoid(getWritersTable(operationContext).thenCompose(str2 -> {
            return this.storeHelper.updateEntry(str2, str, writerMark, (v0) -> {
                return v0.toBytes();
            }, version, operationContext.getRequestId());
        }));
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Map<String, WriterMark>> getAllWriterMarks(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "operation context cannot be null");
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        return getWritersTable(operationContext).thenCompose(str -> {
            return this.storeHelper.getAllEntries(str, WriterMark::fromBytes, operationContext.getRequestId()).collectRemaining(entry -> {
                concurrentHashMap.put((String) entry.getKey(), (WriterMark) ((VersionedMetadata) entry.getValue()).getObject());
                return true;
            });
        }).thenApply((Function<? super U, ? extends U>) r3 -> {
            return concurrentHashMap;
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public void refresh() {
        this.idRef.set(null);
    }

    private String getKeyForSubscriber(String str) {
        return "subscriber_" + str;
    }

    private void unload(String str, String str2) {
        this.storeHelper.invalidateCache(str, str2);
    }
}
