package io.streamthoughts.jikkou.kafka.reconcilier;

import io.streamthoughts.jikkou.core.annotation.HandledResource;
import io.streamthoughts.jikkou.core.config.ConfigProperty;
import io.streamthoughts.jikkou.core.config.Configuration;
import io.streamthoughts.jikkou.core.exceptions.ConfigException;
import io.streamthoughts.jikkou.core.exceptions.JikkouRuntimeException;
import io.streamthoughts.jikkou.core.extension.annotations.ConfigPropertySpec;
import io.streamthoughts.jikkou.core.extension.annotations.ExtensionConfigProperties;
import io.streamthoughts.jikkou.core.models.ObjectMeta;
import io.streamthoughts.jikkou.core.models.ResourceListObject;
import io.streamthoughts.jikkou.core.reconcilier.Collector;
import io.streamthoughts.jikkou.core.selectors.AggregateSelector;
import io.streamthoughts.jikkou.core.selectors.Selector;
import io.streamthoughts.jikkou.kafka.collections.V1KafkaTableRecordList;
import io.streamthoughts.jikkou.kafka.internals.KafkaRecord;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContext;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientFactory;
import io.streamthoughts.jikkou.kafka.internals.admin.DefaultAdminClientFactory;
import io.streamthoughts.jikkou.kafka.internals.consumer.ConsumerFactory;
import io.streamthoughts.jikkou.kafka.internals.consumer.ConsumerRecordCallback;
import io.streamthoughts.jikkou.kafka.internals.consumer.DefaultConsumerFactory;
import io.streamthoughts.jikkou.kafka.internals.consumer.KafkaLogToEndConsumer;
import io.streamthoughts.jikkou.kafka.model.DataHandle;
import io.streamthoughts.jikkou.kafka.model.DataType;
import io.streamthoughts.jikkou.kafka.model.DataValue;
import io.streamthoughts.jikkou.kafka.model.KafkaRecordHeader;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTableRecord;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTableRecordSpec;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@HandledResource(type = V1KafkaTableRecord.class)
@ExtensionConfigProperties(properties = {@ConfigPropertySpec(name = Config.TOPIC_CONFIG_NAME, description = Config.TOPIC_CONFIG_DESCRIPTION, type = String.class), @ConfigPropertySpec(name = Config.KEY_TYPE_CONFIG_NAME, description = Config.KEY_TYPE_CONFIG_DESCRIPTION, type = String.class), @ConfigPropertySpec(name = Config.VALUE_TYPE_CONFIG_NAME, description = Config.VALUE_TYPE_CONFIG_DESCRIPTION, type = String.class), @ConfigPropertySpec(name = Config.SKIP_MESSAGE_ON_ERROR_CONFIG_NAME, description = Config.SKIP_MESSAGE_ON_ERROR_CONFIG_DESCRIPTION, type = Boolean.class, defaultValue = "false", isRequired = false)})
/* loaded from: input_file:io/streamthoughts/jikkou/kafka/reconcilier/AdminClientKafkaTableCollector.class */
public final class AdminClientKafkaTableCollector implements Collector<V1KafkaTableRecord> {
    /** Logger used by this collector and by its nested record callback. */
    private static final Logger LOG = LoggerFactory.getLogger(AdminClientKafkaTableCollector.class);
    /** Empty map handed to the data serde as its configuration when deserializing records. */
    public static final Map<String, Object> EMPTY_CONFIG = Collections.emptyMap();
    /** Factory for byte-array consumers; supplied through a constructor or created in {@code configure}. */
    private ConsumerFactory<byte[], byte[]> consumerFactory;
    /** Factory for admin clients; supplied through a constructor or created in {@code configure}. */
    private AdminClientFactory adminClientFactory;

    /* loaded from: input_file:io/streamthoughts/jikkou/kafka/reconcilier/AdminClientKafkaTableCollector$Config.class */
    /**
     * Typed view over the {@link Configuration} entries read by this collector.
     *
     * <p>The {@code *_CONFIG_NAME} / {@code *_CONFIG_DESCRIPTION} strings are compile-time constants
     * (they are referenced from annotations); the {@link ConfigProperty} constants are the matching
     * property descriptors used to evaluate a configuration.
     */
    public static class Config {
        public static final String TOPIC_CONFIG_NAME = "topic-name";
        public static final String TOPIC_CONFIG_DESCRIPTION = "The topic name to consume on.";
        // Property descriptors are shared constants: declared final so they cannot be reassigned.
        public static final ConfigProperty<String> TOPIC_NAME_CONFIG =
                ConfigProperty.ofString(TOPIC_CONFIG_NAME).description(TOPIC_CONFIG_DESCRIPTION);

