package io.greptime;

import com.codahale.metrics.Timer;
import io.greptime.BulkWriteService;
import io.greptime.common.Display;
import io.greptime.common.Endpoint;
import io.greptime.common.Lifecycle;
import io.greptime.common.util.Clock;
import io.greptime.common.util.Ensures;
import io.greptime.common.util.MetricExecutor;
import io.greptime.common.util.MetricsUtil;
import io.greptime.common.util.SerializingExecutor;
import io.greptime.limit.AbstractLimiter;
import io.greptime.limit.LimitedPolicy;
import io.greptime.models.ArrowHelper;
import io.greptime.models.AuthInfo;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.options.BulkWriteOptions;
import io.greptime.rpc.Context;
import io.greptime.rpc.TlsOptions;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.FlightCallHeaders;
import org.apache.arrow.flight.HeaderCallOption;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/greptime/BulkWriteClient.class */
public class BulkWriteClient implements BulkWrite, Health, Lifecycle<BulkWriteOptions>, Display {
    private static final Logger LOG = LoggerFactory.getLogger(BulkWriteClient.class);
    private BulkWriteOptions opts;
    private RouterClient routerClient;
    private Executor asyncPool;

    /* loaded from: input_file:io/greptime/BulkWriteClient$BulkWriteLimiter.class */
    /**
     * Limiter used for bulk write requests.
     *
     * <p>Every request costs exactly one permit, and the limiter is configured with a
     * {@link LimitedPolicy.BlockingPolicy}, so the rejection callback is not expected to run.
     */
    static class BulkWriteLimiter extends AbstractLimiter<Void, Integer> {
        /** Name handed to the base limiter (presumably used as a metric name; confirm in AbstractLimiter). */
        private static final String ACQUIRE_NAME = "bulk_write_limiter_acquire";

        /**
         * Creates the limiter.
         *
         * @param i limit value forwarded unchanged to {@link AbstractLimiter}
         *          (presumably the maximum number of permits; confirm in AbstractLimiter)
         */
        public BulkWriteLimiter(int i) {
            super(i, new LimitedPolicy.BlockingPolicy(), ACQUIRE_NAME);
        }

        /**
         * Returns the permit cost of one request, which is always {@code 1}.
         *
         * @param r3 unused request payload
         * @return {@code 1}
         */
        @Override
        public int calculatePermits(Void r3) {
            return 1;
        }

