package io.cdsoft.sf.messaging.internal.client.cometd;

import io.cdsoft.sf.messaging.MessagingException;
import io.cdsoft.sf.messaging.api.config.ConnectionConfig;
import io.cdsoft.sf.messaging.api.consumer.EventConsumer;
import io.cdsoft.sf.messaging.api.consumer.JacksonPlatformEventConsumer;
import io.cdsoft.sf.messaging.api.consumer.JacksonPushTopicEventConsumer;
import io.cdsoft.sf.messaging.api.consumer.JsonEventConsumer;
import io.cdsoft.sf.messaging.api.consumer.MapEventConsumer;
import io.cdsoft.sf.messaging.api.subscription.Subscription;
import io.cdsoft.sf.messaging.internal.client.ManagedClient;
import io.cdsoft.sf.messaging.internal.client.auth.ManagedAuthClient;
import io.cdsoft.sf.messaging.internal.client.http.ManagedHttpClient;
import io.cdsoft.sf.messaging.internal.client.retry.RetryStrategy;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/cdsoft/sf/messaging/internal/client/cometd/ManagedCometdClient.class */
/**
 * {@link ManagedClient} that owns a CometD {@link BayeuxClient} configured with a single
 * long-polling transport and the shared {@link ReplayExtension}.
 *
 * <p>Listeners on the {@code /meta/handshake}, {@code /meta/connect} and {@code /meta/disconnect}
 * channels drive recovery: a failed handshake or any disconnect message triggers
 * {@link #restartClient()}, and every successful connect message triggers {@link #resubscribe()}.
 *
 * <p>Listener callbacks are invoked by the CometD client, so the client reference is kept in a
 * {@code volatile} field and always read into a local before use.
 */
public class ManagedCometdClient implements ManagedClient {
    private static final Logger LOG = LoggerFactory.getLogger(ManagedCometdClient.class);
    private static final String META_HANDSHAKE = "/meta/handshake";
    private static final String META_CONNECT = "/meta/connect";
    private static final String META_DISCONNECT = "/meta/disconnect";
    private static final long CONNECT_TIMEOUT = 30L;
    private static final TimeUnit TIMEOUT_TIME_UNIT = TimeUnit.SECONDS;
    private static final int MAX_NETWORK_DELAY = 15000;
    private static final int MAX_BUFFER_SIZE = 1048576;
    private static final ReplayExtension REPLAY_EXTENSION = new ReplayExtension();

    private final ConnectionConfig config;
    private final ManagedAuthClient authClient;
    private final ManagedHttpClient httpClient;

    /** Current client; {@code null} before {@link #doStart()} and after {@link #doStop()}. */
    private volatile BayeuxClient bayeuxClient;

    /** Confirmed subscriptions keyed by channel name; populated from the subscribe callback. */
    private final Map<String, Subscription> subscriptions = new ConcurrentHashMap<>();

    /** Restarts the whole client when a handshake reply is unsuccessful. */
    private final ClientSessionChannel.MessageListener handshakeListener = (channel, message) -> {
        LOG.debug("Meta-message [{}]: {}", channel.getChannelId(), message);
        if (!message.isSuccessful()) {
            restartClient();
        }
    };

    /** Re-checks subscriptions after every successful connect reply. */
    private final ClientSessionChannel.MessageListener connectListener = (channel, message) -> {
        LOG.debug("Meta-message [{}]: {}", channel.getChannelId(), message);
        if (message.isSuccessful()) {
            resubscribe();
        }
    };

    /** Restarts the client on any disconnect message that reaches this listener. */
    private final ClientSessionChannel.MessageListener disconnectListener = (channel, message) -> {
        LOG.debug("Meta-message [{}]: {}", channel.getChannelId(), message);
        restartClient();
    };

