package io.streamthoughts.jikkou.kafka.reporter;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.streamthoughts.jikkou.annotation.ExtensionEnabled;
import io.streamthoughts.jikkou.api.JikkouInfo;
import io.streamthoughts.jikkou.api.change.Change;
import io.streamthoughts.jikkou.api.change.ChangeResult;
import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.error.ConfigException;
import io.streamthoughts.jikkou.api.io.Jackson;
import io.streamthoughts.jikkou.api.reporter.ChangeReporter;
import io.streamthoughts.jikkou.common.utils.AsyncUtils;
import io.streamthoughts.jikkou.kafka.internals.KafkaRecord;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContext;
import io.streamthoughts.jikkou.kafka.internals.producer.DefaultProducerFactory;
import io.streamthoughts.jikkou.kafka.internals.producer.KafkaRecordSender;
import io.streamthoughts.jikkou.kafka.internals.producer.ProducerFactory;
import io.streamthoughts.jikkou.kafka.reporter.ce.CloudEventEntityBuilder;
import io.streamthoughts.jikkou.kafka.reporter.ce.CloudEventExtension;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ChangeReporter} that publishes change results to a Kafka topic.
 *
 * <p>Each change result that is changed and not failed is wrapped into a CloudEvents 1.0
 * envelope, serialized to JSON with the configured {@link ObjectMapper}, and sent as the
 * value of a Kafka record carrying a {@code content-type} header.
 *
 * <p>An instance must be configured (either through {@link #KafkaChangeReporter(Configuration)}
 * or {@link #configure(Configuration)}) before {@link #report(List)} is called.
 */
@ExtensionEnabled(false)
public class KafkaChangeReporter implements ChangeReporter {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaChangeReporter.class);

    /** Number of partitions requested when this reporter creates the reporting topic. */
    public static final int NUM_PARTITIONS = 1;

    private final ObjectMapper objectMapper;
    private KafkaChangeReporterConfig configuration;
    private ProducerFactory<byte[], byte[]> producerFactory;

    /**
     * Creates an unconfigured reporter using the shared JSON object mapper.
     * {@link #configure(Configuration)} must be called before reporting.
     */
    public KafkaChangeReporter() {
        this.objectMapper = Jackson.JSON_OBJECT_MAPPER;
    }

    /**
     * Creates a reporter using the shared JSON object mapper and configures it immediately.
     *
     * @param configuration the reporter configuration.
     */
    public KafkaChangeReporter(@NotNull Configuration configuration) {
        this();
        configure(configuration);
    }

    /**
     * Creates an unconfigured reporter with an explicit producer factory and object mapper.
     * A producer factory supplied here is kept when {@link #configure(Configuration)} is called.
     *
     * @param producerFactory the factory used to create the Kafka producer; must not be null.
     * @param objectMapper    the mapper used to serialize events; must not be null.
     * @throws NullPointerException if any argument is null.
     */
    public KafkaChangeReporter(@NotNull ProducerFactory<byte[], byte[]> producerFactory, @NotNull ObjectMapper objectMapper) {
        this.producerFactory = Objects.requireNonNull(producerFactory, "producerFactory cannot be null");
        this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper cannot be null");
    }

    /**
     * Configures this reporter. When no producer factory was injected, a
     * {@link DefaultProducerFactory} using byte-array serializers is created from the
     * producer settings of the given configuration.
     *
     * @param configuration the reporter configuration.
     * @throws ConfigException declared by the contract; presumably raised for invalid
     *                         settings (the validation itself is not visible in this class).
     */
    public void configure(@NotNull Configuration configuration) throws ConfigException {
        LOG.info("Configuration");
        this.configuration = new KafkaChangeReporterConfig(configuration);
        if (this.producerFactory == null) {
            // Kept exactly as in the original: the generic signature of DefaultProducerFactory
            // is declared elsewhere, so the raw construction is not narrowed here.
            this.producerFactory = new DefaultProducerFactory(this.configuration.producerConfig(), (Serializer) new ByteArraySerializer(), (Serializer) new ByteArraySerializer());
        }
    }

    /**
     * Publishes the given change results to the configured topic.
     *
     * <p>The topic is created first when topic creation is enabled. Only results that are
     * changed and not failed are sent. Delivery is best-effort: a failed send is logged
     * and does not raise an exception to the caller.
     *
     * @param list the change results to report.
     * @throws IllegalStateException if this reporter has not been configured.
     * @throws RuntimeException      if a change result cannot be serialized to JSON.
     */
    public void report(List<ChangeResult<Change>> list) {
        if (this.configuration == null) {
            throw new IllegalStateException("KafkaChangeReporter must be configured before reporting changes");
        }
        LOG.info("Starting reporting for {} changes", list.size());
        checkIfTopicNeedToBeCreated();

        final String topic = this.configuration.topicName();
        final String eventSource = this.configuration.eventSource();

        // Raw List on purpose: the generic parameters of KafkaRecord are declared elsewhere.
        @SuppressWarnings("rawtypes")
        final List records = filterRelevantChangeResults(list)
                .map(changeResult -> KafkaRecord.builder()
                        .header("content-type", "application/cloudevents+json; charset=UTF-8")
                        .value(toCloudEventBytes(changeResult, eventSource))
                        .topic(topic)
                        .build())
                .toList();

        // The producer is flushed and closed before the send results are awaited.
        awaitCompletion(sendAll(records), records.size(), topic);
    }

    /** Serializes one change result as a JSON CloudEvents 1.0 envelope. */
    private byte[] toCloudEventBytes(ChangeResult<Change> changeResult, String eventSource) {
        try {
            return this.objectMapper.writeValueAsBytes(CloudEventEntityBuilder.newBuilder()
                    .withSpecVersion("1.0")
                    .withId("uuid:" + UUID.randomUUID())
                    .withTime(ZonedDateTime.now(ZoneOffset.UTC))
                    .withType("io.jikkou.resourcechangeevent")
                    .withSource(eventSource)
                    .withDataContentType("application/json")
                    .withExtension(CloudEventExtension.of("iojikkouversion", JikkouInfo.getVersion()))
                    .withData(changeResult)
                    .build());
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize change result as a CloudEvent", e);
        }
    }

    /** Sends all records with a fresh producer, flushes it, and returns the pending send results. */
    // Raw types are confined to this helper because the generic signature of
    // KafkaRecordSender is not visible from this class.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private List sendAll(List records) {
        try (Producer<byte[], byte[]> producer = this.producerFactory.createProducer()) {
            List pending = new KafkaRecordSender(producer).send(records);
            LOG.debug("Flushing any pending requests in producer");
            producer.flush();
            return pending;
        }
    }

    /** Waits for all pending sends and logs the outcome; never propagates delivery failures. */
    // Raw List is accepted for the same reason as in sendAll.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void awaitCompletion(List pending, int recordCount, String topic) {
        try {
            AsyncUtils.waitForAll(pending).get();
            LOG.debug("Sending completed for {} records", recordCount);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can react to it.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting for {} records to be sent to topic '{}'", recordCount, topic);
        } catch (ExecutionException e) {
            // Reporting is best-effort: surface the failure in the logs instead of dropping it.
            LOG.error("Failed to send one or more change records to topic '{}'", topic, e);
        }
    }

    /** Keeps only the results that are changed and not failed. */
    private Stream<ChangeResult<Change>> filterRelevantChangeResults(List<ChangeResult<Change>> list) {
        return list.stream().filter(changeResult -> changeResult.isChanged() && !changeResult.isFailed());
    }

    /**
     * Requests creation of the reporting topic when topic creation is enabled, using
     * {@link #NUM_PARTITIONS} partitions and the configured default replication factor.
     * NOTE(review): how an already existing topic is handled depends on
     * AdminClientContext#createTopic, which is not visible here.
     */
    private void checkIfTopicNeedToBeCreated() {
        if (!this.configuration.isTopicCreationEnabled()) {
            return;
        }
        try (AdminClient adminClient = AdminClient.create(this.configuration.adminClientConfig())) {
            new AdminClientContext(adminClient).createTopic(
                    this.configuration.topicName(),
                    NUM_PARTITIONS,
                    (short) this.configuration.defaultReplicationFactor());
        }
    }
}
