package org.bithon.server.collector.source.thrift;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.thrift.TMultiplexedProcessor;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.layered.TFramedTransport;
import org.bithon.agent.rpc.thrift.service.event.IEventCollector;
import org.bithon.agent.rpc.thrift.service.metric.IMetricCollector;
import org.bithon.agent.rpc.thrift.service.setting.SettingService;
import org.bithon.agent.rpc.thrift.service.trace.ITraceCollector;
import org.bithon.server.collector.setting.AgentSettingService;
import org.bithon.server.collector.setting.SettingServiceThriftImpl;
import org.bithon.server.event.sink.IEventMessageSink;
import org.bithon.server.metric.sink.IMetricMessageSink;
import org.bithon.server.tracing.sink.ITraceMessageSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;

@ConditionalOnProperty(value = {"collector-thrift.enabled"}, havingValue = "true", matchIfMissing = false)
@Component
/* loaded from: input_file:org/bithon/server/collector/source/thrift/ThriftCollectorStarter.class */
public class ThriftCollectorStarter implements SmartLifecycle, ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger(ThriftCollectorStarter.class);
    private final List<TThreadedSelectorServer> thriftServers = new ArrayList();
    private ApplicationContext applicationContext;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/bithon/server/collector/source/thrift/ThriftCollectorStarter$ServiceGroup.class */
    public static class ServiceGroup {
        private final List<String> services = new ArrayList();
        private final TMultiplexedProcessor processor = new TMultiplexedProcessor();

        ServiceGroup() {
        }

        public List<String> getServices() {
            return this.services;
        }

        public TMultiplexedProcessor getProcessor() {
            return this.processor;
        }
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:19:0x00cb. Please report as an issue. */
    /* JADX WARN: Failed to find 'out' block for switch in B:5:0x0061. Please report as an issue. */
    public void start() {
        ThriftCollectorConfig thriftCollectorConfig = (ThriftCollectorConfig) this.applicationContext.getBean(ThriftCollectorConfig.class);
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Integer> entry : thriftCollectorConfig.getPort().entrySet()) {
            String key = entry.getKey();
            Integer value = entry.getValue();
            IMetricCollector.Processor processor = null;
            boolean z = -1;
            switch (key.hashCode()) {
                case -1077545552:
                    if (key.equals("metric")) {
                        z = false;
                        break;
                    }
                    break;
                case -1067396926:
                    if (key.equals("tracing")) {
                        z = 2;
                        break;
                    }
                    break;
                case 3064427:
                    if (key.equals("ctrl")) {
                        z = 3;
                        break;
                    }
                    break;
                case 96891546:
                    if (key.equals("event")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    processor = new IMetricCollector.Processor(new ThriftMetricCollector((IMetricMessageSink) this.applicationContext.getBean(IMetricMessageSink.class)));
                    break;
                case true:
                    processor = new IEventCollector.Processor(new ThriftEventCollector((IEventMessageSink) this.applicationContext.getBean(IEventMessageSink.class)));
                    break;
                case true:
                    processor = new ITraceCollector.Processor(new ThriftTraceCollector((ITraceMessageSink) this.applicationContext.getBean(ITraceMessageSink.class)));
                    break;
                case true:
                    processor = new SettingService.Processor(new SettingServiceThriftImpl((AgentSettingService) this.applicationContext.getBean(AgentSettingService.class)));
                    break;
            }
            if (processor != null) {
                ServiceGroup serviceGroup = (ServiceGroup) hashMap.computeIfAbsent(value, num -> {
                    return new ServiceGroup();
                });
                serviceGroup.getServices().add(key);
                serviceGroup.getProcessor().registerProcessor(key, processor);
            }
        }
        hashMap.forEach((num2, serviceGroup2) -> {
            String join = String.join(",", serviceGroup2.getServices());
            try {
                TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(new TNonblockingServerSocket(num2.intValue()));
                args.processorFactory(new TProcessorFactory(serviceGroup2.getProcessor()));
                args.transportFactory(new TFramedTransport.Factory());
                args.protocolFactory(new TCompactProtocol.Factory());
                args.selectorThreads(Runtime.getRuntime().availableProcessors());
                TThreadedSelectorServer tThreadedSelectorServer = new TThreadedSelectorServer(args);
                if (tThreadedSelectorServer.isServing()) {
                    throw new RuntimeException(String.format(Locale.ENGLISH, "Failed to start thrift server on port [%d]: The port is already in used", num2));
                }
                this.thriftServers.add(tThreadedSelectorServer);
                new Thread(() -> {
                    log.info("Starting thrift server[{}] on port {}...", join, num2);
                    tThreadedSelectorServer.serve();
                    log.info("Thrift server[{}] stopped", join);
                }, "thrift-server-" + join).start();
            } catch (TTransportException e) {
                throw new RuntimeException(String.format(Locale.ENGLISH, "Failed to start thrift server[%s] on port [%d]: %s", join, num2, e.getMessage()), e);
            }
        });
    }

    public void stop() {
        Iterator<TThreadedSelectorServer> it = this.thriftServers.iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
    }

    public boolean isRunning() {
        return this.thriftServers.stream().anyMatch((v0) -> {
            return v0.isServing();
        });
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
