package io.mantisrx.runtime.executor;

import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.properties.MantisPropertiesLoader;
import io.mantisrx.runtime.GroupToGroup;
import io.mantisrx.runtime.GroupToScalar;
import io.mantisrx.runtime.KeyToKey;
import io.mantisrx.runtime.KeyToScalar;
import io.mantisrx.runtime.KeyValueStageConfig;
import io.mantisrx.runtime.ScalarToGroup;
import io.mantisrx.runtime.ScalarToKey;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.StageConfig;
import io.mantisrx.server.core.ServiceRegistry;
import io.mantisrx.shaded.com.google.common.base.Preconditions;
import io.reactivex.mantis.network.push.HashFunctions;
import io.reactivex.mantis.network.push.KeyValuePair;
import io.reactivex.mantis.network.push.LegacyTcpPushServer;
import io.reactivex.mantis.network.push.PushServers;
import io.reactivex.mantis.network.push.RouterFactory;
import io.reactivex.mantis.network.push.Routers;
import io.reactivex.mantis.network.push.ServerConfig;
import io.reactivex.mantis.remote.observable.RemoteRxServer;
import io.reactivex.mantis.remote.observable.RxMetrics;
import io.reactivex.mantis.remote.observable.ServeNestedObservable;
import io.reactivex.mantis.remote.observable.slotting.RoundRobin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;

/* loaded from: input_file:io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.class */
public class WorkerPublisherRemoteObservable<T> implements WorkerPublisher<T> {
    private static final Logger logger = LoggerFactory.getLogger(WorkerPublisherRemoteObservable.class);
    private final String name;
    private final int serverPort;
    private RemoteRxServer server;
    private final MantisPropertiesLoader propService;
    private String jobName;
    private RouterFactory routerFactory;

    /* loaded from: input_file:io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable$LegacyRxServer.class */
    private static class LegacyRxServer<T> extends RemoteRxServer {
        private final LegacyTcpPushServer<T> modernServer;

        public LegacyRxServer(LegacyTcpPushServer<T> legacyTcpPushServer) {
            this.modernServer = legacyTcpPushServer;
        }

        public void start() {
            this.modernServer.start();
        }

        public void startAndWait() {
        }

        public void shutdown() {
            this.modernServer.shutdown();
        }

        public void blockUntilServerShutdown() {
            this.modernServer.blockUntilShutdown();
        }
    }

    public WorkerPublisherRemoteObservable(int i, String str, Observable<Integer> observable, String str2) {
        this(i, str, observable, str2, new Routers());
    }

    public WorkerPublisherRemoteObservable(int i, String str, Observable<Integer> observable, String str2, RouterFactory routerFactory) {
        this.name = str;
        this.serverPort = i;
        this.propService = ServiceRegistry.INSTANCE.getPropertiesService();
        this.jobName = str2;
        this.routerFactory = routerFactory;
    }

    @Override // io.mantisrx.runtime.executor.WorkerPublisher
    public void start(StageConfig<?, T> stageConfig, Observable<Observable<T>> observable) {
        RemoteRxServer.Builder builder = new RemoteRxServer.Builder();
        if (stageConfig instanceof KeyValueStageConfig) {
            this.server = new LegacyRxServer(startKeyValueStage((KeyValueStageConfig) stageConfig, observable));
        } else {
            if (!(stageConfig instanceof ScalarToScalar) && !(stageConfig instanceof KeyToScalar) && !(stageConfig instanceof GroupToScalar)) {
                throw new RuntimeException("Unsupported stage type: " + stageConfig);
            }
            if (runNewW2Wserver(this.jobName)) {
                logger.info("Modern server setup for name: " + this.name + " type: Scalarstage");
                this.server = new LegacyRxServer(PushServers.infiniteStreamLegacyTcpNested(new ServerConfig.Builder().name(this.name).port(this.serverPort).metricsRegistry(MetricsRegistry.getInstance()).router(this.routerFactory.scalarStageToStageRouter(this.name, obj -> {
                    return stageConfig.getOutputCodec().encode(obj);
                })).build(), observable));
            } else {
                logger.info("Legacy server setup for name: " + this.name + " type: Scalarstage");
                RoundRobin roundRobin = new RoundRobin();
                builder.addObservable(new ServeNestedObservable.Builder().name(this.name).encoder(stageConfig.getOutputCodec()).observable(observable).slottingStrategy(roundRobin).build());
                MetricsRegistry.getInstance().registerAndGet(roundRobin.getMetrics());
                this.server = builder.port(this.serverPort).build();
            }
        }
        this.server.start();
    }

