package io.greptime;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.google.common.util.concurrent.RateLimiter;
import io.greptime.common.Display;
import io.greptime.common.Endpoint;
import io.greptime.common.Lifecycle;
import io.greptime.common.util.Clock;
import io.greptime.common.util.Ensures;
import io.greptime.common.util.MetricExecutor;
import io.greptime.common.util.MetricsUtil;
import io.greptime.common.util.SerializingExecutor;
import io.greptime.errors.LimitedException;
import io.greptime.errors.ServerException;
import io.greptime.errors.StreamException;
import io.greptime.limit.AbstractLimiter;
import io.greptime.limit.LimitedPolicy;
import io.greptime.limit.WriteLimiter;
import io.greptime.models.Err;
import io.greptime.models.Result;
import io.greptime.models.Table;
import io.greptime.models.TableHelper;
import io.greptime.models.WriteOk;
import io.greptime.models.WriteTables;
import io.greptime.options.WriteOptions;
import io.greptime.rpc.Context;
import io.greptime.rpc.Observer;
import io.greptime.v1.Common;
import io.greptime.v1.Database;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/greptime/WriteClient.class */
public class WriteClient implements Write, Lifecycle<WriteOptions>, Display {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(WriteClient.class);
    /** Options supplied to {@link #init(WriteOptions)}; unset until then. */
    private WriteOptions opts;
    /** Router client taken from {@code opts.getRouterClient()} during {@link #init(WriteOptions)}. */
    private RouterClient routerClient;
    /** Executor used for asynchronous continuations; assigned in {@link #init(WriteOptions)} as a {@code MetricExecutor} wrapper. */
    private Executor asyncPool;
    /** Limiter that gates {@code write(...)} calls; created in {@link #init(WriteOptions)}. */
    private WriteLimiter writeLimiter;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/greptime/WriteClient$DefaultWriteLimiter.class */
    /**
     * Write limiter whose permit cost for a batch is the total row count of its tables
     * and whose rejection result is a flow-control error.
     */
    public static class DefaultWriteLimiter extends WriteLimiter {

        /**
         * Forwards the arguments to {@link WriteLimiter} together with the fixed
         * metric prefix {@code "write_limiter_acquire"}.
         *
         * @param i             first argument passed to the super constructor
         * @param limitedPolicy policy passed to the super constructor
         */
        public DefaultWriteLimiter(int i, LimitedPolicy limitedPolicy) {
            super(i, limitedPolicy, "write_limiter_acquire");
        }

        /**
         * Sums {@code rowCount()} over every table in the batch.
         *
         * @param collection tables about to be written
         * @return the total number of rows across all tables (0 for an empty collection)
         */
        @Override // io.greptime.limit.AbstractLimiter
        public int calculatePermits(Collection<Table> collection) {
            return collection.stream()
                    .mapToInt(table -> table.rowCount())
                    .sum();
        }

