package io.pravega.controller.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractIdleService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.client.netty.impl.ConnectionFactory;
import io.pravega.client.netty.impl.ConnectionFactoryImpl;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.cluster.Cluster;
import io.pravega.common.cluster.Host;
import io.pravega.common.cluster.zkImpl.ClusterZKImpl;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
import io.pravega.controller.fault.ControllerClusterListener;
import io.pravega.controller.fault.SegmentContainerMonitor;
import io.pravega.controller.fault.UniformContainerBalancer;
import io.pravega.controller.server.eventProcessor.ControllerEventProcessors;
import io.pravega.controller.server.eventProcessor.LocalController;
import io.pravega.controller.server.rest.RESTServer;
import io.pravega.controller.server.retention.StreamCutService;
import io.pravega.controller.server.rpc.grpc.GRPCServer;
import io.pravega.controller.store.checkpoint.CheckpointStore;
import io.pravega.controller.store.checkpoint.CheckpointStoreFactory;
import io.pravega.controller.store.client.StoreClient;
import io.pravega.controller.store.host.HostControllerStore;
import io.pravega.controller.store.host.HostStoreFactory;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.StreamStoreFactory;
import io.pravega.controller.store.task.TaskMetadataStore;
import io.pravega.controller.store.task.TaskStoreFactory;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.task.Stream.StreamTransactionMetadataTasks;
import io.pravega.controller.task.Stream.TxnSweeper;
import io.pravega.controller.task.TaskSweeper;
import io.pravega.controller.util.Config;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/ControllerServiceStarter.class */
public class ControllerServiceStarter extends AbstractIdleService {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    private final ControllerServiceConfig serviceConfig;
    private final StoreClient storeClient;
    private ScheduledExecutorService controllerExecutor;
    private ScheduledExecutorService retentionExecutor;
    private ConnectionFactory connectionFactory;
    private StreamMetadataTasks streamMetadataTasks;
    private StreamTransactionMetadataTasks streamTransactionMetadataTasks;
    private StreamCutService streamCutService;
    private SegmentContainerMonitor monitor;
    private ControllerClusterListener controllerClusterListener;
    private ControllerService controllerService;
    private LocalController localController;
    private ControllerEventProcessors controllerEventProcessors;
    private GRPCServer grpcServer;
    private RESTServer restServer;
    static final /* synthetic */ boolean $assertionsDisabled;
    private Cluster cluster = null;
    private final String objectId = "ControllerServiceStarter";
    private final CountDownLatch controllerReadyLatch = new CountDownLatch(1);

    public ControllerServiceStarter(ControllerServiceConfig controllerServiceConfig, StoreClient storeClient) {
        this.serviceConfig = controllerServiceConfig;
        this.storeClient = storeClient;
    }

