package org.darkphoenixs.kafka.pool;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import kafka.serializer.DefaultDecoder;
import kafka.utils.VerifiableProperties;
import org.darkphoenixs.kafka.core.KafkaConstants;
import org.darkphoenixs.kafka.core.KafkaMessageAdapter;
import org.darkphoenixs.kafka.core.KafkaMessageReceiver;
import org.darkphoenixs.kafka.core.KafkaMessageReceiverImpl;
import org.darkphoenixs.mq.exception.MQException;
import org.darkphoenixs.mq.util.RefleTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PropertiesLoaderUtils;

/* loaded from: input_file:org/darkphoenixs/kafka/pool/KafkaMessageReceiverPool.class */
/**
 * Pool of worker threads that consume one Kafka topic through the high-level consumer API.
 *
 * <p>{@link #init()} reads the topic name from the configured message adapter, creates one
 * Kafka stream per worker and submits a {@link ReceiverThread} for each stream to a fixed-size
 * executor. {@link #destroy()} shuts the consumer connector and the executor down again.
 *
 * <p>Configuration is kept in a {@link Properties} object that can be populated through
 * {@link #setConfig(Resource)}, {@link #setProps(Properties)} or the individual setters.
 *
 * @param <K> key type of the consumed messages
 * @param <V> value type of the consumed messages
 */
