package org.joyqueue.broker.producer;

import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.BrokerContextAware;
import org.joyqueue.broker.buffer.Serializer;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.monitor.BrokerMonitor;
import org.joyqueue.broker.producer.transaction.TransactionManager;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.QosLevel;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.message.BrokerCommit;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.message.BrokerPrepare;
import org.joyqueue.message.BrokerRollback;
import org.joyqueue.message.JoyQueueLog;
import org.joyqueue.network.session.Producer;
import org.joyqueue.network.session.TransactionId;
import org.joyqueue.store.PartitionGroupStore;
import org.joyqueue.store.StoreService;
import org.joyqueue.store.WriteRequest;
import org.joyqueue.store.WriteResult;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.joyqueue.toolkit.concurrent.LoopThread;
import org.joyqueue.toolkit.lang.Close;
import org.joyqueue.toolkit.metric.Metric;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/producer/ProduceManager.class */
public class ProduceManager extends Service implements Produce, BrokerContextAware {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(ProduceManager.class);
    /** Producer settings; created in validate() from the broker context's property supplier when not injected. */
    private ProduceConfig config;
    /** Receives transactional writes and prepare/commit/rollback requests; created in validate(). */
    private TransactionManager transactionManager;
    /** Source of producer policies, master partition lists and partition groups. */
    private ClusterManager clusterManager;
    /** Provides the per-topic, per-partition-group stores that messages are written to. */
    private StoreService store;
    /** Receives onPutMessage notifications; validate() only warns when this is still null. */
    private BrokerMonitor brokerMonitor;
    /** Optional context used by validate() to resolve collaborators that were not injected. */
    private BrokerContext brokerContext;
    /** Metric registry; only created in validate() when config.getPrintMetricIntervalMs() > 0. */
    private Metric metrics = null;
    /** First instance of {@link #metrics}; while null, writeMessagesAsync skips metric recording. */
    private Metric.MetricInstance metric = null;
    /** Thread that repeatedly calls metrics.reportAndReset(); null when metrics are disabled. */
    private LoopThread metricThread = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/joyqueue/broker/producer/ProduceManager$MetricEventListener.class */
    public class MetricEventListener implements EventListener<WriteResult> {
        final long t0;
        final long startTime;
        final Metric.MetricInstance metric;
        final EventListener<WriteResult> eventListener;
        final String topic;
        final String app;
        final int partitionGroup;
        final List<WriteRequest> writeRequests;

        MetricEventListener(long j, long j2, Metric.MetricInstance metricInstance, EventListener<WriteResult> eventListener, String str, String str2, int i, List<WriteRequest> list) {
            this.t0 = j;
            this.startTime = j2;
            this.metric = metricInstance;
            this.eventListener = eventListener;
            this.topic = str;
            this.app = str2;
            this.partitionGroup = i;
            this.writeRequests = list;
        }

        public void onEvent(WriteResult writeResult) {
            this.metric.addLatency("callback", System.nanoTime() - this.t0);
            if (writeResult.getCode().equals(JoyQueueCode.SUCCESS)) {
                ProduceManager.this.onPutMessage(this.topic, this.app, this.partitionGroup, this.startTime, this.writeRequests);
            }
            this.eventListener.onEvent(writeResult);
        }
    }

    /**
     * Creates an instance with no collaborators set. validate() later resolves the store,
     * cluster manager and broker monitor from the BrokerContext, if one was supplied through
     * setBrokerContext.
     */
    public ProduceManager() {
    }

    /**
     * Creates an instance with explicitly supplied collaborators.
     *
     * @param config         producer configuration; validate() creates one when null
     * @param clusterManager cluster metadata access
     * @param store          store service messages are written to
     * @param monitor        broker monitor notified about written messages
     */
    public ProduceManager(ProduceConfig config, ClusterManager clusterManager, StoreService store, BrokerMonitor monitor) {
        this.brokerMonitor = monitor;
        this.store = store;
        this.clusterManager = clusterManager;
        this.config = config;
    }

