package io.strimzi.kafka.bridge.amqp;

import io.strimzi.kafka.bridge.ConnectionEndpoint;
import io.strimzi.kafka.bridge.EmbeddedFormat;
import io.strimzi.kafka.bridge.HealthCheckable;
import io.strimzi.kafka.bridge.MetricsReporter;
import io.strimzi.kafka.bridge.SourceBridgeEndpoint;
import io.strimzi.kafka.bridge.amqp.converter.AmqpDefaultMessageConverter;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.converter.MessageConverter;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Promise;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.ProtonServer;
import io.vertx.proton.ProtonServerOptions;
import io.vertx.proton.ProtonSession;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/strimzi/kafka/bridge/amqp/AmqpBridge.class */
/**
 * Vert.x verticle that bridges AMQP 1.0 links to Kafka.
 *
 * <p>Depending on the configured {@link AmqpMode} the verticle either binds a
 * {@link ProtonServer} and waits for incoming connections, or connects out to a
 * remote AMQP peer through a {@link ProtonClient}. For every open connection it
 * keeps a {@link ConnectionEndpoint} holding the source endpoint (fed by
 * attached {@link ProtonReceiver}s) and the sink endpoints (one per attached
 * {@link ProtonSender}).
 */
public class AmqpBridge extends AbstractVerticle implements HealthCheckable {
    private static final Logger log = LoggerFactory.getLogger(AmqpBridge.class);

    // Message annotation keys used by the bridge.
    public static final String AMQP_PARTITION_ANNOTATION = "x-opt-bridge.partition";
    public static final String AMQP_KEY_ANNOTATION = "x-opt-bridge.key";
    public static final String AMQP_OFFSET_ANNOTATION = "x-opt-bridge.offset";
    public static final String AMQP_TOPIC_ANNOTATION = "x-opt-bridge.topic";

    // AMQP error condition symbols reported when a link is detached with an error.
    public static final String AMQP_ERROR_NO_PARTITIONS = "io.strimzi:no-free-partitions";
    public static final String AMQP_ERROR_NO_GROUPID = "io.strimzi:no-group-id";
    public static final String AMQP_ERROR_PARTITION_NOT_EXISTS = "io.strimzi:partition-not-exists";
    public static final String AMQP_ERROR_SEND_TO_KAFKA = "io.strimzi:error-to-kafka";
    public static final String AMQP_ERROR_WRONG_PARTITION_FILTER = "io.strimzi:wrong-partition-filter";
    public static final String AMQP_ERROR_WRONG_OFFSET_FILTER = "io.strimzi:wrong-offset-filter";
    public static final String AMQP_ERROR_NO_PARTITION_FILTER = "io.strimzi:no-partition-filter";
    public static final String AMQP_ERROR_WRONG_FILTER = "io.strimzi:wrong-filter";
    public static final String AMQP_ERROR_KAFKA_SUBSCRIBE = "io.strimzi:kafka-subscribe";
    public static final String AMQP_ERROR_KAFKA_COMMIT = "io.strimzi:kafka-commit";
    public static final String AMQP_ERROR_CONFIGURATION = "io.strimzi:configuration";

    // Link filter names.
    public static final String AMQP_PARTITION_FILTER = "io.strimzi:partition-filter:int";
    public static final String AMQP_OFFSET_FILTER = "io.strimzi:offset-filter:long";

    private final BridgeConfig bridgeConfig;
    private static final String CONTAINER_ID = "kafka-bridge-service";
    private static final int HEALTH_SERVER_PORT = 8080;

    // File names looked up inside the configured certificate directory.
    private static final String CA_CERT_FILE = "ca.crt";
    private static final String TLS_CERT_FILE = "tls.crt";
    private static final String TLS_KEY_FILE = "tls.key";

    private ProtonServer server;
    private ProtonClient client;
    private Map<ProtonConnection, ConnectionEndpoint> endpoints;
    private boolean isReady = false;
    private MetricsReporter metricsReporter;