        public static final String KEY_TYPE_CONFIG_NAME = "key-type";
        public static final String KEY_TYPE_CONFIG_DESCRIPTION = "The record key type.";
        public static final ConfigProperty<String> KEY_TYPE_CONFIG =
                ConfigProperty.ofString(KEY_TYPE_CONFIG_NAME).description(KEY_TYPE_CONFIG_DESCRIPTION);

        public static final String VALUE_TYPE_CONFIG_NAME = "value-type";
        public static final String VALUE_TYPE_CONFIG_DESCRIPTION = "The record value type.";
        public static final ConfigProperty<String> VALUE_TYPE_CONFIG =
                ConfigProperty.ofString(VALUE_TYPE_CONFIG_NAME).description(VALUE_TYPE_CONFIG_DESCRIPTION);

        public static final String SKIP_MESSAGE_ON_ERROR_CONFIG_NAME = "skip-message-on-error";
        public static final String SKIP_MESSAGE_ON_ERROR_CONFIG_DESCRIPTION = "If there is an error when processing a message, skip it instead of halt.";
        public static final ConfigProperty<Boolean> SKIP_MESSAGE_ON_ERROR_CONFIG =
                ConfigProperty.ofBoolean(SKIP_MESSAGE_ON_ERROR_CONFIG_NAME)
                        .description(SKIP_MESSAGE_ON_ERROR_CONFIG_DESCRIPTION)
                        .orElse(false);

        private final Configuration configuration;

        /**
         * Creates a new view over the given configuration.
         *
         * @param configuration the configuration to read from.
         * @throws NullPointerException if {@code configuration} is {@code null}.
         */
        public Config(Configuration configuration) {
            this.configuration = (Configuration) Objects.requireNonNull(configuration, "configuration must not be null");
        }

        /**
         * @return the evaluated {@code skip-message-on-error} property.
         */
        public boolean skipMessageOnError() {
            return ((Boolean) SKIP_MESSAGE_ON_ERROR_CONFIG.evaluate(this.configuration)).booleanValue();
        }

        /**
         * @return the evaluated {@code topic-name} property.
         */
        public String topicName() {
            return (String) TOPIC_NAME_CONFIG.evaluate(this.configuration);
        }

        /**
         * @return the {@link DataType} constant named by the {@code key-type} property.
         * @throws IllegalArgumentException if the configured name matches no {@link DataType} constant.
         */
        public DataType keyType() {
            return toDataType((String) KEY_TYPE_CONFIG.evaluate(this.configuration));
        }

        /**
         * @return the {@link DataType} constant named by the {@code value-type} property.
         * @throws IllegalArgumentException if the configured name matches no {@link DataType} constant.
         */
        public DataType valueType() {
            return toDataType((String) VALUE_TYPE_CONFIG.evaluate(this.configuration));
        }

        /**
         * @return the consumer client configuration evaluated from
         *         {@code KafkaClientConfiguration.CONSUMER_CLIENT_CONFIG}.
         */
        @SuppressWarnings("unchecked") // the property is consumed as a String-keyed map by this class
        public Map<String, Object> clientConfig() {
            return (Map<String, Object>) KafkaClientConfiguration.CONSUMER_CLIENT_CONFIG.evaluate(this.configuration);
        }

        /** Resolves a configured type name to its enum constant, ignoring case (locale-independent). */
        private static DataType toDataType(String name) {
            return DataType.valueOf(name.toUpperCase(Locale.ROOT));
        }
    }

    /* loaded from: input_file:io/streamthoughts/jikkou/kafka/reconcilier/AdminClientKafkaTableCollector$InternalConsumerRecordCallback.class */
    /**
     * Consumer callback that folds the records of a topic into a key-indexed table.
     *
     * <p>For each consumed record: records without a key are ignored, a record with a {@code null}
     * value removes the entry for its key, and any other record replaces the entry for its key.
     */
    public static class InternalConsumerRecordCallback implements ConsumerRecordCallback<byte[], byte[]> {
        /** Current table content, indexed by deserialized key (insertion-ordered map). */
        private final Map<DataHandle, V1KafkaTableRecord> accumulator = new LinkedHashMap<>();
        private final DataType keyType;
        private final DataType valueType;
        private final boolean skipMessageOnError;

        /**
         * Creates a new callback.
         *
         * @param dataType  the data type used to deserialize record keys.
         * @param dataType2 the data type used to deserialize record values.
         * @param z         when {@code true}, a record that cannot be deserialized is logged and
         *                  skipped; when {@code false}, the failure is rethrown.
         */
        public InternalConsumerRecordCallback(DataType dataType, DataType dataType2, boolean z) {
            this.keyType = dataType;
            this.valueType = dataType2;
            this.skipMessageOnError = z;
        }

