package org.opensearch.migrations.replay;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.protobuf.CodedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.opensearch.migrations.replay.traffic.expiration.ExpiringTrafficStreamMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opensearch/migrations/replay/KafkaPrinter.class */
public class KafkaPrinter {
    /** SLF4J logger shared by every static method (and the anonymous rebalance listener) of this class. */
    private static final Logger log = LoggerFactory.getLogger(KafkaPrinter.class);
    /** Maximum time a single {@code Consumer.poll} call made by {@code processNextChunkOfKafkaEvents} may block: one second. */
    public static final Duration CONSUMER_POLL_TIMEOUT = Duration.ofSeconds(1);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/opensearch/migrations/replay/KafkaPrinter$Parameters.class */
    /**
     * Command-line options for {@link KafkaPrinter}, populated by JCommander from the
     * {@code @Parameter} annotations below.
     *
     * <p>The two boolean switches are declared with {@code arity = 0}, i.e. they are plain flags
     * that consume no value. NOTE(review): the decompiled source took that arity from an unrelated
     * constant in {@code ExpiringTrafficStreamMap}; it is assumed to have been the literal 0 —
     * confirm against the original sources.
     */
    public static class Parameters {

        /** Value for the Kafka {@code bootstrap.servers} property. */
        @Parameter(required = true, names = {"--kafka-traffic-brokers"}, arity = 1, description = "Comma-separated list of host and port pairs that are the addresses of the Kafka brokers to bootstrap with e.g. 'localhost:9092,localhost2:9092'")
        String kafkaTrafficBrokers;

        /** Topic the consumer subscribes to. */
        @Parameter(required = true, names = {"--kafka-traffic-topic"}, arity = 1, description = "Topic name used to pull messages from Kafka")
        String kafkaTrafficTopic;

        /** Value for the Kafka {@code group.id} property. */
        @Parameter(required = true, names = {"--kafka-traffic-group-id"}, arity = 1, description = "Consumer group id that is used when pulling messages from Kafka")
        String kafkaTrafficGroupId;

        /** When set, the SASL/IAM properties for MSK are added to the consumer configuration. */
        @Parameter(required = false, names = {"--kafka-traffic-enable-msk-auth"}, arity = 0, description = "Enables SASL properties required for connecting to MSK with IAM auth")
        boolean kafkaTrafficEnableMSKAuth;

        /** Optional path of a properties file loaded into the consumer configuration; {@code null} when absent. */
        @Parameter(required = false, names = {"--kafka-traffic-property-file"}, arity = 1, description = "File path for Kafka properties file to use for additional or overriden Kafka properties")
        String kafkaTrafficPropertyFile;

        /** Optional output directory; {@code null} means output goes to STDOUT. */
        @Parameter(required = false, names = {"--output-directory"}, arity = 1, description = "If provided will place output inside file(s) within this directory. Otherwise, output will be sent to STDOUT")
        String outputDirectoryPath;

        /** When set, all partitions are written to a single output file. */
        @Parameter(required = false, names = {"--combine-partition-output"}, arity = 0, description = "Creates a single output file with output from all partitions combined. Requires '--output-directory' to be specified.")
        boolean combinePartitionOutput;

        /** Raw {@code topic:partition:limit} entries; empty when no limits were requested. */
        @Parameter(required = false, names = {"--partition-limits"}, description = "Partition limit option will only print records for the provided partitions and up to the given limit specified. It will terminate the printer when all limits have been met. Argument can be used multiple times, and may be comma-separated, e.g. 'test-topic:0:10, test-topic:1:32' ")
        List<String> partitionLimits = new ArrayList<>();

        /** Read timeout in seconds; values {@code <= 0} disable the timeout. */
        @Parameter(required = false, names = {"--timeout-seconds"}, arity = 1, description = "Timeout option for how long KafkaPrinter will continue to read from Kafka before terminating.")
        long timeoutSeconds = 0;

        /** Raw {@code topic:partition:offset} entries; empty when no starting offsets were requested. */
        @Parameter(required = false, names = {"--partition-offsets"}, description = "Partition offsets to start consuming from. Defaults to first offset in partition. Format: 'topic_name:partition_id:offset,topic_name:partition_id:offset'")
        List<String> partitionOffsets = new ArrayList<>();