    /**
     * Creates the bridge verticle.
     *
     * @param bridgeConfig    bridge configuration (AMQP and Kafka settings are read from it)
     * @param metricsReporter metrics reporter stored on the instance
     */
    public AmqpBridge(BridgeConfig bridgeConfig, MetricsReporter metricsReporter) {
        this.bridgeConfig = bridgeConfig;
        this.metricsReporter = metricsReporter;
    }

    /**
     * Starts the bridge in server mode: binds the AMQP server and completes or
     * fails {@code promise} according to the outcome of the bind.
     *
     * @param promise start promise of the verticle
     */
    private void bindAmqpServer(Promise<Void> promise) {
        this.server = ProtonServer.create(this.vertx, createServerOptions())
                .connectHandler(this::processConnection)
                .listen(asyncResult -> {
                    if (!asyncResult.succeeded()) {
                        log.error("Error starting AMQP-Kafka Bridge", asyncResult.cause());
                        promise.fail(asyncResult.cause());
                        return;
                    }
                    log.info("AMQP-Kafka Bridge started and listening on port {}", asyncResult.result().actualPort());
                    log.info("AMQP-Kafka Bridge bootstrap servers {}", this.bridgeConfig.getKafkaConfig().getConfig().get("bootstrap.servers"));
                    this.isReady = true;
                    promise.complete();
                });
    }

    /**
     * Starts the bridge in client mode: connects to the configured AMQP host
     * and port and completes or fails {@code promise} according to the outcome.
     *
     * @param promise start promise of the verticle
     */
    private void connectAmqpClient(Promise<Void> promise) {
        this.client = ProtonClient.create(this.vertx);
        String host = this.bridgeConfig.getAmqpConfig().getHost();
        int port = this.bridgeConfig.getAmqpConfig().getPort();
        this.client.connect(createClientOptions(), host, port, asyncResult -> {
            if (!asyncResult.succeeded()) {
                log.error("Error connecting AMQP-Kafka Bridge as client", asyncResult.cause());
                promise.fail(asyncResult.cause());
                return;
            }
            ProtonConnection protonConnection = asyncResult.result();
            protonConnection.setContainer(CONTAINER_ID);
            processConnection(protonConnection);
            log.info("AMQP-Kafka Bridge started and connected in client mode to {}:{}", host, port);
            log.info("AMQP-Kafka Bridge bootstrap servers {}", this.bridgeConfig.getKafkaConfig().getConfig().get("bootstrap.servers"));
            this.isReady = true;
            promise.complete();
        });
    }

    /**
     * Verticle start hook: initialises the endpoint registry and starts the
     * bridge in the configured AMQP mode.
     *
     * @param promise completed once the server is listening or the client is
     *                connected, failed otherwise
     */
    public void start(Promise<Void> promise) throws Exception {
        log.info("Starting AMQP-Kafka bridge verticle...");
        this.endpoints = new HashMap<>();
        AmqpMode mode = this.bridgeConfig.getAmqpConfig().getMode();
        log.info("AMQP-Kafka Bridge configured in {} mode", mode);
        if (mode == AmqpMode.SERVER) {
            bindAmqpServer(promise);
        } else {
            connectAmqpClient(promise);
        }
    }

    /**
     * Verticle stop hook: closes every registered endpoint and connection and,
     * when running in server mode, the AMQP server.
     *
     * @param promise completed when shutdown has finished; failed if closing the
     *                server fails
     */
    public void stop(Promise<Void> promise) throws Exception {
        log.info("Stopping AMQP-Kafka bridge verticle ...");
        this.isReady = false;
        this.endpoints.forEach((protonConnection, connectionEndpoint) -> {
            closeEndpoint(connectionEndpoint);
            protonConnection.close();
        });
        this.endpoints.clear();

        if (this.server == null) {
            // Client mode never creates a server; without this branch the stop
            // promise would never be completed and shutdown would hang.
            log.info("AMQP-Kafka bridge has been shut down successfully");
            promise.complete();
            return;
        }
        this.server.close(asyncResult -> {
            if (asyncResult.succeeded()) {
                log.info("AMQP-Kafka bridge has been shut down successfully");
                promise.complete();
            } else {
                log.error("Error while shutting down AMQP-Kafka bridge", asyncResult.cause());
                promise.fail(asyncResult.cause());
            }
        });
    }