public class KafkaMessageReceiverPool<K, V> implements MessageReceiverPool<K, V> {
    /** Prefix used when naming the default thread factory. */
    private static final String tagger = "KafkaMessageReceiverPool";
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageReceiverPool.class);

    /** High-level consumer connector; created by {@link #init()}. */
    protected ConsumerConnector consumer;
    /** Executor running the receiver threads; created by {@link #init()}. */
    protected ExecutorService pool;
    private KafkaMessageAdapter<?, ?> messageAdapter;
    private int poolSize;
    private Resource config;
    private ThreadFactory threadFactory;
    /** {@code true} once a non-null factory was supplied through {@link #setThreadFactory}. */
    private boolean customThreadFactory;
    protected Properties props = new Properties();
    // NOTE(review): this flag is only updated by setAutoCommit(boolean); a value for
    // KafkaConstants.AUTO_COMMIT_ENABLE loaded through setConfig/setProps does not change it.
    private Boolean autoCommit = true;
    private Class<?> keyDecoderClass = DefaultDecoder.class;
    private Class<?> valDecoderClass = DefaultDecoder.class;

    /**
     * Worker that drains a single Kafka stream and hands every message to the message adapter.
     */
    class ReceiverThread implements Runnable {
        private final KafkaStream<K, V> stream;
        private final KafkaMessageAdapter<?, ?> adapter;

        /**
         * @param kafkaStream         stream this worker consumes
         * @param kafkaMessageAdapter adapter that receives every consumed message
         */
        public ReceiverThread(KafkaStream<K, V> kafkaStream, KafkaMessageAdapter<?, ?> kafkaMessageAdapter) {
            this.stream = kafkaStream;
            this.adapter = kafkaMessageAdapter;
        }

        /**
         * Consumes the stream until its iterator reports no more messages.
         *
         * <p>An {@link MQException} raised by the adapter is logged and consumption continues.
         * When auto-commit is disabled the offsets are committed after every message, whether or
         * not the adapter failed. Any other runtime failure is logged and rethrown; without the
         * log entry it would vanish, because the task is started through
         * {@code ExecutorService.submit} and nobody inspects the returned future.
         */
        @Override // java.lang.Runnable
        public void run() {
            final String threadName = Thread.currentThread().getName();
            logger.info("{} clientId: {} start.", threadName, this.stream.clientId());
            try {
                final ConsumerIterator<K, V> it = this.stream.iterator();
                while (it.hasNext()) {
                    final MessageAndMetadata<?, ?> next = it.next();
                    try {
                        this.adapter.messageAdapter(next);
                    } catch (MQException e) {
                        // Pass the exception itself so the stack trace is not lost.
                        logger.error(threadName + " productArity: " + next.productArity() + " productPrefix: " + next.productPrefix() + " topic: " + next.topic() + " offset: " + next.offset() + " partition: " + next.partition() + " Exception: " + e.getMessage(), e);
                    }
                    if (!KafkaMessageReceiverPool.this.getAutoCommit().booleanValue()) {
                        KafkaMessageReceiverPool.this.consumer.commitOffsets();
                    }
                }
            } catch (RuntimeException e) {
                logger.error(threadName + " clientId: " + this.stream.clientId() + " terminated unexpectedly.", e);
                throw e;
            }
            logger.info("{} clientId: {} end.", threadName, this.stream.clientId());
        }
    }

    /**
     * @return the thread factory used for the worker threads; {@code null} until one is set or
     *         {@link #init()} has installed the default
     */
    public ThreadFactory getThreadFactory() {
        return this.threadFactory;
    }

    /**
     * Sets the thread factory used by {@link #init()} to create the worker threads.
     *
     * @param threadFactory factory to use; {@code null} makes {@link #init()} fall back to its
     *                      default factory
     */
    public void setThreadFactory(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.customThreadFactory = threadFactory != null;
    }

    /** @return the value stored under {@link KafkaConstants#CLIENT_ID}, or {@code null} */
    public String getClientId() {
        return this.props.getProperty(KafkaConstants.CLIENT_ID);
    }

    /** @param str value to store under {@link KafkaConstants#CLIENT_ID} */
    public void setClientId(String str) {
        this.props.setProperty(KafkaConstants.CLIENT_ID, str);
    }

    /** @return the value stored under {@link KafkaConstants#ZOOKEEPER_LIST}, or {@code null} */
    public String getZookeeperStr() {
        return this.props.getProperty(KafkaConstants.ZOOKEEPER_LIST);
    }

    /** @param str value to store under {@link KafkaConstants#ZOOKEEPER_LIST} */
    public void setZookeeperStr(String str) {
        this.props.setProperty(KafkaConstants.ZOOKEEPER_LIST, str);
    }

    /** @return the auto-commit flag last set through {@link #setAutoCommit(boolean)} (default {@code true}) */
    public Boolean getAutoCommit() {
        return this.autoCommit;
    }

    /**
     * Updates the auto-commit flag and mirrors it into the consumer properties under
     * {@link KafkaConstants#AUTO_COMMIT_ENABLE}.
     *
     * @param z {@code false} makes every worker commit offsets after each message
     */
    public void setAutoCommit(boolean z) {
        this.autoCommit = Boolean.valueOf(z);
        this.props.setProperty(KafkaConstants.AUTO_COMMIT_ENABLE, String.valueOf(z));
    }

    /** @return the live consumer properties (not a copy) */
    public Properties getProps() {
        return this.props;
    }

    /** @param properties properties object that replaces the current one */
    public void setProps(Properties properties) {
        this.props = properties;
    }

    /** @return the configured number of worker threads */
    public int getPoolSize() {
        return this.poolSize;
    }

    /** @param i requested number of worker threads; {@link #init()} may adjust it */
    public void setPoolSize(int i) {
        this.poolSize = i;
    }

    /** @return the resource last passed to {@link #setConfig(Resource)} */
    public Resource getConfig() {
        return this.config;
    }

    /**
     * Remembers the resource and loads its entries into the current properties.
     *
     * <p>Loading is best effort: an {@link IOException} is logged and otherwise ignored.
     *
     * @param resource properties resource to load
     */
    public void setConfig(Resource resource) {
        this.config = resource;
        try {
            PropertiesLoaderUtils.fillProperties(this.props, this.config);
        } catch (IOException e) {
            // Log the exception itself so the cause and stack trace are kept.
            logger.error(e.getMessage(), e);
        }
    }

    /** @return the class instantiated as key decoder in {@link #init()} */
    public Class<?> getKeyDecoderClass() {
        return this.keyDecoderClass;
    }

    /** @param cls class to instantiate as key decoder in {@link #init()} */
    public void setKeyDecoderClass(Class<?> cls) {
        this.keyDecoderClass = cls;
    }

    /** @return the class instantiated as value decoder in {@link #init()} */
    public Class<?> getValDecoderClass() {
        return this.valDecoderClass;
    }

    /** @param cls class to instantiate as value decoder in {@link #init()} */
    public void setValDecoderClass(Class<?> cls) {
        this.valDecoderClass = cls;
    }

    /** @return the adapter that receives every consumed message */
    public KafkaMessageAdapter<?, ?> getMessageAdapter() {
        return this.messageAdapter;
    }

    /** @param kafkaMessageAdapter adapter that receives every consumed message; required by {@link #init()} */
    public void setMessageAdapter(KafkaMessageAdapter<?, ?> kafkaMessageAdapter) {
        this.messageAdapter = kafkaMessageAdapter;
    }

    /**
     * Creates a new receiver bound to the current properties and to this pool.
     *
     * @return a freshly constructed receiver
     */
    @Override // org.darkphoenixs.kafka.pool.MessageReceiverPool
    public KafkaMessageReceiver<K, V> getReceiver() {
        return new KafkaMessageReceiverImpl(this.props, this);
    }

    /**
     * Shuts the given receiver down.
     *
     * @param kafkaMessageReceiver receiver to shut down; {@code null} is ignored
     */
    @Override // org.darkphoenixs.kafka.pool.MessageReceiverPool
    public void returnReceiver(KafkaMessageReceiver<K, V> kafkaMessageReceiver) {
        if (kafkaMessageReceiver != null) {
            kafkaMessageReceiver.shutDown();
        }
    }

    /**
     * Creates the executor and the consumer connector and starts one worker per Kafka stream.
     *
     * <p>The pool size is replaced by the topic's partition count when it is unset (zero or
     * negative) or larger than that count. A thread factory supplied through
     * {@link #setThreadFactory(ThreadFactory)} is used as is; otherwise a
     * {@code KafkaPoolThreadFactory} named after the topic is installed.
     */
    @Override // org.darkphoenixs.kafka.pool.MessageReceiverPool
    public synchronized void init() {
        final String destinationName = this.messageAdapter.getDestination().getDestinationName();
        // NOTE(review): the receiver obtained here is never passed to returnReceiver();
        // confirm whether it holds resources that need shutDown().
        final int partitionCount = getReceiver().getPartitionCount(destinationName);
        if (this.poolSize <= 0 || this.poolSize > partitionCount) {
            setPoolSize(partitionCount);
        }
        if (!this.customThreadFactory) {
            this.threadFactory = new KafkaPoolThreadFactory(tagger + "-" + destinationName);
        }
        this.pool = Executors.newFixedThreadPool(this.poolSize, this.threadFactory);
        logger.info("Message receiver pool initializing. poolSize : {} config : {}", this.poolSize, this.props);
        this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(this.props));

        // Request exactly one stream per worker thread.
        final Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(destinationName, Integer.valueOf(this.poolSize));

        final VerifiableProperties verifiableProperties = new VerifiableProperties(this.props);
        // The decoder classes are configured as Class<?>, so these casts cannot be checked.
        @SuppressWarnings("unchecked")
        final Decoder<K> keyDecoder = (Decoder<K>) RefleTool.newInstance(this.keyDecoderClass, verifiableProperties);
        @SuppressWarnings("unchecked")
        final Decoder<V> valDecoder = (Decoder<V>) RefleTool.newInstance(this.valDecoderClass, verifiableProperties);

        final Map<String, List<KafkaStream<K, V>>> streamsByTopic =
                this.consumer.createMessageStreams(topicCountMap, keyDecoder, valDecoder);
        for (KafkaStream<K, V> stream : streamsByTopic.get(destinationName)) {
            this.pool.submit(new ReceiverThread(stream, this.messageAdapter));
        }
    }

    /**
     * Shuts down the consumer connector and then the executor, waiting up to five seconds for
     * the workers to finish.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt status is restored
     * so callers further up the stack can still observe it.
     */
    @Override // org.darkphoenixs.kafka.pool.MessageReceiverPool
    public synchronized void destroy() {
        logger.info("Message receiver pool closing.");
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        if (this.pool != null) {
            this.pool.shutdown();
            try {
                if (!this.pool.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
                    logger.warn("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            } catch (InterruptedException e) {
                logger.error("Interrupted during shutdown, exiting uncleanly");
                Thread.currentThread().interrupt();
            }
        }
    }
}
