package ai.rev.streaming;

import ai.rev.streaming.WebsocketManager;
import ai.rev.streaming.models.ClientConfig;
import ai.rev.streaming.models.StreamingResponse;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.RemoteEndpoint;
import javax.websocket.Session;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.text.StringsKt;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.TextMessage;

/* compiled from: WebsocketClientEndpoint.kt */
@ClientEndpoint
@Metadata(mv = {1, 1, 16}, bv = {1, 0, 3}, k = 1, d1 = {"��j\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0012\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000b\n\u0002\b\u0007\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u000e\n\u0002\b\u0006\b\u0007\u0018�� -2\u00020\u0001:\u0001-B\r\u0012\u0006\u0010\u0002\u001a\u00020\u0003¢\u0006\u0002\u0010\u0004J\u001c\u0010\u001c\u001a\u00020\u001d2\u0012\u0010\u001e\u001a\u000e\u0012\u0004\u0012\u00020\u000b\u0012\u0004\u0012\u00020\f0\nH\u0016J\b\u0010\u001f\u001a\u00020\fH\u0016J\b\u0010 \u001a\u00020\fH\u0016J\b\u0010!\u001a\u00020\fH\u0002J\u001c\u0010\"\u001a\u00020\f2\b\u0010#\u001a\u0004\u0018\u00010\u00122\b\u0010$\u001a\u0004\u0018\u00010%H\u0007J\u001c\u0010&\u001a\u00020\f2\b\u0010'\u001a\u0004\u0018\u00010(2\b\u0010#\u001a\u0004\u0018\u00010\u0012H\u0007J\u0012\u0010)\u001a\u00020\f2\b\u0010#\u001a\u0004\u0018\u00010\u0012H\u0007J\u0010\u0010*\u001a\u00020\f2\u0006\u0010+\u001a\u00020\u0007H\u0016J\b\u0010,\u001a\u00020\fH\u0002R\u0014\u0010\u0005\u001a\b\u0012\u0004\u0012\u00020\u00070\u0006X\u0082\u0004¢\u0006\u0002\n��R6\u0010\b\u001a*\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u000b\u0012\u0004\u0012\u00020\f0\n0\tj\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u000b\u0012\u0004\u0012\u00020\f0\n`\rX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u0016\u0010\u000e\u001a\n \u0010*\u0004\u0018\u00010\u000f0\u000fX\u0082\u0004¢\u0006\u0002\n��R\u001c\u0010\u0011\u001a\u0004\u0018\u00010\u0012X\u0086\u000e¢\u0006\u000e\n��\u001a\u0004\b\u0013\u0010\u0014\"\u0004\b\u0015\u0010\u0016R\u0014\u0010\u0017\u001a\b\u0012\u0004\u0012\u00020\u00190\u0018X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u001a\u001a\u00020\u001bX\u0082\u0004¢\u0006\u0002\n��¨\u0006."}, d2 = {"Lai/rev/streaming/WebsocketClientEndpoint;", "Lai/rev/streaming/WebsocketManager;", "config", "Lai/rev/streaming/models/ClientConfig;", "(Lai/rev/streaming/models/ClientConfig;)V", "audioQueue", "Ljava/util/Queue;", "", "callbacks", "Ljava/util/ArrayList;", "Lkotlin/Function1;", "Lai/rev/streaming/models/StreamingResponse;", "", "Lkotlin/collections/ArrayList;", "executor", "Ljava/util/concurrent/ExecutorService;", "kotlin.jvm.PlatformType", "session", "Ljavax/websocket/Session;", "getSession", "()Ljavax/websocket/Session;", "setSession", "(Ljavax/websocket/Session;)V", "state", "Ljava/util/concurrent/atomic/AtomicReference;", "Lai/rev/streaming/WebsocketManager$State;", "task", "Ljava/lang/Runnable;", "addCallback", "", "callback", "clearCallbacks", "close", "connect", "onClose", "userSession", "reason", "Ljavax/websocket/CloseReason;", "onMessage", "message", "", "onOpen", "sendAudio", "audio", "startExecutor", "Companion", "rev-ai-api"})
/* loaded from: input_file:ai/rev/streaming/WebsocketClientEndpoint.class */
public final class WebsocketClientEndpoint implements WebsocketManager {
    private AtomicReference<WebsocketManager.State> state;
    private final Queue<byte[]> audioQueue;
    private final ArrayList<Function1<StreamingResponse, Unit>> callbacks;
    private final ExecutorService executor;