    /**
     * Builds the options for the AMQP server from the bridge configuration,
     * enabling TLS when a certificate directory is configured.
     *
     * @return the server options
     */
    private ProtonServerOptions createServerOptions() {
        ProtonServerOptions protonServerOptions = new ProtonServerOptions();
        protonServerOptions.setHost(this.bridgeConfig.getAmqpConfig().getHost());
        protonServerOptions.setPort(this.bridgeConfig.getAmqpConfig().getPort());
        String certDir = this.bridgeConfig.getAmqpConfig().getCertDir();
        if (certDir != null && !certDir.isEmpty()) {
            log.info("Enabling SSL configuration for AMQP with TLS certificates from {}", certDir);
            protonServerOptions.setSsl(true);
            protonServerOptions.setPemTrustOptions(trustOptions(certDir));
            protonServerOptions.setPemKeyCertOptions(keyCertOptions(certDir));
        }
        return protonServerOptions;
    }

    /**
     * Builds the options for the AMQP client, enabling TLS with the EXTERNAL
     * SASL mechanism when a certificate directory is configured.
     *
     * @return the client options
     */
    private ProtonClientOptions createClientOptions() {
        ProtonClientOptions protonClientOptions = new ProtonClientOptions();
        protonClientOptions.setConnectTimeout(1000);
        protonClientOptions.setReconnectAttempts(-1).setReconnectInterval(1000L);
        String certDir = this.bridgeConfig.getAmqpConfig().getCertDir();
        if (certDir != null && !certDir.isEmpty()) {
            log.info("Enabling SSL configuration for AMQP with TLS certificates from {}", certDir);
            protonClientOptions.setSsl(true);
            protonClientOptions.addEnabledSaslMechanism("EXTERNAL");
            // NOTE(review): an empty algorithm looks like it turns hostname
            // verification off - confirm this is intentional.
            protonClientOptions.setHostnameVerificationAlgorithm("");
            protonClientOptions.setPemTrustOptions(trustOptions(certDir));
            protonClientOptions.setPemKeyCertOptions(keyCertOptions(certDir));
        }
        return protonClientOptions;
    }

    /** Trust options pointing at the CA certificate inside {@code certDir}. */
    private static PemTrustOptions trustOptions(String certDir) {
        return new PemTrustOptions().addCertPath(new File(certDir, CA_CERT_FILE).getAbsolutePath());
    }

    /** Key/certificate options pointing at the TLS certificate and key inside {@code certDir}. */
    private static PemKeyCertOptions keyCertOptions(String certDir) {
        return new PemKeyCertOptions()
                .addCertPath(new File(certDir, TLS_CERT_FILE).getAbsolutePath())
                .addKeyPath(new File(certDir, TLS_KEY_FILE).getAbsolutePath());
    }

    /**
     * Registers the connection-level handlers on a new connection. In client
     * mode the connection is also opened here.
     *
     * @param protonConnection the connection to set up
     */
    private void processConnection(ProtonConnection protonConnection) {
        protonConnection
                .openHandler(this::processOpenConnection)
                .closeHandler(this::processCloseConnection)
                .disconnectHandler(this::processDisconnection)
                .sessionOpenHandler(this::processOpenSession)
                .receiverOpenHandler(protonReceiver -> processOpenReceiver(protonConnection, protonReceiver))
                .senderOpenHandler(protonSender -> processOpenSender(protonConnection, protonSender));
        if (this.bridgeConfig.getAmqpConfig().getMode() == AmqpMode.CLIENT) {
            protonConnection.open();
        }
    }