    /**
     * Creates a client that is not yet connected; call {@link #doStart()} to connect.
     *
     * @param connectionConfig configuration handed to the {@link RetryStrategy}
     * @param managedAuthClient source of the CometD endpoint and bearer token
     * @param managedHttpClient provider of the HTTP client used by the transport
     */
    public ManagedCometdClient(ConnectionConfig connectionConfig, ManagedAuthClient managedAuthClient, ManagedHttpClient managedHttpClient) {
        this.config = connectionConfig;
        this.authClient = managedAuthClient;
        this.httpClient = managedHttpClient;
    }

    /**
     * Connects to the CometD endpoint, delegating retries to a {@link RetryStrategy}.
     *
     * @throws MessagingException propagated from the retry strategy
     */
    @Override // io.cdsoft.sf.messaging.internal.client.ManagedClient
    public void doStart() throws MessagingException {
        new RetryStrategy(this.config).exectue(this::connect);
    }

    /**
     * Detaches the meta listeners and disconnects the current client, if there is one.
     *
     * @throws MessagingException declared by the interface; not thrown by this implementation
     */
    @Override // io.cdsoft.sf.messaging.internal.client.ManagedClient
    public void doStop() throws MessagingException {
        disconnect();
    }

    /**
     * Subscribes to the subscription's channel and routes each received message to its consumer.
     * The subscription is recorded only once the subscribe reply reports success.
     *
     * @param subscription channel name, replay position and consumer to register
     * @throws IllegalStateException if the client has not been started or is disconnected
     */
    public void addSubscription(Subscription subscription) {
        BayeuxClient client = requireStartedClient();
        String channelName = subscription.getChannelName();
        LOG.info("Subscribing to channel: {}", channelName);
        REPLAY_EXTENSION.addOrUpdateChannelReplayId(channelName, subscription.getReplayFrom().longValue());
        EventConsumer consumer = subscription.getConsumer();
        client.getChannel(channelName).subscribe((channel, message) -> {
            LOG.trace("Subscription-message [{}]: {}", channel.getChannelId(), message);
            if (consumer instanceof JsonEventConsumer) {
                ((JsonEventConsumer) consumer).accept(message.getJSON());
            } else if (consumer instanceof MapEventConsumer) {
                ((MapEventConsumer) consumer).accept(message.getDataAsMap());
            } else if (consumer instanceof JacksonPlatformEventConsumer) {
                ((JacksonPlatformEventConsumer) consumer).accept(message.getJSON());
            } else if (consumer instanceof JacksonPushTopicEventConsumer) {
                ((JacksonPushTopicEventConsumer) consumer).accept(message.getJSON());
            } else {
                // Previously the message vanished without a trace for unknown consumer types.
                LOG.warn("Unsupported consumer {} on channel {}; message dropped", consumer, channel.getChannelId());
            }
        }, (channel, reply) -> {
            if (reply.isSuccessful()) {
                LOG.info("Successfully Subscribed to channel: {} {}", channelName, channel.getChannelId());
                this.subscriptions.put(channelName, subscription);
            } else {
                LOG.error("Unable to subscribe to subscription {} : {}", subscription, reply);
            }
        });
    }

    /**
     * Unsubscribes from a previously confirmed subscription and forgets it, so that a later
     * {@link #resubscribe()} does not bring it back.
     *
     * @param str channel name the subscription was registered under
     * @throws IllegalStateException if the client has not been started or is disconnected
     */
    public void removeSubscription(String str) {
        BayeuxClient client = requireStartedClient();
        Subscription removed = this.subscriptions.remove(str);
        if (removed == null) {
            LOG.warn("Subscription with name {} does not exist", str);
            return;
        }
        client.getChannel(removed.getChannelName()).unsubscribe();
    }

    /**
     * Returns the current client, failing with the same error whether it was never created,
     * has been stopped, or reports itself as disconnected.
     */
    private BayeuxClient requireStartedClient() {
        BayeuxClient client = this.bayeuxClient;
        if (client == null || client.isDisconnected()) {
            throw new IllegalStateException(String.format("Connector[%s] has not been started", this.authClient.getCometdEndpoint()));
        }
        return client;
    }