        /**
         * Applies one consumed record to the accumulated table.
         *
         * @param kafkaRecord the record read from the topic.
         * @throws JikkouRuntimeException if the key or value cannot be deserialized and skipping
         *                                is disabled.
         */
        @Override // java.util.function.Consumer
        public void accept(KafkaRecord<byte[], byte[]> kafkaRecord) {
            AdminClientKafkaTableCollector.LOG.debug("Consuming from kafka from {}-{} at offset {}", kafkaRecord.topic(), kafkaRecord.partition(), kafkaRecord.offset());
            if (kafkaRecord.key() == null) {
                AdminClientKafkaTableCollector.LOG.debug("Skipping record with key 'null' from {}-{} at offset {}", kafkaRecord.topic(), kafkaRecord.partition(), kafkaRecord.offset());
                return;
            }

            final Optional<DataHandle> maybeKey = deserialize(kafkaRecord, kafkaRecord.key(), this.keyType, true);
            if (maybeKey.isEmpty()) {
                // Only empty when deserialization failed with skipMessageOnError enabled;
                // the failure was already logged, so the record is simply dropped.
                return;
            }
            final DataHandle key = maybeKey.get();

            if (kafkaRecord.value() == null) {
                AdminClientKafkaTableCollector.LOG.debug("Detecting tombstone record for key '{}' from {}-{} at offset {}", key, kafkaRecord.topic(), kafkaRecord.partition(), kafkaRecord.offset());
                this.accumulator.remove(key);
                return;
            }

            final Optional<DataHandle> maybeValue = deserialize(kafkaRecord, kafkaRecord.value(), this.valueType, false);
            if (maybeValue.isEmpty()) {
                // Same skip path as for the key: leave the current entry for this key untouched.
                return;
            }
            this.accumulator.put(key, toResource(kafkaRecord, key, maybeValue.get()));
        }

        /** Builds the table-record resource (metadata annotations, key, value, headers) for a consumed record. */
        private V1KafkaTableRecord toResource(KafkaRecord<byte[], byte[]> kafkaRecord, DataHandle key, DataHandle value) {
            final ObjectMeta metadata = ObjectMeta.builder()
                    .withName(kafkaRecord.topic())
                    .withAnnotation("kafka.jikkou.io/record-partition", kafkaRecord.partition())
                    .withAnnotation("kafka.jikkou.io/record-offset", kafkaRecord.offset())
                    .withAnnotation("kafka.jikkou.io/record-timestamp", kafkaRecord.timestamp())
                    .build();
            // NOTE(review): new String(null, ...) throws NullPointerException — confirm that
            // header.value() can never be null for the headers exposed by KafkaRecord.
            final List<KafkaRecordHeader> headers = StreamSupport.stream(kafkaRecord.headers().spliterator(), false)
                    .map(header -> new KafkaRecordHeader(header.key(), new String(header.value(), StandardCharsets.UTF_8)))
                    .toList();
            return V1KafkaTableRecord.builder()
                    .withMetadata(metadata)
                    .withSpec(V1KafkaTableRecordSpec.builder()
                            .withKey(new DataValue(this.keyType, key))
                            .withValue(new DataValue(this.valueType, value))
                            .withHeaders(headers)
                            .build())
                    .build();
        }

        /**
         * Deserializes raw bytes with the serde of the given data type.
         *
         * @return the deserialized handle, {@link DataHandle#NULL} when the serde yields nothing,
         *         or an empty optional when deserialization failed and skipping is enabled.
         * @throws JikkouRuntimeException when deserialization failed and skipping is disabled.
         */
        private Optional<DataHandle> deserialize(KafkaRecord<byte[], byte[]> kafkaRecord, byte[] bytes, DataType dataType, boolean isKey) {
            try {
                final ByteBuffer buffer = bytes == null ? null : ByteBuffer.wrap(bytes);
                return dataType.getDataSerde()
                        .deserialize(kafkaRecord.topic(), buffer, AdminClientKafkaTableCollector.EMPTY_CONFIG, isKey)
                        .or(() -> Optional.of(DataHandle.NULL));
            } catch (Exception e) {
                if (!this.skipMessageOnError) {
                    throw new JikkouRuntimeException(String.format("Error while deserializing record from from %s-%d at offset %d. %s", kafkaRecord.topic(), kafkaRecord.partition(), kafkaRecord.offset(), e.getLocalizedMessage()), e);
                }
                AdminClientKafkaTableCollector.LOG.info("Skip message from {}-{} at offset {}. Error while deserializing record: {}", kafkaRecord.topic(), kafkaRecord.partition(), kafkaRecord.offset(), e.getLocalizedMessage());
                return Optional.empty();
            }
        }