    private <K> LegacyTcpPushServer<KeyValuePair<K, T>> startKeyValueStage(KeyValueStageConfig<?, K, T> keyValueStageConfig, Observable<Observable<T>> observable) {
        Preconditions.checkArgument(runNewW2WserverGroups(this.jobName), String.format("Need to use new worker2worker server group for jobName %s", this.jobName));
        logger.info("Modern server setup for name: {} type: Keyedstage", this.name);
        long j = Long.MAX_VALUE;
        if (keyValueStageConfig instanceof KeyToKey) {
            j = ((KeyToKey) keyValueStageConfig).getKeyExpireTimeSeconds();
        } else if (keyValueStageConfig instanceof ScalarToKey) {
            j = ((ScalarToKey) keyValueStageConfig).getKeyExpireTimeSeconds();
        }
        Func1 func1 = obj -> {
            return keyValueStageConfig.getOutputCodec().encode(obj);
        };
        Func1 func12 = obj2 -> {
            return keyValueStageConfig.getOutputKeyCodec().encode(obj2);
        };
        ServerConfig build = new ServerConfig.Builder().name(this.name).port(this.serverPort).metricsRegistry(MetricsRegistry.getInstance()).numQueueConsumers(numConsumerThreads()).maxChunkSize(maxChunkSize()).maxChunkTimeMSec(maxChunkTimeMSec()).bufferCapacity(bufferCapacity()).useSpscQueue(useSpsc()).router(Routers.consistentHashingLegacyTcpProtocol(this.jobName, func12, func1)).build();
        return ((keyValueStageConfig instanceof ScalarToGroup) || (keyValueStageConfig instanceof GroupToGroup)) ? PushServers.infiniteStreamLegacyTcpNestedMantisGroup(build, observable, j, func12, HashFunctions.xxh3()) : PushServers.infiniteStreamLegacyTcpNestedGroupedObservable(build, observable, j, func12, HashFunctions.xxh3());
    }

    private boolean useSpsc() {
        return Boolean.parseBoolean(this.propService.getStringValue("mantis.w2w.spsc", "false"));
    }

    private int bufferCapacity() {
        return Integer.parseInt(this.propService.getStringValue("mantis.w2w.toKeyBuffer", "50000"));
    }

    private int maxChunkTimeMSec() {
        return Integer.parseInt(this.propService.getStringValue("mantis.w2w.toKeyMaxChunkTimeMSec", "250"));
    }

    private int maxChunkSize() {
        return Integer.parseInt(this.propService.getStringValue("mantis.w2w.toKeyMaxChunkSize", "1000"));
    }

    private int numConsumerThreads() {
        return Integer.parseInt(this.propService.getStringValue("mantis.w2w.toKeyThreads", "1"));
    }

    private boolean runNewW2Wserver(String str) {
        return Boolean.parseBoolean(this.propService.getStringValue("mantis.w2w.newServerImplScalar", "true")) || Boolean.parseBoolean(this.propService.getStringValue(new StringBuilder().append(str).append(".mantis.w2w.newServerImplScalar").toString(), "false"));
    }

    private boolean runNewW2WserverGroups(String str) {
        return Boolean.parseBoolean(this.propService.getStringValue("mantis.w2w.newServerImplKeyed", "true")) || Boolean.parseBoolean(this.propService.getStringValue(new StringBuilder().append(str).append(".mantis.w2w.newServerImplKeyed").toString(), "false"));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.server.shutdown();
    }

    public RemoteRxServer getServer() {
        return this.server;
    }

    @Override // io.mantisrx.runtime.executor.WorkerPublisher
    public RxMetrics getMetrics() {
        return this.server.getMetrics();
    }
}
