package io.pravega.controller.store.stream;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.RetentionPolicy;
import io.pravega.client.stream.impl.StreamImpl;
import io.pravega.controller.server.rest.generated.api.ApiResponseMessage;
import io.pravega.controller.server.retention.BucketChangeListener;
import io.pravega.controller.server.retention.BucketOwnershipListener;
import io.pravega.controller.store.index.ZKHostIndex;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.tables.Data;
import io.pravega.controller.util.Config;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/controller/store/stream/ZKStreamMetadataStore.class */
public class ZKStreamMetadataStore extends AbstractStreamMetadataStore {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(ZKStreamMetadataStore.class);
    private final ZKStoreHelper storeHelper;
    private final ConcurrentMap<Integer, PathChildrenCache> bucketCacheMap;
    private final AtomicReference<PathChildrenCache> bucketOwnershipCacheRef;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.pravega.controller.store.stream.ZKStreamMetadataStore$1, reason: invalid class name */
    /* loaded from: input_file:io/pravega/controller/store/stream/ZKStreamMetadataStore$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_REMOVED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_UPDATED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CONNECTION_LOST.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    ZKStreamMetadataStore(CuratorFramework curatorFramework, Executor executor) {
        this(curatorFramework, Config.BUCKET_COUNT, executor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ZKStreamMetadataStore(CuratorFramework curatorFramework, int i, Executor executor) {
        super(new ZKHostIndex(curatorFramework, "/hostTxnIndex", executor), i);
        initialize();
        this.storeHelper = new ZKStoreHelper(curatorFramework, executor);
        this.bucketCacheMap = new ConcurrentHashMap();
        this.bucketOwnershipCacheRef = new AtomicReference<>();
    }

    private void initialize() {
        METRICS_PROVIDER.start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    public ZKStream newStream(String str, String str2) {
        return new ZKStream(str, str2, this.storeHelper);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // io.pravega.controller.store.stream.AbstractStreamMetadataStore
    public ZKScope newScope(String str) {
        return new ZKScope(str, this.storeHelper);
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<String> getScopeConfiguration(String str) {
        return this.storeHelper.checkExists(String.format("/store/%s", str)).thenApply(bool -> {
            if (bool.booleanValue()) {
                return str;
            }
            throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, str);
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<String>> listScopes() {
        return this.storeHelper.listScopes();
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> checkStreamExists(String str, String str2) {
        return this.storeHelper.checkExists(newStream(str, str2).getStreamPath());
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public void registerBucketOwnershipListener(BucketOwnershipListener bucketOwnershipListener) {
        Preconditions.checkNotNull(bucketOwnershipListener);
        PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> {
            switch (AnonymousClass1.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
                case ApiResponseMessage.ERROR /* 1 */:
                    return;
                case ApiResponseMessage.WARNING /* 2 */:
                    bucketOwnershipListener.notify(new BucketOwnershipListener.BucketNotification(Integer.parseInt(ZKPaths.getNodeFromPath(pathChildrenCacheEvent.getData().getPath())), BucketOwnershipListener.BucketNotification.NotificationType.BucketAvailable));
                    return;
                case ApiResponseMessage.INFO /* 3 */:
                default:
                    log.warn("Received unknown event {}", pathChildrenCacheEvent.getType());
                    return;
                case ApiResponseMessage.OK /* 4 */:
                    bucketOwnershipListener.notify(new BucketOwnershipListener.BucketNotification(Integer.MIN_VALUE, BucketOwnershipListener.BucketNotification.NotificationType.ConnectivityError));
                    return;
            }
        };
        this.bucketOwnershipCacheRef.compareAndSet(null, new PathChildrenCache(this.storeHelper.getClient(), "/buckets/ownership", true));
        this.bucketOwnershipCacheRef.get().getListenable().addListener(pathChildrenCacheListener);
        this.bucketOwnershipCacheRef.get().start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        log.info("bucket ownership listener registered");
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public void unregisterBucketOwnershipListener() {
        if (this.bucketOwnershipCacheRef.get() != null) {
            try {
                this.bucketOwnershipCacheRef.get().clear();
                this.bucketOwnershipCacheRef.get().close();
            } catch (IOException e) {
                log.warn("unable to close listener for bucket ownership {}", e);
            }
        }
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public void registerBucketChangeListener(int i, BucketChangeListener bucketChangeListener) {
        Preconditions.checkNotNull(bucketChangeListener);
        PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> {
            switch (AnonymousClass1.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
                case ApiResponseMessage.ERROR /* 1 */:
                    StreamImpl streamFromPath = getStreamFromPath(pathChildrenCacheEvent.getData().getPath());
                    bucketChangeListener.notify(new BucketChangeListener.StreamNotification(streamFromPath.getScope(), streamFromPath.getStreamName(), BucketChangeListener.StreamNotification.NotificationType.StreamAdded));
                    return;
                case ApiResponseMessage.WARNING /* 2 */:
                    StreamImpl streamFromPath2 = getStreamFromPath(pathChildrenCacheEvent.getData().getPath());
                    bucketChangeListener.notify(new BucketChangeListener.StreamNotification(streamFromPath2.getScope(), streamFromPath2.getStreamName(), BucketChangeListener.StreamNotification.NotificationType.StreamRemoved));
                    return;
                case ApiResponseMessage.INFO /* 3 */:
                    StreamImpl streamFromPath3 = getStreamFromPath(pathChildrenCacheEvent.getData().getPath());
                    bucketChangeListener.notify(new BucketChangeListener.StreamNotification(streamFromPath3.getScope(), streamFromPath3.getStreamName(), BucketChangeListener.StreamNotification.NotificationType.StreamUpdated));
                    return;
                case ApiResponseMessage.OK /* 4 */:
                    bucketChangeListener.notify(new BucketChangeListener.StreamNotification(null, null, BucketChangeListener.StreamNotification.NotificationType.ConnectivityError));
                    return;
                default:
                    log.warn("Received unknown event {} on bucket", pathChildrenCacheEvent.getType(), Integer.valueOf(i));
                    return;
            }
        };
        this.bucketCacheMap.put(Integer.valueOf(i), new PathChildrenCache(this.storeHelper.getClient(), String.format("/buckets/%d", Integer.valueOf(i)), true));
        PathChildrenCache pathChildrenCache = this.bucketCacheMap.get(Integer.valueOf(i));
        pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
        pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
        log.info("bucket {} change notification listener registered", Integer.valueOf(i));
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public void unregisterBucketListener(int i) {
        PathChildrenCache remove = this.bucketCacheMap.remove(Integer.valueOf(i));
        if (remove != null) {
            try {
                remove.clear();
                remove.close();
            } catch (IOException e) {
                log.warn("unable to close watch on bucket {} with exception {}", Integer.valueOf(i), e);
            }
        }
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Boolean> takeBucketOwnership(int i, String str, Executor executor) {
        Preconditions.checkArgument(i < this.bucketCount);
        String makePath = ZKPaths.makePath("/buckets/ownership", String.valueOf(i));
        return this.storeHelper.createEphemeralZNode(makePath, SerializationUtils.serialize(str)).thenCompose(bool -> {
            return !bool.booleanValue() ? this.storeHelper.getData(makePath).thenApply(data -> {
                return Boolean.valueOf(SerializationUtils.deserialize(data.getData()).equals(str));
            }) : CompletableFuture.completedFuture(true);
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<List<String>> getStreamsForBucket(int i, Executor executor) {
        return this.storeHelper.getChildren(String.format("/buckets/%d", Integer.valueOf(i))).thenApply(list -> {
            return (List) list.stream().map(this::decodedScopedStreamName).collect(Collectors.toList());
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> addUpdateStreamForAutoStreamCut(String str, String str2, RetentionPolicy retentionPolicy, OperationContext operationContext, Executor executor) {
        Preconditions.checkNotNull(retentionPolicy);
        String format = String.format("/buckets/%d/%s", Integer.valueOf(getBucket(str, str2)), encodedScopedStreamName(str, str2));
        byte[] serialize = SerializationUtils.serialize(retentionPolicy);
        return this.storeHelper.getData(format).exceptionally(th -> {
            if (th instanceof StoreException.DataNotFoundException) {
                return null;
            }
            throw new CompletionException(th);
        }).thenCompose(data -> {
            return data == null ? this.storeHelper.createZNodeIfNotExist(format, serialize) : this.storeHelper.setData(format, new Data<>(serialize, data.getVersion()));
        });
    }

    @Override // io.pravega.controller.store.stream.StreamMetadataStore
    public CompletableFuture<Void> removeStreamFromAutoStreamCut(String str, String str2, OperationContext operationContext, Executor executor) {
        return this.storeHelper.deleteNode(String.format("/buckets/%d/%s", Integer.valueOf(getBucket(str, str2)), encodedScopedStreamName(str, str2))).exceptionally(th -> {
            if (th instanceof StoreException.DataNotFoundException) {
                return null;
            }
            throw new CompletionException(th);
        });
    }

    private String encodedScopedStreamName(String str, String str2) {
        return Base64.getEncoder().encodeToString(getScopedStreamName(str, str2).getBytes());
    }

    private String decodedScopedStreamName(String str) {
        return new String(Base64.getDecoder().decode(str));
    }

    private StreamImpl getStreamFromPath(String str) {
        String[] split = decodedScopedStreamName(ZKPaths.getNodeFromPath(str)).split("/");
        return new StreamImpl(split[0], split[1]);
    }
}
