package io.featureflow.client.core;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.Call;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okio.BufferedSource;
import okio.Okio;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/featureflow/client/core/EventSource.class */
/**
 * Minimal Server-Sent Events client built on OkHttp.
 *
 * <p>{@link #init()} starts a background worker that opens a streaming GET request against the
 * configured URI, feeds every received line to an {@link EventSourceParser}, and reconnects after
 * {@code reconnectTimeMillis} whenever the stream ends or fails. {@link #close()} stops the worker,
 * cancels the in-flight call and releases the thread pool.
 *
 * <p>State machine: {@code UNINITIALISED -> CONNECTING -> OPEN -> CLOSED -> CONNECTING ...};
 * {@code SHUTDOWN} is terminal and is never overwritten once set.
 */
public class EventSource implements ConnectionHandler, Closeable {
    public static final Logger log = LoggerFactory.getLogger(EventSource.class);

    private final URI uri;
    private final Headers headers;
    private final EventSourceHandler eventSourceHandler;
    private final AtomicReference<State> state = new AtomicReference<>(State.UNINITIALISED);
    private final ExecutorService executor = Executors.newCachedThreadPool();
    // All timeouts are 0 (OkHttp: "no timeout") because the stream is expected to stay open.
    private final OkHttpClient client = new OkHttpClient().newBuilder().readTimeout(0, TimeUnit.SECONDS).writeTimeout(0, TimeUnit.SECONDS).connectTimeout(0, TimeUnit.SECONDS).retryOnConnectionFailure(true).build();

    // Written by callers/parser callbacks, read by the worker thread.
    private volatile long reconnectTimeMillis;
    private volatile String lastEventId;
    // The call currently owned by the worker; close() cancels it from another thread.
    private volatile Call call;

    /** Lifecycle states of the connection. */
    public enum State {
        UNINITIALISED,
        CONNECTING,
        OPEN,
        CLOSED,
        SHUTDOWN
    }

    /**
     * Creates an event source; no connection is made until {@link #init()} is called.
     *
     * @param uri                  stream endpoint
     * @param reconnectTimeMillis  delay between reconnect attempts; values {@code <= 0} reconnect immediately
     * @param headers              headers sent with every connection attempt
     * @param eventSourceHandler   receives errors (and is handed to the parser for events)
     */
    public EventSource(URI uri, long reconnectTimeMillis, Headers headers, EventSourceHandler eventSourceHandler) {
        this.uri = uri;
        this.reconnectTimeMillis = reconnectTimeMillis;
        this.headers = headers;
        this.eventSourceHandler = eventSourceHandler;
    }

    /**
     * Starts the background connection loop. Only the first call on a fresh instance has an
     * effect; later calls (or calls after {@link #close()}) are logged and ignored.
     */
    public void init() {
        if (!this.state.compareAndSet(State.UNINITIALISED, State.CONNECTING)) {
            if (this.state.get() == State.SHUTDOWN) {
                log.info("EventSource is closed; not starting.");
            } else {
                log.info("Already starting.");
            }
            return;
        }
        log.debug("state change: {} to {}", State.UNINITIALISED, State.CONNECTING);
        log.info("Starting EventSource client using URI: {}", this.uri);
        try {
            this.executor.execute(new Runnable() {
                @Override
                public void run() {
                    EventSource.this.doConnect();
                }
            });
        } catch (RejectedExecutionException e) {
            // close() shut the executor down between the state check and the submit.
            log.debug("EventSource closed before the connection task could start.", e);
        }
    }

    /**
     * Connection loop: connect, stream until the connection ends, wait, repeat. Returns when the
     * current thread is interrupted or the source has been shut down via {@link #close()}.
     */
    public void doConnect() {
        while (!Thread.currentThread().isInterrupted() && this.state.get() != State.SHUTDOWN) {
            try {
                logTransition(transitionTo(State.CONNECTING), State.CONNECTING);
                connectOnce();
                awaitReconnect();
            } catch (RejectedExecutionException e) {
                return;
            }
        }
    }

