package co.easimart.vertx.stream;

import io.netty.buffer.ByteBuf;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/easimart/vertx/stream/ReadBufferInputStream.class */
/**
 * Blocking {@link InputStream} view over a {@link ReadStream} of {@link Buffer}s.
 *
 * <p>Buffers delivered to the stream handler are queued; {@link #read(byte[], int, int)} drains
 * the queue and blocks (on this object's monitor) while it is empty. When the number of queued
 * bytes reaches {@link #MAX_BUFFER_SIZE} the source stream is paused, and it is resumed once a
 * read brings the queued byte count back below that threshold.
 *
 * <p>The handlers and the reading thread are expected to be different threads; shared state is
 * held in atomics and waiting/notification uses {@code synchronized (this)}.
 */
public class ReadBufferInputStream extends InputStream {
    private static final Logger logger = LoggerFactory.getLogger(ReadBufferInputStream.class);

    /** Queued-byte threshold (4 MiB) at or above which the source stream is paused. */
    static final int MAX_BUFFER_SIZE = 4194304;

    /** Upper bound, in milliseconds, of a single wait for new data before re-checking state. */
    private static final long WAIT_TIMEOUT_MILLIS = 10000L;

    /** Number of zero-length results {@link #read()} tolerates before giving up. */
    private static final int MAX_ZERO_READS = 10;

    private final ReadStream<Buffer> readStream;

    // All state is initialised at declaration so it exists before setupHandlers() registers
    // callbacks that may touch it.
    private final ConcurrentLinkedQueue<ByteBuf> buffers = new ConcurrentLinkedQueue<>();
    private final AtomicInteger currentBufferSize = new AtomicInteger(0);
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicBoolean paused = new AtomicBoolean(false);
    private final AtomicReference<Throwable> canceled = new AtomicReference<>(null);
    // Written by the handler thread / reader thread respectively and read by either.
    private final AtomicLong totalReadSize = new AtomicLong(0L);
    private final AtomicLong totalConsumedSize = new AtomicLong(0L);
    /** Serialises pause/resume decisions so they cannot interleave. */
    private final Object pauseLock = new Object();

    /**
     * Creates the stream and registers its handlers on {@code readStream}.
     *
     * @param readStream source of buffers; its data, end and exception handlers are replaced
     */
    public ReadBufferInputStream(ReadStream<Buffer> readStream) {
        this.readStream = readStream;
        setupHandlers();
    }

    /**
     * Registers the data, end and exception handlers on the source stream.
     *
     * <p>Called from the constructor after every field has been initialised.
     */
    protected void setupHandlers() {
        this.readStream.handler(buffer -> {
            if (this.closed.get() || this.canceled.get() != null) {
                // Nobody will consume further data: detach instead of queueing it.
                this.readStream.handler((Handler<Buffer>) null);
                return;
            }
            ByteBuf byteBuf = buffer.getByteBuf();
            int readableBytes = byteBuf.readableBytes();
            if (readableBytes == 0) {
                // An empty buffer carries no data and would only make read() return 0.
                return;
            }
            this.totalReadSize.addAndGet(readableBytes);
            // Enqueue before publishing the size so a non-zero size implies queued data.
            this.buffers.add(byteBuf);
            int sizeAfter = this.currentBufferSize.addAndGet(readableBytes);
            if (sizeAfter - readableBytes <= 0) {
                // The queue was empty, so a reader may be waiting for data.
                synchronized (this) {
                    notifyAll();
                }
            }
            if (sizeAfter >= MAX_BUFFER_SIZE) {
                pauseReadStreamIfBufferIsFull();
            }
        });
        this.readStream.endHandler(ignored -> {
            this.completed.set(true);
            if (isBufferEmpty()) {
                // Wake a waiting reader so it can report end of stream.
                synchronized (this) {
                    notifyAll();
                }
            }
        });
        this.readStream.exceptionHandler(this::cancel);
    }

