package org.darkphoenixs.kafka.pool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.message.MessageAndMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.darkphoenixs.kafka.core.KafkaMessageAdapter;
import org.darkphoenixs.mq.exception.MQException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/darkphoenixs/kafka/pool/KafkaMessageReceiverRetry.class */
public class KafkaMessageReceiverRetry<T> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageReceiverRetry.class);
    private final int retryCount;
    protected final ExecutorService errorMessagePool;
    private final int errorPoolSize = 1;
    protected final BlockingQueue<T> errorMessageQueue = new LinkedBlockingQueue();
    protected final List<KafkaMessageReceiverRetry<T>.RetryThread> errorRetryThreads = new ArrayList(1);
    protected final ConcurrentMap<String, AtomicInteger> errorMessageCount = new ConcurrentHashMap();

    /* loaded from: input_file:org/darkphoenixs/kafka/pool/KafkaMessageReceiverRetry$RetryThread.class */
    class RetryThread implements Runnable {
        public static final String tagger = "RetryThread";
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaMessageAdapter<?, ?> adapter;

        public RetryThread(KafkaMessageAdapter<?, ?> kafkaMessageAdapter) {
            this.adapter = kafkaMessageAdapter;
        }

        @Override // java.lang.Runnable
        public void run() {
            KafkaMessageReceiverRetry.logger.info(Thread.currentThread().getName() + " start.");
            while (!this.closed.get()) {
                T t = null;
                try {
                    t = KafkaMessageReceiverRetry.this.errorMessageQueue.take();
                } catch (InterruptedException e) {
                    KafkaMessageReceiverRetry.logger.error("BlockingQueue take failed.", e);
                }
                int receiveMessageCount = KafkaMessageReceiverRetry.this.receiveMessageCount(t);
                try {
                    KafkaMessageReceiverRetry.logger.warn("Retry receive message. Number of retries: " + receiveMessageCount);
                    receiveMessageAdapter(t);
                    KafkaMessageReceiverRetry.this.receiveMessageClean(t);
                } catch (MQException e2) {
                    KafkaMessageReceiverRetry.this.receiveMessageRetry(t);
                    receiveMessageError(t, receiveMessageCount, e2);
                }
            }
            KafkaMessageReceiverRetry.logger.info(Thread.currentThread().getName() + " end.");
        }

        public void receiveMessageAdapter(T t) throws MQException {
            if (t instanceof ConsumerRecord) {
                this.adapter.messageAdapter((ConsumerRecord<?, ?>) t);
            } else if (t instanceof MessageAndMetadata) {
                this.adapter.messageAdapter((MessageAndMetadata<?, ?>) t);
            }
        }

        public void receiveMessageError(T t, int i, MQException mQException) {
            if (t instanceof ConsumerRecord) {
                ConsumerRecord consumerRecord = (ConsumerRecord) t;
                KafkaMessageReceiverRetry.logger.error("Receive message failed. retries: " + i + " topic: " + consumerRecord.topic() + " offset: " + consumerRecord.offset() + " partition: " + consumerRecord.partition(), mQException);
            } else if (t instanceof MessageAndMetadata) {
                MessageAndMetadata messageAndMetadata = (MessageAndMetadata) t;
                KafkaMessageReceiverRetry.logger.error("Receive message failed. retries: " + i + " topic: " + messageAndMetadata.topic() + " offset: " + messageAndMetadata.offset() + " partition: " + messageAndMetadata.partition(), mQException);
            }
        }

        public void shutdown() {
            this.closed.set(true);
        }
    }

    public KafkaMessageReceiverRetry(String str, int i, KafkaMessageAdapter<?, ?> kafkaMessageAdapter) {
        this.retryCount = i;
        this.errorMessagePool = Executors.newFixedThreadPool(1, new KafkaPoolThreadFactory("RetryThread-" + str));
        KafkaMessageReceiverRetry<T>.RetryThread retryThread = new RetryThread(kafkaMessageAdapter);
        this.errorRetryThreads.add(retryThread);
        this.errorMessagePool.submit(retryThread);
    }

    public void receiveMessageRetry(T t) {
        try {
            if (t instanceof MessageAndMetadata) {
                MessageAndMetadata messageAndMetadata = (MessageAndMetadata) t;
                if (0 < errorMessageCount(messageAndMetadata.topic(), messageAndMetadata.partition(), messageAndMetadata.offset())) {
                    this.errorMessageQueue.put(t);
                }
            } else if (t instanceof ConsumerRecord) {
                ConsumerRecord consumerRecord = (ConsumerRecord) t;
                if (0 < errorMessageCount(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset())) {
                    this.errorMessageQueue.put(t);
                }
            }
        } catch (InterruptedException e) {
            logger.error("BlockingQueue put failed.", e);
        }
    }

    public void receiveMessageClean(T t) {
        if (t instanceof MessageAndMetadata) {
            MessageAndMetadata messageAndMetadata = (MessageAndMetadata) t;
            this.errorMessageCount.remove(errorMessageKey(messageAndMetadata.topic(), messageAndMetadata.partition(), messageAndMetadata.offset()));
        } else if (t instanceof ConsumerRecord) {
            ConsumerRecord consumerRecord = (ConsumerRecord) t;
            this.errorMessageCount.remove(errorMessageKey(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset()));
        }
    }

    public int receiveMessageCount(T t) {
        if (t instanceof MessageAndMetadata) {
            MessageAndMetadata messageAndMetadata = (MessageAndMetadata) t;
            return this.errorMessageCount.get(errorMessageKey(messageAndMetadata.topic(), messageAndMetadata.partition(), messageAndMetadata.offset())).get();
        }
        if (!(t instanceof ConsumerRecord)) {
            return 0;
        }
        ConsumerRecord consumerRecord = (ConsumerRecord) t;
        return this.errorMessageCount.get(errorMessageKey(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset())).get();
    }

    /* JADX WARN: Code restructure failed: missing block: B:10:0x0029, code lost:
    
        r0.next().shutdown();
     */
    /* JADX WARN: Code restructure failed: missing block: B:13:0x003e, code lost:
    
        if (r3.errorMessagePool == null) goto L24;
     */
    /* JADX WARN: Code restructure failed: missing block: B:14:0x0041, code lost:
    
        r3.errorMessagePool.shutdown();
     */
    /* JADX WARN: Code restructure failed: missing block: B:16:0x0053, code lost:
    
        if (r3.errorMessagePool.isTerminated() != false) goto L22;
     */
    /* JADX WARN: Code restructure failed: missing block: B:18:0x0059, code lost:
    
        org.darkphoenixs.kafka.pool.KafkaMessageReceiverRetry.logger.info("Message Error pool closed.");
     */
    /* JADX WARN: Code restructure failed: missing block: B:19:0x0063, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:22:?, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:2:0x0004, code lost:
    
        if (r3.errorMessageQueue != null) goto L4;
     */
    /* JADX WARN: Code restructure failed: missing block: B:4:0x0010, code lost:
    
        if (r3.errorMessageQueue.isEmpty() != false) goto L19;
     */
    /* JADX WARN: Code restructure failed: missing block: B:7:0x0016, code lost:
    
        r0 = r3.errorRetryThreads.iterator();
     */
    /* JADX WARN: Code restructure failed: missing block: B:9:0x0026, code lost:
    
        if (r0.hasNext() == false) goto L21;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void destroy() {
        /*
            r3 = this;
            r0 = r3
            java.util.concurrent.BlockingQueue<T> r0 = r0.errorMessageQueue
            if (r0 == 0) goto L16
        L7:
            r0 = r3
            java.util.concurrent.BlockingQueue<T> r0 = r0.errorMessageQueue
            boolean r0 = r0.isEmpty()
            if (r0 != 0) goto L16
            goto L7
        L16:
            r0 = r3
            java.util.List<org.darkphoenixs.kafka.pool.KafkaMessageReceiverRetry<T>$RetryThread> r0 = r0.errorRetryThreads
            java.util.Iterator r0 = r0.iterator()
            r4 = r0
        L20:
            r0 = r4
            boolean r0 = r0.hasNext()
            if (r0 == 0) goto L3a
            r0 = r4
            java.lang.Object r0 = r0.next()
            org.darkphoenixs.kafka.pool.KafkaMessageReceiverRetry$RetryThread r0 = (org.darkphoenixs.kafka.pool.KafkaMessageReceiverRetry.RetryThread) r0
            r5 = r0
            r0 = r5
            r0.shutdown()
            goto L20
        L3a:
            r0 = r3
            java.util.concurrent.ExecutorService r0 = r0.errorMessagePool
            if (r0 == 0) goto L63
            r0 = r3
            java.util.concurrent.ExecutorService r0 = r0.errorMessagePool
            r0.shutdown()
        L4a:
            r0 = r3
            java.util.concurrent.ExecutorService r0 = r0.errorMessagePool
            boolean r0 = r0.isTerminated()
            if (r0 != 0) goto L59
            goto L4a
        L59:
            org.slf4j.Logger r0 = org.darkphoenixs.kafka.pool.KafkaMessageReceiverRetry.logger
            java.lang.String r1 = "Message Error pool closed."
            r0.info(r1)
        L63:
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: org.darkphoenixs.kafka.pool.KafkaMessageReceiverRetry.destroy():void");
    }

    private String errorMessageKey(String str, int i, long j) {
        return str + "_" + i + "_" + j;
    }

    protected int errorMessageCount(String str, int i, long j) {
        if (this.retryCount == 0) {
            return 0;
        }
        String errorMessageKey = errorMessageKey(str, i, j);
        AtomicInteger atomicInteger = this.errorMessageCount.get(errorMessageKey);
        if (null == atomicInteger) {
            atomicInteger = this.errorMessageCount.putIfAbsent(errorMessageKey, new AtomicInteger(1));
        }
        if (null == atomicInteger) {
            return 1;
        }
        if (this.retryCount > atomicInteger.get()) {
            return atomicInteger.incrementAndGet();
        }
        this.errorMessageCount.remove(errorMessageKey);
        return 0;
    }
}
