package net.jacobpeterson.alpaca.websocket.marketdata.client;

import com.google.common.base.Preconditions;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonPrimitive;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.websocket.DeploymentException;
import net.jacobpeterson.abstracts.websocket.client.WebsocketClient;
import net.jacobpeterson.abstracts.websocket.exception.WebsocketException;
import net.jacobpeterson.abstracts.websocket.listener.StreamListener;
import net.jacobpeterson.abstracts.websocket.message.StreamMessage;
import net.jacobpeterson.abstracts.websocket.message.StreamMessageType;
import net.jacobpeterson.alpaca.websocket.marketdata.listener.MarketDataStreamListener;
import net.jacobpeterson.alpaca.websocket.marketdata.message.MarketDataStreamMessageType;
import net.jacobpeterson.domain.alpaca.marketdata.streaming.MarketDataStreamMessage;
import net.jacobpeterson.domain.alpaca.marketdata.streaming.abstracts.MarketDataStreamDataMessage;
import net.jacobpeterson.domain.alpaca.marketdata.streaming.abstracts.MarketDataStreamStatusMessage;
import net.jacobpeterson.domain.alpaca.marketdata.streaming.aggregate.AggregateMinuteMessage;
import net.jacobpeterson.domain.alpaca.marketdata.streaming.authorization.AuthorizationMessage;
import net.jacobpeterson.domain.alpaca.marketdata.streaming.listening.ListeningMessage;
import net.jacobpeterson.domain.alpaca.marketdata.streaming.quote.QuoteMessage;
import net.jacobpeterson.domain.alpaca.marketdata.streaming.trade.TradeMessage;
import net.jacobpeterson.util.gson.GsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/jacobpeterson/alpaca/websocket/marketdata/client/MarketDataWebsocketClient.class */
/**
 * {@link WebsocketClient} for the market data stream.
 *
 * <p>The client keeps a list of {@link MarketDataStreamListener}s. It connects when the first listener is added,
 * disconnects when the last one is removed, and after every change sends a {@code listen} request containing
 * the union of all API-subscribable streams the listeners registered for.
 */
