package io.pravega.controller.fault;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.cluster.Cluster;
import io.pravega.common.cluster.ClusterException;
import io.pravega.common.cluster.ClusterListener;
import io.pravega.common.cluster.zkImpl.ClusterZKImpl;
import io.pravega.controller.server.rest.generated.api.ApiResponseMessage;
import io.pravega.controller.store.host.HostControllerStore;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/fault/SegmentMonitorLeader.class */
class SegmentMonitorLeader implements LeaderSelectorListener {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(SegmentMonitorLeader.class);
    private final HostControllerStore hostStore;
    private final ContainerBalancer segBalancer;
    private Duration minRebalanceInterval;

    @SuppressFBWarnings(justification = "generated code")
    private final Object $lock = new Object[0];
    private Cluster pravegaServiceCluster = null;
    private final Semaphore hostsChange = new Semaphore(0);
    private final Semaphore suspendMonitor = new Semaphore(0);
    private final AtomicBoolean suspended = new AtomicBoolean(false);

    /* renamed from: io.pravega.controller.fault.SegmentMonitorLeader$1, reason: invalid class name */
    /* loaded from: input_file:io/pravega/controller/fault/SegmentMonitorLeader$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$pravega$common$cluster$ClusterListener$EventType = new int[ClusterListener.EventType.values().length];

        static {
            try {
                $SwitchMap$io$pravega$common$cluster$ClusterListener$EventType[ClusterListener.EventType.HOST_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$pravega$common$cluster$ClusterListener$EventType[ClusterListener.EventType.HOST_REMOVED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$pravega$common$cluster$ClusterListener$EventType[ClusterListener.EventType.ERROR.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public SegmentMonitorLeader(HostControllerStore hostControllerStore, ContainerBalancer containerBalancer, int i) {
        Preconditions.checkNotNull(hostControllerStore, "hostStore");
        Preconditions.checkNotNull(containerBalancer, "balancer");
        Preconditions.checkArgument(i >= 0, "minRebalanceInterval should not be negative");
        this.hostStore = hostControllerStore;
        this.segBalancer = containerBalancer;
        this.minRebalanceInterval = Duration.ofSeconds(i);
    }

    public void suspend() {
        this.suspended.set(true);
    }

    public void resume() {
        if (this.suspended.compareAndSet(true, false)) {
            this.suspendMonitor.release();
        }
    }

    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
        synchronized (this.$lock) {
            log.info("Obtained leadership to monitor the Host to Segment Container Mapping");
            this.hostsChange.release();
            this.pravegaServiceCluster = new ClusterZKImpl(curatorFramework, "hosts");
            this.pravegaServiceCluster.addListener((eventType, host) -> {
                switch (AnonymousClass1.$SwitchMap$io$pravega$common$cluster$ClusterListener$EventType[eventType.ordinal()]) {
                    case ApiResponseMessage.ERROR /* 1 */:
                    case ApiResponseMessage.WARNING /* 2 */:
                        log.info("Received event: {} for host: {}. Wake up leader for rebalancing", eventType, host);
                        this.hostsChange.release();
                        return;
                    case ApiResponseMessage.INFO /* 3 */:
                        log.info("Received error event when monitoring the pravega host cluster, ignoring...");
                        return;
                    default:
                        return;
                }
            });
            while (true) {
                try {
                    try {
                        if (this.suspended.get()) {
                            log.info("Monitor is suspended, waiting for notification to resume");
                            this.suspendMonitor.acquire();
                            log.info("Resuming monitor");
                        }
                        this.hostsChange.acquire();
                        log.info("Received rebalance event");
                        waitForRebalance();
                        this.hostsChange.drainPermits();
                        triggerRebalance();
                    } catch (InterruptedException e) {
                        log.warn("Leadership interrupted, releasing monitor thread");
                        this.pravegaServiceCluster.close();
                        throw e;
                    }
                } catch (Exception e2) {
                    if (!this.suspended.get()) {
                        log.warn("Failed to perform rebalancing, relinquishing leadership");
                        this.pravegaServiceCluster.close();
                        throw e2;
                    }
                }
            }
            log.warn("Failed to perform rebalancing, relinquishing leadership");
            this.pravegaServiceCluster.close();
            throw e2;
        }
    }

    private void waitForRebalance() throws InterruptedException {
        log.info("Waiting for {} seconds before attempting to rebalance", Long.valueOf(this.minRebalanceInterval.getSeconds()));
        Thread.sleep(this.minRebalanceInterval.toMillis());
    }

    private void triggerRebalance() throws IOException {
        try {
            this.hostStore.updateHostContainersMap(this.segBalancer.rebalance(this.hostStore.getHostContainersMap(), this.pravegaServiceCluster.getClusterMembers()));
        } catch (ClusterException e) {
            throw new IOException((Throwable) e);
        }
    }

    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
        log.info("Zookeeper connection state changed to: " + connectionState.toString());
    }
}
