package org.apache.bookkeeper.stream.storage.impl.cluster;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
import org.apache.bookkeeper.stream.storage.StorageConstants;
import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.class */
public class ZkClusterMetadataStore implements ClusterMetadataStore {
    private static final Logger log = LoggerFactory.getLogger(ZkClusterMetadataStore.class);
    private final CuratorFramework client;
    private final String zkServers;
    private final String zkRootPath;
    private final String zkClusterMetadataPath;
    private final String zkClusterAssignmentPath;
    private NodeCache assignmentDataCache;
    private volatile boolean closed = false;
    private final Map<Consumer<Void>, NodeCacheListener> assignmentDataConsumers = new HashMap();

    public ZkClusterMetadataStore(CuratorFramework curatorFramework, String str, String str2) {
        this.client = curatorFramework;
        this.zkServers = str;
        this.zkRootPath = str2;
        this.zkClusterMetadataPath = StorageConstants.getClusterMetadataPath(str2);
        this.zkClusterAssignmentPath = StorageConstants.getClusterAssignmentPath(str2);
    }

    synchronized int getNumWatchers() {
        return this.assignmentDataConsumers.size();
    }

    public void close() {
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (null != this.assignmentDataCache) {
                try {
                    this.assignmentDataCache.close();
                } catch (IOException e) {
                    log.warn("Failed to close assignment data cache", e);
                }
            }
        }
    }

    public boolean initializeCluster(int i, Optional<String> optional) {
        try {
            this.client.transaction().forOperations(new CuratorOp[]{(CuratorOp) this.client.transactionOp().create().forPath(this.zkRootPath), (CuratorOp) this.client.transactionOp().create().forPath(this.zkClusterMetadataPath, ClusterMetadata.newBuilder().setNumStorageContainers(i).build().toByteArray()), (CuratorOp) this.client.transactionOp().create().forPath(this.zkClusterAssignmentPath, ClusterAssignmentData.newBuilder().build().toByteArray()), (CuratorOp) this.client.transactionOp().create().forPath(StorageConstants.getServersPath(this.zkRootPath)), (CuratorOp) this.client.transactionOp().create().forPath(StorageConstants.getWritableServersPath(this.zkRootPath)), (CuratorOp) this.client.transactionOp().create().forPath(StorageConstants.getStoragePath(this.zkRootPath), DLMetadata.create(new BKDLConfig(this.zkServers, optional.orElse(StorageConstants.getSegmentsRootPath(this.zkRootPath)))).serialize())});
            return true;
        } catch (Exception e) {
            if (!(e instanceof KeeperException.NodeExistsException)) {
                throw new StorageRuntimeException("Failed to initialize storage cluster with " + i + " storage containers", e);
            }
            log.info("Stream storage cluster is already initialized.");
            return false;
        }
    }

    public ClusterAssignmentData getClusterAssignmentData() {
        try {
            return ClusterAssignmentData.parseFrom((byte[]) this.client.getData().forPath(this.zkClusterAssignmentPath));
        } catch (Exception e) {
            throw new StorageRuntimeException("Failed to fetch cluster assignment data from zookeeper @" + this.zkClusterAssignmentPath, e);
        } catch (InvalidProtocolBufferException e2) {
            throw new StorageRuntimeException("The cluster assignment data from zookeeper @" + this.zkClusterAssignmentPath + " is corrupted", e2);
        }
    }

    public void updateClusterAssignmentData(ClusterAssignmentData clusterAssignmentData) {
        try {
            this.client.setData().forPath(this.zkClusterAssignmentPath, clusterAssignmentData.toByteArray());
        } catch (Exception e) {
            throw new StorageRuntimeException("Failed to update cluster assignment data to zookeeper @" + this.zkClusterAssignmentPath, e);
        }
    }

    public void watchClusterAssignmentData(Consumer<Void> consumer, Executor executor) {
        synchronized (this) {
            if (this.assignmentDataCache == null) {
                this.assignmentDataCache = new NodeCache(this.client, this.zkClusterAssignmentPath);
                try {
                    this.assignmentDataCache.start();
                } catch (Exception e) {
                    throw new StorageRuntimeException("Failed to watch cluster assignment data", e);
                }
            }
            if (null == this.assignmentDataConsumers.get(consumer)) {
                NodeCacheListener nodeCacheListener = () -> {
                    consumer.accept(null);
                };
                this.assignmentDataConsumers.put(consumer, nodeCacheListener);
                this.assignmentDataCache.getListenable().addListener(nodeCacheListener, executor);
            }
        }
    }

    public void unwatchClusterAssignmentData(Consumer<Void> consumer) {
        synchronized (this) {
            NodeCacheListener remove = this.assignmentDataConsumers.remove(consumer);
            if (null != remove && null != this.assignmentDataCache) {
                this.assignmentDataCache.getListenable().removeListener(remove);
            }
            if (this.assignmentDataConsumers.isEmpty() && null != this.assignmentDataCache) {
                try {
                    this.assignmentDataCache.close();
                } catch (IOException e) {
                    log.warn("Failed to close assignment data cache when there is no watcher", e);
                }
            }
        }
    }

    public ClusterMetadata getClusterMetadata() {
        try {
            return ClusterMetadata.parseFrom((byte[]) this.client.getData().forPath(this.zkClusterMetadataPath));
        } catch (Exception e) {
            throw new StorageRuntimeException("Failed to fetch cluster metadata from zookeeper @" + this.zkClusterMetadataPath, e);
        } catch (InvalidProtocolBufferException e2) {
            throw new StorageRuntimeException("The cluster metadata from zookeeper @" + this.zkClusterMetadataPath + " is corrupted", e2);
        }
    }

    public void updateClusterMetadata(ClusterMetadata clusterMetadata) {
        try {
            this.client.setData().forPath(this.zkClusterMetadataPath, clusterMetadata.toByteArray());
        } catch (Exception e) {
            throw new StorageRuntimeException("Failed to update cluster metadata to zookeeper @" + this.zkClusterMetadataPath, e);
        }
    }
}