        /**
         * @return a new mutable list holding the records currently in the table.
         */
        public List<V1KafkaTableRecord> allRecords() {
            return new ArrayList<>(this.accumulator.values());
        }
    }

    /**
     * Creates a collector with no factories set; they are created when
     * {@link #configure(Configuration)} is invoked.
     */
    public AdminClientKafkaTableCollector() {
        this(null, null);
    }

    /**
     * Creates a collector using the given factories.
     *
     * @param consumerFactory    the consumer factory to use, or {@code null} to have one created
     *                           by {@link #configure(Configuration)}.
     * @param adminClientFactory the admin client factory to use, or {@code null} to have one
     *                           created by {@link #configure(Configuration)}.
     */
    public AdminClientKafkaTableCollector(@Nullable ConsumerFactory<byte[], byte[]> consumerFactory, @Nullable AdminClientFactory adminClientFactory) {
        this.adminClientFactory = adminClientFactory;
        this.consumerFactory = consumerFactory;
    }

    /**
     * Creates a collector and immediately configures it.
     *
     * @param configuration the configuration passed to {@link #configure(Configuration)}.
     */
    public AdminClientKafkaTableCollector(@NotNull Configuration configuration) {
        this.configure(configuration);
    }

    /**
     * Creates the consumer and admin-client factories that were not supplied at construction time.
     *
     * <p>A factory that is already set is left untouched. The admin client configuration is not
     * evaluated here: it is evaluated each time the supplier handed to the admin factory is called.
     *
     * @param configuration the configuration the client settings are read from.
     * @throws ConfigException declared by the signature; not thrown directly by this method.
     */
    public void configure(@NotNull Configuration configuration) throws ConfigException {
        LOG.info("Configuring");

        if (this.consumerFactory == null) {
            final Map<String, Object> consumerProps = new Config(configuration).clientConfig();
            this.consumerFactory = new DefaultConsumerFactory(consumerProps)
                    .setKeyDeserializer(new ByteArrayDeserializer())
                    .setValueDeserializer(new ByteArrayDeserializer());
        }

        if (this.adminClientFactory == null) {
            final Supplier<Map<String, Object>> adminProps =
                    () -> (Map) KafkaClientConfiguration.ADMIN_CLIENT_CONFIG.evaluate(configuration);
            this.adminClientFactory = new DefaultAdminClientFactory(adminProps);
        }
    }

    /**
     * Lists the table records held by the configured Kafka topic.
     *
     * <p>The topic must be reported as compacted by the admin client. Its records are then consumed
     * through {@code KafkaLogToEndConsumer#readTopicToEnd}, folded by key, and the remaining records
     * that have a non-null key are filtered with the given selectors.
     *
     * @param configuration the configuration providing the topic name, the key/value types and
     *                      the skip-on-error flag.
     * @param list          the selectors a record must match to be part of the result.
     * @return the matching records.
     * @throws JikkouRuntimeException if the topic is not compacted, or if a record cannot be
     *                                deserialized while skipping is disabled.
     */
    public ResourceListObject<V1KafkaTableRecord> listAll(@NotNull Configuration configuration, @NotNull List<Selector> list) {
        final Config config = new Config(configuration);
        final String topicName = config.topicName();

        LOG.debug("Checking if kafka topic {} is compacted", topicName);
        // The admin context is only needed for the compaction check: scope it to that check so it
        // is closed exactly once, whether the check passes, fails, or throws.
        try (AdminClientContext adminClientContext = new AdminClientContext(this.adminClientFactory)) {
            if (!adminClientContext.isTopicCleanupPolicyCompact(topicName, false)) {
                throw new JikkouRuntimeException(String.format("Cannot list records from non compacted topic '%s'. Topic must be configured with: %s=%s", topicName, "cleanup.policy", "compact"));
            }
        }

        LOG.debug("Listing all records from kafka topic {}", topicName);
        final InternalConsumerRecordCallback callback =
                new InternalConsumerRecordCallback(config.keyType(), config.valueType(), config.skipMessageOnError());
        final KafkaLogToEndConsumer kafkaLogToEndConsumer = new KafkaLogToEndConsumer(this.consumerFactory);
        kafkaLogToEndConsumer.readTopicToEnd(topicName, callback);

        final AggregateSelector aggregateSelector = new AggregateSelector(list);
        final List<V1KafkaTableRecord> selected = callback.allRecords().stream()
                .filter(tableRecord -> {
                    // Drop records whose key is missing or deserialized to a null handle.
                    final DataValue key = tableRecord.m41getSpec().getKey();
                    return key != null && !key.data().isNull();
                })
                .filter(aggregateSelector::apply)
                .collect(Collectors.toList());
        return new V1KafkaTableRecordList(selected);
    }
}