    /** Pauses the source stream if the buffer is full, the stream is not complete and not already paused. */
    private void pauseReadStreamIfBufferIsFull() {
        synchronized (this.pauseLock) {
            if (isBufferFull() && !this.completed.get() && this.paused.compareAndSet(false, true)) {
                this.readStream.pause();
            }
        }
    }

    /** Resumes the source stream if it was paused by this class and the buffer is no longer full. */
    private void resumeReadStreamIfBufferIsNotFull() {
        synchronized (this.pauseLock) {
            if (!isBufferFull() && !this.completed.get() && this.paused.compareAndSet(true, false)) {
                this.readStream.resume();
            }
        }
    }

    /**
     * @return {@code true} when the number of queued bytes is at or above {@link #MAX_BUFFER_SIZE}
     */
    public boolean isBufferFull() {
        return this.currentBufferSize.get() >= MAX_BUFFER_SIZE;
    }

    /**
     * @return {@code true} when no bytes are currently queued
     */
    public boolean isBufferEmpty() {
        return this.currentBufferSize.get() <= 0;
    }

    /**
     * Fails if the stream was closed or canceled.
     *
     * @throws IOException if {@link #close()} or {@link #cancel(Throwable)} has been called; in
     *     the latter case the cancellation cause is attached
     */
    private void checkClosedOrCanceled() throws IOException {
        if (this.closed.get()) {
            throw new IOException("The stream has been closed");
        }
        Throwable th = this.canceled.get();
        if (th != null) {
            throw new IOException("The stream is unexpectedly stopped", th);
        }
    }

    /**
     * Reads the next byte, blocking until one is available.
     *
     * @return the byte as an unsigned value in {@code 0..255}, or {@code -1} at end of stream
     * @throws IOException if the stream is closed, canceled, or the wait is interrupted
     * @throws IllegalStateException if no byte could be obtained after repeated zero-length reads
     */
    @Override // java.io.InputStream
    public synchronized int read() throws IOException {
        byte[] single = new byte[1];
        for (int attempt = 0; attempt < MAX_ZERO_READS; attempt++) {
            int count = read(single, 0, 1);
            if (count < 0) {
                return -1;
            }
            if (count == 1) {
                // Mask to an unsigned value; a raw negative byte would look like end of stream.
                return single[0] & 0xFF;
            }
        }
        throw new IllegalStateException("Cannot read one byte because InputStream.read() keeps returning zero for 10 times.");
    }

    /**
     * Reads up to {@code i2} bytes into {@code bArr} starting at offset {@code i}, blocking until
     * at least one byte is available, the source completes, or the stream is closed/canceled.
     *
     * @param bArr destination array
     * @param i offset in {@code bArr} at which the first byte is stored
     * @param i2 maximum number of bytes to read
     * @return number of bytes read (at most the remainder of the oldest queued buffer),
     *     {@code 0} if {@code i2} is zero, or {@code -1} at end of stream
     * @throws IOException if the stream is closed or canceled
     * @throws InterruptedIOException if the calling thread is interrupted while waiting
     * @throws IndexOutOfBoundsException if {@code i}/{@code i2} do not describe a range of {@code bArr}
     */
    @Override // java.io.InputStream
    public synchronized int read(@Nonnull byte[] bArr, int i, int i2) throws IOException {
        checkClosedOrCanceled();
        if (i < 0 || i2 < 0 || i2 > bArr.length - i) {
            throw new IndexOutOfBoundsException("offset=" + i + ", length=" + i2 + ", array length=" + bArr.length);
        }
        if (i2 == 0) {
            return 0;
        }
        while (true) {
            checkClosedOrCanceled();
            // Snapshot completion BEFORE testing emptiness: the end handler flips the flag after
            // the last buffer is queued, so "finished and empty" in this order cannot lose data.
            boolean finished = this.completed.get();
            if (isBufferEmpty()) {
                if (finished) {
                    return -1;
                }
                awaitData();
                continue;
            }
            ByteBuf head = this.buffers.peek();
            if (head == null) {
                // cancel() is not synchronized and may have emptied the queue concurrently;
                // loop so the cancellation is reported by checkClosedOrCanceled().
                continue;
            }
            int count = Math.min(i2, head.readableBytes());
            head.readBytes(bArr, i, count);
            if (head.readableBytes() == 0) {
                this.buffers.poll();
            }
            if (count == 0) {
                // Exhausted buffer at the head of the queue: drop it and try the next one.
                continue;
            }
            reclaimBufferSpace(count);
            this.totalConsumedSize.addAndGet(count);
            return count;
        }
    }

