package org.apache.kafka.tools.consumer;

import java.io.PrintStream;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/tools/consumer/ConsoleConsumer.class */
public class ConsoleConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(ConsoleConsumer.class);
    private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1);
    static int messageCount = 0;

    /* loaded from: input_file:org/apache/kafka/tools/consumer/ConsoleConsumer$ConsumerWrapper.class */
    public static class ConsumerWrapper {
        final long timeoutMs;
        final Consumer<byte[], byte[]> consumer;
        final Time time = Time.SYSTEM;
        Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();

        public ConsumerWrapper(ConsoleConsumerOptions consoleConsumerOptions, Consumer<byte[], byte[]> consumer) {
            this.consumer = consumer;
            this.timeoutMs = consoleConsumerOptions.timeoutMs();
            Optional<String> optional = consoleConsumerOptions.topicArg();
            if (!optional.isPresent()) {
                consoleConsumerOptions.includedTopicsArg().ifPresent(str -> {
                    consumer.subscribe(Pattern.compile(str));
                });
            } else if (consoleConsumerOptions.partitionArg().isPresent()) {
                seek(optional.get(), consoleConsumerOptions.partitionArg().getAsInt(), consoleConsumerOptions.offsetArg());
            } else {
                consumer.subscribe(Collections.singletonList(optional.get()));
            }
        }

        private void seek(String str, int i, long j) {
            TopicPartition topicPartition = new TopicPartition(str, i);
            this.consumer.assign(Collections.singletonList(topicPartition));
            if (j == -2) {
                this.consumer.seekToBeginning(Collections.singletonList(topicPartition));
            } else if (j == -1) {
                this.consumer.seekToEnd(Collections.singletonList(topicPartition));
            } else {
                this.consumer.seek(topicPartition, j);
            }
        }

        void resetUnconsumedOffsets() {
            HashMap hashMap = new HashMap();
            while (this.recordIter.hasNext()) {
                ConsumerRecord<byte[], byte[]> next = this.recordIter.next();
                hashMap.putIfAbsent(new TopicPartition(next.topic(), next.partition()), Long.valueOf(next.offset()));
            }
            Consumer<byte[], byte[]> consumer = this.consumer;
            Objects.requireNonNull(consumer);
            hashMap.forEach((v1, v2) -> {
                r1.seek(v1, v2);
            });
        }

        ConsumerRecord<byte[], byte[]> receive() {
            long milliseconds = this.time.milliseconds();
            while (!this.recordIter.hasNext()) {
                this.recordIter = this.consumer.poll(Duration.ofMillis(this.timeoutMs)).iterator();
                if (!this.recordIter.hasNext() && this.time.milliseconds() - milliseconds > this.timeoutMs) {
                    throw new TimeoutException();
                }
            }
            return this.recordIter.next();
        }

        void wakeup() {
            this.consumer.wakeup();
        }

        void cleanup() {
            resetUnconsumedOffsets();
            this.consumer.close();
        }
    }

    public static void main(String[] strArr) throws Exception {
        try {
            run(new ConsoleConsumerOptions(strArr));
        } catch (AuthenticationException e) {
            LOG.error("Authentication failed: terminating consumer process", e);
            Exit.exit(1);
        } catch (Throwable th) {
            LOG.error("Unknown error when running consumer: ", th);
            Exit.exit(1);
        }
    }

    public static void run(ConsoleConsumerOptions consoleConsumerOptions) {
        messageCount = 0;
        ConsumerWrapper consumerWrapper = new ConsumerWrapper(consoleConsumerOptions, new KafkaConsumer(consoleConsumerOptions.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()));
        addShutdownHook(consumerWrapper, consoleConsumerOptions);
        try {
            process(consoleConsumerOptions.maxMessages(), consoleConsumerOptions.formatter(), consumerWrapper, System.out, consoleConsumerOptions.skipMessageOnError());
        } finally {
            consumerWrapper.cleanup();
            consoleConsumerOptions.formatter().close();
            reportRecordCount();
            SHUTDOWN_LATCH.countDown();
        }
    }

    static void addShutdownHook(ConsumerWrapper consumerWrapper, ConsoleConsumerOptions consoleConsumerOptions) {
        Exit.addShutdownHook("consumer-shutdown-hook", () -> {
            try {
                consumerWrapper.wakeup();
                SHUTDOWN_LATCH.await();
            } catch (Throwable th) {
                LOG.error("Exception while running shutdown hook: ", th);
            }
            if (consoleConsumerOptions.enableSystestEventsLogging()) {
                System.out.println("shutdown_complete");
            }
        });
    }

    static void process(int i, MessageFormatter messageFormatter, ConsumerWrapper consumerWrapper, PrintStream printStream, boolean z) {
        do {
            if (messageCount >= i && i != -1) {
                return;
            }
            try {
                ConsumerRecord<byte[], byte[]> receive = consumerWrapper.receive();
                messageCount++;
                try {
                    messageFormatter.writeTo(new ConsumerRecord(receive.topic(), receive.partition(), receive.offset(), receive.timestamp(), receive.timestampType(), 0, 0, (byte[]) receive.key(), (byte[]) receive.value(), receive.headers(), Optional.empty()), printStream);
                } finally {
                    if (z) {
                    }
                }
            } catch (WakeupException e) {
                LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate.");
                return;
            } catch (Throwable th) {
                LOG.error("Error processing message, terminating consumer process: ", th);
                return;
            }
        } while (!checkErr(printStream));
    }

    static void reportRecordCount() {
        System.err.println("Processed a total of " + messageCount + " messages");
    }

    static boolean checkErr(PrintStream printStream) {
        boolean checkError = printStream.checkError();
        if (checkError) {
            System.err.println("Unable to write to standard out, closing consumer.");
        }
        return checkError;
    }
}
