package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.lang.Int96;
import io.pravega.common.util.BitConverter;
import io.pravega.controller.store.index.ZKHostIndex;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.Version;
import io.pravega.controller.util.Config;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/controller/store/stream/ZKStreamMetadataStore.class */
public class ZKStreamMetadataStore extends AbstractStreamMetadataStore implements AutoCloseable {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(ZKStreamMetadataStore.class);

    @VisibleForTesting
    static final String SCOPE_ROOT_PATH = "/store";
    static final String DELETED_STREAMS_PATH = "/lastActiveStreamSegment/%s";
    private static final String TRANSACTION_ROOT_PATH = "/transactions";
    private static final String COMPLETED_TXN_GC_NAME = "completedTxnGC";
    static final String ACTIVE_TX_ROOT_PATH = "/transactions/activeTx";
    static final String COMPLETED_TX_ROOT_PATH = "/transactions/completedTx";
    static final String COMPLETED_TX_BATCH_ROOT_PATH = "/transactions/completedTx/batches";
    static final String COMPLETED_TX_BATCH_PATH = "/transactions/completedTx/batches/%d";

    @VisibleForTesting
    private ZKStoreHelper storeHelper;
    private final ZkOrderedStore orderer;
    private final ZKGarbageCollector completedTxnGC;
    private final ZkInt96Counter counter;
    private final Executor executor;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public ZKStreamMetadataStore(CuratorFramework curatorFramework, Executor executor) {
        this(curatorFramework, executor, Duration.ofHours(Config.COMPLETED_TRANSACTION_TTL_IN_HOURS));
    }

    @VisibleForTesting
    ZKStreamMetadataStore(CuratorFramework curatorFramework, Executor executor, Duration duration) {
        super(new ZKHostIndex(curatorFramework, "/hostTxnIndex", executor), new ZKHostIndex(curatorFramework, "/hostRequestIndex", executor));
        this.storeHelper = new ZKStoreHelper(curatorFramework, executor);
        this.orderer = new ZkOrderedStore("txnCommitOrderer", this.storeHelper, executor);
        this.completedTxnGC = new ZKGarbageCollector(COMPLETED_TXN_GC_NAME, this.storeHelper, this::gcCompletedTxn, duration);
        this.completedTxnGC.startAsync();
        this.completedTxnGC.awaitRunning();
        this.counter = new ZkInt96Counter(this.storeHelper);
        this.executor = executor;
    }

