package org.joyqueue.client.internal.producer.support;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.client.internal.exception.ClientException;
import org.joyqueue.client.internal.producer.MessageSender;
import org.joyqueue.client.internal.producer.callback.AsyncBatchSendCallback;
import org.joyqueue.client.internal.producer.callback.AsyncMultiBatchSendCallback;
import org.joyqueue.client.internal.producer.callback.AsyncSendCallback;
import org.joyqueue.client.internal.producer.config.SenderConfig;
import org.joyqueue.client.internal.producer.converter.MessageSenderConverter;
import org.joyqueue.client.internal.producer.domain.FetchFeedbackData;
import org.joyqueue.client.internal.producer.domain.ProduceMessage;
import org.joyqueue.client.internal.producer.domain.SendBatchResultData;
import org.joyqueue.client.internal.producer.domain.SendPrepareResult;
import org.joyqueue.client.internal.producer.domain.SendResultData;
import org.joyqueue.client.internal.producer.transport.ProducerClient;
import org.joyqueue.client.internal.producer.transport.ProducerClientGroup;
import org.joyqueue.client.internal.producer.transport.ProducerClientManager;
import org.joyqueue.client.internal.transport.ConnectionState;
import org.joyqueue.domain.QosLevel;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.network.command.ProduceMessageAckData;
import org.joyqueue.network.command.ProduceMessagePrepareResponse;
import org.joyqueue.network.command.ProduceMessageResponse;
import org.joyqueue.network.command.TxStatus;
import org.joyqueue.network.domain.BrokerNode;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.CommandCallback;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/client/internal/producer/support/DefaultMessageSender.class */
public class DefaultMessageSender extends Service implements MessageSender {
    protected static final Logger logger = LoggerFactory.getLogger(DefaultMessageSender.class);
    private ProducerClientManager producerClientManager;
    private SenderConfig config;
    private ConnectionState connectionState = new ConnectionState();

    public DefaultMessageSender(ProducerClientManager producerClientManager, SenderConfig senderConfig) {
        Preconditions.checkArgument(producerClientManager != null, "producerClientManager not null");
        this.producerClientManager = producerClientManager;
        this.config = senderConfig;
    }

    @Override // org.joyqueue.client.internal.producer.MessageSender
    public SendResultData send(BrokerNode brokerNode, String str, String str2, String str3, ProduceMessage produceMessage, QosLevel qosLevel, long j, long j2) {
        SendBatchResultData batchSend = batchSend(brokerNode, str, str2, str3, Lists.newArrayList(new ProduceMessage[]{produceMessage}), qosLevel, j, j2);
        SendResultData sendResultData = new SendResultData();
        sendResultData.setCode(batchSend.getCode());
        if (CollectionUtils.isNotEmpty(batchSend.getResult())) {
            sendResultData.setResult(batchSend.getResult().get(0));
        }
        return sendResultData;
    }

    @Override // org.joyqueue.client.internal.producer.MessageSender
    public SendBatchResultData batchSend(BrokerNode brokerNode, String str, String str2, String str3, List<ProduceMessage> list, QosLevel qosLevel, long j, long j2) {
        HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(1);
        newHashMapWithExpectedSize.put(str, list);
        return batchSend(brokerNode, str2, str3, newHashMapWithExpectedSize, qosLevel, j, j2).get(str);
    }

    @Override // org.joyqueue.client.internal.producer.MessageSender
    public void sendAsync(BrokerNode brokerNode, String str, String str2, String str3, final ProduceMessage produceMessage, QosLevel qosLevel, long j, long j2, final AsyncSendCallback asyncSendCallback) {
        batchSendAsync(brokerNode, str, str2, str3, Lists.newArrayList(new ProduceMessage[]{produceMessage}), qosLevel, j, j2, new AsyncBatchSendCallback() { // from class: org.joyqueue.client.internal.producer.support.DefaultMessageSender.1
            @Override // org.joyqueue.client.internal.producer.callback.AsyncBatchSendCallback
            public void onSuccess(List<ProduceMessage> list, SendBatchResultData sendBatchResultData) {
                SendResultData sendResultData = new SendResultData();
                sendResultData.setCode(sendBatchResultData.getCode());
                if (CollectionUtils.isNotEmpty(sendBatchResultData.getResult())) {
                    sendResultData.setResult(sendBatchResultData.getResult().get(0));
                }
                asyncSendCallback.onSuccess(produceMessage, sendResultData);
            }

            @Override // org.joyqueue.client.internal.producer.callback.AsyncBatchSendCallback
            public void onException(List<ProduceMessage> list, Throwable th) {
                asyncSendCallback.onException(produceMessage, th);
            }
        });
    }