    @Nullable
    private Session session;
    private final Runnable task;
    private final ClientConfig config;
    private static final Logger logger;
    private static final long TIMEOUT = 2;
    public static final Companion Companion = new Companion(null);

    /* compiled from: WebsocketClientEndpoint.kt */
    @Metadata(mv = {1, 1, 16}, bv = {1, 0, 3}, k = 1, d1 = {"��\u0018\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0010\t\n��\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082T¢\u0006\u0002\n��R\u000e\u0010\u0005\u001a\u00020\u0006X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0007"}, d2 = {"Lai/rev/streaming/WebsocketClientEndpoint$Companion;", "", "()V", "TIMEOUT", "", "logger", "Lorg/slf4j/Logger;", "rev-ai-api"})
    /* loaded from: input_file:ai/rev/streaming/WebsocketClientEndpoint$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    @Nullable
    public final Session getSession() {
        return this.session;
    }

    public final void setSession(@Nullable Session session) {
        this.session = session;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void connect() {
        if (this.state.get() != WebsocketManager.State.CLOSING && this.state.get() != WebsocketManager.State.CLOSED) {
            this.state.set(WebsocketManager.State.CONNECTING);
        }
        try {
            this.session = ContainerProvider.getWebSocketContainer().connectToServer(this, NetworkUtils.INSTANCE.createURI(this.config));
            startExecutor();
        } catch (Exception e) {
            logger.error("Could not connect to Rev.ai websocket.", e);
        }
    }

    @OnOpen
    public final void onOpen(@Nullable Session session) {
        System.out.println((Object) "opening websocket");
        this.session = session;
        if (this.state.get() == WebsocketManager.State.CLOSING || this.state.get() == WebsocketManager.State.CLOSED) {
            return;
        }
        this.state.set(WebsocketManager.State.CONNECTED);
    }

    @OnClose
    public final void onClose(@Nullable Session session, @Nullable CloseReason closeReason) {
        System.out.println((Object) ("closing websocket: " + (closeReason != null ? closeReason.getReasonPhrase() : null)));
        this.session = (Session) null;
    }

    @OnMessage
    public final void onMessage(@Nullable String str, @Nullable Session session) {
        logger.info("Text message received\n" + str);
        this.session = session;
        String str2 = str;
        if (str2 == null || StringsKt.isBlank(str2)) {
            return;
        }
        StreamingResponse convertToStreamingResponse = AppUtils.INSTANCE.convertToStreamingResponse(new TextMessage(str));
        if (this.state.get() == WebsocketManager.State.READY) {
            Iterator<T> it = this.callbacks.iterator();
            while (it.hasNext()) {
                ((Function1) it.next()).invoke(convertToStreamingResponse);
            }
        } else {
            if (!Intrinsics.areEqual(convertToStreamingResponse.getType(), "connected") || this.state.get() == WebsocketManager.State.CLOSING || this.state.get() == WebsocketManager.State.CLOSED) {
                return;
            }
            this.state.set(WebsocketManager.State.READY);
        }
    }

    @Override // ai.rev.streaming.WebsocketManager
    public boolean addCallback(@NotNull Function1<? super StreamingResponse, Unit> function1) {
        Intrinsics.checkParameterIsNotNull(function1, "callback");
        return this.callbacks.add(function1);
    }

    @Override // ai.rev.streaming.WebsocketManager
    public void clearCallbacks() {
        this.callbacks.clear();
    }

    @Override // ai.rev.streaming.WebsocketManager
    public void sendAudio(@NotNull byte[] bArr) {
        Intrinsics.checkParameterIsNotNull(bArr, "audio");
        if (this.state.get() == WebsocketManager.State.CLOSING || this.state.get() == WebsocketManager.State.CLOSED) {
            logger.warn("RevAi client is closing down, cannot stream any more audio-data.");
            return;
        }
        logger.debug("Adding given audio bytes to the queue...");
        this.audioQueue.offer(bArr);
        startExecutor();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.state.set(WebsocketManager.State.CLOSING);
        new Thread(new Runnable() { // from class: ai.rev.streaming.WebsocketClientEndpoint$close$1
            /* JADX WARN: Incorrect condition in loop: B:2:0x0010 */
            @Override // java.lang.Runnable
            /*
                Code decompiled incorrectly, please refer to instructions dump.
                To view partially-correct add '--show-bad-code' argument
            */
            public final void run() {
                /*
                    r5 = this;
                L0:
                    r0 = r5
                    ai.rev.streaming.WebsocketClientEndpoint r0 = ai.rev.streaming.WebsocketClientEndpoint.this
                    java.util.concurrent.atomic.AtomicReference r0 = ai.rev.streaming.WebsocketClientEndpoint.access$getState$p(r0)
                    java.lang.Object r0 = r0.get()
                    ai.rev.streaming.WebsocketManager$State r0 = (ai.rev.streaming.WebsocketManager.State) r0
                    ai.rev.streaming.WebsocketManager$State r1 = ai.rev.streaming.WebsocketManager.State.CLOSED
                    if (r0 == r1) goto L1c
                    r0 = 2000(0x7d0, double:9.88E-321)
                    java.lang.Thread.sleep(r0)
                    goto L0
                L1c:
                    r0 = r5
                    ai.rev.streaming.WebsocketClientEndpoint r0 = ai.rev.streaming.WebsocketClientEndpoint.this
                    java.util.concurrent.ExecutorService r0 = ai.rev.streaming.WebsocketClientEndpoint.access$getExecutor$p(r0)
                    r0.shutdown()
                    r0 = r5
                    ai.rev.streaming.WebsocketClientEndpoint r0 = ai.rev.streaming.WebsocketClientEndpoint.this     // Catch: java.lang.InterruptedException -> L4e
                    java.util.concurrent.ExecutorService r0 = ai.rev.streaming.WebsocketClientEndpoint.access$getExecutor$p(r0)     // Catch: java.lang.InterruptedException -> L4e
                    r1 = 2
                    java.util.concurrent.TimeUnit r2 = java.util.concurrent.TimeUnit.MINUTES     // Catch: java.lang.InterruptedException -> L4e
                    boolean r0 = r0.awaitTermination(r1, r2)     // Catch: java.lang.InterruptedException -> L4e
                    if (r0 != 0) goto L5c
                    r0 = r5
                    ai.rev.streaming.WebsocketClientEndpoint r0 = ai.rev.streaming.WebsocketClientEndpoint.this     // Catch: java.lang.InterruptedException -> L4e
                    java.util.concurrent.ExecutorService r0 = ai.rev.streaming.WebsocketClientEndpoint.access$getExecutor$p(r0)     // Catch: java.lang.InterruptedException -> L4e
                    java.util.List r0 = r0.shutdownNow()     // Catch: java.lang.InterruptedException -> L4e
                    goto L5c
                L4e:
                    r6 = move-exception
                    r0 = r5
                    ai.rev.streaming.WebsocketClientEndpoint r0 = ai.rev.streaming.WebsocketClientEndpoint.this
                    java.util.concurrent.ExecutorService r0 = ai.rev.streaming.WebsocketClientEndpoint.access$getExecutor$p(r0)
                    java.util.List r0 = r0.shutdownNow()
                L5c:
                    return
                */
                throw new UnsupportedOperationException("Method not decompiled: ai.rev.streaming.WebsocketClientEndpoint$close$1.run():void");
            }
        }).start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void startExecutor() {
        this.executor.execute(this.task);
    }

    public WebsocketClientEndpoint(@NotNull ClientConfig clientConfig) {
        Intrinsics.checkParameterIsNotNull(clientConfig, "config");
        this.config = clientConfig;
        this.state = new AtomicReference<>(WebsocketManager.State.IDLE);
        this.audioQueue = new ConcurrentLinkedQueue();
        this.callbacks = CollectionsKt.arrayListOf(new Function1[]{this.config.getCallback()});
        this.executor = Executors.newSingleThreadExecutor();
        this.task = new Runnable() { // from class: ai.rev.streaming.WebsocketClientEndpoint$task$1
            @Override // java.lang.Runnable
            public final void run() {
                Logger logger2;
                AtomicReference atomicReference;
                AtomicReference atomicReference2;
                Queue queue;
                Logger logger3;
                AtomicReference atomicReference3;
                Queue queue2;
                Queue queue3;
                Logger logger4;
                AtomicReference atomicReference4;
                AtomicReference atomicReference5;
                AtomicReference atomicReference6;
                Logger logger5;
                Queue queue4;
                Thread currentThread;
                Logger logger6;
                Logger logger7;
                Logger logger8;
                logger2 = WebsocketClientEndpoint.logger;
                StringBuilder append = new StringBuilder().append("Current state: ");
                atomicReference = WebsocketClientEndpoint.this.state;
                logger2.debug(append.append(atomicReference).toString());
                atomicReference2 = WebsocketClientEndpoint.this.state;
                if (atomicReference2.get() == null) {
                    Intrinsics.throwNpe();
                }
                switch ((WebsocketManager.State) r0) {
                    case IDLE:
                    case DISCONNECTED:
                        logger8 = WebsocketClientEndpoint.logger;
                        logger8.debug("Trying to connect/reconnect with rev.ai...");
                        WebsocketClientEndpoint.this.connect();
                        WebsocketClientEndpoint.this.startExecutor();
                        return;
                    case CONNECTING:
                    case CONNECTED:
                        logger7 = WebsocketClientEndpoint.logger;
                        logger7.debug("Will start streaming soon...");
                        Thread.sleep(500L);
                        WebsocketClientEndpoint.this.startExecutor();
                        return;
                    case READY:
                        break;
                    case CLOSING:
                        queue = WebsocketClientEndpoint.this.audioQueue;
                        if (queue.isEmpty()) {
                            logger3 = WebsocketClientEndpoint.logger;
                            logger3.debug("Audio queue is empty. All the data has been streamed to rev.ai.");
                            Session session = WebsocketClientEndpoint.this.getSession();
                            if (session != null && session.isOpen()) {
                                Session session2 = WebsocketClientEndpoint.this.getSession();
                                if (session2 != null) {
                                    RemoteEndpoint.Async asyncRemote = session2.getAsyncRemote();
                                    if (asyncRemote != null) {
                                        asyncRemote.sendText("EOS");
                                    }
                                }
                            }
                            atomicReference3 = WebsocketClientEndpoint.this.state;
                            atomicReference3.set(WebsocketManager.State.CLOSED);
                            WebsocketClientEndpoint.this.startExecutor();
                            return;
                        }
                        return;
                    case CLOSED:
                        return;
                    default:
                        return;
                }
                do {
                    queue2 = WebsocketClientEndpoint.this.audioQueue;
                    if (!(!queue2.isEmpty())) {
                        return;
                    }
                    queue3 = WebsocketClientEndpoint.this.audioQueue;
                    byte[] bArr = (byte[]) queue3.peek();
                    Session session3 = WebsocketClientEndpoint.this.getSession();
                    if (session3 == null || !session3.isOpen()) {
                        logger4 = WebsocketClientEndpoint.logger;
                        logger4.error("Session closed. Retrying...");
                        atomicReference4 = WebsocketClientEndpoint.this.state;
                        if (((WebsocketManager.State) atomicReference4.get()) != WebsocketManager.State.CLOSING) {
                            atomicReference5 = WebsocketClientEndpoint.this.state;
                            if (((WebsocketManager.State) atomicReference5.get()) != WebsocketManager.State.CLOSED) {
                                atomicReference6 = WebsocketClientEndpoint.this.state;
                                atomicReference6.set(WebsocketManager.State.DISCONNECTED);
                            }
                        }
                        WebsocketClientEndpoint.this.startExecutor();
                        return;
                    }
                    logger5 = WebsocketClientEndpoint.logger;
                    logger5.debug("Session is open, streaming audio...");
                    Session session4 = WebsocketClientEndpoint.this.getSession();
                    if (session4 != null) {
                        RemoteEndpoint.Async asyncRemote2 = session4.getAsyncRemote();
                        if (asyncRemote2 != null) {
                            asyncRemote2.sendBinary(ByteBuffer.wrap(bArr));
                        }
                    }
                    queue4 = WebsocketClientEndpoint.this.audioQueue;
                    queue4.poll();
                    currentThread = Thread.currentThread();
                    Intrinsics.checkExpressionValueIsNotNull(currentThread, "Thread.currentThread()");
                } while (!currentThread.isInterrupted());
                logger6 = WebsocketClientEndpoint.logger;
                logger6.debug("Thread streaming data to rev.ai is interrupted.");
            }
        };
    }

    static {
        AppUtils appUtils = AppUtils.INSTANCE;
        Logger logger2 = LoggerFactory.getLogger(WebsocketClientEndpoint.class);
        Intrinsics.checkExpressionValueIsNotNull(logger2, "LoggerFactory.getLogger(T::class.java)");
        logger = logger2;
    }
}
