package io.activej.datastream;

import io.activej.async.process.AsyncCloseable;
import io.activej.csp.ChannelConsumer;
import io.activej.datastream.StreamConsumers;
import io.activej.datastream.processor.StreamConsumerTransformer;
import io.activej.datastream.processor.StreamTransformer;
import io.activej.eventloop.Eventloop;
import io.activej.promise.CompleteNullPromise;
import io.activej.promise.Promise;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/datastream/StreamConsumer.class */
public interface StreamConsumer<T> extends AsyncCloseable {
    void consume(@NotNull StreamSupplier<T> streamSupplier);

    @Nullable
    StreamDataAcceptor<T> getDataAcceptor();

    Promise<Void> getAcknowledgement();

    default boolean isComplete() {
        return getAcknowledgement().isComplete();
    }

    default boolean isResult() {
        return getAcknowledgement().isResult();
    }

    default boolean isException() {
        return getAcknowledgement().isException();
    }

    static <T> StreamConsumer<T> idle() {
        return new StreamConsumers.Idle();
    }

    static <T> StreamConsumer<T> skip() {
        return new StreamConsumers.Skip();
    }

    static <T> StreamConsumer<T> of(Consumer<T> consumer) {
        return new StreamConsumers.OfConsumer(consumer);
    }

    static <T> StreamConsumer<T> closingWithError(Throwable th) {
        return new StreamConsumers.ClosingWithError(th);
    }

    static <T> StreamConsumer<T> ofPromise(Promise<? extends StreamConsumer<T>> promise) {
        return promise.isResult() ? (StreamConsumer) promise.getResult() : new StreamConsumers.OfPromise(promise);
    }

    static <T> StreamConsumer<T> ofChannelConsumer(ChannelConsumer<T> channelConsumer) {
        return new StreamConsumers.OfChannelConsumer(channelConsumer);
    }

    static <T> StreamConsumer<T> ofSupplier(Function<StreamSupplier<T>, Promise<Void>> function) {
        StreamTransformer identity = StreamTransformer.identity();
        CompleteNullPromise completeNullPromise = (Promise) function.apply(identity.getOutput());
        StreamConsumer<T> streamConsumer = (StreamConsumer<T>) identity.getInput();
        return completeNullPromise == Promise.complete() ? streamConsumer : streamConsumer.withAcknowledgement(promise -> {
            return promise.both(completeNullPromise);
        });
    }

    static <T> StreamConsumer<T> ofAnotherEventloop(@NotNull Eventloop eventloop, @NotNull StreamConsumer<T> streamConsumer) {
        return Eventloop.getCurrentEventloop() == eventloop ? streamConsumer : new StreamConsumers.OfAnotherEventloop(eventloop, streamConsumer);
    }

    default <R> R transformWith(StreamConsumerTransformer<T, R> streamConsumerTransformer) {
        return streamConsumerTransformer.transform(this);
    }

    default StreamConsumer<T> withAcknowledgement(Function<Promise<Void>, Promise<Void>> function) {
        Promise<Void> acknowledgement = getAcknowledgement();
        final Promise<Void> apply = function.apply(acknowledgement);
        return acknowledgement == apply ? this : new ForwardingStreamConsumer<T>(this) { // from class: io.activej.datastream.StreamConsumer.1
            @Override // io.activej.datastream.ForwardingStreamConsumer, io.activej.datastream.StreamConsumer
            public Promise<Void> getAcknowledgement() {
                return apply;
            }
        };
    }
}
