package ru.quipy.streams;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.Intrinsics;
import kotlin.reflect.KClass;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineName;
import kotlinx.coroutines.CoroutineScopeKt;
import kotlinx.coroutines.CoroutineStart;
import kotlinx.coroutines.ExecutorsKt;
import kotlinx.coroutines.Job;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.quipy.domain.Aggregate;
import ru.quipy.domain.Event;
import ru.quipy.mapper.EventMapper;
import ru.quipy.utils.NamedThreadFactory;

/* compiled from: EventStreamSubscriber.kt */
@Metadata(mv = {1, 6, 0}, k = 1, xi = 48, d1 = {"��V\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010$\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0010\u000b\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0006\u0018��*\b\b��\u0010\u0001*\u00020\u00022\u00020\u0003:\u0001\u001dB\u0082\u0001\u0012\f\u0010\u0004\u001a\b\u0012\u0004\u0012\u00028��0\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u001e\u0010\b\u001a\u001a\u0012\u0004\u0012\u00020\n\u0012\u0010\u0012\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00028��0\f0\u000b0\t\u0012B\u0010\r\u001a>\u0012\u0012\u0012\u0010\u0012\f\b\u0001\u0012\b\u0012\u0004\u0012\u00028��0\f0\u000b\u0012&\u0012$\b\u0001\u0012\n\u0012\b\u0012\u0004\u0012\u00028��0\f\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00110\u0010\u0012\u0006\u0012\u0004\u0018\u00010\u00030\u000f0\u000eø\u0001��¢\u0006\u0002\u0010\u0012J*\u0010\u0019\u001a\b\u0012\u0004\u0012\u00028��0\f2\u0006\u0010\u001a\u001a\u00020\n2\u0012\u0010\u001b\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00028��0\f0\u000bH\u0002J\u0006\u0010\u001c\u001a\u00020\u0011R\u000e\u0010\u0013\u001a\u00020\u0014X\u0082\u000e¢\u0006\u0002\n��R\u0014\u0010\u0004\u001a\b\u0012\u0004\u0012\u00028��0\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��RM\u0010\r\u001a>\u0012\u0012\u0012\u0010\u0012\f\b\u0001\u0012\b\u0012\u0004\u0012\u00028��0\f0\u000b\u0012&\u0012$\b\u0001\u0012\n\u0012\b\u0012\u0004\u0012\u00028��0\f\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00110\u0010\u0012\u0006\u0012\u0004\u0018\u00010\u00030\u000f0\u000eX\u0082\u0004ø\u0001��¢\u0006\u0002\n��R\u000e\u0010\u0015\u001a\u00020\u0016X\u0082\u0004¢\u0006\u0002\n��R&\u0010\b\u001a\u001a\u0012\u0004\u0012\u00020\n\u0012\u0010\u0012\u000e\u0012\n\u0012\b\u0012\u0004\
u0012\u00028��0\f0\u000b0\tX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0017\u001a\u00020\u0018X\u0082\u0004¢\u0006\u0002\n��\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006\u001e"}, d2 = {"Lru/quipy/streams/EventStreamSubscriber;", "A", "Lru/quipy/domain/Aggregate;", "", "aggregateEventStream", "Lru/quipy/streams/AggregateEventStream;", "eventMapper", "Lru/quipy/mapper/EventMapper;", "nameToEventClassFunc", "Lkotlin/Function1;", "", "Lkotlin/reflect/KClass;", "Lru/quipy/domain/Event;", "handlers", "", "Lkotlin/Function2;", "Lkotlin/coroutines/Continuation;", "", "(Lru/quipy/streams/AggregateEventStream;Lru/quipy/mapper/EventMapper;Lkotlin/jvm/functions/Function1;Ljava/util/Map;)V", "active", "", "logger", "Lorg/slf4j/Logger;", "subscriptionCoroutine", "Lkotlinx/coroutines/Job;", "payloadToEvent", "payload", "eventType", "stopAndDestroy", "EventStreamSubscriptionBuilder", "tiny-event-sourcing-lib"})
/* loaded from: input_file:ru/quipy/streams/EventStreamSubscriber.class */
public final class EventStreamSubscriber<A extends Aggregate> {

