package io.pravega.controller.server.bucket;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.controller.server.rest.generated.api.ApiResponseMessage;
import io.pravega.controller.store.stream.BucketStore;
import io.pravega.controller.store.stream.ZookeeperBucketStore;
import io.pravega.controller.util.RetryHelper;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/bucket/ZooKeeperBucketManager.class */
public class ZooKeeperBucketManager extends BucketManager {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(ZooKeeperBucketManager.class);
    private final ZookeeperBucketStore bucketStore;
    private final ConcurrentMap<BucketStore.ServiceType, PathChildrenCache> bucketOwnershipCacheMap;

    /* renamed from: io.pravega.controller.server.bucket.ZooKeeperBucketManager$1, reason: invalid class name */
    /* loaded from: input_file:io/pravega/controller/server/bucket/ZooKeeperBucketManager$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_REMOVED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CONNECTION_LOST.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ZooKeeperBucketManager(String str, ZookeeperBucketStore zookeeperBucketStore, BucketStore.ServiceType serviceType, ScheduledExecutorService scheduledExecutorService, Function<Integer, BucketService> function) {
        super(str, serviceType, scheduledExecutorService, function);
        this.bucketOwnershipCacheMap = new ConcurrentHashMap();
        this.bucketStore = zookeeperBucketStore;
    }

    @Override // io.pravega.controller.server.bucket.BucketManager
    protected int getBucketCount() {
        return this.bucketStore.getBucketCount(getServiceType());
    }

    @Override // io.pravega.controller.server.bucket.BucketManager
    public void startBucketOwnershipListener() {
        PathChildrenCache computeIfAbsent = this.bucketOwnershipCacheMap.computeIfAbsent(getServiceType(), serviceType -> {
            return this.bucketStore.getServiceOwnershipPathChildrenCache(getServiceType());
        });
        computeIfAbsent.getListenable().addListener((curatorFramework, pathChildrenCacheEvent) -> {
            switch (AnonymousClass1.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
                case ApiResponseMessage.ERROR /* 1 */:
                    return;
                case ApiResponseMessage.WARNING /* 2 */:
                    int parseInt = Integer.parseInt(ZKPaths.getNodeFromPath(pathChildrenCacheEvent.getData().getPath()));
                    RetryHelper.withIndefiniteRetriesAsync(() -> {
                        return tryTakeOwnership(parseInt);
                    }, th -> {
                        log.warn("{}: exception while attempting to take ownership for bucket {} ", new Object[]{getServiceType(), Integer.valueOf(parseInt), th.getMessage()});
                    }, getExecutor());
                    return;
                case ApiResponseMessage.INFO /* 3 */:
                    log.warn("{}: Received connectivity error", getServiceType());
                    return;
                default:
                    log.warn("Received unknown event {} on bucket root {} ", pathChildrenCacheEvent.getType(), getServiceType());
                    return;
            }
        });
        log.info("bucket ownership listener registered on bucket root {}", getServiceType());
        try {
            computeIfAbsent.start(PathChildrenCache.StartMode.NORMAL);
        } catch (Exception e) {
            log.error("Starting ownership listener for service {} threw exception", getServiceType(), e);
            throw Exceptions.sneakyThrow(e);
        }
    }

    @Override // io.pravega.controller.server.bucket.BucketManager
    public void stopBucketOwnershipListener() {
        PathChildrenCache remove = this.bucketOwnershipCacheMap.remove(getServiceType());
        if (remove != null) {
            try {
                remove.clear();
                remove.close();
            } catch (IOException e) {
                log.warn("unable to close listener for bucket ownership", e);
            }
        }
    }

    @Override // io.pravega.controller.server.bucket.BucketManager
    public CompletableFuture<Void> initializeService() {
        return this.bucketStore.createBucketsRoot(getServiceType());
    }

    @Override // io.pravega.controller.server.bucket.BucketManager
    public CompletableFuture<Void> initializeBucket(int i) {
        Preconditions.checkArgument(i < this.bucketStore.getBucketCount(getServiceType()));
        return this.bucketStore.createBucket(getServiceType(), i);
    }

    @Override // io.pravega.controller.server.bucket.BucketManager
    public CompletableFuture<Boolean> takeBucketOwnership(int i, String str, Executor executor) {
        Preconditions.checkArgument(i < this.bucketStore.getBucketCount(getServiceType()));
        return this.bucketStore.takeBucketOwnership(getServiceType(), i, str);
    }
}