        Parameters() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/opensearch/migrations/replay/KafkaPrinter$PartitionTracker.class */
    /**
     * Mutable per-partition bookkeeping: how many records have been emitted so far and the
     * maximum number that may be emitted for that partition.
     */
    public static class PartitionTracker {
        /** Number of records counted for the partition so far. */
        long currentRecordCount;
        /** Number of records after which the partition is considered complete. */
        long recordLimit;

        /**
         * @param initialRecordCount starting value of {@link #currentRecordCount}
         * @param maxRecords value of {@link #recordLimit}
         */
        PartitionTracker(long initialRecordCount, long maxRecords) {
            currentRecordCount = initialRecordCount;
            recordLimit = maxRecords;
        }
    }

    /**
     * Parses the command-line arguments into a {@link Parameters} instance.
     *
     * <p>On a parse failure the error message, the received arguments and the JCommander usage
     * text are written out before the exception is rethrown to the caller.
     *
     * @param strArr raw command-line arguments
     * @return the populated parameters
     * @throws ParameterException if JCommander rejects the arguments
     */
    public static Parameters parseArgs(String[] strArr) {
        final Parameters parsed = new Parameters();
        final JCommander commander = new JCommander(parsed);
        try {
            commander.parse(strArr);
        } catch (ParameterException parseFailure) {
            System.err.println(parseFailure.getMessage());
            System.err.println("Got args: " + String.join("; ", strArr));
            commander.usage();
            throw parseFailure;
        }
        return parsed;
    }

    /**
     * Command-line entry point: reads records from a Kafka topic and writes each record value as a
     * length-delimited protobuf message to STDOUT or to file(s) in the output directory.
     *
     * <p>Flow: parse arguments, build the consumer properties, parse the optional partition limits
     * and starting offsets, open the output stream(s), then poll until
     * {@link #pipeRecordsToProtoBufDelimited} returns. Invalid arguments and an unreadable
     * properties file are reported and make the method return normally rather than throw.
     *
     * @param strArr raw command-line arguments
     * @throws FileNotFoundException if an output file cannot be opened
     */
    public static void main(String[] strArr) throws FileNotFoundException {
        final Parameters params;
        try {
            params = parseArgs(strArr);
        } catch (ParameterException ignored) {
            // parseArgs has already printed the message and the usage text to stderr.
            return;
        }

        final Properties properties;
        final Map<TopicPartition, PartitionTracker> partitionLimits;
        final Map<TopicPartition, Long> startingOffsets;
        try {
            if (params.combinePartitionOutput && params.outputDirectoryPath == null) {
                throw new ParameterException("The '--output-directory' parameter is required for using '--combine-partition-output'.");
            }
            properties = buildConsumerProperties(params);
            partitionLimits = parsePartitionLimits(params.partitionLimits);
            startingOffsets = parsePartitionOffsets(params.partitionOffsets);
        } catch (IOException e) {
            log.error("Unable to load properties from kafka.properties file.", e);
            return;
        } catch (ParameterException e) {
            // Report the validation failure instead of exiting without any diagnostic.
            log.error("Invalid arguments: {}", e.getMessage());
            return;
        }

        final boolean writeToStdout = params.outputDirectoryPath == null;
        final String outputDirectory = writeToStdout ? "./" : params.outputDirectoryPath;
        final String outputPrefix = outputDirectory.endsWith(File.separator)
            ? outputDirectory
            : outputDirectory + File.separator;
        final String runId = UUID.randomUUID().toString();
        // One file per partition only when limits were given, a directory was given, and the
        // caller did not ask for combined output; every other combination uses a single stream.
        final boolean separatePartitionOutputs =
            !partitionLimits.isEmpty() && !params.combinePartitionOutput && !writeToStdout;

        final List<FileOutputStream> openedFiles = new ArrayList<>();
        try {
            final Map<Integer, CodedOutputStream> outputs = new HashMap<>();
            if (separatePartitionOutputs) {
                for (TopicPartition limited : partitionLimits.keySet()) {
                    final String path = String.format("%s%s_%d_%s.proto", outputPrefix, limited.topic(), limited.partition(), runId);
                    outputs.put(limited.partition(), openFileOutput(path, openedFiles));
                }
            } else if (writeToStdout) {
                outputs.put(0, CodedOutputStream.newInstance(System.out));
            } else {
                final String path = String.format("%s%s_%s_%s.proto", outputPrefix, params.kafkaTrafficTopic, "all", runId);
                outputs.put(0, openFileOutput(path, openedFiles));
            }

            try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(properties)) {
                consumer.subscribe(
                    Collections.singleton(params.kafkaTrafficTopic),
                    createRebalanceListener(consumer, startingOffsets)
                );
                pipeRecordsToProtoBufDelimited(
                    consumer,
                    getDelimitedProtoBufOutputter(partitionLimits, outputs, separatePartitionOutputs),
                    params.timeoutSeconds,
                    partitionLimits
                );
            } catch (WakeupException e) {
                // Must precede the generic handler: WakeupException is itself an Exception.
                log.info("Wake up exception!");
            } catch (Exception e) {
                log.error("Unexpected exception", e);
            } finally {
                log.info("This consumer close successfully.");
            }
        } finally {
            closeOutputFiles(openedFiles);
        }
    }

