package io.mantisrx.connector.publish.source.http;

import io.mantisrx.connector.publish.core.QueryRegistry;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.WorkerMap;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.runtime.source.Index;
import io.mantisrx.runtime.source.Source;
import io.reactivx.mantis.operators.DropOperator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/* loaded from: input_file:io/mantisrx/connector/publish/source/http/PushHttpSource.class */
public class PushHttpSource implements Source<String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushHttpSource.class);
    private final QueryRegistry queryRegistry;
    private static final String NETTY_THREAD_COUNT_PARAM_NAME = "nettyThreadCount";
    private static final String SERVER_PORT = "serverPort";
    private SourceHttpServer server;
    private final Subject<String, String> eventSubject = new SerializedSubject(PublishSubject.create());
    private AtomicReference<WorkerMap> workerMapAtomicReference = new AtomicReference<>(new WorkerMap(new HashMap()));

    public PushHttpSource(QueryRegistry queryRegistry) {
        this.queryRegistry = queryRegistry;
    }

    public Observable<Observable<String>> call(Context context, Index index) {
        return Observable.just(this.eventSubject.lift(new DropOperator("incoming_" + PushHttpSource.class.getCanonicalName() + "_batch")).onErrorResumeNext(th -> {
            return Observable.empty();
        }));
    }

    public void init(Context context, Index index) {
        LOGGER.info("Initializing PushHttpSource");
        int intValue = ((Integer) context.getParameters().get(NETTY_THREAD_COUNT_PARAM_NAME, 4)).intValue();
        int intValue2 = ((Integer) context.getParameters().get(SERVER_PORT)).intValue();
        LOGGER.info("PushHttpSource server starting at Port " + intValue2);
        this.server = new NettySourceHttpServer(context, intValue);
        try {
            this.server.init(this.queryRegistry, this.eventSubject, intValue2);
            this.server.startServer();
            context.getWorkerMapObservable().subscribeOn(Schedulers.io()).subscribe(workerMap -> {
                LOGGER.info("Got WorkerUpdate" + workerMap);
                this.workerMapAtomicReference.set(workerMap);
            });
            LOGGER.info("PushHttpSource server started");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public List<ParameterDefinition<?>> getParameters() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new IntParameter().name(NETTY_THREAD_COUNT_PARAM_NAME).validator(Validators.range(1, 8)).defaultValue(4).build());
        arrayList.add(new StringParameter().name("zoneList").description("list of Zones").validator(Validators.alwaysPass()).defaultValue("").build());
        arrayList.add(new StringParameter().name("targetApp").description("target app").validator(Validators.alwaysPass()).defaultValue("").build());
        arrayList.add(new IntParameter().name(SERVER_PORT).description("port to serve the output").validator(Validators.range(1000, 65535)).defaultValue(5054).build());
        arrayList.add(new StringParameter().name("targetASGs").description("target ASGs CSV regex").validator(Validators.alwaysPass()).defaultValue("").build());
        return arrayList;
    }

    public void close() throws IOException {
        if (this.server != null) {
            this.server.shutdownServer();
            this.server = null;
        }
    }
}