    /**
     * Starts the transaction manager and, when metrics reporting was configured in validate(),
     * the metric reporting thread.
     *
     * @throws Exception if the superclass or the transaction manager fails to start
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        this.transactionManager.start();

        LoopThread reporter = this.metricThread;
        if (reporter != null) {
            reporter.start();
        }

        logger.info("ProduceManager is started.");
    }

    /**
     * Resolves missing collaborators from the broker context, checks the mandatory ones, creates
     * the transaction manager and, when a positive print interval is configured, sets up metrics
     * reporting.
     *
     * @throws Exception if the superclass validation fails, a mandatory collaborator is missing
     *                   (IllegalArgumentException from Preconditions) or the cluster manager
     *                   cannot be started
     */
    @Override
    protected void validate() throws Exception {
        super.validate();

        if (this.config == null) {
            this.config = new ProduceConfig(this.brokerContext != null ? this.brokerContext.getPropertySupplier() : null);
        }

        // Anything not injected through the constructor is taken from the broker context.
        if (this.brokerContext != null) {
            if (this.store == null) {
                this.store = this.brokerContext.getStoreService();
            }
            if (this.clusterManager == null) {
                this.clusterManager = this.brokerContext.getClusterManager();
            }
            if (this.brokerMonitor == null) {
                this.brokerMonitor = this.brokerContext.getBrokerMonitor();
            }
        }

        Preconditions.checkArgument(this.store != null, "store service can not be null");
        Preconditions.checkArgument(this.clusterManager != null, "cluster manager can not be null");

        // The monitor is optional: its absence is only reported.
        if (this.brokerMonitor == null) {
            logger.warn("broker monitor is null.");
        }

        if (!this.clusterManager.isStarted()) {
            logger.warn("The clusterManager is not started, try to start it!");
            this.clusterManager.start();
        }

        this.transactionManager = new TransactionManager(this.config, this.store, this.clusterManager, this.brokerMonitor);

        // Metrics stay disabled (all three metric fields remain null) unless an interval is set.
        if (this.config.getPrintMetricIntervalMs() <= 0) {
            return;
        }

        this.metrics = new Metric("input", 1, new String[]{"callback", "async"}, new String[]{"tps"}, new String[]{"traffic"});
        this.metric = (Metric.MetricInstance) this.metrics.getMetricInstances().get(0);
        this.metricThread = LoopThread.builder()
                .sleepTime(this.config.getPrintMetricIntervalMs(), this.config.getPrintMetricIntervalMs())
                .name("Metric-Thread")
                .onException(th -> {
                    logger.warn("Exception:", th);
                })
                .doWork(() -> {
                    this.metrics.reportAndReset();
                })
                .build();
    }

    /**
     * Closes the transaction manager and stops the metric reporting thread if one was created.
     */
    @Override
    protected void doStop() {
        super.doStop();
        Close.close(this.transactionManager);

        LoopThread reporter = this.metricThread;
        if (reporter != null) {
            reporter.stop();
        }

        logger.info("ProduceManager is stopped.");
    }

    /**
     * Writes messages synchronously, using the timeout of the producer policy registered for the
     * producer's topic and app.
     *
     * @param producer producer session issuing the write
     * @param list     messages to write
     * @param qosLevel QoS level requested by the caller
     * @return the write results
     * @throws JoyQueueException if the write fails or times out
     */
    @Override
    public PutResult putMessage(Producer producer, List<BrokerMessage> list, QosLevel qosLevel) throws JoyQueueException {
        TopicName topicName = TopicName.parse(producer.getTopic());
        int policyTimeout = this.clusterManager.getProducerPolicy(topicName, producer.getApp()).getTimeOut().intValue();
        return putMessage(producer, list, qosLevel, policyTimeout);
    }

    /**
     * Writes messages synchronously within the given timeout. A batch whose first message
     * carries a transaction id is routed to the transactional path, anything else to the plain
     * write path.
     *
     * @param producer producer session issuing the write
     * @param list     messages to write; the first element decides the route
     * @param qosLevel QoS level requested by the caller (may be overridden by configuration)
     * @param i        timeout, added to the current clock value to form the deadline
     * @return the write results
     * @throws JoyQueueException if the write fails or times out
     */
    @Override
    public PutResult putMessage(Producer producer, List<BrokerMessage> list, QosLevel qosLevel, int i) throws JoyQueueException {
        long deadline = i + SystemClock.now();
        QosLevel effectiveQos = getConfigQosLevel(producer, qosLevel);
        String txId = list.get(0).getTxId();

        if (StringUtils.isNotEmpty(txId)) {
            return writeTxMessage(producer, list, txId, deadline);
        }
        return writeMessages(producer, list, effectiveQos, deadline);
    }

