package io.activej.datastream;

import io.activej.common.Check;
import io.activej.common.Preconditions;
import io.activej.eventloop.Eventloop;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/datastream/AbstractStreamConsumer.class */
public abstract class AbstractStreamConsumer<T> implements StreamConsumer<T> {
    private StreamSupplier<T> supplier;
    private boolean endOfStream;
    private boolean initialized;

    @Nullable
    private StreamDataAcceptor<T> dataAcceptor;
    private final boolean CHECK = Check.isEnabled(getClass());
    private final SettablePromise<Void> acknowledgement = new SettablePromise<>();
    protected final Eventloop eventloop = Eventloop.getCurrentEventloop();

    public AbstractStreamConsumer() {
        if (this.eventloop.inEventloopThread()) {
            this.eventloop.post(this::tryInitialize);
        } else {
            this.eventloop.execute(this::tryInitialize);
        }
    }

    @Override // io.activej.datastream.StreamConsumer
    public final void consume(@NotNull StreamSupplier<T> streamSupplier) {
        if (this.CHECK) {
            Preconditions.checkState(this.eventloop.inEventloopThread(), "Not in eventloop thread");
        }
        Preconditions.checkState(!isStarted());
        tryInitialize();
        if (this.acknowledgement.isComplete()) {
            return;
        }
        this.supplier = streamSupplier;
        if (!streamSupplier.isException()) {
            onStarted();
        }
        streamSupplier.getEndOfStream().whenResult(this::endOfStream).whenException(this::closeEx);
    }

    @Override // io.activej.datastream.StreamConsumer
    @Nullable
    public final StreamDataAcceptor<T> getDataAcceptor() {
        return this.dataAcceptor;
    }

    protected void onInit() {
    }

    protected void onStarted() {
    }

    public final boolean isStarted() {
        return this.supplier != null;
    }

    public final StreamSupplier<T> getSupplier() {
        return this.supplier;
    }

    private void endOfStream() {
        if (this.CHECK) {
            Preconditions.checkState(this.eventloop.inEventloopThread(), "Not in eventloop thread");
        }
        if (this.endOfStream) {
            return;
        }
        this.endOfStream = true;
        onEndOfStream();
    }

    protected void onEndOfStream() {
    }

    public final void resume(@Nullable StreamDataAcceptor<T> streamDataAcceptor) {
        if (this.CHECK) {
            Preconditions.checkState(this.eventloop.inEventloopThread(), "Not in eventloop thread");
        }
        if (this.endOfStream || this.dataAcceptor == streamDataAcceptor) {
            return;
        }
        this.dataAcceptor = streamDataAcceptor;
        if (isStarted()) {
            this.supplier.updateDataAcceptor();
        }
    }

    public final void suspend() {
        resume(null);
    }

    public final void acknowledge() {
        if (this.CHECK) {
            Preconditions.checkState(this.eventloop.inEventloopThread(), "Not in eventloop thread");
        }
        this.endOfStream = true;
        if (this.acknowledgement.trySet((Object) null)) {
            tryInitialize();
            cleanup();
        }
    }

    @Override // io.activej.datastream.StreamConsumer
    public final Promise<Void> getAcknowledgement() {
        if (this.CHECK) {
            Preconditions.checkState(this.eventloop.inEventloopThread(), "Not in eventloop thread");
        }
        return this.acknowledgement;
    }

    public final boolean isEndOfStream() {
        return this.endOfStream;
    }

    public final void closeEx(@NotNull Throwable th) {
        if (this.CHECK) {
            Preconditions.checkState(this.eventloop.inEventloopThread(), "Not in eventloop thread");
        }
        this.endOfStream = true;
        if (this.acknowledgement.trySetException(th)) {
            tryInitialize();
            onError(th);
            cleanup();
        }
    }

    protected void onError(Throwable th) {
    }

    private void tryInitialize() {
        if (this.initialized) {
            return;
        }
        this.initialized = true;
        onInit();
    }

    private void cleanup() {
        onComplete();
        this.eventloop.post(this::onCleanup);
        this.acknowledgement.resetCallbacks();
        this.dataAcceptor = null;
    }

    protected void onComplete() {
    }

    protected void onCleanup() {
    }
}
