package org.opensearch.migrations.utils.kafka;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.zip.GZIPInputStream;
import lombok.Generated;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.opensearch.migrations.trafficcapture.kafkaoffloader.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opensearch/migrations/utils/kafka/KafkaLoader.class */
/**
 * Loads records from a gzip-compressed, line-oriented export file into a Kafka topic.
 *
 * <p>Each line of the export must have the form {@code <key>|<value>}, where the value is
 * Base64-encoded. The key is sent as a {@code String} and the decoded value as a
 * {@code byte[]}.
 */
public class KafkaLoader {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaLoader.class);

    /** Regex passed to {@link String#split(String)}; matches the literal {@code |} separator. */
    private static final String DELIMITER = "\\|";

    private final String kafkaPropertiesFile;
    private final String kafkaConnection;
    private final String kafkaClientId;
    private final boolean mskAuthEnabled;

    /** Thrown when a line of the export file is not of the form {@code <key>|<value>}. */
    public static class InvalidKafkaExportFormat extends Exception {
        public InvalidKafkaExportFormat(String str) {
            super(str);
        }
    }

    /**
     * Stores the settings that are later handed, unchanged and in this order, to
     * {@code KafkaConfig.buildKafkaProperties} when a producer is created.
     *
     * @param kafkaPropertiesFile first argument forwarded to {@code buildKafkaProperties}
     *                            (presumably a path to a producer properties file; confirm
     *                            against {@code KafkaConfig})
     * @param kafkaConnection     second argument forwarded to {@code buildKafkaProperties}
     * @param kafkaClientId       third argument forwarded to {@code buildKafkaProperties}
     * @param mskAuthEnabled      fourth argument forwarded to {@code buildKafkaProperties}
     */
    public KafkaLoader(String kafkaPropertiesFile, String kafkaConnection, String kafkaClientId, boolean mskAuthEnabled) {
        this.kafkaPropertiesFile = kafkaPropertiesFile;
        this.kafkaConnection = kafkaConnection;
        this.kafkaClientId = kafkaClientId;
        this.mskAuthEnabled = mskAuthEnabled;
    }

    /**
     * Creates a producer, reads the gzip-compressed file line by line and sends every record
     * to the given topic. Both the producer and the file reader are closed before this method
     * returns, whether it completes normally or throws.
     *
     * @param fileName  path of the gzip-compressed export file
     * @param topicName Kafka topic the records are sent to
     * @param batchSize number of records sent between waits for broker acknowledgement;
     *                  must be positive
     * @throws Exception if the file cannot be read, a line is malformed, or a send fails
     */
    public void loadRecordsToKafkaFromCompressedFile(String fileName, String topicName, int batchSize) throws Exception {
        // Resources are closed in reverse order: the reader first, then the producer.
        try (Producer<String, byte[]> producer = new KafkaProducer<>(
                 KafkaConfig.buildKafkaProperties(this.kafkaPropertiesFile, this.kafkaConnection, this.kafkaClientId, this.mskAuthEnabled));
             BufferedReader reader = createBufferedReaderFromFile(fileName)) {
            readLinesAndSendToKafka(reader, producer, topicName, batchSize);
        }
    }

    /**
     * Reads {@code <key>|<base64 value>} lines until end of stream and sends each one to
     * {@code topicName}. After every {@code batchSize} records, and once more at end of
     * stream, this method blocks until all outstanding sends have completed.
     *
     * <p>The reader and the producer are not closed here; that is the caller's job.
     *
     * @param bufferedReader source of export lines
     * @param producer       producer used to send the records
     * @param topicName      Kafka topic the records are sent to
     * @param batchSize      number of records sent between waits; must be positive
     * @throws IllegalArgumentException if {@code batchSize} is not positive, or if a value is
     *                                  not valid Base64 (raised by the Base64 decoder)
     * @throws InvalidKafkaExportFormat if a line does not split into exactly two fields
     * @throws Exception                if reading fails or a send completes exceptionally
     */
    public void readLinesAndSendToKafka(BufferedReader bufferedReader, Producer<String, byte[]> producer, String topicName, int batchSize) throws Exception {
        // Fail before anything is sent; a zero batch size would otherwise blow up with an
        // ArithmeticException in the modulo below, after the first record was already sent.
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive but was " + batchSize);
        }

        List<Future<RecordMetadata>> pendingSends = new ArrayList<>();
        int sentCount = 0;
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            String[] fields = line.split(DELIMITER);
            if (fields.length != 2) {
                throw new InvalidKafkaExportFormat("Expected record in format '<key>|<value>' but did not find the correct number of delimiters");
            }
            byte[] value = Base64.getDecoder().decode(fields[1]);
            pendingSends.add(producer.send(new ProducerRecord<>(topicName, fields[0], value)));
            sentCount++;

            // Bound the number of in-flight sends by draining them once per batch.
            if (sentCount % batchSize == 0) {
                waitForFutures(pendingSends);
                log.info("Sent {} messages to kafka topic {}", sentCount, topicName);
                pendingSends.clear();
            }
        }

        log.info("End of stream reached");
        // Drain whatever is left from the final, possibly partial, batch.
        waitForFutures(pendingSends);
        log.info("Sent total of {} messages to kafka topic {}", sentCount, topicName);
    }

    /**
     * Blocks until every future in the list has completed.
     *
     * @throws ExecutionException   if any send completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void waitForFutures(List<Future<RecordMetadata>> list) throws ExecutionException, InterruptedException {
        for (Future<RecordMetadata> pending : list) {
            pending.get();
        }
    }

    /**
     * Opens the file as a gzip stream and wraps it in a UTF-8 {@link BufferedReader}.
     * The caller owns the returned reader and must close it.
     *
     * @param fileName path of the gzip-compressed file
     * @throws Exception if the file cannot be opened or is not in gzip format
     */
    private BufferedReader createBufferedReaderFromFile(String fileName) throws Exception {
        FileInputStream fileStream = new FileInputStream(fileName);
        try {
            // GZIPInputStream reads the gzip header in its constructor and can throw here.
            return new BufferedReader(new InputStreamReader(new GZIPInputStream(fileStream), StandardCharsets.UTF_8));
        } catch (Exception e) {
            // Do not leak the file handle when the wrapping streams fail to construct.
            try {
                fileStream.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }
}