    /**
     * Builds a client for the auth client's endpoint whose transport adds an
     * {@code Authorization: OAuth <token>} header to every request. The token is read per
     * request, so a refreshed token is picked up without recreating the transport.
     */
    private BayeuxClient createClient() {
        Map<String, Object> options = new HashMap<>();
        options.put("maxNetworkDelay", MAX_NETWORK_DELAY);
        options.put("maxMessageSize", MAX_BUFFER_SIZE);
        LongPollingTransport transport = new LongPollingTransport(options, this.httpClient.getHttpClient()) {
            @Override
            protected void customize(Request request) {
                super.customize(request);
                request.getHeaders().put(HttpHeader.AUTHORIZATION, "OAuth " + ManagedCometdClient.this.authClient.getBearerToken());
            }
        };
        BayeuxClient client = new BayeuxClient(this.authClient.getCometdEndpoint().toExternalForm(), transport);
        client.addExtension(REPLAY_EXTENSION);
        return client;
    }

    /**
     * Creates a fresh client, attaches the meta listeners, handshakes and waits up to
     * {@link #CONNECT_TIMEOUT} for the connected state.
     *
     * @return {@code true} once connected
     * @throws MessagingException if the connected state is not reached within the timeout
     */
    private Boolean connect() throws MessagingException {
        // NOTE(review): clearing here also drops every recorded subscription when connect() is
        // reached through restartClient(), so nothing is re-subscribed after a restart — confirm
        // whether that is intended.
        this.subscriptions.clear();
        BayeuxClient client = createClient();
        this.bayeuxClient = client;
        client.getChannel(META_HANDSHAKE).addListener(this.handshakeListener);
        client.getChannel(META_CONNECT).addListener(this.connectListener);
        client.getChannel(META_DISCONNECT).addListener(this.disconnectListener);
        client.handshake();
        if (client.waitFor(TIMEOUT_TIME_UNIT.toMillis(CONNECT_TIMEOUT), BayeuxClient.State.CONNECTED)) {
            return true;
        }
        // Tear down the client created by THIS attempt only; a retry builds a new one and the
        // abandoned instance must not keep its listeners attached.
        tearDown(client);
        throw new MessagingException("Timeout connecting to " + this.authClient.getCometdEndpoint());
    }

    /** Tears down the current client, if any, and clears the reference. */
    private void disconnect() {
        BayeuxClient client = this.bayeuxClient;
        if (client == null) {
            return;
        }
        tearDown(client);
        this.bayeuxClient = null;
    }

    /**
     * Detaches the meta listeners from the given client and disconnects it. Listeners are
     * removed first so the disconnect does not reach {@link #disconnectListener}.
     */
    private void tearDown(BayeuxClient client) {
        client.getChannel(META_HANDSHAKE).removeListener(this.handshakeListener);
        client.getChannel(META_CONNECT).removeListener(this.connectListener);
        client.getChannel(META_DISCONNECT).removeListener(this.disconnectListener);
        client.disconnect();
    }

    /**
     * Drops the current client and connects a new one through the retry strategy.
     *
     * @throws RuntimeException wrapping whatever the retry strategy ultimately throws
     */
    private synchronized void restartClient() {
        disconnect();
        try {
            new RetryStrategy(this.config).exectue(this::connect);
        } catch (Exception e) {
            // Pass the exception as the throwable argument; a "{}" placeholder is not filled by it.
            LOG.error("Unable to restart client", e);
            throw new RuntimeException("Unable to restart client.", e);
        }
    }

    /** Re-adds every recorded subscription whose channel currently has no subscribers. */
    private synchronized void resubscribe() {
        LOG.trace("Refreshing subscriptions to channels on reconnect");
        BayeuxClient client = this.bayeuxClient;
        if (client == null) {
            // Stopped between the connect reply and this callback; nothing to refresh.
            return;
        }
        for (Subscription subscription : this.subscriptions.values()) {
            String channelName = subscription.getChannelName();
            if (client.getChannel(channelName).getSubscribers().isEmpty()) {
                LOG.debug("Re-subscribing to channel: [{}] because no subscribers exist.", channelName);
                addSubscription(subscription);
            }
        }
    }
}