    /**
     * Builds the Kafka consumer configuration. Later settings override earlier ones, in this
     * order: default deserializers, the optional properties file, the MSK IAM settings, and
     * finally the broker list and group id from the command line.
     *
     * @throws IOException if the properties file cannot be opened or read
     */
    private static Properties buildConsumerProperties(Parameters params) throws IOException {
        final Properties properties = new Properties();
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        if (params.kafkaTrafficPropertyFile != null) {
            // try-with-resources closes the file even when load() throws.
            try (FileInputStream input = new FileInputStream(params.kafkaTrafficPropertyFile)) {
                properties.load(input);
            }
        }
        if (params.kafkaTrafficEnableMSKAuth) {
            properties.setProperty("security.protocol", "SASL_SSL");
            properties.setProperty("sasl.mechanism", "AWS_MSK_IAM");
            properties.setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
            properties.setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        }
        properties.setProperty("bootstrap.servers", params.kafkaTrafficBrokers);
        properties.setProperty("group.id", params.kafkaTrafficGroupId);
        return properties;
    }

    /**
     * Parses {@code topic_name:partition_id:num_records} entries into fresh trackers keyed by
     * partition. Surrounding whitespace on an entry is ignored so that comma-plus-space separated
     * lists work.
     *
     * @throws ParameterException on a malformed entry or a partition listed twice
     */
    private static Map<TopicPartition, PartitionTracker> parsePartitionLimits(List<String> limitSpecs) {
        final Map<TopicPartition, PartitionTracker> limits = new HashMap<>();
        for (String spec : limitSpecs) {
            final String[] fields = spec.trim().split(":");
            if (fields.length != 3) {
                throw new ParameterException("Partition limit provided does not match the expected format: topic_name:partition_id:num_records, actual value: " + spec);
            }
            final TopicPartition partition = new TopicPartition(fields[0], Integer.parseInt(fields[1]));
            if (limits.containsKey(partition)) {
                throw new ParameterException("Duplicate parameter limit detected for the partition: " + partition);
            }
            limits.put(partition, new PartitionTracker(0L, Long.parseLong(fields[2])));
        }
        return limits;
    }

    /**
     * Parses {@code topic_name:partition_id:offset} entries into a partition-to-offset map. A
     * partition listed more than once keeps the last offset given.
     *
     * @throws ParameterException on a malformed entry
     */
    private static Map<TopicPartition, Long> parsePartitionOffsets(List<String> offsetSpecs) {
        final Map<TopicPartition, Long> offsets = new HashMap<>();
        for (String spec : offsetSpecs) {
            final String[] fields = spec.trim().split(":");
            if (fields.length != 3) {
                throw new ParameterException("Partition offset provided does not match the expected format: topic_name:partition_id:offset, actual value: " + spec);
            }
            offsets.put(new TopicPartition(fields[0], Integer.parseInt(fields[1])), Long.parseLong(fields[2]));
        }
        return offsets;
    }

