package io.activej.datastream.processor;

import io.activej.common.initializer.WithInitializer;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.dsl.HasStreamInputs;
import io.activej.datastream.dsl.HasStreamOutput;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/datastream/processor/StreamLeftJoin.class */
public final class StreamLeftJoin<K, L, R, V> implements HasStreamInputs, HasStreamOutput<V>, WithInitializer<StreamLeftJoin<K, L, R, V>> {
    private final Comparator<K> keyComparator;
    private final Function<L, K> leftKeyFunction;
    private final Function<R, K> rightKeyFunction;
    private final LeftJoiner<K, L, R, V> leftJoiner;
    private final ArrayDeque<L> leftDeque = new ArrayDeque<>();
    private final ArrayDeque<R> rightDeque = new ArrayDeque<>();
    private final StreamLeftJoin<K, L, R, V>.Input<L> left = new Input<>(this.leftDeque);
    private final StreamLeftJoin<K, L, R, V>.Input<R> right = new Input<>(this.rightDeque);
    private final StreamLeftJoin<K, L, R, V>.Output output = new Output();

    /* loaded from: input_file:io/activej/datastream/processor/StreamLeftJoin$Input.class */
    /**
     * Stream consumer for one side of the join.
     *
     * <p>Every received item is appended to the deque handed to the constructor, and the
     * enclosing join's {@code output} is asked to run a join pass whenever that may let it
     * make progress: on start, on end of stream, and when an item lands in a previously
     * empty deque.
     *
     * @param <I> type of the items received on this input
     */
    private final class Input<I> extends AbstractStreamConsumer<I> implements StreamDataAcceptor<I> {
        /** Buffer of received items that the output has not consumed yet. */
        private final Deque<I> deque;

        /**
         * @param deque buffer that received items are appended to
         */
        public Input(Deque<I> deque) {
            this.deque = deque;
        }

        /**
         * Buffers the item at the tail of the deque; triggers a join pass only when the
         * deque was empty before this item was added.
         */
        @Override
        public void accept(I i) {
            final boolean wasEmpty = deque.isEmpty();
            deque.addLast(i);
            if (!wasEmpty) {
                // Items were already pending, so no join pass is requested here.
                return;
            }
            output.join();
        }

        /** Requests a join pass as soon as this input has started. */
        @Override
        protected void onStarted() {
            output.join();
        }

        /**
         * Requests a final join pass, then ties this input's completion to the output's
         * acknowledgement: a result acknowledges this input, an exception closes it.
         */
        @Override
        protected void onEndOfStream() {
            output.join();
            output.getAcknowledgement()
                    .whenResult(this::acknowledge)
                    .whenException(this::closeEx);
        }

        /** Propagates an error on this input to the output. */
        @Override
        protected void onError(Exception exc) {
            output.closeEx(exc);
        }
    }

    /* loaded from: input_file:io/activej/datastream/processor/StreamLeftJoin$LeftInnerLeftJoiner.class */
    /**
     * Base {@link LeftJoiner} that produces nothing for unmatched left items:
     * {@link #onOuterJoin} is implemented as a no-op, while {@code onInnerJoin} is left
     * for subclasses to implement.
     *
     * @param <K> key type
     * @param <L> left item type
     * @param <R> right item type
     * @param <V> output item type
     */
    public static abstract class LeftInnerLeftJoiner<K, L, R, V> implements LeftJoiner<K, L, R, V> {
        /** Does nothing: the left item is dropped and nothing is passed to the acceptor. */
        @Override // io.activej.datastream.processor.StreamLeftJoin.LeftJoiner
        public void onOuterJoin(K k, L l, StreamDataAcceptor<V> streamDataAcceptor) {
        }
    }

    /* loaded from: input_file:io/activej/datastream/processor/StreamLeftJoin$LeftJoiner.class */
    /**
     * Callback that turns joined items into output items.
     *
     * <p>Implementations emit results by calling {@code accept} on the supplied acceptor;
     * they may emit zero, one or several items per call.
     *
     * @param <K> key type
     * @param <L> left item type
     * @param <R> right item type
     * @param <V> output item type
     */
    public interface LeftJoiner<K, L, R, V> {
        /**
         * Invoked when the key of a left item compares equal to the key of the current
         * right item.
         *
         * @param k                  key of the left item
         * @param l                  left item
         * @param r                  right item whose key compared equal
         * @param streamDataAcceptor sink for the produced output items
         */
        void onInnerJoin(K k, L l, R r, StreamDataAcceptor<V> streamDataAcceptor);