    /**
     * Writes messages asynchronously, using the timeout of the producer policy registered for
     * the producer's topic and app.
     *
     * @param producer      producer session issuing the write
     * @param list          messages to write
     * @param qosLevel      QoS level requested by the caller
     * @param eventListener receives the write results
     * @throws JoyQueueException if the write cannot be submitted
     */
    @Override
    public void putMessageAsync(Producer producer, List<BrokerMessage> list, QosLevel qosLevel, EventListener<WriteResult> eventListener) throws JoyQueueException {
        TopicName topicName = TopicName.parse(producer.getTopic());
        int policyTimeout = this.clusterManager.getProducerPolicy(topicName, producer.getApp()).getTimeOut().intValue();
        putMessageAsync(producer, list, qosLevel, policyTimeout, eventListener);
    }

    /**
     * Writes messages asynchronously. A batch whose first message carries a transaction id goes
     * to the transactional path, which receives the relative timeout; all other batches go to
     * the plain asynchronous path, which receives the absolute deadline.
     *
     * @param producer      producer session issuing the write
     * @param list          messages to write; the first element decides the route
     * @param qosLevel      QoS level requested by the caller (may be overridden by configuration)
     * @param i             timeout for the write
     * @param eventListener receives the write results
     * @throws JoyQueueException if the write cannot be submitted
     */
    @Override
    public void putMessageAsync(Producer producer, List<BrokerMessage> list, QosLevel qosLevel, int i, EventListener<WriteResult> eventListener) throws JoyQueueException {
        long deadline = i + SystemClock.now();
        String txId = list.get(0).getTxId();
        QosLevel effectiveQos = getConfigQosLevel(producer, qosLevel);

        boolean transactional = StringUtils.isNotEmpty(txId);
        if (!transactional) {
            writeMessagesAsync(producer, list, effectiveQos, deadline, eventListener);
            return;
        }
        // The transactional path waits with a relative timeout, so it gets i rather than the deadline.
        writeTxMessageAsync(producer, list, txId, i, eventListener);
    }

    /**
     * Serializes the messages, hands them to the transaction manager and waits for the result
     * until the deadline.
     *
     * @param producer producer session issuing the write
     * @param list     transactional messages
     * @param str      transaction id
     * @param j        absolute deadline; the remaining time is computed against SystemClock.now()
     * @return a result holding the single write result, keyed by the first message's partition
     * @throws JoyQueueException if serialization fails or the wait fails or times out
     */
    private PutResult writeTxMessage(Producer producer, List<BrokerMessage> list, String str, long j) throws JoyQueueException {
        ByteBuffer[] serialized = generateRByteBufferList(list);
        Future<WriteResult> pending = this.transactionManager.putMessage(producer, str, serialized);
        long remaining = j - SystemClock.now();
        WriteResult outcome = syncWait(pending, remaining);

        PutResult putResult = new PutResult();
        putResult.addWriteResult(Short.valueOf(list.get(0).getPartition()), outcome);
        return putResult;
    }

    /**
     * Writes transactional messages and reports the outcome to the listener. Despite the name,
     * the calling thread waits for the transaction manager's future before invoking the listener.
     *
     * <p>Any failure while serializing, submitting or waiting is logged and reported to the
     * listener as {@code CN_UNKNOWN_ERROR}. The listener is called exactly once, outside the
     * try block, so an exception thrown by the listener itself is not mistaken for a write
     * failure and does not cause a second, error-coded callback for the same write.
     *
     * @param producer      producer session issuing the write
     * @param list          transactional messages
     * @param str           transaction id
     * @param j             relative timeout in milliseconds for the wait
     * @param eventListener receives the write result (or the error result)
     * @throws JoyQueueException declared for interface symmetry; failures are reported through
     *                           the listener
     */
    private void writeTxMessageAsync(Producer producer, List<BrokerMessage> list, String str, long j, EventListener<WriteResult> eventListener) throws JoyQueueException {
        WriteResult result;
        try {
            result = this.transactionManager.putMessage(producer, str, generateRByteBufferList(list)).get(j, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }
            logger.error("writeTxMessageAsync exception, producer: {}", producer, e);
            result = new WriteResult(JoyQueueCode.CN_UNKNOWN_ERROR, ArrayUtils.EMPTY_LONG_ARRAY);
        }
        eventListener.onEvent(result);
    }

