package io.mantisrx.runtime.executor;

import io.mantisrx.common.MantisProperties;
import io.mantisrx.common.WorkerPorts;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.MetricsServer;
import io.mantisrx.common.metrics.netty.MantisNettyEventsListenerFactory;
import io.mantisrx.common.network.Endpoint;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MachineDefinitions;
import io.mantisrx.runtime.MantisJobDurationType;
import io.mantisrx.runtime.SinkHolder;
import io.mantisrx.runtime.SourceHolder;
import io.mantisrx.runtime.StageConfig;
import io.mantisrx.runtime.WorkerInfo;
import io.mantisrx.runtime.WorkerMap;
import io.mantisrx.runtime.command.CommandException;
import io.mantisrx.runtime.command.ValidateJob;
import io.mantisrx.runtime.descriptor.SchedulingInfo;
import io.mantisrx.runtime.descriptor.StageSchedulingInfo;
import io.mantisrx.runtime.lifecycle.Lifecycle;
import io.mantisrx.runtime.lifecycle.ServiceLocator;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.ParameterUtils;
import io.reactivex.mantis.remote.observable.EndpointChange;
import io.reactivex.mantis.remote.observable.EndpointInjector;
import io.reactivex.mantis.remote.observable.RxMetrics;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import mantis.io.reactivex.netty.RxNetty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.subjects.BehaviorSubject;

/* loaded from: input_file:io/mantisrx/runtime/executor/LocalJobExecutorNetworked.class */
public class LocalJobExecutorNetworked {
    /**
     * Partition count used by this executor: passed as the partition count when static
     * endpoints are built and used as the multiplier for the value handed to publishers.
     */
    private static final int numPartitions = 1;

    private static final Logger logger = LoggerFactory.getLogger(LocalJobExecutorNetworked.class);

    /** Callback that terminates the JVM with exit status 0; handed to worker contexts. */
    private static final Action0 nullAction = () -> System.exit(0);

    /** Not instantiable: the class only exposes static entry points. */
    private LocalJobExecutorNetworked() {
    }

    /**
     * Starts one source-stage worker whose output is published on a local port.
     *
     * @param i            forwarded unchanged as the first argument of
     *                     {@code StageExecutors.executeSource}; the caller in this file passes the
     *                     worker's index within the stage
     * @param i2           port the publisher is created on
     * @param i3           multiplied by {@code numPartitions} and emitted as the only value of the
     *                     observable given to the publisher; the caller in this file passes the
     *                     instance count of the following stage
     * @param sourceHolder source to execute
     * @param stageConfig  configuration of the source stage
     * @param context      context for this worker
     * @param observable   forwarded unchanged as the last argument of
     *                     {@code StageExecutors.executeSource}
     */
    private static void startSource(int i, int i2, int i3, SourceHolder sourceHolder, StageConfig stageConfig, Context context, Observable<Integer> observable) {
        logger.debug("Creating source publisher on port " + i2);
        final Observable<Integer> publisherValue = Observable.just(Integer.valueOf(i3 * numPartitions));
        final WorkerPublisherRemoteObservable publisher = new WorkerPublisherRemoteObservable(i2, null, publisherValue, null);
        StageExecutors.executeSource(i, sourceHolder, stageConfig, publisher, context, observable);
    }

    /**
     * Starts one intermediate-stage worker: it consumes from the given publisher ports on
     * localhost and republishes its own output on port {@code i}.
     *
     * @param iArr        ports of the publishers this worker connects to
     * @param i           port this worker publishes on
     * @param stageConfig configuration of the stage being started
     * @param context     context for this worker
     * @param i2          index written into the endpoint slot ids ({@code "_index_" + i2})
     * @param i3          multiplied by {@code numPartitions} and emitted as the only value of the
     *                    observable given to the publisher; the caller in this file passes the
     *                    instance count of the following stage
     * @param i4          stage number written into the endpoint slot ids ({@code "stage_" + i4})
     * @param i5          not used by this method
     */
    private static void startIntermediate(int[] iArr, int i, StageConfig stageConfig, Context context, int i2, int i3, int i4, int i5) {
        if (logger.isDebugEnabled()) {
            final StringBuilder portList = new StringBuilder();
            for (final int publisherPort : iArr) {
                portList.append(publisherPort).append(" ");
            }
            logger.debug("Creating intermediate consumer connecting to publishers on ports " + portList);
        }
        final EndpointInjector injector = staticInjector(staticEndpoints(iArr, i4, i2, numPartitions));
        final WorkerConsumerRemoteObservable consumer = new WorkerConsumerRemoteObservable(null, injector);

        logger.debug("Creating intermediate publisher on port " + i);
        final Observable<Integer> publisherValue = Observable.just(Integer.valueOf(i3 * numPartitions));
        final WorkerPublisherRemoteObservable publisher = new WorkerPublisherRemoteObservable(i, null, publisherValue, null);
        StageExecutors.executeIntermediate(consumer, stageConfig, publisher, context);
    }

