package io.mantisrx.runtime.sink;

import io.mantisrx.common.properties.MantisPropertiesLoader;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.PortRequest;
import io.mantisrx.runtime.sink.predicate.Predicate;
import io.mantisrx.server.core.ServiceRegistry;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOption;
import io.reactivex.mantis.network.push.PushServerSse;
import io.reactivex.mantis.network.push.PushServers;
import io.reactivex.mantis.network.push.Routers;
import io.reactivex.mantis.network.push.ServerConfig;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurators;
import mantis.io.reactivex.netty.protocol.http.server.HttpServer;
import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.BehaviorSubject;

/* loaded from: input_file:io/mantisrx/runtime/sink/ServerSentEventsSink.class */
public class ServerSentEventsSink<T> implements SelfDocumentingSink<T> {
    private static final Logger LOG = LoggerFactory.getLogger(ServerSentEventsSink.class);
    private final Func2<Map<String, List<String>>, Context, Void> subscribeProcessor;
    private final BehaviorSubject<Integer> portObservable;
    private final Func1<T, String> encoder;
    private final Func1<Throwable, String> errorEncoder;
    private final Predicate<T> predicate;
    private Func2<Map<String, List<String>>, Context, Void> requestPreprocessor;
    private Func2<Map<String, List<String>>, Context, Void> requestPostprocessor;
    private int port;
    private final MantisPropertiesLoader propService;
    private PushServerSse<T, Context> pushServerSse;
    private HttpServer<ByteBuf, ServerSentEvent> httpServer;

    /* loaded from: input_file:io/mantisrx/runtime/sink/ServerSentEventsSink$Builder.class */
    public static class Builder<T> {
        private Func1<T, String> encoder;
        private Func2<Map<String, List<String>>, Context, Void> requestPreprocessor;
        private Func2<Map<String, List<String>>, Context, Void> requestPostprocessor;
        private Func1<Throwable, String> errorEncoder = (v0) -> {
            return v0.getMessage();
        };
        private Predicate<T> predicate;
        private Func2<Map<String, List<String>>, Context, Void> subscribeProcessor;

        public Builder<T> withEncoder(Func1<T, String> func1) {
            this.encoder = func1;
            return this;
        }

        public Builder<T> withErrorEncoder(Func1<Throwable, String> func1) {
            this.errorEncoder = func1;
            return this;
        }

        public Builder<T> withPredicate(Predicate<T> predicate) {
            this.predicate = predicate;
            return this;
        }

        public Builder<T> withRequestPreprocessor(Func2<Map<String, List<String>>, Context, Void> func2) {
            this.requestPreprocessor = func2;
            return this;
        }

        public Builder<T> withSubscribePreprocessor(Func2<Map<String, List<String>>, Context, Void> func2) {
            this.subscribeProcessor = func2;
            return this;
        }

        public Builder<T> withRequestPostprocessor(Func2<Map<String, List<String>>, Context, Void> func2) {
            this.requestPostprocessor = func2;
            return this;
        }

        public ServerSentEventsSink<T> build() {
            return new ServerSentEventsSink<>(this);
        }
    }

    public ServerSentEventsSink(Func1<T, String> func1) {
        this(func1, null, null);
    }

    ServerSentEventsSink(Func1<T, String> func1, Func1<Throwable, String> func12, Predicate<T> predicate) {
        this.portObservable = BehaviorSubject.create();
        this.port = -1;
        func12 = func12 == null ? (v0) -> {
            return v0.getMessage();
        } : func12;
        this.encoder = func1;
        this.errorEncoder = func12;
        this.predicate = predicate;
        this.propService = ServiceRegistry.INSTANCE.getPropertiesService();
        this.subscribeProcessor = null;
    }

    ServerSentEventsSink(Builder<T> builder) {
        this.portObservable = BehaviorSubject.create();
        this.port = -1;
        this.encoder = ((Builder) builder).encoder;
        this.errorEncoder = ((Builder) builder).errorEncoder;
        this.predicate = ((Builder) builder).predicate;
        this.requestPreprocessor = ((Builder) builder).requestPreprocessor;
        this.requestPostprocessor = ((Builder) builder).requestPostprocessor;
        this.subscribeProcessor = ((Builder) builder).subscribeProcessor;
        this.propService = ServiceRegistry.INSTANCE.getPropertiesService();
    }

