package org.jsmart.zerocode.core.kafka.receive;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.jsmart.zerocode.core.kafka.KafkaConstants;
import org.jsmart.zerocode.core.kafka.consume.ConsumerLocalConfigs;
import org.jsmart.zerocode.core.kafka.helper.KafkaConsumerHelper;
import org.jsmart.zerocode.core.kafka.helper.KafkaFileRecordHelper;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerJsonRecord;
import org.jsmart.zerocode.core.utils.RunnerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:org/jsmart/zerocode/core/kafka/receive/KafkaReceiver.class */
public class KafkaReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReceiver.class);

    @Named("kafka.consumer.properties")
    @Inject(optional = true)
    private String consumerPropertyFile;

    @Inject
    private ConsumerCommonConfigs consumerCommonConfigs;

    public String receive(String str, String str2, String str3) throws IOException {
        ConsumerLocalConfigs deriveEffectiveConfigs = KafkaConsumerHelper.deriveEffectiveConfigs(KafkaConsumerHelper.readConsumerLocalTestProperties(str3), this.consumerCommonConfigs);
        LOGGER.info("\n### Kafka Consumer Effective configs:{}\n", deriveEffectiveConfigs);
        Consumer createConsumer = KafkaConsumerHelper.createConsumer(str, this.consumerPropertyFile, str2);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        int i = 0;
        KafkaConsumerHelper.handleSeekOffset(deriveEffectiveConfigs, createConsumer);
        LOGGER.info("initial polling to trigger ConsumerGroupJoin");
        ConsumerRecords initialPollWaitingForConsumerGroupJoin = KafkaConsumerHelper.initialPollWaitingForConsumerGroupJoin(createConsumer, deriveEffectiveConfigs);
        if (!initialPollWaitingForConsumerGroupJoin.isEmpty()) {
            LOGGER.info("Received {} records on initial poll\n", Integer.valueOf(initialPollWaitingForConsumerGroupJoin.count()));
            appendNewRecords(initialPollWaitingForConsumerGroupJoin, arrayList, arrayList2, deriveEffectiveConfigs);
            KafkaConsumerHelper.handleCommitSyncAsync(createConsumer, this.consumerCommonConfigs, deriveEffectiveConfigs);
        }
        while (i < KafkaConsumerHelper.getMaxTimeOuts(deriveEffectiveConfigs).intValue()) {
            LOGGER.info("polling records  - noOfTimeOuts reached : " + i);
            ConsumerRecords poll = createConsumer.poll(Duration.ofMillis(KafkaConsumerHelper.getPollTime(deriveEffectiveConfigs).longValue()));
            i++;
            if (poll.count() != 0) {
                LOGGER.info("Received {} records after {} timeouts\n", Integer.valueOf(poll.count()), Integer.valueOf(i));
                appendNewRecords(poll, arrayList, arrayList2, deriveEffectiveConfigs);
                KafkaConsumerHelper.handleCommitSyncAsync(createConsumer, this.consumerCommonConfigs, deriveEffectiveConfigs);
            }
        }
        createConsumer.close();
        KafkaFileRecordHelper.handleRecordsDump(deriveEffectiveConfigs, arrayList, arrayList2);
        return KafkaConsumerHelper.prepareResult(deriveEffectiveConfigs, arrayList2, arrayList);
    }

    private void appendNewRecords(ConsumerRecords consumerRecords, List<ConsumerRecord> list, List<ConsumerJsonRecord> list2, ConsumerLocalConfigs consumerLocalConfigs) throws IOException {
        Iterator it = consumerRecords.iterator();
        LOGGER.info("Consumer chosen recordType: " + consumerLocalConfigs.getRecordType());
        String recordType = consumerLocalConfigs.getRecordType();
        boolean z = -1;
        switch (recordType.hashCode()) {
            case 80904:
                if (recordType.equals(KafkaConstants.RAW)) {
                    z = false;
                    break;
                }
                break;
            case 2021682:
                if (recordType.equals(KafkaConstants.AVRO)) {
                    z = 2;
                    break;
                }
                break;
            case 2286824:
                if (recordType.equals(KafkaConstants.JSON)) {
                    z = 3;
                    break;
                }
                break;
            case 76403144:
                if (recordType.equals(KafkaConstants.PROTO)) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                KafkaConsumerHelper.readRaw(list, it);
                return;
            case RunnerUtils.MIN_COUNT /* 1 */:
            case true:
            case true:
                KafkaConsumerHelper.readJson(list2, it, consumerLocalConfigs);
                return;
            default:
                throw new RuntimeException("Unsupported record type - '" + consumerLocalConfigs.getRecordType() + "'. Supported values are 'JSON','RAW'");
        }
    }
}
