package ai.tock.bot.connector.web.channel;

import ai.tock.bot.connector.web.channel.ChannelEvent;
import ai.tock.shared.LoggersKt;
import com.github.salomonbrys.kodein.InjectedProperty;
import com.mongodb.client.MongoIterable;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.util.function.Consumer;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.KMongoIterableKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.PropertyReference1Impl;
import kotlin.jvm.internal.SourceDebugExtension;
import kotlin.reflect.KProperty;
import mu.KLogger;
import mu.KotlinLogging;
import org.bson.conversions.Bson;
import org.jetbrains.annotations.NotNull;
import org.litote.kmongo.FiltersKt;
import org.litote.kmongo.MongoCollectionsKt;
import org.litote.kmongo.SetTo;
import org.litote.kmongo.UpdatesKt;
import org.litote.kmongo.reactivestreams.MongoSharedCollectionsKt;

/* compiled from: ChannelMongoDAO.kt */
@Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��X\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n��\n\u0002\u0010\t\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0007\n\u0002\u0010\u000b\n\u0002\b\u0002\bÀ\u0002\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002J \u0010\u001b\u001a\u00020\u001c2\u0006\u0010\u001d\u001a\u00020\u00042\u0006\u0010\u001e\u001a\u00020\u00042\u0006\u0010\u001f\u001a\u00020 H\u0016J\u0010\u0010!\u001a\u00020\u001c2\u0006\u0010\"\u001a\u00020 H\u0016J\u0018\u0010#\u001a\u00020\u001c2\u0006\u0010$\u001a\u00020\r2\u0006\u0010\u001f\u001a\u00020 H\u0002J\u0010\u0010%\u001a\u00020\u001c2\u0006\u0010&\u001a\u00020\rH\u0016J\u0014\u0010'\u001a\u00020(*\u00020\u000f2\u0006\u0010)\u001a\u00020\u0004H\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082T¢\u0006\u0002\n��R\u001b\u0010\u0005\u001a\u00020\u00068BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\t\u0010\n\u001a\u0004\b\u0007\u0010\bR\u0014\u0010\u000b\u001a\b\u0012\u0004\u0012\u00020\r0\fX\u0082\u0004¢\u0006\u0002\n��R\u001b\u0010\u000e\u001a\u00020\u000f8BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\u0012\u0010\n\u001a\u0004\b\u0010\u0010\u0011R\u000e\u0010\u0013\u001a\u00020\u0014X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0015\u001a\u00020\u0016X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0017\u001a\u00020\u0016X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0018\u001a\u00020\u0016X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0019\u001a\b\u0012\u0004\u0012\u00020\r0\u001aX\u0082\u0004¢\u0006\u0002\n��¨\u0006*"}, d2 = {"Lai/tock/bot/connector/web/channel/ChannelMongoDAO;", "Lai/tock/bot/connector/web/channel/ChannelDAO;", "()V", "COLLECTION_NAME", "", "asyncDatabase", "Lcom/mongodb/reactivestreams/client/MongoDatabase;", "getAsyncDatabase", "()Lcom/mongodb/reactivestreams/client/MongoDatabase;", "asyncDatabase$delegate", "Lcom/github/salomonbrys/kodein/InjectedProperty;", "asyncWebChannelResponseCol", "Lcom/mongodb/reactivestreams/client/MongoCollection;", "Lai/tock/bot/connector/web/channel/ChannelEvent;", "database", "Lcom/mongodb/client/MongoDatabase;", "getDatabase", "()Lcom/mongodb/client/MongoDatabase;", "database$delegate", "logger", "Lmu/KLogger;", "messageQueueMaxCount", "", "messageQueueMaxSize", "messageQueueTtl", "webChannelResponseCol", "Lcom/mongodb/client/MongoCollection;", "handleMissedEvents", "", "appId", "recipientId", "handler", "Lai/tock/bot/connector/web/channel/ChannelEvent$Handler;", "listenChanges", "listener", "process", "event", "save", "channelEvent", "collectionExists", "", "collectionName", "tock-bot-connector-web"})
@SourceDebugExtension({"SMAP\nChannelMongoDAO.kt\nKotlin\n*S Kotlin\n*F\n+ 1 ChannelMongoDAO.kt\nai/tock/bot/connector/web/channel/ChannelMongoDAO\n+ 2 Mongos.kt\nai/tock/shared/MongosKt\n+ 3 MongoSharedCollections.kt\norg/litote/kmongo/reactivestreams/MongoSharedCollectionsKt\n+ 4 KMongoIterable.kt\nkotlin/collections/KMongoIterableKt\n+ 5 GInjected.kt\ncom/github/salomonbrys/kodein/GInjectedKt\n+ 6 types.kt\ncom/github/salomonbrys/kodein/TypesKt\n+ 7 MongoSharedDatabases.kt\norg/litote/kmongo/reactivestreams/MongoSharedDatabasesKt\n+ 8 MongoDatabases.kt\norg/litote/kmongo/MongoDatabasesKt\n*L\n1#1,138:1\n234#2,5:139\n245#2:163\n60#3,19:144\n61#4:164\n80#5:165\n80#5:167\n277#6:166\n277#6:168\n41#7:169\n40#8,11:170\n*S KotlinDebug\n*F\n+ 1 ChannelMongoDAO.kt\nai/tock/bot/connector/web/channel/ChannelMongoDAO\n*L\n92#1:139,5\n92#1:163\n92#1:144,19\n111#1:164\n42#1:165\n45#1:167\n42#1:166\n45#1:168\n74#1:169\n75#1:170,11\n*E\n"})
/* loaded from: input_file:ai/tock/bot/connector/web/channel/ChannelMongoDAO.class */
public final class ChannelMongoDAO implements ChannelDAO {

