package io.activej.datastream.processor;

import io.activej.common.Preconditions;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.dsl.HasStreamInput;
import io.activej.datastream.dsl.HasStreamOutputs;
import io.activej.eventloop.Eventloop;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;

/* loaded from: input_file:io/activej/datastream/processor/StreamSplitter.class */
public final class StreamSplitter<I, O> implements HasStreamInput<I>, HasStreamOutputs<O> {
    private final Function<StreamDataAcceptor<O>[], StreamDataAcceptor<I>> acceptorFactory;
    private boolean started;
    private final List<StreamSplitter<I, O>.Output> outputs = new ArrayList();
    private StreamDataAcceptor<O>[] dataAcceptors = new StreamDataAcceptor[8];
    private final StreamSplitter<I, O>.Input input = new Input();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/activej/datastream/processor/StreamSplitter$Input.class */
    public final class Input extends AbstractStreamConsumer<I> {
        private Input() {
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onStarted() {
            StreamSplitter.this.sync();
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onEndOfStream() {
            Iterator it = StreamSplitter.this.outputs.iterator();
            while (it.hasNext()) {
                ((Output) it.next()).sendEndOfStream();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/activej/datastream/processor/StreamSplitter$Output.class */
    public final class Output extends AbstractStreamSupplier<O> {
        final int index;

        public Output(int i) {
            this.index = i;
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onResumed() {
            StreamSplitter.this.dataAcceptors[this.index] = getDataAcceptor();
            StreamSplitter.this.sync();
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onSuspended() {
            StreamSplitter.this.dataAcceptors[this.index] = getDataAcceptor();
            StreamSplitter.this.sync();
        }
    }

    private StreamSplitter(Function<StreamDataAcceptor<O>[], StreamDataAcceptor<I>> function) {
        this.acceptorFactory = function;
    }

    public static <I, O> StreamSplitter<I, O> create(BiConsumer<I, StreamDataAcceptor<O>[]> biConsumer) {
        return create(streamDataAcceptorArr -> {
            return obj -> {
                biConsumer.accept(obj, streamDataAcceptorArr);
            };
        });
    }

    public static <I, O> StreamSplitter<I, O> create(Function<StreamDataAcceptor<O>[], StreamDataAcceptor<I>> function) {
        StreamSplitter<I, O> streamSplitter = new StreamSplitter<>(function);
        Eventloop currentEventloop = Eventloop.getCurrentEventloop();
        Objects.requireNonNull(streamSplitter);
        currentEventloop.post(streamSplitter::start);
        return streamSplitter;
    }

    public StreamSupplier<O> newOutput() {
        Preconditions.checkState(!this.started, "Cannot add new inputs after StreamUnion has been started");
        StreamSplitter<I, O>.Output output = new Output(this.outputs.size());
        this.outputs.add(output);
        if (this.outputs.size() > this.dataAcceptors.length) {
            this.dataAcceptors = (StreamDataAcceptor[]) Arrays.copyOf(this.dataAcceptors, this.dataAcceptors.length * 2);
        }
        return output;
    }

    @Override // io.activej.datastream.dsl.HasStreamInput
    public StreamConsumer<I> getInput() {
        return this.input;
    }

    @Override // io.activej.datastream.dsl.HasStreamOutputs
    public List<? extends StreamSupplier<O>> getOutputs() {
        return this.outputs;
    }

    private void start() {
        this.started = true;
        this.dataAcceptors = (StreamDataAcceptor[]) Arrays.copyOf(this.dataAcceptors, this.outputs.size());
        this.input.getAcknowledgement().whenException(th -> {
            this.outputs.forEach(output -> {
                output.closeEx(th);
            });
        });
        Promise all = Promises.all(this.outputs.stream().map((v0) -> {
            return v0.getEndOfStream();
        }));
        StreamSplitter<I, O>.Input input = this.input;
        Objects.requireNonNull(input);
        Promise whenResult = all.whenResult(input::acknowledge);
        StreamSplitter<I, O>.Input input2 = this.input;
        Objects.requireNonNull(input2);
        whenResult.whenException(input2::closeEx);
        sync();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sync() {
        if (this.started) {
            if (this.outputs.stream().allMatch((v0) -> {
                return v0.isReady();
            })) {
                this.input.resume(this.acceptorFactory.apply(this.dataAcceptors));
            } else {
                this.input.suspend();
            }
        }
    }
}
