package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.RetentionPolicy;
import io.pravega.client.stream.impl.StreamImpl;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.lang.AtomicInt96;
import io.pravega.common.lang.Int96;
import io.pravega.common.util.BitConverter;
import io.pravega.controller.server.rest.generated.api.ApiResponseMessage;
import io.pravega.controller.server.retention.BucketChangeListener;
import io.pravega.controller.server.retention.BucketOwnershipListener;
import io.pravega.controller.store.index.ZKHostIndex;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.Version;
import io.pravega.controller.util.Config;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/controller/store/stream/ZKStreamMetadataStore.class */
public class ZKStreamMetadataStore extends AbstractStreamMetadataStore implements AutoCloseable {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(ZKStreamMetadataStore.class);

    @VisibleForTesting
    static final int COUNTER_RANGE = 10000;
    static final String COUNTER_PATH = "/counter";
    static final String DELETED_STREAMS_PATH = "/lastActiveStreamSegment/%s";
    static final String BUCKET_ROOT_PATH = "/buckets";
    static final String BUCKET_OWNERSHIP_PATH = "/buckets/ownership";
    static final String BUCKET_PATH = "/buckets/%d";
    private static final String RETENTION_PATH = "/buckets/%d/%s";
    private static final String TRANSACTION_ROOT_PATH = "/transactions";
    private static final String COMPLETED_TXN_GC_NAME = "completedTxnGC";
    static final String ACTIVE_TX_ROOT_PATH = "/transactions/activeTx";
    static final String COMPLETED_TX_ROOT_PATH = "/transactions/completedTx";
    static final String COMPLETED_TX_BATCH_ROOT_PATH = "/transactions/completedTx/batches";
    static final String COMPLETED_TX_BATCH_PATH = "/transactions/completedTx/batches/%d";

    @VisibleForTesting
    private ZKStoreHelper storeHelper;
    private final ConcurrentMap<Integer, PathChildrenCache> bucketCacheMap;
    private final AtomicReference<PathChildrenCache> bucketOwnershipCacheRef;
    private final Object lock;

    @GuardedBy("lock")
    private final AtomicInt96 limit;

    @GuardedBy("lock")
    private final AtomicInt96 counter;

