package org.apache.pulsar.functions.worker.dlog;

import java.io.IOException;
import java.io.InputStream;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.exceptions.EndOfStreamException;

/* loaded from: input_file:org/apache/pulsar/functions/worker/dlog/DLInputStream.class */
public class DLInputStream extends InputStream {
    private final DistributedLogManager dlm;
    private LogReader reader;
    private LogRecordWithInputStream currentLogRecord = null;
    private boolean eos = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/functions/worker/dlog/DLInputStream$LogRecordWithInputStream.class */
    public static class LogRecordWithInputStream {
        private final InputStream payloadStream;

        LogRecordWithInputStream(LogRecordWithDLSN logRecordWithDLSN) {
            this.payloadStream = logRecordWithDLSN.getPayLoadInputStream();
        }

        InputStream getPayLoadInputStream() {
            return this.payloadStream;
        }
    }

    public DLInputStream(DistributedLogManager distributedLogManager) throws IOException {
        this.dlm = distributedLogManager;
        this.reader = distributedLogManager.getInputStream(DLSN.InitialDLSN);
    }

    private LogRecordWithInputStream nextLogRecord() throws IOException {
        try {
            return nextLogRecord(this.reader);
        } catch (EndOfStreamException e) {
            this.eos = true;
            return null;
        }
    }

    private static LogRecordWithInputStream nextLogRecord(LogReader logReader) throws IOException {
        LogRecordWithDLSN readNext = logReader.readNext(false);
        if (null != readNext) {
            return new LogRecordWithInputStream(readNext);
        }
        LogRecordWithDLSN readNext2 = logReader.readNext(false);
        if (null != readNext2) {
            return new LogRecordWithInputStream(readNext2);
        }
        return null;
    }

    @Override // java.io.InputStream
    public int read() throws IOException {
        byte[] bArr = new byte[1];
        if (read(bArr, 0, 1) != 1) {
            return -1;
        }
        return bArr[0];
    }

    @Override // java.io.InputStream
    public int read(byte[] bArr, int i, int i2) throws IOException {
        if (this.eos) {
            return -1;
        }
        int i3 = 0;
        if (this.currentLogRecord == null) {
            this.currentLogRecord = nextLogRecord();
            if (this.currentLogRecord == null) {
                return 0;
            }
        }
        while (i3 < i2) {
            int read = this.currentLogRecord.getPayLoadInputStream().read(bArr, i + i3, i2 - i3);
            if (read == -1) {
                this.currentLogRecord = nextLogRecord();
                if (this.currentLogRecord == null) {
                    return i3;
                }
            } else {
                i3 += read;
            }
        }
        return i3;
    }

    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.reader.close();
        this.dlm.close();
    }
}