    protected void startUp() {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.objectId, "startUp", new Object[0]);
        log.info("Initiating controller service startUp");
        log.info("Event processors enabled = {}", Boolean.valueOf(this.serviceConfig.getEventProcessorConfig().isPresent()));
        log.info("Cluster listener enabled = {}", Boolean.valueOf(this.serviceConfig.isControllerClusterListenerEnabled()));
        log.info("    Host monitor enabled = {}", Boolean.valueOf(this.serviceConfig.getHostMonitorConfig().isHostMonitorEnabled()));
        log.info("     gRPC server enabled = {}", Boolean.valueOf(this.serviceConfig.getGRPCServerConfig().isPresent()));
        log.info("     REST server enabled = {}", Boolean.valueOf(this.serviceConfig.getRestServerConfig().isPresent()));
        try {
            this.controllerExecutor = ExecutorServiceHelpers.newScheduledThreadPool(this.serviceConfig.getThreadPoolSize(), "controllerpool");
            this.retentionExecutor = ExecutorServiceHelpers.newScheduledThreadPool(Config.RETENTION_THREAD_POOL_SIZE, "retentionpool");
            log.info("Creating the stream store");
            StreamMetadataStore createStore = StreamStoreFactory.createStore(this.storeClient, this.controllerExecutor);
            log.info("Creating the task store");
            TaskMetadataStore createStore2 = TaskStoreFactory.createStore(this.storeClient, this.controllerExecutor);
            log.info("Creating the host store");
            HostControllerStore createStore3 = HostStoreFactory.createStore(this.serviceConfig.getHostMonitorConfig(), this.storeClient);
            log.info("Creating the checkpoint store");
            CheckpointStore create = CheckpointStoreFactory.create(this.storeClient);
            Host host = new Host(getHostName(), getPort(), UUID.randomUUID().toString());
            if (this.serviceConfig.getHostMonitorConfig().isHostMonitorEnabled()) {
                this.monitor = new SegmentContainerMonitor(createStore3, (CuratorFramework) this.storeClient.getClient(), new UniformContainerBalancer(), this.serviceConfig.getHostMonitorConfig().getHostMonitorMinRebalanceInterval());
                log.info("Starting segment container monitor");
                this.monitor.startAsync();
            }
            this.connectionFactory = new ConnectionFactoryImpl(ClientConfig.builder().controllerURI(URI.create((this.serviceConfig.getGRPCServerConfig().get().isTlsEnabled() ? "tls://" : "tcp://") + "localhost")).trustStore(this.serviceConfig.getGRPCServerConfig().get().getTlsTrustStore()).validateHostName(false).build());
            SegmentHelper segmentHelper = new SegmentHelper();
            this.streamMetadataTasks = new StreamMetadataTasks(createStore, createStore3, createStore2, segmentHelper, this.controllerExecutor, host.getHostId(), this.connectionFactory, this.serviceConfig.getGRPCServerConfig().get().isAuthorizationEnabled(), this.serviceConfig.getGRPCServerConfig().get().getTokenSigningKey());
            this.streamTransactionMetadataTasks = new StreamTransactionMetadataTasks(createStore, createStore3, segmentHelper, this.controllerExecutor, host.getHostId(), this.serviceConfig.getTimeoutServiceConfig(), this.connectionFactory, this.serviceConfig.getGRPCServerConfig().get().isAuthorizationEnabled(), this.serviceConfig.getGRPCServerConfig().get().getTokenSigningKey());
            this.streamCutService = new StreamCutService(Config.BUCKET_COUNT, host.getHostId(), createStore, this.streamMetadataTasks, this.retentionExecutor);
            log.info("starting auto retention service asynchronously");
            this.streamCutService.startAsync();
            this.streamCutService.awaitRunning();
            TaskSweeper taskSweeper = new TaskSweeper(createStore2, host.getHostId(), this.controllerExecutor, this.streamMetadataTasks);
            TxnSweeper txnSweeper = new TxnSweeper(createStore, this.streamTransactionMetadataTasks, this.serviceConfig.getTimeoutServiceConfig().getMaxLeaseValue(), this.controllerExecutor);
            if (this.serviceConfig.isControllerClusterListenerEnabled()) {
                this.cluster = new ClusterZKImpl((CuratorFramework) this.storeClient.getClient(), "controllers");
            }
            this.controllerService = new ControllerService(createStore, createStore3, this.streamMetadataTasks, this.streamTransactionMetadataTasks, new SegmentHelper(), this.controllerExecutor, this.cluster);
            setController(new LocalController(this.controllerService, this.serviceConfig.getGRPCServerConfig().get().isAuthorizationEnabled(), this.serviceConfig.getGRPCServerConfig().get().getTokenSigningKey()));
            if (this.serviceConfig.getEventProcessorConfig().isPresent()) {
                this.controllerEventProcessors = new ControllerEventProcessors(host.getHostId(), this.serviceConfig.getEventProcessorConfig().get(), this.localController, create, createStore, this.connectionFactory, this.streamMetadataTasks, this.streamTransactionMetadataTasks, this.controllerExecutor);
                log.info("Starting event processors");
                this.controllerEventProcessors.bootstrap(this.streamTransactionMetadataTasks, this.streamMetadataTasks).thenAcceptAsync(r3 -> {
                    this.controllerEventProcessors.startAsync();
                }, (Executor) this.controllerExecutor);
            }
            if (this.serviceConfig.isControllerClusterListenerEnabled()) {
                ArrayList arrayList = new ArrayList();
                arrayList.add(taskSweeper);
                arrayList.add(txnSweeper);
                if (this.serviceConfig.getEventProcessorConfig().isPresent()) {
                    if (!$assertionsDisabled && this.controllerEventProcessors == null) {
                        throw new AssertionError();
                    }
                    arrayList.add(this.controllerEventProcessors);
                }
                this.controllerClusterListener = new ControllerClusterListener(host, this.cluster, this.controllerExecutor, arrayList);
                log.info("Starting controller cluster listener");
                this.controllerClusterListener.startAsync();
            }
            if (this.serviceConfig.getGRPCServerConfig().isPresent()) {
                this.grpcServer = new GRPCServer(this.controllerService, this.serviceConfig.getGRPCServerConfig().get());
                this.grpcServer.startAsync();
                log.info("Awaiting start of rpc server");
                this.grpcServer.awaitRunning();
            }
            if (this.serviceConfig.getRestServerConfig().isPresent()) {
                this.restServer = new RESTServer(this.localController, this.controllerService, this.grpcServer.getPravegaAuthManager(), this.serviceConfig.getRestServerConfig().get(), this.connectionFactory);
                this.restServer.startAsync();
                log.info("Awaiting start of REST server");
                this.restServer.awaitRunning();
            }
            if (this.serviceConfig.getEventProcessorConfig().isPresent()) {
                log.info("Awaiting start of controller event processors");
                this.controllerEventProcessors.awaitRunning();
            }
            if (this.serviceConfig.isControllerClusterListenerEnabled()) {
                log.info("Awaiting start of controller cluster listener");
                this.controllerClusterListener.awaitRunning();
            }
            LoggerHelpers.traceLeave(log, this.objectId, "startUp", traceEnterWithContext, new Object[0]);
        } catch (Throwable th) {
            LoggerHelpers.traceLeave(log, this.objectId, "startUp", traceEnterWithContext, new Object[0]);
            throw th;
        }
    }

    /* JADX WARN: Finally extract failed */
    protected void shutDown() throws Exception {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.objectId, "shutDown", new Object[0]);
        log.info("Initiating controller service shutDown");
        try {
            try {
                if (this.restServer != null) {
                    this.restServer.stopAsync();
                }
                if (this.grpcServer != null) {
                    this.grpcServer.stopAsync();
                }
                if (this.controllerEventProcessors != null) {
                    log.info("Stopping controller event processors");
                    this.controllerEventProcessors.stopAsync();
                }
                if (this.monitor != null) {
                    log.info("Stopping the segment container monitor");
                    this.monitor.stopAsync();
                }
                if (this.controllerClusterListener != null) {
                    log.info("Stopping controller cluster listener");
                    this.controllerClusterListener.stopAsync();
                    log.info("Controller cluster listener shutdown");
                }
                if (this.streamCutService != null) {
                    log.info("Stopping auto retention service");
                    this.streamCutService.stopAsync();
                }
                log.info("Closing stream metadata tasks");
                this.streamMetadataTasks.close();
                log.info("Closing stream transaction metadata tasks");
                this.streamTransactionMetadataTasks.close();
                if (this.restServer != null) {
                    log.info("Awaiting termination of REST server");
                    this.restServer.awaitTerminated();
                }
                if (this.grpcServer != null) {
                    log.info("Awaiting termination of gRPC server");
                    this.grpcServer.awaitTerminated();
                }
                if (this.controllerEventProcessors != null) {
                    log.info("Awaiting termination of controller event processors");
                    this.controllerEventProcessors.awaitTerminated();
                }
                if (this.monitor != null) {
                    log.info("Awaiting termination of segment container monitor");
                    this.monitor.awaitTerminated();
                }
                if (this.controllerClusterListener != null) {
                    log.info("Awaiting termination of controller cluster listener");
                    this.controllerClusterListener.awaitTerminated();
                }
                if (this.streamCutService != null) {
                    log.info("Awaiting termination of auto retention");
                    this.streamCutService.awaitTerminated();
                }
                log.info("Stopping controller executor");
                ExecutorServiceHelpers.shutdown(Duration.ofSeconds(5L), new ExecutorService[]{this.controllerExecutor, this.retentionExecutor});
                if (this.cluster != null) {
                    log.info("Closing controller cluster instance");
                    this.cluster.close();
                }
                log.info("Closing connection factory");
                this.connectionFactory.close();
                log.info("Closing storeClient");
                this.storeClient.close();
                LoggerHelpers.traceLeave(log, this.objectId, "shutDown", traceEnterWithContext, new Object[0]);
            } catch (Exception e) {
                log.error("Controller Service Starter threw exception during shutdown", e);
                throw e;
            }
        } catch (Throwable th) {
            log.info("Stopping controller executor");
            ExecutorServiceHelpers.shutdown(Duration.ofSeconds(5L), new ExecutorService[]{this.controllerExecutor, this.retentionExecutor});
            if (this.cluster != null) {
                log.info("Closing controller cluster instance");
                this.cluster.close();
            }
            log.info("Closing connection factory");
            this.connectionFactory.close();
            log.info("Closing storeClient");
            this.storeClient.close();
            LoggerHelpers.traceLeave(log, this.objectId, "shutDown", traceEnterWithContext, new Object[0]);
            throw th;
        }
    }

    @VisibleForTesting
    public boolean awaitTasksModuleInitialization(long j, TimeUnit timeUnit) throws InterruptedException {
        this.controllerReadyLatch.await();
        return this.streamTransactionMetadataTasks.awaitInitialization(j, timeUnit);
    }

    @VisibleForTesting
    public ControllerService getControllerService() throws InterruptedException {
        this.controllerReadyLatch.await();
        return this.controllerService;
    }

    @VisibleForTesting
    public LocalController getController() throws InterruptedException {
        this.controllerReadyLatch.await();
        return this.localController;
    }

    private void setController(LocalController localController) {
        this.localController = localController;
        this.controllerReadyLatch.countDown();
    }

    private String getHostName() {
        String str = null;
        if (this.serviceConfig.getGRPCServerConfig().isPresent()) {
            str = this.serviceConfig.getGRPCServerConfig().get().getPublishedRPCHost().orElse(null);
        }
        if (StringUtils.isEmpty(str)) {
            try {
                str = InetAddress.getLocalHost().getHostAddress();
            } catch (UnknownHostException e) {
                log.warn("Failed to get host address, defaulting to localhost: {}", e);
                str = "localhost";
            }
        }
        return str;
    }

    private int getPort() {
        int i = 0;
        if (this.serviceConfig.getGRPCServerConfig().isPresent()) {
            i = this.serviceConfig.getGRPCServerConfig().get().getPublishedRPCPort().orElse(Integer.valueOf(this.serviceConfig.getGRPCServerConfig().get().getPort())).intValue();
        }
        return i;
    }

    static {
        $assertionsDisabled = !ControllerServiceStarter.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(ControllerServiceStarter.class);
    }
}