    private CompletableFuture<Void> gcCompletedTxn() {
        return this.storeHelper.getChildren(COMPLETED_TX_BATCH_ROOT_PATH).thenApply(list -> {
            List list = (List) list.stream().map(Long::parseLong).sorted().collect(Collectors.toList());
            return list.size() > 2 ? list.subList(0, list.size() - 2) : new ArrayList();
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) list2 -> {
            log.debug("deleting batches {} on new scheme" + list2);
            return Futures.allOf((Collection) list2.stream().map(l -> {
                return this.storeHelper.deleteTree(String.format(COMPLETED_TX_BATCH_PATH, l));
            }).collect(Collectors.toList()));
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    public ZKStream newStream(String str, String str2) {
        ZKStoreHelper zKStoreHelper = this.storeHelper;
        ZKGarbageCollector zKGarbageCollector = this.completedTxnGC;
        zKGarbageCollector.getClass();
        return new ZKStream(str, str2, zKStoreHelper, zKGarbageCollector::getLatestBatch, this.executor, this.orderer);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    CompletableFuture<Int96> getNextCounter() {
        return this.counter.getNextCounter();
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    CompletableFuture<Boolean> checkScopeExists(String str) {
        return this.storeHelper.checkExists(ZKPaths.makePath(SCOPE_ROOT_PATH, str));
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    Version getEmptyVersion() {
        return Version.IntVersion.EMPTY;
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    Version parseVersionData(byte[] bArr) {
        return Version.IntVersion.fromBytes(bArr);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    public ZKScope newScope(String str) {
        return new ZKScope(str, this.storeHelper);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<String> getScopeConfiguration(String str) {
        return this.storeHelper.checkExists(String.format("/store/%s", str)).thenApply(bool -> {
            if (bool.booleanValue()) {
                return str;
            }
            throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, str);
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<String>> listScopes() {
        return this.storeHelper.getChildren(SCOPE_ROOT_PATH).thenApply(list -> {
            return (List) list.stream().filter(str -> {
                return !str.equals("_streamsinscope");
            }).collect(Collectors.toList());
        });
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore, io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<CreateStreamResponse> createStream(String str, String str2, StreamConfiguration streamConfiguration, long j, OperationContext operationContext, Executor executor) {
        ZKScope zKScope = (ZKScope) getScope(str);
        ZKStream zKStream = (ZKStream) getStream(str, str2, operationContext);
        return super.createStream(str, str2, streamConfiguration, j, operationContext, executor).thenCompose(createStreamResponse -> {
            CompletableFuture<Integer> nextStreamPosition = zKScope.getNextStreamPosition();
            zKStream.getClass();
            return nextStreamPosition.thenCompose((v1) -> {
                return r1.createStreamPositionNodeIfAbsent(v1);
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r3 -> {
                return zKStream.getStreamPosition();
            }).thenCompose(num -> {
                return zKScope.addStreamToScope(str2, num.intValue());
            }).thenApply(r32 -> {
                return createStreamResponse;
            });
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> checkStreamExists(String str, String str2) {
        return this.storeHelper.checkExists(newStream(str, str2).getStreamPath());
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    public CompletableFuture<Integer> getSafeStartingSegmentNumberFor(String str, String str2) {
        return this.storeHelper.getData(String.format(DELETED_STREAMS_PATH, getScopedStreamName(str, str2)), bArr -> {
            return Integer.valueOf(BitConverter.readInt(bArr, 0));
        }).handleAsync((versionedMetadata, th) -> {
            if (th == null) {
                return Integer.valueOf(((Integer) versionedMetadata.getObject()).intValue() + 1);
            }
            if (th instanceof StoreException.DataNotFoundException) {
                return 0;
            }
            log.error("Problem found while getting a safe starting segment number for {}.", getScopedStreamName(str, str2), th);
            throw new CompletionException(th);
        });
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    CompletableFuture<Void> recordLastStreamSegment(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        String format = String.format(DELETED_STREAMS_PATH, getScopedStreamName(str, str2));
        byte[] bArr = new byte[4];
        BitConverter.writeInt(bArr, 0, i);
        return this.storeHelper.getData(format, bArr2 -> {
            return Integer.valueOf(BitConverter.readInt(bArr2, 0));
        }).exceptionally(th -> {
            if (th instanceof StoreException.DataNotFoundException) {
                return null;
            }
            throw new CompletionException(th);
        }).thenCompose(versionedMetadata -> {
            log.debug("Recording last segment {} for stream {}/{} on deletion.", new Object[]{Integer.valueOf(i), str, str2});
            if (versionedMetadata == null) {
                return Futures.toVoid(this.storeHelper.createZNodeIfNotExist(format, bArr));
            }
            int intValue = ((Integer) versionedMetadata.getObject()).intValue();
            Preconditions.checkArgument(i >= intValue, "Old last active segment ({}) for {}/{} is higher than current one {}.", Integer.valueOf(intValue), str, str2, Integer.valueOf(i));
            return Futures.toVoid(this.storeHelper.setData(format, bArr, versionedMetadata.getVersion()));
        });
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore, io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> deleteStream(String str, String str2, OperationContext operationContext, Executor executor) {
        ZKScope zKScope = (ZKScope) getScope(str);
        return Futures.exceptionallyExpecting(((ZKStream) getStream(str, str2, operationContext)).getStreamPosition().thenCompose(num -> {
            return zKScope.removeStreamFromScope(str2, num.intValue());
        }), th -> {
            return Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException;
        }, (Object) null).thenCompose(r11 -> {
            return super.deleteStream(str, str2, operationContext, executor);
        });
    }

    @VisibleForTesting
    public void setStoreHelperForTesting(ZKStoreHelper zKStoreHelper) {
        this.storeHelper = zKStoreHelper;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.completedTxnGC.stopAsync();
        this.completedTxnGC.awaitTerminated();
    }

    @SuppressFBWarnings(justification = "generated code")
    ZKStoreHelper getStoreHelper() {
        return this.storeHelper;
    }
}