        /**
         * Invoked for a left item that has no matching right item: either its key compares
         * lower than the key of the current right item, or it is still buffered once both
         * inputs have ended.
         *
         * @param k                  key of the left item
         * @param l                  left item
         * @param streamDataAcceptor sink for the produced output items
         */
        void onOuterJoin(K k, L l, StreamDataAcceptor<V> streamDataAcceptor);
    }

    /* loaded from: input_file:io/activej/datastream/processor/StreamLeftJoin$Output.class */
    /**
     * Stream supplier that produces the join result from the two buffered inputs.
     *
     * <p>The join is a single forward pass over {@code leftDeque} and {@code rightDeque}.
     * NOTE(review): items are only paired correctly if both inputs arrive ordered by key
     * according to {@code keyComparator}; this is presumed to be the caller's contract and
     * is not enforced here.
     */
    private final class Output extends AbstractStreamSupplier<V> {
        private Output() {
        }

        /**
         * Requests a join pass by calling the inherited {@code resume()}
         * (presumably this leads to {@link #onResumed()} being invoked).
         */
        void join() {
            resume();
        }

        /**
         * Performs one join pass over the buffered items, then adjusts the inputs.
         *
         * <ul>
         *   <li>left key &lt; right key: the left item is unmatched, so it is reported via
         *       {@code onOuterJoin} and removed;</li>
         *   <li>left key &gt; right key: the right item is discarded;</li>
         *   <li>keys equal: the pair is reported via {@code onInnerJoin} and only the left
         *       item is removed, so later left items with the same key can still match the
         *       same right item.</li>
         * </ul>
         *
         * <p>Afterwards: if this supplier is not ready, both inputs are suspended. If both
         * inputs have ended, the remaining left items are reported via {@code onOuterJoin}
         * and end of stream is sent. Otherwise both inputs are resumed.
         */
        @Override
        protected void onResumed() {
            StreamDataAcceptor<V> acceptor = this::send;

            if (isReady() && !leftDeque.isEmpty() && !rightDeque.isEmpty()) {
                L leftValue = leftDeque.peek();
                K leftKey = leftKeyFunction.apply(leftValue);
                R rightValue = rightDeque.peek();
                K rightKey = rightKeyFunction.apply(rightValue);

                while (true) {
                    int compare = keyComparator.compare(leftKey, rightKey);
                    if (compare < 0) {
                        // No right item can match this left item any more.
                        leftJoiner.onOuterJoin(leftKey, leftValue, acceptor);
                        leftDeque.poll();
                        if (leftDeque.isEmpty()) {
                            break;
                        }
                        leftValue = leftDeque.peek();
                        leftKey = leftKeyFunction.apply(leftValue);
                    } else if (compare > 0) {
                        // The right item is behind the current left item: drop it.
                        rightDeque.poll();
                        if (rightDeque.isEmpty()) {
                            break;
                        }
                        rightValue = rightDeque.peek();
                        rightKey = rightKeyFunction.apply(rightValue);
                    } else {
                        // Match: consume the left item but keep the right one.
                        leftJoiner.onInnerJoin(leftKey, leftValue, rightValue, acceptor);
                        leftDeque.poll();
                        // Readiness is re-checked only on this branch, as in the original.
                        if (leftDeque.isEmpty() || !isReady()) {
                            break;
                        }
                        leftValue = leftDeque.peek();
                        leftKey = leftKeyFunction.apply(leftValue);
                    }
                }
            }

            if (!isReady()) {
                left.suspend();
                right.suspend();
                return;
            }

            if (left.isEndOfStream() && right.isEndOfStream()) {
                // No more right items will arrive: every remaining left item is unmatched.
                while (!leftDeque.isEmpty()) {
                    L leftValue = leftDeque.poll();
                    leftJoiner.onOuterJoin(leftKeyFunction.apply(leftValue), leftValue, acceptor);
                }
                sendEndOfStream();
            } else {
                // Each input acts as its own data acceptor.
                left.resume(left);
                right.resume(right);
            }
        }