    /**
     * Handles the remote peer opening the connection: opens the local side and
     * registers a {@link ConnectionEndpoint} for it if none exists yet.
     *
     * @param asyncResult result carrying the opened connection
     */
    private void processOpenConnection(AsyncResult<ProtonConnection> asyncResult) {
        if (!asyncResult.succeeded()) {
            return;
        }
        ProtonConnection protonConnection = asyncResult.result();
        log.info("Connection opened by {} {}", protonConnection.getRemoteHostname(), protonConnection.getRemoteContainer());
        protonConnection.open();
        this.endpoints.computeIfAbsent(protonConnection, connection -> new ConnectionEndpoint());
    }

    /**
     * Handles the remote peer closing the connection by tearing down its endpoints.
     *
     * @param asyncResult result carrying the closed connection
     */
    private void processCloseConnection(AsyncResult<ProtonConnection> asyncResult) {
        if (asyncResult.succeeded()) {
            ProtonConnection protonConnection = asyncResult.result();
            log.info("Connection closed by {} {}", protonConnection.getRemoteHostname(), protonConnection.getRemoteContainer());
            closeConnectionEndpoint(protonConnection);
        }
    }

    /**
     * Handles a disconnection by tearing down the endpoints of the connection.
     *
     * @param protonConnection the disconnected connection
     */
    private void processDisconnection(ProtonConnection protonConnection) {
        log.info("Disconnection from {} {}", protonConnection.getRemoteHostname(), protonConnection.getRemoteContainer());
        closeConnectionEndpoint(protonConnection);
    }

    /**
     * Closes the source and sinks registered for the connection, closes the
     * connection itself and removes it from the registry. Does nothing when the
     * connection is not registered.
     *
     * @param protonConnection the connection to clean up
     */
    private void closeConnectionEndpoint(ProtonConnection protonConnection) {
        ConnectionEndpoint connectionEndpoint = this.endpoints.get(protonConnection);
        if (connectionEndpoint == null) {
            return;
        }
        closeEndpoint(connectionEndpoint);
        protonConnection.close();
        this.endpoints.remove(protonConnection);
    }

    /**
     * Closes the source endpoint (if any) and every sink endpoint held by the
     * given connection endpoint.
     *
     * @param connectionEndpoint the endpoint holder to close
     */
    private static void closeEndpoint(ConnectionEndpoint connectionEndpoint) {
        if (connectionEndpoint.getSource() != null) {
            connectionEndpoint.getSource().close();
        }
        connectionEndpoint.getSinks().forEach(sinkBridgeEndpoint -> sinkBridgeEndpoint.close());
    }

    /**
     * Opens a session requested by the remote peer and mirrors a remote close
     * with a local close.
     *
     * @param protonSession the session being opened
     */
    private void processOpenSession(ProtonSession protonSession) {
        protonSession.closeHandler(asyncResult -> {
            if (asyncResult.succeeded()) {
                asyncResult.result().close();
            }
        }).open();
    }

    /**
     * Handles a remote sender attaching: lazily creates the source endpoint of
     * the connection and hands the new receiver link to it.
     *
     * @param protonConnection connection the link belongs to
     * @param protonReceiver   local receiver for the attached remote sender
     */
    private void processOpenReceiver(ProtonConnection protonConnection, ProtonReceiver protonReceiver) {
        log.info("Remote sender attached {}", protonReceiver.getName());
        ConnectionEndpoint connectionEndpoint = this.endpoints.get(protonConnection);
        SourceBridgeEndpoint source = connectionEndpoint.getSource();
        if (source == null) {
            source = new AmqpSourceBridgeEndpoint(this.vertx, this.bridgeConfig, EmbeddedFormat.JSON, new StringSerializer(), new ByteArraySerializer());
            source.closeHandler(obj -> {
                connectionEndpoint.setSource(null);
            });
            source.open();
            connectionEndpoint.setSource(source);
        }
        source.handle(new AmqpEndpoint(protonReceiver));
    }

