package io.activej.datastream.processor;

import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.promise.Promise;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/activej/datastream/processor/StreamFilter.class */
public abstract class StreamFilter<I, O> implements StreamTransformer<I, O> {
    private final StreamFilter<I, O>.Input input = new Input();
    private final StreamFilter<I, O>.Output output = new Output();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/activej/datastream/processor/StreamFilter$Input.class */
    public final class Input extends AbstractStreamConsumer<I> {
        private Input() {
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onStarted() {
            StreamFilter streamFilter = StreamFilter.this;
            Output output = StreamFilter.this.output;
            Objects.requireNonNull(output);
            streamFilter.onStarted(output::send);
            StreamFilter.this.sync();
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onEndOfStream() {
            StreamFilter streamFilter = StreamFilter.this;
            Output output = StreamFilter.this.output;
            Objects.requireNonNull(output);
            streamFilter.onEndOfStream(output::send);
            StreamFilter.this.output.sendEndOfStream();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/activej/datastream/processor/StreamFilter$Output.class */
    public final class Output extends AbstractStreamSupplier<O> {
        private Output() {
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onResumed() {
            StreamFilter.this.sync();
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onSuspended() {
            StreamFilter.this.sync();
        }
    }

    public StreamFilter() {
        Promise<Void> acknowledgement = this.input.getAcknowledgement();
        StreamFilter<I, O>.Output output = this.output;
        Objects.requireNonNull(output);
        acknowledgement.whenException(output::closeEx);
        Promise<Void> acknowledgement2 = this.output.getAcknowledgement();
        StreamFilter<I, O>.Input input = this.input;
        Objects.requireNonNull(input);
        Promise whenResult = acknowledgement2.whenResult(input::acknowledge);
        StreamFilter<I, O>.Input input2 = this.input;
        Objects.requireNonNull(input2);
        whenResult.whenException(input2::closeEx);
    }

    public static <T> StreamFilter<T, T> create(final Predicate<T> predicate) {
        return new StreamFilter<T, T>() { // from class: io.activej.datastream.processor.StreamFilter.1
            @Override // io.activej.datastream.processor.StreamFilter
            @NotNull
            protected StreamDataAcceptor<T> onResumed(@NotNull StreamDataAcceptor<T> streamDataAcceptor) {
                Predicate predicate2 = predicate;
                return obj -> {
                    if (predicate2.test(obj)) {
                        streamDataAcceptor.accept(obj);
                    }
                };
            }
        };
    }

    public static <I, O> StreamFilter<I, O> mapper(final Function<I, O> function) {
        return new StreamFilter<I, O>() { // from class: io.activej.datastream.processor.StreamFilter.2
            @Override // io.activej.datastream.processor.StreamFilter
            @NotNull
            protected StreamDataAcceptor<I> onResumed(@NotNull StreamDataAcceptor<O> streamDataAcceptor) {
                Function function2 = function;
                return obj -> {
                    streamDataAcceptor.accept(function2.apply(obj));
                };
            }
        };
    }

    @Override // io.activej.datastream.dsl.HasStreamInput
    public StreamConsumer<I> getInput() {
        return this.input;
    }

    @Override // io.activej.datastream.dsl.HasStreamOutput
    public StreamSupplier<O> getOutput() {
        return this.output;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sync() {
        StreamDataAcceptor<O> dataAcceptor = this.output.getDataAcceptor();
        if (dataAcceptor != null) {
            this.input.resume(onResumed(isOneToMany() ? this.output.getBufferedDataAcceptor() : dataAcceptor));
        } else {
            this.input.suspend();
        }
    }

    protected boolean isOneToMany() {
        return false;
    }

    protected void onStarted(@NotNull StreamDataAcceptor<O> streamDataAcceptor) {
    }

    protected void onEndOfStream(@NotNull StreamDataAcceptor<O> streamDataAcceptor) {
    }

    @NotNull
    protected abstract StreamDataAcceptor<I> onResumed(@NotNull StreamDataAcceptor<O> streamDataAcceptor);
}
