package tech.ytsaurus.client;

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ytsaurus.client.request.CreateNode;
import tech.ytsaurus.client.request.GetNode;
import tech.ytsaurus.client.request.LockNode;
import tech.ytsaurus.client.request.StartTransaction;
import tech.ytsaurus.client.request.WriteTable;
import tech.ytsaurus.client.rows.EntityTableSchemaCreator;
import tech.ytsaurus.client.rows.UnversionedRow;
import tech.ytsaurus.client.rows.UnversionedRowSerializer;
import tech.ytsaurus.client.rpc.RpcOptions;
import tech.ytsaurus.client.rpc.RpcUtil;
import tech.ytsaurus.core.GUID;
import tech.ytsaurus.core.cypress.CypressNodeType;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.request.LockMode;
import tech.ytsaurus.core.tables.TableSchema;
import tech.ytsaurus.lang.NonNullApi;
import tech.ytsaurus.lang.NonNullFields;

/* compiled from: RetryingTableWriterImpl.java */
@NonNullApi
@NonNullFields
/* loaded from: input_file:tech/ytsaurus/client/RetryingTableWriterBaseImpl.class */
class RetryingTableWriterBaseImpl<T> {
    static final Logger logger = LoggerFactory.getLogger(RetryingTableWriterImpl.class);
    final ApiServiceClient apiServiceClient;
    final ScheduledExecutorService executor;
    WriteTable<T> secondaryReq;
    final RpcOptions rpcOptions;

    @Nullable
    TableRowsSerializer<T> tableRowsSerializer;
    final Semaphore semaphore;
    final CompletableFuture<InitResult> init;
    volatile WriteTable<T> req;

    @Nullable
    private volatile Buffer<T> buffer;
    final Queue<WriteTask<T>> writeTasks = new ConcurrentLinkedQueue();
    final Set<Abortable<?>> processing = new HashSet();
    final Queue<CompletableFuture<Void>> handledEvents = new ConcurrentLinkedQueue();
    final CompletableFuture<Void> result = new CompletableFuture<>();
    final CompletableFuture<Void> firstBufferHandled = new CompletableFuture<>();
    volatile boolean canceled = false;
    volatile boolean closed = false;
    int nextWriteTaskIndex = 0;
    volatile CompletableFuture<Void> readyEvent = CompletableFuture.completedFuture(null);

    /* JADX INFO: Access modifiers changed from: package-private */
    public RetryingTableWriterBaseImpl(ApiServiceClient apiServiceClient, ScheduledExecutorService scheduledExecutorService, WriteTable<T> writeTable, RpcOptions rpcOptions, SerializationResolver serializationResolver) {
        WriteTable<T> requestWithTableSchema = needSetTableSchema(writeTable) ? getRequestWithTableSchema(writeTable) : writeTable;
        this.apiServiceClient = apiServiceClient;
        this.executor = scheduledExecutorService;
        this.rpcOptions = rpcOptions;
        this.req = requestWithTableSchema.toBuilder().setNeedRetries(false).build();
        this.secondaryReq = this.req.toBuilder().setPath(requestWithTableSchema.getYPath().append(true)).build();
        YPath yPath = this.req.getYPath();
        boolean booleanValue = ((Boolean) yPath.getAppend().orElse(false)).booleanValue();
        LockMode lockMode = booleanValue ? LockMode.Shared : LockMode.Exclusive;
        this.semaphore = new Semaphore(this.req.getMaxWritesInFlight());
        StartTransaction.Builder builder = StartTransaction.master().toBuilder();
        Optional<GUID> transactionId = requestWithTableSchema.getTransactionId();
        Objects.requireNonNull(builder);
        transactionId.ifPresent(builder::setParentId);
        this.init = apiServiceClient.startTransaction(builder.build()).thenCompose(apiServiceTransaction -> {
            CompletableFuture<GUID> completedFuture;
            if (booleanValue) {
                completedFuture = CompletableFuture.completedFuture(apiServiceTransaction);
            } else {
                HashMap hashMap = new HashMap();
                this.req.getTableSchema().ifPresent(tableSchema -> {
                    hashMap.put("schema", tableSchema.toYTree());
                });
                completedFuture = apiServiceTransaction.createNode(((CreateNode.Builder) CreateNode.builder().setPath(yPath)).setType(CypressNodeType.TABLE).setAttributes(hashMap).setIgnoreExisting(true).build());
            }
            return completedFuture.thenCompose(obj -> {
                return apiServiceTransaction.lockNode(new LockNode(yPath, lockMode));
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) lockNodeResult -> {
                return apiServiceTransaction.getNode(((GetNode.Builder) ((GetNode.Builder) GetNode.builder().setPath(yPath.justPath())).setAttributes(List.of("schema"))).build());
            }).thenApply(yTreeNode -> {
                return new InitResult(apiServiceTransaction, TableSchema.fromYTree(yTreeNode.getAttributeOrThrow("schema")));
            }).thenApply(initResult -> {
                this.req.getSerializationContext().getSkiffSerializer().ifPresent(entitySkiffSerializer -> {
                    entitySkiffSerializer.setTableSchema(initResult.schema);
                });
                if (this.req.getTableSchema().isEmpty() && this.req.getSerializationContext().getSkiffSerializer().isPresent()) {
                    this.req = this.req.toBuilder().setTableSchema(initResult.schema).build();
                    this.secondaryReq = this.secondaryReq.toBuilder().setTableSchema(initResult.schema).build();
                }
                this.tableRowsSerializer = (TableRowsSerializer) TableRowsSerializerUtil.createTableRowsSerializer(this.req.getSerializationContext(), serializationResolver).orElse(null);
                if (this.tableRowsSerializer == null) {
                    if (this.req.getSerializationContext().getObjectClass().isEmpty()) {
                        throw new IllegalStateException("No object clazz");
                    }
                    Class<T> cls = this.req.getSerializationContext().getObjectClass().get();
                    if (UnversionedRow.class.equals(cls)) {
                        this.tableRowsSerializer = new TableRowsWireSerializer(new UnversionedRowSerializer());
                    } else {
                        this.tableRowsSerializer = new TableRowsWireSerializer(serializationResolver.createWireRowSerializer(serializationResolver.forClass(cls, initResult.schema)));
                    }
                }
                Buffer<T> buffer = new Buffer<>(this.tableRowsSerializer);
                buffer.handled.thenRun(() -> {
                    this.firstBufferHandled.complete(null);
                });
                this.buffer = buffer;
                return initResult;
            });
        });
        this.init.handle((initResult, th) -> {
            if (th == null) {
                return null;
            }
            cancel();
            return null;
        });
    }

