package io.mantisrx.runtime.sink;

import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import io.mantisrx.common.compression.CompressionUtils;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.network.Endpoint;
import io.mantisrx.common.network.HashFunctions;
import io.mantisrx.common.network.ServerSlotManager;
import io.mantisrx.common.network.WritableEndpoint;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.sink.predicate.Predicate;
import io.netty.buffer.ByteBuf;
import io.reactivx.mantis.operators.DisableBackPressureOperator;
import io.reactivx.mantis.operators.DropOperator;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import mantis.io.reactivex.netty.protocol.http.server.HttpServerRequest;
import mantis.io.reactivex.netty.protocol.http.server.HttpServerResponse;
import mantis.io.reactivex.netty.protocol.http.server.RequestHandler;
import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

/* loaded from: input_file:io/mantisrx/runtime/sink/ServerSentEventRequestHandler.class */
public class ServerSentEventRequestHandler<T> implements RequestHandler<ByteBuf, ServerSentEvent> {
    private static final String ENABLE_PINGS_PARAM = "enablePings";
    private static final String SAMPLE_PARAM = "sample";
    private static final String SAMPLE_PARAM_MSEC = "sampleMSec";
    private static final String CLIENT_ID_PARAM = "clientId";
    private static final int PING_INTERVAL = 2000;
    private static final String TEXT_FORMAT = "text";
    private static final String DEFAULT_FORMAT = "text";
    private static final String FORMAT_PARAM = "format";
    private static final String PING = "\ndata: ping\n\n";
    private Observable<T> observableToServe;
    private Func1<T, String> encoder;
    private Func1<Throwable, String> errorEncoder;
    private Predicate<T> predicate;
    private Func2<Map<String, List<String>>, Context, Void> requestPreprocessor;
    private Func2<Map<String, List<String>>, Context, Void> requestPostprocessor;
    private Context context;
    private int flushIntervalMillis;
    protected static final Object BINARY_FORMAT = "binary";
    private static final Logger LOG = LoggerFactory.getLogger(ServerSentEventRequestHandler.class);
    private static final byte[] EVENT_PREFIX_BYTES = "event: ".getBytes();
    private static final String TWO_NEWLINES = "\n\n";
    private static final byte[] NEW_LINE_AS_BYTES = TWO_NEWLINES.getBytes();
    private static final byte[] ID_PREFIX_AS_BYTES = "id: ".getBytes();
    private static final String SSE_DATA_PREFIX = "data: ";
    private static final byte[] DATA_PREFIX_AS_BYTES = SSE_DATA_PREFIX.getBytes();
    final ServerSlotManager ssm = new ServerSlotManager(HashFunctions.ketama());
    private boolean pingsEnabled = true;
    private String format = "text";

    public ServerSentEventRequestHandler(Observable<T> observable, Func1<T, String> func1, Func1<Throwable, String> func12, Predicate<T> predicate, Func2<Map<String, List<String>>, Context, Void> func2, Func2<Map<String, List<String>>, Context, Void> func22, Context context, int i) {
        this.flushIntervalMillis = 250;
        this.observableToServe = observable;
        this.encoder = func1;
        this.errorEncoder = func12;
        this.predicate = predicate;
        this.requestPreprocessor = func2;
        this.requestPostprocessor = func22;
        this.context = context;
        this.flushIntervalMillis = i;
    }

