package io.strimzi.kafka.bridge.amqp;

import io.strimzi.kafka.bridge.EmbeddedFormat;
import io.strimzi.kafka.bridge.Endpoint;
import io.strimzi.kafka.bridge.SourceBridgeEndpoint;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.converter.MessageConverter;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.message.Message;

/* loaded from: input_file:io/strimzi/kafka/bridge/amqp/AmqpSourceBridgeEndpoint.class */
/**
 * Source bridge endpoint that receives AMQP messages over Proton receiver links,
 * converts them to Kafka producer records and sends them through the inherited
 * {@code send} method.
 *
 * <p>Every receiver link handed to {@link #handle(Endpoint)} is tracked by link name.
 * When the last tracked receiver goes away the endpoint closes itself.
 *
 * @param <K> type of the Kafka record key
 * @param <V> type of the Kafka record value
 */
public class AmqpSourceBridgeEndpoint<K, V> extends SourceBridgeEndpoint<K, V> {

    /** Converter between AMQP messages and Kafka records; created lazily on the first handled link. */
    private MessageConverter<K, V, Message, Collection<Message>> converter;

    /** Receiver links currently served by this endpoint, keyed by link name. */
    private final Map<String, ProtonReceiver> receivers;

    /**
     * Creates the endpoint; all arguments are forwarded unchanged to the superclass constructor.
     *
     * @param vertx Vert.x instance
     * @param bridgeConfig bridge configuration (the AMQP section is read in {@link #handle(Endpoint)})
     * @param embeddedFormat embedded message format
     * @param serializer Kafka key serializer
     * @param serializer2 Kafka value serializer
     */
    public AmqpSourceBridgeEndpoint(Vertx vertx, BridgeConfig bridgeConfig, EmbeddedFormat embeddedFormat, Serializer<K> serializer, Serializer<V> serializer2) {
        super(vertx, bridgeConfig, embeddedFormat, serializer, serializer2);
        this.receivers = new HashMap<>();
    }

    /**
     * Closes every tracked receiver link, forgets them, then closes the superclass resources.
     */
    @Override
    public void close() {
        if (this.receivers != null) {
            for (ProtonReceiver receiver : this.receivers.values()) {
                receiver.close();
            }
            this.receivers.clear();
        }
        super.close();
    }

    /**
     * Starts serving a new AMQP receiver link: installs the close, detach and message
     * handlers, grants credit, opens the link and registers it under its name.
     *
     * <p>If the configured message converter cannot be instantiated, the link is detached
     * with the corresponding error condition and nothing is registered.
     *
     * @param endpoint endpoint wrapping the Proton link; the link must be a {@link ProtonReceiver}
     * @throws IllegalArgumentException if the wrapped link is not a receiver
     */
    @Override
    public void handle(Endpoint<?> endpoint) {
        ProtonLink<?> link = (ProtonLink<?>) endpoint.get();
        AmqpConfig amqpConfig = this.bridgeConfig.getAmqpConfig();
        if (!(link instanceof ProtonReceiver)) {
            throw new IllegalArgumentException("This Proton link must be a receiver");
        }
        ProtonReceiver receiver = (ProtonReceiver) link;

        if (this.converter == null) {
            try {
                // The converter class comes from configuration, so its type arguments cannot be checked here.
                @SuppressWarnings("unchecked")
                MessageConverter<K, V, Message, Collection<Message>> created =
                        (MessageConverter<K, V, Message, Collection<Message>>) AmqpBridge.instantiateConverter(amqpConfig.getMessageConverter());
                this.converter = created;
            } catch (AmqpErrorConditionException e) {
                AmqpBridge.detachWithError(receiver, e.toCondition());
                return;
            }
        }

        this.name = receiver.getName();
        receiver.setTarget(receiver.getRemoteTarget()).setAutoAccept(false).closeHandler(ar -> {
            // A failed result still means the remote peer closed the link (the failure cause
            // carries the remote error), so the receiver must be cleaned up in both cases;
            // otherwise it stays in the map and the endpoint never closes.
            if (ar.failed()) {
                this.log.error("Remote AMQP sender closed link {} with error {}", receiver.getName(), ar.cause().getMessage());
            }
            processCloseReceiver(receiver);
        }).detachHandler(ar -> {
            processCloseReceiver(receiver);
        }).handler((delivery, message) -> {
            processMessage(receiver, delivery, message);
        });

        // Settlement is driven manually (auto-accept is off). For AT_MOST_ONCE the configured
        // credit is used as prefetch; otherwise prefetch is disabled and the credit is granted once.
        if (receiver.getRemoteQoS() == ProtonQoS.AT_MOST_ONCE) {
            receiver.setPrefetch(amqpConfig.getFlowCredit());
        } else {
            receiver.setPrefetch(0).flow(amqpConfig.getFlowCredit());
        }
        receiver.open();
        this.receivers.put(receiver.getName(), receiver);
    }

