package io.pravega.controller.server.retention;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.util.internal.ConcurrentSet;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.RequestTracker;
import io.pravega.controller.server.rest.generated.api.ApiResponseMessage;
import io.pravega.controller.server.retention.BucketOwnershipListener;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.util.RetryHelper;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/retention/StreamCutService.class */
public class StreamCutService extends AbstractService implements BucketOwnershipListener {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(StreamCutService.class);
    private final int bucketCount;
    private final String processId;
    private final ConcurrentSet<StreamCutBucketService> buckets = new ConcurrentSet<>();
    private final StreamMetadataStore streamMetadataStore;
    private final StreamMetadataTasks streamMetadataTasks;
    private final ScheduledExecutorService executor;
    private final RequestTracker requestTracker;

    /* renamed from: io.pravega.controller.server.retention.StreamCutService$3, reason: invalid class name */
    /* loaded from: input_file:io/pravega/controller/server/retention/StreamCutService$3.class */
    static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$io$pravega$controller$server$retention$BucketOwnershipListener$BucketNotification$NotificationType = new int[BucketOwnershipListener.BucketNotification.NotificationType.values().length];

        static {
            try {
                $SwitchMap$io$pravega$controller$server$retention$BucketOwnershipListener$BucketNotification$NotificationType[BucketOwnershipListener.BucketNotification.NotificationType.BucketAvailable.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$pravega$controller$server$retention$BucketOwnershipListener$BucketNotification$NotificationType[BucketOwnershipListener.BucketNotification.NotificationType.ConnectivityError.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public StreamCutService(int i, String str, StreamMetadataStore streamMetadataStore, StreamMetadataTasks streamMetadataTasks, ScheduledExecutorService scheduledExecutorService, RequestTracker requestTracker) {
        this.bucketCount = i;
        this.processId = str;
        this.streamMetadataStore = streamMetadataStore;
        this.streamMetadataTasks = streamMetadataTasks;
        this.executor = scheduledExecutorService;
        this.requestTracker = requestTracker;
    }

    protected void doStart() {
        this.streamMetadataStore.createBucketsRoot().thenCompose(r4 -> {
            return Futures.allOf((Collection) IntStream.range(0, this.bucketCount).boxed().map((v1) -> {
                return tryTakeOwnership(v1);
            }).collect(Collectors.toList()));
        }).thenAccept((Consumer<? super U>) r42 -> {
            this.streamMetadataStore.registerBucketOwnershipListener(this);
        }).whenComplete((r43, th) -> {
            if (th != null) {
                notifyFailed(th);
            } else {
                notifyStarted();
            }
        });
    }

    private CompletableFuture<Void> tryTakeOwnership(int i) {
        return RetryHelper.withIndefiniteRetriesAsync(() -> {
            return this.streamMetadataStore.takeBucketOwnership(i, this.processId, this.executor);
        }, th -> {
            log.warn("exception while attempting to take ownership");
        }, this.executor).thenCompose(bool -> {
            if (!bool.booleanValue() || !this.buckets.stream().noneMatch(streamCutBucketService -> {
                return streamCutBucketService.getBucketId() == i;
            })) {
                return CompletableFuture.completedFuture(null);
            }
            log.info("Taken ownership for bucket {}", Integer.valueOf(i));
            StreamCutBucketService streamCutBucketService2 = new StreamCutBucketService(i, this.streamMetadataStore, this.streamMetadataTasks, this.executor, this.requestTracker);
            this.buckets.add(streamCutBucketService2);
            final CompletableFuture completableFuture = new CompletableFuture();
            streamCutBucketService2.addListener(new Service.Listener() { // from class: io.pravega.controller.server.retention.StreamCutService.1
                public void running() {
                    super.running();
                    completableFuture.complete(null);
                }

                public void failed(Service.State state, Throwable th2) {
                    super.failed(state, th2);
                    completableFuture.completeExceptionally(th2);
                }
            }, this.executor);
            streamCutBucketService2.startAsync();
            return completableFuture;
        });
    }

    protected void doStop() {
        Futures.allOf((Collection) this.buckets.stream().map(streamCutBucketService -> {
            final CompletableFuture completableFuture = new CompletableFuture();
            streamCutBucketService.addListener(new Service.Listener() { // from class: io.pravega.controller.server.retention.StreamCutService.2
                public void terminated(Service.State state) {
                    super.terminated(state);
                    completableFuture.complete(null);
                }

                public void failed(Service.State state, Throwable th) {
                    super.failed(state, th);
                    completableFuture.completeExceptionally(th);
                }
            }, this.executor);
            streamCutBucketService.stopAsync();
            return completableFuture;
        }).collect(Collectors.toList())).whenComplete((r4, th) -> {
            this.streamMetadataStore.unregisterBucketOwnershipListener();
            if (th != null) {
                notifyFailed(th);
            } else {
                notifyStopped();
            }
        });
    }

    @Override // io.pravega.controller.server.retention.BucketOwnershipListener
    public void notify(BucketOwnershipListener.BucketNotification bucketNotification) {
        switch (AnonymousClass3.$SwitchMap$io$pravega$controller$server$retention$BucketOwnershipListener$BucketNotification$NotificationType[bucketNotification.getType().ordinal()]) {
            case ApiResponseMessage.ERROR /* 1 */:
                tryTakeOwnership(bucketNotification.getBucketId());
                return;
            case ApiResponseMessage.WARNING /* 2 */:
                log.info("Bucket notification for connectivity error");
                return;
            default:
                return;
        }
    }

    @VisibleForTesting
    Set<StreamCutBucketService> getBuckets() {
        return Collections.unmodifiableSet(this.buckets);
    }
}
