package io.greptime;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import io.greptime.common.Display;
import io.greptime.common.Endpoint;
import io.greptime.common.Lifecycle;
import io.greptime.common.util.Clock;
import io.greptime.common.util.Ensures;
import io.greptime.common.util.MetricsUtil;
import io.greptime.common.util.SerializingExecutor;
import io.greptime.flight.AsyncExecCallOption;
import io.greptime.flight.GreptimeFlightClient;
import io.greptime.flight.GreptimeRequest;
import io.greptime.models.Err;
import io.greptime.models.QueryOk;
import io.greptime.models.QueryRequest;
import io.greptime.models.Result;
import io.greptime.models.SelectRows;
import io.greptime.options.QueryOptions;
import io.greptime.rpc.Context;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.arrow.flight.FlightCallHeaders;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.HeaderCallOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/greptime/QueryClient.class */
/**
 * Default {@link Query} implementation: routes each request to an endpoint via the
 * {@link RouterClient}, executes it through the endpoint's {@link GreptimeFlightClient},
 * and retries failed attempts up to {@code QueryOptions#getMaxRetries()} times.
 *
 * <p>{@link #init(QueryOptions)} must be called before any other method; it is what
 * populates the options, the router client and the async executor used here.
 */
public class QueryClient implements Query, Lifecycle<QueryOptions>, Display {
    private static final Logger LOG = LoggerFactory.getLogger(QueryClient.class);

    /** Process-wide counter used to tag every query with a unique id in its {@link Context}. */
    private static final AtomicLong QUERY_ID = new AtomicLong(0);

    private QueryOptions opts;
    private RouterClient routerClient;
    private Executor asyncPool;

    /** Holder for the read-path metrics so they are created once and shared. */
    public static final class InnerMetricHelper {
        static final Histogram READ_ROWS_NUM = MetricsUtil.histogram("read_rows_num");
        static final Meter READ_FAILURE_NUM = MetricsUtil.meter("read_failure_num");
        static final Meter READ_QPS = MetricsUtil.meter("read_qps");

        InnerMetricHelper() {
        }

        static Histogram readRowsNum() {
            return READ_ROWS_NUM;
        }

        static Meter readFailureNum() {
            return READ_FAILURE_NUM;
        }

        static Meter readQps() {
            return READ_QPS;
        }

        /**
         * Returns the meter for the given retry count. Counts above 3 share the
         * bucket for 3, which bounds the number of distinct meters created.
         */
        static Meter readByRetries(int retries) {
            return MetricsUtil.meter(new Object[]{"read_by_retries", Integer.valueOf(Math.min(3, retries))});
        }
    }

    /**
     * Stores the options and derives the router client and async executor from them.
     * When the options carry no executor, a {@link SerializingExecutor} named
     * {@code "query_client"} is created instead.
     *
     * @param queryOptions the options; must not be null
     * @return always {@code true}
     */
    public boolean init(QueryOptions queryOptions) {
        this.opts = (QueryOptions) Ensures.ensureNonNull(queryOptions, "null `QueryClient.opts`");
        this.routerClient = this.opts.getRouterClient();
        Executor configuredPool = this.opts.getAsyncPool();
        this.asyncPool = configuredPool != null ? configuredPool : new SerializingExecutor("query_client");
        return true;
    }

    /** No-op: this client releases nothing on shutdown. */
    public void shutdownGracefully() {
    }

    /**
     * Executes the query, recording the query id and start tick in {@code context}
     * and marking the QPS / failure meters once the returned future completes.
     *
     * @param queryRequest the query to run; must not be null
     * @param context      the call context, mutated with {@code "QueryId"} and
     *                     {@code "QueryStart"} entries; must not be null
     * @return a future holding either the successful result or the last error
     */
    @Override // io.greptime.Query
    public CompletableFuture<Result<QueryOk, Err>> query(QueryRequest queryRequest, Context context) {
        Ensures.ensureNonNull(queryRequest, "null `request`");
        // Fail with a clear message instead of a bare dereference failure on the next line.
        Ensures.ensureNonNull(context, "null `context`");
        context.with("QueryId", Long.valueOf(QUERY_ID.incrementAndGet()));
        context.with("QueryStart", Long.valueOf(Clock.defaultClock().getTick()));
        return query0(queryRequest, context, 0).whenCompleteAsync((result, th) -> {
            InnerMetricHelper.readQps().mark();
            // A null result means the future completed exceptionally; count it as a failure too.
            if (result == null || !result.isOk()) {
                InnerMetricHelper.readFailureNum().mark();
            }
        }, this.asyncPool);
    }