    /**
     * Handles a remote receiver attaching: creates a dedicated sink endpoint,
     * registers it on the connection and hands the new sender link to it.
     *
     * @param protonConnection connection the link belongs to
     * @param protonSender     local sender for the attached remote receiver
     */
    private void processOpenSender(ProtonConnection protonConnection, ProtonSender protonSender) {
        log.info("Remote receiver attached {}", protonSender.getName());
        // Resolve the endpoint once and capture it: looking it up again inside the
        // close handler would yield null once the connection has been unregistered.
        ConnectionEndpoint connectionEndpoint = this.endpoints.get(protonConnection);
        AmqpSinkBridgeEndpoint amqpSinkBridgeEndpoint = new AmqpSinkBridgeEndpoint(this.vertx, this.bridgeConfig, EmbeddedFormat.JSON, new StringDeserializer(), new ByteArrayDeserializer());
        amqpSinkBridgeEndpoint.closeHandler(bridgeEndpoint -> {
            connectionEndpoint.getSinks().remove(bridgeEndpoint);
        });
        amqpSinkBridgeEndpoint.open();
        connectionEndpoint.getSinks().add(amqpSinkBridgeEndpoint);
        amqpSinkBridgeEndpoint.handle(new AmqpEndpoint(protonSender));
    }

    /**
     * Creates an AMQP error condition.
     *
     * @param str  error condition symbol
     * @param str2 human readable description
     * @return the error condition
     */
    public static ErrorCondition newError(String str, String str2) {
        return new ErrorCondition(Symbol.getSymbol(str), str2);
    }

    /**
     * Detaches the link reporting an error built from the given symbol and description.
     *
     * @param protonLink link to detach
     * @param str        error condition symbol
     * @param str2       human readable description
     */
    static void detachWithError(ProtonLink<?> protonLink, String str, String str2) {
        detachWithError(protonLink, newError(str, str2));
    }

    /**
     * Detaches the link reporting the given error condition: the source is
     * cleared, the link is opened, the condition is set and the link is closed.
     *
     * @param protonLink     link to detach
     * @param errorCondition error condition to report
     */
    public static void detachWithError(ProtonLink<?> protonLink, ErrorCondition errorCondition) {
        log.error("Detaching link {} due to error {}, description: {}", protonLink, errorCondition.getCondition(), errorCondition.getDescription());
        protonLink.setSource((Source) null).open().setCondition(errorCondition).close();
    }

    /**
     * Instantiates the message converter with the given class name, falling
     * back to {@link AmqpDefaultMessageConverter} when no name is configured.
     *
     * @param str fully qualified converter class name; may be {@code null} or empty
     * @return the converter instance
     * @throws AmqpErrorConditionException with condition {@link #AMQP_ERROR_CONFIGURATION}
     *         when the class cannot be instantiated or is not a {@link MessageConverter}
     */
    public static MessageConverter<?, ?, ?, ?> instantiateConverter(String str) throws AmqpErrorConditionException {
        if (str == null || str.isEmpty()) {
            return new AmqpDefaultMessageConverter();
        }
        Object newInstance;
        try {
            // getDeclaredConstructor().newInstance() replaces the deprecated
            // Class.newInstance() and wraps constructor failures in a
            // ReflectiveOperationException instead of leaking them unchecked.
            newInstance = Class.forName(str).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | RuntimeException e) {
            log.debug("Could not instantiate message converter {}", str, e);
            throw new AmqpErrorConditionException(AMQP_ERROR_CONFIGURATION, "configured message converter class could not be instantiated: " + str);
        }
        // Type check kept outside the try block so its error can never be
        // swallowed and re-labelled by the instantiation catch clause.
        if (!(newInstance instanceof MessageConverter)) {
            throw new AmqpErrorConditionException(AMQP_ERROR_CONFIGURATION, "configured message converter class is not an instanceof " + MessageConverter.class.getName() + ": " + str);
        }
        return (MessageConverter<?, ?, ?, ?>) newInstance;
    }

    /**
     * @return the same readiness flag as {@link #isReady()}
     */
    @Override // io.strimzi.kafka.bridge.HealthCheckable
    public boolean isAlive() {
        return this.isReady;
    }

    /**
     * @return {@code true} once the server is listening or the client is
     *         connected, {@code false} before that and after {@code stop}
     */
    @Override // io.strimzi.kafka.bridge.HealthCheckable
    public boolean isReady() {
        return this.isReady;
    }
}
