package io.activej.datastream;

import io.activej.common.Checks;
import io.activej.eventloop.Eventloop;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import java.util.ArrayDeque;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/datastream/AbstractStreamSupplier.class */
/**
 * Skeleton implementation of {@link StreamSupplier}.
 *
 * <p>Items passed to {@link #send(Object)} are forwarded to the consumer's current data acceptor.
 * While the consumer exposes no acceptor (it is suspended) or before a consumer is bound, items
 * are collected in an internal buffer and drained by the next flush once an acceptor is available.
 *
 * <p>Subclasses customise behaviour through the {@code on*} hooks. When checks are enabled for
 * this class, methods that perform the check throw if called outside the eventloop thread.
 *
 * @param <T> type of the streamed items
 */
public abstract class AbstractStreamSupplier<T> implements StreamSupplier<T> {
    private static final boolean CHECK = Checks.isEnabled(AbstractStreamSupplier.class);

    /** Acceptor that discards every item it receives. */
    public static final StreamDataAcceptor<?> NO_ACCEPTOR = obj -> {
    };

    /** Acceptor currently exposed by the consumer; {@code null} while not ready. */
    @Nullable
    private StreamDataAcceptor<T> dataAcceptor;
    /** Target of {@link #send(Object)}: the consumer's acceptor, the buffer, or {@link #NO_ACCEPTOR}. */
    private StreamDataAcceptor<T> dataAcceptorBuffered;
    private StreamConsumer<T> consumer;
    /** Set when a flush is wanted; lets a running flush loop pick up requests made re-entrantly. */
    private boolean flushRequest;
    /** Guards {@link #flush()} against re-entrance. */
    private boolean flushRunning;
    private boolean initialized;
    /** Number of outstanding {@link #asyncBegin()} calls; flushing is blocked while positive. */
    private int flushAsync;
    private boolean endOfStreamRequest;

    @Nullable
    private SettablePromise<Void> flushPromise;
    private final ArrayDeque<T> buffer = new ArrayDeque<>();
    private final SettablePromise<Void> endOfStream = new SettablePromise<>();
    private final SettablePromise<Void> acknowledgement = new SettablePromise<>();
    protected final Eventloop eventloop = Eventloop.getCurrentEventloop();

    /**
     * Starts in buffering mode and schedules {@link #onInit()} on the eventloop, so the hook
     * runs after construction has finished rather than from inside the constructor.
     */
    public AbstractStreamSupplier() {
        this.dataAcceptorBuffered = this.buffer::addLast;
        if (this.eventloop.inEventloopThread()) {
            this.eventloop.post(this::ensureInitialized);
        } else {
            this.eventloop.execute(this::ensureInitialized);
        }
    }

    /**
     * Binds this supplier to the given consumer and starts streaming.
     *
     * @param streamConsumer consumer to stream to
     * @return the acknowledgement promise of this supplier
     * @throws IllegalStateException presumably, via {@code Checks.checkState}, if a consumer is
     *                               already bound (exact exception type is defined by {@code Checks})
     */
    @Override // io.activej.datastream.StreamSupplier
    public final Promise<Void> streamTo(@NotNull StreamConsumer<T> streamConsumer) {
        checkInEventloopThread();
        Checks.checkState(!isStarted());
        ensureInitialized();
        this.consumer = streamConsumer;
        streamConsumer.getAcknowledgement().whenResult(this::acknowledge).whenException(this::closeEx);
        if (!isEndOfStream()) {
            onStarted();
        }
        // Non-null placeholder: items are dropped if the consumer queries the acceptor during
        // consume(); updateDataAcceptor() below then installs the consumer's real acceptor.
        this.dataAcceptor = noAcceptor();
        streamConsumer.consume(this);
        updateDataAcceptor();
        return this.acknowledgement;
    }

    /** Hook invoked exactly once before any other hook, on first initialization. */
    protected void onInit() {
    }

    /** Hook invoked from {@link #streamTo} when a consumer is bound and end of stream was not yet requested. */
    protected void onStarted() {
    }

    /** @return {@code true} once a consumer has been bound via {@link #streamTo} */
    public final boolean isStarted() {
        return this.consumer != null;
    }

    /** @return the bound consumer, or {@code null} if streaming has not started */
    public final StreamConsumer<T> getConsumer() {
        return this.consumer;
    }

