package org.darkphoenixs.kafka.pool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.darkphoenixs.kafka.core.KafkaConstants;
import org.darkphoenixs.kafka.core.KafkaMessageAdapter;
import org.darkphoenixs.kafka.core.KafkaMessageNewReceiver;
import org.darkphoenixs.kafka.core.KafkaMessageReceiver;
import org.darkphoenixs.mq.exception.MQException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PropertiesLoaderUtils;

/* loaded from: input_file:org/darkphoenixs/kafka/pool/KafkaMessageNewReceiverPool.class */
/**
 * Pool of Kafka consumer threads that feed the records of one destination
 * (topic) to a {@link KafkaMessageAdapter}.
 *
 * <p>{@link #init()} starts one {@link ReceiverThread} per partition reported
 * for the adapter's destination. What happens to polled records depends on
 * the configured {@link MODEL}:
 * <ul>
 *   <li>{@link MODEL#MODEL_1}: each receiver thread passes its records to the
 *       adapter itself;</li>
 *   <li>{@link MODEL#MODEL_2}: each receiver thread hands every polled batch
 *       to a separate handler pool of {@code poolSize} threads.</li>
 * </ul>
 *
 * <p>{@link #init()} and {@link #destroy()} are synchronized on this
 * instance; the setters are not.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class KafkaMessageNewReceiverPool<K, V> implements MessageReceiverPool<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageNewReceiverPool.class);

    /** Executor running the {@link ReceiverThread}s; created by {@link #init()}. */
    protected ExecutorService receivPool;

    /** Executor running {@link HandlerThread}s; only created in {@link MODEL#MODEL_2}. */
    protected ExecutorService handlePool;

    /** Receiver tasks started by {@link #init()} and not yet stopped by {@link #destroy()}. */
    protected List<KafkaMessageNewReceiverPool<K, V>.ReceiverThread> threads = new ArrayList<>();

    private MODEL model = MODEL.MODEL_1;
    private Properties props = new Properties();
    private Resource config;
    private int poolSize;
    private KafkaMessageAdapter<?, ?> messageAdapter;

    /** Task that passes one already-polled batch of records to the adapter. */
    class HandlerThread implements Runnable {
        public static final String tagger = "HandlerThread";
        private final KafkaMessageAdapter<?, ?> adapter;
        private final ConsumerRecords<K, V> records;

        /**
         * @param kafkaMessageAdapter adapter that receives each record
         * @param consumerRecords     batch of records to process
         */
        public HandlerThread(KafkaMessageAdapter<?, ?> kafkaMessageAdapter, ConsumerRecords<K, V> consumerRecords) {
            this.adapter = kafkaMessageAdapter;
            this.records = consumerRecords;
        }

        /** Passes every record of the batch to the adapter, logging start and end. */
        @Override
        public void run() {
            logger.info(Thread.currentThread().getName() + " start.");
            dispatch(this.adapter, this.records);
            logger.info(Thread.currentThread().getName() + " end.");
        }
    }

    /** How polled records are processed; see the class description. */
    public enum MODEL {
        MODEL_1,
        MODEL_2
    }

    /**
     * Task that owns one {@link KafkaConsumer}, subscribes it to a topic and
     * polls until {@link #shutdown()} is called.
     */
    class ReceiverThread implements Runnable {
        public static final String tagger = "ReceiverThread";
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<K, V> consumer;
        private final KafkaMessageAdapter<?, ?> adapter;
        private final String topic;

        /**
         * @param properties          configuration used to create the consumer
         * @param str                 topic to subscribe to
         * @param kafkaMessageAdapter adapter that receives each record
         */
        public ReceiverThread(Properties properties, String str, KafkaMessageAdapter<?, ?> kafkaMessageAdapter) {
            this.topic = str;
            this.adapter = kafkaMessageAdapter;
            this.consumer = new KafkaConsumer<>(properties);
        }

        /**
         * Polls the topic until shut down, then closes the consumer.
         *
         * <p>A {@link WakeupException} is swallowed only when it was caused by
         * {@link #shutdown()}; otherwise it is rethrown. The consumer is closed
         * on every exit path.
         */
        @Override
        public void run() {
            logger.info(Thread.currentThread().getName() + " start.");
            try {
                this.consumer.subscribe(Arrays.asList(this.topic));
                while (!this.closed.get()) {
                    ConsumerRecords<K, V> records = this.consumer.poll(KafkaConstants.MAX_POLL_TIMEOUT);
                    switch (KafkaMessageNewReceiverPool.this.model) {
                        case MODEL_1:
                            // Process in this thread.
                            dispatch(this.adapter, records);
                            break;
                        case MODEL_2:
                            // Hand the batch to the handler pool and keep polling.
                            KafkaMessageNewReceiverPool.this.handlePool.execute(new HandlerThread(this.adapter, records));
                            break;
                        default:
                            break;
                    }
                }
            } catch (WakeupException e) {
                // wakeup() is how shutdown() interrupts a blocking poll; any
                // other wakeup is unexpected and must surface.
                if (!this.closed.get()) {
                    throw e;
                }
            } finally {
                this.consumer.close();
            }
            logger.info(Thread.currentThread().getName() + " end.");
        }

        /** Marks this receiver as closed and wakes the consumer out of a blocking poll. */
        public void shutdown() {
            this.closed.set(true);
            this.consumer.wakeup();
        }
    }

    /**
     * Passes every record to the adapter. An {@link MQException} raised for
     * one record is logged (with its stack trace) and does not stop the
     * remaining records from being processed.
     */
    private void dispatch(KafkaMessageAdapter<?, ?> adapter, ConsumerRecords<K, V> records) {
        for (ConsumerRecord<?, ?> record : records) {
            try {
                adapter.messageAdapter(record);
            } catch (MQException e) {
                logger.error(Thread.currentThread().getName() + " topic: " + record.topic() + " offset: " + record.offset() + " partition: " + record.partition() + " Exception: " + e.getMessage(), e);
            }
        }
    }

    /** @return the consumer configuration properties (the live instance, not a copy) */
    public Properties getProps() {
        return this.props;
    }

    /** @param properties consumer configuration properties to use */
    public void setProps(Properties properties) {
        this.props = properties;
    }

    /** @return the configured pool size */
    public int getPoolSize() {
        return this.poolSize;
    }

    /** @param i pool size; {@code 0} lets {@link #init()} use the partition count */
    public void setPoolSize(int i) {
        this.poolSize = i;
    }

    /** @return the resource last passed to {@link #setConfig(Resource)} */
    public Resource getConfig() {
        return this.config;
    }

    /**
     * Stores the resource and loads its entries into the current properties.
     * An {@link IOException} while loading is logged and not rethrown.
     *
     * @param resource properties resource to load
     */
    public void setConfig(Resource resource) {
        this.config = resource;
        try {
            PropertiesLoaderUtils.fillProperties(this.props, this.config);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /** @return the name of the current {@link MODEL} */
    public String getModel() {
        return this.model.name();
    }

    /**
     * @param str name of a {@link MODEL} constant
     * @throws IllegalArgumentException if {@code str} names no constant
     */
    public void setModel(String str) {
        this.model = MODEL.valueOf(str);
    }

    /** @return the adapter that receives consumed records */
    public KafkaMessageAdapter<?, ?> getMessageAdapter() {
        return this.messageAdapter;
    }

    /** @param kafkaMessageAdapter adapter that receives consumed records */
    public void setMessageAdapter(KafkaMessageAdapter<?, ?> kafkaMessageAdapter) {
        this.messageAdapter = kafkaMessageAdapter;
    }

    /** @return the configured client id, or {@code "client_new_consumer"} if none is set */
    public String getClientId() {
        return this.props.getProperty(KafkaConstants.CLIENT_ID, "client_new_consumer");
    }

    /** @return the configured group id, or {@code "group_new_consumer"} if none is set */
    public String getGroupId() {
        return this.props.getProperty(KafkaConstants.GROUP_ID, "group_new_consumer");
    }

    /**
     * Creates a new receiver from a copy of the properties, with the group id
     * and client id overwritten by fixed values.
     *
     * @return a new receiver; hand it back through {@link #returnReceiver}
     */
    @Override
    public KafkaMessageReceiver<K, V> getReceiver() {
        Properties properties = (Properties) this.props.clone();
        properties.setProperty(KafkaConstants.GROUP_ID, "group_new_consumer");
        properties.setProperty(KafkaConstants.CLIENT_ID, "client_new_consumer");
        // NOTE(review): assumes KafkaMessageNewReceiver is declared with <K, V> - confirm.
        return new KafkaMessageNewReceiver<K, V>(properties);
    }

    /**
     * Shuts the given receiver down.
     *
     * @param kafkaMessageReceiver receiver to release; {@code null} is ignored
     */
    @Override
    public void returnReceiver(KafkaMessageReceiver<K, V> kafkaMessageReceiver) {
        if (kafkaMessageReceiver != null) {
            kafkaMessageReceiver.shutDown();
        }
    }

    /**
     * Creates the executors and starts one receiver per partition of the
     * adapter's destination.
     *
     * <p>In {@link MODEL#MODEL_2} a non-positive pool size is replaced by the
     * partition count, because a fixed thread pool cannot be created with
     * zero threads.
     */
    @Override
    public synchronized void init() {
        String topic = this.messageAdapter.getDestination().getDestinationName();

        // A temporary receiver is used only to look up the partition count;
        // it is returned even if the lookup fails.
        KafkaMessageReceiver<K, V> receiver = getReceiver();
        int partitionCount;
        try {
            partitionCount = receiver.getPartitionCount(topic);
        } finally {
            returnReceiver(receiver);
        }

        switch (this.model) {
            case MODEL_1:
                if (this.poolSize == 0 || this.poolSize > partitionCount) {
                    setPoolSize(partitionCount);
                }
                this.receivPool = Executors.newFixedThreadPool(partitionCount, new KafkaPoolThreadFactory(ReceiverThread.tagger + "-" + topic));
                logger.info("Message Receiver Pool initializing. poolSize : " + partitionCount);
                break;
            case MODEL_2:
                if (this.poolSize <= 0) {
                    setPoolSize(partitionCount);
                }
                this.receivPool = Executors.newFixedThreadPool(partitionCount, new KafkaPoolThreadFactory(ReceiverThread.tagger + "-" + topic));
                this.handlePool = Executors.newFixedThreadPool(this.poolSize, new KafkaPoolThreadFactory(HandlerThread.tagger + "-" + topic));
                logger.info("Message Receiver Pool initializing poolSize : " + partitionCount);
                logger.info("Message Handler Pool initializing poolSize : " + this.poolSize);
                break;
            default:
                break;
        }

        for (int i = 0; i < partitionCount; i++) {
            // Each consumer gets its own client id: "<clientId>-<index>".
            Properties consumerProps = (Properties) this.props.clone();
            consumerProps.setProperty(KafkaConstants.CLIENT_ID, getClientId() + "-" + i);
            ReceiverThread receiverThread = new ReceiverThread(consumerProps, topic, this.messageAdapter);
            this.threads.add(receiverThread);
            this.receivPool.submit(receiverThread);
        }
    }

    /**
     * Signals every receiver to stop and shuts both executors down without
     * waiting for them to terminate.
     *
     * <p>Receivers are signalled before the handler pool is shut down so that
     * a receiver is less likely to submit a batch to an executor that no
     * longer accepts work. The signal is asynchronous, so this narrows the
     * window rather than closing it.
     */
    @Override
    public synchronized void destroy() {
        for (ReceiverThread receiverThread : this.threads) {
            receiverThread.shutdown();
        }
        // Forget stopped receivers so a later init() starts from a clean list.
        this.threads.clear();
        if (this.receivPool != null) {
            this.receivPool.shutdown();
        }
        if (this.handlePool != null) {
            this.handlePool.shutdown();
        }
    }
}