public class MarketDataWebsocketClient implements WebsocketClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(MarketDataWebsocketClient.class);

    private static final String STREAM_KEY = "stream";
    private static final String EVENT_TYPE_KEY = "ev";
    private static final String DATA_KEY = "data";
    private static final String ALL_TICKERS = "*";
    private static final String HTTPS_SCHEME = "https";
    private static final String WSS_SCHEME = "wss";
    private static final String STREAM_PATH = "/stream";

    private static final String TRADES_STREAM_PREFIX = streamPrefixOf(MarketDataStreamMessageType.TRADES);
    private static final String QUOTES_STREAM_PREFIX = streamPrefixOf(MarketDataStreamMessageType.QUOTES);
    private static final String AGGREGATE_MINUTE_STREAM_PREFIX =
            streamPrefixOf(MarketDataStreamMessageType.AGGREGATE_MINUTE);

    private final String keyId;
    private final String secret;
    private final String oAuthToken;
    private final String streamAPIURL;
    private final List<MarketDataStreamListener> listeners;
    private MarketDataWebsocketClientEndpoint marketDataWebsocketClientEndpoint;
    private boolean authenticated;

    /**
     * Creates a client that authenticates with a key ID and secret.
     *
     * @param keyId      the key ID sent as {@code key_id} in the authentication message
     * @param secret     the secret sent as {@code secret_key} in the authentication message
     * @param baseAPIURL the base API URL; a leading {@code https} is replaced by {@code wss} and
     *                   {@code /stream} is appended
     * @throws NullPointerException if {@code baseAPIURL} is {@code null}
     */
    public MarketDataWebsocketClient(String keyId, String secret, String baseAPIURL) {
        this.keyId = keyId;
        this.secret = secret;
        this.oAuthToken = null;
        this.streamAPIURL = toStreamURL(baseAPIURL);
        this.listeners = new ArrayList<>();
    }

    /**
     * Creates a client that authenticates with an OAuth token.
     *
     * @param oAuthToken the token sent as {@code oauth_token} in the authentication message
     * @param baseAPIURL the base API URL; a leading {@code https} is replaced by {@code wss} and
     *                   {@code /stream} is appended
     * @throws NullPointerException if {@code baseAPIURL} is {@code null}
     */
    public MarketDataWebsocketClient(String oAuthToken, String baseAPIURL) {
        this.keyId = null;
        this.secret = null;
        this.oAuthToken = oAuthToken;
        this.streamAPIURL = toStreamURL(baseAPIURL);
        this.listeners = new ArrayList<>();
    }

    /**
     * Registers a listener, connecting first if no listener was registered before, and then
     * re-submits the stream subscription request.
     *
     * @param streamListener the listener; must be a {@link MarketDataStreamListener}
     * @throws WebsocketException    if connecting fails
     * @throws IllegalStateException if the listener is not a {@link MarketDataStreamListener}
     */
    @Override
    public void addListener(StreamListener<?, ?> streamListener) throws WebsocketException {
        Preconditions.checkState(streamListener instanceof MarketDataStreamListener);

        if (listeners.isEmpty()) {
            try {
                connect();
            } catch (IOException | URISyntaxException | DeploymentException e) {
                throw new WebsocketException(e);
            }
        }

        listeners.add((MarketDataStreamListener) streamListener);
        submitStreamRequest();
    }

    /**
     * Unregisters a listener. If other listeners remain the subscription request is re-submitted,
     * otherwise the client disconnects. Removing a listener that was never registered does nothing.
     *
     * @param streamListener the listener; must be a {@link MarketDataStreamListener}
     * @throws WebsocketException    if disconnecting fails
     * @throws IllegalStateException if the listener is not a {@link MarketDataStreamListener}
     */
    @Override
    public void removeListener(StreamListener<?, ?> streamListener) throws WebsocketException {
        Preconditions.checkState(streamListener instanceof MarketDataStreamListener);

        // Nothing changed, so there is neither a subscription to update nor a connection to close.
        if (!listeners.remove(streamListener)) {
            return;
        }

        if (!listeners.isEmpty()) {
            submitStreamRequest();
            return;
        }

        try {
            disconnect();
        } catch (Exception e) {
            throw new WebsocketException(e);
        }
    }

    /**
     * Creates a new endpoint for the stream URL, enables automatic reconnection on it and connects.
     *
     * @throws URISyntaxException  if the stream URL is not a valid URI
     * @throws IOException         propagated from the endpoint's {@code connect()}
     * @throws DeploymentException propagated from the endpoint's {@code connect()}
     */
    @Override
    public void connect() throws URISyntaxException, IOException, DeploymentException {
        LOGGER.info("Connecting...");
        // A fresh endpoint has not been authorized yet, whatever an earlier one reported.
        authenticated = false;
        marketDataWebsocketClientEndpoint = new MarketDataWebsocketClientEndpoint(this, new URI(streamAPIURL));
        marketDataWebsocketClientEndpoint.setAutomaticallyReconnect(true);
        marketDataWebsocketClientEndpoint.connect();
        LOGGER.info("Connected.");
    }

    /**
     * Disconnects the current endpoint and clears the authenticated flag. Does nothing if
     * {@link #connect()} was never called.
     *
     * @throws Exception propagated from the endpoint's {@code disconnect()}
     */
    @Override
    public void disconnect() throws Exception {
        if (marketDataWebsocketClientEndpoint == null) {
            return;
        }

        LOGGER.info("Disconnecting...");
        marketDataWebsocketClientEndpoint.disconnect();
        authenticated = false;
        LOGGER.info("Disconnected.");
    }

    /**
     * Sends the {@code authenticate} action, carrying either the OAuth token (when one was supplied)
     * or the key ID / secret pair.
     */
    @Override
    public void sendAuthenticationMessage() {
        JsonObject credentials = new JsonObject();
        if (oAuthToken != null) {
            credentials.addProperty("oauth_token", oAuthToken);
        } else {
            credentials.addProperty("key_id", keyId);
            credentials.addProperty("secret_key", secret);
        }

        JsonObject request = new JsonObject();
        request.addProperty("action", "authenticate");
        request.add(DATA_KEY, credentials);

        marketDataWebsocketClientEndpoint.sendMessage(request.toString());
    }

    /** Re-submits the stream subscription request for the currently registered listeners. */
    @Override
    public void handleResubscribing() {
        submitStreamRequest();
    }

    /**
     * Parses a raw websocket message and forwards it to the listeners as a typed stream message.
     *
     * <p>Messages without a {@code stream} field are ignored. Messages whose type cannot be determined are
     * logged as errors and dropped.
     *
     * @param message the raw JSON text; must be a JSON object
     * @throws IllegalStateException if the message is not a JSON object
     */
    @Override
    public void handleWebsocketMessage(String message) {
        JsonElement parsedMessage = GsonUtil.JSON_PARSER.parse(message);
        Preconditions.checkState(parsedMessage instanceof JsonObject);
        JsonObject messageObject = parsedMessage.getAsJsonObject();

        if (!messageObject.has(STREAM_KEY)) {
            return;
        }

        JsonElement streamElement = messageObject.get(STREAM_KEY);
        if (!(streamElement instanceof JsonPrimitive)) {
            LOGGER.error("Unknown stream message: {}", messageObject);
            return;
        }

        try {
            MarketDataStreamMessageType messageType = resolveMessageType(messageObject, streamElement);
            if (messageType == null) {
                // Switching on null would throw a NullPointerException that nothing here catches.
                LOGGER.error("Unknown stream message: {}", messageObject);
                return;
            }

            switch (messageType) {
                case AUTHORIZATION:
                    AuthorizationMessage authorizationMessage =
                            GsonUtil.GSON.fromJson(messageObject, AuthorizationMessage.class);
                    // Update the flag first so listeners calling isAuthenticated() see the new state.
                    authenticated = isAuthorizationMessageSuccess(authorizationMessage);
                    deliver(messageType, authorizationMessage);
                    break;
                case LISTENING:
                    deliver(messageType, GsonUtil.GSON.fromJson(messageObject, ListeningMessage.class));
                    break;
                case TRADES:
                    deliver(messageType,
                            GsonUtil.GSON.fromJson(messageObject.get(DATA_KEY), TradeMessage.class));
                    break;
                case QUOTES:
                    deliver(messageType,
                            GsonUtil.GSON.fromJson(messageObject.get(DATA_KEY), QuoteMessage.class));
                    break;
                case AGGREGATE_MINUTE:
                    deliver(messageType,
                            GsonUtil.GSON.fromJson(messageObject.get(DATA_KEY), AggregateMinuteMessage.class));
                    break;
                default:
                    LOGGER.error("Unhandled stream type: {}", messageType);
                    break;
            }
        } catch (JsonParseException e) {
            LOGGER.error("Could not parse message: {}", messageObject, e);
        }
    }

    /**
     * Forwards a message to every listener that is interested in it.
     *
     * <p>A listener with no (or empty) data streams receives everything. Otherwise status messages go to
     * listeners that registered {@code LISTENING} or {@code AUTHORIZATION} for any ticker, and data messages go
     * to listeners whose set for the message's ticker contains the type; the {@code "*"} set is consulted only
     * when the listener has no set for that specific ticker.
     *
     * @param streamMessageType the message type; must be a {@link MarketDataStreamMessageType}
     * @param streamMessage     the message; must be a {@link MarketDataStreamMessage}
     * @throws IllegalStateException if either argument has the wrong type
     */
    @Override
    public void sendStreamMessageToListeners(StreamMessageType streamMessageType, StreamMessage streamMessage) {
        Preconditions.checkState(streamMessageType instanceof MarketDataStreamMessageType);
        Preconditions.checkState(streamMessage instanceof MarketDataStreamMessage);

        MarketDataStreamMessageType messageType = (MarketDataStreamMessageType) streamMessageType;
        MarketDataStreamMessage message = (MarketDataStreamMessage) streamMessage;

        // Iterate over a snapshot so listeners may add or remove listeners from within their callback.
        for (MarketDataStreamListener listener : new ArrayList<>(listeners)) {
            if (shouldReceive(listener, messageType, message)) {
                listener.onStreamUpdate(messageType, message);
            }
        }
    }

    /**
     * @return {@code true} if an endpoint exists and its user session is open
     */
    @Override
    public boolean isConnected() {
        return marketDataWebsocketClientEndpoint != null
                && marketDataWebsocketClientEndpoint.getUserSession() != null
                && marketDataWebsocketClientEndpoint.getUserSession().isOpen();
    }

    /**
     * @return {@code true} if the most recent authorization message reported success and the client has not
     * been disconnected or reconnected through this class since
     */
    @Override
    public boolean isAuthenticated() {
        return authenticated;
    }

    /**
     * Collects, per ticker, the union of API-subscribable message types across all listeners.
     *
     * @return a new map from ticker to the message types to subscribe to
     */
    public HashMap<String, Set<MarketDataStreamMessageType>> getRegisteredMessageTypes() {
        HashMap<String, Set<MarketDataStreamMessageType>> registeredTypes = new HashMap<>();

        for (MarketDataStreamListener listener : new ArrayList<>(listeners)) {
            if (hasNoStreamFilter(listener)) {
                continue;
            }

            listener.getDataStreams().forEach((ticker, types) -> {
                Set<MarketDataStreamMessageType> subscribableTypes = types.stream()
                        .filter(MarketDataStreamMessageType::isAPISubscribable)
                        .collect(Collectors.toCollection(HashSet::new));
                // The ticker gets an entry even when none of its types is subscribable.
                registeredTypes.computeIfAbsent(ticker, key -> new HashSet<>()).addAll(subscribableTypes);
            });
        }

        return registeredTypes;
    }

    /** Builds the stream name prefix for a type: its Gson-serialized name without quotes, followed by a dot. */
    private static String streamPrefixOf(MarketDataStreamMessageType messageType) {
        return GsonUtil.GSON.toJson(messageType).replace("\"", "") + ".";
    }

    /** Converts the base API URL to the stream URL, swapping only a leading {@code https} for {@code wss}. */
    private static String toStreamURL(String baseAPIURL) {
        Preconditions.checkNotNull(baseAPIURL, "baseAPIURL");
        String websocketURL = baseAPIURL.startsWith(HTTPS_SCHEME)
                ? WSS_SCHEME + baseAPIURL.substring(HTTPS_SCHEME.length())
                : baseAPIURL;
        return websocketURL + STREAM_PATH;
    }

    /**
     * Determines the message type: from the {@code ev} field of {@code data} for trade/quote/minute-aggregate
     * streams, from the stream name itself otherwise.
     *
     * @return the type, or {@code null} if it is unknown or the expected fields are missing
     */
    private MarketDataStreamMessageType resolveMessageType(JsonObject messageObject, JsonElement streamElement) {
        String streamName = streamElement.getAsString();
        boolean dataStream = streamName.startsWith(TRADES_STREAM_PREFIX)
                || streamName.startsWith(QUOTES_STREAM_PREFIX)
                || streamName.startsWith(AGGREGATE_MINUTE_STREAM_PREFIX);

        if (!dataStream) {
            return GsonUtil.GSON.fromJson(streamElement, MarketDataStreamMessageType.class);
        }

        JsonElement data = messageObject.get(DATA_KEY);
        if (data == null || !data.isJsonObject()) {
            return null;
        }
        return GsonUtil.GSON.fromJson(data.getAsJsonObject().get(EVENT_TYPE_KEY),
                MarketDataStreamMessageType.class);
    }

    /** Sends the message to the listeners and logs it at debug level. */
    private void deliver(MarketDataStreamMessageType messageType, StreamMessage message) {
        sendStreamMessageToListeners(messageType, message);
        LOGGER.debug("{}", message);
    }

    /** Returns whether the listener registered no data streams at all, in which case it receives everything. */
    private static boolean hasNoStreamFilter(MarketDataStreamListener listener) {
        return listener.getDataStreams() == null || listener.getDataStreams().isEmpty();
    }

    /** Decides whether the given listener is interested in the given message. */
    private static boolean shouldReceive(MarketDataStreamListener listener,
            MarketDataStreamMessageType messageType, MarketDataStreamMessage message) {
        boolean statusMessage = message instanceof MarketDataStreamStatusMessage;
        boolean dataMessage = message instanceof MarketDataStreamDataMessage;
        if (!statusMessage && !dataMessage) {
            return false;
        }
        if (hasNoStreamFilter(listener)) {
            return true;
        }

        if (statusMessage) {
            return listener.getDataStreams().values().stream()
                    .anyMatch(types -> types.contains(MarketDataStreamMessageType.LISTENING)
                            || types.contains(MarketDataStreamMessageType.AUTHORIZATION));
        }

        Set<MarketDataStreamMessageType> tickerTypes =
                listener.getDataStreams().get(((MarketDataStreamDataMessage) message).getTicker());
        if (tickerTypes != null) {
            // A ticker-specific registration takes precedence over the wildcard one.
            return tickerTypes.contains(messageType);
        }

        Set<MarketDataStreamMessageType> allTickerTypes = listener.getDataStreams().get(ALL_TICKERS);
        return allTickerTypes != null && allTickerTypes.contains(messageType);
    }

    /**
     * Returns whether the message reports status {@code authorized} for action {@code authenticate}
     * (case-insensitive). A message without data, status or action counts as a failure.
     */
    private boolean isAuthorizationMessageSuccess(AuthorizationMessage authorizationMessage) {
        return authorizationMessage != null
                && authorizationMessage.getData() != null
                && "authorized".equalsIgnoreCase(authorizationMessage.getData().getStatus())
                && "authenticate".equalsIgnoreCase(authorizationMessage.getData().getAction());
    }

    /** Sends a {@code listen} action containing one {@code <apiName>.<ticker>} entry per registered stream. */
    private void submitStreamRequest() {
        JsonArray streams = new JsonArray();
        getRegisteredMessageTypes().forEach((ticker, types) ->
                types.forEach(type -> streams.add(type.getAPIName() + "." + ticker)));

        JsonObject requestData = new JsonObject();
        requestData.add("streams", streams);

        JsonObject request = new JsonObject();
        request.addProperty("action", "listen");
        request.add(DATA_KEY, requestData);

        marketDataWebsocketClientEndpoint.sendMessage(request.toString());
        LOGGER.info("Requested subscriptions to update to {}", streams);
    }
}