        /**
         * Builds the error result returned when a write is rejected by the limiter.
         *
         * @param collection    the rejected tables (not inspected here)
         * @param rejectedState permit counters reported in the error message
         * @return an error result carrying {@code Result.FLOW_CONTROL} and a {@link LimitedException}
         */
        @Override // io.greptime.limit.AbstractLimiter
        public Result<WriteOk, Err> rejected(Collection<Table> collection, AbstractLimiter.RejectedState rejectedState) {
            String message = String.format(
                    "Write limited by client, acquirePermits=%d, maxPermits=%d, availablePermits=%d.",
                    rejectedState.acquirePermits(),
                    rejectedState.maxPermits(),
                    rejectedState.availablePermits());
            return Result.err(Err.writeErr(Result.FLOW_CONTROL, new LimitedException(message), null));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/greptime/WriteClient$InnerMetricHelper.class */
    /**
     * Holder for the metrics recorded by {@link WriteClient}. Histograms and meters with
     * fixed names are created once as constants; the per-retry meter is looked up on demand.
     */
    public static final class InnerMetricHelper {
        static final Histogram INSERT_ROWS_SUCCESS_NUM = MetricsUtil.histogram("insert_rows_success_num");
        static final Histogram DELETE_ROWS_SUCCESS_NUM = MetricsUtil.histogram("delete_rows_success_num");
        static final Histogram INSERT_ROWS_FAILURE_NUM = MetricsUtil.histogram("insert_rows_failure_num");
        static final Histogram DELETE_ROWS_FAILURE_NUM = MetricsUtil.histogram("delete_rows_failure_num");
        static final Histogram WRITE_STREAM_LIMITER_ACQUIRE_WAIT_TIME = MetricsUtil.histogram("write_stream_limiter_acquire_wait_time");
        static final Meter WRITE_FAILURE_NUM = MetricsUtil.meter("write_failure_num");
        static final Meter WRITE_QPS = MetricsUtil.meter("write_qps");

        InnerMetricHelper() {
        }

        /**
         * @param writeOp the write operation
         * @return the histogram of successfully written rows for that operation
         * @throws IllegalArgumentException if the operation is neither Insert nor Delete
         */
        static Histogram writeRowsSuccessNum(WriteOp writeOp) {
            return selectByOp(writeOp, INSERT_ROWS_SUCCESS_NUM, DELETE_ROWS_SUCCESS_NUM);
        }

        /**
         * @param writeOp the write operation
         * @return the histogram of rows that failed to be written for that operation
         * @throws IllegalArgumentException if the operation is neither Insert nor Delete
         */
        static Histogram writeRowsFailureNum(WriteOp writeOp) {
            return selectByOp(writeOp, INSERT_ROWS_FAILURE_NUM, DELETE_ROWS_FAILURE_NUM);
        }

        /** @return the histogram recording stream rate-limiter wait values */
        static Histogram writeStreamLimiterAcquireWaitTime() {
            return WRITE_STREAM_LIMITER_ACQUIRE_WAIT_TIME;
        }

        /** @return the meter marked once per failed write */
        static Meter writeFailureNum() {
            return WRITE_FAILURE_NUM;
        }

        /** @return the meter marked once per completed write */
        static Meter writeQps() {
            return WRITE_QPS;
        }

        /**
         * Looks up the meter for a given retry count. Counts above 3 share the bucket for 3,
         * so the number of distinct meter names stays bounded.
         *
         * @param i the retry count of the current attempt
         * @return the meter named by {@code "write_by_retries"} and the capped retry count
         */
        static Meter writeByRetries(int i) {
            Object[] nameParts = {"write_by_retries", Integer.valueOf(Math.min(i, 3))};
            return MetricsUtil.meter(nameParts);
        }

        /** Picks the histogram matching the operation; shared by the success and failure lookups. */
        private static Histogram selectByOp(WriteOp writeOp, Histogram forInsert, Histogram forDelete) {
            switch (writeOp) {
                case Insert:
                    return forInsert;
                case Delete:
                    return forDelete;
                default:
                    throw new IllegalArgumentException("Unsupported write operation: " + writeOp);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/greptime/WriteClient$RateLimitingStreamWriter.class */
    /**
     * Base {@link StreamWriter} that forwards each table to a request observer, optionally
     * throttling by the number of points written per second.
     */
    public static abstract class RateLimitingStreamWriter implements StreamWriter<Table, WriteOk> {
        private final Observer<WriteTables> observer;
        /** {@code null} when rate limiting is disabled. */
        private final RateLimiter rateLimiter;

        /**
         * @param observer receives one {@link WriteTables} per {@link #write(Table, WriteOp)} call
         * @param d        permits (points) per second; a value {@code <= 0} disables rate limiting
         */
        RateLimitingStreamWriter(Observer<WriteTables> observer, double d) {
            this.observer = observer;
            this.rateLimiter = d > 0.0d ? RateLimiter.create(d) : null;
        }

        /**
         * Sends one table to the stream, first acquiring {@code table.pointCount()} permits
         * when a rate limiter is configured (which may block the calling thread).
         *
         * @param table   the table to write; must not be null
         * @param writeOp the write operation applied to the table
         * @return this writer, to allow chained calls
         */
        @Override // io.greptime.StreamWriter
        public StreamWriter<Table, WriteOk> write(Table table, WriteOp writeOp) {
            Ensures.ensureNonNull(table, "null `table`");
            WriteTables writeTables = new WriteTables(table, writeOp);
            if (this.rateLimiter != null) {
                int points = table.pointCount();
                // RateLimiter.acquire(int) throws IllegalArgumentException for a non-positive
                // permit count, so a table with no points bypasses the limiter. This matches
                // the limiter-disabled path, which forwards such tables unchanged.
                if (points > 0) {
                    double waited = this.rateLimiter.acquire(points);
                    // NOTE(review): acquire() reports the time slept in seconds, so this cast
                    // truncates sub-second waits to 0 — confirm the intended unit of this metric.
                    InnerMetricHelper.writeStreamLimiterAcquireWaitTime().update((long) waited);
                }
            }
            this.observer.onNext(writeTables);
            return this;
        }
    }

    /**
     * Initializes this client from the given options: stores the options and router client,
     * selects the async executor (falling back to a {@code SerializingExecutor} named
     * {@code "write_client"} when none is configured), wraps it in a {@code MetricExecutor},
     * and creates the write limiter.
     *
     * @param writeOptions the options to use; must not be null
     * @return always {@code true}
     */
    public boolean init(WriteOptions writeOptions) {
        this.opts = (WriteOptions) Ensures.ensureNonNull(writeOptions, "null `WriteClient.opts`");
        this.routerClient = this.opts.getRouterClient();

        Executor basePool = this.opts.getAsyncPool();
        if (basePool == null) {
            basePool = new SerializingExecutor("write_client");
        }
        this.asyncPool = new MetricExecutor(basePool, "async_write_pool.time");

        int maxInFlightRows = this.opts.getMaxInFlightWriteRows();
        this.writeLimiter = new DefaultWriteLimiter(maxInFlightRows, this.opts.getLimitedPolicy());
        return true;
    }

    /**
     * Shutdown hook for this client. The body is empty: nothing is released here.
     * NOTE(review): the executor assigned in {@code init} is not shut down by this
     * method — confirm that its lifecycle is managed elsewhere.
     */
    public void shutdownGracefully() {
    }

    /**
     * Writes the given tables through the write limiter, with retries handled by
     * {@code write0}, and records QPS, failure and per-operation row metrics on completion.
     *
     * @param collection the tables to write; must be non-null and non-empty
     * @param writeOp    the write operation applied to all tables
     * @param context    the invocation context passed down to the RPC layer
     * @return a future holding either the write summary or the error
     */
    @Override // io.greptime.Write
    public CompletableFuture<Result<WriteOk, Err>> write(Collection<Table> collection, WriteOp writeOp, Context context) {
        Ensures.ensureNonNull(collection, "null `tables`");
        Ensures.ensure(!collection.isEmpty(), "empty `tables`");

        final long startTick = Clock.defaultClock().getTick();
        final WriteTables request = new WriteTables(collection, writeOp);

        return this.writeLimiter.acquireAndDo(collection, () -> {
            CompletableFuture<Result<WriteOk, Err>> pending = write0(request, context, 0);
            return pending.whenCompleteAsync((outcome, error) -> {
                InnerMetricHelper.writeQps().mark();

                if (outcome != null && Util.isRwLogging()) {
                    LOG.info("Write to {} with operation {}, duration={} ms, result={}.",
                            "GreptimeDB", writeOp, Clock.defaultClock().duration(startTick), outcome);
                }

                // A missing result (exceptional completion) and an error result both count as failures.
                if (outcome == null || !outcome.isOk()) {
                    InnerMetricHelper.writeFailureNum().mark();
                    return;
                }

                WriteOk summary = outcome.getOk();
                InnerMetricHelper.writeRowsSuccessNum(writeOp).update(summary.getSuccess());
                InnerMetricHelper.writeRowsFailureNum(writeOp).update(summary.getFailure());
            }, this.asyncPool);
        });
    }

    /**
     * Opens a client-streaming write and returns a writer bound to it. This call blocks
     * until the router has resolved an endpoint and the stream has been set up.
     *
     * @param i       maximum points per second for this stream; when {@code <= 0} the default
     *                from the options ({@code getDefaultStreamMaxWritePointsPerSecond()}) is used
     * @param context the invocation context passed to the RPC layer
     * @return a rate-limited stream writer whose {@code completed()} future yields the
     *         server's response for the whole stream
     */
    @Override // io.greptime.Write
    public StreamWriter<Table, WriteOk> streamWriter(int i, Context context) {
        final int permitsPerSecond = i > 0 ? i : this.opts.getDefaultStreamMaxWritePointsPerSecond();
        // Completed (normally or exceptionally) by the response observer of the stream.
        final CompletableFuture<WriteOk> respFuture = new CompletableFuture<>();

        return this.routerClient.route()
                .thenApply(endpoint -> streamWriteTo(endpoint, context, Util.toObserver(respFuture)))
                .<StreamWriter<Table, WriteOk>>thenApply(reqObserver -> new RateLimitingStreamWriter(reqObserver, permitsPerSecond) {
                    @Override // io.greptime.WriteClient.RateLimitingStreamWriter, io.greptime.StreamWriter
                    public StreamWriter<Table, WriteOk> write(Table table, WriteOp writeOp) {
                        if (respFuture.isCompletedExceptionally()) {
                            // getNow rethrows the stored failure, so a broken stream is
                            // reported to the caller before any further data is sent.
                            respFuture.getNow(null);
                        }
                        return super.write(table, writeOp);
                    }

                    @Override // io.greptime.StreamWriter
                    public CompletableFuture<WriteOk> completed() {
                        reqObserver.onCompleted();
                        return respFuture;
                    }
                })
                .join();
    }

    /**
     * Performs one write attempt and, on a retryable failure, recursively schedules the next
     * attempt until {@code opts.getMaxRetries()} is exhausted.
     *
     * @param writeTables the request payload
     * @param context     the invocation context passed to the RPC layer
     * @param i           the number of retries already performed (0 for the first attempt)
     * @return a future holding the result of the last attempt that was made
     */
    private CompletableFuture<Result<WriteOk, Err>> write0(WriteTables writeTables, Context context, int i) {
        InnerMetricHelper.writeByRetries(i).mark();

        return this.routerClient.route()
                .thenComposeAsync(endpoint -> writeTo(endpoint, writeTables, context, i), this.asyncPool)
                .thenComposeAsync(result -> {
                    if (result.isOk()) {
                        LOG.debug("Success to write to {}, ok={}.", "GreptimeDB", result.getOk());
                        return Util.completedCf(result);
                    }

                    Err err = result.getErr();
                    LOG.warn("Failed to write to {}, retries={}, err={}.", "GreptimeDB", i, err);

                    // Retry budget exhausted: surface the last error as-is.
                    if (i + 1 > this.opts.getMaxRetries()) {
                        LOG.error("Retried {} times still failed.", i);
                        return Util.completedCf(result);
                    }

                    // Some errors are classified as not worth retrying.
                    if (Util.shouldNotRetry(err)) {
                        return Util.completedCf(result);
                    }

                    return write0(writeTables, context, i + 1);
                }, this.asyncPool);
    }

    /**
     * Sends one write request to the given endpoint and maps the response to a result.
     * The current retry count is stored in the context under the key {@code "retries"}.
     *
     * @param endpoint    the target endpoint
     * @param writeTables the request payload
     * @param context     the invocation context; mutated to carry the retry count
     * @param i           the number of retries already performed
     * @return a future holding a success result with the affected row count, or an error
     *         result carrying the server's status code and error message
     */
    private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, WriteTables writeTables, Context context, int i) {
        Database.GreptimeRequest request =
                TableHelper.toGreptimeRequest(writeTables, this.opts.getDatabase(), this.opts.getAuthInfo());
        context.with("retries", Integer.valueOf(i));

        CompletableFuture<Database.GreptimeResponse> responseFuture =
                this.routerClient.invoke(endpoint, request, context);

        return responseFuture.thenApplyAsync(response -> {
            Common.Status status = response.getHeader().getStatus();
            int code = status.getStatusCode();
            if (!Status.isSuccess(code)) {
                return Err.writeErr(code, new ServerException(status.getErrMsg()), endpoint).mapToResult();
            }
            return WriteOk.ok(response.getAffectedRows().getValue(), 0).mapToResult();
        }, this.asyncPool);
    }

    /**
     * Opens a client-streaming RPC to the given endpoint and adapts both directions:
     * responses are converted to {@link WriteOk} for {@code observer}, and the returned
     * observer converts each {@link WriteTables} into a request for the RPC stream.
     *
     * @param endpoint the target endpoint
     * @param context  the invocation context passed to the RPC layer
     * @param observer receives the converted responses, errors and completion of the stream
     * @return the observer through which callers push data into the stream
     */
    private Observer<WriteTables> streamWriteTo(Endpoint endpoint, Context context, final Observer<WriteOk> observer) {
        final Observer<Database.GreptimeRequest> rpcObserver = this.routerClient.invokeClientStreaming(
                endpoint,
                Database.GreptimeRequest.getDefaultInstance(),
                context,
                new Observer<Database.GreptimeResponse>() { // from class: io.greptime.WriteClient.2
                    @Override
                    public void onNext(Database.GreptimeResponse greptimeResponse) {
                        // NOTE(review): unlike writeTo, the response header status is not
                        // inspected here — confirm that stream failures always arrive via onError.
                        int affectedRows = greptimeResponse.getAffectedRows().getValue();
                        Result<WriteOk, Err> converted = WriteOk.ok(affectedRows, 0).mapToResult();
                        if (converted.isOk()) {
                            observer.onNext(converted.getOk());
                        } else {
                            observer.onError(new StreamException(String.valueOf(converted.getErr())));
                        }
                    }

                    @Override
                    public void onError(Throwable th) {
                        observer.onError(th);
                    }

                    @Override
                    public void onCompleted() {
                        observer.onCompleted();
                    }
                });

        return new Observer<WriteTables>() { // from class: io.greptime.WriteClient.3
            @Override
            public void onNext(WriteTables writeTables) {
                // Each pushed batch is converted with the client's database and auth settings.
                rpcObserver.onNext(TableHelper.toGreptimeRequest(
                        writeTables, WriteClient.this.opts.getDatabase(), WriteClient.this.opts.getAuthInfo()));
            }

            @Override
            public void onError(Throwable th) {
                rpcObserver.onError(th);
            }

            @Override
            public void onCompleted() {
                rpcObserver.onCompleted();
            }
        };
    }

    /**
     * Prints a short summary of this client: a header line, the configured maximum
     * number of retries, and the async executor.
     *
     * @param printer the destination for the summary
     */
    public void display(Display.Printer printer) {
        printer
                .println("--- WriteClient ---")
                .print("maxRetries=")
                .println(Integer.valueOf(this.opts.getMaxRetries()))
                .print("asyncPool=")
                .println(this.asyncPool);
    }

    /**
     * @return a string of the form {@code WriteClient{opts=..., routerClient=..., asyncPool=...}}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("WriteClient{");
        text.append("opts=").append(this.opts);
        text.append(", routerClient=").append(this.routerClient);
        text.append(", asyncPool=").append(this.asyncPool);
        return text.append('}').toString();
    }
}
