package org.joyqueue.client.internal.producer.support;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.joyqueue.client.internal.cluster.ClusterManager;
import org.joyqueue.client.internal.exception.ClientException;
import org.joyqueue.client.internal.metadata.domain.PartitionMetadata;
import org.joyqueue.client.internal.metadata.domain.PartitionNode;
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.client.internal.metadata.exception.MetadataException;
import org.joyqueue.client.internal.nameserver.NameServerConfig;
import org.joyqueue.client.internal.nameserver.helper.NameServerHelper;
import org.joyqueue.client.internal.producer.MessageSender;
import org.joyqueue.client.internal.producer.callback.AsyncBatchProduceCallback;
import org.joyqueue.client.internal.producer.callback.AsyncBatchProduceCallbackAdapter;
import org.joyqueue.client.internal.producer.callback.AsyncBatchSendCallback;
import org.joyqueue.client.internal.producer.callback.AsyncProduceCallback;
import org.joyqueue.client.internal.producer.checker.ProduceMessageChecker;
import org.joyqueue.client.internal.producer.config.ProducerConfig;
import org.joyqueue.client.internal.producer.domain.ProduceMessage;
import org.joyqueue.client.internal.producer.domain.SendBatchResultData;
import org.joyqueue.client.internal.producer.domain.SendResult;
import org.joyqueue.client.internal.producer.exception.NeedRetryException;
import org.joyqueue.client.internal.producer.exception.ProducerException;
import org.joyqueue.client.internal.producer.helper.ProducerHelper;
import org.joyqueue.client.internal.producer.interceptor.ProduceContext;
import org.joyqueue.client.internal.producer.interceptor.ProducerInterceptor;
import org.joyqueue.client.internal.producer.interceptor.ProducerInterceptorManager;
import org.joyqueue.client.internal.producer.interceptor.ProducerInvocation;
import org.joyqueue.client.internal.producer.interceptor.ProducerInvoker;
import org.joyqueue.client.internal.producer.transport.ProducerClient;
import org.joyqueue.client.internal.producer.transport.ProducerClientManager;
import org.joyqueue.client.internal.transport.ClientState;
import org.joyqueue.domain.ProducerPolicy;
import org.joyqueue.domain.QosLevel;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.network.domain.BrokerNode;
import org.joyqueue.shaded.com.google.common.base.Preconditions;
import org.joyqueue.shaded.com.google.common.collect.Lists;
import org.joyqueue.shaded.org.apache.commons.collections.CollectionUtils;
import org.joyqueue.toolkit.retry.RetryPolicy;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/client/internal/producer/support/MessageProducerInner.class */
/**
 * Internal message producer: resolves topic metadata, picks a partition/broker for a batch of
 * messages, runs the configured producer interceptors and hands the batch to the
 * {@link MessageSender} (sync, async or one-way), retrying on other brokers when failover applies.
 *
 * <p>{@link #validate()} creates the {@link PartitionSelectorManager}; sending before it has run
 * fails in {@link #dispatchPartitions} because the selector manager is still {@code null}.
 */
public class MessageProducerInner extends Service {
    protected static final Logger logger = LoggerFactory.getLogger(MessageProducerInner.class);

    private final ProducerConfig config;
    private final NameServerConfig nameServerConfig;
    private final MessageSender messageSender;
    private final ClusterManager clusterManager;
    private final ProducerClientManager producerClientManager;
    private final ProducerInterceptorManager producerInterceptorManager;
    // Created in validate(), hence not final.
    private PartitionSelectorManager partitionSelectorManager;

    /**
     * Creates a producer with a fresh, empty {@link ProducerInterceptorManager}.
     */
    public MessageProducerInner(ProducerConfig producerConfig, NameServerConfig nameServerConfig, MessageSender messageSender, ClusterManager clusterManager, ProducerClientManager producerClientManager) {
        this(producerConfig, nameServerConfig, messageSender, clusterManager, producerClientManager, new ProducerInterceptorManager());
    }

    /**
     * Creates a producer that uses the supplied collaborators as-is (no copies are taken).
     */
    public MessageProducerInner(ProducerConfig producerConfig, NameServerConfig nameServerConfig, MessageSender messageSender, ClusterManager clusterManager, ProducerClientManager producerClientManager, ProducerInterceptorManager producerInterceptorManager) {
        this.config = producerConfig;
        this.nameServerConfig = nameServerConfig;
        this.messageSender = messageSender;
        this.clusterManager = clusterManager;
        this.producerClientManager = producerClientManager;
        this.producerInterceptorManager = producerInterceptorManager;
    }

