package tech.ydb.spark.connector.read;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Issue;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.spark.connector.YdbTypes;
import tech.ydb.spark.connector.common.OperationOption;
import tech.ydb.table.result.ResultSetReader;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:tech/ydb/spark/connector/read/StreamReader.class */
public abstract class StreamReader implements PartitionReader<InternalRow> {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) StreamReader.class);
    private static final AtomicInteger COUNTER = new AtomicInteger(0);
    private final String[] outColumns;
    private final YdbTypes types;
    private final ArrayBlockingQueue<QueueItem> queue;
    private final AtomicLong readedRows = new AtomicLong();
    private volatile String id = null;
    private volatile long startedAt = System.currentTimeMillis();
    private volatile QueueItem currentItem = null;
    private volatile Status finishStatus = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:tech/ydb/spark/connector/read/StreamReader$QueueItem.class */
    public class QueueItem {
        private final ResultSetReader reader;
        private final int[] columnIndexes;

        QueueItem(ResultSetReader resultSetReader) {
            this.reader = resultSetReader;
            this.columnIndexes = new int[StreamReader.this.outColumns.length];
            int i = 0;
            for (String str : StreamReader.this.outColumns) {
                int i2 = i;
                i++;
                this.columnIndexes[i2] = resultSetReader.getColumnIndex(str);
            }
        }

        public boolean next() {
            return this.reader.next();
        }

        public InternalRow get() {
            if (this.columnIndexes.length == 0) {
                return InternalRow.empty();
            }
            InternalRow genericInternalRow = new GenericInternalRow(this.columnIndexes.length);
            for (int i = 0; i < this.columnIndexes.length; i++) {
                StreamReader.this.types.setRowValue(genericInternalRow, i, this.reader.getColumn(this.columnIndexes[i]));
            }
            return genericInternalRow;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamReader(YdbTypes ydbTypes, int i, StructType structType) {
        this.types = ydbTypes;
        this.queue = new ArrayBlockingQueue<>(i);
        this.outColumns = structType.fieldNames();
    }

    protected abstract String start();

    protected abstract void cancel();

    /* JADX INFO: Access modifiers changed from: protected */
    public void onComplete(Status status, Throwable th) {
        long currentTimeMillis = System.currentTimeMillis() - this.startedAt;
        if (status != null) {
            if (!status.isSuccess()) {
                logger.warn("[{}] reading finished with error {}", this.id, status);
            }
            this.finishStatus = status;
        }
        if (th != null) {
            logger.error("[{}] reading finished with exception", this.id, th);
            this.finishStatus = Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th, new Issue[0]);
        }
        COUNTER.decrementAndGet();
        logger.info("[{}] got {} rows in {} ms", this.id, Long.valueOf(this.readedRows.get()), Long.valueOf(currentTimeMillis));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onNextPart(ResultSetReader resultSetReader) {
        QueueItem queueItem = new QueueItem(resultSetReader);
        while (this.finishStatus == null) {
            try {
                if (this.queue.offer(queueItem, 100L, TimeUnit.MILLISECONDS)) {
                    this.readedRows.addAndGet(resultSetReader.getRowCount());
                    return;
                }
            } catch (InterruptedException e) {
                logger.warn("[{}] reading was interrupted", this.id);
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public boolean next() {
        if (this.id == null) {
            this.startedAt = System.currentTimeMillis();
            this.id = start();
            logger.debug("[{}] started, {} total", this.id, Integer.valueOf(COUNTER.incrementAndGet()));
        }
        while (true) {
            if (this.finishStatus != null) {
                this.finishStatus.expectSuccess("Scan failed.");
                if (this.currentItem == null && this.queue.isEmpty()) {
                    return false;
                }
            }
            if (this.currentItem != null && this.currentItem.next()) {
                return true;
            }
            try {
                this.currentItem = this.queue.poll(100L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Reading was interrupted", e);
            }
        }
    }

    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public InternalRow m1263get() {
        if (this.currentItem == null) {
            throw new IllegalStateException("Nothing to read");
        }
        return this.currentItem.get();
    }

    public void close() {
        if (this.finishStatus == null) {
            cancel();
        }
    }

    public static int readQueueMaxSize(CaseInsensitiveStringMap caseInsensitiveStringMap) {
        try {
            int readInt = OperationOption.SCAN_QUEUE_DEPTH.readInt(caseInsensitiveStringMap, 3);
            if (readInt >= 2) {
                return readInt;
            }
            logger.warn("Value of {} property too low, reverting to minimum of 2.", OperationOption.SCAN_QUEUE_DEPTH);
            return 2;
        } catch (NumberFormatException e) {
            logger.warn("Illegal value of {} property, reverting to default of 3.", OperationOption.SCAN_QUEUE_DEPTH, e);
            return 3;
        }
    }
}