    /**
     * Re-reads the consumer's data acceptor and switches between direct delivery and buffering.
     * Does nothing before a consumer is bound, after the end-of-stream promise completed, or
     * when the acceptor did not change.
     */
    @Override // io.activej.datastream.StreamSupplier
    public final void updateDataAcceptor() {
        checkInEventloopThread();
        if (!isStarted() || this.endOfStream.isComplete()) {
            return;
        }
        StreamDataAcceptor<T> newAcceptor = this.consumer.getDataAcceptor();
        if (this.dataAcceptor == newAcceptor) {
            return;
        }
        this.dataAcceptor = newAcceptor;
        if (newAcceptor != null) {
            // After an end-of-stream request send() must keep dropping items, so the buffered
            // acceptor is left untouched in that case.
            if (!isEndOfStream()) {
                this.dataAcceptorBuffered = newAcceptor;
            }
            flush();
        } else if (!isEndOfStream()) {
            // Consumer suspended: fall back to buffering.
            this.dataAcceptorBuffered = this.buffer::addLast;
            onSuspended();
        }
    }

    /**
     * Marks the start of an asynchronous operation; flushing is postponed until a matching
     * {@link #asyncEnd()} or {@link #asyncResume()}.
     *
     * <p>NOTE(review): the decompiler reports this member was originally {@code protected};
     * it is kept {@code public} so existing callers keep compiling.
     */
    public final void asyncBegin() {
        this.flushAsync++;
    }

    /**
     * Marks the end of an asynchronous operation started with {@link #asyncBegin()}.
     *
     * <p>NOTE(review): the decompiler reports this member was originally {@code protected}.
     */
    public final void asyncEnd() {
        Checks.checkState(this.flushAsync > 0);
        this.flushAsync--;
    }

    /**
     * Same as {@link #asyncEnd()} followed by {@link #resume()}.
     *
     * <p>NOTE(review): the decompiler reports this member was originally {@code protected}.
     */
    public final void asyncResume() {
        Checks.checkState(this.flushAsync > 0);
        this.flushAsync--;
        resume();
    }

    /**
     * Requests that production continues: if a flush is in progress it is asked to loop once
     * more, otherwise {@link #onResumed()} is called when the supplier is ready and end of
     * stream was not requested.
     *
     * <p>NOTE(review): the decompiler reports this member was originally {@code protected}.
     */
    public final void resume() {
        if (this.flushRunning) {
            this.flushRequest = true;
            return;
        }
        if (isReady() && !isEndOfStream()) {
            onResumed();
        }
    }

    /**
     * Hands an item to the buffered acceptor: delivered directly, queued in the buffer, or
     * dropped, depending on the current state.
     *
     * @param t item to send
     */
    public final void send(T t) {
        this.dataAcceptorBuffered.accept(t);
    }

    /**
     * Requests end of stream. Items sent afterwards are dropped; already buffered items are
     * still flushed.
     *
     * @return a promise tracking the final flush; never {@code null}
     */
    public final Promise<Void> sendEndOfStream() {
        checkInEventloopThread();
        if (this.endOfStreamRequest) {
            // Previously the raw, nullable flushPromise field was returned here, which yields
            // null when no flush promise was ever created (e.g. after closeEx). In this state
            // getFlushPromise() returns the end-of-stream promise, which is completed at the
            // same points as flushPromise.
            return getFlushPromise();
        }
        if (this.flushAsync > 0) {
            asyncEnd();
        }
        this.endOfStreamRequest = true;
        this.dataAcceptorBuffered = noAcceptor();
        flush();
        return getFlushPromise();
    }

    /**
     * @return the end-of-stream promise once end of stream was requested; otherwise the pending
     *         flush promise if one exists, an already completed promise if the supplier is
     *         ready, or a newly created pending promise
     */
    @NotNull
    public final Promise<Void> getFlushPromise() {
        if (isEndOfStream()) {
            return this.endOfStream;
        }
        if (this.flushPromise != null) {
            return this.flushPromise;
        }
        if (this.dataAcceptor != null) {
            return Promise.complete();
        }
        this.flushPromise = new SettablePromise<>();
        return this.flushPromise;
    }

    /** Runs {@link #onInit()} the first time it is called; later calls are no-ops. */
    private void ensureInitialized() {
        if (this.initialized) {
            return;
        }
        this.initialized = true;
        onInit();
    }