    @NotNull
    public static final ChannelMongoDAO INSTANCE;
    static final /* synthetic */ KProperty<Object>[] $$delegatedProperties;

    @NotNull
    private static final String COLLECTION_NAME = "web_channel_event";

    @NotNull
    private static final InjectedProperty asyncDatabase$delegate;

    @NotNull
    private static final InjectedProperty database$delegate;

    @NotNull
    private static final KLogger logger;

    @NotNull
    private static final MongoCollection<ChannelEvent> asyncWebChannelResponseCol;

    @NotNull
    private static final com.mongodb.client.MongoCollection<ChannelEvent> webChannelResponseCol;
    private static final long messageQueueTtl;
    private static final long messageQueueMaxCount;
    private static final long messageQueueMaxSize;

    private ChannelMongoDAO() {
    }

    private final MongoDatabase getAsyncDatabase() {
        return (MongoDatabase) asyncDatabase$delegate.getValue(this, $$delegatedProperties[0]);
    }

    private final com.mongodb.client.MongoDatabase getDatabase() {
        return (com.mongodb.client.MongoDatabase) database$delegate.getValue(this, $$delegatedProperties[1]);
    }

    private final boolean collectionExists(com.mongodb.client.MongoDatabase mongoDatabase, String str) {
        MongoIterable listCollectionNames = mongoDatabase.listCollectionNames();
        Intrinsics.checkNotNullExpressionValue(listCollectionNames, "listCollectionNames(...)");
        return KMongoIterableKt.contains(listCollectionNames, str);
    }

