package io.datarouter.client.hbase.pool;

import io.datarouter.client.hbase.client.HBaseOptions;
import io.datarouter.client.hbase.config.DatarouterHBaseSettingRoot;
import io.datarouter.storage.client.ClientId;
import io.datarouter.storage.client.ClientType;
import io.datarouter.storage.util.DatarouterCounters;
import io.datarouter.util.concurrent.ExecutorServiceTool;
import io.datarouter.util.concurrent.SemaphoreTool;
import io.datarouter.util.concurrent.ThreadTool;
import io.datarouter.util.mutable.MutableString;
import io.datarouter.util.number.NumberFormatter;
import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/client/hbase/pool/HBaseTablePool.class */
/**
 * Hands out HBase {@link Table} handles for a single {@link Connection}, each backed by a pooled
 * {@link HBaseTableExecutorService}.
 *
 * <p>A {@link Semaphore} sized to {@code maxHTables} bounds the number of tables that may be checked out at once.
 * Idle executor services wait in a bounded queue of the same size and are reused by later check-outs unless they
 * report themselves as expired. Every successful {@link #checkOut} is expected to be paired with one
 * {@link #checkIn}, which is what releases the semaphore permit.
 */
public class HBaseTablePool {
    private static final Logger logger = LoggerFactory.getLogger(HBaseTablePool.class);

    private static final int DEFAULT_MIN_THREADS_PER_HTABLE = 1;
    private static final int DEFAULT_MAX_THREADS_PER_HTABLE = 1024;
    private static final long LOG_SEMAPHORE_ACQUISITIONS_OVER_MS = 2000;
    private static final long THROTTLE_INCONSISTENT_LOG_EVERY_X_MS = 500;
    private static final long SLOW_CHECKOUT_THRESHOLD_MS = 1;
    private static final long SHUTDOWN_GRACE_PERIOD_MS = 5000;

    private final Connection connection;
    private final ClientId clientId;
    private final ClientType<?, ?> clientType;
    private final int maxHTables;
    private final int minThreadsPerHTable;
    private final int maxThreadsPerHTable;
    private final Semaphore htableSemaphore;
    private final BlockingQueue<HBaseTableExecutorService> executorServiceQueue;
    private final Map<Table, HBaseTableExecutorService> activeHTables = new ConcurrentHashMap<>();
    private volatile boolean shuttingDown;
    private volatile long lastLoggedWarning = 0;

    /**
     * Builds a pool around an existing connection.
     *
     * @param hBaseOptions source of the per-client limits (max tables, min/max threads per table)
     * @param datarouterHBaseSettingRoot supplies the default for the maximum number of tables
     * @param connection connection used to create tables; closed by {@link #shutdown()}
     * @param clientId client whose name is used for option lookup, counters and log messages
     * @param clientType client type passed to the counters
     */
    public HBaseTablePool(HBaseOptions hBaseOptions, DatarouterHBaseSettingRoot datarouterHBaseSettingRoot, Connection connection, ClientId clientId, ClientType<?, ?> clientType) {
        this.connection = connection;
        this.clientId = clientId;
        this.clientType = clientType;
        this.maxHTables = hBaseOptions.maxHTables(clientId.getName(), datarouterHBaseSettingRoot.executorThreadCount.intValue()).intValue();
        this.minThreadsPerHTable = hBaseOptions.minThreadsPerHTable(clientId.getName(), DEFAULT_MIN_THREADS_PER_HTABLE).intValue();
        this.maxThreadsPerHTable = hBaseOptions.maxThreadsPerHTable(clientId.getName(), DEFAULT_MAX_THREADS_PER_HTABLE).intValue();
        this.htableSemaphore = new Semaphore(this.maxHTables);
        this.executorServiceQueue = new LinkedBlockingQueue<>(this.maxHTables);
    }