    /**
     * Drains the buffer into the consumer's acceptor while it stays ready, then completes the
     * flush promise and, if end of stream was requested, the end-of-stream promise.
     */
    private void flush() {
        checkInEventloopThread();
        this.flushRequest = true;
        if (this.flushRunning || this.flushAsync > 0 || this.endOfStream.isComplete() || !isStarted()) {
            return;
        }
        this.flushRunning = true;
        // Loop until no new request was raised (by resume() or a nested flush()) during a pass.
        while (this.flushRequest) {
            this.flushRequest = false;
            while (isReady() && !this.buffer.isEmpty()) {
                this.dataAcceptor.accept(this.buffer.pollFirst());
            }
            if (isReady() && !isEndOfStream()) {
                onResumed();
            }
        }
        this.flushRunning = false;

        if (this.flushAsync > 0 || !this.buffer.isEmpty() || this.endOfStream.isComplete()) {
            return;
        }
        if (this.endOfStreamRequest) {
            this.dataAcceptor = null;
            if (this.flushPromise != null) {
                this.flushPromise.set(null);
            }
            this.endOfStream.set(null);
            return;
        }
        if (this.flushPromise != null) {
            // Clear the field before completing so callbacks observe a fresh state.
            SettablePromise<Void> pending = this.flushPromise;
            this.flushPromise = null;
            pending.set(null);
        }
    }

    /** Hook invoked when the supplier is ready and may produce more items. */
    protected void onResumed() {
    }

    /** Hook invoked when the consumer's acceptor became {@code null} before end of stream was requested. */
    protected void onSuspended() {
    }

    /** @return the acceptor currently used for direct delivery, or {@code null} when not ready */
    @Nullable
    public final StreamDataAcceptor<T> getDataAcceptor() {
        return this.dataAcceptor;
    }

    /** @return the acceptor that {@link #send(Object)} delegates to */
    @NotNull
    public final StreamDataAcceptor<T> getBufferedDataAcceptor() {
        return this.dataAcceptorBuffered;
    }

    /** @return {@code true} while a data acceptor is set */
    public final boolean isReady() {
        return this.dataAcceptor != null;
    }

    /** @return promise completed when the stream has ended, normally or exceptionally */
    @Override // io.activej.datastream.StreamSupplier
    public final Promise<Void> getEndOfStream() {
        checkInEventloopThread();
        return this.endOfStream;
    }

    /** @return {@code true} once end of stream was requested or the supplier was closed with an error */
    public final boolean isEndOfStream() {
        return this.endOfStreamRequest;
    }

    /** Handles successful acknowledgement from the consumer; only the first completion has effect. */
    private void acknowledge() {
        ensureInitialized();
        if (this.acknowledgement.trySet(null)) {
            onAcknowledge();
            close();
            cleanup();
        }
    }

    /** Hook invoked when the acknowledgement promise is completed successfully. */
    protected void onAcknowledge() {
    }

    /** @return promise completed when the consumer acknowledged the stream or the supplier failed */
    @Override // io.activej.datastream.StreamSupplier
    public final Promise<Void> getAcknowledgement() {
        return this.acknowledgement;
    }

    /**
     * Closes this supplier with an error: stops delivery, fails the flush, end-of-stream and
     * acknowledgement promises that are still pending, and runs the error and cleanup hooks
     * if the acknowledgement was not completed before.
     *
     * @param th cause of the failure
     */
    public final void closeEx(@NotNull Throwable th) {
        checkInEventloopThread();
        ensureInitialized();
        this.endOfStreamRequest = true;
        this.dataAcceptor = null;
        this.dataAcceptorBuffered = noAcceptor();
        if (this.flushPromise != null) {
            this.flushPromise.trySetException(th);
        }
        this.endOfStream.trySetException(th);
        if (this.acknowledgement.trySetException(th)) {
            onError(th);
            cleanup();
        }
    }

    /** Hook invoked when the acknowledgement promise is failed by {@link #closeEx(Throwable)}. */
    protected void onError(Throwable th) {
    }

    /** Runs completion hooks and releases buffered items and flush-promise callbacks. */
    private void cleanup() {
        onComplete();
        this.eventloop.post(this::onCleanup);
        this.buffer.clear();
        if (this.flushPromise != null) {
            this.flushPromise.resetCallbacks();
        }
    }

    /** Hook invoked synchronously after acknowledgement or error, before {@link #onCleanup()}. */
    protected void onComplete() {
    }

    /** Hook posted to the eventloop after acknowledgement or error. */
    protected void onCleanup() {
    }

    /** Throws if checks are enabled for this class and the caller is not in the eventloop thread. */
    private void checkInEventloopThread() {
        if (CHECK) {
            Checks.checkState(this.eventloop.inEventloopThread(), "Not in eventloop thread");
        }
    }

    /**
     * Returns {@link #NO_ACCEPTOR} typed for the caller.
     * The cast is safe because that acceptor ignores its argument.
     */
    @SuppressWarnings("unchecked")
    private static <V> StreamDataAcceptor<V> noAcceptor() {
        return (StreamDataAcceptor<V>) NO_ACCEPTOR;
    }
}
