package org.darkphoenixs.kafka.pool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.darkphoenixs.kafka.core.KafkaConstants;
import org.darkphoenixs.kafka.core.KafkaMessageSender;
import org.darkphoenixs.kafka.core.KafkaMessageSenderImpl;
import org.darkphoenixs.kafka.core.ZookeeperBrokers;
import org.darkphoenixs.kafka.core.ZookeeperHosts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PropertiesLoaderUtils;

/* loaded from: input_file:org/darkphoenixs/kafka/pool/KafkaMessageSenderPool.class */
public class KafkaMessageSenderPool<K, V> {
    private static final String tagger = "KafkaMessageSenderPool";
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageSenderPool.class);
    private static final int defaultSize = (Runtime.getRuntime().availableProcessors() * 2) + 1;
    protected Semaphore freeSender;
    protected LinkedBlockingQueue<KafkaMessageSender<K, V>> queue;
    protected ExecutorService pool;
    private int poolSize;
    private Resource config;
    protected ReadWriteLock closingLock = new ReentrantReadWriteLock();
    protected Properties props = new Properties();
    private ThreadFactory threadFactory = new KafkaPoolThreadFactory(tagger, true);

    /* loaded from: input_file:org/darkphoenixs/kafka/pool/KafkaMessageSenderPool$DestroyTask.class */
    class DestroyTask implements Callable<Boolean> {
        CountDownLatch count;

        public DestroyTask(CountDownLatch countDownLatch) {
            this.count = countDownLatch;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() throws Exception {
            KafkaMessageSender<K, V> poll = KafkaMessageSenderPool.this.queue.poll();
            if (poll != null) {
                poll.shutDown();
                this.count.countDown();
            }
            return true;
        }
    }

    /* loaded from: input_file:org/darkphoenixs/kafka/pool/KafkaMessageSenderPool$InitTask.class */
    class InitTask implements Callable<Boolean> {
        CountDownLatch count;
        KafkaMessageSenderPool<K, V> pool;

        public InitTask(CountDownLatch countDownLatch, KafkaMessageSenderPool<K, V> kafkaMessageSenderPool) {
            this.count = countDownLatch;
            this.pool = kafkaMessageSenderPool;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() throws Exception {
            KafkaMessageSenderImpl kafkaMessageSenderImpl = new KafkaMessageSenderImpl(KafkaMessageSenderPool.this.props, this.pool);
            if (kafkaMessageSenderImpl != null) {
                KafkaMessageSenderPool.this.queue.offer(kafkaMessageSenderImpl);
                this.count.countDown();
            }
            return true;
        }
    }

    public ThreadFactory getThreadFactory() {
        return this.threadFactory;
    }

    public void setThreadFactory(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    public void setPoolSize(int i) {
        if (i < defaultSize) {
            i = defaultSize;
        }
        this.poolSize = i;
        this.freeSender = new Semaphore(i);
        this.queue = new LinkedBlockingQueue<>(i);
        this.pool = Executors.newFixedThreadPool(i, this.threadFactory);
    }

    public void setZkhosts(ZookeeperHosts zookeeperHosts) {
        ZookeeperBrokers zookeeperBrokers = new ZookeeperBrokers(zookeeperHosts.getBrokerZkStr(), zookeeperHosts.getBrokerZkPath(), zookeeperHosts.getTopic());
        setBrokerStr(zookeeperBrokers.getBrokerInfo());
        zookeeperBrokers.close();
    }

    public void setConfig(Resource resource) {
        this.config = resource;
        try {
            PropertiesLoaderUtils.fillProperties(this.props, this.config);
        } catch (IOException e) {
            logger.error(e.getMessage());
        }
    }

    public void setClientId(String str) {
        this.props.setProperty(KafkaConstants.CLIENT_ID, str);
    }

    public void setBrokerStr(String str) {
        this.props.setProperty(KafkaConstants.BROKER_LIST, str);
    }

    public String getClientId() {
        return this.props.getProperty(KafkaConstants.CLIENT_ID);
    }

    public String getBrokerStr() {
        return this.props.getProperty(KafkaConstants.BROKER_LIST);
    }

    public int getPoolSize() {
        return this.poolSize;
    }

    public Resource getConfig() {
        return this.config;
    }

    public Properties getProps() {
        return this.props;
    }

    public void setProps(Properties properties) {
        this.props = properties;
    }

    public synchronized void init() {
        if (this.poolSize == 0) {
            setPoolSize(defaultSize);
        }
        logger.info("Message sender pool initializing. poolSize : " + this.poolSize + " config : " + this.props.toString());
        ArrayList arrayList = new ArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(this.poolSize);
        for (int i = 0; i < this.poolSize; i++) {
            arrayList.add(new InitTask(countDownLatch, this));
        }
        try {
            this.pool.invokeAll(arrayList);
            countDownLatch.await(2L, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            logger.error("Failed to init the MessageSenderPool", e);
        }
    }

    public KafkaMessageSender<K, V> getSender(long j) {
        try {
            if (!this.freeSender.tryAcquire(j, TimeUnit.MILLISECONDS)) {
                throw new RuntimeException("Timeout waiting for idle object in the pool.");
            }
            KafkaMessageSender<K, V> kafkaMessageSender = null;
            this.closingLock.readLock().lock();
            try {
                try {
                    kafkaMessageSender = this.queue.poll();
                    if (kafkaMessageSender == null) {
                        kafkaMessageSender = new KafkaMessageSenderImpl(this.props, this);
                        if (kafkaMessageSender != null) {
                            logger.info("Add new sender to the pool.");
                            this.queue.offer(kafkaMessageSender);
                        }
                    }
                } catch (Exception e) {
                    logger.error("Failed to get the MessageSender", e);
                    this.closingLock.readLock().unlock();
                }
                return kafkaMessageSender;
            } finally {
                this.closingLock.readLock().unlock();
            }
        } catch (InterruptedException e2) {
            throw new RuntimeException("Interrupted waiting for idle object in the pool .");
        }
    }

    public void returnSender(KafkaMessageSender<K, V> kafkaMessageSender) {
        if (this.queue.contains(kafkaMessageSender)) {
            return;
        }
        this.queue.offer(kafkaMessageSender);
        this.freeSender.release();
    }

    public synchronized void destroy() {
        logger.info("Message sender pool closing.");
        this.closingLock.writeLock().lock();
        try {
            try {
                ArrayList arrayList = new ArrayList();
                int size = this.queue.size();
                CountDownLatch countDownLatch = new CountDownLatch(size);
                for (int i = 0; i < size; i++) {
                    arrayList.add(new DestroyTask(countDownLatch));
                }
                this.pool.invokeAll(arrayList);
                countDownLatch.await(2L, TimeUnit.MINUTES);
                this.pool.shutdownNow();
                this.closingLock.writeLock().unlock();
            } catch (Exception e) {
                logger.error("Failed to close the MessageSenderPool", e);
                this.closingLock.writeLock().unlock();
            }
        } catch (Throwable th) {
            this.closingLock.writeLock().unlock();
            throw th;
        }
    }
}