    /**
     * Groups the messages by partition group, writes each group to its store and waits for each
     * write in turn. All waits share one deadline: every wait gets the time still remaining
     * until {@code j}.
     *
     * @param producer producer session issuing the write
     * @param list     messages to write
     * @param qosLevel effective QoS level used to select the store
     * @param j        absolute deadline, compared against SystemClock.now()
     * @return one write result per partition group, keyed by the group id
     * @throws JoyQueueException with {@code CN_NO_PERMISSION} when the cluster manager returns no
     *                           master partitions for the topic, or when serialization or a wait
     *                           fails or times out
     */
    private PutResult writeMessages(Producer producer, List<BrokerMessage> list, QosLevel qosLevel, long j) throws JoyQueueException {
        PutResult putResult = new PutResult();
        String topic = producer.getTopic();
        List<Short> masterPartitionList = this.clusterManager.getMasterPartitionList(TopicName.parse(topic));
        if (masterPartitionList == null || masterPartitionList.isEmpty()) {
            // SLF4J only substitutes "{}" placeholders; a printf-style "%s" is logged verbatim
            // and the topic argument is dropped.
            logger.error("no partitions available topic:{}", topic);
            throw new JoyQueueException(JoyQueueCode.CN_NO_PERMISSION, new Object[0]);
        }
        // Single start timestamp for the whole batch; passed to onPutMessage for each group.
        long now = SystemClock.now();
        for (Map.Entry<PartitionGroup, List<WriteRequest>> entry : dispatchPartition(list, masterPartitionList).entrySet()) {
            PartitionGroup key = entry.getKey();
            List<WriteRequest> value = entry.getValue();
            WriteResult syncWait = syncWait(this.store.getStore(topic, key.getGroup(), qosLevel).asyncWrite((WriteRequest[]) value.toArray(new WriteRequest[0])), j - SystemClock.now());
            if (syncWait.getCode().equals(JoyQueueCode.SUCCESS)) {
                onPutMessage(topic, producer.getApp(), key.getGroup(), now, value);
            }
            putResult.addWriteResult(Short.valueOf((short) key.getGroup()), syncWait);
            if (this.config.getLogDetail(producer.getApp())) {
                logger.info("writeMessages, topic: {}, app: {}, partitionGroup: {}, qosLevel: {}, size: {}, result: {}", new Object[]{producer.getTopic(), producer.getApp(), Integer.valueOf(key.getGroup()), qosLevel, Integer.valueOf(value.size()), syncWait.getCode()});
            }
        }
        return putResult;
    }