    /**
     * Acquires a semaphore permit and returns a {@link Table} for {@code tableName} that runs on a pooled or newly
     * created executor service.
     *
     * @param tableName name of the HBase table to open
     * @param progress optional holder updated with the last step reached; may be {@code null}
     * @return the table, or {@code null} if the pool is shutting down
     * @throws RuntimeException wrapping any failure; the permit is released and the selected executor service is
     *         terminated before the exception propagates
     */
    public Table checkOut(String tableName, MutableString progress) {
        if (shuttingDown) {
            return null;
        }
        long startMs = System.currentTimeMillis();
        checkConsistencyAndAcquireSempahore(tableName);
        setProgress(progress, "passed semaphore");
        HBaseTableExecutorService executorService = null;
        Table table = null;
        try {
            count("connection getHTable", tableName);
            while (executorService == null) {
                HBaseTableExecutorService candidate = executorServiceQueue.poll();
                setProgress(progress, "polled queue " + (candidate == null ? "null" : "success"));
                if (candidate == null) {
                    // nothing idle: build a fresh executor service
                    executorService = new HBaseTableExecutorService(minThreadsPerHTable, maxThreadsPerHTable);
                    setProgress(progress, "new HTableExecutorService()");
                    count("connection create HTable", tableName);
                    logWithPoolInfo("created new HTableExecutorService", tableName);
                } else if (!candidate.isExpired()) {
                    executorService = candidate;
                    count("got pooled HTable executor", tableName);
                } else {
                    // expired: shut it down and look at the next idle one
                    ExecutorServiceTool.shutdown(candidate.getExec(), Duration.ofDays(1L));
                    logWithPoolInfo("discarded expired HTableExecutorService", tableName);
                }
            }
            table = connection.getTable(TableName.valueOf(tableName), executorService.getExec());
            setProgress(progress, "created HTable");
            activeHTables.put(table, executorService);
            setProgress(progress, "added to activeHTables");
            recordSlowCheckout(System.currentTimeMillis() - startMs, tableName);
            logIfInconsistentCounts(true, tableName);
            return table;
        } catch (Exception e) {
            if (table != null) {
                activeHTables.remove(table);
                setProgress(progress, "removed from activeHTables");
            }
            htableSemaphore.release();
            setProgress(progress, "released sempahore");
            // The executor service is now referenced by neither the idle queue nor activeHTables, so without
            // this it would be left running with nobody able to terminate it.
            if (executorService != null) {
                try {
                    executorService.terminateAndBlockUntilFinished();
                } catch (RuntimeException terminateFailure) {
                    e.addSuppressed(terminateFailure);
                }
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns a table obtained from {@link #checkOut}. The table's executor service goes back to the idle queue
     * when it looks reusable; otherwise it is terminated. The semaphore permit is released in either case.
     *
     * <p>If the table is not registered as active, the event is logged and counted and nothing else happens; in
     * particular no permit is released.
     *
     * @param table the table being returned
     * @param possiblyTarnished when {@code true} the executor service is always discarded
     * @throws RuntimeException wrapping any failure raised while checking in
     */
    public void checkIn(Table table, boolean possiblyTarnished) {
        String tableName = table.getName().getNameAsString();
        try {
            HBaseTableExecutorService executorService = activeHTables.remove(table);
            if (executorService == null) {
                logWithPoolInfo("HTable returned to pool but HTableExecutorService not found", tableName);
                count("HTable returned to pool but HTableExecutorService not found", tableName);
                return;
            }
            try {
                executorService.markLastCheckinMs();
                executorService.purge();
                if (possiblyTarnished) {
                    discard(executorService, tableName, "ThreadPoolExecutor possibly tarnished, discarding", "HTable executor possiblyTarnished");
                } else if (executorService.isDyingOrDead(tableName)) {
                    discard(executorService, tableName, "ThreadPoolExecutor not reusable, discarding", "HTable executor isDyingOrDead");
                } else if (!executorService.isTaskQueueEmpty()) {
                    discard(executorService, tableName, "ThreadPoolExecutor taskQueue not empty, discarding", "HTable executor taskQueue not empty");
                } else if (!executorService.waitForActiveThreadsToSettle(tableName)) {
                    discard(executorService, tableName, "active thread count would not settle to 0", "HTable executor pool active threads won't quit");
                } else if (executorServiceQueue.offer(executorService)) {
                    count("connection HTable returned to pool", tableName);
                } else {
                    discard(executorService, tableName, "checkIn HTable but queue already full, so close and discard", "HTable executor pool overflow");
                }
            } finally {
                releaseSempahoreAndCheckConsistency(tableName);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the number of idle executor services currently waiting in the queue
     */
    public Integer getTotalPoolSize() {
        return Integer.valueOf(executorServiceQueue.size());
    }

    /**
     * Stops further check-outs, sleeps once for a fixed grace period if tables are still checked out, terminates
     * the idle executor services and closes the connection. A failure to close the connection is logged, not
     * thrown.
     */
    public void shutdown() {
        shuttingDown = true;
        int activePermits = htableSemaphoreActivePermits();
        if (activePermits != 0) {
            logger.warn("Still {} active tables.  Sleeping {}ms", activePermits, SHUTDOWN_GRACE_PERIOD_MS);
            ThreadTool.sleepUnchecked(SHUTDOWN_GRACE_PERIOD_MS);
        }
        for (HBaseTableExecutorService idleExecutorService : executorServiceQueue) {
            idleExecutorService.terminateAndBlockUntilFinished();
        }
        try {
            connection.close();
        } catch (IOException e) {
            logger.warn("", e);
        }
    }

    /**
     * Logs the pool state if its counters disagree, subject to the same throttling as the internal checks.
     *
     * @param checkOut {@code true} to label the message as a check-out, {@code false} as a check-in
     * @param tableName table name included in the message
     */
    public void forceLogIfInconsistentCounts(boolean checkOut, String tableName) {
        logIfInconsistentCounts(checkOut, tableName);
    }

    /** Takes one permit, warning when the wait exceeded {@link #LOG_SEMAPHORE_ACQUISITIONS_OVER_MS}. */
    private void checkConsistencyAndAcquireSempahore(String tableName) {
        logIfInconsistentCounts(true, tableName);
        long startMs = System.currentTimeMillis();
        SemaphoreTool.acquire(htableSemaphore);
        long elapsedMs = System.currentTimeMillis() - startMs;
        if (elapsedMs > LOG_SEMAPHORE_ACQUISITIONS_OVER_MS) {
            logger.warn("acquiring semaphore took {}ms", NumberFormatter.addCommas(Long.valueOf(elapsedMs)));
        }
    }

    /** Gives one permit back and then re-checks the counters. */
    private synchronized void releaseSempahoreAndCheckConsistency(String tableName) {
        htableSemaphore.release();
        logIfInconsistentCounts(false, tableName);
    }

    /** Number of permits currently held, i.e. the configured maximum minus what the semaphore has available. */
    private int htableSemaphoreActivePermits() {
        return maxHTables - htableSemaphore.availablePermits();
    }

    /** Records the latest step in {@code progress}; a {@code null} holder is ignored. */
    private void setProgress(MutableString progress, String step) {
        if (progress != null) {
            progress.set(step);
        }
    }

    /** Counts check-outs that took longer than {@link #SLOW_CHECKOUT_THRESHOLD_MS}. */
    private void recordSlowCheckout(long elapsedMs, String tableName) {
        if (elapsedMs > SLOW_CHECKOUT_THRESHOLD_MS) {
            count("connection open > 1ms", tableName);
        }
    }

    /** Increments the named client/table counter by one. */
    private void count(String counterName, String tableName) {
        DatarouterCounters.incClientTable(clientType, counterName, clientId.getName(), tableName, 1L);
    }

    /** Logs and counts why an executor service cannot be reused, then terminates it. */
    private void discard(HBaseTableExecutorService executorService, String tableName, String logMessage, String counterName) {
        logWithPoolInfo(logMessage, tableName);
        count(counterName, tableName);
        executorService.terminateAndBlockUntilFinished();
    }

    /**
     * The counters agree when neither the held permits nor the active tables exceed the maximum, and there are no
     * more active tables than held permits.
     */
    private boolean areCountsConsistent() {
        int activePermits = htableSemaphoreActivePermits();
        if (activePermits > maxHTables) {
            return false;
        }
        int activeTables = activeHTables.size();
        return activeTables <= maxHTables && activeTables <= activePermits;
    }

    /**
     * Logs the pool state when the counters disagree, at most once per
     * {@link #THROTTLE_INCONSISTENT_LOG_EVERY_X_MS}.
     */
    private void logIfInconsistentCounts(boolean checkOut, String tableName) {
        if (areCountsConsistent()) {
            // Do not touch lastLoggedWarning here: refreshing it on healthy checks would keep pushing the
            // throttle window forward and hide real inconsistencies.
            return;
        }
        long nowMs = System.currentTimeMillis();
        if (nowMs - lastLoggedWarning < THROTTLE_INCONSISTENT_LOG_EVERY_X_MS) {
            return;
        }
        lastLoggedWarning = nowMs;
        logWithPoolInfo("inconsistent pool counts on " + (checkOut ? "checkOut" : "checkIn"), tableName);
    }

    /** Logs {@code message} at info level, prefixed with a snapshot of the pool counters. */
    private void logWithPoolInfo(String message, String tableName) {
        logger.info("{}, {}", getPoolInfoLogMessage(tableName), message);
    }

    /** Formats the pool counters, client name and table name as one comma-separated string. */
    private String getPoolInfoLogMessage(String tableName) {
        return "max=" + maxHTables
                + ", blocked=" + htableSemaphore.getQueueLength()
                + ", idle=" + executorServiceQueue.size()
                + ", permits=" + htableSemaphoreActivePermits()
                + ", HTables=" + activeHTables.size()
                + ", client=" + clientId.getName()
                + ", table=" + tableName;
    }
}
