package io.debezium.server.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConnectionFactoryConfigurator;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.Header;
import io.debezium.server.BaseChangeConsumer;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Named;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named("rabbitmq")
@Dependent
/* loaded from: input_file:io/debezium/server/rabbitmq/RabbitMqStreamChangeConsumer.class */
public class RabbitMqStreamChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqStreamChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.rabbitmq.";
    private static final String PROP_CONNECTION_PREFIX = "debezium.sink.rabbitmq.connection.";

    @ConfigProperty(name = "debezium.sink.rabbitmq.exchange", defaultValue = "")
    Optional<String> exchange;

    @ConfigProperty(name = "debezium.sink.rabbitmq.routingKey", defaultValue = "")
    Optional<String> routingKey;

    @ConfigProperty(name = "debezium.sink.rabbitmq.autoCreateRoutingKey", defaultValue = "false")
    Boolean autoCreateRoutingKey;

    @ConfigProperty(name = "debezium.sink.rabbitmq.routingKeyDurable", defaultValue = "true")
    Boolean routingKeyDurable;

    @ConfigProperty(name = "debezium.sink.rabbitmq.routingKeyFromTopicName", defaultValue = "false")
    Boolean routingKeyFromTopicName;

    @ConfigProperty(name = "debezium.sink.rabbitmq.deliveryMode", defaultValue = "2")
    int deliveryMode;

    @ConfigProperty(name = "debezium.sink.rabbitmq.ackTimeout", defaultValue = "30000")
    int ackTimeout;

    @ConfigProperty(name = "debezium.sink.rabbitmq.null.value", defaultValue = "default")
    String nullValue;
    Connection connection;
    Channel channel;

    @PostConstruct
    void connect() {
        Config config = ConfigProvider.getConfig();
        ConnectionFactory connectionFactory = new ConnectionFactory();
        ConnectionFactoryConfigurator.load(connectionFactory, (Map) getConfigSubset(config, PROP_CONNECTION_PREFIX).entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            if (entry.getValue() == null) {
                return null;
            }
            return entry.getValue().toString();
        })), "");
        LOGGER.info("Using connection to {}:{}", connectionFactory.getHost(), Integer.valueOf(connectionFactory.getPort()));
        try {
            this.connection = connectionFactory.newConnection();
            this.channel = this.connection.createChannel();
            this.channel.confirmSelect();
            if (!this.routingKeyFromTopicName.booleanValue() && this.autoCreateRoutingKey.booleanValue()) {
                String orElse = this.routingKey.orElse("");
                LOGGER.info("Creating queue for routing key named '{}'", orElse);
                this.channel.queueDeclare(orElse, this.routingKeyDurable.booleanValue(), false, false, (Map) null);
            }
        } catch (IOException | TimeoutException e) {
            throw new DebeziumException(e);
        }
    }

    @PreDestroy
    void close() {
        try {
            if (this.channel != null) {
                this.channel.close();
            }
            if (this.connection != null) {
                this.connection.close();
            }
        } catch (IOException | TimeoutException e) {
            throw new DebeziumException(e);
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        for (ChangeEvent<Object, Object> changeEvent : list) {
            LOGGER.trace("Received event '{}'", changeEvent);
            String orElse = this.routingKey.orElse(this.routingKeyFromTopicName.booleanValue() ? this.streamNameMapper.map(changeEvent.destination()) : "");
            String orElse2 = this.exchange.orElse(this.streamNameMapper.map(changeEvent.destination()));
            try {
                if (this.routingKeyFromTopicName.booleanValue() && this.autoCreateRoutingKey.booleanValue()) {
                    LOGGER.trace("Creating queue for routing key named '{}'", orElse);
                    this.channel.queueDeclare(orElse, this.routingKeyDurable.booleanValue(), false, false, (Map) null);
                }
                this.channel.basicPublish(orElse2, orElse, new AMQP.BasicProperties.Builder().deliveryMode(Integer.valueOf(this.deliveryMode)).headers(convertRabbitMqHeaders(changeEvent)).build(), getBytes(changeEvent.value() != null ? changeEvent.value() : this.nullValue));
                recordCommitter.markProcessed(changeEvent);
            } catch (IOException e) {
                throw new DebeziumException(e);
            }
        }
        try {
            this.channel.waitForConfirmsOrDie(this.ackTimeout);
            LOGGER.trace("Sent messages");
            recordCommitter.markBatchFinished();
        } catch (IOException | TimeoutException e2) {
            throw new DebeziumException(e2);
        }
    }

    private Map<String, Object> convertRabbitMqHeaders(ChangeEvent<Object, Object> changeEvent) {
        List<Header> headers = changeEvent.headers();
        HashMap hashMap = new HashMap();
        for (Header header : headers) {
            hashMap.put(header.getKey(), header.getValue());
        }
        return hashMap;
    }
}
