package io.strimzi.kafka.bridge.amqp;

import io.strimzi.kafka.bridge.EmbeddedFormat;
import io.strimzi.kafka.bridge.Endpoint;
import io.strimzi.kafka.bridge.QoSEndpoint;
import io.strimzi.kafka.bridge.SinkBridgeEndpoint;
import io.strimzi.kafka.bridge.SinkTopicSubscription;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.converter.MessageConverter;
import io.strimzi.kafka.bridge.tracker.SimpleOffsetTracker;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.common.PartitionInfo;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonSender;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.message.Message;

/* loaded from: input_file:io/strimzi/kafka/bridge/amqp/AmqpSinkBridgeEndpoint.class */
public class AmqpSinkBridgeEndpoint<K, V> extends SinkBridgeEndpoint<K, V> {
    private static final String GROUP_ID_MATCH = "/group.id/";
    private MessageConverter<K, V, Message, Collection<Message>> converter;
    private ProtonSender sender;

    public AmqpSinkBridgeEndpoint(Vertx vertx, BridgeConfig bridgeConfig, EmbeddedFormat embeddedFormat, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        super(vertx, bridgeConfig, embeddedFormat, deserializer, deserializer2);
        this.converter = null;
    }

    @Override // io.strimzi.kafka.bridge.BridgeEndpoint
    public void open() {
    }

    @Override // io.strimzi.kafka.bridge.SinkBridgeEndpoint, io.strimzi.kafka.bridge.BridgeEndpoint
    public void close() {
        if (this.offsetTracker != null) {
            this.offsetTracker.clear();
        }
        if (this.sender != null && this.sender.isOpen()) {
            this.sender.close();
        }
        super.close();
    }

    @Override // io.strimzi.kafka.bridge.BridgeEndpoint
    public void handle(Endpoint<?> endpoint) {
        ProtonSender protonSender = (ProtonLink) endpoint.get();
        AmqpConfig amqpConfig = this.bridgeConfig.getAmqpConfig();
        if (!(protonSender instanceof ProtonSender)) {
            throw new IllegalArgumentException("This Proton link must be a sender");
        }
        try {
            if (this.converter == null) {
                this.converter = (MessageConverter<K, V, Message, Collection<Message>>) AmqpBridge.instantiateConverter(amqpConfig.getMessageConverter());
            }
            this.sender = protonSender;
            this.name = this.sender.getName();
            String address = this.sender.getRemoteSource().getAddress();
            int indexOf = address.indexOf(GROUP_ID_MATCH);
            if (indexOf == -1 || indexOf == 0 || indexOf == address.length() - GROUP_ID_MATCH.length()) {
                this.log.warn("Local detached");
                throw new AmqpErrorConditionException(AmqpBridge.AMQP_ERROR_NO_GROUPID, indexOf == -1 ? "Mandatory group.id not specified in the address" : indexOf == 0 ? "Empty topic in specified address" : "Empty consumer group in specified address");
            }
            this.sender.setSource(this.sender.getRemoteSource()).closeHandler(asyncResult -> {
                if (asyncResult.succeeded()) {
                    processCloseSender((ProtonSender) asyncResult.result());
                }
            }).detachHandler(asyncResult2 -> {
                processCloseSender(this.sender);
            });
            this.sender.open();
            this.groupId = address.substring(indexOf + GROUP_ID_MATCH.length());
            SinkTopicSubscription sinkTopicSubscription = new SinkTopicSubscription(address.substring(0, indexOf).replace('/', '.'));
            this.log.debug("topic {} group.id {}", sinkTopicSubscription.getTopic(), this.groupId);
            Map filter = this.sender.getRemoteSource().getFilter();
            if (filter != null) {
                Object obj = filter.get(Symbol.getSymbol(AmqpBridge.AMQP_PARTITION_FILTER));
                Object obj2 = filter.get(Symbol.getSymbol(AmqpBridge.AMQP_OFFSET_FILTER));
                checkFilters(obj, obj2);
                this.log.debug("partition {} offset {}", obj, obj2);
                sinkTopicSubscription.setPartition((Integer) obj);
                sinkTopicSubscription.setOffset((Long) obj2);
            }
            this.offsetTracker = new SimpleOffsetTracker(sinkTopicSubscription.getTopic());
            this.qos = mapQoS(this.sender.getQoS());
            initConsumer(true, null);
            setPartitionsRevokedHandler(this::partitionsRevokedHandler);
            setPartitionsAssignedHandler(this::partitionsAssignedHandler);
            setSubscribeHandler(this::subscribeHandler);
            setPartitionHandler(this::partitionHandler);
            setAssignHandler(this::assignHandler);
            setSeekHandler(this::seekHandler);
            setReceivedHandler(this::sendAmqpMessage);
            setCommitHandler(this::commitHandler);
            flowCheck();
            this.topicSubscriptions.add(sinkTopicSubscription);
            if (sinkTopicSubscription.getPartition() == null) {
                subscribe(true);
            } else {
                assign(true);
            }
        } catch (AmqpErrorConditionException e) {
            AmqpBridge.detachWithError(protonSender, e.toCondition());
            close();
        }
    }

    @Override // io.strimzi.kafka.bridge.BridgeEndpoint
    public void handle(Endpoint<?> endpoint, Handler<?> handler) {
    }

    private void sendAmqpError(String str, String str2, AsyncResult<?> asyncResult) {
        sendAmqpError(AmqpBridge.newError(str, str2 + (asyncResult.cause().getMessage() != null ? ": " + asyncResult.cause().getMessage() : "")));
    }

