package de._125m125.kt.websocket.reactive;

import de._125m125.kt.ktapi.core.KtNotificationManager;
import de._125m125.kt.ktapi.core.NotificationListener;
import de._125m125.kt.ktapi.core.entities.Entity;
import de._125m125.kt.ktapi.core.entities.HistoryEntry;
import de._125m125.kt.ktapi.core.entities.Item;
import de._125m125.kt.ktapi.core.entities.Message;
import de._125m125.kt.ktapi.core.entities.OrderBookEntry;
import de._125m125.kt.ktapi.core.entities.Payout;
import de._125m125.kt.ktapi.core.entities.Trade;
import de._125m125.kt.ktapi.core.entities.UpdateNotification;
import de._125m125.kt.ktapi.core.users.KtUserStore;
import de._125m125.kt.ktapi.core.users.UserKey;
import de._125m125.kt.ktapi.websocket.events.MessageReceivedEvent;
import de._125m125.kt.ktapi.websocket.events.WebsocketEventListening;
import de._125m125.kt.ktapi.websocket.events.listeners.AbstractKtWebsocketNotificationHandler;
import de._125m125.kt.ktapi.websocket.events.listeners.VerificationMode;
import de._125m125.kt.ktapi.websocket.requests.subscription.SubscriptionRequestData;
import de._125m125.kt.ktapi.websocket.requests.subscription.SubscriptionRequestDataFactory;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/_125m125/kt/websocket/reactive/ReactiveKtWebsocketNotificationHandler.class */
/**
 * Notification handler that republishes incoming {@link UpdateNotification}s through RxJava
 * subjects, keeping one subject per {@link KtNotificationManager.Priority}.
 *
 * <p>Listeners registered through {@link #addListener} as well as the typed
 * {@code get...Observable} accessors are all filtered views on these subjects.
 */
public class ReactiveKtWebsocketNotificationHandler extends AbstractKtWebsocketNotificationHandler<Disposable> {
    private static final Logger logger = LoggerFactory.getLogger(ReactiveKtWebsocketNotificationHandler.class);

    /** Lazily populated subjects, one per priority; wrapped in a synchronized map. */
    protected final Map<KtNotificationManager.Priority, Subject<UpdateNotification<?>>> subjects;

    /**
     * Creates the subject type used for every priority.
     *
     * @return a new serialized {@link PublishSubject}
     */
    public static Subject<UpdateNotification<?>> createSubject() {
        // The explicit type witness is required: without it the subject is inferred as Subject<Object>.
        return PublishSubject.<UpdateNotification<?>>create().toSerialized();
    }

    /**
     * Creates a handler that uses a default {@link SubscriptionRequestDataFactory}.
     *
     * @param ktUserStore      passed through to the superclass
     * @param verificationMode passed through to the superclass
     */
    public ReactiveKtWebsocketNotificationHandler(KtUserStore ktUserStore, VerificationMode verificationMode) {
        this(ktUserStore, verificationMode, new SubscriptionRequestDataFactory());
    }

    /**
     * Creates a handler with an explicit {@link SubscriptionRequestDataFactory}.
     *
     * @param ktUserStore                    passed through to the superclass
     * @param verificationMode               passed through to the superclass
     * @param subscriptionRequestDataFactory passed through to the superclass
     */
    public ReactiveKtWebsocketNotificationHandler(KtUserStore ktUserStore, VerificationMode verificationMode, SubscriptionRequestDataFactory subscriptionRequestDataFactory) {
        super(logger, ktUserStore, verificationMode, subscriptionRequestDataFactory);
        this.subjects = Collections.synchronizedMap(new EnumMap<>(KtNotificationManager.Priority.class));
    }

    /**
     * Cancels a subscription by disposing it.
     *
     * @param disposable the subscription handle to dispose
     */
    public void unsubscribe(Disposable disposable) {
        disposable.dispose();
    }