    @Override // org.joyqueue.client.internal.producer.MessageSender
    public void batchSendAsync(BrokerNode brokerNode, final String str, String str2, String str3, List<ProduceMessage> list, QosLevel qosLevel, long j, long j2, final AsyncBatchSendCallback asyncBatchSendCallback) {
        HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(1);
        newHashMapWithExpectedSize.put(str, list);
        batchSendAsync(brokerNode, str2, str3, newHashMapWithExpectedSize, qosLevel, j, j2, new AsyncMultiBatchSendCallback() { // from class: org.joyqueue.client.internal.producer.support.DefaultMessageSender.2
            @Override // org.joyqueue.client.internal.producer.callback.AsyncMultiBatchSendCallback
            public void onSuccess(Map<String, List<ProduceMessage>> map, Map<String, SendBatchResultData> map2) {
                asyncBatchSendCallback.onSuccess(map.get(str), map2.get(str));
            }

            @Override // org.joyqueue.client.internal.producer.callback.AsyncMultiBatchSendCallback
            public void onException(Map<String, List<ProduceMessage>> map, Throwable th) {
                asyncBatchSendCallback.onException(map.get(str), th);
            }
        });
    }

    @Override // org.joyqueue.client.internal.producer.MessageSender
    public CompletableFuture<SendBatchResultData> batchSendAsync(BrokerNode brokerNode, final String str, String str2, String str3, List<ProduceMessage> list, QosLevel qosLevel, long j, long j2) {
        final CompletableFuture<SendBatchResultData> completableFuture = new CompletableFuture<>();
        HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(1);
        newHashMapWithExpectedSize.put(str, list);
        batchSendAsync(brokerNode, str2, str3, newHashMapWithExpectedSize, qosLevel, j, j2, new AsyncMultiBatchSendCallback() { // from class: org.joyqueue.client.internal.producer.support.DefaultMessageSender.3
            @Override // org.joyqueue.client.internal.producer.callback.AsyncMultiBatchSendCallback
            public void onSuccess(Map<String, List<ProduceMessage>> map, Map<String, SendBatchResultData> map2) {
                completableFuture.complete(map2.get(str));
            }

            @Override // org.joyqueue.client.internal.producer.callback.AsyncMultiBatchSendCallback
            public void onException(Map<String, List<ProduceMessage>> map, Throwable th) {
                completableFuture.completeExceptionally(th);
            }
        });
        return completableFuture;
    }

    @Override // org.joyqueue.client.internal.producer.MessageSender
    public void sendOneway(BrokerNode brokerNode, String str, String str2, String str3, ProduceMessage produceMessage, QosLevel qosLevel, long j, long j2) {
        batchSendOneway(brokerNode, str, str2, str3, Lists.newArrayList(new ProduceMessage[]{produceMessage}), qosLevel, j, j2);
    }

    @Override // org.joyqueue.client.internal.producer.MessageSender
    public void batchSendOneway(BrokerNode brokerNode, String str, String str2, String str3, List<ProduceMessage> list, QosLevel qosLevel, long j, long j2) {
        HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(1);
        newHashMapWithExpectedSize.put(str, list);
        batchSendOneway(brokerNode, str2, str3, newHashMapWithExpectedSize, qosLevel, j, j2);
    }

    @Override // org.joyqueue.client.internal.producer.MessageSender
    public void batchSendOneway(BrokerNode brokerNode, String str, String str2, Map<String, List<ProduceMessage>> map, QosLevel qosLevel, long j, long j2) {
        checkState();
        HashMap newHashMap = Maps.newHashMap();
        for (Map.Entry<String, List<ProduceMessage>> entry : map.entrySet()) {
            String key = entry.getKey();
            newHashMap.put(key, MessageSenderConverter.convertToProduceMessageData(key, str, str2, entry.getValue(), qosLevel, j, this.config.isCompress(), this.config.getCompressThreshold(), this.config.getCompressType(), this.config.isBatch()));
        }
        ProducerClient orCreateClient = this.producerClientManager.getOrCreateClient(brokerNode);
        handleAddProducers(brokerNode, map.keySet(), str, orCreateClient);
        orCreateClient.produceMessageOneway(str, newHashMap, j2);
    }

