package io.datakernel.stream.net;

import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.eventloop.AsyncTcpSocket;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamStatus;

/* loaded from: input_file:io/datakernel/stream/net/SocketStreamConsumer.class */
/**
 * Stream consumer that hands every received {@link ByteBuf} to an {@link AsyncTcpSocket}.
 *
 * <p>The consumer also owns a {@link SettableStage} (see {@link #getSentStage()}) which is
 * completed from {@link #onWrite()} once the stream status is {@code END_OF_STREAM}, and is
 * completed exceptionally from {@link #onError(Throwable)}.
 *
 * <p>NOTE(review): {@link #onWrite()} is not invoked from within this class; presumably the
 * owner of the socket calls it when the socket reports that pending data was written - confirm
 * against the caller.
 */
final class SocketStreamConsumer extends AbstractStreamConsumer<ByteBuf> implements StreamDataReceiver<ByteBuf> {
    /** Socket that receives the buffers and the end-of-stream marker. */
    private final AsyncTcpSocket asyncTcpSocket;

    /** Stage exposed through {@link #getSentStage()}; set in {@link #onWrite()} or failed in {@link #onError(Throwable)}. */
    private final SettableStage<Void> sentStage;

    /**
     * Value of {@code eventloop.getLoop()} recorded by the most recent {@link #onData(ByteBuf)}
     * call, or {@code 0} after {@link #onWrite()} has reset it.
     */
    private int writeLoop;

    /** Becomes {@code true} when {@link #onWrite()} runs while the status is {@code END_OF_STREAM}. */
    private boolean sent;

    private SocketStreamConsumer(AsyncTcpSocket asyncTcpSocket, SettableStage<Void> sentStage) {
        this.asyncTcpSocket = asyncTcpSocket;
        this.sentStage = sentStage;
    }

    /**
     * Creates a consumer that writes to the given socket, backed by a fresh {@link SettableStage}.
     *
     * @param asyncTcpSocket socket that will receive the stream data
     * @return the new consumer
     */
    public static SocketStreamConsumer create(AsyncTcpSocket asyncTcpSocket) {
        return new SocketStreamConsumer(asyncTcpSocket, SettableStage.create());
    }

    /** Asks the producer to start delivering data to this instance. */
    @Override // io.datakernel.stream.AbstractStreamConsumer
    protected void onStarted() {
        getProducer().produce(this);
    }

    /** Forwards the end of the stream to the socket via {@code writeEndOfStream()}. */
    @Override // io.datakernel.stream.AbstractStreamConsumer
    public void onEndOfStream() {
        this.asyncTcpSocket.writeEndOfStream();
    }

    /**
     * Completes the sent stage exceptionally.
     *
     * @param th the error reported for the stream
     */
    @Override // io.datakernel.stream.AbstractStreamConsumer
    protected void onError(Throwable th) {
        this.sentStage.setException(th);
    }

    /**
     * Writes the buffer to the socket, or recycles it when the stream status is already closed.
     *
     * <p>After writing, the current loop number is compared with the one recorded by the previous
     * call: if a different loop number was recorded and {@link #onWrite()} has not reset it since,
     * the producer is suspended until {@link #onWrite()} resumes it.
     *
     * @param byteBuf the buffer to send
     */
    @Override // io.datakernel.stream.StreamDataReceiver
    public void onData(ByteBuf byteBuf) {
        if (getStatus().isClosed()) {
            // Nothing will be written any more, so the buffer is recycled here instead.
            byteBuf.recycle();
            return;
        }
        this.asyncTcpSocket.write(byteBuf);
        // NOTE(review): 'eventloop' is not declared in this class; presumably inherited from
        // AbstractStreamConsumer - confirm.
        int loop = this.eventloop.getLoop();
        if (this.writeLoop == 0) {
            // First write since the last onWrite(): just remember the loop number.
            this.writeLoop = loop;
        } else if (loop != this.writeLoop) {
            // Data recorded under an earlier loop number has not been acknowledged by onWrite()
            // yet, so stop the producer from pushing more.
            this.writeLoop = loop;
            getProducer().suspend();
        }
    }

    /**
     * Resets the recorded loop number, then either resumes the producer (status still open) or,
     * when the status is {@code END_OF_STREAM}, marks the stream as sent and completes the sent
     * stage. Any other status results in no further action.
     */
    public void onWrite() {
        this.writeLoop = 0;
        if (getStatus().isOpen()) {
            getProducer().produce(this);
        } else if (getStatus() == StreamStatus.END_OF_STREAM) {
            this.sent = true;
            // The stage carries no value; a plain null is the only valid Void argument
            // (the former '(Object) null' cast does not match a Void parameter).
            this.sentStage.set(null);
        }
    }

    /**
     * @return {@code true} when this consumer is not wired, when the stream has been marked as
     *         sent, or when the status is {@code CLOSED_WITH_ERROR}
     */
    public boolean isClosed() {
        return !isWired() || this.sent || getStatus() == StreamStatus.CLOSED_WITH_ERROR;
    }

    /**
     * @return the stage that is completed by {@link #onWrite()} after end of stream, or failed by
     *         {@link #onError(Throwable)}
     */
    public Stage<Void> getSentStage() {
        return this.sentStage;
    }
}