    /**
     * Initialises the partition selector manager.
     * NOTE(review): presumably invoked by the {@link Service} lifecycle before start - confirm.
     */
    @Override
    public void validate() throws Exception {
        this.partitionSelectorManager = new PartitionSelectorManager();
    }

    /** Registers an interceptor with the interceptor manager. */
    public synchronized void addInterceptor(ProducerInterceptor producerInterceptor) {
        this.producerInterceptorManager.addInterceptor(producerInterceptor);
    }

    /** Unregisters an interceptor from the interceptor manager. */
    public synchronized void removeInterceptor(ProducerInterceptor producerInterceptor) {
        this.producerInterceptorManager.removeInterceptor(producerInterceptor);
    }

    /**
     * Sends a single message by delegating to {@link #batchSend} with a one-element batch.
     *
     * @param message     message to send
     * @param txId        transaction id forwarded to the sender (may be {@code null})
     * @param timeout     timeout value, interpreted in {@code timeoutUnit}
     * @param timeoutUnit unit of {@code timeout}; must not be {@code null}
     * @param isOneway    when {@code true} the message is sent one-way and no result is returned
     * @param failover    whether retrying on another broker is allowed
     * @param callback    async callback, or {@code null} for a synchronous send
     * @return the first result of the batch, or {@code null} when the batch produced no results
     *         (one-way and async sends return no results)
     */
    public SendResult send(ProduceMessage message, String txId, long timeout, TimeUnit timeoutUnit, boolean isOneway, boolean failover, AsyncProduceCallback callback) {
        AsyncBatchProduceCallback batchCallback = callback == null ? null : new AsyncBatchProduceCallbackAdapter(callback);
        List<SendResult> results = batchSend(Lists.newArrayList(message), txId, timeout, timeoutUnit, isOneway, failover, batchCallback);
        if (CollectionUtils.isEmpty(results)) {
            return null;
        }
        return results.get(0);
    }

    /**
     * Validates the arguments and the messages against the producer config, then sends the batch.
     *
     * @throws IllegalArgumentException if {@code timeoutUnit} is {@code null}
     */
    public List<SendResult> batchSend(List<ProduceMessage> messages, String txId, long timeout, TimeUnit timeoutUnit, boolean isOneway, boolean failover, AsyncBatchProduceCallback callback) {
        Preconditions.checkArgument(timeoutUnit != null, "timeoutUnit not null");
        ProduceMessageChecker.checkMessages(messages, this.config);
        return doBatchSend(messages, txId, timeout, timeoutUnit, isOneway, failover, callback);
    }

    /**
     * Resolves the topic metadata from the first message's topic, determines the available
     * brokers and sends the batch. {@code messages} must be non-empty.
     */
    public List<SendResult> doBatchSend(List<ProduceMessage> messages, String txId, long timeout, TimeUnit timeoutUnit, boolean isOneway, boolean failover, AsyncBatchProduceCallback callback) {
        TopicMetadata topicMetadata = getAndCheckTopicMetadata(messages.get(0).getTopic());
        List<BrokerNode> brokers = getAvailableBrokers(topicMetadata);
        return doBatchSend(messages, topicMetadata, brokers, txId, timeout, timeoutUnit, isOneway, failover, callback);
    }

    /**
     * Runs the interceptor chain around the actual send. A rejection by the chain surfaces as a
     * {@link ProducerException}; any other exception is wrapped in a {@link ProducerException}.
     */
    public List<SendResult> doBatchSend(final List<ProduceMessage> messages, final TopicMetadata topicMetadata, final List<BrokerNode> brokers, final String txId, final long timeout, final TimeUnit timeoutUnit, final boolean isOneway, final boolean failover, final AsyncBatchProduceCallback callback) {
        ProducerInvoker invoker = new ProducerInvoker() {
            @Override
            public List<SendResult> invoke(ProduceContext produceContext) {
                return MessageProducerInner.this.doBatchSendInternal(messages, topicMetadata, brokers, txId, timeout, timeoutUnit, isOneway, failover, callback);
            }

            @Override
            public List<SendResult> reject(ProduceContext produceContext) {
                throw new ProducerException("reject send", JoyQueueCode.CN_UNKNOWN_ERROR.getCode());
            }
        };
        try {
            return new ProducerInvocation(this.config, this.nameServerConfig, topicMetadata, messages, this.producerInterceptorManager, invoker).invoke();
        } catch (ProducerException e) {
            // Already the exception type callers expect; do not double-wrap.
            throw e;
        } catch (Exception e) {
            throw new ProducerException(e);
        }
    }

