package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.lang.Int96;
import io.pravega.common.util.AsyncIterator;
import io.pravega.common.util.BitConverter;
import io.pravega.controller.server.SegmentHelper;
import io.pravega.controller.server.rpc.auth.GrpcAuthHelper;
import io.pravega.controller.store.PravegaTablesScope;
import io.pravega.controller.store.PravegaTablesStoreHelper;
import io.pravega.controller.store.Version;
import io.pravega.controller.store.ZKStoreHelper;
import io.pravega.controller.store.index.ZKHostIndex;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.util.Config;
import io.pravega.shared.NameUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/store/stream/PravegaTablesStreamMetadataStore.class */
public class PravegaTablesStreamMetadataStore extends AbstractStreamMetadataStore {
    public static final String SEPARATOR = ".#.";
    static final String COMPLETED_TRANSACTIONS_BATCH_TABLE_FORMAT = "completedTransactionsBatch-%d";
    private static final String COMPLETED_TXN_GC_NAME = "completedTxnGC";
    private final ZkInt96Counter counter;
    private final AtomicReference<ZKGarbageCollector> completedTxnGCRef;
    private final ZKGarbageCollector completedTxnGC;

    @VisibleForTesting
    private final PravegaTablesStoreHelper storeHelper;
    private final ZkOrderedStore orderer;
    private final ScheduledExecutorService executor;

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(PravegaTablesStreamMetadataStore.class);
    public static final String SCOPES_TABLE = NameUtils.getQualifiedTableName("_system", new String[]{"scopes"});
    static final String DELETED_STREAMS_TABLE = NameUtils.getQualifiedTableName("_system", new String[]{"deletedStreams"});
    static final String COMPLETED_TRANSACTIONS_BATCHES_TABLE = NameUtils.getQualifiedTableName("_system", new String[]{"completedTransactionsBatches"});

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public PravegaTablesStreamMetadataStore(SegmentHelper segmentHelper, CuratorFramework curatorFramework, ScheduledExecutorService scheduledExecutorService, GrpcAuthHelper grpcAuthHelper) {
        this(segmentHelper, curatorFramework, scheduledExecutorService, Duration.ofHours(Config.COMPLETED_TRANSACTION_TTL_IN_HOURS), grpcAuthHelper);
    }

    @VisibleForTesting
    PravegaTablesStreamMetadataStore(SegmentHelper segmentHelper, CuratorFramework curatorFramework, ScheduledExecutorService scheduledExecutorService, Duration duration, GrpcAuthHelper grpcAuthHelper) {
        super(new ZKHostIndex(curatorFramework, "/hostTxnIndex", scheduledExecutorService), new ZKHostIndex(curatorFramework, "/hostRequestIndex", scheduledExecutorService));
        ZKStoreHelper zKStoreHelper = new ZKStoreHelper(curatorFramework, scheduledExecutorService);
        this.orderer = new ZkOrderedStore("txnCommitOrderer", zKStoreHelper, scheduledExecutorService);
        this.completedTxnGC = new ZKGarbageCollector(COMPLETED_TXN_GC_NAME, zKStoreHelper, this::gcCompletedTxn, duration);
        this.completedTxnGC.startAsync();
        this.completedTxnGC.awaitRunning();
        this.completedTxnGCRef = new AtomicReference<>(this.completedTxnGC);
        this.counter = new ZkInt96Counter(zKStoreHelper);
        this.storeHelper = new PravegaTablesStoreHelper(segmentHelper, grpcAuthHelper, scheduledExecutorService);
        this.executor = scheduledExecutorService;
    }

