package io.activej.datastream.processor;

import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.promise.Promise;
import java.util.Objects;

/* loaded from: input_file:io/activej/datastream/processor/StreamTransducer.class */
public final class StreamTransducer<I, O> implements StreamTransformer<I, O> {
    private final Transducer<I, O, Object> transducer;
    private final StreamTransducer<I, O>.Input input = new Input();
    private final StreamTransducer<I, O>.Output output = new Output();
    private Object accumulator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/activej/datastream/processor/StreamTransducer$Input.class */
    public final class Input extends AbstractStreamConsumer<I> {
        private Input() {
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onStarted() {
            StreamDataAcceptor<O> dataAcceptor;
            if (StreamTransducer.this.transducer.isOneToMany()) {
                Output output = StreamTransducer.this.output;
                Objects.requireNonNull(output);
                dataAcceptor = output::send;
            } else {
                dataAcceptor = StreamTransducer.this.output.getDataAcceptor();
            }
            StreamDataAcceptor<O> streamDataAcceptor = dataAcceptor;
            StreamTransducer.this.accumulator = StreamTransducer.this.transducer.onStarted(streamDataAcceptor);
            StreamTransducer.this.sync();
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onEndOfStream() {
            StreamDataAcceptor<O> dataAcceptor;
            if (StreamTransducer.this.transducer.isOneToMany()) {
                Output output = StreamTransducer.this.output;
                Objects.requireNonNull(output);
                dataAcceptor = output::send;
            } else {
                dataAcceptor = StreamTransducer.this.output.getDataAcceptor();
            }
            StreamTransducer.this.transducer.onEndOfStream(dataAcceptor, StreamTransducer.this.accumulator);
            StreamTransducer.this.output.sendEndOfStream();
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onCleanup() {
            StreamTransducer.this.accumulator = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/activej/datastream/processor/StreamTransducer$Output.class */
    public final class Output extends AbstractStreamSupplier<O> {
        private Output() {
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onResumed() {
            StreamTransducer.this.sync();
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onSuspended() {
            StreamTransducer.this.sync();
        }
    }

    private StreamTransducer(Transducer<I, O, ?> transducer) {
        this.transducer = transducer;
        Promise<Void> acknowledgement = this.input.getAcknowledgement();
        StreamTransducer<I, O>.Output output = this.output;
        Objects.requireNonNull(output);
        acknowledgement.whenException(output::closeEx);
        Promise<Void> endOfStream = this.output.getEndOfStream();
        StreamTransducer<I, O>.Input input = this.input;
        Objects.requireNonNull(input);
        Promise whenResult = endOfStream.whenResult(input::acknowledge);
        StreamTransducer<I, O>.Input input2 = this.input;
        Objects.requireNonNull(input2);
        whenResult.whenException(input2::closeEx);
    }

    public static <I, O> StreamTransducer<I, O> create(Transducer<I, O, ?> transducer) {
        return new StreamTransducer<>(transducer);
    }

    @Override // io.activej.datastream.dsl.HasStreamInput
    public StreamConsumer<I> getInput() {
        return this.input;
    }

    @Override // io.activej.datastream.dsl.HasStreamOutput
    public StreamSupplier<O> getOutput() {
        return this.output;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sync() {
        StreamDataAcceptor<O> dataAcceptor;
        Transducer<I, O, Object> transducer = this.transducer;
        if (transducer.isOneToMany()) {
            StreamTransducer<I, O>.Output output = this.output;
            Objects.requireNonNull(output);
            dataAcceptor = output::send;
        } else {
            dataAcceptor = this.output.getDataAcceptor();
        }
        StreamDataAcceptor<O> streamDataAcceptor = dataAcceptor;
        Object obj = this.accumulator;
        if (this.output.isReady()) {
            this.input.resume(obj2 -> {
                transducer.onItem(streamDataAcceptor, obj2, obj);
            });
        } else {
            this.input.suspend();
        }
    }
}