    /**
     * Source stream of this subscription. Its name is used to name the worker thread created in
     * the constructor, and it is stopped by {@link #stopAndDestroy()}.
     */
    @NotNull
    private final AggregateEventStream<A> aggregateEventStream;

    /** Mapper used by {@link #payloadToEvent} to turn a string payload into an {@link Event}. */
    @NotNull
    private final EventMapper eventMapper;

    /**
     * Function from a string to an event class. It is only stored here; it is not invoked in the
     * visible part of this class (presumably used by the subscription coroutine — confirm there).
     */
    @NotNull
    private final Function1<String, KClass<Event<A>>> nameToEventClassFunc;

    /**
     * Handlers keyed by event class. Each value is a compiled Kotlin {@code suspend} function
     * (event plus continuation). The map passed to the constructor is stored as-is, not copied.
     */
    @NotNull
    private final Map<KClass<? extends Event<A>>, Function2<Event<A>, Continuation<? super Unit>, Object>> handlers;
    /** Set to {@code true} in the constructor and to {@code false} by {@link #stopAndDestroy()}. */
    private volatile boolean active;

    /** SLF4J logger obtained for {@code EventStreamSubscriber.class}; one instance field per subscriber. */
    @NotNull
    private final Logger logger;

    /** Job of the coroutine launched in the constructor; cancelled by {@link #stopAndDestroy()}. */
    @NotNull
    private final Job subscriptionCoroutine;

    /* compiled from: EventStreamSubscriber.kt */
    @Metadata(mv = {1, 6, 0}, k = 1, xi = 48, d1 = {"��J\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010%\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0006\u0018��*\b\b\u0001\u0010\u0001*\u00020\u00022\u00020\u0003B;\u0012\f\u0010\u0004\u001a\b\u0012\u0004\u0012\u00028\u00010\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u001e\u0010\b\u001a\u001a\u0012\u0004\u0012\u00020\n\u0012\u0010\u0012\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00028\u00010\f0\u000b0\t¢\u0006\u0002\u0010\rJ\f\u0010\u0013\u001a\b\u0012\u0004\u0012\u00028\u00010\u0014JV\u0010\u0015\u001a\b\u0012\u0004\u0012\u00028\u00010��\"\u000e\b\u0002\u0010\u0016*\b\u0012\u0004\u0012\u00028\u00010\f2\f\u0010\u0017\u001a\b\u0012\u0004\u0012\u0002H\u00160\u000b2\"\u0010\u0018\u001a\u001e\b\u0001\u0012\u0004\u0012\u0002H\u0016\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00120\u0011\u0012\u0006\u0012\u0004\u0018\u00010\u00030\u0010ø\u0001��¢\u0006\u0002\u0010\u0019R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��RM\u0010\u000e\u001a>\u0012\u0012\u0012\u0010\u0012\f\b\u0001\u0012\b\u0012\u0004\u0012\u00028\u00010\f0\u000b\u0012&\u0012$\b\u0001\u0012\n\u0012\b\u0012\u0004\u0012\u00028\u00010\f\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00120\u0011\u0012\u0006\u0012\u0004\u0018\u00010\u00030\u00100\u000fX\u0082\u0004ø\u0001��¢\u0006\u0002\n��R&\u0010\b\u001a\u001a\u0012\u0004\u0012\u00020\n\u0012\u0010\u0012\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00028\u00010\f0\u000b0\tX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0004\u001a\b\u0012\u0004\u0012\u00028\u00010\u0005X\u0082\u0004¢\u0006\u0002\n��\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006\u001a"}, d2 = {"Lru/quipy/streams/EventStreamSubscriber$EventStreamSubscriptionBuilder;", "A", "Lru/quipy/domain/Aggregate;", "", 
"wrapped", "Lru/quipy/streams/AggregateEventStream;", "eventMapper", "Lru/quipy/mapper/EventMapper;", "nameToEventClassFunc", "Lkotlin/Function1;", "", "Lkotlin/reflect/KClass;", "Lru/quipy/domain/Event;", "(Lru/quipy/streams/AggregateEventStream;Lru/quipy/mapper/EventMapper;Lkotlin/jvm/functions/Function1;)V", "handlers", "", "Lkotlin/Function2;", "Lkotlin/coroutines/Continuation;", "", "subscribe", "Lru/quipy/streams/EventStreamSubscriber;", "when", "E", "eventType", "eventHandler", "(Lkotlin/reflect/KClass;Lkotlin/jvm/functions/Function2;)Lru/quipy/streams/EventStreamSubscriber$EventStreamSubscriptionBuilder;", "tiny-event-sourcing-lib"})
    /* loaded from: input_file:ru/quipy/streams/EventStreamSubscriber$EventStreamSubscriptionBuilder.class */
    public static final class EventStreamSubscriptionBuilder<A extends Aggregate> {

