package io.streamthoughts.azkarra.api.events.reactive;

import io.streamthoughts.azkarra.api.events.BlockingRecordQueue;
import io.streamthoughts.azkarra.api.events.EventStream;
import io.streamthoughts.azkarra.api.events.callback.QueueCallback;
import io.streamthoughts.azkarra.api.events.reactive.internal.SequentialSubscriptionIdGenerator;
import io.streamthoughts.azkarra.api.events.reactive.internal.SubscriptionId;
import io.streamthoughts.azkarra.api.events.reactive.internal.SubscriptionIdGenerator;
import io.streamthoughts.azkarra.api.model.KV;
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

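/**
 * An {@link EventStreamPublisher} that multicasts the records of a single {@link EventStream}
 * to every registered {@link Flow.Subscriber}.
 *
 * <p>All subscription signals (subscribe, request, cancel) and record deliveries are queued and
 * processed one at a time on a dedicated single-threaded executor. A polled record is delivered
 * only to the subscriptions that currently have outstanding demand.
 *
 * @param <K> the record-key type.
 * @param <V> the record-value type.
 */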
public class AsyncMulticastEventStreamPublisher<K, V> implements EventStreamPublisher<K, V> {
    /** Upper bound on the number of records polled from the queue while handling one {@code Send} signal. */
    private static final int MAX_POLL_RECORDS = 100;
    /** The stream whose records are published; opened once during construction. */
    private final EventStream<K, V> stream;
    /** Generates the identifier under which each new subscription is registered. */
    private final SubscriptionIdGenerator idGenerator;
    /** Event loop that processes every signal sent to this publisher. */
    private final AsyncMulticastEventStreamPublisher<K, V>.EventLoop eventLoop;
    /** Set to {@code true} when the queue callback reports that the queue is closed. */
    private final AtomicBoolean closed;
    /**
     * Queue of pending records; assigned inside the callback passed to {@code stream.open(...)}.
     * NOTE(review): the field is neither final nor volatile — confirm on which thread the callback runs.
     */
    private BlockingRecordQueue<K, V> queue;
    /** Currently registered subscriptions, keyed by their subscription id. */
    private final ConcurrentHashMap<SubscriptionId, EventStreamSubscription<KV<K, V>>> subscriptions;
    private static final Logger LOG = LoggerFactory.getLogger(AsyncMulticastEventStreamPublisher.class);
    /** Maximum time to wait for the event loop to terminate once the queue is closed. */
    private static final Duration AWAIT_TERMINATION_TIMEOUT = Duration.ofSeconds(30);

    /**
     * Signal asking the event loop to cancel the wrapped subscription.
     *
     * @param <K> the record-key type.
     * @param <V> the record-value type.
     */
    static final class Cancel<K, V> extends SubscriptionSignal {

        /**
         * @param target the subscription to cancel; must not be {@code null}.
         */
        Cancel(EventStreamSubscription<KV<K, V>> target) {
            super(target);
        }

        /** Invokes {@code doOnCancel()} on the wrapped subscription. */
        @Override
        public void execute() {
            subscription.doOnCancel();
        }
    }

    /**
     * Singleton signal sent when the record queue is closed. When the event loop processes it,
     * the remaining records are drained to the active subscriptions and the loop is shut down.
     */
    enum Complete implements Signal {
        Instance
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamthoughts/azkarra/api/events/reactive/AsyncMulticastEventStreamPublisher$EventLoop.class */
    public class EventLoop implements Runnable {
        private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<>();
        private final AtomicBoolean on = new AtomicBoolean(false);
        private ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            return new Thread(runnable, "event-streams-loop-publisher");
        });

        EventLoop() {
        }

        public void signal(Signal signal) {
            if (this.inboundSignals.offer(signal)) {
                tryScheduleToExecute();
            }
        }

        private void tryScheduleToExecute() {
            if (this.on.compareAndSet(false, true)) {
                try {
                    this.executor.execute(this);
                } catch (Throwable th) {
                    if (AsyncMulticastEventStreamPublisher.this.closed.get()) {
                        this.on.set(false);
                    }
                }
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean isShutdown;
            if (this.on.get()) {
                try {
                    Signal poll = this.inboundSignals.poll();
                    if (poll != null) {
                        AsyncMulticastEventStreamPublisher.LOG.trace("processing: {}", poll.getClass().getSimpleName());
                        if (poll == Send.Instance) {
                            maySendNextRecords();
                        }
                        if (poll == Complete.Instance) {
                            try {
                                mayDrainAllRecords();
                                shutdown();
                                this.inboundSignals.clear();
                            } catch (Throwable th) {
                                shutdown();
                                this.inboundSignals.clear();
                                throw th;
                            }
                        }
                        if (poll instanceof SubscriptionSignal) {
                            ((SubscriptionSignal) poll).execute();
                        }
                    }
                    if (isShutdown) {
                        return;
                    }
                } finally {
                    this.on.set(false);
                    if (!this.executor.isShutdown() && !this.inboundSignals.isEmpty()) {
                        tryScheduleToExecute();
                    }
                }
            }
        }

        private void shutdown() {
            AsyncMulticastEventStreamPublisher.LOG.info("Shutting down event-loop");
            this.executor.shutdown();
        }

        void awaitTerminationAndDoComplete(Duration duration) {
            try {
                this.executor.awaitTermination(duration.toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                this.executor.shutdownNow();
                AsyncMulticastEventStreamPublisher.LOG.error("EventLoop failed to terminate before timeout, unsent events: {}", Integer.valueOf(AsyncMulticastEventStreamPublisher.this.queue.size()));
            } finally {
                AsyncMulticastEventStreamPublisher.this.subscriptions().forEach((v0) -> {
                    v0.doOnComplete();
                });
            }
        }

        private void mayDrainAllRecords() {
            if (AsyncMulticastEventStreamPublisher.this.hasActiveSubscriptions() && AsyncMulticastEventStreamPublisher.this.hasMoreRecords()) {
                LinkedList linkedList = new LinkedList();
                AsyncMulticastEventStreamPublisher.this.queue.drainTo(linkedList);
                linkedList.forEach(kv -> {
                    AsyncMulticastEventStreamPublisher.this.subscriptions().forEach(eventStreamSubscription -> {
                        if (eventStreamSubscription.canReceived()) {
                            eventStreamSubscription.doOnNext(kv);
                        }
                    });
                });
            }
        }

        private void maySendNextRecords() {
            for (int i = 0; AsyncMulticastEventStreamPublisher.this.hasMoreRecords() && i < AsyncMulticastEventStreamPublisher.MAX_POLL_RECORDS; i++) {
                if (!AsyncMulticastEventStreamPublisher.this.hasActiveSubscriptions()) {
                    AsyncMulticastEventStreamPublisher.LOG.trace("no subscription ready for next records. Signal ignored.");
                    return;
                } else {
                    KV<K, V> poll = AsyncMulticastEventStreamPublisher.this.queue.poll();
                    AsyncMulticastEventStreamPublisher.this.subscriptions().forEach(eventStreamSubscription -> {
                        if (eventStreamSubscription.canReceived()) {
                            eventStreamSubscription.doOnNext(poll);
                        }
                    });
                }
            }
        }
    }

    /**
     * A {@link Flow.Subscription} extended with the internal operations that this publisher
     * invokes on a subscription.
     *
     * @param <T> the type of the items delivered to the subscriber.
     */
    public interface EventStreamSubscription<T> extends Flow.Subscription {
        /**
         * Delivers the given item to the subscriber.
         *
         * @param t the item to deliver.
         */
        void doOnNext(T t);

        /** Handles a subscribe signal for this subscription. */
        void doOnSubscribe();

        /**
         * @return {@code true} if this subscription can currently receive an item; the publisher
         *         checks this before each delivery.
         */
        boolean canReceived();

        /**
         * Handles a request signal for this subscription.
         *
         * @param j the number of additional items requested.
         */
        void doOnRequest(long j);

        /** Handles completion of the stream for this subscription. */
        void doOnComplete();

        /** Handles a cancel signal for this subscription. */
        void doOnCancel();
    }

    /* loaded from: input_file:io/streamthoughts/azkarra/api/events/reactive/AsyncMulticastEventStreamPublisher$InternalEventStreamSubscription.class */
    private class InternalEventStreamSubscription implements EventStreamSubscription<KV<K, V>> {
        private final Flow.Subscriber<? super KV<K, V>> subscriber;
        private final SubscriptionId subscriptionId;
        private long demands = 0;
        private boolean completed = false;
        private boolean done = false;

        InternalEventStreamSubscription(Flow.Subscriber<? super KV<K, V>> subscriber, SubscriptionId subscriptionId) {
            this.subscriber = subscriber;
            this.subscriptionId = subscriptionId;
        }

        @Override // io.streamthoughts.azkarra.api.events.reactive.AsyncMulticastEventStreamPublisher.EventStreamSubscription
        public boolean canReceived() {
            return this.demands > 0 && !this.done;
        }

        @Override // io.streamthoughts.azkarra.api.events.reactive.AsyncMulticastEventStreamPublisher.EventStreamSubscription
        public void doOnRequest(long j) {
            if (j < 1) {
                this.subscriber.onError(new IllegalArgumentException("non-positive request signals are illegal"));
                return;
            }
            if (this.demands + j < 1) {
                this.demands = Long.MAX_VALUE;
            } else {
                this.demands += j;
            }
            AsyncMulticastEventStreamPublisher.this.eventLoop.signal(Send.Instance);
        }

        private void decrementRequested() {
            if (this.demands != Long.MAX_VALUE) {
                this.demands--;
            }
        }

        @Override // io.streamthoughts.azkarra.api.events.reactive.AsyncMulticastEventStreamPublisher.EventStreamSubscription
        public void doOnSubscribe() {
            AsyncMulticastEventStreamPublisher.this.subscriptions.put(this.subscriptionId, this);
            this.subscriber.onSubscribe(this);
        }

        @Override // io.streamthoughts.azkarra.api.events.reactive.AsyncMulticastEventStreamPublisher.EventStreamSubscription
        public void doOnComplete() {
            if (this.completed) {
                return;
            }
            try {
                doOnCancel();
                this.subscriber.onComplete();
                this.completed = true;
            } catch (Exception e) {
                AsyncMulticastEventStreamPublisher.LOG.error("Exception occurred while calling onComplete", e);
            }
        }

        @Override // io.streamthoughts.azkarra.api.events.reactive.AsyncMulticastEventStreamPublisher.EventStreamSubscription
        public void doOnCancel() {
            AsyncMulticastEventStreamPublisher.this.subscriptions.remove(this.subscriptionId);
            this.done = true;
        }

        @Override // io.streamthoughts.azkarra.api.events.reactive.AsyncMulticastEventStreamPublisher.EventStreamSubscription
        public void doOnNext(KV<K, V> kv) {
            try {
                this.subscriber.onNext(kv);
                decrementRequested();
            } catch (Exception e) {
                AsyncMulticastEventStreamPublisher.LOG.error("Exception occurred while calling onNext", e);
            }
        }

        @Override // java.util.concurrent.Flow.Subscription
        public void request(long j) {
            if (this.completed) {
                return;
            }
            AsyncMulticastEventStreamPublisher.this.eventLoop.signal(new Request(this, j));
        }

        @Override // java.util.concurrent.Flow.Subscription
        public void cancel() {
            if (this.done) {
                return;
            }
            AsyncMulticastEventStreamPublisher.this.eventLoop.signal(new Cancel(this));
        }

        void init() {
            AsyncMulticastEventStreamPublisher.this.eventLoop.signal(new Subscribe(this));
        }
    }

    /**
     * Signal carrying a subscriber's request for additional items.
     *
     * @param <K> the record-key type.
     * @param <V> the record-value type.
     */
    static final class Request<K, V> extends SubscriptionSignal {
        /** Number of items requested. */
        final long demands;

        /**
         * @param target the subscription that issued the request; must not be {@code null}.
         * @param j      the number of items requested.
         */
        Request(EventStreamSubscription<KV<K, V>> target, long j) {
            super(target);
            demands = j;
        }

        /** Passes the requested amount to {@code doOnRequest} on the wrapped subscription. */
        @Override
        public void execute() {
            final long requested = demands;
            subscription.doOnRequest(requested);
        }
    }

    /**
     * Singleton signal asking the event loop to send the next records to the subscriptions;
     * emitted when a record is queued and when a subscription's demand increases.
     */
    enum Send implements Signal {
        Instance
    }

    /**
     * Marker interface for the messages that can be queued on the event loop.
     */
    public interface Signal {
    }

    /**
     * Signal asking the event loop to register a new subscription.
     *
     * @param <K> the record-key type.
     * @param <V> the record-value type.
     */
    public static final class Subscribe<K, V> extends SubscriptionSignal {

        /**
         * @param target the subscription to register; must not be {@code null}.
         */
        Subscribe(EventStreamSubscription<KV<K, V>> target) {
            super(target);
        }

        /** Invokes {@code doOnSubscribe()} on the wrapped subscription. */
        @Override
        public void execute() {
            subscription.doOnSubscribe();
        }
    }

    /**
     * Base class of the signals that target one specific subscription.
     */
    abstract static class SubscriptionSignal implements Signal {
        /** The subscription this signal applies to; never {@code null}. */
        final EventStreamSubscription<?> subscription;

        /**
         * @param eventStreamSubscription the targeted subscription.
         * @throws NullPointerException if the subscription is {@code null}.
         */
        SubscriptionSignal(EventStreamSubscription<?> eventStreamSubscription) {
            this.subscription = Objects.requireNonNull(eventStreamSubscription, "subscription cannot be null");
        }

        /** Applies this signal to the targeted subscription. */
        public abstract void execute();
    }

    /**
     * Creates a new publisher for the given stream, using a {@link SequentialSubscriptionIdGenerator}
     * to identify subscriptions.
     *
     * @param eventStream the stream to publish; must not be {@code null}.
     */
    public AsyncMulticastEventStreamPublisher(EventStream<K, V> eventStream) {
        this(eventStream, new SequentialSubscriptionIdGenerator());
    }

    /**
     * Creates a new publisher and opens the given stream.
     *
     * @param eventStream             the stream to publish.
     * @param subscriptionIdGenerator the generator of subscription ids.
     * @throws NullPointerException if any argument is {@code null}.
     */
    private AsyncMulticastEventStreamPublisher(EventStream<K, V> eventStream, SubscriptionIdGenerator subscriptionIdGenerator) {
        stream = Objects.requireNonNull(eventStream, "stream cannot be null");
        idGenerator = Objects.requireNonNull(subscriptionIdGenerator, "subscriptionIdGenerator cannot be null");
        subscriptions = new ConcurrentHashMap<>();
        closed = new AtomicBoolean(false);
        eventLoop = new EventLoop();
        // Open the stream last: the callback registered by init() relies on the fields above.
        init();
    }

    /**
     * Opens the stream and, once its record queue is provided, keeps a reference to the queue and
     * registers the callback that turns queue events into event-loop signals.
     */
    private void init() {
        stream.open(recordQueue -> {
            final QueueCallback callback = new QueueCallback() {
                /** Marks the publisher closed, then lets the loop finish before completing subscribers. */
                @Override
                public void onClosed() {
                    AsyncMulticastEventStreamPublisher.this.closed.set(true);
                    AsyncMulticastEventStreamPublisher.this.eventLoop.signal(Complete.Instance);
                    AsyncMulticastEventStreamPublisher.this.eventLoop
                        .awaitTerminationAndDoComplete(AWAIT_TERMINATION_TIMEOUT);
                }

                /** Asks the event loop to send the next records. */
                @Override
                public void onQueued() {
                    AsyncMulticastEventStreamPublisher.this.eventLoop.signal(Send.Instance);
                }
            };
            queue = recordQueue;
            recordQueue.setQueueCallback(callback);
        });
    }

    /**
     * @return the type reported by the underlying stream.
     */
    @Override
    public String type() {
        final String streamType = stream.type();
        return streamType;
    }

    /**
     * Subscribes the given subscriber to this publisher.
     *
     * <p>When the publisher is already closed, the subscriber immediately receives a no-op
     * subscription followed by {@code onComplete}. Otherwise a new subscription is created and
     * its registration is queued on the event loop.
     *
     * @param subscriber the subscriber to register.
     * @throws NullPointerException if the subscriber is {@code null}.
     */
    @Override
    public void subscribe(Flow.Subscriber<? super KV<K, V>> subscriber) {
        Objects.requireNonNull(subscriber, "subscriber cannot be null");
        if (closed.get()) {
            final Flow.Subscription noop = new Flow.Subscription() {
                @Override
                public void request(long j) {
                }

                @Override
                public void cancel() {
                }
            };
            subscriber.onSubscribe(noop);
            subscriber.onComplete();
            return;
        }
        final SubscriptionId id = idGenerator.generateNext();
        final InternalEventStreamSubscription subscription = new InternalEventStreamSubscription(subscriber, id);
        subscription.init();
    }

    /**
     * @return {@code true} if the record queue holds at least one record; {@code false} when the
     *         queue is empty or has not been provided yet by the stream.
     */
    private boolean hasMoreRecords() {
        // The queue is only assigned once the stream invokes the callback given to open(),
        // so a signal may be processed before any queue is attached.
        final BlockingRecordQueue<K, V> current = queue;
        return current != null && !current.isEmpty();
    }

    /**
     * @return {@code true} if at least one registered subscription can currently receive a record.
     */
    private boolean hasActiveSubscriptions() {
        for (EventStreamSubscription<KV<K, V>> subscription : subscriptions.values()) {
            if (subscription.canReceived()) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return a view of the currently registered subscriptions, backed by the subscription map.
     */
    private Collection<EventStreamSubscription<KV<K, V>>> subscriptions() {
        final Collection<EventStreamSubscription<KV<K, V>>> registered = subscriptions.values();
        return registered;
    }
}
