package io.trino.plugin.eventlistener.kafka;

import com.google.common.base.Preconditions;
import io.airlift.log.Logger;
import io.trino.plugin.eventlistener.kafka.metadata.EnvMetadataProvider;
import io.trino.plugin.eventlistener.kafka.metadata.MetadataProvider;
import io.trino.plugin.eventlistener.kafka.metadata.NoOpMetadataProvider;
import io.trino.plugin.eventlistener.kafka.metrics.KafkaEventListenerJmxStats;
import io.trino.plugin.eventlistener.kafka.producer.KafkaProducerFactory;
import io.trino.plugin.eventlistener.kafka.producer.SSLKafkaProducerFactory;
import io.trino.plugin.kafka.utils.PropertiesUtils;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryCreatedEvent;
import io.trino.spi.eventlistener.SplitCompletedEvent;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.SwitchBootstraps;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;

/* loaded from: input_file:io/trino/plugin/eventlistener/kafka/KafkaEventPublisher.class */
/**
 * Publishes Trino query and split events to Kafka.
 *
 * <p>Each {@code publish*} method builds a {@link ProducerRecord} through the
 * {@link KafkaRecordBuilder}, hands it to the Kafka producer asynchronously and
 * reports the outcome (received / build failure / success / categorized send
 * failure) to {@link KafkaEventListenerJmxStats}. Build and send failures are
 * logged and counted; they are never propagated to the caller.
 */
public class KafkaEventPublisher {
    private static final Logger LOG = Logger.get(KafkaEventPublisher.class);

    /** Value logged for size/partition when the producer supplies no record metadata. */
    private static final int UNKNOWN_METADATA_VALUE = -1;

    private final Producer<String, String> kafkaProducer;
    private final KafkaRecordBuilder kafkaRecordBuilder;
    private final KafkaEventListenerJmxStats stats;

    /**
     * Creates the publisher, opens the Kafka producer and verifies that the brokers
     * can be reached by fetching partition metadata for one of the configured topics.
     *
     * @param kafkaEventListenerConfig listener configuration; at least one of the created
     *        or completed topic names must be present
     * @param kafkaProducerFactory factory used to create the underlying producer
     * @param kafkaEventListenerJmxStats sink for publishing statistics
     * @throws NullPointerException if any argument is {@code null}
     * @throws IllegalArgumentException if neither the created nor the completed topic is configured
     * @throws Exception if the broker connectivity check fails or does not finish within
     *         the configured request timeout; the producer is closed before rethrowing
     */
    public KafkaEventPublisher(KafkaEventListenerConfig kafkaEventListenerConfig, KafkaProducerFactory kafkaProducerFactory, KafkaEventListenerJmxStats kafkaEventListenerJmxStats) throws Exception {
        this.stats = Objects.requireNonNull(kafkaEventListenerJmxStats, "stats cannot be null");
        Objects.requireNonNull(kafkaEventListenerConfig, "config cannot be null");
        Objects.requireNonNull(kafkaProducerFactory, "producerFactory cannot be null");
        Preconditions.checkArgument(
                kafkaEventListenerConfig.getCreatedTopicName().isPresent() || kafkaEventListenerConfig.getCompletedTopicName().isPresent(),
                "Either created or completed topic must be present");

        String createdTopic = kafkaEventListenerConfig.getCreatedTopicName().orElse("");
        String completedTopic = kafkaEventListenerConfig.getCompletedTopicName().orElse("");
        String splitCompletedTopic = kafkaEventListenerConfig.getSplitCompletedTopicName().orElse("");
        Map<String, String> configOverrides = PropertiesUtils.readProperties(kafkaEventListenerConfig.getResourceConfigFiles());

        LOG.info(
                "Creating Kafka publisher (SSL=%s) for topics: %s/%s with excluded fields: %s and kafka config overrides: %s",
                kafkaProducerFactory instanceof SSLKafkaProducerFactory,
                createdTopic,
                completedTopic,
                kafkaEventListenerConfig.getExcludedFields(),
                configOverrides);

        this.kafkaProducer = kafkaProducerFactory.producer(configOverrides);

        String probeTopic = kafkaEventListenerConfig.getPublishCreatedEvent() ? createdTopic : completedTopic;
        long requestTimeoutMillis = kafkaEventListenerConfig.getRequestTimeout().toMillis();
        try {
            checkConnectivityToBrokers(probeTopic, requestTimeoutMillis);
        } catch (Exception connectivityFailure) {
            // The producer is already open; without this it would leak when construction fails.
            // The close is bounded so a broken cluster cannot stall the failure path indefinitely.
            try {
                this.kafkaProducer.close(Duration.ofMillis(requestTimeoutMillis));
            } catch (RuntimeException closeFailure) {
                connectivityFailure.addSuppressed(closeFailure);
            }
            throw connectivityFailure;
        }

        this.kafkaRecordBuilder = new KafkaRecordBuilder(
                createdTopic,
                completedTopic,
                splitCompletedTopic,
                kafkaEventListenerConfig.getExcludedFields(),
                metadataProvider(kafkaEventListenerConfig));
        LOG.info("Successfully created Kafka publisher.");
    }