    public TableSchema getSchema() {
        if (this.tableRowsSerializer == null) {
            throw new RuntimeException("No tableRowsSerializer in TableWriter");
        }
        return this.tableRowsSerializer instanceof TableRowsWireSerializer ? ((TableRowsWireSerializer) this.tableRowsSerializer).getSchema() : TableSchema.builder().build();
    }

    private boolean addAbortable(Abortable<?> abortable) {
        boolean z = false;
        synchronized (this) {
            if (this.canceled) {
                z = true;
            }
            this.processing.add(abortable);
        }
        if (!z) {
            return false;
        }
        abortable.abort();
        return true;
    }

    private synchronized void removeAbortable(Abortable<?> abortable) {
        this.processing.remove(abortable);
    }

    private <R extends Abortable<?>, U> CompletableFuture<U> tryWith(CompletableFuture<R> completableFuture, Function<R, CompletableFuture<U>> function) {
        return completableFuture.thenCompose(abortable -> {
            if (addAbortable(abortable)) {
                return RpcUtil.failedFuture(new IllegalStateException("Already canceled"));
            }
            CompletableFuture completableFuture2 = (CompletableFuture) function.apply(abortable);
            completableFuture2.whenComplete((BiConsumer) (obj, th) -> {
                if (th != null) {
                    abortable.abort();
                }
                removeAbortable(abortable);
            });
            return completableFuture2;
        });
    }

    private boolean needSetTableSchema(WriteTable<T> writeTable) {
        return writeTable.getSerializationContext().getSkiffSerializer().isPresent() && !((Boolean) writeTable.getYPath().getAppend().orElse(false)).booleanValue() && writeTable.getTableSchema().isEmpty();
    }

    private WriteTable<T> getRequestWithTableSchema(WriteTable<T> writeTable) {
        return writeTable.toBuilder().setTableSchema(EntityTableSchemaCreator.create(writeTable.getSerializationContext().getObjectClass().orElseThrow(IllegalStateException::new), null)).build();
    }

    private <U> U checkedGet(CompletableFuture<U> completableFuture) {
        if (completableFuture.isDone() || !completableFuture.isCompletedExceptionally()) {
            return completableFuture.join();
        }
        throw new IllegalArgumentException("internal error");
    }