        /** Event stream handed to the {@link EventStreamSubscriber} created by {@link #subscribe()}. */
        @NotNull
        private final AggregateEventStream<A> wrapped;

        /** Event mapper forwarded unchanged to the created subscriber. */
        @NotNull
        private final EventMapper eventMapper;

        /** Function from a string to an event class, forwarded unchanged to the created subscriber. */
        @NotNull
        private final Function1<String, KClass<Event<A>>> nameToEventClassFunc;

        /**
         * Handlers registered through {@link #when}, keyed by event class. Backed by a
         * {@link LinkedHashMap} created in the constructor.
         */
        @NotNull
        private final Map<KClass<? extends Event<A>>, Function2<Event<A>, Continuation<? super Unit>, Object>> handlers;

        /**
         * Creates a builder with no handlers registered yet.
         *
         * @param aggregateEventStream the stream to subscribe to (reported as {@code wrapped} in null-check errors)
         * @param eventMapper mapper that will be passed on to the subscriber
         * @param function1 function from a string to an event class (reported as {@code nameToEventClassFunc})
         */
        /* JADX WARN: Multi-variable type inference failed */
        public EventStreamSubscriptionBuilder(@NotNull AggregateEventStream<A> aggregateEventStream, @NotNull EventMapper eventMapper, @NotNull Function1<? super String, ? extends KClass<Event<A>>> function1) {
            // Validate every argument before any state is assigned.
            Intrinsics.checkNotNullParameter(aggregateEventStream, "wrapped");
            Intrinsics.checkNotNullParameter(eventMapper, "eventMapper");
            Intrinsics.checkNotNullParameter(function1, "nameToEventClassFunc");

            // The handler registry starts empty and is filled by when(...).
            this.handlers = new LinkedHashMap<>();
            this.nameToEventClassFunc = function1;
            this.eventMapper = eventMapper;
            this.wrapped = aggregateEventStream;
        }

        /**
         * Registers a handler for the given event class. A handler previously registered under
         * the same key is replaced, as with any {@link Map#put}.
         *
         * @param kClass event class used as the registry key (reported as {@code eventType})
         * @param function2 compiled suspend handler for that event class (reported as {@code eventHandler})
         * @return this builder, to allow chaining
         */
        @NotNull
        public final <E extends Event<A>> EventStreamSubscriptionBuilder<A> when(@NotNull KClass<E> kClass, @NotNull Function2<? super E, ? super Continuation<? super Unit>, ? extends Object> function2) {
            Intrinsics.checkNotNullParameter(kClass, "eventType");
            Intrinsics.checkNotNullParameter(function2, "eventHandler");

            final EventStreamSubscriptionBuilder<A> self = this;
            self.handlers.put(kClass, function2);
            return self;
        }

        /**
         * Creates an {@link EventStreamSubscriber} from the stream, mapper, class-lookup function
         * and the handlers registered so far.
         *
         * <p>The subscriber receives a snapshot of the handlers. It keeps a reference to the map it
         * is given and launches a background coroutine on its own thread, so it must not share
         * this builder's mutable, non-thread-safe {@link LinkedHashMap}. Handlers registered via
         * {@link #when} after this call affect only subscribers created afterwards.
         *
         * @return a new subscriber
         */
        @NotNull
        public final EventStreamSubscriber<A> subscribe() {
            // Defensive copy; insertion order of the registered handlers is preserved.
            Map<KClass<? extends Event<A>>, Function2<Event<A>, Continuation<? super Unit>, Object>> handlersSnapshot = new LinkedHashMap<>(this.handlers);
            return new EventStreamSubscriber<>(this.wrapped, this.eventMapper, this.nameToEventClassFunc, handlersSnapshot);
        }
    }

