package tech.ydb.spark.connector.write;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Issue;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.spark.connector.YdbTypes;
import tech.ydb.table.values.ListValue;
import tech.ydb.table.values.StructType;
import tech.ydb.table.values.Value;

/* loaded from: input_file:tech/ydb/spark/connector/write/YdbDataWriter.class */
public abstract class YdbDataWriter implements DataWriter<InternalRow> {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) YdbDataWriter.class);
    private final YdbTypes types;
    private final StructType structType;
    private final ValueReader[] readers;
    private final int maxBatchSize;
    private final Map<CompletableFuture<?>, CompletableFuture<?>> writesInFly = new ConcurrentHashMap();
    private List<Value<?>> currentBatch = new ArrayList();
    private volatile Status lastError = null;
    private final int maxConcurrency = 5;
    private final Semaphore semaphore = new Semaphore(this.maxConcurrency);

    public YdbDataWriter(YdbTypes ydbTypes, StructType structType, ValueReader[] valueReaderArr, int i) {
        this.types = ydbTypes;
        this.structType = structType;
        this.readers = valueReaderArr;
        this.maxBatchSize = i;
    }

    abstract CompletableFuture<Status> executeWrite(ListValue listValue);

    public void write(InternalRow internalRow) throws IOException {
        if (this.lastError != null) {
            logger.warn("ydb writer got error {} on write", this.lastError);
            this.lastError.expectSuccess("Cannot execute write");
        }
        Value<?>[] valueArr = new Value[this.readers.length];
        for (int i = 0; i < valueArr.length; i++) {
            valueArr[i] = this.readers[i].read(this.types, internalRow);
        }
        this.currentBatch.add(this.structType.newValueUnsafe(valueArr));
        if (this.currentBatch.size() >= this.maxBatchSize) {
            writeBatch();
        }
    }

    public WriterCommitMessage commit() throws IOException {
        writeBatch();
        this.semaphore.acquireUninterruptibly(this.maxConcurrency);
        this.semaphore.release(this.maxConcurrency);
        if (this.lastError != null) {
            logger.warn("ydb writer got error {} on commit", this.lastError);
            this.lastError.expectSuccess("cannot commit write");
        }
        return new YdbWriteCommit();
    }

    public void abort() throws IOException {
        this.writesInFly.keySet().forEach(completableFuture -> {
            completableFuture.cancel(false);
        });
        this.semaphore.acquireUninterruptibly(this.maxConcurrency);
        this.semaphore.release(this.maxConcurrency);
    }

    public void close() throws IOException {
    }

    private void writeBatch() {
        if (this.currentBatch.isEmpty()) {
            return;
        }
        Value[] valueArr = (Value[]) this.currentBatch.toArray(new Value[0]);
        this.currentBatch = new ArrayList();
        this.semaphore.acquireUninterruptibly();
        if (this.lastError != null) {
            this.semaphore.release();
            return;
        }
        CompletableFuture<Status> executeWrite = executeWrite(ListValue.of((Value<?>[]) valueArr));
        this.writesInFly.put(executeWrite, executeWrite);
        executeWrite.whenComplete((status, th) -> {
            this.writesInFly.remove(executeWrite);
            if (status != null && !status.isSuccess()) {
                this.lastError = status;
            }
            if (th != null) {
                this.lastError = Status.of(StatusCode.CLIENT_INTERNAL_ERROR, th, new Issue[0]);
            }
            this.semaphore.release();
        });
    }
}