    private void sendAmqpError(ErrorCondition errorCondition) {
        AmqpBridge.detachWithError(this.sender, errorCondition);
        close();
    }

    private void sendAmqpMessage(KafkaConsumerRecord<K, V> kafkaConsumerRecord) {
        int partition = kafkaConsumerRecord.partition();
        long offset = kafkaConsumerRecord.offset();
        String str = partition + "_" + offset;
        Message message = this.converter.toMessage(this.sender.getSource().getAddress(), kafkaConsumerRecord);
        if (this.sender.getQoS() == ProtonQoS.AT_MOST_ONCE) {
            this.sender.send(ProtonHelper.tag(str), message);
        } else {
            this.offsetTracker.track(partition, offset, kafkaConsumerRecord.record());
            this.log.debug("Tracked {} - {} [{}]", new Object[]{kafkaConsumerRecord.topic(), Integer.valueOf(kafkaConsumerRecord.partition()), Long.valueOf(kafkaConsumerRecord.offset())});
            this.sender.send(ProtonHelper.tag(str), message, protonDelivery -> {
                String str2 = new String(protonDelivery.getTag(), StandardCharsets.UTF_8);
                this.offsetTracker.delivered(partition, offset);
                this.log.debug("Message tag {} delivered {} to {}", new Object[]{str2, protonDelivery.getRemoteState(), this.sender.getSource().getAddress()});
            });
        }
        flowCheck();
    }

    private void flowCheck() {
        if (this.sender.sendQueueFull()) {
            pause();
            this.sender.sendQueueDrainHandler(protonSender -> {
                resume();
            });
        }
    }

    private void processCloseSender(ProtonSender protonSender) {
        this.log.info("Remote AMQP receiver detached");
        close();
    }

    private void checkFilters(Object obj, Object obj2) throws AmqpErrorConditionException {
        if (obj != null && !(obj instanceof Integer)) {
            throw new AmqpErrorConditionException(AmqpBridge.AMQP_ERROR_WRONG_PARTITION_FILTER, "Wrong partition filter");
        }
        if (obj2 != null && !(obj2 instanceof Long)) {
            throw new AmqpErrorConditionException(AmqpBridge.AMQP_ERROR_WRONG_OFFSET_FILTER, "Wrong offset filter");
        }
        if (obj == null && obj2 != null) {
            throw new AmqpErrorConditionException(AmqpBridge.AMQP_ERROR_NO_PARTITION_FILTER, "No partition filter specified");
        }
        if (obj != null && ((Integer) obj).intValue() < 0) {
            throw new AmqpErrorConditionException(AmqpBridge.AMQP_ERROR_WRONG_FILTER, "Wrong filter");
        }
        if (obj2 != null && ((Long) obj2).longValue() < 0) {
            throw new AmqpErrorConditionException(AmqpBridge.AMQP_ERROR_WRONG_FILTER, "Wrong filter");
        }
    }

    private void partitionsRevokedHandler(Set<TopicPartition> set) {
    }

    private void partitionsAssignedHandler(Set<TopicPartition> set) {
        if (set.isEmpty()) {
            sendAmqpError(AmqpBridge.newError(AmqpBridge.AMQP_ERROR_NO_PARTITIONS, "All partitions already have a receiver"));
        } else {
            if (this.sender.isOpen()) {
                return;
            }
            this.sender.setSource(this.sender.getRemoteSource()).open();
        }
    }

    private void subscribeHandler(AsyncResult<Void> asyncResult) {
        if (asyncResult.failed()) {
            sendAmqpError(AmqpBridge.AMQP_ERROR_KAFKA_SUBSCRIBE, "Error subscribing to topic " + this.topicSubscriptions, asyncResult);
        }
    }

    private void partitionHandler(AsyncResult<Optional<PartitionInfo>> asyncResult) {
        if (asyncResult.failed()) {
            sendAmqpError(AmqpBridge.AMQP_ERROR_KAFKA_SUBSCRIBE, "Error getting partition info for topic " + this.topicSubscriptions, asyncResult);
        } else {
            if (((Optional) asyncResult.result()).isPresent()) {
                return;
            }
            sendAmqpError(AmqpBridge.newError(AmqpBridge.AMQP_ERROR_PARTITION_NOT_EXISTS, "Specified partition doesn't exist"));
        }
    }

    private void assignHandler(AsyncResult<Void> asyncResult) {
        if (asyncResult.failed()) {
            sendAmqpError(AmqpBridge.AMQP_ERROR_KAFKA_SUBSCRIBE, "Error assigning to topic %s" + this.topicSubscriptions, asyncResult);
        }
    }

    private void seekHandler(AsyncResult<Void> asyncResult) {
        if (asyncResult.failed()) {
            sendAmqpError(AmqpBridge.AMQP_ERROR_KAFKA_SUBSCRIBE, String.format("Error seeking for topic %s", this.topicSubscriptions), asyncResult);
        }
    }

    private void commitHandler(AsyncResult<Void> asyncResult) {
        if (asyncResult.failed()) {
            sendAmqpError(new ErrorCondition(Symbol.getSymbol(AmqpBridge.AMQP_ERROR_KAFKA_COMMIT), "Error in commit"));
        }
    }

    private QoSEndpoint mapQoS(ProtonQoS protonQoS) {
        if (protonQoS == ProtonQoS.AT_MOST_ONCE) {
            return QoSEndpoint.AT_MOST_ONCE;
        }
        if (protonQoS == ProtonQoS.AT_LEAST_ONCE) {
            return QoSEndpoint.AT_LEAST_ONCE;
        }
        throw new IllegalArgumentException("Proton QoS not supported !");
    }
}
