package io.pravega.controller.fault;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractIdleService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.cluster.Cluster;
import io.pravega.common.cluster.ClusterException;
import io.pravega.common.cluster.ClusterListener;
import io.pravega.common.cluster.Host;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.server.rest.generated.api.ApiResponseMessage;
import io.pravega.controller.util.RetryHelper;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/fault/ControllerClusterListener.class */
public class ControllerClusterListener extends AbstractIdleService {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(ControllerClusterListener.class);
    private final String objectId;
    private final Host host;
    private final Cluster cluster;
    private final ScheduledExecutorService executor;
    private final List<FailoverSweeper> sweepers;

    /* renamed from: io.pravega.controller.fault.ControllerClusterListener$1, reason: invalid class name */
    /* loaded from: input_file:io/pravega/controller/fault/ControllerClusterListener$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$pravega$common$cluster$ClusterListener$EventType = new int[ClusterListener.EventType.values().length];

        static {
            try {
                $SwitchMap$io$pravega$common$cluster$ClusterListener$EventType[ClusterListener.EventType.HOST_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$pravega$common$cluster$ClusterListener$EventType[ClusterListener.EventType.HOST_REMOVED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$pravega$common$cluster$ClusterListener$EventType[ClusterListener.EventType.ERROR.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public ControllerClusterListener(Host host, Cluster cluster, ScheduledExecutorService scheduledExecutorService, List<FailoverSweeper> list) {
        Preconditions.checkNotNull(host, "host");
        Preconditions.checkNotNull(cluster, "cluster");
        Preconditions.checkNotNull(scheduledExecutorService, "executor");
        Preconditions.checkArgument(list.stream().noneMatch((v0) -> {
            return Objects.isNull(v0);
        }));
        this.objectId = "ControllerClusterListener";
        this.host = host;
        this.cluster = cluster;
        this.executor = scheduledExecutorService;
        this.sweepers = Lists.newArrayList(list);
    }

    protected void startUp() throws InterruptedException {
        long traceEnter = LoggerHelpers.traceEnter(log, this.objectId, new Object[]{"startUp"});
        try {
            log.info("Registering host {} with controller cluster", this.host);
            this.cluster.registerHost(this.host);
            log.info("Adding controller cluster listener");
            this.cluster.addListener((eventType, host) -> {
                switch (AnonymousClass1.$SwitchMap$io$pravega$common$cluster$ClusterListener$EventType[eventType.ordinal()]) {
                    case ApiResponseMessage.ERROR /* 1 */:
                        log.info("Received controller cluster event: {} for host: {}", eventType, host);
                        return;
                    case ApiResponseMessage.WARNING /* 2 */:
                        log.info("Received controller cluster event: {} for host: {}", eventType, host);
                        handleHostRemoved(host);
                        return;
                    case ApiResponseMessage.INFO /* 3 */:
                        log.info("Received error event when monitoring the controller host cluster, ignoring...");
                        return;
                    default:
                        return;
                }
            }, this.executor);
            sweepAll(() -> {
                try {
                    return (Set) this.cluster.getClusterMembers().stream().map((v0) -> {
                        return v0.getHostId();
                    }).collect(Collectors.toSet());
                } catch (ClusterException e) {
                    log.error("error fetching cluster members {}", e);
                    throw new CompletionException((Throwable) e);
                }
            });
            log.info("Controller cluster listener startUp complete");
            LoggerHelpers.traceLeave(log, this.objectId, "startUp", traceEnter, new Object[0]);
        } catch (Throwable th) {
            LoggerHelpers.traceLeave(log, this.objectId, "startUp", traceEnter, new Object[0]);
            throw th;
        }
    }

    private CompletableFuture<Void> handleHostRemoved(Host host) {
        return Futures.allOf((Collection) this.sweepers.stream().map(failoverSweeper -> {
            return failoverSweeper.isReady() ? RetryHelper.withIndefiniteRetriesAsync(() -> {
                return failoverSweeper.handleFailedProcess(host.getHostId());
            }, th -> {
                log.warn(th.getMessage());
            }, this.executor) : CompletableFuture.completedFuture((Void) null);
        }).collect(Collectors.toList()));
    }

    private CompletableFuture<Void> sweepAll(Supplier<Set<String>> supplier) {
        return Futures.allOf((Collection) this.sweepers.stream().map(failoverSweeper -> {
            return RetryHelper.withIndefiniteRetriesAsync(() -> {
                if (failoverSweeper.isReady()) {
                    return failoverSweeper.sweepFailedProcesses(supplier);
                }
                log.trace("sweeper not ready, retrying with exponential backoff");
                throw new RuntimeException("sweeper not ready");
            }, th -> {
                log.warn(th.getMessage());
            }, this.executor);
        }).collect(Collectors.toList()));
    }

    protected void shutDown() throws Exception {
        long traceEnter = LoggerHelpers.traceEnter(log, this.objectId, new Object[]{"shutDown"});
        try {
            log.info("Deregistering host {} from controller cluster", this.host);
            this.cluster.deregisterHost(this.host);
            log.info("Controller cluster listener shutDown complete");
            LoggerHelpers.traceLeave(log, this.objectId, "shutDown", traceEnter, new Object[0]);
        } catch (Throwable th) {
            LoggerHelpers.traceLeave(log, this.objectId, "shutDown", traceEnter, new Object[0]);
            throw th;
        }
    }
}
