package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.lang.Int96;
import io.pravega.common.tracing.TagLogger;
import io.pravega.common.util.AsyncIterator;
import io.pravega.controller.server.SegmentHelper;
import io.pravega.controller.server.security.auth.GrpcAuthHelper;
import io.pravega.controller.store.PravegaTablesScope;
import io.pravega.controller.store.PravegaTablesStoreHelper;
import io.pravega.controller.store.Version;
import io.pravega.controller.store.ZKStoreHelper;
import io.pravega.controller.store.index.ZKHostIndex;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.util.Config;
import io.pravega.shared.NameUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/store/stream/PravegaTablesStreamMetadataStore.class */
public class PravegaTablesStreamMetadataStore extends AbstractStreamMetadataStore {
    private static final String COMPLETED_TXN_GC_NAME = "completedTxnGC";
    private final ZkInt96Counter counter;
    private final AtomicReference<ZKGarbageCollector> completedTxnGCRef;
    private final ZKGarbageCollector completedTxnGC;

    @VisibleForTesting
    private final PravegaTablesStoreHelper storeHelper;
    private final ZkOrderedStore orderer;
    private final ScheduledExecutorService executor;
    public static final String SCOPES_TABLE = NameUtils.getQualifiedTableName("_system", new String[]{"scopes"});
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(PravegaTablesStreamMetadataStore.class));

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public PravegaTablesStreamMetadataStore(SegmentHelper segmentHelper, CuratorFramework curatorFramework, ScheduledExecutorService scheduledExecutorService, GrpcAuthHelper grpcAuthHelper) {
        this(segmentHelper, curatorFramework, scheduledExecutorService, Duration.ofHours(Config.COMPLETED_TRANSACTION_TTL_IN_HOURS), grpcAuthHelper);
    }

    @VisibleForTesting
    PravegaTablesStreamMetadataStore(SegmentHelper segmentHelper, CuratorFramework curatorFramework, ScheduledExecutorService scheduledExecutorService, Duration duration, GrpcAuthHelper grpcAuthHelper) {
        super(new ZKHostIndex(curatorFramework, "/hostTxnIndex", scheduledExecutorService), new ZKHostIndex(curatorFramework, "/hostRequestIndex", scheduledExecutorService));
        ZKStoreHelper zKStoreHelper = new ZKStoreHelper(curatorFramework, scheduledExecutorService);
        this.orderer = new ZkOrderedStore("txnCommitOrderer", zKStoreHelper, scheduledExecutorService);
        this.completedTxnGC = new ZKGarbageCollector(COMPLETED_TXN_GC_NAME, zKStoreHelper, this::gcCompletedTxn, duration);
        this.completedTxnGC.startAsync();
        this.completedTxnGC.awaitRunning();
        this.completedTxnGCRef = new AtomicReference<>(this.completedTxnGC);
        this.counter = new ZkInt96Counter(zKStoreHelper);
        this.storeHelper = new PravegaTablesStoreHelper(segmentHelper, grpcAuthHelper, scheduledExecutorService);
        this.executor = scheduledExecutorService;
    }

    @VisibleForTesting
    public PravegaTablesStreamMetadataStore(CuratorFramework curatorFramework, ScheduledExecutorService scheduledExecutorService, Duration duration, PravegaTablesStoreHelper pravegaTablesStoreHelper) {
        super(new ZKHostIndex(curatorFramework, "/hostTxnIndex", scheduledExecutorService), new ZKHostIndex(curatorFramework, "/hostRequestIndex", scheduledExecutorService));
        ZKStoreHelper zKStoreHelper = new ZKStoreHelper(curatorFramework, scheduledExecutorService);
        this.orderer = new ZkOrderedStore("txnCommitOrderer", zKStoreHelper, scheduledExecutorService);
        this.completedTxnGC = new ZKGarbageCollector(COMPLETED_TXN_GC_NAME, zKStoreHelper, this::gcCompletedTxn, duration);
        this.completedTxnGC.startAsync();
        this.completedTxnGC.awaitRunning();
        this.completedTxnGCRef = new AtomicReference<>(this.completedTxnGC);
        this.counter = new ZkInt96Counter(zKStoreHelper);
        this.storeHelper = pravegaTablesStoreHelper;
        this.executor = scheduledExecutorService;
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore, io.pravega.controller.store.stream.StreamMetadataStore
    public OperationContext createScopeContext(String str, long j) {
        return new ScopeOperationContext(newScope(str), j);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore, io.pravega.controller.store.stream.StreamMetadataStore
    public OperationContext createStreamContext(String str, String str2, long j) {
        PravegaTablesScope newScope = newScope(str);
        PravegaTablesStoreHelper pravegaTablesStoreHelper = this.storeHelper;
        ZkOrderedStore zkOrderedStore = this.orderer;
        ZKGarbageCollector zKGarbageCollector = this.completedTxnGCRef.get();
        Objects.requireNonNull(zKGarbageCollector);
        Supplier supplier = zKGarbageCollector::getLatestBatch;
        Objects.requireNonNull(newScope);
        return new StreamOperationContext(newScope, new PravegaTablesStream(str, str2, pravegaTablesStoreHelper, zkOrderedStore, supplier, (v1, v2) -> {
            return r7.getStreamsInScopeTableName(v1, v2);
        }, this.executor), j);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    @VisibleForTesting
    public CompletableFuture<Void> sealScope(String str, OperationContext operationContext, ScheduledExecutorService scheduledExecutorService) {
        log.debug("Add entry to _deletingScopesTable called for scope {}", str);
        return ((PravegaTablesScope) getScope(str, operationContext)).sealScope(str, operationContext);
    }

    @VisibleForTesting
    CompletableFuture<Void> gcCompletedTxn() {
        ArrayList arrayList = new ArrayList();
        OperationContext operationContext = getOperationContext(null);
        PravegaTablesStoreHelper pravegaTablesStoreHelper = this.storeHelper;
        AsyncIterator<String> allKeys = this.storeHelper.getAllKeys(NameUtils.COMPLETED_TRANSACTIONS_BATCHES_TABLE, operationContext.getRequestId());
        Objects.requireNonNull(arrayList);
        return Futures.completeOn(pravegaTablesStoreHelper.expectingDataNotFound(allKeys.collectRemaining((v1) -> {
            return r2.add(v1);
        }).thenApply(r5 -> {
            return findStaleBatches(arrayList);
        }).thenCompose(list -> {
            log.debug(operationContext.getRequestId(), "deleting batches {} on new scheme", new Object[]{list});
            return Futures.allOf((Collection) list.stream().map(str -> {
                return this.storeHelper.deleteTable(NameUtils.getQualifiedTableName("_system", new String[]{String.format("completedTransactionsBatch-%s", Long.valueOf(Long.parseLong(str)))}), false, operationContext.getRequestId());
            }).collect(Collectors.toList())).thenCompose(r9 -> {
                return this.storeHelper.removeEntries(NameUtils.COMPLETED_TRANSACTIONS_BATCHES_TABLE, list, operationContext.getRequestId());
            });
        }), null), this.executor);
    }

    @VisibleForTesting
    List<String> findStaleBatches(List<String> list) {
        if (list.size() <= 2) {
            return new ArrayList();
        }
        int i = Integer.MIN_VALUE;
        int i2 = Integer.MIN_VALUE;
        long j = Long.MIN_VALUE;
        long j2 = Long.MIN_VALUE;
        for (int i3 = 0; i3 < list.size(); i3++) {
            long parseLong = Long.parseLong(list.get(i3));
            if (parseLong > j) {
                i2 = i;
                j2 = j;
                j = parseLong;
                i = i3;
            } else if (parseLong > j2) {
                i2 = i3;
                j2 = parseLong;
            }
        }
        ArrayList arrayList = new ArrayList(list);
        arrayList.remove(i);
        if (i < i2) {
            arrayList.remove(i2 - 1);
        } else {
            arrayList.remove(i2);
        }
        return arrayList;
    }

    @VisibleForTesting
    void setCompletedTxnGCRef(ZKGarbageCollector zKGarbageCollector) {
        this.completedTxnGCRef.set(zKGarbageCollector);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    public PravegaTablesStream newStream(String str, String str2) {
        PravegaTablesStoreHelper pravegaTablesStoreHelper = this.storeHelper;
        ZkOrderedStore zkOrderedStore = this.orderer;
        ZKGarbageCollector zKGarbageCollector = this.completedTxnGCRef.get();
        Objects.requireNonNull(zKGarbageCollector);
        return new PravegaTablesStream(str, str2, pravegaTablesStoreHelper, zkOrderedStore, zKGarbageCollector::getLatestBatch, (bool, operationContext) -> {
            return ((PravegaTablesScope) getScope(str, operationContext)).getStreamsInScopeTableName(bool.booleanValue(), operationContext);
        }, this.executor);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    CompletableFuture<Int96> getNextCounter() {
        return Futures.completeOn(this.counter.getNextCounter(), this.executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> checkScopeExists(String str, OperationContext operationContext, Executor executor) {
        return Futures.completeOn(this.storeHelper.expectingDataNotFound(this.storeHelper.getEntry(SCOPES_TABLE, str, bArr -> {
            return bArr;
        }, getOperationContext(operationContext).getRequestId()).thenApply(versionedMetadata -> {
            return true;
        }), false), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> isScopeSealed(String str, OperationContext operationContext, Executor executor) {
        return Futures.completeOn(this.storeHelper.expectingDataNotFound(this.storeHelper.getEntry(PravegaTablesScope.DELETING_SCOPES_TABLE, str, bArr -> {
            return bArr;
        }, getOperationContext(operationContext).getRequestId()).thenApply(versionedMetadata -> {
            return true;
        }), false), executor);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore, io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<CreateStreamResponse> createStream(String str, String str2, StreamConfiguration streamConfiguration, long j, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(((PravegaTablesScope) getScope(str, operationContext2)).addStreamToScope(str2, operationContext2).thenCompose(r17 -> {
            return super.createStream(str, str2, streamConfiguration, j, operationContext2, executor);
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore, io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> deleteStream(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(super.deleteStream(str, str2, operationContext2, executor).thenCompose(r8 -> {
            return ((PravegaTablesScope) getScope(str, operationContext2)).removeStreamFromScope(str2, operationContext2).thenApply(r3 -> {
                return r8;
            });
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    Version getEmptyVersion() {
        return Version.LongVersion.EMPTY;
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    Version parseVersionData(byte[] bArr) {
        return Version.IntVersion.fromBytes(bArr);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    public PravegaTablesScope newScope(String str) {
        return new PravegaTablesScope(str, this.storeHelper);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<String> getScopeConfiguration(String str, OperationContext operationContext, Executor executor) {
        return Futures.completeOn(this.storeHelper.getEntry(SCOPES_TABLE, str, bArr -> {
            return bArr;
        }, getOperationContext(operationContext).getRequestId()).thenApply(versionedMetadata -> {
            return str;
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<String>> listScopes(Executor executor, long j) {
        ArrayList arrayList = new ArrayList();
        AsyncIterator<String> allKeys = this.storeHelper.getAllKeys(SCOPES_TABLE, j);
        Objects.requireNonNull(arrayList);
        return Futures.completeOn(Futures.exceptionallyComposeExpecting(allKeys.collectRemaining((v1) -> {
            return r1.add(v1);
        }).thenApply(r3 -> {
            return arrayList;
        }), DATA_NOT_FOUND_PREDICATE, () -> {
            return this.storeHelper.createTable(SCOPES_TABLE, j).thenApply(r2 -> {
                return Collections.emptyList();
            });
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Pair<List<String>, String>> listScopes(String str, int i, Executor executor, long j) {
        return Futures.completeOn(Futures.exceptionallyComposeExpecting(this.storeHelper.getKeysPaginated(SCOPES_TABLE, Unpooled.wrappedBuffer(Base64.getDecoder().decode(str)), i, j).thenApply(entry -> {
            return new ImmutablePair((List) entry.getValue(), Base64.getEncoder().encodeToString(((ByteBuf) entry.getKey()).array()));
        }), DATA_NOT_FOUND_PREDICATE, () -> {
            return this.storeHelper.createTable(SCOPES_TABLE, j).thenApply(r4 -> {
                return ImmutablePair.of(Collections.emptyList(), str);
            });
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> addStreamTagsToIndex(String str, String str2, StreamConfiguration streamConfiguration, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return (str2.startsWith("_") || streamConfiguration.getTags().isEmpty()) ? CompletableFuture.completedFuture(null) : Futures.completeOn(((PravegaTablesScope) getScope(str, operationContext2)).addTagsUnderScope(str2, streamConfiguration.getTags(), operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> removeTagsFromIndex(String str, String str2, Set<String> set, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return (str2.startsWith("_") || set.isEmpty()) ? CompletableFuture.completedFuture(null) : Futures.completeOn(((PravegaTablesScope) getScope(str, operationContext2)).removeTagsUnderScope(str2, set, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> checkStreamExists(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(((PravegaTablesScope) getScope(str, operationContext2)).checkStreamExistsInScope(str2, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> checkReaderGroupExists(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(((PravegaTablesScope) getScope(str, operationContext2)).checkReaderGroupExistsInScope(str2, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    public CompletableFuture<Integer> getSafeStartingSegmentNumberFor(String str, String str2, OperationContext operationContext, Executor executor) {
        return Futures.completeOn(this.storeHelper.getEntry(NameUtils.DELETED_STREAMS_TABLE, getScopedStreamName(str, str2), PravegaTablesStoreHelper.BYTES_TO_INTEGER_FUNCTION, getOperationContext(operationContext).getRequestId()).handle((versionedMetadata, th) -> {
            if (th == null) {
                return Integer.valueOf(((Integer) versionedMetadata.getObject()).intValue() + 1);
            }
            if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
                return 0;
            }
            log.warn(operationContext.getRequestId(), "Problem found while getting a safe starting segment number for {}.", new Object[]{getScopedStreamName(str, str2), th});
            throw new CompletionException(th);
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    CompletableFuture<Void> recordLastStreamSegment(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        String scopedStreamName = getScopedStreamName(str, str2);
        OperationContext operationContext2 = getOperationContext(operationContext);
        long requestId = getOperationContext(operationContext2).getRequestId();
        return Futures.completeOn(Futures.handleCompose(this.storeHelper.getEntry(NameUtils.DELETED_STREAMS_TABLE, scopedStreamName, PravegaTablesStoreHelper.BYTES_TO_INTEGER_FUNCTION, requestId), (versionedMetadata, th) -> {
            if (th == null) {
                return CompletableFuture.completedFuture(versionedMetadata);
            }
            Throwable unwrap = Exceptions.unwrap(th);
            if (unwrap instanceof StoreException.DataContainerNotFoundException) {
                return this.storeHelper.createTable(NameUtils.DELETED_STREAMS_TABLE, requestId).thenApply(r2 -> {
                    return null;
                });
            }
            if (unwrap instanceof StoreException.DataNotFoundException) {
                return CompletableFuture.completedFuture(null);
            }
            throw new CompletionException(unwrap);
        }).thenCompose(versionedMetadata2 -> {
            log.debug(operationContext2.getRequestId(), "Recording last segment {} for stream {}/{} on deletion.", new Object[]{Integer.valueOf(i), str, str2});
            if (versionedMetadata2 == null) {
                return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(NameUtils.DELETED_STREAMS_TABLE, scopedStreamName, Integer.valueOf(i), PravegaTablesStoreHelper.INTEGER_TO_BYTES_FUNCTION, requestId));
            }
            int intValue = ((Integer) versionedMetadata2.getObject()).intValue();
            Preconditions.checkArgument(i >= intValue, "Old last active segment ({}) for {}/{} is higher than current one {}.", Integer.valueOf(intValue), str, str2, Integer.valueOf(i));
            return Futures.toVoid(this.storeHelper.updateEntry(NameUtils.DELETED_STREAMS_TABLE, scopedStreamName, Integer.valueOf(i), PravegaTablesStoreHelper.INTEGER_TO_BYTES_FUNCTION, versionedMetadata2.getVersion(), requestId));
        }), executor);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.completedTxnGC.stopAsync();
        this.completedTxnGC.awaitTerminated();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    public PravegaTablesReaderGroup newReaderGroup(String str, String str2) {
        return new PravegaTablesReaderGroup(str, str2, this.storeHelper, (bool, operationContext) -> {
            return ((PravegaTablesScope) getScope(str, null)).getReaderGroupsInScopeTableName(bool.booleanValue(), operationContext);
        }, this.executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> addReaderGroupToScope(String str, String str2, UUID uuid, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(((PravegaTablesScope) getScope(str, operationContext2)).addReaderGroupToScope(str2, uuid, operationContext2), executor);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore, io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> deleteReaderGroup(String str, String str2, OperationContext operationContext, Executor executor) {
        OperationContext operationContext2 = getOperationContext(operationContext);
        return Futures.completeOn(super.deleteReaderGroup(str, str2, operationContext2, executor).thenCompose(r8 -> {
            return ((PravegaTablesScope) getScope(str, operationContext2)).removeReaderGroupFromScope(str2, operationContext2).thenApply(r3 -> {
                return r8;
            });
        }), executor);
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    PravegaTablesStoreHelper getStoreHelper() {
        return this.storeHelper;
    }
}