    /**
     * Dispatches the batch to a partition leader and sends it, retrying on failure.
     *
     * <p>Retries only happen when {@code failover} is requested and {@link #isFailover} holds for
     * the batch; the limit is then {@code retryPolicy.getMaxRetrys()}, otherwise there is exactly
     * one attempt. Before each retry the previously used leader is removed from a private copy of
     * {@code brokers} and the partitions are re-dispatched.
     *
     * @return the send results, or {@code null} for one-way and async sends
     * @throws ProducerException if every permitted attempt failed, or on a non-client exception
     */
    protected List<SendResult> doBatchSendInternal(List<ProduceMessage> messages, TopicMetadata topicMetadata, List<BrokerNode> brokers, String txId, long timeout, TimeUnit timeoutUnit, boolean isOneway, boolean failover, AsyncBatchProduceCallback callback) {
        ProducerPolicy producerPolicy = topicMetadata.getProducerPolicy();
        String topic = topicMetadata.getTopic();
        String app = this.config.getApp();
        // -1 presumably means "no producer-level timeout configured" (TODO confirm); fall back to the topic policy.
        long produceTimeout = this.config.getProduceTimeout() == -1 ? producerPolicy.getTimeOut().intValue() : this.config.getProduceTimeout();
        long timeoutMillis = timeoutUnit.toMillis(timeout);
        boolean failoverEnabled = failover && isFailover(messages);
        RetryPolicy retryPolicy = this.config.getRetryPolicy();
        int maxRetryTimes = failoverEnabled ? retryPolicy.getMaxRetrys().intValue() : 0;

        PartitionNode partitionNode = dispatchPartitions(messages, topicMetadata, brokers);
        ClientException lastException = null;
        List<SendResult> result = null;
        int retryTimes = 0;
        boolean sent = false;

        for (int attempt = 0; attempt <= maxRetryTimes && !sent; attempt++) {
            if (retryTimes != 0 && failoverEnabled) {
                if (retryTimes == 1) {
                    // Copy once so the caller's broker list is never mutated.
                    brokers = Lists.newArrayList(brokers);
                }
                ProducerHelper.clearPartitions(messages);
                brokers.remove(partitionNode.getPartitionMetadata().getLeader());
                partitionNode = dispatchPartitions(messages, topicMetadata, brokers);
            }
            PartitionNode.PartitionNodeTracer tracer = partitionNode.begin();
            try {
                result = doSendBatchMessage(partitionNode.getPartitionMetadata().getLeader(), topic, app, messages, txId, this.config.getQosLevel(), produceTimeout, timeoutMillis, isOneway, callback);
                tracer.end();
                sent = true;
            } catch (MetadataException e) {
                // Close the tracer before refreshing: the refresh itself may throw.
                tracer.error();
                lastException = e;
                retryTimes++;
                logger.debug("send message exception, topic: {}, app:{}, messages: {}", new Object[]{topic, app, messages, e});
                topicMetadata = getAndCheckTopicMetadata(topicMetadata.getTopic());
            } catch (NeedRetryException e) {
                tracer.error();
                lastException = new ProducerException(e.getMessage(), e.getCode(), e.getCause());
                retryTimes++;
                logger.debug("send message exception, topic: {}, app:{}, messages: {}", new Object[]{topic, app, messages, e});
            } catch (ClientException e) {
                tracer.error();
                lastException = e;
                retryTimes++;
                logger.debug("send message exception, topic: {}, app:{}, messages: {}", new Object[]{topic, app, messages, e});
            } catch (Exception e) {
                // ProducerException is a ClientException and is handled above, so anything
                // reaching this point is unexpected and is not retried.
                tracer.error();
                logger.error("send message exception, topic: {}, app:{}, messages: {}", new Object[]{topic, app, messages, e});
                throw new ProducerException(e);
            }
        }

        if (!sent) {
            // All permitted attempts failed. Judging by the success flag (rather than comparing
            // the failure count with the configured maximum) also covers the no-failover case,
            // where a single failure must not be reported as success.
            if (lastException instanceof ProducerException) {
                throw (ProducerException) lastException;
            }
            throw new ProducerException(lastException);
        }
        if (retryTimes != 0) {
            logger.warn("send message success, retry {} times, topic: {}, app: {}, error: {}", new Object[]{Integer.valueOf(retryTimes), topic, app, lastException.getMessage()});
        }
        return result;
    }

