package io.pravega.controller.server.bucket;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.store.stream.BucketStore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/bucket/BucketManager.class */
public abstract class BucketManager extends AbstractService {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(BucketManager.class);
    private final String processId;
    private final BucketStore.ServiceType serviceType;
    private final Function<Integer, BucketService> bucketServiceSupplier;
    private final Object lock = new Object();

    @GuardedBy("lock")
    private final Map<Integer, BucketService> buckets = new HashMap();
    private final ScheduledExecutorService executor;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BucketManager(String str, BucketStore.ServiceType serviceType, ScheduledExecutorService scheduledExecutorService, Function<Integer, BucketService> function) {
        this.processId = str;
        this.serviceType = serviceType;
        this.executor = scheduledExecutorService;
        this.bucketServiceSupplier = function;
    }

    protected void doStart() {
        initializeService().thenCompose(r4 -> {
            return Futures.allOf((Collection) IntStream.range(0, getBucketCount()).boxed().map(num -> {
                return initializeBucket(num.intValue()).thenCompose(r5 -> {
                    return tryTakeOwnership(num.intValue());
                });
            }).collect(Collectors.toList()));
        }).thenAccept((Consumer<? super U>) r3 -> {
            startBucketOwnershipListener();
        }).whenComplete((r42, th) -> {
            if (th != null) {
                notifyFailed(th);
            } else {
                notifyStarted();
            }
        });
    }

    protected abstract int getBucketCount();

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> tryTakeOwnership(int i) {
        return takeBucketOwnership(i, this.processId, this.executor).thenCompose(bool -> {
            if (!bool.booleanValue()) {
                return CompletableFuture.completedFuture(null);
            }
            log.info("{}: Taken ownership for bucket {}", this.serviceType, Integer.valueOf(i));
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            synchronized (this.lock) {
                if (this.buckets.containsKey(Integer.valueOf(i))) {
                    completableFuture.complete(null);
                } else {
                    this.buckets.put(Integer.valueOf(i), startNewBucketService(i, completableFuture));
                }
            }
            return completableFuture;
        });
    }

    private BucketService startNewBucketService(final int i, final CompletableFuture<Void> completableFuture) {
        BucketService apply = this.bucketServiceSupplier.apply(Integer.valueOf(i));
        apply.addListener(new Service.Listener() { // from class: io.pravega.controller.server.bucket.BucketManager.1
            public void running() {
                super.running();
                BucketManager.log.info("{}: successfully started bucket service bucket: {} ", BucketManager.this.serviceType, Integer.valueOf(i));
                completableFuture.complete(null);
            }

            public void failed(Service.State state, Throwable th) {
                super.failed(state, th);
                BucketManager.log.error("{}: Failed to start bucket: {} ", BucketManager.this.serviceType, Integer.valueOf(i));
                synchronized (BucketManager.this.lock) {
                    BucketManager.this.buckets.remove(Integer.valueOf(i));
                }
                completableFuture.completeExceptionally(th);
            }
        }, this.executor);
        apply.startAsync();
        return apply;
    }

    protected void doStop() {
        Collection<BucketService> values;
        log.info("{}: Stop request received for bucket manager", this.serviceType);
        synchronized (this.lock) {
            values = this.buckets.values();
        }
        Futures.allOf((Collection) values.stream().map(bucketService -> {
            final CompletableFuture completableFuture = new CompletableFuture();
            bucketService.addListener(new Service.Listener() { // from class: io.pravega.controller.server.bucket.BucketManager.2
                public void terminated(Service.State state) {
                    super.terminated(state);
                    completableFuture.complete(null);
                }

                public void failed(Service.State state, Throwable th) {
                    super.failed(state, th);
                    completableFuture.completeExceptionally(th);
                }
            }, this.executor);
            bucketService.stopAsync();
            return completableFuture;
        }).collect(Collectors.toList())).whenComplete((r6, th) -> {
            stopBucketOwnershipListener();
            if (th != null) {
                log.error("{}: bucket service shutdown failed with exception", this.serviceType, th);
                notifyFailed(th);
            } else {
                log.info("{}: bucket service stopped", this.serviceType);
                notifyStopped();
            }
        });
    }

    abstract void startBucketOwnershipListener();

    abstract void stopBucketOwnershipListener();

    abstract CompletableFuture<Void> initializeService();

    abstract CompletableFuture<Void> initializeBucket(int i);

    abstract CompletableFuture<Boolean> takeBucketOwnership(int i, String str, Executor executor);

    @VisibleForTesting
    Map<Integer, BucketService> getBucketServices() {
        return Collections.unmodifiableMap(this.buckets);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @SuppressFBWarnings(justification = "generated code")
    public BucketStore.ServiceType getServiceType() {
        return this.serviceType;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @SuppressFBWarnings(justification = "generated code")
    public ScheduledExecutorService getExecutor() {
        return this.executor;
    }
}