    /**
     * Groups the messages by partition group and submits each group to its store without
     * waiting. The listener is invoked once per partition group when that group's write
     * completes.
     *
     * <p>When metrics are enabled ({@code metric != null}) the write is wrapped in a
     * {@link MetricEventListener} and the "tps", "traffic" and "async" metrics are recorded;
     * otherwise a plain callback reports successful writes to onPutMessage and optionally logs
     * details. For {@code QosLevel.ONE_WAY}, onPutMessage is additionally called right after
     * submission.
     *
     * @param producer      producer session issuing the write
     * @param list          messages to write
     * @param qosLevel      effective QoS level used to select the store
     * @param j             absolute deadline; not used by this method
     * @param eventListener receives one write result per partition group
     * @throws JoyQueueException with {@code CN_NO_PERMISSION} when the cluster manager returns no
     *                           master partitions for the topic, or when serialization fails
     */
    private void writeMessagesAsync(Producer producer, List<BrokerMessage> list, QosLevel qosLevel, long j, EventListener<WriteResult> eventListener) throws JoyQueueException {
        String topic = producer.getTopic();
        String app = producer.getApp();
        List<Short> masterPartitionList = this.clusterManager.getMasterPartitionList(TopicName.parse(topic));
        if (masterPartitionList == null || masterPartitionList.isEmpty()) {
            // SLF4J only substitutes "{}" placeholders; a printf-style "%s" is logged verbatim
            // and the topic argument is dropped.
            logger.error("no partitions available topic:{}", topic);
            throw new JoyQueueException(JoyQueueCode.CN_NO_PERMISSION, new Object[0]);
        }
        for (Map.Entry<PartitionGroup, List<WriteRequest>> entry : dispatchPartition(list, masterPartitionList).entrySet()) {
            PartitionGroup key = entry.getKey();
            if (logger.isDebugEnabled()) {
                logger.debug("ProduceManager writeMessageAsync topic:[{}], partitionGroup:[{}]]", topic, key);
            }
            List<WriteRequest> value = entry.getValue();
            PartitionGroupStore store = this.store.getStore(topic, key.getGroup(), qosLevel);
            long now = SystemClock.now();
            if (null != this.metric) {
                long submitStart = System.nanoTime();
                store.asyncWrite(new MetricEventListener(submitStart, now, this.metric, eventListener, topic, app, key.getGroup(), value), (WriteRequest[]) value.toArray(new WriteRequest[0]));
                long submitEnd = System.nanoTime();
                // One "tps" tick per write request; "traffic" is the sum of the buffers' remaining bytes.
                this.metric.addCounter("tps", (long) value.size());
                this.metric.addTraffic("traffic", value.stream().mapToInt(request -> request.getBuffer().remaining()).sum());
                this.metric.addLatency("async", submitEnd - submitStart);
            } else {
                store.asyncWrite(writeResult -> {
                    if (writeResult.getCode().equals(JoyQueueCode.SUCCESS)) {
                        onPutMessage(topic, app, key.getGroup(), now, value);
                    }
                    if (this.config.getLogDetail(producer.getApp())) {
                        logger.info("writeMessagesAsync, topic: {}, app: {}, partitionGroup: {}, qosLevel: {}, size: {}, result: {}", new Object[]{producer.getTopic(), producer.getApp(), Integer.valueOf(key.getGroup()), qosLevel, Integer.valueOf(value.size()), writeResult.getCode()});
                    }
                    eventListener.onEvent(writeResult);
                }, (WriteRequest[]) value.toArray(new WriteRequest[0]));
            }
            if (qosLevel.equals(QosLevel.ONE_WAY)) {
                onPutMessage(topic, app, key.getGroup(), now, value);
            }
        }
    }

    /**
     * Reports each written request to the broker monitor together with the time elapsed since
     * the write started.
     *
     * <p>validate() accepts a missing broker monitor (it only logs a warning), so this method
     * does nothing in that case. Without the guard, a NullPointerException here would abort
     * write callbacks before the caller's listener is notified.
     *
     * @param str  topic
     * @param str2 app
     * @param i    partition group id
     * @param j    start timestamp of the write, on the SystemClock.now() scale
     * @param list requests that were written
     */
    protected void onPutMessage(String str, String str2, int i, long j, List<WriteRequest> list) {
        if (this.brokerMonitor == null) {
            return;
        }
        long elapsed = SystemClock.now() - j;
        for (WriteRequest writeRequest : list) {
            this.brokerMonitor.onPutMessage(str, str2, i, writeRequest.getPartition(), writeRequest.getBatchSize(), writeRequest.getBuffer().limit(), elapsed);
        }
    }

    /**
     * Resolves the QoS level to use for a write. Configured levels take precedence in the order
     * topic, app, broker; the value -1 marks a level as not configured. When none is configured
     * the caller's level is returned.
     *
     * <p>Each configured level is read once and the same value is used for both the check and
     * the conversion, so a configuration change between two reads cannot pass -1 to
     * {@code QosLevel.valueOf}.
     *
     * @param producer producer session whose topic and app select the configuration
     * @param qosLevel level requested by the caller, used as the fallback
     * @return the effective QoS level
     */
    protected QosLevel getConfigQosLevel(Producer producer, QosLevel qosLevel) {
        int topicLevel = this.config.getTopicQosLevel(producer.getTopic());
        if (topicLevel != -1) {
            return QosLevel.valueOf(topicLevel);
        }
        int appLevel = this.config.getAppQosLevel(producer.getApp());
        if (appLevel != -1) {
            return QosLevel.valueOf(appLevel);
        }
        int brokerLevel = this.config.getBrokerQosLevel();
        if (brokerLevel != -1) {
            return QosLevel.valueOf(brokerLevel);
        }
        return qosLevel;
    }