    /**
     * Subscribes {@code notificationListener} to the subject of the given priority and completes
     * {@code completableFuture} with the resulting {@link Disposable}.
     *
     * <p>Only notifications whose source equals {@code str}, whose key equals {@code str2} (when
     * {@code str2} is not {@code null}) and whose self-created flag matches the one of
     * {@code subscriptionRequestData} are forwarded.
     *
     * @param subscriptionRequestData supplies the self-created flag a notification has to match
     * @param str                     required notification source
     * @param str2                    required notification key, or {@code null} to accept any key
     * @param notificationListener    receiver of the matching notifications
     * @param completableFuture       completed with the subscription handle
     * @param priority                selects the subject to subscribe to
     */
    protected void addListener(SubscriptionRequestData subscriptionRequestData, String str, String str2, NotificationListener notificationListener, CompletableFuture<Disposable> completableFuture, KtNotificationManager.Priority priority) {
        Observable<UpdateNotification<?>> notifications = subjectFor(priority)
                .filter(notification -> str.equals(notification.getSource()));
        if (str2 != null) {
            notifications = notifications.filter(notification -> str2.equals(notification.getKey()));
        }
        notifications = notifications
                .filter(notification -> subscriptionRequestData.isSelfCreated() == notification.isSelfCreated());
        // The method reference also rejects a null listener at subscription time.
        completableFuture.complete(notifications.subscribe(notificationListener::update));
    }

    /**
     * Disconnects via the superclass and then completes every subject created so far.
     *
     * <p>NOTE(review): completed subjects stay in {@link #subjects}; per RxJava semantics a
     * terminated subject ignores further {@code onNext} calls - confirm this is intended if the
     * handler is reused after a disconnect.
     */
    @Override
    public void disconnect() {
        super.disconnect();
        this.subjects.values().forEach(Subject::onComplete);
    }

    /**
     * Publishes the message of the event to every existing subject if it is an
     * {@link UpdateNotification}; other messages are ignored.
     *
     * @param messageReceivedEvent the received websocket event
     */
    @WebsocketEventListening
    public void onMessageReceived(MessageReceivedEvent messageReceivedEvent) {
        if (messageReceivedEvent.getMessage() instanceof UpdateNotification) {
            logger.trace("Received UpdateNotification {}", messageReceivedEvent.getMessage());
            this.subjects.values().forEach(subject -> {
                subject.onNext((UpdateNotification<?>) messageReceivedEvent.getMessage());
            });
        }
    }

    /**
     * @param priority selects the underlying subject
     * @return changed history entries from all matching notifications
     */
    public Observable<HistoryEntry> getHistoryObservable(KtNotificationManager.Priority priority) {
        return getObservable(Entity.HISTORY_ENTRY, HistoryEntry.class, priority);
    }

    /**
     * @param priority selects the underlying subject
     * @return changed items from all matching notifications
     */
    public Observable<Item> getItemObservable(KtNotificationManager.Priority priority) {
        return getObservable(Entity.ITEM, Item.class, priority);
    }

    /**
     * @param userKey  only notifications whose key equals this user's id are considered
     * @param priority selects the underlying subject
     * @return changed items of the given user
     */
    public Observable<Item> getItemObservable(UserKey userKey, KtNotificationManager.Priority priority) {
        return getObservable(Entity.ITEM, Item.class, userKey, priority);
    }

    /**
     * @param str      item id to match
     * @param priority selects the underlying subject
     * @return changed items whose id equals {@code str}
     */
    public Observable<Item> getItemObservable(String str, KtNotificationManager.Priority priority) {
        return getObservable(Entity.ITEM, Item.class, priority)
                .filter(item -> str.equals(item.getId()));
    }

    /**
     * @param userKey  only notifications whose key equals this user's id are considered
     * @param str      item id to match
     * @param priority selects the underlying subject
     * @return changed items of the given user whose id equals {@code str}
     */
    public Observable<Item> getItemObservable(UserKey userKey, String str, KtNotificationManager.Priority priority) {
        return getObservable(Entity.ITEM, Item.class, userKey, priority)
                .filter(item -> str.equals(item.getId()));
    }

    /**
     * @param priority selects the underlying subject
     * @return changed messages from all matching notifications
     */
    public Observable<Message> getMessageObservable(KtNotificationManager.Priority priority) {
        return getObservable(Entity.MESSAGE, Message.class, priority);
    }

    /**
     * @param userKey  only notifications whose key equals this user's id are considered
     * @param priority selects the underlying subject
     * @return changed messages of the given user
     */
    public Observable<Message> getMessageObservable(UserKey userKey, KtNotificationManager.Priority priority) {
        return getObservable(Entity.MESSAGE, Message.class, userKey, priority);
    }

    /**
     * @param priority selects the underlying subject
     * @return changed order book entries from all matching notifications
     */
    public Observable<OrderBookEntry> getOrderbookObservable(KtNotificationManager.Priority priority) {
        return getObservable(Entity.ORDERBOOK_ENTRY, OrderBookEntry.class, priority);
    }

    /**
     * @param priority selects the underlying subject
     * @return changed payouts from all matching notifications
     */
    public Observable<Payout> getPayoutObservable(KtNotificationManager.Priority priority) {
        return getObservable(Entity.PAYOUT, Payout.class, priority);
    }

