package org.apache.pulsar.proxy.server;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.ConfigurationMetadataCacheService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.proxy.stats.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/proxy/server/ProxyService.class */
/**
 * Pulsar proxy service: owns the Netty listeners (plain and TLS), the optional
 * metadata-store backed discovery/authorization components, and the periodic
 * statistics refresh for the connections and topics tracked by this proxy.
 *
 * <p>Lifecycle: construct, call {@link #start()} to bind the configured ports,
 * and call {@link #close()} to release the resources created by this instance.
 */
public class ProxyService implements Closeable {
    private final ProxyConfiguration proxyConfig;
    private final Timer timer;
    private String serviceUrl;
    private String serviceUrlTls;
    private ConfigurationMetadataCacheService configurationCacheService;
    private final AuthenticationService authenticationService;
    private AuthorizationService authorizationService;
    private MetadataStoreExtended localMetadataStore;
    private MetadataStoreExtended configMetadataStore;
    private PulsarResources pulsarResources;
    private final EventLoopGroup acceptorGroup;
    private final EventLoopGroup workerGroup;
    private Channel listenChannel;
    private Channel listenChannelTls;
    private final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-proxy-acceptor");
    private final DefaultThreadFactory workersThreadFactory = new DefaultThreadFactory("pulsar-proxy-io");
    private BrokerDiscoveryProvider discoveryProvider;
    protected final AtomicReference<Semaphore> lookupRequestSemaphore;
    protected int proxyLogLevel;
    private final ScheduledExecutorService statsExecutor;
    private final Set<ProxyConnection> clientCnxs;
    private final Map<String, TopicStats> topicStats;
    private AdditionalServlets proxyAdditionalServlets;
    private static final int numThreads = Runtime.getRuntime().availableProcessors();
    static final Gauge activeConnections = Gauge.build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create().register();
    static final Counter newConnections = Counter.build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create().register();
    static final Counter rejectedConnections = Counter.build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create().register();
    static final Counter opsCounter = Counter.build("pulsar_proxy_binary_ops", "Counter of proxy operations").create().register();
    static final Counter bytesCounter = Counter.build("pulsar_proxy_binary_bytes", "Counter of proxy bytes").create().register();
    private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class);

    /**
     * Creates the proxy service and its thread pools; no port is bound until {@link #start()}.
     *
     * @param proxyConfiguration proxy settings; must not be {@code null}
     * @param authenticationService authentication service exposed through {@link #getAuthenticationService()}
     * @throws IOException declared by the original signature; the visible code does not show which call raises it
     */
    public ProxyService(ProxyConfiguration proxyConfiguration, AuthenticationService authenticationService) throws IOException {
        Preconditions.checkNotNull(proxyConfiguration);
        this.proxyConfig = proxyConfiguration;
        this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1L, TimeUnit.MILLISECONDS);
        this.clientCnxs = Sets.newConcurrentHashSet();
        this.topicStats = Maps.newConcurrentMap();
        this.lookupRequestSemaphore = new AtomicReference<>(new Semaphore(proxyConfiguration.getMaxConcurrentLookupRequests(), false));
        // Log level defaults to 0 when it is not configured.
        this.proxyLogLevel = proxyConfiguration.getProxyLogLevel().orElse(0);
        this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, this.acceptorThreadFactory);
        this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, this.workersThreadFactory);
        this.authenticationService = authenticationService;
        this.statsExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("proxy-stats-executor"));
        // Recompute the stats every 60 seconds. A one-shot schedule() would refresh them
        // a single time after startup and leave the reported values stale from then on.
        this.statsExecutor.scheduleAtFixedRate(this::refreshStats, 60L, 60L, TimeUnit.SECONDS);
        this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfiguration);
    }

    /**
     * Recalculates the inbound request rate of every tracked connection that has a
     * direct proxy handler with a rate object, then recalculates every topic's stats.
     */
    private void refreshStats() {
        try {
            this.clientCnxs.forEach(cnx -> {
                if (cnx.getDirectProxyHandler() != null && cnx.getDirectProxyHandler().getInboundChannelRequestsRate() != null) {
                    cnx.getDirectProxyHandler().getInboundChannelRequestsRate().calculateRate();
                }
            });
            this.topicStats.forEach((topic, stats) -> stats.calculate());
        } catch (Exception e) {
            // An exception escaping a fixed-rate task cancels all subsequent executions,
            // so log it and keep the schedule alive.
            LOG.warn("Failed to refresh proxy stats", e);
        }
    }

    /**
     * Validates the configuration, optionally wires the metadata-store backed services,
     * and binds the configured plain and TLS service ports.
     *
     * @throws IllegalStateException if authorization is enabled without authentication
     * @throws IOException if binding the plain service port fails
     * @throws Exception for any other failure (metadata store creation, TLS bind, ...)
     */
    public void start() throws Exception {
        if (this.proxyConfig.isAuthorizationEnabled() && !this.proxyConfig.isAuthenticationEnabled()) {
            throw new IllegalStateException("Invalid proxy configuration. Authentication must be enabled with authenticationEnabled=true when authorization is enabled with authorizationEnabled=true.");
        }
        // Discovery and authorization are only set up when both metadata endpoints are configured.
        if (!StringUtils.isBlank(this.proxyConfig.getZookeeperServers()) && !StringUtils.isBlank(this.proxyConfig.getConfigurationStoreServers())) {
            this.localMetadataStore = createLocalMetadataStore();
            this.configMetadataStore = createConfigurationMetadataStore();
            this.pulsarResources = new PulsarResources(this.localMetadataStore, this.configMetadataStore);
            this.discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, this.pulsarResources);
            this.configurationCacheService = new ConfigurationMetadataCacheService(this.pulsarResources, (String) null);
            this.authorizationService = new AuthorizationService(PulsarConfigurationLoader.convertFrom(this.proxyConfig), this.configurationCacheService);
        }
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
        serverBootstrap.group(this.acceptorGroup, this.workerGroup);
        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 16384, 1048576));
        serverBootstrap.channel(EventLoopUtil.getServerSocketChannelClass(this.workerGroup));
        EventLoopUtil.enableTriggeredMode(serverBootstrap);
        serverBootstrap.childHandler(new ServiceChannelInitializer(this, this.proxyConfig, false));
        if (this.proxyConfig.getServicePort().isPresent()) {
            try {
                this.listenChannel = serverBootstrap.bind(this.proxyConfig.getBindAddress(), this.proxyConfig.getServicePort().get().intValue()).sync().channel();
                LOG.info("Started Pulsar Proxy at {}", this.listenChannel.localAddress());
            } catch (Exception e) {
                throw new IOException("Failed to bind Pulsar Proxy on port " + this.proxyConfig.getServicePort().get(), e);
            }
        }
        if (this.proxyConfig.getServicePortTls().isPresent()) {
            // The TLS listener reuses the same bootstrap settings with a TLS-enabled initializer.
            ServerBootstrap tlsBootstrap = serverBootstrap.clone();
            tlsBootstrap.childHandler(new ServiceChannelInitializer(this, this.proxyConfig, true));
            this.listenChannelTls = tlsBootstrap.bind(this.proxyConfig.getBindAddress(), this.proxyConfig.getServicePortTls().get().intValue()).sync().channel();
            LOG.info("Started Pulsar TLS Proxy on {}", this.listenChannelTls.localAddress());
        }
        // Service URLs use the port actually bound, read back from the listen channels.
        String advertisedAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(this.proxyConfig.getAdvertisedAddress());
        if (this.proxyConfig.getServicePort().isPresent()) {
            this.serviceUrl = String.format("pulsar://%s:%d/", advertisedAddress, getListenPort().get());
        } else {
            this.serviceUrl = null;
        }
        if (this.proxyConfig.getServicePortTls().isPresent()) {
            this.serviceUrlTls = String.format("pulsar+ssl://%s:%d/", advertisedAddress, getListenPortTls().get());
        } else {
            this.serviceUrlTls = null;
        }
    }

    /** @return the broker discovery provider, or {@code null} if {@link #start()} did not create one */
    public BrokerDiscoveryProvider getDiscoveryProvider() {
        return this.discoveryProvider;
    }

    /**
     * Releases the resources held by this service.
     *
     * <p>Every shutdown step is attempted even when an earlier one fails, so a failing
     * component cannot leave the metadata stores, event loops or timer running.
     *
     * @throws IOException the first failure encountered; later failures are attached as suppressed
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOException failure = null;
        if (this.discoveryProvider != null) {
            try {
                this.discoveryProvider.close();
            } catch (Exception e) {
                failure = recordFailure(failure, e);
            }
        }
        if (this.listenChannel != null) {
            this.listenChannel.close();
        }
        if (this.listenChannelTls != null) {
            this.listenChannelTls.close();
        }
        this.statsExecutor.shutdown();
        if (this.proxyAdditionalServlets != null) {
            try {
                this.proxyAdditionalServlets.close();
            } catch (Exception e) {
                failure = recordFailure(failure, e);
            } finally {
                this.proxyAdditionalServlets = null;
            }
        }
        if (this.localMetadataStore != null) {
            try {
                this.localMetadataStore.close();
            } catch (Exception e) {
                failure = recordFailure(failure, new IOException(e));
            }
        }
        if (this.configMetadataStore != null) {
            try {
                this.configMetadataStore.close();
            } catch (Exception e) {
                failure = recordFailure(failure, new IOException(e));
            }
        }
        this.acceptorGroup.shutdownGracefully();
        this.workerGroup.shutdownGracefully();
        this.timer.stop();
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Folds a shutdown failure into the exception that {@link #close()} will throw.
     *
     * @param first the failure recorded so far, or {@code null} if none
     * @param current the failure that just occurred
     * @return {@code first} with {@code current} suppressed, or {@code current} as an
     *         {@link IOException} (wrapped if necessary) when it is the first failure
     */
    private static IOException recordFailure(IOException first, Exception current) {
        IOException asIo = (current instanceof IOException) ? (IOException) current : new IOException(current);
        if (first == null) {
            return asIo;
        }
        first.addSuppressed(asIo);
        return first;
    }

    /** @return the {@code pulsar://} URL built in {@link #start()}, or {@code null} if no plain port is configured */
    public String getServiceUrl() {
        return this.serviceUrl;
    }

    /** @return the {@code pulsar+ssl://} URL built in {@link #start()}, or {@code null} if no TLS port is configured */
    public String getServiceUrlTls() {
        return this.serviceUrlTls;
    }

    /** @return the configuration this service was created with */
    public ProxyConfiguration getConfiguration() {
        return this.proxyConfig;
    }

    /** @return the timer created by this service */
    public Timer getTimer() {
        return this.timer;
    }

    /** @return the authentication service supplied at construction */
    public AuthenticationService getAuthenticationService() {
        return this.authenticationService;
    }

    /** @return the authorization service, or {@code null} if {@link #start()} did not create one */
    public AuthorizationService getAuthorizationService() {
        return this.authorizationService;
    }

    /** @return the configuration cache service, or {@code null} if none has been created or set */
    public ConfigurationCacheService getConfigurationCacheService() {
        return this.configurationCacheService;
    }

    /** @param configurationMetadataCacheService the cache service to use from now on */
    public void setConfigurationCacheService(ConfigurationMetadataCacheService configurationMetadataCacheService) {
        this.configurationCacheService = configurationMetadataCacheService;
    }

    /** @return the semaphore currently held in the lookup-request reference */
    public Semaphore getLookupRequestSemaphore() {
        return this.lookupRequestSemaphore.get();
    }

    /** @return the event loop group used for child (worker) channels */
    public EventLoopGroup getWorkerGroup() {
        return this.workerGroup;
    }

    /** @return the locally bound plain port, or empty if the plain listener is not bound */
    public Optional<Integer> getListenPort() {
        if (this.listenChannel == null) {
            return Optional.empty();
        }
        return Optional.of(Integer.valueOf(((InetSocketAddress) this.listenChannel.localAddress()).getPort()));
    }

    /** @return the locally bound TLS port, or empty if the TLS listener is not bound */
    public Optional<Integer> getListenPortTls() {
        if (this.listenChannelTls == null) {
            return Optional.empty();
        }
        return Optional.of(Integer.valueOf(((InetSocketAddress) this.listenChannelTls.localAddress()).getPort()));
    }

    /**
     * Creates a metadata store for the configured ZooKeeper servers.
     *
     * @return a new metadata store; {@link #start()} assigns it to the local store that {@link #close()} releases
     * @throws MetadataStoreException if the store cannot be created
     */
    public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
        return PulsarResources.createMetadataStore(this.proxyConfig.getZookeeperServers(), this.proxyConfig.getZookeeperSessionTimeoutMs());
    }

    /**
     * Creates a metadata store for the configured configuration-store servers.
     *
     * @return a new metadata store; {@link #start()} assigns it to the configuration store that {@link #close()} releases
     * @throws MetadataStoreException if the store cannot be created
     */
    public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
        return PulsarResources.createMetadataStore(this.proxyConfig.getConfigurationStoreServers(), this.proxyConfig.getZookeeperSessionTimeoutMs());
    }

    /** @return the current proxy log level */
    public int getProxyLogLevel() {
        return this.proxyLogLevel;
    }

    /** @param i the new proxy log level */
    public void setProxyLogLevel(int i) {
        this.proxyLogLevel = i;
    }

    /** @return the live (not copied) concurrent set of client connections */
    public Set<ProxyConnection> getClientCnxs() {
        return this.clientCnxs;
    }

    /** @return the live (not copied) concurrent map of per-topic stats */
    public Map<String, TopicStats> getTopicStats() {
        return this.topicStats;
    }

    /** @return the additional servlets loaded at construction, or {@code null} after {@link #close()} */
    public AdditionalServlets getProxyAdditionalServlets() {
        return this.proxyAdditionalServlets;
    }
}
