package io.pravega.controller.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.Monitor;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.LoggerHelpers;
import io.pravega.controller.metrics.ZookeeperMetrics;
import io.pravega.controller.store.client.StoreClient;
import io.pravega.controller.store.client.StoreClientFactory;
import io.pravega.controller.store.client.StoreType;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import lombok.Generated;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/ControllerServiceMain.class */
public class ControllerServiceMain extends AbstractExecutionThreadService {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ControllerServiceMain.class);
    private final String objectId;
    private final ControllerServiceConfig serviceConfig;
    private final BiFunction<ControllerServiceConfig, StoreClient, ControllerServiceStarter> starterFactory;
    private ControllerServiceStarter starter;
    private final CompletableFuture<Void> serviceStopFuture;
    private StoreClient storeClient;
    private ServiceState serviceState;
    private final Monitor monitor;
    private final Monitor.Guard hasReachedStarting;
    private final Monitor.Guard hasReachedPausing;
    private final ZookeeperMetrics zookeeperMetrics;

    /* loaded from: input_file:io/pravega/controller/server/ControllerServiceMain$HasReachedState.class */
    final class HasReachedState extends Monitor.Guard {
        private ServiceState desiredState;

        HasReachedState(ServiceState serviceState) {
            super(ControllerServiceMain.this.monitor);
            this.desiredState = serviceState;
        }

        public boolean isSatisfied() {
            return ControllerServiceMain.this.serviceState == this.desiredState;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/pravega/controller/server/ControllerServiceMain$ServiceState.class */
    public enum ServiceState {
        NEW,
        STARTING,
        PAUSING
    }

    public ControllerServiceMain(ControllerServiceConfig controllerServiceConfig) {
        this(controllerServiceConfig, ControllerServiceStarter::new);
    }

    @VisibleForTesting
    ControllerServiceMain(ControllerServiceConfig controllerServiceConfig, BiFunction<ControllerServiceConfig, StoreClient, ControllerServiceStarter> biFunction) {
        this.monitor = new Monitor();
        this.hasReachedStarting = new HasReachedState(ServiceState.STARTING);
        this.hasReachedPausing = new HasReachedState(ServiceState.PAUSING);
        this.objectId = "ControllerServiceMain";
        this.serviceConfig = controllerServiceConfig;
        this.starterFactory = biFunction;
        this.serviceStopFuture = new CompletableFuture<>();
        this.serviceState = ServiceState.NEW;
        this.zookeeperMetrics = new ZookeeperMetrics();
    }

    protected void triggerShutdown() {
        log.info("Shutting down ControllerServiceMain");
        this.serviceStopFuture.complete(null);
    }

    protected void run() throws Exception {
        long traceEnter = LoggerHelpers.traceEnter(log, this.objectId, new Object[]{"run"});
        while (isRunning()) {
            try {
                try {
                    log.info("Creating store client");
                    this.storeClient = StoreClientFactory.createStoreClient(this.serviceConfig.getStoreClientConfig());
                    this.starter = this.starterFactory.apply(this.serviceConfig, this.storeClient);
                    boolean z = this.serviceConfig.getStoreClientConfig().getStoreType().equals(StoreType.Zookeeper) || this.serviceConfig.isControllerClusterListenerEnabled();
                    CompletableFuture completableFuture = new CompletableFuture();
                    if (z) {
                        CuratorFramework curatorFramework = (CuratorFramework) this.storeClient.getClient();
                        log.info("Awaiting ZK client connection to ZK server");
                        curatorFramework.blockUntilConnected();
                        log.info("Awaiting ZK session expiry or termination trigger for ControllerServiceMain");
                        curatorFramework.getConnectionStateListenable().addListener((curatorFramework2, connectionState) -> {
                            if (connectionState.equals(ConnectionState.LOST)) {
                                completableFuture.complete(null);
                                this.starter.notifySessionExpiration();
                            }
                        });
                    }
                    log.info("Starting controller services");
                    notifyServiceStateChange(ServiceState.STARTING);
                    this.starter.startAsync();
                    log.info("Awaiting controller services start");
                    this.starter.awaitRunning();
                    if (z) {
                        CompletableFuture.anyOf(completableFuture, this.serviceStopFuture).join();
                        if (completableFuture.isDone()) {
                            this.zookeeperMetrics.reportZKSessionExpiration();
                            log.info("ZK session expired");
                        }
                    } else {
                        this.serviceStopFuture.join();
                    }
                    log.info("Stopping ControllerServiceStarter");
                    notifyServiceStateChange(ServiceState.PAUSING);
                    this.starter.stopAsync();
                    log.info("Awaiting termination of ControllerServiceStarter");
                    this.starter.awaitTerminated();
                    if (z) {
                        log.info("calling close on store client");
                        this.storeClient.close();
                    }
                } catch (Exception e) {
                    log.error("Controller Service Main thread exited exceptionally", e);
                    throw e;
                }
            } catch (Throwable th) {
                if (this.storeClient != null) {
                    this.storeClient.close();
                }
                LoggerHelpers.traceLeave(log, this.objectId, "run", traceEnter, new Object[0]);
                throw th;
            }
        }
        if (this.storeClient != null) {
            this.storeClient.close();
        }
        LoggerHelpers.traceLeave(log, this.objectId, "run", traceEnter, new Object[0]);
    }

    private void notifyServiceStateChange(ServiceState serviceState) {
        this.monitor.enter();
        try {
            this.serviceState = serviceState;
        } finally {
            this.monitor.leave();
        }
    }

    @VisibleForTesting
    public ControllerServiceStarter awaitServiceStarting() {
        this.monitor.enterWhenUninterruptibly(this.hasReachedStarting);
        try {
            if (this.serviceState != ServiceState.STARTING) {
                throw new IllegalStateException("Expected state=" + ServiceState.STARTING + ", but actual state=" + this.serviceState);
            }
            return this.starter;
        } finally {
            this.monitor.leave();
        }
    }

    @VisibleForTesting
    public ControllerServiceStarter awaitServicePausing() {
        this.monitor.enterWhenUninterruptibly(this.hasReachedPausing);
        try {
            if (this.serviceState != ServiceState.PAUSING) {
                throw new IllegalStateException("Expected state=" + ServiceState.PAUSING + ", but actual state=" + this.serviceState);
            }
            return this.starter;
        } finally {
            this.monitor.leave();
        }
    }

    @VisibleForTesting
    public void forceClientSessionExpiry() throws Exception {
        Preconditions.checkState(this.serviceConfig.isControllerClusterListenerEnabled(), "Controller Cluster not enabled");
        awaitServiceStarting();
        ((CuratorFramework) this.storeClient.getClient()).getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
    }

    protected void shutDown() throws Exception {
        if (this.starter == null || !this.starter.isRunning()) {
            return;
        }
        triggerShutdown();
        this.starter.awaitTerminated();
    }
}