    /**
     * @param userKey  only notifications whose key equals this user's id are considered
     * @param priority selects the underlying subject
     * @return changed payouts of the given user
     */
    public Observable<Payout> getPayoutObservable(UserKey userKey, KtNotificationManager.Priority priority) {
        return getObservable(Entity.PAYOUT, Payout.class, userKey, priority);
    }

    /**
     * Observes a single payout. The observable completes after emitting the first update whose
     * state is {@code SUCCESS}, {@code CANCELLED} or {@code FAILED_TAKEN}.
     *
     * @param j        payout id to match
     * @param priority selects the underlying subject
     * @return updates of the payout with the given id
     */
    public Observable<Payout> getPayoutObservable(long j, KtNotificationManager.Priority priority) {
        return getObservable(Entity.PAYOUT, Payout.class, priority)
                .filter(payout -> j == payout.getId())
                .takeUntil(payout -> "SUCCESS".equals(payout.getState())
                        || "CANCELLED".equals(payout.getState())
                        || "FAILED_TAKEN".equals(payout.getState()));
    }

    /**
     * @param priority selects the underlying subject
     * @return changed trades from all matching notifications
     */
    public Observable<Trade> getTradeObservable(KtNotificationManager.Priority priority) {
        return getObservable(Entity.TRADE, Trade.class, priority);
    }

    /**
     * @param userKey  only notifications whose key equals this user's id are considered
     * @param priority selects the underlying subject
     * @return changed trades of the given user
     */
    public Observable<Trade> getTradeObservable(UserKey userKey, KtNotificationManager.Priority priority) {
        return getObservable(Entity.TRADE, Trade.class, userKey, priority);
    }

    /**
     * Observes a single trade. The observable completes after emitting the first update in which
     * the sold amount equals the total amount and nothing is left to take.
     *
     * @param j        trade id to match
     * @param priority selects the underlying subject
     * @return updates of the trade with the given id
     */
    public Observable<Trade> getTradeObservable(long j, KtNotificationManager.Priority priority) {
        return getObservable(Entity.TRADE, Trade.class, priority)
                .filter(trade -> j == trade.getId())
                .takeUntil(trade -> trade.getAmount() == trade.getSold()
                        && trade.getToTakeItems() == 0
                        && trade.getToTakeMoney() == 0.0d);
    }

    /**
     * Builds an observable of the changed entries of all notifications on the update channel of
     * {@code entity}.
     *
     * @throws IllegalArgumentException if {@code cls} is not the instance class of {@code entity}
     */
    private <U> Observable<U> getObservable(Entity entity, Class<U> cls, KtNotificationManager.Priority priority) {
        requireMatchingType(entity, cls);
        Observable<UpdateNotification<?>> notifications = subjectFor(priority)
                .filter(notification -> entity.getUpdateChannel().equals(notification.getSource()));
        return changedEntries(notifications, cls);
    }

    /**
     * Same as {@link #getObservable(Entity, Class, KtNotificationManager.Priority)}, additionally
     * restricted to notifications whose key equals the user id of {@code userKey}.
     *
     * @throws IllegalArgumentException if {@code cls} is not the instance class of {@code entity}
     */
    private <U> Observable<U> getObservable(Entity entity, Class<U> cls, UserKey userKey, KtNotificationManager.Priority priority) {
        requireMatchingType(entity, cls);
        Observable<UpdateNotification<?>> notifications = subjectFor(priority)
                .filter(notification -> entity.getUpdateChannel().equals(notification.getSource()))
                .filter(notification -> userKey.getUserId().equals(notification.getKey()));
        return changedEntries(notifications, cls);
    }

    /** Returns the subject for {@code priority}, creating it on first use. */
    private Subject<UpdateNotification<?>> subjectFor(KtNotificationManager.Priority priority) {
        return this.subjects.computeIfAbsent(priority, key -> createSubject());
    }

    /** Rejects a requested class that is not the instance class of {@code entity}. */
    private static void requireMatchingType(Entity entity, Class<?> cls) {
        if (entity.getInstanceClass() != cls) {
            throw new IllegalArgumentException("type " + entity + " does not map to " + cls);
        }
    }

    /** Flattens each notification into its changed entries and casts every entry to {@code cls}. */
    private static <U> Observable<U> changedEntries(Observable<UpdateNotification<?>> notifications, Class<U> cls) {
        return notifications
                .<Object>flatMap(notification -> Observable.fromArray(notification.getChangedEntries()))
                .map(cls::cast);
    }
}