    @Override // ai.tock.bot.connector.web.channel.ChannelDAO
    public void listenChanges(@NotNull final ChannelEvent.Handler handler) {
        Intrinsics.checkNotNullParameter(handler, "listener");
        final MongoCollection<ChannelEvent> mongoCollection = asyncWebChannelResponseCol;
        Function1<ChangeStreamDocument<ChannelEvent>, Unit> function1 = new Function1<ChangeStreamDocument<ChannelEvent>, Unit>() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$listenChanges$1
            /* JADX INFO: Access modifiers changed from: package-private */
            {
                super(1);
            }

            public final void invoke(ChangeStreamDocument<ChannelEvent> changeStreamDocument) {
                Intrinsics.checkNotNullParameter(changeStreamDocument, "it");
                ChannelEvent channelEvent = (ChannelEvent) changeStreamDocument.getFullDocument();
                if (channelEvent != null) {
                    ChannelMongoDAO.INSTANCE.process(channelEvent, ChannelEvent.Handler.this);
                }
            }

            public /* bridge */ /* synthetic */ Object invoke(Object obj) {
                invoke((ChangeStreamDocument<ChannelEvent>) obj);
                return Unit.INSTANCE;
            }
        };
        final FullDocument fullDocument = FullDocument.DEFAULT;
        MongoSharedCollectionsKt.watchIndefinitely(mongoCollection, new Function1<MongoCollection<ChannelEvent>, ChangeStreamPublisher<ChannelEvent>>() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$listenChanges$$inlined$watch$default$4
            public final ChangeStreamPublisher<ChannelEvent> invoke(MongoCollection<ChannelEvent> mongoCollection2) {
                Intrinsics.checkNotNullParameter(mongoCollection2, "it");
                ChangeStreamPublisher<ChannelEvent> fullDocument2 = mongoCollection.watch(ChannelEvent.class).fullDocument(fullDocument);
                Intrinsics.checkNotNullExpressionValue(fullDocument2, "fullDocument(...)");
                return fullDocument2;
            }
        }, new Function0<Unit>() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$listenChanges$$inlined$watch$default$1
            public final void invoke() {
                KotlinLogging.INSTANCE.logger(new Function0<Unit>() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$listenChanges$$inlined$watch$default$1.1
                    public final void invoke() {
                    }

                    /* renamed from: invoke, reason: collision with other method in class */
                    public /* bridge */ /* synthetic */ Object m23invoke() {
                        invoke();
                        return Unit.INSTANCE;
                    }
                }).info(new Function0<Object>() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$listenChanges$$inlined$watch$default$1.2
                    public final Object invoke() {
                        return "Subscribe stream";
                    }
                });
            }

            /* renamed from: invoke, reason: collision with other method in class */
            public /* bridge */ /* synthetic */ Object m21invoke() {
                invoke();
                return Unit.INSTANCE;
            }
        }, new Function1<Throwable, Unit>() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$listenChanges$$inlined$watch$default$2
            public final void invoke(Throwable th) {
                Intrinsics.checkNotNullParameter(th, "it");
                LoggersKt.error(KotlinLogging.INSTANCE.logger(new Function0<Unit>() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$listenChanges$$inlined$watch$default$2.1
                    public final void invoke() {
                    }

                    /* renamed from: invoke, reason: collision with other method in class */
                    public /* bridge */ /* synthetic */ Object m27invoke() {
                        invoke();
                        return Unit.INSTANCE;
                    }
                }), th);
            }

            public /* bridge */ /* synthetic */ Object invoke(Object obj) {
                invoke((Throwable) obj);
                return Unit.INSTANCE;
            }
        }, new Function0<Unit>() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$listenChanges$$inlined$watch$default$3
            public final void invoke() {
                KotlinLogging.INSTANCE.logger(new Function0<Unit>() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$listenChanges$$inlined$watch$default$3.1
                    public final void invoke() {
                    }

                    /* renamed from: invoke, reason: collision with other method in class */
                    public /* bridge */ /* synthetic */ Object m31invoke() {
                        invoke();
                        return Unit.INSTANCE;
                    }
                }).warn(new Function0<Object>() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$listenChanges$$inlined$watch$default$3.2
                    public final Object invoke() {
                        return "Reopen stream";
                    }
                });
            }

            /* renamed from: invoke, reason: collision with other method in class */
            public /* bridge */ /* synthetic */ Object m29invoke() {
                invoke();
                return Unit.INSTANCE;
            }
        }, 5000L, function1);
    }

    @Override // ai.tock.bot.connector.web.channel.ChannelDAO
    public void handleMissedEvents(@NotNull String str, @NotNull String str2, @NotNull final ChannelEvent.Handler handler) {
        Intrinsics.checkNotNullParameter(str, "appId");
        Intrinsics.checkNotNullParameter(str2, "recipientId");
        Intrinsics.checkNotNullParameter(handler, "handler");
        MongoIterable find = webChannelResponseCol.find(FiltersKt.and(new Bson[]{FiltersKt.eq(new PropertyReference1Impl() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$handleMissedEvents$1
            public Object get(Object obj) {
                return ((ChannelEvent) obj).getAppId();
            }
        }, str), FiltersKt.eq(new PropertyReference1Impl() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$handleMissedEvents$2
            public Object get(Object obj) {
                return ((ChannelEvent) obj).getRecipientId();
            }
        }, str2), FiltersKt.eq(new PropertyReference1Impl() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$handleMissedEvents$3
            public Object get(Object obj) {
                return ((ChannelEvent) obj).getStatus();
            }
        }, ChannelEvent.Status.ENQUEUED)}));
        Intrinsics.checkNotNullExpressionValue(find, "find(...)");
        find.forEach(new Consumer() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$handleMissedEvents$$inlined$forEach$1
            /* JADX WARN: Multi-variable type inference failed */
            @Override // java.util.function.Consumer
            public final void accept(T t) {
                ChannelEvent channelEvent = (ChannelEvent) t;
                ChannelMongoDAO channelMongoDAO = ChannelMongoDAO.INSTANCE;
                Intrinsics.checkNotNull(channelEvent);
                channelMongoDAO.process(channelEvent, ChannelEvent.Handler.this);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void process(ChannelEvent channelEvent, ChannelEvent.Handler handler) {
        try {
            handler.invoke(channelEvent).onComplete((v1) -> {
                process$lambda$1(r1, v1);
            }, ChannelMongoDAO::process$lambda$2);
        } catch (Exception e) {
            logger.error(e, new Function0<Object>() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$process$3
                public final Object invoke() {
                    return "Failed to send SSE message";
                }
            });
        }
    }

    @Override // ai.tock.bot.connector.web.channel.ChannelDAO
    public void save(@NotNull ChannelEvent channelEvent) {
        Intrinsics.checkNotNullParameter(channelEvent, "channelEvent");
        MongoCollectionsKt.save(webChannelResponseCol, channelEvent);
    }

    private static final void process$lambda$1(ChannelEvent channelEvent, Boolean bool) {
        Intrinsics.checkNotNull(bool);
        if (bool.booleanValue()) {
            MongoCollectionsKt.updateOneById$default(webChannelResponseCol, channelEvent.get_id(), new SetTo[]{UpdatesKt.setTo(new PropertyReference1Impl() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$process$1$1
                public Object get(Object obj) {
                    return ((ChannelEvent) obj).getStatus();
                }
            }, ChannelEvent.Status.PROCESSED)}, (UpdateOptions) null, 4, (Object) null);
        }
    }

    private static final void process$lambda$2(Throwable th) {
        logger.error(th, new Function0<Object>() { // from class: ai.tock.bot.connector.web.channel.ChannelMongoDAO$process$2$1
            public final Object invoke() {
                return "Failed to send SSE message";
            }
        });
    }

    /*  JADX ERROR: JadxRuntimeException in pass: BlockSplitter
        jadx.core.utils.exceptions.JadxRuntimeException: Unexpected missing predecessor for block: B:4:0x00da
        	at jadx.core.dex.visitors.blocks.BlockSplitter.addTempConnectionsForExcHandlers(BlockSplitter.java:275)
        	at jadx.core.dex.visitors.blocks.BlockSplitter.visit(BlockSplitter.java:68)
        */
    static {
        /*
            Method dump skipped, instructions count: 450
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: ai.tock.bot.connector.web.channel.ChannelMongoDAO.m19clinit():void");
    }
}