    /**
     * Probes broker connectivity by asynchronously requesting the partitions of {@code topic}
     * and waiting at most {@code timeoutMillis} for the request to finish.
     *
     * @param topic topic whose partition metadata is fetched
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @throws Exception if the lookup fails, is interrupted, or times out
     */
    private void checkConnectivityToBrokers(String topic, long timeoutMillis) throws Exception {
        LOG.info("checking connectivity to brokers (fetching partitions for topic=%s).", topic);
        CompletableFuture<Void> partitionLookup = CompletableFuture.runAsync(() -> this.kafkaProducer.partitionsFor(topic));
        partitionLookup.get(timeoutMillis, TimeUnit.MILLISECONDS);
        LOG.info("connectivity check succeeded.");
    }

    /**
     * Chooses the metadata provider: environment-variable based when a prefix is configured,
     * otherwise a no-op provider.
     */
    private MetadataProvider metadataProvider(KafkaEventListenerConfig kafkaEventListenerConfig) {
        if (kafkaEventListenerConfig.getEnvironmentVariablePrefix().isPresent()) {
            return new EnvMetadataProvider(kafkaEventListenerConfig.getEnvironmentVariablePrefix().get());
        }
        return new NoOpMetadataProvider();
    }

    /**
     * Builds and asynchronously sends a record for a completed query.
     * Build and send failures are counted in the stats and logged, not thrown.
     *
     * @param queryCompletedEvent the event to publish
     */
    public void publishCompletedEvent(QueryCompletedEvent queryCompletedEvent) {
        this.stats.completedEventReceived();
        String queryId = queryCompletedEvent.getMetadata().getQueryId();
        LOG.debug("preparing to send QueryCompletedEvent for query id: %s", queryId);

        ProducerRecord<String, String> producerRecord;
        try {
            producerRecord = this.kafkaRecordBuilder.buildCompletedRecord(queryCompletedEvent);
        } catch (Exception e) {
            this.stats.completedEventBuildFailure();
            LOG.warn(e, "unable to build QueryCompletedEvent for query id: %s", queryId);
            return;
        }
        if (producerRecord == null) {
            return;
        }

        this.kafkaProducer.send(producerRecord, (recordMetadata, exc) -> {
            if (exc == null) {
                this.stats.completedEventSuccessfulDispatch();
                LOG.debug("successfully sent QueryCompletedEvent for query id: %s", queryId);
                return;
            }
            switch (exc) {
                case TimeoutException timeout -> this.stats.completedEventSendFailureTimeout();
                case RecordTooLargeException tooLarge -> this.stats.completedEventSendFailureTooLarge();
                case InvalidRecordException invalidRecord -> this.stats.completedEventSendFailureInvalidRecord();
                default -> this.stats.completedEventSendFailureOther();
            }
            // NOTE(review): some Kafka client versions / Producer implementations may pass null
            // metadata on failure; guard so the warning is still logged instead of throwing an NPE.
            int valueSize = recordMetadata == null ? UNKNOWN_METADATA_VALUE : recordMetadata.serializedValueSize();
            int partition = recordMetadata == null ? UNKNOWN_METADATA_VALUE : recordMetadata.partition();
            LOG.warn(exc, "failed to send QueryCompletedEvent for query id: %s. Uncompressed message size: %s. Partition: %s", queryId, valueSize, partition);
        });
    }