        /**
         * Rejection hook; always fails because this limiter is built with a blocking policy.
         *
         * @param r5 unused request payload
         * @param rejectedState state supplied by the base limiter, not inspected here
         * @return never returns normally
         * @throws IllegalStateException always
         */
        @Override
        public Integer rejected(Void r5, AbstractLimiter.RejectedState rejectedState) {
            throw new IllegalStateException("A blocking limiter should never get here");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/greptime/BulkWriteClient$DefaultBulkStreamWriter.class */
    /**
     * {@link BulkStreamWriter} implementation that pushes table buffers through a
     * {@link BulkWriteService}, gating each write with a {@link BulkWriteLimiter}.
     */
    public static class DefaultBulkStreamWriter implements BulkStreamWriter {
        private final BulkWriteLimiter pipelineWriteLimiter;
        private final BulkWriteService writer;
        private final TableSchema tableSchema;
        /** The buffer root most recently handed out and not yet passed to {@link #writeNext()}. */
        private final AtomicReference<Table.TableBufferRoot> current = new AtomicReference<>();

        /**
         * Creates a stream writer on top of an already constructed write service.
         *
         * @param bulkWriteService the service that performs the writes
         * @param tableSchema schema used when creating table buffer roots
         * @param i limit passed to the {@link BulkWriteLimiter}
         *          (presumably the maximum number of in-flight requests; confirm with callers)
         */
        public DefaultBulkStreamWriter(BulkWriteService bulkWriteService, TableSchema tableSchema, int i) {
            this.writer = bulkWriteService;
            this.tableSchema = tableSchema;
            this.pipelineWriteLimiter = new BulkWriteLimiter(i);
        }

        /**
         * Creates a new table buffer root over the service's root and remembers it as the
         * current one, replacing any previously remembered root.
         *
         * @param i size argument forwarded to {@code Table.tableBufferRoot}
         *          (presumably the column buffer size; confirm in Table)
         * @return the newly created buffer root
         */
        @Override
        public Table.TableBufferRoot tableBufferRoot(int i) {
            Table.TableBufferRoot root = Table.tableBufferRoot(this.tableSchema, this.writer.getRoot(), i);
            this.current.set(root);
            return root;
        }

        /**
         * Completes the current buffer root (if any) and submits the next write through the
         * pipeline limiter.
         *
         * <p>The time spent in {@code putNext()} and the time until the returned future
         * completes are recorded, in milliseconds, on the prepare and put timers.
         *
         * @return the future obtained from the put stage, as returned through the limiter
         * @throws Exception if completing the buffer or submitting the write fails
         */
        @Override
        public CompletableFuture<Integer> writeNext() throws Exception {
            Table.TableBufferRoot pending = this.current.getAndSet(null);
            if (pending != null) {
                pending.complete();
            }
            if (!isStreamReady()) {
                BulkWriteClient.LOG.debug("Stream busy with pending requests. Check `isStreamReady()` before calling `writeNext()` to avoid busy-waiting.");
            }
            return this.pipelineWriteLimiter.acquireAndDo(null, () -> {
                Clock clock = Clock.defaultClock();
                long prepareStart = clock.getTick();
                BulkWriteService.PutStage stage = this.writer.putNext();
                InnerMetricHelper.prepareTime().update(clock.duration(prepareStart), TimeUnit.MILLISECONDS);
                long putStart = clock.getTick();
                CompletableFuture<Integer> future = stage.future();
                // Record the put latency when the request finishes, whether it succeeded or failed.
                future.whenComplete((written, error) ->
                        InnerMetricHelper.putTime().update(clock.duration(putStart), TimeUnit.MILLISECONDS));
                BulkWriteClient.LOG.info("Write request sent successfully, in-flight requests: {}", stage.numInFlight());
                return future;
            });
        }

        /**
         * Signals completion to the service, waits for the server to complete, and closes the
         * underlying service.
         *
         * <p>The service is closed even when signalling or waiting fails; in that case the
         * original failure is rethrown and any failure from closing is attached to it as a
         * suppressed exception.
         *
         * @throws Exception if signalling, waiting or closing fails
         */
        @Override
        public void completed() throws Exception {
            Throwable failure = null;
            try {
                this.writer.completed();
                this.writer.waitServerCompleted();
            } catch (Throwable t) {
                failure = t;
                throw t;
            } finally {
                closeWriter(failure);
            }
        }

        /**
         * Closes the underlying service. When {@code primary} is non-null, a failure while
         * closing is added to it as suppressed instead of being thrown, so it cannot mask
         * the primary failure.
         */
        private void closeWriter(Throwable primary) throws Exception {
            if (primary == null) {
                this.writer.close();
                return;
            }
            try {
                this.writer.close();
            } catch (Throwable closeFailure) {
                primary.addSuppressed(closeFailure);
            }
        }

        /**
         * Reports whether the underlying service says the stream is ready.
         *
         * @return the value of {@code writer.isStreamReady()}
         */
        @Override
        public boolean isStreamReady() {
            return this.writer.isStreamReady();
        }

        /**
         * Closes the underlying service.
         *
         * @throws Exception if closing the service fails
         */
        @Override
        public void close() throws Exception {
            this.writer.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/greptime/BulkWriteClient$InnerMetricHelper.class */
    /**
     * Holder for the timers used by the bulk write path.
     */
    public static final class InnerMetricHelper {
        /** Timer obtained from {@code MetricsUtil} under the name {@code bulk_write_prepare_time}. */
        static final Timer BULK_WRITE_PREPARE_TIME = MetricsUtil.timer("bulk_write_prepare_time");
        /** Timer obtained from {@code MetricsUtil} under the name {@code bulk_write_put_time}. */
        static final Timer BULK_WRITE_PUT_TIME = MetricsUtil.timer("bulk_write_put_time");

        /**
         * @return the timer held in {@link #BULK_WRITE_PREPARE_TIME}
         */
        static Timer prepareTime() {
            return InnerMetricHelper.BULK_WRITE_PREPARE_TIME;
        }

        /**
         * @return the timer held in {@link #BULK_WRITE_PUT_TIME}
         */
        static Timer putTime() {
            return InnerMetricHelper.BULK_WRITE_PUT_TIME;
        }

        /** Carries no instance state; all members are static. */
        InnerMetricHelper() {
        }
    }

    /**
     * Initializes this client from the given options.
     *
     * <p>Stores the options, takes the router client from them, and wraps the async pool in a
     * {@link MetricExecutor}. When the options carry no async pool, a new
     * {@link SerializingExecutor} is used as the pool being wrapped.
     *
     * @param bulkWriteOptions the options; must not be null
     * @return always {@code true}
     */
    public boolean init(BulkWriteOptions bulkWriteOptions) {
        this.opts = (BulkWriteOptions) Ensures.ensureNonNull(bulkWriteOptions, "null `BulkWriteClient.opts`");
        this.routerClient = this.opts.getRouterClient();

        Executor configured = this.opts.getAsyncPool();
        Executor delegate;
        if (configured == null) {
            delegate = new SerializingExecutor("buld_write_client");
        } else {
            delegate = configured;
        }
        this.asyncPool = new MetricExecutor(delegate, "async_bulk_write_pool.time");
        return true;
    }

    /**
     * Shutdown hook of this client; currently performs no work.
     */
    public void shutdownGracefully() {
        // No-op: nothing is released or stopped here.
    }

    /**
     * Resolves an endpoint through the router client and opens a bulk stream writer to it.
     *
     * <p>This call blocks on {@link CompletableFuture#join()} until routing and stream setup
     * have finished; failures therefore surface as the unchecked exceptions thrown by
     * {@code join()}.
     *
     * @param tableSchema schema of the target table
     * @param j first numeric argument forwarded to {@code BulkWriteManager.create}
     *          (meaning not visible here; confirm in BulkWriteManager)
     * @param j2 second numeric argument forwarded to {@code BulkWriteManager.create}
     *           (meaning not visible here; confirm in BulkWriteManager)
     * @param j3 numeric argument forwarded to {@code intoBulkWriteStream}
     *           (meaning not visible here; confirm in BulkWriteManager)
     * @param i limit forwarded to the stream and to the writer's limiter
     * @param context call context whose entries become request headers
     * @return the opened stream writer
     */
    @Override
    public BulkStreamWriter bulkStreamWriter(TableSchema tableSchema, long j, long j2, long j3, int i, Context context) {
        CompletableFuture<BulkStreamWriter> opened = this.routerClient
                .route()
                .thenApply(target -> bulkStreamWriteTo(target, tableSchema, j, j2, j3, i, context));
        return opened.join();
    }

    /**
     * Opens a bulk write stream to the given endpoint and wraps it in a
     * {@link DefaultBulkStreamWriter}.
     *
     * <p>Request headers are built from every entry of {@code context}, plus the database name
     * ({@code x-greptime-db-name}) and, when auth info is configured, {@code x-greptime-auth}.
     * If starting the stream or enabling zero-copy writes fails, the freshly created write
     * service is closed before the failure is rethrown, so a failed open does not leave the
     * service unclosed.
     *
     * @param endpoint the endpoint to write to
     * @param tableSchema schema of the target table
     * @param j first numeric argument forwarded to {@code BulkWriteManager.create}
     *          (meaning not visible here; confirm in BulkWriteManager)
     * @param j2 second numeric argument forwarded to {@code BulkWriteManager.create}
     *           (meaning not visible here; confirm in BulkWriteManager)
     * @param j3 numeric argument forwarded to {@code intoBulkWriteStream}
     *           (meaning not visible here; confirm in BulkWriteManager)
     * @param i limit forwarded to the stream and to the writer's limiter
     * @param context call context; its entries become headers and it selects the compression type
     * @return a stream writer bound to the started stream
     */
    private BulkStreamWriter bulkStreamWriteTo(Endpoint endpoint, TableSchema tableSchema, long j, long j2, long j3, int i, Context context) {
        TlsOptions tlsOptions = this.opts.getTlsOptions();
        Schema arrowSchema = ArrowHelper.createSchema(tableSchema);
        BulkWriteManager manager = BulkWriteManager.create(endpoint, j, j2, ArrowHelper.getArrowCompressionType(context), tlsOptions);
        String database = this.opts.getDatabase();
        String tableName = tableSchema.getTableName();
        AuthInfo authInfo = this.opts.getAuthInfo();

        FlightCallHeaders headers = new FlightCallHeaders();
        context.entrySet().forEach(entry -> {
            headers.insert((String) entry.getKey(), String.valueOf(entry.getValue()));
        });
        headers.insert("x-greptime-db-name", database);
        if (authInfo != null) {
            headers.insert("x-greptime-auth", authInfo.toBase64());
        }

        BulkWriteService service = manager.intoBulkWriteStream(tableName, arrowSchema, j3, i, new CallOption[]{new HeaderCallOption(headers), new AsyncExecCallOption(this.asyncPool)});
        try {
            service.start();
            if (this.opts.isUseZeroCopyWrite()) {
                service.tryUseZeroCopyWrite();
            }
            return new DefaultBulkStreamWriter(service, tableSchema, i);
        } catch (RuntimeException | Error failure) {
            // The caller never receives the service on this path, so it must be closed here.
            try {
                service.close();
            } catch (Exception closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Delegates the health check to the router client.
     *
     * @return the future produced by {@code routerClient.checkHealth()}, mapping each endpoint
     *         to a boolean result
     */
    @Override
    public CompletableFuture<Map<Endpoint, Boolean>> checkHealth() {
        CompletableFuture<Map<Endpoint, Boolean>> health = this.routerClient.checkHealth();
        return health;
    }

    /**
     * Prints a short description of this client: a header line followed by the async pool.
     *
     * @param printer the printer to write to
     */
    public void display(Display.Printer printer) {
        printer
                .println("--- BulkWriteClient ---")
                .print("asyncPool=")
                .println(this.asyncPool);
    }

    /**
     * Builds a textual form of this client listing its options, router client and async pool.
     *
     * @return a string of the form {@code BulkWriteClient{opts=..., routerClient=..., asyncPool=...}}
     */
    public String toString() {
        StringBuilder text = new StringBuilder("BulkWriteClient{");
        text.append("opts=").append(this.opts);
        text.append(", routerClient=").append(this.routerClient);
        text.append(", asyncPool=").append(this.asyncPool);
        text.append('}');
        return text.toString();
    }
}