    /**
     * Waits on this object's monitor for the handlers to signal new data, completion or
     * cancellation, for at most {@link #WAIT_TIMEOUT_MILLIS}. Must be called with the monitor held.
     *
     * @throws InterruptedIOException if the thread is interrupted; the interrupt flag is restored
     */
    private void awaitData() throws InterruptedIOException {
        logger.debug("Stream is waiting. read={}, consumed={}, buffer size={}",
                getTotalReceivedSize(), getTotalConsumedSize(), Integer.valueOf(this.currentBufferSize.get()));
        try {
            wait(WAIT_TIMEOUT_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            InterruptedIOException interrupted = new InterruptedIOException("Interrupted while waiting for stream data");
            interrupted.initCause(e);
            throw interrupted;
        }
    }

    /**
     * Subtracts {@code i} consumed bytes from the queued-byte counter and resumes the source
     * stream when this read moved the counter from full to not full.
     *
     * @param i number of bytes just consumed
     * @return {@code true} if the full/not-full threshold was crossed and a resume was attempted
     */
    private boolean reclaimBufferSpace(int i) {
        long sizeAfter = this.currentBufferSize.addAndGet(-i);
        long sizeBefore = sizeAfter + i;
        if (sizeBefore < MAX_BUFFER_SIZE || sizeAfter >= MAX_BUFFER_SIZE) {
            return false;
        }
        resumeReadStreamIfBufferIsNotFull();
        return true;
    }

    /**
     * @return number of bytes currently queued, i.e. readable without blocking
     */
    @Override // java.io.InputStream
    public synchronized int available() throws IOException {
        return Math.max(0, this.currentBufferSize.get());
    }

    /**
     * Closes the stream: discards queued data and wakes any blocked reader, which then fails
     * with an {@link IOException}.
     *
     * <p>NOTE(review): the source stream is neither resumed nor detached here; the data handler
     * only detaches itself when the next buffer arrives. Confirm whether a stream paused at close
     * time needs explicit handling by the owner.
     */
    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() throws IOException {
        this.closed.set(true);
        this.buffers.clear();
        this.currentBufferSize.set(0);
        notifyAll();
    }

    /**
     * Aborts the stream: records the cause, discards queued data and wakes any blocked reader.
     * Subsequent reads throw an {@link IOException} wrapping the cause.
     *
     * @param th reason for the cancellation; when {@code null} a generic cause is recorded so the
     *     stream is still marked as canceled
     */
    public void cancel(Throwable th) {
        Throwable cause = th != null ? th : new IllegalStateException("The stream was canceled without a cause");
        // Publish the cause before clearing so a racing reader observes the cancellation.
        this.canceled.set(cause);
        this.buffers.clear();
        this.currentBufferSize.set(0);
        synchronized (this) {
            notifyAll();
        }
    }

    /**
     * @return total number of bytes handed out by the read methods so far
     */
    public Long getTotalConsumedSize() {
        return Long.valueOf(this.totalConsumedSize.get());
    }

    /**
     * @return total number of bytes received from the source stream so far
     */
    public Long getTotalReceivedSize() {
        return Long.valueOf(this.totalReadSize.get());
    }
}
