package kafka.examples;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;

/* loaded from: input_file:kafka/examples/ExactlyOnceMessageProcessor.class */
public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener, AutoCloseable {
    private static final int MAX_RETRIES = 5;
    private final String bootstrapServers;
    private final String inputTopic;
    private final String outputTopic;
    private final String groupInstanceId;
    private final CountDownLatch latch;
    private final String transactionalId;
    private volatile boolean closed;
    private final KafkaProducer<Integer, String> producer;
    private final KafkaConsumer<Integer, String> consumer;

    public ExactlyOnceMessageProcessor(String str, String str2, String str3, String str4, CountDownLatch countDownLatch) {
        super(str);
        this.bootstrapServers = str2;
        this.inputTopic = str3;
        this.outputTopic = str4;
        this.transactionalId = "tid-" + str;
        this.producer = new Producer("processor-producer", KafkaProperties.BOOTSTRAP_SERVERS, str4, true, this.transactionalId, true, -1, 10000, null).createKafkaProducer();
        this.groupInstanceId = "giid-" + str;
        this.consumer = new Consumer("processor-consumer", KafkaProperties.BOOTSTRAP_SERVERS, str3, "processor-group", Optional.of(this.groupInstanceId), true, -1, null).createKafkaConsumer();
        this.latch = countDownLatch;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        int i = 0;
        int i2 = 0;
        long j = Long.MAX_VALUE;
        try {
            KafkaProducer<Integer, String> createKafkaProducer = new Producer("processor-producer", this.bootstrapServers, this.outputTopic, true, this.transactionalId, true, -1, 10000, null).createKafkaProducer();
            try {
                KafkaConsumer<Integer, String> createKafkaConsumer = new Consumer("processor-consumer", this.bootstrapServers, this.inputTopic, "processor-group", Optional.of(this.groupInstanceId), true, -1, null).createKafkaConsumer();
                try {
                    createKafkaProducer.initTransactions();
                    createKafkaConsumer.subscribe(Collections.singleton(this.inputTopic), this);
                    Utils.printOut("Processing new records", new Object[0]);
                    while (!this.closed && j > 0) {
                        try {
                            ConsumerRecords poll = createKafkaConsumer.poll(Duration.ofMillis(200L));
                            if (!poll.isEmpty()) {
                                createKafkaProducer.beginTransaction();
                                Iterator it = poll.iterator();
                                while (it.hasNext()) {
                                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                                    createKafkaProducer.send(new ProducerRecord(this.outputTopic, (Integer) consumerRecord.key(), ((String) consumerRecord.value()) + "-ok"));
                                }
                                createKafkaProducer.sendOffsetsToTransaction(getOffsetsToCommit(createKafkaConsumer), createKafkaConsumer.groupMetadata());
                                createKafkaProducer.commitTransaction();
                                i2 += poll.count();
                                i = 0;
                            }
                        } catch (KafkaException e) {
                            Utils.printOut("Aborting transaction: %s", e.getMessage());
                            createKafkaProducer.abortTransaction();
                            i = maybeRetry(i, createKafkaConsumer);
                        } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e2) {
                            Utils.printOut("Invalid or no offset found, using latest", new Object[0]);
                            createKafkaConsumer.seekToEnd(Collections.emptyList());
                            createKafkaConsumer.commitSync();
                            i = 0;
                        } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e3) {
                            Utils.printErr(e3.getMessage(), new Object[0]);
                            shutdown();
                        }
                        j = getRemainingRecords(createKafkaConsumer);
                        if (j != Long.MAX_VALUE) {
                            Utils.printOut("Remaining records: %d", Long.valueOf(j));
                        }
                    }
                    if (createKafkaConsumer != null) {
                        createKafkaConsumer.close();
                    }
                    if (createKafkaProducer != null) {
                        createKafkaProducer.close();
                    }
                } catch (Throwable th) {
                    if (createKafkaConsumer != null) {
                        try {
                            createKafkaConsumer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th3) {
            Utils.printErr("Unhandled exception", new Object[0]);
            th3.printStackTrace();
        }
        Utils.printOut("Processed %d records", Integer.valueOf(i2));
        shutdown();
    }

    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        Utils.printOut("Revoked partitions: %s", collection);
    }

    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        Utils.printOut("Assigned partitions: %s", collection);
    }

    public void onPartitionsLost(Collection<TopicPartition> collection) {
        Utils.printOut("Lost partitions: %s", collection);
    }

    public void shutdown() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.latch.countDown();
    }

    private Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(KafkaConsumer<Integer, String> kafkaConsumer) {
        HashMap hashMap = new HashMap();
        for (TopicPartition topicPartition : kafkaConsumer.assignment()) {
            hashMap.put(topicPartition, new OffsetAndMetadata(kafkaConsumer.position(topicPartition), (String) null));
        }
        return hashMap;
    }

    private long getRemainingRecords(KafkaConsumer<Integer, String> kafkaConsumer) {
        Map endOffsets = kafkaConsumer.endOffsets(new ArrayList(kafkaConsumer.assignment()));
        if (endOffsets.isEmpty()) {
            return Long.MAX_VALUE;
        }
        return kafkaConsumer.assignment().stream().mapToLong(topicPartition -> {
            long position = kafkaConsumer.position(topicPartition);
            if (endOffsets.containsKey(topicPartition)) {
                return ((Long) endOffsets.get(topicPartition)).longValue() - position;
            }
            return 0L;
        }).sum();
    }

    private int maybeRetry(int i, KafkaConsumer<Integer, String> kafkaConsumer) {
        int i2;
        if (i < 0) {
            Utils.printErr("The number of retries must be greater than zero", new Object[0]);
            shutdown();
        }
        if (i < MAX_RETRIES) {
            Map committed = kafkaConsumer.committed(kafkaConsumer.assignment());
            kafkaConsumer.assignment().forEach(topicPartition -> {
                OffsetAndMetadata offsetAndMetadata = (OffsetAndMetadata) committed.get(topicPartition);
                if (offsetAndMetadata != null) {
                    kafkaConsumer.seek(topicPartition, offsetAndMetadata.offset());
                } else {
                    kafkaConsumer.seekToBeginning(Collections.singleton(topicPartition));
                }
            });
            i2 = i + 1;
        } else {
            Utils.printErr("Skipping records after %d retries", Integer.valueOf(MAX_RETRIES));
            kafkaConsumer.commitSync();
            i2 = 0;
        }
        return i2;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.producer != null) {
            this.producer.close();
        }
        if (this.consumer != null) {
            this.consumer.close();
        }
    }
}