    /**
     * Builds and asynchronously sends a record for a newly created query.
     * Build and send failures are counted in the stats and logged, not thrown.
     *
     * @param queryCreatedEvent the event to publish
     */
    public void publishCreatedEvent(QueryCreatedEvent queryCreatedEvent) {
        this.stats.createdEventReceived();
        String queryId = queryCreatedEvent.getMetadata().getQueryId();
        LOG.debug("preparing to send QueryCreatedEvent for query id: %s", queryId);

        ProducerRecord<String, String> producerRecord;
        try {
            producerRecord = this.kafkaRecordBuilder.buildStartedRecord(queryCreatedEvent);
        } catch (Exception e) {
            this.stats.createdEventBuildFailure();
            LOG.warn(e, "unable to build QueryCreatedEvent for query id: %s", queryId);
            return;
        }
        if (producerRecord == null) {
            return;
        }

        this.kafkaProducer.send(producerRecord, (recordMetadata, exc) -> {
            if (exc == null) {
                this.stats.createdEventSuccessfulDispatch();
                LOG.debug("successfully sent QueryCreatedEvent for query id: %s", queryId);
                return;
            }
            switch (exc) {
                case TimeoutException timeout -> this.stats.createdEventSendFailureTimeout();
                case RecordTooLargeException tooLarge -> this.stats.createdEventSendFailureTooLarge();
                case InvalidRecordException invalidRecord -> this.stats.createdEventSendFailureInvalidRecord();
                default -> this.stats.createdEventSendFailureOther();
            }
            // NOTE(review): some Kafka client versions / Producer implementations may pass null
            // metadata on failure; guard so the warning is still logged instead of throwing an NPE.
            int valueSize = recordMetadata == null ? UNKNOWN_METADATA_VALUE : recordMetadata.serializedValueSize();
            int partition = recordMetadata == null ? UNKNOWN_METADATA_VALUE : recordMetadata.partition();
            LOG.warn(exc, "failed to send QueryCreatedEvent for query id: %s. Uncompressed message size: %s. Partition: %s", queryId, valueSize, partition);
        });
    }

    /**
     * Builds and asynchronously sends a record for a completed split.
     * Build and send failures are counted in the stats and logged, not thrown.
     *
     * @param splitCompletedEvent the event to publish
     */
    public void publishSplitCompletedEvent(SplitCompletedEvent splitCompletedEvent) {
        this.stats.splitCompletedEventReceived();
        String queryId = splitCompletedEvent.getQueryId();
        LOG.debug("preparing to send SplitCompletedEvent for query id: %s", queryId);

        ProducerRecord<String, String> producerRecord;
        try {
            producerRecord = this.kafkaRecordBuilder.buildSplitCompletedRecord(splitCompletedEvent);
        } catch (Exception e) {
            this.stats.splitCompletedEventBuildFailure();
            LOG.warn(e, "unable to build SplitCompletedEvent for query id: %s", queryId);
            return;
        }
        if (producerRecord == null) {
            return;
        }

        this.kafkaProducer.send(producerRecord, (recordMetadata, exc) -> {
            if (exc == null) {
                this.stats.splitCompletedEventSuccessfulDispatch();
                LOG.debug("successfully sent SplitCompletedEvent for query id: %s", queryId);
                return;
            }
            switch (exc) {
                case TimeoutException timeout -> this.stats.splitCompletedEventSendFailureTimeout();
                case RecordTooLargeException tooLarge -> this.stats.splitCompletedEventSendFailureTooLarge();
                case InvalidRecordException invalidRecord -> this.stats.splitCompletedEventSendFailureInvalidRecord();
                default -> this.stats.splitCompletedEventSendFailureOther();
            }
            // NOTE(review): some Kafka client versions / Producer implementations may pass null
            // metadata on failure; guard so the warning is still logged instead of throwing an NPE.
            int valueSize = recordMetadata == null ? UNKNOWN_METADATA_VALUE : recordMetadata.serializedValueSize();
            int partition = recordMetadata == null ? UNKNOWN_METADATA_VALUE : recordMetadata.partition();
            LOG.warn(exc, "failed to send SplitCompletedEvent for query id: %s. Uncompressed message size: %s. Partition: %s", queryId, valueSize, partition);
        });
    }

    /** Closes the underlying Kafka producer. */
    public void shutdown() {
        this.kafkaProducer.close();
    }
}