    @Override // org.joyqueue.client.internal.producer.MessageSender
    public Map<String, SendBatchResultData> batchSend(BrokerNode brokerNode, String str, String str2, Map<String, List<ProduceMessage>> map, QosLevel qosLevel, long j, long j2) {
        checkState();
        HashMap newHashMap = Maps.newHashMap();
        for (Map.Entry<String, List<ProduceMessage>> entry : map.entrySet()) {
            String key = entry.getKey();
            newHashMap.put(key, MessageSenderConverter.convertToProduceMessageData(key, str, str2, entry.getValue(), qosLevel, j, this.config.isCompress(), this.config.getCompressThreshold(), this.config.getCompressType(), this.config.isBatch()));
        }
        ProducerClient orCreateClient = this.producerClientManager.getOrCreateClient(brokerNode);
        handleAddProducers(brokerNode, map.keySet(), str, orCreateClient);
        ProduceMessageResponse produceMessage = orCreateClient.produceMessage(str, newHashMap, j2);
        HashMap newHashMap2 = Maps.newHashMap();
        for (Map.Entry entry2 : produceMessage.getData().entrySet()) {
            newHashMap2.put(entry2.getKey(), MessageSenderConverter.convertToBatchResultData((String) entry2.getKey(), str, (ProduceMessageAckData) entry2.getValue()));
        }
        return newHashMap2;
    }

    @Override // org.joyqueue.client.internal.producer.MessageSender
    public void batchSendAsync(BrokerNode brokerNode, final String str, String str2, final Map<String, List<ProduceMessage>> map, QosLevel qosLevel, long j, long j2, final AsyncMultiBatchSendCallback asyncMultiBatchSendCallback) {
        checkState();
        HashMap newHashMap = Maps.newHashMap();
        for (Map.Entry<String, List<ProduceMessage>> entry : map.entrySet()) {
            String key = entry.getKey();
            newHashMap.put(key, MessageSenderConverter.convertToProduceMessageData(key, str, str2, entry.getValue(), qosLevel, j, this.config.isCompress(), this.config.getCompressThreshold(), this.config.getCompressType(), this.config.isBatch()));
        }
        ProducerClient orCreateClient = this.producerClientManager.getOrCreateClient(brokerNode);
        handleAddProducers(brokerNode, map.keySet(), str, orCreateClient);
        try {
            orCreateClient.asyncProduceMessage(str, newHashMap, j2, new CommandCallback() { // from class: org.joyqueue.client.internal.producer.support.DefaultMessageSender.4
                public void onSuccess(Command command, Command command2) {
                    ProduceMessageResponse produceMessageResponse = (ProduceMessageResponse) command2.getPayload();
                    HashMap newHashMap2 = Maps.newHashMap();
                    for (Map.Entry entry2 : produceMessageResponse.getData().entrySet()) {
                        newHashMap2.put(entry2.getKey(), MessageSenderConverter.convertToBatchResultData((String) entry2.getKey(), str, (ProduceMessageAckData) entry2.getValue()));
                    }
                    asyncMultiBatchSendCallback.onSuccess(map, newHashMap2);
                }

                public void onException(Command command, Throwable th) {
                    asyncMultiBatchSendCallback.onException(map, th);
                }
            });
        } catch (ClientException e) {
            asyncMultiBatchSendCallback.onException(map, e);
        }
    }

    @Override // org.joyqueue.client.internal.producer.MessageSender
    public CompletableFuture<Map<String, SendBatchResultData>> batchSendAsync(BrokerNode brokerNode, String str, String str2, Map<String, List<ProduceMessage>> map, QosLevel qosLevel, long j, long j2) {
        final CompletableFuture<Map<String, SendBatchResultData>> completableFuture = new CompletableFuture<>();
        batchSendAsync(brokerNode, str, str2, map, qosLevel, j, j2, new AsyncMultiBatchSendCallback() { // from class: org.joyqueue.client.internal.producer.support.DefaultMessageSender.5
            @Override // org.joyqueue.client.internal.producer.callback.AsyncMultiBatchSendCallback
            public void onSuccess(Map<String, List<ProduceMessage>> map2, Map<String, SendBatchResultData> map3) {
                completableFuture.complete(map3);
            }

            @Override // org.joyqueue.client.internal.producer.callback.AsyncMultiBatchSendCallback
            public void onException(Map<String, List<ProduceMessage>> map2, Throwable th) {
                completableFuture.completeExceptionally(th);
            }
        });
        return completableFuture;
    }