    /**
     * Sends the batch to one broker using the mode selected by the arguments: one-way when
     * {@code isOneway} is set, synchronous when {@code callback} is {@code null}, otherwise async.
     *
     * @return the results of a synchronous send; {@code null} for one-way and async sends
     */
    protected List<SendResult> doSendBatchMessage(BrokerNode brokerNode, String topic, String app, List<ProduceMessage> messages, String txId, QosLevel qosLevel, long produceTimeout, long timeout, boolean isOneway, final AsyncBatchProduceCallback callback) {
        if (logger.isDebugEnabled()) {
            logger.debug("batch send message, broker: {}, topic: {}, app: {}, messages: {}, txId: {}, qosLevel: {}", new Object[]{brokerNode, topic, app, messages, txId, qosLevel});
        }
        if (isOneway) {
            this.messageSender.batchSendOneway(brokerNode, topic, app, txId, messages, qosLevel, produceTimeout, timeout);
            return null;
        }
        if (callback == null) {
            SendBatchResultData resultData = this.messageSender.batchSend(brokerNode, topic, app, txId, messages, qosLevel, produceTimeout, timeout);
            return handleSendBatchResultData(topic, app, resultData);
        }
        this.messageSender.batchSendAsync(brokerNode, topic, app, txId, messages, qosLevel, produceTimeout, timeout, new AsyncBatchSendCallback() {
            @Override
            public void onSuccess(List<ProduceMessage> sentMessages, SendBatchResultData resultData) {
                JoyQueueCode code = resultData.getCode();
                if (code.equals(JoyQueueCode.SUCCESS)) {
                    callback.onSuccess(sentMessages, resultData.getResult());
                } else {
                    // A transport-level success carrying a non-success code is reported as a failure.
                    callback.onException(sentMessages, new ProducerException(code.getMessage(new Object[0]), code.getCode()));
                }
            }

            @Override
            public void onException(List<ProduceMessage> sentMessages, Throwable cause) {
                callback.onException(sentMessages, cause);
            }
        });
        return null;
    }

    /**
     * Converts the result of a synchronous batch send into results or an exception.
     *
     * @return the send results when the result code is {@code SUCCESS}
     * @throws MetadataException  for permission / service-unavailable / not-leader codes, after
     *                            asking the cluster manager to update the topic metadata
     * @throws ProducerException  when {@code resultData} is {@code null} or the topic does not exist
     * @throws NeedRetryException for every other non-success code
     */
    protected List<SendResult> handleSendBatchResultData(String topic, String app, SendBatchResultData resultData) {
        if (resultData == null) {
            throw new ProducerException(JoyQueueCode.CN_UNKNOWN_ERROR.getMessage(new Object[0]), JoyQueueCode.CN_UNKNOWN_ERROR.getCode());
        }
        JoyQueueCode code = resultData.getCode();
        if (code.equals(JoyQueueCode.SUCCESS)) {
            return resultData.getResult();
        }
        switch (code) {
            case CN_NO_PERMISSION:
            case CN_SERVICE_NOT_AVAILABLE:
            case FW_PRODUCE_MESSAGE_BROKER_NOT_LEADER:
                // Three different codes share this branch, so log the actual code.
                logger.debug("send message error, topic: {}, code: {}", topic, code);
                this.clusterManager.updateTopicMetadata(topic, app);
                throw new MetadataException(code.getMessage(new Object[0]), code.getCode());
            case FW_TOPIC_NOT_EXIST:
                logger.debug("send message error, topic not exist, topic: {}", topic);
                throw new ProducerException(code.getMessage(new Object[0]), code.getCode());
            case FW_PUT_MESSAGE_TOPIC_NOT_WRITE:
                logger.debug("send message error, topic not write, topic: {}", topic);
                break;
            case FW_BROKER_NOT_WRITABLE:
                logger.debug("send message error, broker not writable, topic: {}", topic);
                break;
            default:
                logger.error("send message error, topic: {}, code: {}, error: {}", new Object[]{topic, code, code.getMessage(new Object[0])});
                break;
        }
        // Everything that was not rethrown above is reported as retryable.
        throw new NeedRetryException(code.getMessage(new Object[0]), code.getCode());
    }

    /**
     * Fetches the metadata of {@code topic} for the configured app and verifies that both the
     * topic and its producer policy exist.
     *
     * @throws ProducerException if the metadata or its producer policy is missing
     */
    public TopicMetadata getAndCheckTopicMetadata(String topic) {
        String app = this.config.getApp();
        TopicMetadata topicMetadata = this.clusterManager.fetchTopicMetadata(getTopicFullName(topic), app);
        if (topicMetadata == null) {
            throw new ProducerException(String.format("topic %s is not exist", topic), JoyQueueCode.FW_TOPIC_NOT_EXIST.getCode());
        }
        if (topicMetadata.getProducerPolicy() == null) {
            // Report the app the lookup was actually made with.
            throw new ProducerException(String.format("topic %s producer %s is not exist", topic, app), JoyQueueCode.FW_PRODUCER_NOT_EXISTS.getCode());
        }
        return topicMetadata;
    }

