package io.debezium.server.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConnectionFactoryConfigurator;
import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.ByteCapacity;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.StreamCreator;
import com.rabbitmq.stream.StreamException;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.Header;
import io.debezium.server.BaseChangeConsumer;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Named;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named("rabbitmqstream")
@Dependent
/* loaded from: input_file:io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.class */
public class RabbitMqStreamNativeChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqStreamNativeChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.rabbitmqstream.";
    private static final String PROP_STREAM = "debezium.sink.rabbitmqstream.stream";
    private static final String PROP_CONNECTION_PREFIX = "debezium.sink.rabbitmqstream.connection.";

    @ConfigProperty(name = PROP_STREAM)
    Optional<String> stream;

    @ConfigProperty(name = "debezium.sink.rabbitmqstream.stream.maxAge")
    Optional<Duration> streamMaxAge;

    @ConfigProperty(name = "debezium.sink.rabbitmqstream.stream.maxLength")
    Optional<String> streamMaxLength;

    @ConfigProperty(name = "debezium.sink.rabbitmqstream.stream.maxSegmentSize")
    Optional<String> streamMaxSegmentSize;

    @ConfigProperty(name = "debezium.sink.rabbitmqstream.ackTimeout", defaultValue = "30000")
    int ackTimeout;

    @ConfigProperty(name = "debezium.sink.rabbitmqstream.null.value", defaultValue = "default")
    String nullValue;
    Environment environment;
    Map<String, Producer> streamProducers = new HashMap();

    private void createStream(Environment environment, String str) {
        LOGGER.info("Creating stream '{}'", str);
        StreamCreator stream = environment.streamCreator().stream(str);
        Optional<Duration> optional = this.streamMaxAge;
        Objects.requireNonNull(stream);
        optional.ifPresent(stream::maxAge);
        Optional<U> map = this.streamMaxLength.map(ByteCapacity::from);
        Objects.requireNonNull(stream);
        map.ifPresent(stream::maxLengthBytes);
        Optional<U> map2 = this.streamMaxSegmentSize.map(ByteCapacity::from);
        Objects.requireNonNull(stream);
        map2.ifPresent(stream::maxSegmentSizeBytes);
        stream.create();
    }

    @PostConstruct
    void connect() {
        Config config = ConfigProvider.getConfig();
        ConnectionFactory connectionFactory = new ConnectionFactory();
        ConnectionFactoryConfigurator.load(connectionFactory, (Map) getConfigSubset(config, PROP_CONNECTION_PREFIX).entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            if (entry.getValue() == null) {
                return null;
            }
            return entry.getValue().toString();
        })), "");
        LOGGER.info("Using connection to {}:{}", connectionFactory.getHost(), Integer.valueOf(connectionFactory.getPort()));
        try {
            Address address = new Address(connectionFactory.getHost(), connectionFactory.getPort());
            this.environment = Environment.builder().host(address.host()).port(address.port()).addressResolver(address2 -> {
                return address;
            }).username(connectionFactory.getUsername()).password(connectionFactory.getPassword()).virtualHost(connectionFactory.getVirtualHost()).build();
        } catch (StreamException | IllegalArgumentException e) {
            throw new DebeziumException(e);
        }
    }

    @PreDestroy
    void close() {
        try {
            if (this.environment != null) {
                this.environment.close();
            }
            if (this.streamProducers != null) {
                Iterator<Producer> it = this.streamProducers.values().iterator();
                while (it.hasNext()) {
                    it.next().close();
                }
            }
        } catch (Exception e) {
            throw new DebeziumException(e);
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        for (ChangeEvent<Object, Object> changeEvent : list) {
            LOGGER.trace("Received event '{}'", changeEvent);
            try {
                String orElse = this.stream.orElse(this.streamNameMapper.map(changeEvent.destination()));
                Producer producer = this.streamProducers.get(orElse);
                if (producer == null) {
                    if (!this.environment.streamExists(orElse)) {
                        createStream(this.environment, orElse);
                    }
                    producer = this.environment.producerBuilder().confirmTimeout(Duration.ofSeconds(this.ackTimeout)).stream(orElse).build();
                    this.streamProducers.put(orElse, producer);
                }
                producer.send(producer.messageBuilder().addData(getBytes(changeEvent.value() != null ? changeEvent.value() : this.nullValue)).build(), confirmationStatus -> {
                });
                recordCommitter.markProcessed(changeEvent);
            } catch (StreamException e) {
                throw new DebeziumException(e);
            }
        }
        LOGGER.trace("Sent messages");
        recordCommitter.markBatchFinished();
    }

    private Map<String, Object> convertRabbitMqHeaders(ChangeEvent<Object, Object> changeEvent) {
        List<Header> headers = changeEvent.headers();
        HashMap hashMap = new HashMap();
        for (Header header : headers) {
            hashMap.put(header.getKey(), header.getValue());
        }
        return hashMap;
    }
}