    /**
     * Starts one sink-stage worker that consumes from the given publisher ports on localhost.
     *
     * @param stageConfig  not used by this method
     * @param iArr         ports of the publishers this worker connects to
     * @param stageConfig2 configuration of the sink stage, passed to {@code StageExecutors.executeSink}
     * @param portSelector forwarded to {@code StageExecutors.executeSink}
     * @param sinkHolder   sink to execute
     * @param context      context for this worker
     * @param action0      forwarded to {@code StageExecutors.executeSink} after the two {@code null}
     *                     arguments
     * @param action02     forwarded to {@code StageExecutors.executeSink} directly after {@code context}
     * @param action1      forwarded to {@code StageExecutors.executeSink} as its last argument
     * @param i            stage number written into the endpoint slot ids ({@code "stage_" + i})
     * @param i2           index written into the endpoint slot ids ({@code "_index_" + i2})
     * @param i3           not used by this method
     */
    private static void startSink(StageConfig stageConfig, int[] iArr, StageConfig stageConfig2, PortSelector portSelector, SinkHolder sinkHolder, Context context, Action0 action0, Action0 action02, Action1<Throwable> action1, int i, int i2, int i3) {
        if (logger.isDebugEnabled()) {
            final StringBuilder portList = new StringBuilder();
            for (final int publisherPort : iArr) {
                portList.append(publisherPort).append(" ");
            }
            logger.debug("Creating sink consumer connecting to publishers on ports " + portList);
        }
        final EndpointInjector injector = staticInjector(staticEndpoints(iArr, i, i2, numPartitions));
        final WorkerConsumerRemoteObservable consumer = new WorkerConsumerRemoteObservable(null, injector);
        StageExecutors.executeSink(consumer, stageConfig2, sinkHolder, portSelector, new RxMetrics(), context, action02, null, null, action0, action1);
    }

    /**
     * Indexes the supplied parameters by name and hands them, together with the parameter
     * definitions, to {@code ParameterUtils.checkThenCreateState}.
     *
     * <p>When two parameters share a name, the later one replaces the earlier one.
     *
     * @param map          parameter definitions keyed by parameter name
     * @param parameterArr parameter values to check
     * @return whatever {@code ParameterUtils.checkThenCreateState} returns for the inputs
     * @throws IllegalArgumentException declared by this method; presumably raised by
     *                                  {@code checkThenCreateState} on invalid input (confirm there)
     */
    public static Map<String, Object> checkAndGetParameters(Map<String, ParameterDefinition<?>> map, Parameter... parameterArr) throws IllegalArgumentException {
        final Map<String, Parameter> parametersByName = new HashMap<>();
        for (final Parameter parameter : parameterArr) {
            parametersByName.put(parameter.getName(), parameter);
        }
        return ParameterUtils.checkThenCreateState(map, parametersByName);
    }

    /**
     * Runs {@code job} locally with a generated scheduling plan in which every stage of the job
     * is a single worker on {@code MachineDefinitions.micro()}.
     *
     * @param job          job to run
     * @param parameterArr parameter values for the job
     * @throws IllegalMantisJobException propagated from the three-argument {@code execute}
     */
    public static void execute(Job job, Parameter... parameterArr) throws IllegalMantisJobException {
        final List<StageConfig<?, ?>> stages = job.getStages();
        final SchedulingInfo.Builder scheduling = new SchedulingInfo.Builder();
        // One single-worker entry per stage; the stage configs themselves are not inspected.
        final int stageCount = stages.size();
        for (int added = 0; added < stageCount; added++) {
            scheduling.singleWorkerStage(MachineDefinitions.micro());
        }
        scheduling.numberOfStages(stageCount);
        execute(job, scheduling.build(), parameterArr);
    }