    /**
     * Performs a single connection attempt and streams lines to the parser until the stream ends.
     * I/O failures are reported to the handler (unless caused by shutdown); the response and call
     * are always released before returning.
     */
    private void connectOnce() {
        Response response = null;
        try {
            Request.Builder builder = new Request.Builder().headers(this.headers).url(this.uri.toASCIIString()).get();
            String eventId = this.lastEventId;
            if (eventId != null && !eventId.isEmpty()) {
                builder.addHeader("Last-Event-ID", eventId);
            }
            Call newCall = this.client.newCall(builder.build());
            this.call = newCall;
            // close() sets SHUTDOWN before reading `call`; re-checking after publishing the call
            // guarantees that either close() cancels it or we notice the shutdown here.
            if (this.state.get() == State.SHUTDOWN) {
                return;
            }
            response = newCall.execute();
            if (response.isSuccessful()) {
                State previous = transitionTo(State.OPEN);
                if (previous == State.SHUTDOWN) {
                    return;
                }
                if (previous != State.CONNECTING) {
                    log.warn("Unexpected state change: {} to {}", previous, State.OPEN);
                } else {
                    log.debug("state change: {} to {}", previous, State.OPEN);
                }
                log.info("Connected to Feature Control SSE Stream");
                BufferedSource buffer = Okio.buffer(response.body().source());
                EventSourceParser eventSourceParser = new EventSourceParser(this.uri, this.eventSourceHandler, this);
                // readUtf8LineStrict() signals end of stream with EOFException (handled below),
                // so the loop only needs to watch for interruption.
                while (!Thread.currentThread().isInterrupted()) {
                    eventSourceParser.line(buffer.readUtf8LineStrict());
                }
            } else {
                log.debug("Failed Response: {}", response);
                this.eventSourceHandler.onError(new FailedResponseException(response.message(), response.code(), response.request().url().toString()));
            }
        } catch (EOFException e) {
            log.warn("Connection unexpectedly closed due to {}.", e.getMessage());
        } catch (IOException e) {
            if (this.state.get() == State.SHUTDOWN) {
                // Cancelling the call in close() surfaces here; that is not an error for the handler.
                log.debug("Connection terminated by shutdown.", e);
            } else {
                log.debug("Connection problem.", e);
                this.eventSourceHandler.onError(e);
            }
        } finally {
            logTransition(transitionTo(State.CLOSED), State.CLOSED);
            if (response != null && response.body() != null) {
                response.body().close();
            }
            Call current = this.call;
            if (current != null) {
                current.cancel();
            }
        }
    }

    /** Sleeps for the configured reconnect delay, preserving the interrupt flag if interrupted. */
    private void awaitReconnect() {
        long delay = this.reconnectTimeMillis;
        if (delay <= 0 || this.state.get() == State.SHUTDOWN) {
            return;
        }
        log.info("Waiting to reconnect..{}", delay);
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            // Restore the flag so the loop condition in doConnect() observes the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Moves to {@code next} unless the source is already {@code SHUTDOWN}.
     *
     * @return the state observed before the transition; {@code SHUTDOWN} means nothing changed
     */
    private State transitionTo(State next) {
        while (true) {
            State current = this.state.get();
            if (current == State.SHUTDOWN || this.state.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    /** Logs a state change at debug level; suppressed transitions (from SHUTDOWN) are not logged. */
    private void logTransition(State previous, State next) {
        if (previous != State.SHUTDOWN) {
            log.debug("state change: {} to {}", previous, next);
        }
    }

    /** Updates the delay used before the next reconnect attempt. */
    @Override
    public void setReconnectionTimeMillis(long reconnectionTimeMillis) {
        this.reconnectTimeMillis = reconnectionTimeMillis;
    }

    /** Records the id sent as {@code Last-Event-ID} on the next connection attempt. */
    @Override
    public void setLastEventId(String lastEventId) {
        this.lastEventId = lastEventId;
    }

    /**
     * Permanently shuts the event source down: marks the state {@code SHUTDOWN}, cancels the
     * in-flight call so a blocked read returns, and stops the worker executor. Idempotent.
     */
    @Override
    public void close() throws IOException {
        State previous = this.state.getAndSet(State.SHUTDOWN);
        if (previous == State.SHUTDOWN) {
            return;
        }
        log.debug("state change: {} to {}", previous, State.SHUTDOWN);
        Call current = this.call;
        if (current != null) {
            current.cancel();
        }
        this.executor.shutdownNow();
    }
}