    /**
     * Creates the subscriber and launches its subscription coroutine.
     *
     * <p>The coroutine is named {@code handlingCoroutine} and runs on a dedicated single-thread
     * executor whose thread factory is named after the stream name plus the {@code -sbscr} suffix.
     * The executor is shut down once the coroutine completes for any reason, including
     * cancellation by {@link #stopAndDestroy()}, so the worker thread is not leaked.
     *
     * @param aggregateEventStream stream to consume; also provides the name for the worker thread
     * @param eventMapper mapper used by {@link #payloadToEvent}
     * @param function1 function from a string to an event class (reported as {@code nameToEventClassFunc})
     * @param map handlers keyed by event class (reported as {@code handlers}); stored as-is, not copied
     */
    /* JADX WARN: Multi-variable type inference failed */
    public EventStreamSubscriber(@NotNull AggregateEventStream<A> aggregateEventStream, @NotNull EventMapper eventMapper, @NotNull Function1<? super String, ? extends KClass<Event<A>>> function1, @NotNull Map<KClass<? extends Event<A>>, ? extends Function2<? super Event<A>, ? super Continuation<? super Unit>, ? extends Object>> map) {
        Intrinsics.checkNotNullParameter(aggregateEventStream, "aggregateEventStream");
        Intrinsics.checkNotNullParameter(eventMapper, "eventMapper");
        Intrinsics.checkNotNullParameter(function1, "nameToEventClassFunc");
        Intrinsics.checkNotNullParameter(map, "handlers");
        this.aggregateEventStream = aggregateEventStream;
        this.eventMapper = eventMapper;
        this.nameToEventClassFunc = function1;
        this.handlers = map;
        this.active = true;
        Logger logger = LoggerFactory.getLogger(EventStreamSubscriber.class);
        Intrinsics.checkNotNullExpressionValue(logger, "getLogger(EventStreamSubscriber::class.java)");
        this.logger = logger;
        CoroutineName coroutineName = new CoroutineName("handlingCoroutine");
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory(this.aggregateEventStream.getStreamName() + "-sbscr"));
        Intrinsics.checkNotNullExpressionValue(newSingleThreadExecutor, "newSingleThreadExecutor(…ream.streamName}-sbscr\"))");
        this.subscriptionCoroutine = BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope(coroutineName.plus(ExecutorsKt.from(newSingleThreadExecutor))), (CoroutineContext) null, (CoroutineStart) null, new EventStreamSubscriber$subscriptionCoroutine$1(this, null), 3, (Object) null);
        // The executor exists only to run this one coroutine and is not reachable from anywhere
        // else, so release its thread when the job finishes (normally, with an error, or cancelled).
        // shutdown() does not block, so it is safe even when invoked from the executor's own thread.
        this.subscriptionCoroutine.invokeOnCompletion(cause -> {
            newSingleThreadExecutor.shutdown();
            return Unit.INSTANCE;
        });
    }

    /**
     * Converts a string payload into an event of the given class by delegating to the
     * {@link EventMapper}.
     *
     * @param str the payload to convert
     * @param kClass the event class to convert the payload into
     * @return whatever {@code EventMapper.toEvent} returns for these arguments
     */
    /* JADX INFO: Access modifiers changed from: private */
    public final Event<A> payloadToEvent(String str, KClass<Event<A>> kClass) {
        final Event<A> decoded = this.eventMapper.toEvent(str, kClass);
        return decoded;
    }

    /**
     * Stops this subscription: clears the {@code active} flag, cancels the subscription
     * coroutine without an explicit cause, then stops and destroys the underlying event stream.
     */
    public final void stopAndDestroy() {
        this.active = false;

        // Same as Kotlin's `subscriptionCoroutine.cancel()` with the default (null) cause.
        final Job job = this.subscriptionCoroutine;
        job.cancel((CancellationException) null);

        this.aggregateEventStream.stopAndDestroy();
    }
}