    /**
     * Intentionally a no-op: this endpoint only acts on {@link #handle(Endpoint)}.
     *
     * @param endpoint ignored
     * @param handler ignored
     */
    @Override
    public void handle(Endpoint<?> endpoint, Handler<?> handler) {
        // nothing to do
    }

    /**
     * Settles the delivery with the {@code Accepted} outcome.
     *
     * @param str name of the link the delivery arrived on (used for logging only)
     * @param protonDelivery delivery to settle
     */
    private void acceptedDelivery(String str, ProtonDelivery protonDelivery) {
        protonDelivery.disposition(Accepted.getInstance(), true);
        this.log.debug("Delivery sent [accepted] on link {}", str);
    }

    /**
     * Settles the delivery with a {@code Rejected} outcome whose error condition carries
     * the message of the given failure.
     *
     * @param str name of the link the delivery arrived on (used for logging only)
     * @param protonDelivery delivery to settle
     * @param th failure reported while sending to Kafka
     */
    private void rejectedDelivery(String str, ProtonDelivery protonDelivery, Throwable th) {
        Rejected rejected = new Rejected();
        rejected.setError(new ErrorCondition(Symbol.valueOf(AmqpBridge.AMQP_ERROR_SEND_TO_KAFKA), th.getMessage()));
        protonDelivery.disposition(rejected, true);
        this.log.debug("Delivery sent [rejected] on link {}", str);
    }

    /**
     * Derives the Kafka topic from the receiver's local target address by replacing
     * every {@code '/'} with {@code '.'}.
     *
     * @param receiver link the message arrived on
     * @return the derived topic, or {@code null} when the link has no target or no target address
     */
    private static String kafkaTopicFor(ProtonReceiver receiver) {
        // The local target is copied from the remote target in handle(); guard against it being absent.
        if (receiver.getTarget() == null) {
            return null;
        }
        String address = receiver.getTarget().getAddress();
        return address != null ? address.replace('/', '.') : null;
    }

    /**
     * Converts an incoming AMQP message to a Kafka record and sends it.
     *
     * <p>Deliveries already settled by the remote peer are sent without a completion
     * handler. Otherwise the delivery is accepted or rejected once the send completes.
     *
     * @param protonReceiver link the message arrived on
     * @param protonDelivery delivery associated with the message
     * @param message received AMQP message
     */
    private void processMessage(ProtonReceiver protonReceiver, ProtonDelivery protonDelivery, Message message) {
        KafkaProducerRecord<K, V> kafkaRecord = this.converter.toKafkaRecord(kafkaTopicFor(protonReceiver), null, message);
        if (protonDelivery.remotelySettled()) {
            send(kafkaRecord, null);
            return;
        }
        send(kafkaRecord, ar -> {
            if (ar.failed()) {
                Throwable cause = ar.cause();
                this.log.error("Error on delivery to Kafka {}", cause.getMessage());
                rejectedDelivery(protonReceiver.getName(), protonDelivery, cause);
            } else {
                RecordMetadata metadata = (RecordMetadata) ar.result();
                this.log.debug("Delivered to Kafka on topic {} at partition {} [{}]", metadata.getTopic(), metadata.getPartition(), metadata.getOffset());
                acceptedDelivery(protonReceiver.getName(), protonDelivery);
            }
        });
    }

    /**
     * Closes the given receiver locally and stops tracking it; closes the whole endpoint
     * when no receivers remain.
     *
     * @param protonReceiver receiver whose remote peer closed or detached
     */
    private void processCloseReceiver(ProtonReceiver protonReceiver) {
        this.log.info("Remote AMQP sender detached");
        protonReceiver.close();
        this.receivers.remove(protonReceiver.getName());
        if (this.receivers.isEmpty()) {
            close();
        }
    }
}