    @GuardedBy("lock")
    private volatile CompletableFuture<Void> refreshFutureRef;
    private final ZKGarbageCollector completedTxnGC;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.pravega.controller.store.stream.ZKStreamMetadataStore$1, reason: invalid class name */
    /* loaded from: input_file:io/pravega/controller/store/stream/ZKStreamMetadataStore$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_REMOVED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_UPDATED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CONNECTION_LOST.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    ZKStreamMetadataStore(CuratorFramework curatorFramework, Executor executor) {
        this(curatorFramework, Config.BUCKET_COUNT, executor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public ZKStreamMetadataStore(CuratorFramework curatorFramework, int i, Executor executor) {
        this(curatorFramework, i, executor, Duration.ofHours(Config.COMPLETED_TRANSACTION_TTL_IN_HOURS));
    }

    @VisibleForTesting
    ZKStreamMetadataStore(CuratorFramework curatorFramework, int i, Executor executor, Duration duration) {
        super(new ZKHostIndex(curatorFramework, "/hostTxnIndex", executor), i);
        this.storeHelper = new ZKStoreHelper(curatorFramework, executor);
        this.bucketCacheMap = new ConcurrentHashMap();
        this.bucketOwnershipCacheRef = new AtomicReference<>();
        this.lock = new Object();
        this.counter = new AtomicInt96();
        this.limit = new AtomicInt96();
        this.refreshFutureRef = null;
        this.completedTxnGC = new ZKGarbageCollector(COMPLETED_TXN_GC_NAME, this.storeHelper, this::gcCompletedTxn, duration);
        this.completedTxnGC.startAsync();
        this.completedTxnGC.awaitRunning();
    }

    private CompletableFuture<Void> gcCompletedTxn() {
        return this.storeHelper.getChildren(COMPLETED_TX_BATCH_ROOT_PATH).thenApply(list -> {
            List list = (List) list.stream().map(Long::parseLong).sorted().collect(Collectors.toList());
            return list.size() > 2 ? list.subList(0, list.size() - 2) : new ArrayList();
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) list2 -> {
            log.debug("deleting batches {} on new scheme" + list2);
            return Futures.allOf((Collection) list2.stream().map(l -> {
                return this.storeHelper.deleteTree(String.format(COMPLETED_TX_BATCH_PATH, l));
            }).collect(Collectors.toList()));
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    public ZKStream newStream(String str, String str2) {
        ZKStoreHelper zKStoreHelper = this.storeHelper;
        ZKGarbageCollector zKGarbageCollector = this.completedTxnGC;
        zKGarbageCollector.getClass();
        return new ZKStream(str, str2, zKStoreHelper, zKGarbageCollector::getLatestBatch);
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    CompletableFuture<Int96> getNextCounter() {
        CompletableFuture thenCompose;
        synchronized (this.lock) {
            Int96 incrementAndGet = this.counter.incrementAndGet();
            thenCompose = incrementAndGet.compareTo(this.limit.get()) > 0 ? refreshRangeIfNeeded().thenCompose(r3 -> {
                return getNextCounter();
            }) : CompletableFuture.completedFuture(incrementAndGet);
        }
        return thenCompose;
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    Version getEmptyVersion() {
        return Version.IntVersion.EMPTY;
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    Version parseVersionData(byte[] bArr) {
        return Version.IntVersion.fromBytes(bArr);
    }

    @VisibleForTesting
    CompletableFuture<Void> refreshRangeIfNeeded() {
        CompletableFuture<Void> completableFuture;
        synchronized (this.lock) {
            completableFuture = this.refreshFutureRef;
            if (this.refreshFutureRef == null) {
                if (this.counter.get().compareTo(this.limit.get()) >= 0) {
                    log.info("Refreshing counter range. Current counter is {}. Current limit is {}", this.counter.get(), this.limit.get());
                    this.refreshFutureRef = getRefreshFuture().exceptionally(th -> {
                        synchronized (this.lock) {
                            this.refreshFutureRef = null;
                        }
                        log.warn("Exception thrown while trying to refresh transaction counter range", th);
                        throw new CompletionException(th);
                    });
                    completableFuture = this.refreshFutureRef;
                } else {
                    completableFuture = CompletableFuture.completedFuture(null);
                }
            }
        }
        return completableFuture;
    }

    @VisibleForTesting
    CompletableFuture<Void> getRefreshFuture() {
        return this.storeHelper.createZNodeIfNotExist(COUNTER_PATH, Int96.ZERO.toBytes()).thenCompose(num -> {
            return this.storeHelper.getData(COUNTER_PATH).thenCompose(data -> {
                Int96 fromBytes = Int96.fromBytes(data.getData());
                Int96 add = fromBytes.add(COUNTER_RANGE);
                return this.storeHelper.setData(COUNTER_PATH, new Data(add.toBytes(), data.getVersion())).thenAccept(num -> {
                    synchronized (this.lock) {
                        this.counter.set(fromBytes.getMsb(), fromBytes.getLsb());
                        this.limit.set(add.getMsb(), add.getLsb());
                        this.refreshFutureRef = null;
                        log.info("Refreshed counter range. Current counter is {}. Current limit is {}", this.counter.get(), this.limit.get());
                    }
                });
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    public ZKScope newScope(String str) {
        return new ZKScope(str, this.storeHelper);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<String> getScopeConfiguration(String str) {
        return this.storeHelper.checkExists(String.format("/store/%s", str)).thenApply(bool -> {
            if (bool.booleanValue()) {
                return str;
            }
            throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, str);
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<String>> listScopes() {
        return this.storeHelper.listScopes();
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> checkStreamExists(String str, String str2) {
        return this.storeHelper.checkExists(newStream(str, str2).getStreamPath());
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    public CompletableFuture<Integer> getSafeStartingSegmentNumberFor(String str, String str2) {
        return this.storeHelper.getData(String.format(DELETED_STREAMS_PATH, getScopedStreamName(str, str2))).handleAsync((data, th) -> {
            if (th == null) {
                return Integer.valueOf(BitConverter.readInt(data.getData(), 0) + 1);
            }
            if (th instanceof StoreException.DataNotFoundException) {
                return 0;
            }
            log.error("Problem found while getting a safe starting segment number for {}.", getScopedStreamName(str, str2), th);
            throw new CompletionException(th);
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public void registerBucketOwnershipListener(BucketOwnershipListener bucketOwnershipListener) {
        Preconditions.checkNotNull(bucketOwnershipListener);
        PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> {
            switch (AnonymousClass1.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
                case ApiResponseMessage.ERROR /* 1 */:
                    return;
                case ApiResponseMessage.WARNING /* 2 */:
                    bucketOwnershipListener.notify(new BucketOwnershipListener.BucketNotification(Integer.parseInt(ZKPaths.getNodeFromPath(pathChildrenCacheEvent.getData().getPath())), BucketOwnershipListener.BucketNotification.NotificationType.BucketAvailable));
                    return;
                case ApiResponseMessage.INFO /* 3 */:
                default:
                    log.warn("Received unknown event {}", pathChildrenCacheEvent.getType());
                    return;
                case ApiResponseMessage.OK /* 4 */:
                    bucketOwnershipListener.notify(new BucketOwnershipListener.BucketNotification(Integer.MIN_VALUE, BucketOwnershipListener.BucketNotification.NotificationType.ConnectivityError));
                    return;
            }
        };
        this.bucketOwnershipCacheRef.compareAndSet(null, new PathChildrenCache(this.storeHelper.getClient(), BUCKET_OWNERSHIP_PATH, true));
        this.bucketOwnershipCacheRef.get().getListenable().addListener(pathChildrenCacheListener);
        this.bucketOwnershipCacheRef.get().start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        log.info("bucket ownership listener registered");
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public void unregisterBucketOwnershipListener() {
        if (this.bucketOwnershipCacheRef.get() != null) {
            try {
                this.bucketOwnershipCacheRef.get().clear();
                this.bucketOwnershipCacheRef.get().close();
            } catch (IOException e) {
                log.warn("unable to close listener for bucket ownership {}", e);
            }
        }
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public void registerBucketChangeListener(int i, BucketChangeListener bucketChangeListener) {
        Preconditions.checkNotNull(bucketChangeListener);
        PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> {
            switch (AnonymousClass1.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
                case ApiResponseMessage.ERROR /* 1 */:
                    StreamImpl streamFromPath = getStreamFromPath(pathChildrenCacheEvent.getData().getPath());
                    bucketChangeListener.notify(new BucketChangeListener.StreamNotification(streamFromPath.getScope(), streamFromPath.getStreamName(), BucketChangeListener.StreamNotification.NotificationType.StreamAdded));
                    return;
                case ApiResponseMessage.WARNING /* 2 */:
                    StreamImpl streamFromPath2 = getStreamFromPath(pathChildrenCacheEvent.getData().getPath());
                    bucketChangeListener.notify(new BucketChangeListener.StreamNotification(streamFromPath2.getScope(), streamFromPath2.getStreamName(), BucketChangeListener.StreamNotification.NotificationType.StreamRemoved));
                    return;
                case ApiResponseMessage.INFO /* 3 */:
                    StreamImpl streamFromPath3 = getStreamFromPath(pathChildrenCacheEvent.getData().getPath());
                    bucketChangeListener.notify(new BucketChangeListener.StreamNotification(streamFromPath3.getScope(), streamFromPath3.getStreamName(), BucketChangeListener.StreamNotification.NotificationType.StreamUpdated));
                    return;
                case ApiResponseMessage.OK /* 4 */:
                    bucketChangeListener.notify(new BucketChangeListener.StreamNotification(null, null, BucketChangeListener.StreamNotification.NotificationType.ConnectivityError));
                    return;
                default:
                    log.warn("Received unknown event {} on bucket", pathChildrenCacheEvent.getType(), Integer.valueOf(i));
                    return;
            }
        };
        this.bucketCacheMap.put(Integer.valueOf(i), new PathChildrenCache(this.storeHelper.getClient(), String.format(BUCKET_PATH, Integer.valueOf(i)), true));
        PathChildrenCache pathChildrenCache = this.bucketCacheMap.get(Integer.valueOf(i));
        pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
        pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
        log.info("bucket {} change notification listener registered", Integer.valueOf(i));
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public void unregisterBucketListener(int i) {
        PathChildrenCache remove = this.bucketCacheMap.remove(Integer.valueOf(i));
        if (remove != null) {
            try {
                remove.clear();
                remove.close();
            } catch (IOException e) {
                log.warn("unable to close watch on bucket {} with exception {}", Integer.valueOf(i), e);
            }
        }
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> takeBucketOwnership(int i, String str, Executor executor) {
        Preconditions.checkArgument(i < this.bucketCount);
        String makePath = ZKPaths.makePath(BUCKET_OWNERSHIP_PATH, String.valueOf(i));
        return this.storeHelper.createEphemeralZNode(makePath, SerializationUtils.serialize(str)).thenCompose(bool -> {
            return !bool.booleanValue() ? this.storeHelper.getData(makePath).thenApply(data -> {
                return Boolean.valueOf(SerializationUtils.deserialize(data.getData()).equals(str));
            }) : CompletableFuture.completedFuture(true);
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<String>> getStreamsForBucket(int i, Executor executor) {
        return this.storeHelper.getChildren(String.format(BUCKET_PATH, Integer.valueOf(i))).thenApply(list -> {
            return (List) list.stream().map(this::decodedScopedStreamName).collect(Collectors.toList());
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> addUpdateStreamForAutoStreamCut(String str, String str2, RetentionPolicy retentionPolicy, OperationContext operationContext, Executor executor) {
        Preconditions.checkNotNull(retentionPolicy);
        String format = String.format(RETENTION_PATH, Integer.valueOf(getBucket(str, str2)), encodedScopedStreamName(str, str2));
        byte[] serialize = SerializationUtils.serialize(retentionPolicy);
        return this.storeHelper.getData(format).exceptionally(th -> {
            if (th instanceof StoreException.DataNotFoundException) {
                return null;
            }
            throw new CompletionException(th);
        }).thenCompose(data -> {
            return data == null ? Futures.toVoid(this.storeHelper.createZNodeIfNotExist(format, serialize)) : Futures.toVoid(this.storeHelper.setData(format, new Data(serialize, data.getVersion())));
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> removeStreamFromAutoStreamCut(String str, String str2, OperationContext operationContext, Executor executor) {
        return this.storeHelper.deleteNode(String.format(RETENTION_PATH, Integer.valueOf(getBucket(str, str2)), encodedScopedStreamName(str, str2))).exceptionally(th -> {
            if (th instanceof StoreException.DataNotFoundException) {
                return null;
            }
            throw new CompletionException(th);
        });
    }

    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    CompletableFuture<Void> recordLastStreamSegment(String str, String str2, int i, OperationContext operationContext, Executor executor) {
        String format = String.format(DELETED_STREAMS_PATH, getScopedStreamName(str, str2));
        byte[] bArr = new byte[4];
        BitConverter.writeInt(bArr, 0, i);
        return this.storeHelper.getData(format).exceptionally(th -> {
            if (th instanceof StoreException.DataNotFoundException) {
                return null;
            }
            throw new CompletionException(th);
        }).thenCompose(data -> {
            log.debug("Recording last segment {} for stream {}/{} on deletion.", new Object[]{Integer.valueOf(i), str, str2});
            if (data == null) {
                return Futures.toVoid(this.storeHelper.createZNodeIfNotExist(format, bArr));
            }
            int readInt = BitConverter.readInt(data.getData(), 0);
            Preconditions.checkArgument(i >= readInt, "Old last active segment ({}) for {}/{} is higher than current one {}.", Integer.valueOf(readInt), str, str2, Integer.valueOf(i));
            return Futures.toVoid(this.storeHelper.setData(format, new Data(bArr, data.getVersion())));
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> createBucketsRoot() {
        return initializeZNode(BUCKET_ROOT_PATH).thenCompose(r4 -> {
            return initializeZNode(BUCKET_OWNERSHIP_PATH);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r7 -> {
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < this.bucketCount; i++) {
                arrayList.add(initializeZNode(String.format(BUCKET_PATH, Integer.valueOf(i))));
            }
            return Futures.allOf(arrayList);
        });
    }

    private CompletableFuture<Void> initializeZNode(String str) {
        return this.storeHelper.createZNodeIfNotExist(str).handle((num, th) -> {
            if (th == null) {
                log.debug("Stream bucket correctly initialized: {}.", str);
                return null;
            }
            if (!(Exceptions.unwrap(th) instanceof StoreException.DataExistsException)) {
                throw new CompletionException("Unexpected exception initializing Stream bucket.", th);
            }
            log.debug("Stream bucket already initialized: {}.", str);
            return null;
        });
    }

    private String encodedScopedStreamName(String str, String str2) {
        return Base64.getEncoder().encodeToString(getScopedStreamName(str, str2).getBytes());
    }

    private String decodedScopedStreamName(String str) {
        return new String(Base64.getDecoder().decode(str));
    }

    private StreamImpl getStreamFromPath(String str) {
        String[] split = decodedScopedStreamName(ZKPaths.getNodeFromPath(str)).split("/");
        return new StreamImpl(split[0], split[1]);
    }

    @VisibleForTesting
    void setCounterAndLimitForTesting(int i, long j, int i2, long j2) {
        synchronized (this.lock) {
            this.limit.set(i2, j2);
            this.counter.set(i, j);
        }
    }

    @VisibleForTesting
    Int96 getLimitForTesting() {
        Int96 int96;
        synchronized (this.lock) {
            int96 = this.limit.get();
        }
        return int96;
    }

    @VisibleForTesting
    Int96 getCounterForTesting() {
        Int96 int96;
        synchronized (this.lock) {
            int96 = this.counter.get();
        }
        return int96;
    }

    @VisibleForTesting
    public void setStoreHelperForTesting(ZKStoreHelper zKStoreHelper) {
        this.storeHelper = zKStoreHelper;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.completedTxnGC.stopAsync();
        this.completedTxnGC.awaitTerminated();
    }

    @SuppressFBWarnings(justification = "generated code")
    ZKStoreHelper getStoreHelper() {
        return this.storeHelper;
    }
}