    @Override // io.mantisrx.runtime.sink.SelfDocumentingSink
    public Metadata metadata() {
        StringBuilder sb = new StringBuilder();
        sb.append("HTTP server streaming results using Server-sent events.  The sink supports optional subscription (GET) parameters to change the events emitted by the stream.  A sampling interval can be applied to the stream using the GET parameter sample=numSeconds.  This will limit the stream rate to events-per-numSeconds.");
        if (this.predicate != null && this.predicate.getDescription() != null) {
            sb.append("  Predicate description: ").append(this.predicate.getDescription());
        }
        return new Metadata.Builder().name("Server Sent Event Sink").description(sb.toString()).build();
    }

    private boolean runNewSseServerImpl(String str) {
        return Boolean.parseBoolean(this.propService.getStringValue("mantis.sse.newServerImpl", "true")) || Boolean.parseBoolean(this.propService.getStringValue(new StringBuilder().append(str).append(".mantis.sse.newServerImpl").toString(), "false"));
    }

    private int numConsumerThreads() {
        return Integer.parseInt(this.propService.getStringValue("mantis.sse.numConsumerThreads", "1"));
    }

    private int maxChunkSize() {
        return Integer.parseInt(this.propService.getStringValue("mantis.sse.maxChunkSize", "1000"));
    }

    private int maxReadTime() {
        return Integer.parseInt(this.propService.getStringValue("mantis.sse.maxReadTimeMSec", "250"));
    }

    private int bufferCapacity() {
        return Integer.parseInt(this.propService.getStringValue("mantis.sse.bufferCapacity", "25000"));
    }

    private boolean useSpsc() {
        return Boolean.parseBoolean(this.propService.getStringValue("mantis.sse.spsc", "false"));
    }

    public void call(Context context, PortRequest portRequest, Observable<T> observable) {
        this.port = portRequest.getPort();
        if (runNewSseServerImpl(context.getWorkerInfo().getJobName())) {
            LOG.info("Serving modern HTTP SSE server sink on port: " + this.port);
            ServerConfig.Builder maxChunkTimeMSec = new ServerConfig.Builder().name("SseSink").groupRouter(Routers.roundRobinSse("SseSink", this.encoder)).port(this.port).metricsRegistry(context.getMetricsRegistry()).maxChunkTimeMSec(maxReadTime()).maxChunkSize(maxChunkSize()).bufferCapacity(bufferCapacity()).numQueueConsumers(numConsumerThreads()).useSpscQueue(useSpsc()).maxChunkTimeMSec(getBatchInterval());
            if (this.predicate != null) {
                maxChunkTimeMSec.predicate(this.predicate.getPredicate());
            }
            this.pushServerSse = PushServers.infiniteStreamSse(maxChunkTimeMSec.build(), observable, this.requestPreprocessor, this.requestPostprocessor, this.subscribeProcessor, context, true);
            this.pushServerSse.start();
        } else {
            LOG.info("Serving legacy HTTP SSE server sink on port: " + this.port);
            this.httpServer = RxNetty.newHttpServerBuilder(this.port, new ServerSentEventRequestHandler(observable, this.encoder, this.errorEncoder, this.predicate, this.requestPreprocessor, this.requestPostprocessor, context, getBatchInterval())).pipelineConfigurator(PipelineConfigurators.serveSseConfigurator()).channelOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 5242880).channelOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 1048576).build();
            this.httpServer.start();
        }
        this.portObservable.onNext(Integer.valueOf(this.port));
    }

    @Override // io.mantisrx.runtime.sink.Sink, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.pushServerSse != null) {
            this.pushServerSse.shutdown();
        } else if (this.httpServer != null) {
            try {
                this.httpServer.shutdown();
            } catch (InterruptedException e) {
                throw new IOException(String.format("Failed to shut down the http server %s", this.httpServer), e);
            }
        }
    }

    private int getBatchInterval() {
        String stringValue = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.sse.batchInterval", "100");
        LOG.info("Read fast property mantis.sse.batchInterval" + stringValue);
        return Integer.parseInt(stringValue);
    }

    private int getHighWaterMark() {
        String stringValue = this.propService.getStringValue("JOB_NAME", "default");
        int i = 5242880;
        String stringValue2 = this.propService.getStringValue(stringValue + ".sse.highwater.mark", Integer.toString(5242880));
        LOG.info("Read fast property:" + stringValue + ".sse.highwater.mark ->" + stringValue2);
        try {
            i = Integer.parseInt(stringValue2);
        } catch (Exception e) {
            LOG.error("Error parsing string " + stringValue2 + " exception " + e.getMessage());
        }
        return i;
    }

    public int getServerPort() {
        return this.port;
    }

    public Observable<Integer> portConnections() {
        return this.portObservable;
    }
}
