package org.jsmart.zerocode.core.kafka.helper;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.jsmart.zerocode.core.di.provider.GsonSerDeProvider;
import org.jsmart.zerocode.core.di.provider.ObjectMapperProvider;
import org.jsmart.zerocode.core.kafka.KafkaConstants;
import org.jsmart.zerocode.core.kafka.common.CommonConfigs;
import org.jsmart.zerocode.core.kafka.consume.ConsumerLocalConfigs;
import org.jsmart.zerocode.core.kafka.consume.ConsumerLocalConfigsWrap;
import org.jsmart.zerocode.core.kafka.receive.ConsumerCommonConfigs;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerJsonRecord;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerJsonRecords;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerRawRecords;
import org.jsmart.zerocode.core.utils.SmartUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/jsmart/zerocode/core/kafka/helper/KafkaConsumerHelper.class */
public class KafkaConsumerHelper {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerHelper.class);
    private static final Gson gson = new GsonSerDeProvider().m6get();
    private static final ObjectMapper objectMapper = new ObjectMapperProvider().m9get();

    public static Consumer createConsumer(String str, String str2, String str3) {
        try {
            InputStream openStream = Resources.getResource(str2).openStream();
            Throwable th = null;
            try {
                try {
                    Properties properties = new Properties();
                    properties.load(openStream);
                    properties.put(CommonConfigs.BOOTSTRAP_SERVERS, str);
                    org.jsmart.zerocode.core.kafka.common.KafkaCommonUtils.resolveValuePlaceHolders(properties);
                    KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
                    kafkaConsumer.subscribe(Collections.singletonList(str3));
                    if (openStream != null) {
                        if (0 != 0) {
                            try {
                                openStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            openStream.close();
                        }
                    }
                    return kafkaConsumer;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            throw new RuntimeException("Exception while reading kafka properties and creating a consumer- " + e);
        }
    }

    public static ConsumerRecords initialPollWaitingForConsumerGroupJoin(Consumer consumer) {
        for (int i = 0; i < 10; i++) {
            if (!consumer.assignment().isEmpty()) {
                return new ConsumerRecords(new HashMap());
            }
            ConsumerRecords poll = consumer.poll(Duration.of(500L, ChronoUnit.MILLIS));
            if (!poll.isEmpty()) {
                return poll;
            }
        }
        throw new RuntimeException("\n********* Kafka Consumer unable to join in time *********\n");
    }

    public static void validateLocalConfigs(ConsumerLocalConfigs consumerLocalConfigs) {
        if (consumerLocalConfigs != null) {
            validateCommitFlags(consumerLocalConfigs.getCommitSync(), consumerLocalConfigs.getCommitAsync());
            validateSeekConfig(consumerLocalConfigs);
        }
    }

    public static void validateCommonConfigs(ConsumerCommonConfigs consumerCommonConfigs) {
        validateCommitFlags(consumerCommonConfigs.getCommitSync(), consumerCommonConfigs.getCommitAsync());
    }

    public static ConsumerLocalConfigs deriveEffectiveConfigs(ConsumerLocalConfigs consumerLocalConfigs, ConsumerCommonConfigs consumerCommonConfigs) {
        validateCommonConfigs(consumerCommonConfigs);
        validateLocalConfigs(consumerLocalConfigs);
        return createEffective(consumerCommonConfigs, consumerLocalConfigs);
    }

    public static ConsumerLocalConfigs createEffective(ConsumerCommonConfigs consumerCommonConfigs, ConsumerLocalConfigs consumerLocalConfigs) {
        Boolean bool;
        Boolean bool2;
        if (consumerLocalConfigs == null) {
            return new ConsumerLocalConfigs(consumerCommonConfigs.getRecordType(), consumerCommonConfigs.getProtoClassType(), consumerCommonConfigs.getFileDumpTo(), consumerCommonConfigs.getCommitAsync(), consumerCommonConfigs.getCommitSync(), consumerCommonConfigs.getShowRecordsConsumed(), consumerCommonConfigs.getMaxNoOfRetryPollsOrTimeouts(), consumerCommonConfigs.getPollingTime(), consumerCommonConfigs.getSeek());
        }
        String str = (String) Optional.ofNullable(consumerLocalConfigs.getRecordType()).orElse(consumerCommonConfigs.getRecordType());
        String str2 = (String) Optional.ofNullable(consumerLocalConfigs.getProtoClassType()).orElse(consumerCommonConfigs.getProtoClassType());
        String str3 = (String) Optional.ofNullable(consumerLocalConfigs.getFileDumpTo()).orElse(consumerCommonConfigs.getFileDumpTo());
        Boolean bool3 = (Boolean) Optional.ofNullable(consumerLocalConfigs.getShowRecordsConsumed()).orElse(consumerCommonConfigs.getShowRecordsConsumed());
        Integer num = (Integer) Optional.ofNullable(consumerLocalConfigs.getMaxNoOfRetryPollsOrTimeouts()).orElse(consumerCommonConfigs.getMaxNoOfRetryPollsOrTimeouts());
        Long l = (Long) Optional.ofNullable(consumerLocalConfigs.getPollingTime()).orElse(consumerCommonConfigs.getPollingTime());
        String str4 = (String) Optional.ofNullable(consumerLocalConfigs.getSeek()).orElse(consumerCommonConfigs.getSeek());
        Boolean commitSync = consumerLocalConfigs.getCommitSync();
        Boolean commitAsync = consumerLocalConfigs.getCommitAsync();
        if (commitSync == null && commitAsync == null) {
            bool = consumerCommonConfigs.getCommitSync();
            bool2 = consumerCommonConfigs.getCommitAsync();
        } else {
            bool = commitSync;
            bool2 = commitAsync;
        }
        return new ConsumerLocalConfigs(str, str2, str3, bool2, bool, bool3, num, l, str4);
    }

    public static ConsumerLocalConfigs readConsumerLocalTestProperties(String str) {
        try {
            return ((ConsumerLocalConfigsWrap) new ObjectMapperProvider().m9get().readValue(str, ConsumerLocalConfigsWrap.class)).getConsumerLocalConfigs();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static Integer getMaxTimeOuts(ConsumerLocalConfigs consumerLocalConfigs) {
        return (Integer) Optional.ofNullable(consumerLocalConfigs.getMaxNoOfRetryPollsOrTimeouts()).orElse(KafkaConstants.MAX_NO_OF_RETRY_POLLS_OR_TIME_OUTS);
    }

    public static Long getPollTime(ConsumerLocalConfigs consumerLocalConfigs) {
        return (Long) Optional.ofNullable(consumerLocalConfigs.getPollingTime()).orElse(KafkaConstants.DEFAULT_POLLING_TIME_MILLI_SEC);
    }

    public static void readRaw(List<ConsumerRecord> list, Iterator it) {
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            LOGGER.info("\nRecord Key - {} , Record value - {}, Record partition - {}, Record offset - {}", new Object[]{consumerRecord.key(), consumerRecord.value(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset())});
            list.add(consumerRecord);
        }
    }

    public static void readJson(List<ConsumerJsonRecord> list, Iterator it, ConsumerLocalConfigs consumerLocalConfigs) throws IOException {
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            Object key = consumerRecord.key();
            Object value = consumerRecord.value();
            Headers<Header> headers = consumerRecord.headers();
            String obj = (consumerLocalConfigs == null || !KafkaConstants.PROTO.equalsIgnoreCase(consumerLocalConfigs.getRecordType())) ? value.toString() : convertProtobufToJson(consumerRecord, consumerLocalConfigs);
            LOGGER.info("\nRecord Key - {} , Record value - {}, Record partition - {}, Record offset - {}, Headers - {}", new Object[]{key, obj, Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), headers});
            JsonNode readTree = objectMapper.readTree(obj);
            HashMap hashMap = null;
            if (headers != null) {
                hashMap = new HashMap();
                for (Header header : headers) {
                    hashMap.put(header.key(), new String(header.value()));
                }
            }
            list.add(new ConsumerJsonRecord(consumerRecord.key(), null, readTree, hashMap));
        }
    }

    private static String convertProtobufToJson(ConsumerRecord consumerRecord, ConsumerLocalConfigs consumerLocalConfigs) {
        if (StringUtils.isEmpty(consumerLocalConfigs.getProtoClassType())) {
            throw new IllegalArgumentException("[protoClassType] is required consumer config for recordType PROTO.");
        }
        try {
            return JsonFormat.printer().includingDefaultValueFields().preservingProtoFieldNames().print(createMessageOrBuilder(consumerLocalConfigs.getProtoClassType(), (byte[]) consumerRecord.value()));
        } catch (InvalidProtocolBufferException e) {
            throw new IllegalArgumentException((Throwable) e);
        }
    }

    private static MessageOrBuilder createMessageOrBuilder(String str, byte[] bArr) {
        try {
            return (MessageOrBuilder) Class.forName(str).getMethod("parseFrom", byte[].class).invoke(null, bArr);
        } catch (ClassNotFoundException | IllegalAccessException | IllegalArgumentException | NoSuchMethodException | SecurityException | InvocationTargetException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static String prepareResult(ConsumerLocalConfigs consumerLocalConfigs, List<ConsumerJsonRecord> list, List<ConsumerRecord> list2) throws JsonProcessingException {
        String prettyPrintJson;
        if (consumerLocalConfigs == null || consumerLocalConfigs.getShowRecordsConsumed().booleanValue()) {
            prettyPrintJson = (consumerLocalConfigs == null || !KafkaConstants.RAW.equals(consumerLocalConfigs.getRecordType())) ? (consumerLocalConfigs == null || !(KafkaConstants.JSON.equals(consumerLocalConfigs.getRecordType()) || KafkaConstants.PROTO.equalsIgnoreCase(consumerLocalConfigs.getRecordType()))) ? "{\"error\" : \"recordType Undecided, Please chose recordType as JSON or RAW\"}" : SmartUtils.prettyPrintJson(objectMapper.writeValueAsString(new ConsumerJsonRecords(list))) : SmartUtils.prettyPrintJson(gson.toJson(new ConsumerRawRecords(list2)));
        } else {
            int size = list.size();
            prettyPrintJson = SmartUtils.prettyPrintJson(gson.toJson(new ConsumerRawRecords(Integer.valueOf(size == 0 ? list2.size() : size))));
        }
        return prettyPrintJson;
    }

    public static void handleCommitSyncAsync(Consumer<Long, String> consumer, ConsumerCommonConfigs consumerCommonConfigs, ConsumerLocalConfigs consumerLocalConfigs) {
        Boolean bool;
        Boolean bool2;
        if (consumerLocalConfigs == null) {
            LOGGER.warn("[No local test configs]-Kafka client neither did `commitAsync()` nor `commitSync()`");
            return;
        }
        Boolean commitSync = consumerLocalConfigs.getCommitSync();
        Boolean commitAsync = consumerLocalConfigs.getCommitAsync();
        if (commitSync == null && commitAsync == null) {
            bool = consumerCommonConfigs.getCommitSync();
            bool2 = consumerCommonConfigs.getCommitAsync();
        } else {
            bool = commitSync;
            bool2 = commitAsync;
        }
        if (bool != null && bool.booleanValue()) {
            consumer.commitSync();
        } else if (bool2 == null || !bool2.booleanValue()) {
            LOGGER.warn("Kafka client neither configured for `commitAsync()` nor `commitSync()`");
        } else {
            consumer.commitAsync();
        }
    }

    public static void handleSeekOffset(ConsumerLocalConfigs consumerLocalConfigs, Consumer consumer) {
        if (StringUtils.isEmpty(consumerLocalConfigs.getSeek())) {
            return;
        }
        String[] seekTopicPartitionOffset = consumerLocalConfigs.getSeekTopicPartitionOffset();
        String str = seekTopicPartitionOffset[0];
        int parseInt = Integer.parseInt(seekTopicPartitionOffset[1]);
        long parseLong = Long.parseLong(seekTopicPartitionOffset[2]);
        TopicPartition topicPartition = new TopicPartition(str, parseInt);
        HashSet hashSet = new HashSet();
        hashSet.add(topicPartition);
        consumer.unsubscribe();
        consumer.assign(hashSet);
        if (parseLong > -1) {
            consumer.seek(topicPartition, parseLong);
        } else {
            consumer.seekToEnd(hashSet);
            consumer.seek(topicPartition, consumer.position(topicPartition) + parseLong);
        }
    }

    private static void validateCommitFlags(Boolean bool, Boolean bool2) {
        if (bool != null && bool2 != null && bool.booleanValue() && bool2.booleanValue()) {
            throw new RuntimeException("\n********* Both commitSync and commitAsync can not be true *********\n");
        }
    }

    private static void validateSeekConfig(ConsumerLocalConfigs consumerLocalConfigs) {
        String seek = consumerLocalConfigs.getSeek();
        if (StringUtils.isEmpty(seek)) {
            return;
        }
        String[] split = seek.split(",");
        if (split == null || split.length < 3) {
            throw new RuntimeException("\n------> 'seek' should contain 'topic,partition,offset' e.g. 'topic1,0,2' ");
        }
    }
}