    /** Resolves the full topic name via {@link NameServerHelper} and the name-server config. */
    public String getTopicFullName(String topic) {
        return NameServerHelper.getTopicFullName(topic, this.nameServerConfig);
    }

    /**
     * Returns the writable nearby brokers when the producer policy has {@code nearby} set,
     * otherwise all writable brokers.
     */
    public List<BrokerNode> getRegionBrokers(TopicMetadata topicMetadata) {
        if (topicMetadata.getProducerPolicy().getNearby().booleanValue()) {
            return topicMetadata.getWritableNearbyBrokers();
        }
        return topicMetadata.getWritableBrokers();
    }

    /**
     * Removes brokers whose existing client is not in state {@code CONNECTED}. Brokers without a
     * client yet are kept. The input list is never modified; it is returned unchanged when
     * nothing has to be removed.
     */
    public List<BrokerNode> filterNotAvailableBrokers(List<BrokerNode> brokers) {
        if (CollectionUtils.isEmpty(brokers)) {
            return brokers;
        }
        List<BrokerNode> available = null;
        for (BrokerNode broker : brokers) {
            ProducerClient client = this.producerClientManager.tryGetClient(broker);
            if (client != null && !client.getState().equals(ClientState.CONNECTED)) {
                if (available == null) {
                    // Copy lazily: the common case removes nothing.
                    available = Lists.newArrayList(brokers);
                }
                available.remove(broker);
            }
        }
        return available == null ? brokers : available;
    }

    /** Returns the region brokers of the topic with unavailable brokers filtered out. */
    public List<BrokerNode> getAvailableBrokers(TopicMetadata topicMetadata) {
        return filterNotAvailableBrokers(getRegionBrokers(topicMetadata));
    }

    /**
     * Returns the partitions hosted by the given brokers. When {@code brokers} equals the
     * topic's full broker list, the topic's partition list is returned directly.
     */
    public List<PartitionMetadata> getBrokerPartitions(TopicMetadata topicMetadata, List<BrokerNode> brokers) {
        if (topicMetadata.getBrokers().equals(brokers)) {
            return topicMetadata.getPartitions();
        }
        List<PartitionMetadata> partitions = Lists.newArrayListWithCapacity(topicMetadata.getPartitions().size());
        for (BrokerNode broker : brokers) {
            List<PartitionMetadata> brokerPartitions = topicMetadata.getBrokerPartitions(broker.getId());
            if (brokerPartitions != null) {
                partitions.addAll(brokerPartitions);
            }
        }
        return partitions;
    }

    /**
     * Selects a partition for the batch among the given brokers and stamps its id on every
     * message.
     *
     * @throws ProducerException if {@code brokers} is empty, or no partition with a leader could
     *                           be selected
     */
    public PartitionNode dispatchPartitions(List<ProduceMessage> messages, TopicMetadata topicMetadata, List<BrokerNode> brokers) {
        if (CollectionUtils.isEmpty(brokers)) {
            throw new ProducerException(String.format("no broker available, topic: %s, messages: %s", topicMetadata.getTopic(), messages), JoyQueueCode.FW_TOPIC_NO_PARTITIONGROUP.getCode());
        }
        PartitionNode partitionNode = ProducerHelper.dispatchPartitions(messages, topicMetadata, brokers, this.partitionSelectorManager.getPartitionSelector(topicMetadata.getTopic(), this.config.getSelectorType()));
        if (partitionNode == null || partitionNode.getPartitionMetadata().getLeader() == null) {
            throw new ProducerException(String.format("partition not available, topic: %s, messages: %s", topicMetadata.getTopic(), messages), JoyQueueCode.FW_TOPIC_NO_PARTITIONGROUP.getCode());
        }
        ProducerHelper.setPartitions(messages, partitionNode.getPartitionMetadata().getId());
        return partitionNode;
    }

    /**
     * Tells whether the batch may be re-dispatched to another partition: true when the first
     * message's partition equals {@code Short.MIN_VALUE}.
     * NOTE(review): presumably the "no partition assigned" sentinel - confirm against ProduceMessage.
     * {@code messages} must be non-empty.
     */
    public boolean isFailover(List<ProduceMessage> messages) {
        return messages.get(0).getPartition() == Short.MIN_VALUE;
    }
}
