package org.apache.kafka.tools.consumer;

import java.io.PrintStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/tools/consumer/ConsoleShareConsumer.class */
/**
 * Command-line tool that reads records from a topic through a share consumer, prints them with a
 * {@link MessageFormatter} and acknowledges each record.
 */
public class ConsoleShareConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(ConsoleShareConsumer.class);

    /** Released once {@link #run} has finished cleaning up; the shutdown hook waits on it. */
    private static final CountDownLatch SHUTDOWN_LATCH = new CountDownLatch(1);

    /** Number of records received so far; reset at the start of every {@link #run}. */
    static int messageCount = 0;

    /**
     * Thin wrapper around a {@link ShareConsumer} that subscribes to a single topic and hands out
     * records one at a time.
     */
    public static class ConsumerWrapper {
        final String topic;
        final ShareConsumer<byte[], byte[]> consumer;
        final long timeoutMs;
        final Time time = Time.SYSTEM;
        Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();

        /**
         * Creates the wrapper and subscribes the consumer to {@code topic}.
         *
         * @param topic     the topic to subscribe to
         * @param consumer  the share consumer to read from
         * @param timeoutMs poll timeout, and the maximum time {@link #receive()} waits for a record
         */
        public ConsumerWrapper(String topic, ShareConsumer<byte[], byte[]> consumer, long timeoutMs) {
            this.topic = topic;
            this.consumer = consumer;
            this.timeoutMs = timeoutMs;
            consumer.subscribe(Collections.singletonList(topic));
        }

        /**
         * Returns the next record, polling the consumer whenever the current batch is exhausted.
         *
         * @return the next available record
         * @throws TimeoutException if a poll returned nothing and more than {@code timeoutMs}
         *                          milliseconds have elapsed since this call started
         */
        ConsumerRecord<byte[], byte[]> receive() {
            long startTimeMs = this.time.milliseconds();
            while (!this.recordIter.hasNext()) {
                this.recordIter = this.consumer.poll(Duration.ofMillis(this.timeoutMs)).iterator();
                if (!this.recordIter.hasNext() && this.time.milliseconds() - startTimeMs > this.timeoutMs) {
                    throw new TimeoutException();
                }
            }
            return this.recordIter.next();
        }

        /** Acknowledges {@code consumerRecord} on the underlying consumer with the given type. */
        void acknowledge(ConsumerRecord<byte[], byte[]> consumerRecord, AcknowledgeType acknowledgeType) {
            this.consumer.acknowledge(consumerRecord, acknowledgeType);
        }

        /** Wakes up the underlying consumer. */
        void wakeup() {
            this.consumer.wakeup();
        }

        /** Closes the underlying consumer. */
        void cleanup() {
            this.consumer.close();
        }
    }

    /**
     * Entry point: parses the arguments, runs the consumer, and exits with status 1 on any failure.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) throws Exception {
        try {
            run(new ConsoleShareConsumerOptions(args));
        } catch (AuthenticationException e) {
            LOG.error("Authentication failed: terminating consumer process", e);
            Exit.exit(1);
        } catch (Throwable t) {
            LOG.error("Unknown error when running consumer: ", t);
            Exit.exit(1);
        }
    }

    /**
     * Creates the share consumer described by {@code opts}, processes records until a stop
     * condition is reached, and then releases all resources.
     *
     * @param opts parsed command-line options
     */
    public static void run(ConsoleShareConsumerOptions opts) {
        messageCount = 0;
        String topic = opts.topicArg();
        ShareConsumer<byte[], byte[]> consumer =
            new KafkaShareConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
        // A negative timeout is replaced by Long.MAX_VALUE.
        long timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : Long.MAX_VALUE;
        ConsumerWrapper consumerWrapper = new ConsumerWrapper(topic, consumer, timeoutMs);
        addShutdownHook(consumerWrapper);
        try {
            process(opts.maxMessages(), opts.formatter(), consumerWrapper, System.out,
                opts.rejectMessageOnError(), opts.acknowledgeType());
        } finally {
            // Nested finally blocks: each cleanup step runs exactly once, and the latch is always
            // released even if an earlier step throws. Otherwise the shutdown hook would block
            // forever in SHUTDOWN_LATCH.await().
            try {
                consumerWrapper.cleanup();
            } finally {
                try {
                    opts.formatter().close();
                } finally {
                    reportRecordCount();
                    SHUTDOWN_LATCH.countDown();
                }
            }
        }
    }

    /** Registers a shutdown hook that wakes the consumer and waits for {@link #run} to clean up. */
    private static void addShutdownHook(ConsumerWrapper consumerWrapper) {
        Exit.addShutdownHook("consumer-shutdown-hook", () -> {
            try {
                consumerWrapper.wakeup();
                SHUTDOWN_LATCH.await();
            } catch (Throwable t) {
                LOG.error("Exception while running shutdown hook: ", t);
            }
        });
    }

    /**
     * Receives, prints and acknowledges records until one of the following happens:
     * {@code maxMessages} records have been received, the consumer is woken up, an unrecoverable
     * error occurs, or {@code output} reports a write error.
     *
     * @param maxMessages          maximum number of records to process, or {@code -1} for no limit
     * @param formatter            formatter used to print each record
     * @param consumer             source of records
     * @param output               stream the formatted records are written to
     * @param rejectMessageOnError if {@code true}, a record whose formatting or acknowledgement
     *                             fails is acknowledged with {@link AcknowledgeType#REJECT} and
     *                             processing continues; if {@code false}, such a failure is logged
     *                             and processing stops
     * @param acknowledgeType      acknowledgement sent for every successfully printed record
     */
    static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper consumer, PrintStream output,
                        boolean rejectMessageOnError, AcknowledgeType acknowledgeType) {
        while (messageCount < maxMessages || maxMessages == -1) {
            try {
                ConsumerRecord<byte[], byte[]> msg = consumer.receive();
                messageCount++;
                try {
                    formatter.writeTo(new ConsumerRecord<>(msg.topic(), msg.partition(), msg.offset(),
                        msg.timestamp(), msg.timestampType(), 0, 0, msg.key(), msg.value(), msg.headers(),
                        Optional.empty()), output);
                    consumer.acknowledge(msg, acknowledgeType);
                } catch (Throwable t) {
                    // A wakeup is a shutdown signal, not a problem with this record, so it is never
                    // turned into a rejection.
                    if (!rejectMessageOnError || t instanceof WakeupException) {
                        throw t;
                    }
                    LOG.error("Error processing message, rejecting this message: ", t);
                    consumer.acknowledge(msg, AcknowledgeType.REJECT);
                }
            } catch (WakeupException e) {
                LOG.trace("Caught WakeupException because consumer is shutdown, ignore and terminate.");
                return;
            } catch (Throwable t) {
                LOG.error("Error processing message, terminating consumer process: ", t);
                return;
            }
            if (checkErr(output)) {
                return;
            }
        }
    }

    /** Prints the number of records received during this run to standard error. */
    private static void reportRecordCount() {
        System.err.println("Processed a total of " + messageCount + " messages");
    }

    /**
     * Checks whether {@code output} has encountered a write error, reporting it on standard error.
     *
     * @return {@code true} if the stream is in an error state
     */
    private static boolean checkErr(PrintStream output) {
        boolean hasError = output.checkError();
        if (hasError) {
            System.err.println("Unable to write to standard out, closing consumer.");
        }
        return hasError;
    }
}
