package io.pravega.controller.store.stream;

import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.impl.StreamImpl;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.store.ZKStoreHelper;
import io.pravega.controller.store.client.StoreType;
import io.pravega.controller.store.stream.BucketStore;
import io.pravega.controller.store.stream.StoreException;
import java.util.Base64;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/store/stream/ZookeeperBucketStore.class */
public class ZookeeperBucketStore implements BucketStore {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ZookeeperBucketStore.class);
    private static final String ROOT_PATH = "/";
    private static final String OWNERSHIP_CHILD_PATH = "ownership";
    private final ImmutableMap<BucketStore.ServiceType, Integer> bucketCountMap;
    private final ZKStoreHelper storeHelper;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ZookeeperBucketStore(ImmutableMap<BucketStore.ServiceType, Integer> immutableMap, CuratorFramework curatorFramework, Executor executor) {
        this.bucketCountMap = immutableMap;
        this.storeHelper = new ZKStoreHelper(curatorFramework, executor);
    }

    @Override // io.pravega.controller.store.stream.BucketStore
    public StoreType getStoreType() {
        return StoreType.Zookeeper;
    }

    @Override // io.pravega.controller.store.stream.BucketStore
    public int getBucketCount(BucketStore.ServiceType serviceType) {
        return ((Integer) this.bucketCountMap.get(serviceType)).intValue();
    }

    public CompletableFuture<Void> createBucketsRoot(BucketStore.ServiceType serviceType) {
        String bucketRootPath = getBucketRootPath(serviceType);
        String bucketOwnershipPath = getBucketOwnershipPath(serviceType);
        return Futures.toVoid(this.storeHelper.createZNodeIfNotExist(bucketRootPath).thenCompose(num -> {
            return this.storeHelper.createZNodeIfNotExist(bucketOwnershipPath);
        }));
    }

    public CompletableFuture<Void> createBucket(BucketStore.ServiceType serviceType, int i) {
        String bucketPath = getBucketPath(serviceType, i);
        return Futures.toVoid(createBucketsRoot(serviceType).thenCompose(r5 -> {
            return this.storeHelper.createZNodeIfNotExist(bucketPath);
        }));
    }

    @Override // io.pravega.controller.store.stream.BucketStore
    public CompletableFuture<Set<String>> getStreamsForBucket(BucketStore.ServiceType serviceType, int i, Executor executor) {
        return this.storeHelper.getChildren(getBucketPath(serviceType, i)).thenApply(list -> {
            return (Set) list.stream().map(this::decodedScopedStreamName).collect(Collectors.toSet());
        });
    }

    @Override // io.pravega.controller.store.stream.BucketStore
    public CompletableFuture<Void> addStreamToBucketStore(BucketStore.ServiceType serviceType, String str, String str2, Executor executor) {
        String makePath = ZKPaths.makePath(getBucketPath(serviceType, BucketStore.getBucket(str, str2, ((Integer) this.bucketCountMap.get(serviceType)).intValue())), encodedScopedStreamName(str, str2));
        return this.storeHelper.checkExists(makePath).thenCompose(bool -> {
            return bool.booleanValue() ? CompletableFuture.completedFuture(null) : Futures.toVoid(this.storeHelper.createZNodeIfNotExist(makePath));
        });
    }

    @Override // io.pravega.controller.store.stream.BucketStore
    public CompletableFuture<Void> removeStreamFromBucketStore(BucketStore.ServiceType serviceType, String str, String str2, Executor executor) {
        return Futures.exceptionallyExpecting(this.storeHelper.deleteNode(ZKPaths.makePath(getBucketPath(serviceType, BucketStore.getBucket(str, str2, ((Integer) this.bucketCountMap.get(serviceType)).intValue())), encodedScopedStreamName(str, str2))), th -> {
            return Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException;
        }, (Object) null);
    }

    public CompletableFuture<Boolean> takeBucketOwnership(BucketStore.ServiceType serviceType, int i, String str) {
        String makePath = ZKPaths.makePath(getBucketOwnershipPath(serviceType), "" + i);
        return this.storeHelper.createEphemeralZNode(makePath, SerializationUtils.serialize(str)).thenCompose(bool -> {
            return !bool.booleanValue() ? this.storeHelper.getData(makePath, bArr -> {
                return bArr;
            }).thenApply(versionedMetadata -> {
                return Boolean.valueOf(SerializationUtils.deserialize((byte[]) versionedMetadata.getObject()).equals(str));
            }) : CompletableFuture.completedFuture(true);
        });
    }

    public PathChildrenCache getBucketPathChildrenCache(BucketStore.ServiceType serviceType, int i) {
        return this.storeHelper.getPathChildrenCache(getBucketPath(serviceType, i), true);
    }

    public PathChildrenCache getServiceOwnershipPathChildrenCache(BucketStore.ServiceType serviceType) {
        return this.storeHelper.getPathChildrenCache(getBucketOwnershipPath(serviceType), true);
    }

    public StreamImpl getStreamFromPath(String str) {
        String[] split = decodedScopedStreamName(ZKPaths.getNodeFromPath(str)).split(ROOT_PATH);
        return new StreamImpl(split[0], split[1]);
    }

    private String encodedScopedStreamName(String str, String str2) {
        return Base64.getEncoder().encodeToString(BucketStore.getScopedStreamName(str, str2).getBytes());
    }

    private String decodedScopedStreamName(String str) {
        return new String(Base64.getDecoder().decode(str));
    }

    private String getBucketRootPath(BucketStore.ServiceType serviceType) {
        return ZKPaths.makePath(ROOT_PATH, serviceType.getName());
    }

    private String getBucketOwnershipPath(BucketStore.ServiceType serviceType) {
        return ZKPaths.makePath(getBucketRootPath(serviceType), OWNERSHIP_CHILD_PATH);
    }

    private String getBucketPath(BucketStore.ServiceType serviceType, int i) {
        return ZKPaths.makePath(ZKPaths.makePath(ROOT_PATH, serviceType.getName()), "" + i);
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public ZKStoreHelper getStoreHelper() {
        return this.storeHelper;
    }
}