        /** Propagates an error on the output to both inputs. */
        @Override
        protected void onError(Exception exc) {
            left.closeEx(exc);
            right.closeEx(exc);
        }

        /** Drops all buffered items from both deques. */
        @Override
        protected void onCleanup() {
            leftDeque.clear();
            rightDeque.clear();
        }
    }

    /* loaded from: input_file:io/activej/datastream/processor/StreamLeftJoin$ValueLeftJoiner.class */
    /**
     * {@link LeftJoiner} that maps each join event to at most one output value.
     *
     * <p>Subclasses return the value to emit from {@link #doInnerJoin} / {@link #doOuterJoin};
     * a {@code null} return means nothing is emitted for that event.
     *
     * @param <K> key type
     * @param <L> left item type
     * @param <R> right item type
     * @param <V> output item type
     */
    public static abstract class ValueLeftJoiner<K, L, R, V> implements LeftJoiner<K, L, R, V> {
        /**
         * Computes the output for a matched pair.
         *
         * @return the value to emit, or {@code null} to emit nothing
         */
        @Nullable
        public abstract V doInnerJoin(K k, L l, R r);

        /**
         * Computes the output for an unmatched left item. This default returns {@code null},
         * so unmatched left items emit nothing unless the method is overridden.
         *
         * @return the value to emit, or {@code null} to emit nothing
         */
        @Nullable
        public V doOuterJoin(K k, L l) {
            return null;
        }

        /** Emits the result of {@link #doInnerJoin} unless it is {@code null}. */
        @Override
        public final void onInnerJoin(K k, L l, R r, StreamDataAcceptor<V> streamDataAcceptor) {
            emitIfPresent(doInnerJoin(k, l, r), streamDataAcceptor);
        }

        /** Emits the result of {@link #doOuterJoin} unless it is {@code null}. */
        @Override
        public final void onOuterJoin(K k, L l, StreamDataAcceptor<V> streamDataAcceptor) {
            emitIfPresent(doOuterJoin(k, l), streamDataAcceptor);
        }

        /** Passes {@code value} to {@code sink} only when it is non-null. */
        private void emitIfPresent(@Nullable V value, StreamDataAcceptor<V> sink) {
            if (value == null) {
                return;
            }
            sink.accept(value);
        }
    }

    /**
     * Stores the collaborators of the join; instances are obtained through {@code create}.
     *
     * @param comparator comparator used to order left keys against right keys
     * @param function   extracts the key from a left item
     * @param function2  extracts the key from a right item
     * @param leftJoiner callback that produces output items from joined items
     */
    private StreamLeftJoin(@NotNull Comparator<K> comparator, @NotNull Function<L, K> function, @NotNull Function<R, K> function2, @NotNull LeftJoiner<K, L, R, V> leftJoiner) {
        // Assigned in field declaration order.
        this.keyComparator = comparator;
        this.leftKeyFunction = function;
        this.rightKeyFunction = function2;
        this.leftJoiner = leftJoiner;
    }

    /**
     * Creates a new left join.
     *
     * @param comparator comparator used to order left keys against right keys
     * @param function   extracts the key from a left item
     * @param function2  extracts the key from a right item
     * @param leftJoiner callback that produces output items from joined items
     * @param <K>        key type
     * @param <L>        left item type
     * @param <R>        right item type
     * @param <V>        output item type
     * @return a new {@code StreamLeftJoin} wired with the given collaborators
     */
    public static <K, L, R, V> StreamLeftJoin<K, L, R, V> create(Comparator<K> comparator, Function<L, K> function, Function<R, K> function2, LeftJoiner<K, L, R, V> leftJoiner) {
        return new StreamLeftJoin<K, L, R, V>(comparator, function, function2, leftJoiner);
    }

    /**
     * @return the consumer that receives the left-side items of the join
     */
    public StreamConsumer<L> getLeft() {
        return left;
    }

    /**
     * @return the consumer that receives the right-side items of the join
     */
    public StreamConsumer<R> getRight() {
        return right;
    }

    /**
     * @return a fixed-size list holding both inputs, the left one first and the right one second
     */
    @Override
    public List<? extends StreamConsumer<?>> getInputs() {
        return Arrays.asList(left, right);
    }

    /**
     * @return the supplier that emits the items produced by the join
     */
    @Override
    public StreamSupplier<V> getOutput() {
        return output;
    }
}