    public Observable<Void> handle(HttpServerRequest<ByteBuf> httpServerRequest, final HttpServerResponse<ServerSentEvent> httpServerResponse) {
        InetSocketAddress inetSocketAddress = (InetSocketAddress) httpServerResponse.getChannel().remoteAddress();
        LOG.info("HTTP SSE connection received from " + inetSocketAddress.getAddress() + ":" + inetSocketAddress.getPort() + "  queryParams: " + httpServerRequest.getQueryParameters());
        String inetAddress = inetSocketAddress.getAddress().toString();
        final WritableEndpoint writableEndpoint = new WritableEndpoint(inetSocketAddress.getHostString(), inetSocketAddress.getPort(), Endpoint.uniqueHost(inetSocketAddress.getHostString(), inetSocketAddress.getPort(), (String) null));
        final Map queryParameters = httpServerRequest.getQueryParameters();
        final ServerSlotManager.SlotAssignmentManager registerServer = this.ssm.registerServer(writableEndpoint, queryParameters);
        new AtomicLong().set(-1L);
        final AtomicLong atomicLong = new AtomicLong(-1L);
        Observable<T> observable = this.observableToServe;
        if ("true".equals("false")) {
            observable = observable.lift(new DropOperator("outgoing_ServerSentEventRequestHandler", new Tag[]{new BasicTag("sockAddr", (String) Optional.ofNullable(inetAddress).orElse("none"))})).observeOn(Schedulers.io());
        }
        httpServerResponse.getHeaders().set("Access-Control-Allow-Origin", "*");
        httpServerResponse.getHeaders().set("content-type", "text/event-stream");
        httpServerResponse.getHeaders().set("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
        httpServerResponse.getHeaders().set("Pragma", "no-cache");
        httpServerResponse.flush();
        String str = inetAddress;
        if (queryParameters != null && queryParameters.containsKey(CLIENT_ID_PARAM)) {
            str = (String) ((List) queryParameters.get(CLIENT_ID_PARAM)).get(0);
        }
        if (queryParameters != null && queryParameters.containsKey(FORMAT_PARAM)) {
            this.format = (String) ((List) queryParameters.get(FORMAT_PARAM)).get(0);
        }
        if (queryParameters != null && this.requestPreprocessor != null) {
            this.requestPreprocessor.call(queryParameters, this.context);
        }
        if (queryParameters != null && queryParameters.containsKey(SAMPLE_PARAM_MSEC)) {
            observable = observable.sample(Integer.parseInt((String) ((List) queryParameters.get(SAMPLE_PARAM_MSEC)).get(0)), TimeUnit.MILLISECONDS);
        }
        if (queryParameters != null && queryParameters.containsKey(SAMPLE_PARAM)) {
            observable = observable.sample(Integer.parseInt((String) ((List) queryParameters.get(SAMPLE_PARAM)).get(0)), TimeUnit.SECONDS);
        }
        if (queryParameters != null && queryParameters.containsKey(ENABLE_PINGS_PARAM)) {
            if ("true".equalsIgnoreCase((String) ((List) queryParameters.get(ENABLE_PINGS_PARAM)).get(0))) {
                this.pingsEnabled = true;
            } else {
                this.pingsEnabled = false;
            }
        }
        if (queryParameters != null && queryParameters.containsKey("delay")) {
            try {
                int parseInt = Integer.parseInt((String) ((List) queryParameters.get("delay")).get(0));
                if (parseInt >= 50) {
                    this.flushIntervalMillis = parseInt;
                } else {
                    LOG.warn("delay parameter too small " + parseInt + " min. is 100");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        Metrics build = new Metrics.Builder().id("ServerSentEventRequestHandler", new Tag[]{new BasicTag(CLIENT_ID_PARAM, (String) Optional.ofNullable(str).orElse("none")), new BasicTag("sockAddr", (String) Optional.ofNullable(inetAddress).orElse("none"))}).addCounter("processedCounter").addCounter("pingCounter").addCounter("errorCounter").addCounter("droppedCounter").addCounter("flushCounter").build();
        final Counter counter = build.getCounter("processedCounter");
        final Counter counter2 = build.getCounter("pingCounter");
        final Counter counter3 = build.getCounter("errorCounter");
        final Counter counter4 = build.getCounter("droppedCounter");
        final Counter counter5 = build.getCounter("flushCounter");
        Func1<T, Boolean> func1 = new Func1<T, Boolean>() { // from class: io.mantisrx.runtime.sink.ServerSentEventRequestHandler.1
            public Boolean call(T t) {
                return true;
            }

            /* renamed from: call, reason: collision with other method in class */
            public /* bridge */ /* synthetic */ Object m13call(Object obj) {
                return call((AnonymousClass1) obj);
            }
        };
        if (queryParameters != null && this.predicate != null) {
            func1 = (Func1) this.predicate.getPredicate().call(queryParameters);
        }
        final Subscription subscribe = Observable.interval(1L, TimeUnit.SECONDS).doOnNext(new Action1<Long>() { // from class: io.mantisrx.runtime.sink.ServerSentEventRequestHandler.2
            public void call(Long l) {
                long currentTimeMillis = System.currentTimeMillis();
                if (ServerSentEventRequestHandler.this.pingsEnabled) {
                    if (atomicLong.get() == -1 || currentTimeMillis > atomicLong.get() + 2000) {
                        counter2.increment();
                        httpServerResponse.writeStringAndFlush(ServerSentEventRequestHandler.PING);
                        atomicLong.set(currentTimeMillis);
                    }
                }
            }
        }).subscribe();
        return observable.filter(func1).map(this.encoder).lift(new DisableBackPressureOperator()).buffer(this.flushIntervalMillis, TimeUnit.MILLISECONDS).flatMap(new Func1<List<String>, Observable<Void>>() { // from class: io.mantisrx.runtime.sink.ServerSentEventRequestHandler.4
            public Observable<Void> call(List<String> list) {
                if (httpServerResponse.isCloseIssued() || !httpServerResponse.getChannel().isActive()) {
                    ServerSentEventRequestHandler.LOG.info("Client closed detected, throwing closed channel exception");
                    return Observable.error(new ClosedChannelException());
                }
                Stream<String> stream = list.stream();
                ServerSlotManager.SlotAssignmentManager slotAssignmentManager = registerServer;
                WritableEndpoint writableEndpoint2 = writableEndpoint;
                List<String> list2 = (List) stream.filter(str2 -> {
                    return slotAssignmentManager.filter(writableEndpoint2, str2.getBytes());
                }).collect(Collectors.toList());
                if (!httpServerResponse.getChannel().isWritable()) {
                    counter4.increment(list2.size());
                    return Observable.empty();
                }
                counter5.increment();
                if (ServerSentEventRequestHandler.this.format.equals(ServerSentEventRequestHandler.BINARY_FORMAT)) {
                    try {
                        String compressAndBase64Encode = CompressionUtils.compressAndBase64Encode(list2, true);
                        StringBuilder sb = new StringBuilder(3);
                        sb.append(ServerSentEventRequestHandler.SSE_DATA_PREFIX);
                        sb.append(compressAndBase64Encode);
                        sb.append(ServerSentEventRequestHandler.TWO_NEWLINES);
                        counter.increment(list.size());
                        atomicLong.set(System.currentTimeMillis());
                        return httpServerResponse.writeStringAndFlush(sb.toString());
                    } catch (Exception e2) {
                        ServerSentEventRequestHandler.LOG.warn("Could not compress data" + e2.getMessage());
                        counter4.increment(list.size());
                        return Observable.empty();
                    }
                }
                int i = 0;
                StringBuilder sb2 = new StringBuilder(list.size() * 3);
                for (String str3 : list2) {
                    sb2.append(ServerSentEventRequestHandler.SSE_DATA_PREFIX);
                    sb2.append(str3);
                    sb2.append(ServerSentEventRequestHandler.TWO_NEWLINES);
                    i++;
                }
                counter.increment(i);
                atomicLong.set(System.currentTimeMillis());
                return httpServerResponse.writeStringAndFlush(sb2.toString());
            }
        }).onErrorResumeNext(new Func1<Throwable, Observable<? extends Void>>() { // from class: io.mantisrx.runtime.sink.ServerSentEventRequestHandler.3
            public Observable<? extends Void> call(Throwable th) {
                Throwable cause = th.getCause();
                counter3.increment();
                if (cause != null && !(cause instanceof ClosedChannelException)) {
                    ServerSentEventRequestHandler.LOG.warn("Error detected in SSE sink", cause);
                    if (ServerSentEventRequestHandler.this.errorEncoder != null) {
                        httpServerResponse.writeAndFlush(ServerSentEvent.withEventType(httpServerResponse.getAllocator().buffer().writeBytes("error: ".getBytes()), httpServerResponse.getAllocator().buffer().writeBytes(((String) ServerSentEventRequestHandler.this.errorEncoder.call(th)).getBytes())));
                    }
                    th.printStackTrace();
                }
                if (ServerSentEventRequestHandler.this.requestPostprocessor != null && queryParameters != null) {
                    ServerSentEventRequestHandler.this.requestPostprocessor.call(queryParameters, ServerSentEventRequestHandler.this.context);
                }
                ServerSentEventRequestHandler.this.ssm.deregisterServer(writableEndpoint, queryParameters);
                subscribe.unsubscribe();
                return Observable.error(th);
            }
        });
    }
}
