package io.reactivex.netty.examples.http.logtail;

import io.netty.buffer.ByteBuf;
import io.netty.handler.logging.LogLevel;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

/* loaded from: input_file:io/reactivex/netty/examples/http/logtail/LogAggregator.class */
public class LogAggregator {
    static final int DEFAULT_AG_PORT = 8091;
    private final int port;
    private final List<Integer> producerPorts;
    HttpServer<ByteBuf, ServerSentEvent> server;

    public LogAggregator(int i, List<Integer> list) {
        this.port = i;
        this.producerPorts = list;
    }

    public LogAggregator(int i, int i2, int i3) {
        this.port = i;
        int i4 = (i3 - i2) + 1;
        this.producerPorts = new ArrayList(i4);
        for (int i5 = 0; i5 < i4; i5++) {
            this.producerPorts.add(Integer.valueOf(i2 + i5));
        }
    }

    public HttpServer<ByteBuf, ServerSentEvent> createAggregationServer() {
        this.server = RxNetty.newHttpServerBuilder(this.port, new RequestHandler<ByteBuf, ServerSentEvent>() { // from class: io.reactivex.netty.examples.http.logtail.LogAggregator.1
            public Observable<Void> handle(HttpServerRequest<ByteBuf> httpServerRequest, final HttpServerResponse<ServerSentEvent> httpServerResponse) {
                return LogAggregator.this.connectToLogProducers().flatMap(new Func1<ServerSentEvent, Observable<Void>>() { // from class: io.reactivex.netty.examples.http.logtail.LogAggregator.1.1
                    public Observable<Void> call(ServerSentEvent serverSentEvent) {
                        return httpServerResponse.writeAndFlush(serverSentEvent);
                    }
                });
            }
        }).enableWireLogging(LogLevel.ERROR).pipelineConfigurator(PipelineConfigurators.serveSseConfigurator()).build();
        System.out.println("Logs aggregator server started...");
        return this.server;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<ServerSentEvent> connectToLogProducers() {
        ArrayList arrayList = new ArrayList(this.producerPorts.size());
        Iterator<Integer> it = this.producerPorts.iterator();
        while (it.hasNext()) {
            arrayList.add(connectToLogProducer(it.next().intValue()));
        }
        return Observable.merge(arrayList);
    }

    private static Observable<ServerSentEvent> connectToLogProducer(int i) {
        return RxNetty.createHttpClient("localhost", i, PipelineConfigurators.clientSseConfigurator()).submit(HttpClientRequest.createGet("/logstream")).flatMap(new Func1<HttpClientResponse<ServerSentEvent>, Observable<ServerSentEvent>>() { // from class: io.reactivex.netty.examples.http.logtail.LogAggregator.2
            public Observable<ServerSentEvent> call(HttpClientResponse<ServerSentEvent> httpClientResponse) {
                return httpClientResponse.getContent().doOnNext(new Action1<ServerSentEvent>() { // from class: io.reactivex.netty.examples.http.logtail.LogAggregator.2.1
                    public void call(ServerSentEvent serverSentEvent) {
                        serverSentEvent.retain();
                    }
                });
            }
        });
    }

    public static void main(String[] strArr) {
        if (strArr.length < 2) {
            System.err.println("ERROR: provide log producers port range");
        } else {
            new LogAggregator(DEFAULT_AG_PORT, Integer.valueOf(strArr[0]).intValue(), Integer.valueOf(strArr[1]).intValue()).createAggregationServer().startAndWait();
            System.out.println("Aggregator service terminated");
        }
    }
}