    /**
     * Waits for a write future and translates failures into JoyQueueException.
     *
     * @param future pending write
     * @param j      maximum time to wait, in milliseconds
     * @return the completed write result
     * @throws JoyQueueException with {@code CN_THREAD_INTERRUPTED} when the wait is interrupted or
     *                           the write itself failed, or with {@code SE_WRITE_TIMEOUT} when the
     *                           result did not arrive in time
     */
    private WriteResult syncWait(Future<WriteResult> future, long j) throws JoyQueueException {
        try {
            return future.get(j, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; restore it so callers can still
            // observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("Write message error", e);
            throw new JoyQueueException(JoyQueueCode.CN_THREAD_INTERRUPTED, new Object[0]);
        } catch (ExecutionException e) {
            logger.error("Write message error", e);
            throw new JoyQueueException(JoyQueueCode.CN_THREAD_INTERRUPTED, new Object[0]);
        } catch (TimeoutException e) {
            throw new JoyQueueException(JoyQueueCode.SE_WRITE_TIMEOUT, new Object[0]);
        }
    }

    /**
     * Serializes every message into a write request and groups the requests by the partition
     * group that owns the selected partition.
     *
     * <p>One partition is drawn at random from {@code list2} up front and passed to
     * selectPartition as the default for every message in the batch.
     *
     * @param list  messages to dispatch
     * @param list2 candidate partitions; must not be empty
     * @return write requests keyed by partition group
     * @throws JoyQueueException if a message cannot be serialized
     */
    private Map<PartitionGroup, List<WriteRequest>> dispatchPartition(List<BrokerMessage> list, List<Short> list2) throws JoyQueueException {
        short defaultPartition = list2.get((int) Math.floor(Math.random() * list2.size())).shortValue();
        int partitionCount = list2.size();
        Map<PartitionGroup, List<WriteRequest>> requestsByGroup = new HashMap<>();

        for (BrokerMessage message : list) {
            short partition = selectPartition(defaultPartition, message, list2, partitionCount);
            TopicName topicName = TopicName.parse(message.getTopic());
            PartitionGroup group = this.clusterManager.getPartitionGroup(topicName, partition);
            List<WriteRequest> bucket = requestsByGroup.computeIfAbsent(group, ignored -> new ArrayList<>());

            // A batch message supplies its flag value as the request's third argument; a single
            // message uses 1.
            short batchCount = 1;
            if (message.isBatch()) {
                batchCount = message.getFlag();
            }
            bucket.add(new WriteRequest(partition, convertBrokerMessage2RByteBuffer(message), batchCount));
        }
        return requestsByGroup;
    }

    /**
     * Chooses the partition for one message.
     *
     * <ul>
     *   <li>A non-negative partition on the message is used as is.</li>
     *   <li>An ordered message without a business id goes to the first candidate partition.</li>
     *   <li>An ordered message with a business id goes to the candidate at index
     *       {@code |hash(businessId)| % i}.</li>
     *   <li>Every other message gets the default partition {@code s}.</li>
     * </ul>
     *
     * @param s             default partition for messages that are not ordered
     * @param brokerMessage message to place
     * @param list          candidate partitions
     * @param i             modulus applied to the business id hash; the caller passes list.size()
     * @return the selected partition
     */
    private short selectPartition(short s, BrokerMessage brokerMessage, List<Short> list, int i) {
        short requested = brokerMessage.getPartition();
        if (requested >= 0) {
            return requested;
        }
        if (!brokerMessage.isOrdered()) {
            return s;
        }

        String businessId = brokerMessage.getBusinessId();
        if (StringUtils.isEmpty(businessId)) {
            return list.get(0).shortValue();
        }

        int hash = businessId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so that one value is replaced first.
        int nonNegativeHash = Math.abs(hash > Integer.MIN_VALUE ? hash : -2147483647);
        return list.get(nonNegativeHash % i).shortValue();
    }

    /**
     * Serializes a message into a newly allocated heap buffer of exactly
     * {@code Serializer.sizeOf(brokerMessage)} bytes.
     *
     * @param brokerMessage message to serialize
     * @return the buffer the message was written into
     * @throws JoyQueueException with {@code SE_SERIALIZER_ERROR} when serialization fails
     */
    private ByteBuffer convertBrokerMessage2RByteBuffer(BrokerMessage brokerMessage) throws JoyQueueException {
        int serializedSize = Serializer.sizeOf(brokerMessage);
        ByteBuffer target = ByteBuffer.allocate(serializedSize);
        try {
            Serializer.write(brokerMessage, target, serializedSize);
        } catch (Exception e) {
            logger.error("Serialize message error! topic:{},app:{}", new Object[]{brokerMessage.getTopic(), brokerMessage.getApp(), e});
            throw new JoyQueueException(JoyQueueCode.SE_SERIALIZER_ERROR, new Object[0]);
        }
        return target;
    }

    /**
     * Serializes every message of the list, keeping the list order.
     *
     * @param list messages to serialize
     * @return one buffer per message, at the same index as the message
     * @throws JoyQueueException if any message cannot be serialized
     */
    private ByteBuffer[] generateRByteBufferList(List<BrokerMessage> list) throws JoyQueueException {
        ByteBuffer[] buffers = new ByteBuffer[list.size()];
        int index = 0;
        while (index < buffers.length) {
            buffers[index] = convertBrokerMessage2RByteBuffer(list.get(index));
            index++;
        }
        return buffers;
    }

    /**
     * Routes a transaction log entry to the transaction manager according to its type code:
     * 3 is handled as a {@link BrokerPrepare}, 4 as a {@link BrokerCommit} and 5 as a
     * {@link BrokerRollback}.
     *
     * @param producer    producer session issuing the request
     * @param joyQueueLog transaction log entry
     * @return the transaction id returned by the transaction manager
     * @throws JoyQueueException with {@code CN_COMMAND_UNSUPPORTED} for any other type code, or
     *                           whatever the transaction manager throws
     */
    @Override
    public TransactionId putTransactionMessage(Producer producer, JoyQueueLog joyQueueLog) throws JoyQueueException {
        switch (joyQueueLog.getType()) {
            case 3:
                return this.transactionManager.prepare(producer, (BrokerPrepare) joyQueueLog);
            case 4:
                return this.transactionManager.commit(producer, (BrokerCommit) joyQueueLog);
            case 5:
                return this.transactionManager.rollback(producer, (BrokerRollback) joyQueueLog);
            default:
                throw new JoyQueueException(JoyQueueCode.CN_COMMAND_UNSUPPORTED, new Object[0]);
        }
    }

    /**
     * Looks up a transaction by the producer's topic and app and the given transaction id.
     *
     * @param producer producer session that owns the transaction
     * @param str      transaction id
     * @return whatever the transaction manager returns for that key
     */
    @Override
    public TransactionId getTransaction(Producer producer, String str) {
        String topic = producer.getTopic();
        String app = producer.getApp();
        return this.transactionManager.getTransaction(topic, app, str);
    }

    /**
     * Delegates to {@code TransactionManager.getFeedback}.
     *
     * @param producer producer session the feedback is requested for
     * @param i        passed through unchanged; presumably the maximum number of entries
     *                 (confirm against TransactionManager)
     * @return the transaction ids returned by the transaction manager
     */
    @Override
    public List<TransactionId> getFeedback(Producer producer, int i) {
        List<TransactionId> feedback = this.transactionManager.getFeedback(producer, i);
        return feedback;
    }

    /**
     * Stores the broker context. validate() reads it to resolve the configuration, store
     * service, cluster manager and broker monitor when they were not supplied otherwise.
     *
     * @param brokerContext context to resolve collaborators from
     */
    @Override // org.joyqueue.broker.BrokerContextAware
    public void setBrokerContext(BrokerContext brokerContext) {
        this.brokerContext = brokerContext;
    }
}