    /**
     * Performs one attempt: routes to an endpoint, queries it, and hands the
     * outcome to {@link #retryIfNeeded} to decide whether another attempt follows.
     *
     * @param retries number of retries already performed (0 for the first attempt)
     */
    private CompletableFuture<Result<QueryOk, Err>> query0(QueryRequest queryRequest, Context context, int retries) {
        InnerMetricHelper.readByRetries(retries).mark();
        return this.routerClient.route()
                .thenComposeAsync(endpoint -> queryFrom(endpoint, queryRequest, context, retries), this.asyncPool)
                .thenComposeAsync(result -> retryIfNeeded(result, queryRequest, context, retries), this.asyncPool);
    }

    /**
     * Returns the result as-is when it is ok, when the retry budget is exhausted,
     * or when {@code Util.shouldNotRetry} rejects the error; otherwise starts the
     * next attempt.
     *
     * @param retries number of retries already performed before this result
     */
    private CompletableFuture<Result<QueryOk, Err>> retryIfNeeded(Result<QueryOk, Err> result,
                                                                   QueryRequest queryRequest,
                                                                   Context context,
                                                                   int retries) {
        if (result.isOk()) {
            LOG.debug("Success to read from {}, ok={}.", "GreptimeDB", result.getOk());
            return Util.completedCf(result);
        }

        Err err = (Err) result.getErr();
        LOG.warn("Failed to read from {}, retries={}, err={}.", "GreptimeDB", retries, err);

        // `retries` retries have already happened, so another one is allowed only
        // while retries < maxRetries. (`<=` would permit maxRetries + 1 retries and
        // would still retry once when maxRetries is 0.)
        if (retries >= this.opts.getMaxRetries()) {
            LOG.error("Retried {} times still failed.", retries);
            return Util.completedCf(result);
        }

        if (Util.shouldNotRetry(err)) {
            return Util.completedCf(result);
        }
        return query0(queryRequest, context, retries + 1);
    }

    /**
     * Sends the request to the given endpoint through its Flight client and wraps
     * the returned stream in a {@link SelectRows}. Also records the endpoint in
     * {@code context} under {@code "Endpoint"}.
     *
     * <p>Side effect: when the options carry auth info, it is written onto
     * {@code queryRequest} before the request is built.
     *
     * @param retries retry count of this attempt, sent as the {@code "retries"} call header
     */
    private CompletableFuture<Result<QueryOk, Err>> queryFrom(Endpoint endpoint, QueryRequest queryRequest, Context context, int retries) {
        GreptimeFlightClient flightClient = this.routerClient.getFlightClient(endpoint);
        if (this.opts.getAuthInfo() != null) {
            queryRequest.setAuthInfo(this.opts.getAuthInfo());
        }

        // NOTE(review): `m17into` looks like a decompiler-renamed method — confirm the real name on QueryRequest.
        GreptimeRequest greptimeRequest = new GreptimeRequest(queryRequest.m17into());

        FlightCallHeaders headers = new FlightCallHeaders();
        headers.insert("retries", String.valueOf(retries));

        FlightStream stream = flightClient
                .doRequest(greptimeRequest, new HeaderCallOption(headers), new AsyncExecCallOption(this.asyncPool))
                .getStream();
        context.with("Endpoint", endpoint);

        SelectRows rows = new SelectRows.DefaultSelectRows(context, InnerMetricHelper.readRowsNum(), stream);
        return Util.completedCf(Result.ok(QueryOk.ok(queryRequest.getQl(), rows)));
    }

    /** Prints the configured max retries and the async executor to {@code printer}. */
    public void display(Display.Printer printer) {
        printer.println("--- QueryClient ---")
                .print("maxRetries=")
                .println(Integer.valueOf(this.opts.getMaxRetries()))
                .print("asyncPool=")
                .println(this.asyncPool);
    }

    @Override
    public String toString() {
        return "QueryClient{opts=" + this.opts + ", routerClient=" + this.routerClient + ", asyncPool=" + this.asyncPool + '}';
    }
}
