package io.aleph0.yap.messaging.jetty;

import io.aleph0.yap.core.Sink;
import io.aleph0.yap.messaging.core.FirehoseMetrics;
import io.aleph0.yap.messaging.core.FirehoseProducerWorker;
import io.aleph0.yap.messaging.core.Message;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.websocket.api.Callback;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/aleph0/yap/messaging/jetty/WebSocketFirehoseProducerWorker.class */
/**
 * A {@link FirehoseProducerWorker} that opens a single WebSocket connection to a fixed URI,
 * converts every received text or binary message into zero or more {@link Message}s using a
 * {@link MessageFactory}, and puts them into the {@link Sink} handed to {@link #produce(Sink)}.
 *
 * <p>The number of messages successfully put into the sink is tracked in a counter that can be
 * read with {@link #m2checkMetrics()} and read-and-reset with {@link #m1flushMetrics()}.
 *
 * @param <T> the payload type of the produced messages
 */
public class WebSocketFirehoseProducerWorker<T> implements FirehoseProducerWorker<Message<T>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketFirehoseProducerWorker.class);

    /** WebSocket close status code that this worker treats as a normal, non-failing close. */
    private static final int STATUS_NORMAL = 1000;

    /** WebSocket close status code sent when an incoming message could not be processed. */
    private static final int STATUS_SERVER_ERROR = 1011;

    /** Count of messages put into the sink since construction or the last flush. */
    private final AtomicLong receivedMetric = new AtomicLong(0L);
    private final URI uri;
    private final Configurator configurator;
    private final MessageFactory<T> messageFactory;

    /**
     * Hooks for customizing the Jetty objects this worker creates. Every hook defaults to a
     * no-op, so implementations override only what they need.
     */
    public interface Configurator {
        /**
         * Called with the freshly created {@link HttpClient} before it is wrapped in a
         * {@link WebSocketClient}.
         *
         * @param httpClient the client to customize
         */
        default void configureHttpClient(HttpClient httpClient) {
        }

        /**
         * Called after the worker has applied its own stop timeout and idle timeout to the
         * {@link WebSocketClient}, and before the client is started, so values set here win.
         *
         * @param webSocketClient the client to customize
         */
        default void configureWebSocketClient(WebSocketClient webSocketClient) {
        }

        /**
         * Called with the upgrade request before the connection attempt is made.
         *
         * @param clientUpgradeRequest the request to customize
         */
        default void configureClientUpgradeRequest(ClientUpgradeRequest clientUpgradeRequest) {
        }

        /**
         * Called from the socket's open callback with the newly opened session.
         *
         * @param session the opened session
         */
        default void configureSession(Session session) {
        }
    }

    /**
     * Jetty annotated endpoint that forwards received messages to the sink and reports how the
     * session ended. The first terminal cause is offered to {@code failureCauses} (later ones
     * are dropped when the queue is full) and {@code latch} is counted down once the session has
     * errored or closed.
     */
    @WebSocket(autoDemand = true)
    public class InternalSocketListener {
        private final Sink<Message<T>> sink;
        private final BlockingQueue<Throwable> failureCauses;
        private final CountDownLatch latch;

        /**
         * @param sink destination for the produced messages
         * @param failureCauses queue that receives the reason the session ended
         * @param latch counted down when the session errors or closes
         * @throws NullPointerException if any argument is {@code null}
         */
        public InternalSocketListener(Sink<Message<T>> sink, BlockingQueue<Throwable> failureCauses, CountDownLatch latch) {
            this.sink = Objects.requireNonNull(sink, "sink");
            this.failureCauses = Objects.requireNonNull(failureCauses, "failureCauses");
            this.latch = Objects.requireNonNull(latch, "latch");
        }

        /** Logs the connection and lets the configurator customize the session. */
        @OnWebSocketOpen
        public void onWebSocketOpen(Session session) {
            LOGGER.atInfo().log("WebSocket connected");
            configurator.configureSession(session);
        }

        /**
         * Converts a text frame into messages and puts them into the sink. Any exception is
         * recorded as the failure cause and the session is closed with status 1011.
         */
        @OnWebSocketMessage
        public void onWebSocketText(Session session, String str) {
            try {
                putMessages(session, messageFactory.newTextMessages(str));
            } catch (Exception e) {
                LOGGER.atError().setCause(e).log("Failed to create text messages");
                failureCauses.offer(e);
                session.close(STATUS_SERVER_ERROR, null, Callback.NOOP);
            }
        }

        /**
         * Converts a binary frame into messages and puts them into the sink, then completes the
         * Jetty callback. Any exception is recorded as the failure cause, the callback is
         * failed, and the session is closed with status 1011.
         */
        @OnWebSocketMessage
        public void onWebSocketBinary(Session session, ByteBuffer byteBuffer, Callback callback) {
            try {
                putMessages(session, messageFactory.newBinaryMessages(byteBuffer));
            } catch (Exception e) {
                LOGGER.atError().setCause(e).log("Failed to create binary messages");
                failureCauses.offer(e);
                // The callback must be completed on the failure path too; otherwise Jetty is
                // never told that we are done with the buffer.
                callback.fail(e);
                session.close(STATUS_SERVER_ERROR, null, Callback.NOOP);
                return;
            }
            callback.succeed();
        }

        /**
         * Puts each message into the sink, counting every successful put. If the calling thread
         * is interrupted, the interrupt flag is restored, the interruption is recorded as the
         * failure cause and the session is closed with status 1000.
         */
        private void putMessages(Session session, List<Message<T>> messages) {
            try {
                for (Message<T> message : messages) {
                    sink.put(message);
                    receivedMetric.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.atInfo().setCause(e).log("Interrupted while putting messages");
                failureCauses.offer(e);
                session.close(STATUS_NORMAL, null, Callback.NOOP);
            }
        }

        /** Records the error as the failure cause, releases the waiter and drops the connection. */
        @OnWebSocketError
        public void onWebSocketError(Session session, Throwable th) {
            LOGGER.atError().setCause(th).log("WebSocket error");
            failureCauses.offer(th);
            latch.countDown();
            // NOTE(review): defensive guard in case the error is reported before a session
            // exists; confirm against Jetty's behavior.
            if (session != null) {
                session.disconnect();
            }
        }

        /**
         * Records the close as the failure cause (a {@link NormalCloseException} for status
         * 1000, an {@link IOException} otherwise) and releases the waiter.
         */
        @OnWebSocketClose
        public void onWebSocketClose(Session session, int i, String str) {
            LOGGER.atInfo().addKeyValue("statusCode", Integer.valueOf(i)).addKeyValue("reason", str).log("WebSocket closed");
            final Throwable cause;
            if (i == STATUS_NORMAL) {
                cause = new NormalCloseException();
            } else {
                cause = new IOException("WebSocket closed with status code " + i);
            }
            failureCauses.offer(cause);
            latch.countDown();
        }
    }

    /**
     * Converts raw WebSocket payloads into messages. An exception thrown from either method
     * fails the session.
     *
     * @param <T> the payload type of the produced messages
     */
    public interface MessageFactory<T> {
        /**
         * @param str the received text payload
         * @return the messages to put into the sink, in order
         */
        List<Message<T>> newTextMessages(String str);

        /**
         * @param byteBuffer the received binary payload
         * @return the messages to put into the sink, in order
         */
        List<Message<T>> newBinaryMessages(ByteBuffer byteBuffer);
    }

    /** Marker cause used internally to signal that the session closed with status 1000. */
    private static class NormalCloseException extends IOException {
        public NormalCloseException() {
            super("closed");
        }
    }

    /**
     * @return a new {@link Configurator} that leaves every Jetty object at the worker's defaults
     */
    public static Configurator defaultConfigurator() {
        return new Configurator() {
        };
    }

    /**
     * Creates a worker that uses {@link #defaultConfigurator()}.
     *
     * @param uri the WebSocket endpoint to connect to
     * @param messageFactory converts received payloads into messages
     * @throws NullPointerException if any argument is {@code null}
     */
    public WebSocketFirehoseProducerWorker(URI uri, MessageFactory<T> messageFactory) {
        this(uri, defaultConfigurator(), messageFactory);
    }

    /**
     * @param uri the WebSocket endpoint to connect to
     * @param configurator hooks for customizing the Jetty client, request and session
     * @param messageFactory converts received payloads into messages
     * @throws NullPointerException if any argument is {@code null}
     */
    public WebSocketFirehoseProducerWorker(URI uri, Configurator configurator, MessageFactory<T> messageFactory) {
        this.uri = Objects.requireNonNull(uri, "uri");
        this.configurator = Objects.requireNonNull(configurator, "configurator");
        this.messageFactory = Objects.requireNonNull(messageFactory, "messageFactory");
    }

    /**
     * Connects to the configured URI and blocks until the WebSocket session ends, putting every
     * produced message into {@code sink} in the meantime. Returns normally only when the session
     * closed with status 1000 and the client stopped cleanly.
     *
     * @param sink destination for the produced messages
     * @throws IOException if the session or the client failed with an {@link IOException}, or
     *         with any other checked exception (which is then wrapped)
     * @throws InterruptedException if this thread, or the thread delivering messages to the
     *         sink, was interrupted; this thread's interrupt flag is set before throwing
     * @throws RuntimeException if the session or the client failed with an unchecked exception
     */
    public void produce(Sink<Message<T>> sink) throws IOException, InterruptedException {
        try {
            runSession(sink);
        } catch (InterruptedException e) {
            LOGGER.atError().setCause(e).log("Websocket session interrupted. Propagating...");
            Thread.currentThread().interrupt();
            throw e;
        }
    }

    /** Builds the client, runs one session to completion, stops the client and reports the outcome. */
    private void runSession(Sink<Message<T>> sink) throws IOException, InterruptedException {
        final BlockingQueue<Throwable> failureCauses = new ArrayBlockingQueue<>(1);
        final CountDownLatch terminated = new CountDownLatch(1);

        final HttpClient httpClient = new HttpClient();
        configurator.configureHttpClient(httpClient);
        final WebSocketClient client = new WebSocketClient(httpClient);
        // Worker defaults; the configurator runs afterwards and may override them.
        client.setStopTimeout(5000L);
        client.setIdleTimeout(Duration.ofSeconds(30L));
        configurator.configureWebSocketClient(client);

        startClient(client);

        Throwable cause;
        try {
            final ClientUpgradeRequest request = new ClientUpgradeRequest();
            configurator.configureClientUpgradeRequest(request);
            client.connect(new InternalSocketListener(sink, failureCauses, terminated), uri, request)
                .whenComplete((session, connectFailure) -> {
                    // NOTE(review): guards against waiting forever if a connect/upgrade failure
                    // is reported only through this future and not to the endpoint. Harmless if
                    // the endpoint is notified as well: offer() never blocks and the latch is
                    // one-shot.
                    if (connectFailure != null) {
                        failureCauses.offer(connectFailure);
                        terminated.countDown();
                    }
                });
            terminated.await();
            cause = failureCauses.take();
        } catch (Throwable t) {
            // Always stop the client, but never let a stop failure hide the primary failure.
            final Exception stopFailure = stopClient(client);
            if (stopFailure != null) {
                t.addSuppressed(stopFailure);
            }
            throw t;
        }

        final Exception stopFailure = stopClient(client);
        if (cause instanceof NormalCloseException) {
            if (stopFailure != null) {
                throwFailure(stopFailure);
            }
            LOGGER.atInfo().log("Websocket session closed normally");
            return;
        }
        if (stopFailure != null) {
            cause.addSuppressed(stopFailure);
        }
        throwFailure(cause);
    }

    /** Starts the client, translating any start failure into this worker's exception contract. */
    private static void startClient(WebSocketClient client) throws IOException, InterruptedException {
        try {
            client.start();
        } catch (Exception e) {
            LOGGER.atError().setCause(e).log("Failed to start WebSocket client");
            throwFailure(e);
        }
    }

    /** Stops the client; returns the failure, already logged, or {@code null} on success. */
    private static Exception stopClient(WebSocketClient client) {
        try {
            client.stop();
            return null;
        } catch (Exception e) {
            LOGGER.atError().setCause(e).log("Failed to stop WebSocket client");
            return e;
        }
    }

    /**
     * Always throws: maps a terminal cause onto the exception types {@link #produce(Sink)}
     * declares. Errors, {@link IOException}s and unchecked exceptions are rethrown as-is, an
     * interruption becomes a fresh {@link InterruptedException}, and any other checked exception
     * is wrapped in an {@link IOException}.
     */
    private static void throwFailure(Throwable cause) throws IOException, InterruptedException {
        if (cause instanceof InterruptedException) {
            final InterruptedException interrupted = new InterruptedException("WebSocket session interrupted");
            interrupted.initCause(cause);
            throw interrupted;
        }
        LOGGER.atError().setCause(cause).log("Websocket session failed. Failing task...");
        if (cause instanceof Error) {
            throw (Error) cause;
        }
        if (cause instanceof IOException) {
            throw (IOException) cause;
        }
        if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;
        }
        if (cause instanceof Exception) {
            throw new IOException("WebSocket session failed", cause);
        }
        throw new AssertionError("Unexpected error", cause);
    }

    /**
     * Returns a snapshot of the received-message counter without resetting it.
     *
     * <p>NOTE(review): the {@code m2} prefix looks like a decompiler rename of
     * {@code checkMetrics}; confirm against the {@link FirehoseProducerWorker} interface.
     *
     * @return metrics holding the current count
     */
    public FirehoseMetrics m2checkMetrics() {
        return new FirehoseMetrics(this.receivedMetric.get());
    }

    /**
     * Returns the received-message counter and resets it to zero in one atomic step, so that
     * messages counted concurrently are never lost between the read and the reset.
     *
     * <p>NOTE(review): the {@code m1} prefix looks like a decompiler rename of
     * {@code flushMetrics}; confirm against the {@link FirehoseProducerWorker} interface.
     *
     * @return metrics holding the count accumulated since the previous flush
     */
    public FirehoseMetrics m1flushMetrics() {
        return new FirehoseMetrics(this.receivedMetric.getAndSet(0L));
    }
}