    @VisibleForTesting
    CompletableFuture<Void> gcCompletedTxn() {
        ArrayList arrayList = new ArrayList();
        PravegaTablesStoreHelper pravegaTablesStoreHelper = this.storeHelper;
        AsyncIterator<String> allKeys = this.storeHelper.getAllKeys(COMPLETED_TRANSACTIONS_BATCHES_TABLE);
        arrayList.getClass();
        return Futures.completeOn(pravegaTablesStoreHelper.expectingDataNotFound(allKeys.collectRemaining((v1) -> {
            return r2.add(v1);
        }).thenApply(r5 -> {
            return findStaleBatches(arrayList);
        }).thenCompose(list -> {
            log.debug("deleting batches {} on new scheme", list);
            return Futures.allOf((Collection) list.stream().map(str -> {
                return this.storeHelper.deleteTable(NameUtils.getQualifiedTableName("_system", new String[]{String.format(COMPLETED_TRANSACTIONS_BATCH_TABLE_FORMAT, Long.valueOf(Long.parseLong(str)))}), false);
            }).collect(Collectors.toList())).thenCompose(r6 -> {
                return this.storeHelper.removeEntries(COMPLETED_TRANSACTIONS_BATCHES_TABLE, list);
            });
        }), null), this.executor);
    }

    @VisibleForTesting
    List<String> findStaleBatches(List<String> list) {
        if (list.size() <= 2) {
            return new ArrayList();
        }
        int i = Integer.MIN_VALUE;
        int i2 = Integer.MIN_VALUE;
        long j = Long.MIN_VALUE;
        long j2 = Long.MIN_VALUE;
        for (int i3 = 0; i3 < list.size(); i3++) {
            long parseLong = Long.parseLong(list.get(i3));
            if (parseLong > j) {
                i2 = i;
                j2 = j;
                j = parseLong;
                i = i3;
            } else if (parseLong > j2) {
                i2 = i3;
                j2 = parseLong;
            }
        }
        ArrayList arrayList = new ArrayList(list);
        arrayList.remove(i);
        if (i < i2) {
            arrayList.remove(i2 - 1);
        } else {
            arrayList.remove(i2);
        }
        return arrayList;
    }

