package io.greptime;

import com.google.protobuf.ByteString;
import io.greptime.Metadata;
import io.greptime.common.TimeoutCompletableFuture;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.arrow.flight.BulkFlightClient;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.grpc.StatusUtils;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/greptime/BulkWriteService.class */
public class BulkWriteService implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(BulkWriteService.class);
    private final BulkWriteManager manager;
    private final BufferAllocator allocator;
    private final VectorSchemaRoot root;
    private final BulkFlightClient.ClientStreamListener listener;
    private final long timeoutMs;
    private final AtomicLong idGenerator = new AtomicLong(0);
    private final AsyncPutListener metadataListener = new AsyncPutListener();

    /* loaded from: input_file:io/greptime/BulkWriteService$AsyncPutListener.class */
    static class AsyncPutListener implements BulkFlightClient.PutListener {
        private final ConcurrentMap<Long, IdentifiableCompletableFuture> futuresInFlight = new ConcurrentHashMap();
        private final CompletableFuture<Void> completed = new CompletableFuture<>();

        AsyncPutListener() {
            this.completed.whenComplete((r4, th) -> {
                if (th != null) {
                    Iterator<IdentifiableCompletableFuture> it = this.futuresInFlight.values().iterator();
                    while (it.hasNext()) {
                        it.next().completeExceptionally(th);
                    }
                }
                this.futuresInFlight.clear();
            });
        }

        public void attach(long j, IdentifiableCompletableFuture identifiableCompletableFuture) {
            this.futuresInFlight.put(Long.valueOf(j), identifiableCompletableFuture);
            identifiableCompletableFuture.whenComplete((num, th) -> {
                this.futuresInFlight.remove(Long.valueOf(j));
                if (th == null) {
                    BulkWriteService.LOG.debug("Put operation succeeded [id={}], affected rows: {}", Long.valueOf(j), num);
                    return;
                }
                BulkWriteService.LOG.error("Put operation failed [id={}]: {}", new Object[]{Long.valueOf(j), th.getMessage(), th});
                if (th instanceof TimeoutCompletableFuture.FutureDeadlineExceededException) {
                    return;
                }
                onError(th);
            });
            identifiableCompletableFuture.scheduleTimeout();
            if (BulkWriteService.LOG.isDebugEnabled()) {
                BulkWriteService.LOG.debug("Attached future [id={}], current in-flight count: {}", Long.valueOf(j), Integer.valueOf(this.futuresInFlight.size()));
            }
        }

        public int numInFlight() {
            return this.futuresInFlight.size();
        }

        @Override // org.apache.arrow.flight.BulkFlightClient.PutListener
        public void onNext(PutResult putResult) {
            ArrowBuf applicationMetadata = putResult.getApplicationMetadata();
            if (applicationMetadata == null) {
                BulkWriteService.LOG.warn("Received PutResult with null metadata");
                return;
            }
            Metadata.ResponseMetadata fromJson = Metadata.ResponseMetadata.fromJson(ByteString.copyFrom(applicationMetadata.nioBuffer()).toStringUtf8());
            long requestId = fromJson.getRequestId();
            int affectedRows = fromJson.getAffectedRows();
            BulkWriteService.LOG.debug("Received response [id={}], affected rows: {}", Long.valueOf(requestId), Integer.valueOf(affectedRows));
            IdentifiableCompletableFuture identifiableCompletableFuture = this.futuresInFlight.get(Long.valueOf(requestId));
            if (identifiableCompletableFuture != null) {
                identifiableCompletableFuture.complete(Integer.valueOf(affectedRows));
            } else if (requestId != 0) {
                BulkWriteService.LOG.warn("A timeout response [id={}] finally received", Long.valueOf(requestId));
            }
        }

        public void onError(Throwable th) {
            BulkWriteService.LOG.error("Stream error occurred: {}", th.getMessage(), th);
            this.completed.completeExceptionally(StatusUtils.fromThrowable(th));
        }

        public final void onCompleted() {
            BulkWriteService.LOG.info("Server signaled stream completion");
            this.completed.complete(null);
        }

        @Override // org.apache.arrow.flight.BulkFlightClient.PutListener
        public boolean isCancelled() {
            return this.completed.isCancelled();
        }

        @Override // org.apache.arrow.flight.BulkFlightClient.PutListener
        public boolean isCompletedExceptionally() {
            return this.completed.isCompletedExceptionally();
        }

        @Override // org.apache.arrow.flight.BulkFlightClient.PutListener
        public void getResult() {
            try {
                this.completed.get();
            } catch (InterruptedException e) {
                throw StatusUtils.fromThrowable(e);
            } catch (ExecutionException e2) {
                throw StatusUtils.fromThrowable(e2.getCause());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/greptime/BulkWriteService$IdentifiableCompletableFuture.class */
    /**
     * A {@link TimeoutCompletableFuture} yielding an {@code Integer} that additionally
     * remembers the id of the request it was created for.
     */
    public static class IdentifiableCompletableFuture extends TimeoutCompletableFuture<Integer> {
        /** Id of the request this future belongs to. */
        private final long id;

        /**
         * @param id        the request id to associate with this future
         * @param timeoutMs the timeout handed to the superclass, in milliseconds
         */
        public IdentifiableCompletableFuture(long id, long timeoutMs) {
            super(timeoutMs, TimeUnit.MILLISECONDS);
            this.id = id;
        }

        /**
         * @return the request id given at construction time
         */
        public long getId() {
            return id;
        }
    }

    /* loaded from: input_file:io/greptime/BulkWriteService$PutStage.class */
    /**
     * Immutable result of a single {@code putNext} call: the future for the request
     * together with an in-flight request count.
     */
    public static class PutStage {
        /** Future carrying the {@code Integer} result of the put request. */
        private final CompletableFuture<Integer> future;
        /** In-flight request count supplied at construction time. */
        private final int numInFlight;

        /**
         * @param future      the future for the put request
         * @param numInFlight the in-flight request count to report
         */
        public PutStage(CompletableFuture<Integer> future, int numInFlight) {
            this.numInFlight = numInFlight;
            this.future = future;
        }

        /**
         * @return the future passed to the constructor
         */
        public CompletableFuture<Integer> future() {
            return future;
        }

        /**
         * @return the in-flight count passed to the constructor
         */
        public int numInFlight() {
            return numInFlight;
        }
    }

    /**
     * Creates a bulk write service: builds the schema root through the manager and opens
     * a put stream whose responses are delivered to this service's metadata listener.
     *
     * @param manager             creates the schema root and starts the put stream
     * @param allocator           allocator used later for request-metadata buffers
     * @param schema              schema of the root to create
     * @param descriptor          flight descriptor handed to {@code startPut}
     * @param timeoutMs           per-request timeout in milliseconds
     * @param maxRequestsInFlight forwarded unchanged to {@code startPut}
     *                            NOTE(review): name inferred, presumably a cap on in-flight
     *                            requests; confirm against BulkWriteManager
     * @param options             call options handed to {@code startPut}
     */
    public BulkWriteService(BulkWriteManager manager, BufferAllocator allocator, Schema schema, FlightDescriptor descriptor, long timeoutMs, int maxRequestsInFlight, CallOption... options) {
        this.timeoutMs = timeoutMs;
        this.allocator = allocator;
        this.manager = manager;
        // The root is created before the stream is opened, as in the original ordering.
        this.root = manager.createSchemaRoot(schema);
        this.listener = manager.startPut(descriptor, this.metadataListener, maxRequestsInFlight, options);
    }

    /**
     * Starts the put stream on the shared root with a dictionary provider obtained from
     * the manager, without passing explicit IPC options.
     */
    public void start() {
        LOG.debug("Starting bulk write stream with default IPC options");
        listener.start(root, manager.newDefaultDictionaryProvider());
    }

    /**
     * Starts the put stream on the shared root with a dictionary provider obtained from
     * the manager and the supplied IPC options.
     *
     * @param ipcOption the IPC options forwarded to the stream listener
     */
    public void start(IpcOption ipcOption) {
        LOG.debug("Starting bulk write stream with custom IPC options: {}", ipcOption);
        listener.start(root, manager.newDefaultDictionaryProvider(), ipcOption);
    }

    /**
     * @return the schema root created in the constructor; {@link #putNext()} clears it
     *         after every call
     */
    public VectorSchemaRoot getRoot() {
        return root;
    }

    /**
     * Requests zero-copy writes by setting the corresponding flag on the stream listener.
     * NOTE(review): whether the request takes effect is decided by the listener; confirm.
     */
    public void tryUseZeroCopyWrite() {
        LOG.info("Enabling zero-copy write mode for improved performance");
        listener.setUseZeroCopy(true);
    }

    /**
     * @return the readiness flag reported by the stream listener's {@code isReady()}
     */
    public boolean isStreamReady() {
        return listener.isReady();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v7, types: [io.greptime.BulkWriteService$IdentifiableCompletableFuture, java.util.concurrent.CompletableFuture] */
    public PutStage putNext() {
        long nextId = nextId();
        long rowCount = this.root.getRowCount();
        LOG.debug("Starting putNext operation [id={}], total row count: {}", Long.valueOf(nextId), Long.valueOf(rowCount));
        ?? identifiableCompletableFuture = new IdentifiableCompletableFuture(nextId, this.timeoutMs);
        this.metadataListener.attach(nextId, identifiableCompletableFuture);
        byte[] jsonBytesUtf8 = new Metadata.RequestMetadata(nextId).toJsonBytesUtf8();
        try {
            ArrowBuf buffer = this.allocator.buffer(jsonBytesUtf8.length);
            buffer.writeBytes(jsonBytesUtf8);
            LOG.debug("Sending data to server [id={}]", Long.valueOf(nextId));
            this.listener.putNext(buffer);
            int numInFlight = this.metadataListener.numInFlight();
            LOG.debug("Data sent successfully [id={}], in-flight requests: {}", Long.valueOf(nextId), Integer.valueOf(numInFlight));
            PutStage putStage = new PutStage(identifiableCompletableFuture, numInFlight);
            this.root.clear();
            LOG.debug("Cleared root for next batch [id={}], previous row count: {}", Long.valueOf(nextId), Long.valueOf(rowCount));
            return putStage;
        } catch (Throwable th) {
            this.root.clear();
            LOG.debug("Cleared root for next batch [id={}], previous row count: {}", Long.valueOf(nextId), Long.valueOf(rowCount));
            throw th;
        }
    }

    /**
     * Signals the end of transmission by calling {@code completed()} on the stream listener.
     */
    public void completed() {
        LOG.info("Completing bulk write operation, signaling end of transmission");
        listener.completed();
    }

    /**
     * Delegates to the stream listener's {@code getResult()}.
     * NOTE(review): presumably blocks until the server has finished processing and
     * rethrows any stream failure; confirm against BulkFlightClient.
     */
    public void waitServerCompleted() {
        LOG.info("Waiting for server to complete processing");
        listener.getResult();
    }

    /**
     * Releases the resources held by this service by passing the root and the manager,
     * in that order, to {@code AutoCloseables.close}.
     *
     * @throws Exception if closing fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        LOG.info("Closing BulkWriteService resources");
        AutoCloseable[] resources = {this.root, this.manager};
        AutoCloseables.close(resources);
    }

    /**
     * Produces the next request id from the shared counter, never returning 0: should
     * the counter yield 0 (for example after wrapping around), it is incremented again.
     *
     * @return a non-zero request id
     */
    private long nextId() {
        long candidate = this.idGenerator.incrementAndGet();
        while (candidate == 0) {
            candidate = this.idGenerator.incrementAndGet();
        }
        return candidate;
    }
}
