package org.joyqueue.client.internal.producer.feedback;

import java.util.Iterator;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.client.internal.cluster.ClusterManager;
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.client.internal.producer.MessageSender;
import org.joyqueue.client.internal.producer.callback.TxFeedbackCallback;
import org.joyqueue.client.internal.producer.domain.FeedbackData;
import org.joyqueue.client.internal.producer.domain.FetchFeedbackData;
import org.joyqueue.client.internal.producer.domain.TransactionStatus;
import org.joyqueue.client.internal.producer.feedback.config.TxFeedbackConfig;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.network.command.TxStatus;
import org.joyqueue.network.domain.BrokerNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/client/internal/producer/feedback/TxFeedbackDispatcher.class */
public class TxFeedbackDispatcher {
    protected static final Logger logger = LoggerFactory.getLogger(TxFeedbackDispatcher.class);
    private TxFeedbackConfig config;
    private String topic;
    private TxFeedbackCallback txFeedbackCallback;
    private MessageSender messageSender;
    private ClusterManager clusterManager;

    public TxFeedbackDispatcher(TxFeedbackConfig txFeedbackConfig, String str, TxFeedbackCallback txFeedbackCallback, MessageSender messageSender, ClusterManager clusterManager) {
        this.config = txFeedbackConfig;
        this.topic = str;
        this.txFeedbackCallback = txFeedbackCallback;
        this.messageSender = messageSender;
        this.clusterManager = clusterManager;
    }

    public void dispatch() {
        TopicMetadata fetchTopicMetadata = this.clusterManager.fetchTopicMetadata(this.topic, this.config.getApp());
        if (fetchTopicMetadata == null) {
            logger.warn("topic {} not exist", this.topic);
            return;
        }
        Iterator<BrokerNode> it = fetchTopicMetadata.getBrokers().iterator();
        while (it.hasNext()) {
            doFeedback(fetchTopicMetadata, it.next());
        }
    }

    protected void doFeedback(TopicMetadata topicMetadata, BrokerNode brokerNode) {
        try {
            FetchFeedbackData fetchFeedback = this.messageSender.fetchFeedback(brokerNode, this.topic, this.config.getApp(), TxStatus.UNKNOWN, this.config.getFetchSize(), this.config.getLongPollTimeout(), this.config.getTimeout());
            if (!fetchFeedback.getCode().equals(JoyQueueCode.SUCCESS)) {
                logger.error("fetch feedback error, topic: {}, error: {}", this.topic, fetchFeedback.getCode().getMessage(new Object[0]));
            } else {
                if (CollectionUtils.isEmpty(fetchFeedback.getData())) {
                    logger.debug("fetch feedback is empty, topic: {}", this.topic);
                    return;
                }
                Iterator<FeedbackData> it = fetchFeedback.getData().iterator();
                while (it.hasNext()) {
                    doConfirm(brokerNode, topicMetadata, it.next());
                }
            }
        } catch (Exception e) {
            logger.error("fetch feedback exception, topic: {}", this.topic, e);
        }
    }

    protected void doConfirm(BrokerNode brokerNode, TopicMetadata topicMetadata, FeedbackData feedbackData) {
        try {
            TransactionStatus confirm = this.txFeedbackCallback.confirm(TopicName.parse(topicMetadata.getTopic()), feedbackData.getTxId(), feedbackData.getTransactionId());
            if (confirm == null) {
                logger.warn("confirm feedback error, status is null, topic: {}, transactionId: {}", this.topic, feedbackData.getTransactionId());
                return;
            }
            try {
                if (confirm.equals(TransactionStatus.PREPARE)) {
                    logger.debug("commit transaction, status: {}, txId: {}, transactionId: {}", new Object[]{confirm, feedbackData.getTxId(), feedbackData.getTransactionId()});
                    this.messageSender.commit(brokerNode, this.topic, this.config.getApp(), feedbackData.getTxId(), this.config.getTimeout());
                } else if (confirm.equals(TransactionStatus.COMMITTED)) {
                    logger.debug("rollback transaction, status: {}, txId: {}, transactionId: {}", new Object[]{confirm, feedbackData.getTxId(), feedbackData.getTransactionId()});
                    this.messageSender.rollback(brokerNode, this.topic, this.config.getApp(), feedbackData.getTxId(), this.config.getTimeout());
                } else if (confirm.equals(TransactionStatus.ROLLBACK)) {
                    logger.debug("rollback transaction, status: {}, txId: {}, transactionId: {}", new Object[]{confirm, feedbackData.getTxId(), feedbackData.getTransactionId()});
                    this.messageSender.rollback(brokerNode, this.topic, this.config.getApp(), feedbackData.getTxId(), this.config.getTimeout());
                }
            } catch (Exception e) {
                logger.error("commit feedback exception, topic: {}, transactionId: {}", new Object[]{this.topic, feedbackData.getTransactionId(), e});
            }
        } catch (Exception e2) {
            logger.error("confirm feedback exception, topic: {}, transactionId: {}", new Object[]{this.topic, feedbackData.getTransactionId(), e2});
        }
    }
}