    /** Opens {@code path} for writing, records the file in {@code openedFiles} for later closing, and wraps it. */
    private static CodedOutputStream openFileOutput(String path, List<FileOutputStream> openedFiles) throws FileNotFoundException {
        final FileOutputStream file = new FileOutputStream(path);
        openedFiles.add(file);
        return CodedOutputStream.newInstance(file);
    }

    /** Closes every output file opened by this run; a failure to close is logged and does not stop the others. */
    private static void closeOutputFiles(List<FileOutputStream> openedFiles) {
        for (FileOutputStream file : openedFiles) {
            try {
                file.close();
            } catch (IOException e) {
                log.warn("Unable to close output file", e);
            }
        }
    }

    /**
     * Creates the rebalance listener that positions newly assigned partitions: a partition seen
     * for the first time is rewound to its beginning, and any partition with a configured starting
     * offset ahead of the current position is moved forward to that offset.
     */
    private static ConsumerRebalanceListener createRebalanceListener(
        final Consumer<String, byte[]> consumer,
        final Map<TopicPartition, Long> startingOffsets
    ) {
        return new ConsumerRebalanceListener() {
            private final Set<TopicPartition> partitionsAssignedAtSomeTime = new HashSet<>();

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                log.info("Partitions Assigned: {}", partitions);
                final Set<TopicPartition> assignedForFirstTime = new HashSet<>(partitions);
                assignedForFirstTime.removeAll(partitionsAssignedAtSomeTime);
                // seekToBeginning treats an empty collection as "all assigned partitions", so it
                // must not be called when nothing is new.
                if (!assignedForFirstTime.isEmpty()) {
                    consumer.seekToBeginning(assignedForFirstTime);
                    partitionsAssignedAtSomeTime.addAll(assignedForFirstTime);
                }
                for (TopicPartition partition : partitions) {
                    final Long startingOffset = startingOffsets.get(partition);
                    if (startingOffset == null) {
                        log.info("Did not find specified startingOffset for partition {}", partition);
                        continue;
                    }
                    final long currentOffset = consumer.position(partition);
                    if (currentOffset >= startingOffset) {
                        log.info("Not changing fetch offsets because current offset is {} and startingOffset is {} for partition {}", currentOffset, startingOffset, partition);
                    } else {
                        consumer.seek(partition, startingOffset);
                        log.info("Found a specified startingOffset for partition {} that is greater than current offset {}. Seeking to {}", partition, currentOffset, startingOffset);
                    }
                }
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Nothing to do when partitions are revoked.
            }
        };
    }

    /**
     * Reports whether every tracker has reached its record limit.
     *
     * @param collection trackers to inspect
     * @return {@code true} when no tracker has {@code currentRecordCount} below its
     *         {@code recordLimit}; an empty collection therefore yields {@code true}
     */
    static boolean checkAllRecordsCompleted(Collection<PartitionTracker> collection) {
        return collection.stream()
            .allMatch(tracker -> tracker.currentRecordCount >= tracker.recordLimit);
    }

    /**
     * Repeatedly polls the consumer and hands each batch to {@code consumer2} until a stop
     * condition is met.
     *
     * <p>Polling stops when {@code map} is non-empty and every tracker in it has reached its limit,
     * or when {@code j} is positive and that many seconds have elapsed since this method started.
     * The limit check is made before the timeout check on every iteration.
     *
     * @param consumer Kafka consumer to poll
     * @param consumer2 sink that receives each polled batch as a stream
     * @param j timeout in seconds; values {@code <= 0} disable the timeout
     * @param map per-partition trackers; empty means no record limits apply
     */
    static void pipeRecordsToProtoBufDelimited(Consumer<String, byte[]> consumer, java.util.function.Consumer<Stream<ConsumerRecord<String, byte[]>>> consumer2, long j, Map<TopicPartition, PartitionTracker> map) {
        final long deadlineMillis = System.currentTimeMillis() + (j * 1000);
        while (true) {
            final boolean limitsReached = !map.isEmpty() && checkAllRecordsCompleted(map.values());
            if (limitsReached) {
                log.info("All partition limits have been met, stopping Kafka polls");
                return;
            }
            final boolean timedOut = j > 0 && System.currentTimeMillis() >= deadlineMillis;
            if (timedOut) {
                log.warn("Specified timeout of {} seconds has been breached, stopping Kafka polls", j);
                return;
            }
            map.values().forEach(tracker ->
                log.debug("Tracker is at {} records for limit {}", tracker.currentRecordCount, tracker.recordLimit));
            processNextChunkOfKafkaEvents(consumer, consumer2);
        }
    }

    /**
     * Performs one poll of at most {@link #CONSUMER_POLL_TIMEOUT} and passes the returned records,
     * as a sequential stream, to {@code consumer2}.
     *
     * @param consumer Kafka consumer to poll
     * @param consumer2 sink that receives the polled records
     */
    static void processNextChunkOfKafkaEvents(Consumer<String, byte[]> consumer, java.util.function.Consumer<Stream<ConsumerRecord<String, byte[]>>> consumer2) {
        final Iterable<ConsumerRecord<String, byte[]>> polled = consumer.poll(CONSUMER_POLL_TIMEOUT);
        final Stream<ConsumerRecord<String, byte[]>> batch = StreamSupport.stream(polled.spliterator(), false);
        consumer2.accept(batch);
    }

    /**
     * Builds the sink that writes each record value as a length-delimited message (an unsigned
     * varint length followed by the raw bytes) and flushes every stream written so far once the
     * batch has been processed.
     *
     * <p>When {@code map} is empty every record goes to the stream stored under key {@code 0}.
     * Otherwise a record is written only if its partition has a tracker that is still below its
     * limit; the tracker's count is incremented, and the record goes to the stream keyed by its
     * partition number when {@code z} is {@code true}, or to the stream under key {@code 0} when
     * it is {@code false}.
     *
     * @param map per-partition trackers; empty means no record limits apply
     * @param map2 output streams keyed by partition number, or by {@code 0} for the shared stream
     * @param z whether each partition has its own output stream
     * @return the batch sink; it throws {@link UncheckedIOException} when a write or flush fails
     */
    static java.util.function.Consumer<Stream<ConsumerRecord<String, byte[]>>> getDelimitedProtoBufOutputter(Map<TopicPartition, PartitionTracker> map, Map<Integer, CodedOutputStream> map2, boolean z) {
        final Set<CodedOutputStream> usedStreams = new HashSet<>();
        return batch -> {
            batch.forEach(kafkaRecord -> {
                final int outputKey;
                if (map.isEmpty()) {
                    outputKey = 0;
                } else {
                    final TopicPartition partition = new TopicPartition(kafkaRecord.topic(), kafkaRecord.partition());
                    log.debug("Incoming record for topic:{} and partition:{}", partition.topic(), partition.partition());
                    final PartitionTracker tracker = map.get(partition);
                    if (tracker == null || tracker.currentRecordCount >= tracker.recordLimit) {
                        // Partition is not tracked, or its limit has already been reached.
                        return;
                    }
                    tracker.currentRecordCount++;
                    outputKey = z ? partition.partition() : 0;
                }
                final CodedOutputStream output = map2.get(outputKey);
                usedStreams.add(output);
                writeDelimited(output, kafkaRecord.value());
            });
            // Flush after every batch so that written records are not left in the stream buffers.
            for (CodedOutputStream output : usedStreams) {
                try {
                    output.flush();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        };
    }

    /** Writes {@code payload} preceded by its length as an unsigned varint; I/O failures are rethrown unchecked. */
    private static void writeDelimited(CodedOutputStream output, byte[] payload) {
        try {
            output.writeUInt32NoTag(payload.length);
            output.writeRawBytes(payload);
        } catch (IOException e) {
            // Checked exceptions cannot cross the java.util.function.Consumer boundary.
            throw new UncheckedIOException(e);
        }
    }
}