    /**
     * Validates {@code job} and runs it inside this JVM, wiring the stages together over
     * localhost ports, then blocks until the last stage's workers have all signalled completion.
     *
     * <p>Steps, in order: validate the job, create a port selector with the arguments 8000 and
     * 9000, register the Netty metrics listener factory, start a {@code MetricsServer} on an
     * acquired port, start the job's lifecycle, launch the workers, wait, and finally shut the
     * lifecycle and the metrics server down. The two shutdown calls run even when launching or
     * waiting fails, so a failed run does not leave them started.
     *
     * @param job            job to run
     * @param schedulingInfo per-stage scheduling; stage numbers passed to {@code forStage} start at 1
     * @param parameterArr   parameter values for the job
     * @throws IllegalMantisJobException if a {@code CommandException} is raised during the run
     * @throws RuntimeException          if the calling thread is interrupted while waiting for the
     *                                   workers; the interrupt flag is restored before throwing
     */
    public static void execute(Job job, SchedulingInfo schedulingInfo, Parameter... parameterArr) throws IllegalMantisJobException {
        // The try deliberately spans the whole run, so a CommandException raised at any point
        // is reported as IllegalMantisJobException, exactly as before.
        try {
            new ValidateJob(job).execute();

            List<StageConfig<?, ?>> stages = job.getStages();
            SourceHolder<?> source = job.getSource();
            SinkHolder sink = job.getSink();
            PortSelectorInRange portSelector = new PortSelectorInRange(8000, 9000);

            RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory());
            MetricsServer metricsServer = new MetricsServer(portSelector.acquirePort(), 1L, Collections.emptyMap());
            metricsServer.start();
            try {
                Lifecycle lifecycle = job.getLifecycle();
                lifecycle.startup();
                try {
                    runStages(job, schedulingInfo, parameterArr, stages, source, sink, portSelector, lifecycle);
                } finally {
                    // Only reached once startup() has returned normally.
                    lifecycle.shutdown();
                }
            } finally {
                metricsServer.shutdown();
            }
        } catch (CommandException e) {
            throw new IllegalMantisJobException(e);
        }
    }

    /** Launches every worker of the job (single- or multi-stage) and blocks until the last stage is done. */
    private static void runStages(Job job, SchedulingInfo schedulingInfo, Parameter[] parameterArr, List<StageConfig<?, ?>> stages, SourceHolder<?> source, SinkHolder sink, PortSelectorInRange portSelector, Lifecycle lifecycle) {
        Map<String, ParameterDefinition<?>> parameterDefinitions = job.getParameterDefinitions();
        String user = Optional.ofNullable(MantisProperties.getProperty("USER")).orElse("userUnknown");
        // Job id: "localJob-<user>-<random int in 0..9999>".
        String jobId = String.format("localJob-%s-%d", user, (int) (Math.random() * 10000.0d));
        logger.info("jobID {}", jobId);
        ServiceLocator serviceLocator = lifecycle.getServiceLocator();

        int stageOneInstances = schedulingInfo.forStage(1).getNumberOfInstances();
        // Seeded with the stage-1 instance count; handed to the stage-1 executors.
        BehaviorSubject<Integer> stageOneWorkerCount = BehaviorSubject.create(Integer.valueOf(stageOneInstances));
        // Receives a fresh WorkerMap each time a stage's workers have been registered.
        BehaviorSubject<WorkerMap> workerMapUpdates = BehaviorSubject.create();

        Action0 noOp = () -> { };
        Action1<Throwable> ignoreError = error -> { };
        Map<Integer, List<WorkerInfo>> workersByStage = new HashMap<>();

        if (stages.size() == 1) {
            // ---- single-stage job: source, stage and sink run in each worker ----
            StageConfig<?, ?> stage = stages.get(0);
            CountDownLatch allWorkersDone = new CountDownLatch(stageOneInstances);
            Action0 releaseLatch = allWorkersDone::countDown;
            List<WorkerInfo> stageWorkers = new ArrayList<>();

            for (int index = 0; index < stageOneInstances; index++) {
                WorkerInfo workerInfo = new WorkerInfo(jobId, jobId, 1, index, index + 1, MantisJobDurationType.Perpetual, "localhost", newWorkerPorts(portSelector));
                stageWorkers.add(workerInfo);
                Context context = new Context(
                        ParameterUtils.createContextParameters(parameterDefinitions, parameterArr),
                        lifecycle.getServiceLocator(),
                        workerInfo,
                        MetricsRegistry.getInstance(),
                        nullAction,
                        workerMapUpdates,
                        Thread.currentThread().getContextClassLoader());

                // The map is re-published after every worker so it always lists the workers
                // created so far.
                workersByStage.put(1, stageWorkers);
                workerMapUpdates.onNext(new WorkerMap(workersByStage));

                StageExecutors.executeSingleStageJob(source, stage, sink, () -> workerInfo.getWorkerPorts().getSinkPort(), new RxMetrics(), context, releaseLatch, index, stageOneWorkerCount, null, null, noOp, ignoreError);
            }
            awaitCompletion(allWorkersDone);
            return;
        }

        // ---- multi-stage job ----
        // NOTE(review): stage-1 workers are numbered index + 1, while later stages draw their
        // worker number from this counter starting at 0, so numbers can repeat across stages.
        // Kept as-is; confirm whether that is intended.
        int workerNumber = 0;

        // Stage 1: source workers. Their sink ports become the next stage's upstream ports.
        StageConfig<?, ?> sourceStage = stages.get(0);
        StageSchedulingInfo sourceScheduling = schedulingInfo.forStage(1);
        StageSchedulingInfo secondStageScheduling = schedulingInfo.forStage(2);
        int[] upstreamPorts = new int[sourceScheduling.getNumberOfInstances()];
        List<WorkerInfo> sourceWorkers = new ArrayList<>();
        for (int index = 0; index < sourceScheduling.getNumberOfInstances(); index++) {
            WorkerInfo workerInfo = new WorkerInfo(jobId, jobId, 1, index, index + 1, MantisJobDurationType.Perpetual, "localhost", newWorkerPorts(portSelector));
            sourceWorkers.add(workerInfo);
            int publishPort = workerInfo.getWorkerPorts().getSinkPort();
            upstreamPorts[index] = publishPort;
            Context context = new Context(
                    ParameterUtils.createContextParameters(parameterDefinitions, parameterArr),
                    serviceLocator,
                    workerInfo,
                    MetricsRegistry.getInstance(),
                    nullAction,
                    workerMapUpdates,
                    Thread.currentThread().getContextClassLoader());
            startSource(index, publishPort, secondStageScheduling.getNumberOfInstances(), job.getSource(), sourceStage, context, stageOneWorkerCount);
        }
        workersByStage.put(1, sourceWorkers);
        workerMapUpdates.onNext(new WorkerMap(workersByStage));

        // Stages 2 .. n-1: intermediate workers. stageIndex is the 0-based position in `stages`,
        // so the stage number used for scheduling and worker info is stageIndex + 1.
        for (int stageIndex = 1; stageIndex < stages.size() - 1; stageIndex++) {
            StageSchedulingInfo previousScheduling = schedulingInfo.forStage(stageIndex);
            StageSchedulingInfo currentScheduling = schedulingInfo.forStage(stageIndex + 1);
            StageConfig<?, ?> currentStage = stages.get(stageIndex);
            StageSchedulingInfo nextScheduling = schedulingInfo.forStage(stageIndex + 2);
            int[] currentPorts = new int[currentScheduling.getNumberOfInstances()];
            List<WorkerInfo> currentWorkers = new ArrayList<>();
            for (int index = 0; index < currentScheduling.getNumberOfInstances(); index++) {
                WorkerPorts workerPorts = newWorkerPorts(portSelector);
                WorkerInfo workerInfo = new WorkerInfo(jobId, jobId, stageIndex + 1, index, workerNumber++, MantisJobDurationType.Perpetual, "localhost", workerPorts);
                currentWorkers.add(workerInfo);
                int publishPort = workerInfo.getWorkerPorts().getSinkPort();
                currentPorts[index] = publishPort;
                Context context = new Context(
                        ParameterUtils.createContextParameters(parameterDefinitions, parameterArr),
                        serviceLocator,
                        workerInfo,
                        MetricsRegistry.getInstance(),
                        nullAction,
                        workerMapUpdates,
                        Thread.currentThread().getContextClassLoader());
                startIntermediate(upstreamPorts, publishPort, currentStage, context, index, nextScheduling.getNumberOfInstances(), stageIndex, previousScheduling.getNumberOfInstances());
            }
            workersByStage.put(stageIndex + 1, currentWorkers);
            workerMapUpdates.onNext(new WorkerMap(workersByStage));
            upstreamPorts = currentPorts;
        }

        // Stage n: sink workers, consuming from the last set of upstream ports.
        StageSchedulingInfo beforeSinkScheduling = schedulingInfo.forStage(stages.size() - 1);
        StageConfig<?, ?> beforeSinkStage = stages.get(stages.size() - 2);
        StageConfig<?, ?> sinkStage = stages.get(stages.size() - 1);
        int sinkInstances = schedulingInfo.forStage(stages.size()).getNumberOfInstances();
        CountDownLatch allSinksDone = new CountDownLatch(sinkInstances);
        Action0 releaseLatch = allSinksDone::countDown;
        List<WorkerInfo> sinkWorkers = new ArrayList<>();
        for (int index = 0; index < sinkInstances; index++) {
            WorkerPorts workerPorts = newWorkerPorts(portSelector);
            WorkerInfo workerInfo = new WorkerInfo(jobId, jobId, stages.size(), index, workerNumber++, MantisJobDurationType.Perpetual, "localhost", workerPorts);
            sinkWorkers.add(workerInfo);
            Context context = new Context(
                    ParameterUtils.createContextParameters(parameterDefinitions, parameterArr),
                    serviceLocator,
                    workerInfo,
                    MetricsRegistry.getInstance(),
                    nullAction,
                    workerMapUpdates,
                    Thread.currentThread().getContextClassLoader());
            startSink(beforeSinkStage, upstreamPorts, sinkStage, () -> workerInfo.getWorkerPorts().getSinkPort(), sink, context, releaseLatch, noOp, ignoreError, stages.size(), index, beforeSinkScheduling.getNumberOfInstances());
        }
        workersByStage.put(stages.size(), sinkWorkers);
        workerMapUpdates.onNext(new WorkerMap(workersByStage));
        awaitCompletion(allSinksDone);
    }

    /** Acquires five ports from {@code portSelector}, in order, and bundles them into a {@code WorkerPorts}. */
    private static WorkerPorts newWorkerPorts(PortSelectorInRange portSelector) {
        return new WorkerPorts(portSelector.acquirePort(), portSelector.acquirePort(), portSelector.acquirePort(), portSelector.acquirePort(), portSelector.acquirePort());
    }

    /** Blocks until {@code latch} reaches zero; on interruption restores the interrupt flag and rethrows unchecked. */
    private static void awaitCompletion(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack before wrapping.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds an observable that, on each subscription, emits one set of localhost endpoints and
     * then completes.
     *
     * <p>The set holds one endpoint per combination of port in {@code iArr} and partition number
     * from 1 to {@code i3}, with slot id
     * {@code "stage_" + i + "_index_" + i2 + "_partition_" + partition}.
     *
     * @param iArr ports to create endpoints for
     * @param i    stage number written into the slot id
     * @param i2   index written into the slot id
     * @param i3   highest partition number (inclusive)
     * @return observable emitting the endpoint set once per subscriber
     */
    private static Observable<Set<Endpoint>> staticEndpoints(final int[] iArr, final int i, final int i2, final int i3) {
        return Observable.create(subscriber -> {
            final Set<Endpoint> endpoints = new HashSet<>();
            for (final int port : iArr) {
                for (int partition = 1; partition <= i3; partition++) {
                    final Endpoint endpoint = new Endpoint("localhost", port, "stage_" + i + "_index_" + i2 + "_partition_" + partition);
                    logger.info("adding static endpoint:" + endpoint);
                    endpoints.add(endpoint);
                }
            }
            subscriber.onNext(endpoints);
            subscriber.onCompleted();
        });
    }

    /**
     * Wraps an observable of endpoint sets in an {@code EndpointInjector} whose
     * {@code deltas()} reports every endpoint of every emitted set as an {@code add} change.
     *
     * <p>Each change carries a new {@code Endpoint} built from the host, port and slot id of the
     * emitted one.
     *
     * @param observable source of endpoint sets
     * @return injector exposing the endpoints as {@code add} changes
     */
    private static EndpointInjector staticInjector(final Observable<Set<Endpoint>> observable) {
        return new EndpointInjector() {
            @Override
            public Observable<EndpointChange> deltas() {
                return observable.flatMap(endpoints -> Observable.from(endpoints).map(endpoint -> {
                    LocalJobExecutorNetworked.logger.info("injected endpoint:" + endpoint);
                    return new EndpointChange(EndpointChange.Type.add, new Endpoint(endpoint.getHost(), endpoint.getPort(), endpoint.getSlotId()));
                }));
            }
        };
    }
}