    private void processWriteTask(WriteTask<T> writeTask) {
        if (this.canceled) {
            return;
        }
        writeTask.retryPolicy.onNewAttempt();
        tryWith(this.apiServiceClient.startTransaction(StartTransaction.master().toBuilder().setParentId(((InitResult) checkedGet(this.init)).transaction.getId()).build()), apiServiceTransaction -> {
            return tryWith(apiServiceTransaction.writeTable(this.req).thenApply(tableWriter -> {
                return (RawTableWriter) tableWriter;
            }), rawTableWriter -> {
                return rawTableWriter.readyEvent().thenCompose(r6 -> {
                    if (rawTableWriter.write(writeTask.data)) {
                        return rawTableWriter.finish();
                    }
                    throw new IllegalStateException("internal error");
                }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) obj -> {
                    return apiServiceTransaction.commit();
                }).thenApply(r5 -> {
                    this.req = this.secondaryReq;
                    writeTask.handled.complete(null);
                    this.semaphore.release();
                    if (this.writeTasks.isEmpty()) {
                        return null;
                    }
                    tryStartProcessWriteTask();
                    return null;
                });
            });
        }).handle((BiFunction<? super U, Throwable, ? extends U>) (obj, th) -> {
            if (th == null) {
                return null;
            }
            Optional<Duration> backoffDuration = writeTask.retryPolicy.getBackoffDuration(th, this.rpcOptions);
            if (backoffDuration.isEmpty()) {
                writeTask.handled.completeExceptionally(th);
                this.result.completeExceptionally(th);
                return null;
            }
            logger.debug("Got error, we will retry it in {} seconds, message='{}'", Long.valueOf(backoffDuration.get().toNanos() / 1000000000), th.getMessage());
            this.executor.schedule(() -> {
                processWriteTask(writeTask);
            }, backoffDuration.get().toNanos(), TimeUnit.NANOSECONDS);
            return null;
        });
    }

    private void tryStartProcessWriteTask() {
        if (this.semaphore.tryAcquire()) {
            this.executor.execute(() -> {
                WriteTask<T> peek = this.writeTasks.peek();
                if (peek == null) {
                    this.semaphore.release();
                    return;
                }
                if (!this.firstBufferHandled.isDone() && peek.index > 0) {
                    this.semaphore.release();
                    return;
                }
                WriteTask<T> poll = this.writeTasks.poll();
                if (poll == null) {
                    this.semaphore.release();
                    return;
                }
                synchronized (this) {
                    if (this.canceled) {
                        return;
                    }
                    if (this.buffer == null && !this.closed) {
                        if (this.tableRowsSerializer == null) {
                            throw new RuntimeException("No tableRowsSerializer in TableWriter");
                        }
                        this.buffer = new Buffer<>(this.tableRowsSerializer);
                        this.readyEvent.complete(null);
                    }
                    processWriteTask(poll);
                }
            });
        }
    }

    private void flushBuffer(boolean z) {
        Buffer<T> buffer = this.buffer;
        if (buffer == null) {
            return;
        }
        if (z && buffer.size() == 0) {
            this.buffer = null;
            return;
        }
        RetryPolicy retryPolicy = this.rpcOptions.getRetryPolicyFactory().get();
        int i = this.nextWriteTaskIndex;
        this.nextWriteTaskIndex = i + 1;
        this.writeTasks.add(new WriteTask<>(buffer, retryPolicy, i));
        this.handledEvents.add(buffer.handled);
        if (z || this.writeTasks.size() > this.req.getMaxWritesInFlight()) {
            this.buffer = null;
            this.readyEvent = new CompletableFuture<>();
        } else {
            if (this.tableRowsSerializer == null) {
                throw new RuntimeException("No tableRowsSerializer in TableWriter");
            }
            this.buffer = new Buffer<>(this.tableRowsSerializer);
        }
        tryStartProcessWriteTask();
    }

    public boolean write(List<T> list, TableSchema tableSchema) {
        Buffer<T> buffer;
        if (!this.init.isDone() || this.result.isCompletedExceptionally() || this.closed || this.canceled || (buffer = this.buffer) == null) {
            return false;
        }
        buffer.write(list, tableSchema);
        if (buffer.size() < this.req.getChunkSize()) {
            return true;
        }
        flushBuffer(false);
        return true;
    }

    public CompletableFuture<Void> readyEvent() {
        return this.result.isCompletedExceptionally() ? this.result : CompletableFuture.anyOf(this.result, CompletableFuture.allOf(this.init, this.readyEvent)).thenCompose(obj -> {
            return CompletableFuture.completedFuture(null);
        });
    }

    public CompletableFuture<?> close() {
        synchronized (this) {
            this.closed = true;
            flushBuffer(true);
        }
        return this.init.thenCompose(initResult -> {
            return CompletableFuture.anyOf(this.result, CompletableFuture.allOf((CompletableFuture[]) this.handledEvents.toArray(new CompletableFuture[0]))).thenApply(obj -> {
                return initResult;
            });
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) initResult2 -> {
            return initResult2.transaction.commit();
        }).whenComplete((BiConsumer) (r3, th) -> {
            if (th != null) {
                cancel();
            }
        });
    }

    public CompletableFuture<TableSchema> getTableSchema() {
        return this.init.thenApply(initResult -> {
            return initResult.schema;
        });
    }

    public synchronized void cancel() {
        this.canceled = true;
        this.buffer = null;
        this.readyEvent = new CompletableFuture<>();
        Iterator<Abortable<?>> it = this.processing.iterator();
        while (it.hasNext()) {
            it.next().abort();
        }
        this.init.join().transaction.abort();
    }
}