    @VisibleForTesting
    void setCompletedTxnGCRef(ZKGarbageCollector zKGarbageCollector) {
        this.completedTxnGCRef.set(zKGarbageCollector);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    public PravegaTablesStream newStream(String str, String str2) {
        PravegaTablesStoreHelper pravegaTablesStoreHelper = this.storeHelper;
        ZkOrderedStore zkOrderedStore = this.orderer;
        ZKGarbageCollector zKGarbageCollector = this.completedTxnGCRef.get();
        zKGarbageCollector.getClass();
        return new PravegaTablesStream(str, str2, pravegaTablesStoreHelper, zkOrderedStore, zKGarbageCollector::getLatestBatch, () -> {
            return ((PravegaTablesScope) getScope(str)).getStreamsInScopeTableName();
        }, this.executor);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    CompletableFuture<Int96> getNextCounter() {
        return Futures.completeOn(this.counter.getNextCounter(), this.executor);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    CompletableFuture<Boolean> checkScopeExists(String str) {
        return Futures.completeOn(this.storeHelper.expectingDataNotFound(this.storeHelper.getEntry(SCOPES_TABLE, str, bArr -> {
            return bArr;
        }).thenApply(versionedMetadata -> {
            return true;
        }), false), this.executor);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore, io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<CreateStreamResponse> createStream(String str, String str2, StreamConfiguration streamConfiguration, long j, OperationContext operationContext, Executor executor) {
        return Futures.completeOn(((PravegaTablesScope) getScope(str)).addStreamToScope(str2).thenCompose(r17 -> {
            return super.createStream(str, str2, streamConfiguration, j, operationContext, executor);
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore, io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> deleteStream(String str, String str2, OperationContext operationContext, Executor executor) {
        return Futures.completeOn(super.deleteStream(str, str2, operationContext, executor).thenCompose(r6 -> {
            return ((PravegaTablesScope) getScope(str)).removeStreamFromScope(str2).thenApply(r3 -> {
                return r6;
            });
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    Version getEmptyVersion() {
        return Version.LongVersion.EMPTY;
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    Version parseVersionData(byte[] bArr) {
        return Version.IntVersion.fromBytes(bArr);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    public PravegaTablesScope newScope(String str) {
        return new PravegaTablesScope(str, this.storeHelper);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<String> getScopeConfiguration(String str) {
        return Futures.completeOn(this.storeHelper.getEntry(SCOPES_TABLE, str, bArr -> {
            return bArr;
        }).thenApply(versionedMetadata -> {
            return str;
        }), this.executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<String>> listScopes() {
        ArrayList arrayList = new ArrayList();
        AsyncIterator<String> allKeys = this.storeHelper.getAllKeys(SCOPES_TABLE);
        arrayList.getClass();
        return Futures.completeOn(Futures.exceptionallyComposeExpecting(allKeys.collectRemaining((v1) -> {
            return r1.add(v1);
        }).thenApply(r3 -> {
            return arrayList;
        }), DATA_NOT_FOUND_PREDICATE, () -> {
            return this.storeHelper.createTable(SCOPES_TABLE).thenApply(r2 -> {
                return Collections.emptyList();
            });
        }), this.executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Pair<List<String>, String>> listScopes(String str, int i, Executor executor) {
        return Futures.completeOn(Futures.exceptionallyComposeExpecting(this.storeHelper.getKeysPaginated(SCOPES_TABLE, Unpooled.wrappedBuffer(Base64.getDecoder().decode(str)), i).thenApply(entry -> {
            return new ImmutablePair(entry.getValue(), Base64.getEncoder().encodeToString(((ByteBuf) entry.getKey()).array()));
        }), DATA_NOT_FOUND_PREDICATE, () -> {
            return this.storeHelper.createTable(SCOPES_TABLE).thenApply(r4 -> {
                return ImmutablePair.of(Collections.emptyList(), str);
            });
        }), executor);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> checkStreamExists(String str, String str2) {
        return Futures.completeOn(((PravegaTablesScope) getScope(str)).checkStreamExistsInScope(str2), this.executor);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    public CompletableFuture<Integer> getSafeStartingSegmentNumberFor(String str, String str2) {
        return Futures.completeOn(this.storeHelper.getEntry(DELETED_STREAMS_TABLE, getScopedStreamName(str, str2), bArr -> {
            return Integer.valueOf(BitConverter.readInt(bArr, 0));
        }).handle((versionedMetadata, th) -> {
            if (th == null) {
                return Integer.valueOf(((Integer) versionedMetadata.getObject()).intValue() + 1);
            }
            if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
                return 0;
            }
            log.error("Problem found while getting a safe starting segment number for {}.", getScopedStreamName(str, str2), th);
            throw new CompletionException(th);
        }), this.executor);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    CompletableFuture<Void> recordLastStreamSegment(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        String scopedStreamName = getScopedStreamName(str, str2);
        byte[] bArr = new byte[4];
        BitConverter.writeInt(bArr, 0, i);
        return Futures.completeOn(this.storeHelper.createTable(DELETED_STREAMS_TABLE).thenCompose(r14 -> {
            return this.storeHelper.expectingDataNotFound(this.storeHelper.getEntry(DELETED_STREAMS_TABLE, scopedStreamName, bArr2 -> {
                return Integer.valueOf(BitConverter.readInt(bArr2, 0));
            }), null).thenCompose(versionedMetadata -> {
                log.debug("Recording last segment {} for stream {}/{} on deletion.", new Object[]{Integer.valueOf(i), str, str2});
                if (versionedMetadata == null) {
                    return Futures.toVoid(this.storeHelper.addNewEntryIfAbsent(DELETED_STREAMS_TABLE, scopedStreamName, bArr));
                }
                int intValue = ((Integer) versionedMetadata.getObject()).intValue();
                Preconditions.checkArgument(i >= intValue, "Old last active segment ({}) for {}/{} is higher than current one {}.", Integer.valueOf(intValue), str, str2, Integer.valueOf(i));
                return Futures.toVoid(this.storeHelper.updateEntry(DELETED_STREAMS_TABLE, scopedStreamName, bArr, versionedMetadata.getVersion()));
            });
        }), executor);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.completedTxnGC.stopAsync();
        this.completedTxnGC.awaitTerminated();
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    PravegaTablesStoreHelper getStoreHelper() {
        return this.storeHelper;
    }
}