    @Override // org.joyqueue.client.internal.producer.MessageSender
    public SendPrepareResult prepare(BrokerNode brokerNode, String str, String str2, String str3, long j, long j2, long j3) {
        checkState();
        ProducerClient orCreateClient = this.producerClientManager.getOrCreateClient(brokerNode);
        handleAddProducers(brokerNode, Lists.newArrayList(new String[]{str}), str2, orCreateClient);
        ProduceMessagePrepareResponse produceMessagePrepare = orCreateClient.produceMessagePrepare(str, str2, j, str3, j2, j3);
        return new SendPrepareResult(produceMessagePrepare.getTxId(), produceMessagePrepare.getCode());
    }

    @Override // org.joyqueue.client.internal.producer.MessageSender
    public JoyQueueCode commit(BrokerNode brokerNode, String str, String str2, String str3, long j) {
        checkState();
        ProducerClient orCreateClient = this.producerClientManager.getOrCreateClient(brokerNode);
        handleAddProducers(brokerNode, Lists.newArrayList(new String[]{str}), str2, orCreateClient);
        return orCreateClient.produceMessageCommit(str, str2, str3, j).getCode();
    }

    @Override // org.joyqueue.client.internal.producer.MessageSender
    public JoyQueueCode rollback(BrokerNode brokerNode, String str, String str2, String str3, long j) {
        checkState();
        ProducerClient orCreateClient = this.producerClientManager.getOrCreateClient(brokerNode);
        handleAddProducers(brokerNode, Lists.newArrayList(new String[]{str}), str2, orCreateClient);
        return orCreateClient.produceMessageRollback(str, str2, str3, j).getCode();
    }

    @Override // org.joyqueue.client.internal.producer.MessageSender
    public FetchFeedbackData fetchFeedback(BrokerNode brokerNode, String str, String str2, TxStatus txStatus, int i, long j, long j2) {
        checkState();
        ProducerClient orCreateClient = this.producerClientManager.getOrCreateClient(brokerNode);
        handleAddProducers(brokerNode, Lists.newArrayList(new String[]{str}), str2, orCreateClient);
        return MessageSenderConverter.convertToFetchFeedbackData(str, str2, orCreateClient.fetchFeedback(str, str2, txStatus, i, j, j2));
    }

    protected void checkState() {
        if (!isStarted()) {
            throw new ClientException("sender is not started", JoyQueueCode.CN_SERVICE_NOT_AVAILABLE.getCode());
        }
    }

    protected void handleAddProducers(BrokerNode brokerNode, Collection<String> collection, String str, ProducerClient producerClient) {
        producerClient.addProducers(collection, str);
        this.connectionState.addBrokerNode(brokerNode);
        this.connectionState.addTopics(collection);
        this.connectionState.addApp(str);
    }

    protected void doStop() {
        handleRemoveProducers();
    }

    protected void handleRemoveProducers() {
        Set<BrokerNode> brokerNodes = this.connectionState.getBrokerNodes();
        Set<String> topics = this.connectionState.getTopics();
        Set<String> apps = this.connectionState.getApps();
        Iterator<BrokerNode> it = brokerNodes.iterator();
        while (it.hasNext()) {
            handleRemoveProducers(it.next(), topics, apps);
        }
    }

    protected void handleRemoveProducers(BrokerNode brokerNode, Set<String> set, Set<String> set2) {
        ProducerClientGroup clientGroup = this.producerClientManager.getClientGroup(brokerNode);
        if (clientGroup == null) {
            return;
        }
        for (String str : set2) {
            Iterator<ProducerClient> it = clientGroup.getClients().iterator();
            while (it.hasNext()) {
                try {
                    it.next().removeProducers(set, str);
                } catch (Exception e) {
                    logger.warn("remove producers exception, topics: {}, app: {}, exception: {}", new Object[]{set, str, e.getMessage()});
                    logger.debug("remove producers exception, topics: {}, app: {}", new Object[]{set, str, e});
                }
            }
        }
    }
}
