package org.joyqueue.broker.producer.transaction;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.buffer.Serializer;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.monitor.BrokerMonitor;
import org.joyqueue.broker.producer.ProduceConfig;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.Producer;
import org.joyqueue.domain.QosLevel;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.message.BrokerCommit;
import org.joyqueue.message.BrokerPrepare;
import org.joyqueue.message.BrokerRollback;
import org.joyqueue.network.session.Producer;
import org.joyqueue.network.session.TransactionId;
import org.joyqueue.store.PartitionGroupStore;
import org.joyqueue.store.StoreService;
import org.joyqueue.store.WriteRequest;
import org.joyqueue.store.WriteResult;
import org.joyqueue.store.message.MessageParser;
import org.joyqueue.store.transaction.TransactionStore;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/producer/transaction/TransactionManager.class */
/**
 * Broker-side coordinator for producer transactions.
 *
 * <p>A transaction is opened with {@link #prepare}, receives its messages through
 * {@link #putMessage}, and is finished with either {@link #commit} (the buffered messages are
 * written to the partition group stores) or {@link #rollback} (the buffered messages are
 * dropped). Transactions that are still open are tracked by an
 * {@link UnCompletedTransactionManager}.
 *
 * <p>Two different {@code Producer} types are involved here (the network session producer and
 * the domain producer that owns {@code ProducerPolicy}); both are always referenced by their
 * fully-qualified names so the two can never be confused.
 */
public class TransactionManager extends Service {
    private static final Logger logger = LoggerFactory.getLogger(TransactionManager.class);

    /** Source of the sequence number embedded in broker-generated transaction ids. */
    private final AtomicLong sequence = new AtomicLong();
    private final ProduceConfig config;
    private final StoreService store;
    private final ClusterManager clusterManager;
    private final BrokerMonitor brokerMonitor;
    // The three collaborators below are created in validate(), not in the constructor.
    private UnCompletedTransactionManager unCompletedTransactionManager;
    private TransactionRecover transactionRecover;
    private TransactionCleaner transactionCleaner;

    public TransactionManager(ProduceConfig produceConfig, StoreService storeService, ClusterManager clusterManager, BrokerMonitor brokerMonitor) {
        this.config = produceConfig;
        this.store = storeService;
        this.clusterManager = clusterManager;
        this.brokerMonitor = brokerMonitor;
    }

    /**
     * Opens a transaction: registers it as uncompleted and persists the prepare record as the
     * first entry of a newly allocated transaction store slot.
     *
     * @param producer      session producer opening the transaction
     * @param brokerPrepare prepare command received from the client
     * @return the id of the newly opened transaction
     * @throws JoyQueueException {@code FW_TRANSACTION_LIMIT} when the producer already has more
     *                           open transactions than configured, {@code CN_TRANSACTION_NOT_EXISTS}
     *                           when the topic has no transaction store, or
     *                           {@code CN_TRANSACTION_PREPARE_ERROR} when registering or writing
     *                           the prepare record fails
     */
    public TransactionId prepare(org.joyqueue.network.session.Producer producer, BrokerPrepare brokerPrepare) throws JoyQueueException {
        if (this.unCompletedTransactionManager.getTransactionCount(producer.getTopic(), producer.getApp()) > this.config.getTransactionMaxUncomplete()) {
            logger.warn("too many transactions, topic: {}, app: {}. txId: {}", brokerPrepare.getTopic(), brokerPrepare.getApp(), brokerPrepare.getTxId());
            throw new JoyQueueException(JoyQueueCode.FW_TRANSACTION_LIMIT);
        }
        TransactionStore transactionStore = this.store.getTransactionStore(producer.getTopic());
        if (transactionStore == null) {
            logger.error("transaction store not exist, topic: {}", producer.getTopic());
            throw new JoyQueueException(JoyQueueCode.CN_TRANSACTION_NOT_EXISTS);
        }
        int storeId = transactionStore.next();
        TransactionId transactionId = generateTransactionId(brokerPrepare, storeId);
        try {
            this.unCompletedTransactionManager.putTransaction(transactionId);
            ByteBuffer buffer = ByteBuffer.allocate(Serializer.sizeOfBrokerPrepare(brokerPrepare));
            Serializer.writeBrokerPrepare(brokerPrepare, buffer);
            buffer.flip();
            waitFuture(producer, transactionStore.asyncWrite(storeId, new ByteBuffer[]{buffer.slice()}));
            return transactionId;
        } catch (Exception e) {
            logger.error("write prepare exception, topic: {}, app: {}. txId: {}", brokerPrepare.getTopic(), brokerPrepare.getApp(), brokerPrepare.getTxId(), e);
            discardFailedPrepare(transactionStore, storeId, transactionId, brokerPrepare);
            throw new JoyQueueException(JoyQueueCode.CN_TRANSACTION_PREPARE_ERROR);
        }
    }

    /**
     * Best-effort cleanup after a failed prepare. The two steps are attempted independently so
     * that a failure to drop the store slot cannot leave the transaction registered as
     * uncompleted.
     */
    private void discardFailedPrepare(TransactionStore transactionStore, int storeId, TransactionId transactionId, BrokerPrepare brokerPrepare) {
        try {
            transactionStore.remove(storeId);
        } catch (Exception e) {
            logger.error("clear prepare exception, topic: {}, app: {}. txId: {}", brokerPrepare.getTopic(), brokerPrepare.getApp(), brokerPrepare.getTxId(), e);
        }
        try {
            this.unCompletedTransactionManager.removeTransaction(transactionId);
        } catch (Exception e) {
            logger.error("clear prepare exception, topic: {}, app: {}. txId: {}", brokerPrepare.getTopic(), brokerPrepare.getApp(), brokerPrepare.getTxId(), e);
        }
    }

    /**
     * Builds the {@link TransactionId} for a prepare command. The client supplied txId is used
     * as is; when it is blank an id of the form
     * {@code transactionId_<topic>_<app>_<sequence>_<timestamp>} is generated.
     *
     * @param brokerPrepare prepare command the id is derived from
     * @param i             transaction store slot allocated for this transaction
     * @return the transaction id, stamped with the current {@link SystemClock} time
     */
    protected TransactionId generateTransactionId(BrokerPrepare brokerPrepare, int i) {
        long now = SystemClock.now();
        String txId = brokerPrepare.getTxId();
        if (StringUtils.isBlank(txId)) {
            txId = String.format("transactionId_%s_%s_%s_%s", brokerPrepare.getTopic(), brokerPrepare.getApp(), this.sequence.getAndIncrement(), now);
        }
        return new TransactionId(brokerPrepare.getTopic(), brokerPrepare.getApp(), txId, brokerPrepare.getQueryId(), i, brokerPrepare.getSource(), brokerPrepare.getTimeout(), now);
    }

    /**
     * Commits an open transaction: reads the buffered records back from the transaction store,
     * writes them to their partition group stores and unregisters the transaction.
     *
     * @param producer     session producer committing the transaction
     * @param brokerCommit commit command received from the client
     * @return the id of the committed transaction
     * @throws JoyQueueException {@code CN_TRANSACTION_NOT_EXISTS} when the store or the
     *                           transaction is unknown, {@code SE_WRITE_FAILED} when a record's
     *                           partition has no partition group, or {@code SE_IO_ERROR} for any
     *                           other read or write failure
     */
    public TransactionId commit(org.joyqueue.network.session.Producer producer, BrokerCommit brokerCommit) throws JoyQueueException {
        TransactionStore transactionStore = this.store.getTransactionStore(producer.getTopic());
        if (transactionStore == null) {
            logger.error("transaction store not exist, topic: {}", producer.getTopic());
            throw new JoyQueueException(JoyQueueCode.CN_TRANSACTION_NOT_EXISTS);
        }
        TransactionId transaction = this.unCompletedTransactionManager.getTransaction(brokerCommit.getTopic(), brokerCommit.getApp(), brokerCommit.getTxId());
        if (transaction == null) {
            logger.debug("The current tx is not in txManager, topic: {}, app: {}, txId: {}", brokerCommit.getTopic(), brokerCommit.getApp(), brokerCommit.getTxId());
            throw new JoyQueueException(JoyQueueCode.CN_TRANSACTION_NOT_EXISTS);
        }

        // Phase 1: read the buffered records and group them by partition group.
        Map<Integer, List<WriteRequest>> requestsByGroup = Maps.newHashMap();
        int totalBytes;
        try {
            totalBytes = collectCommitRequests(producer, transactionStore, transaction, requestsByGroup);
        } catch (JoyQueueException e) {
            logger.error("write transaction message exception, topic: {}, app: {}, txId: {}", brokerCommit.getTopic(), brokerCommit.getApp(), brokerCommit.getTxId(), e);
            throw e;
        } catch (Exception e) {
            logger.error("write transaction message exception, topic: {}, app: {}, txId: {}", brokerCommit.getTopic(), brokerCommit.getApp(), brokerCommit.getTxId(), e);
            throw new JoyQueueException(JoyQueueCode.SE_IO_ERROR);
        }

        // Phase 2: write each group's batch; every failure here is reported as SE_IO_ERROR.
        try {
            writeCommitRequests(producer, brokerCommit, requestsByGroup, totalBytes);
            this.unCompletedTransactionManager.removeTransaction(transaction);
            return transaction;
        } catch (Exception e) {
            logger.error("write transaction message exception, topic: {}, app: {}, txId: {}", brokerCommit.getTopic(), brokerCommit.getApp(), brokerCommit.getTxId(), e);
            throw new JoyQueueException(JoyQueueCode.SE_IO_ERROR);
        }
    }

    /**
     * Reads every record of the transaction and fills {@code requestsByGroup} with one write
     * request per message, keyed by partition group id.
     *
     * @return the summed {@code limit()} of all message buffers
     */
    private int collectCommitRequests(org.joyqueue.network.session.Producer producer, TransactionStore transactionStore,
                                      TransactionId transaction, Map<Integer, List<WriteRequest>> requestsByGroup) throws Exception {
        int totalBytes = 0;
        boolean firstRecord = true;
        Iterator<ByteBuffer> records = transactionStore.readIterator(transaction.getStoreId());
        while (records.hasNext()) {
            ByteBuffer record = records.next();
            if (firstRecord) {
                firstRecord = false;
                // The first record is the prepare command written by prepare(); it is decoded
                // (so a corrupt record still fails the commit) but is not a message to publish.
                Serializer.readBrokerPrepare(record);
                continue;
            }
            short partition = dispatchPartition(record, (short) 0);
            PartitionGroup partitionGroup = this.clusterManager.getPartitionGroup(TopicName.parse(producer.getTopic()), partition);
            if (partitionGroup == null) {
                throw new JoyQueueException(JoyQueueCode.SE_WRITE_FAILED);
            }
            Integer group = partitionGroup.getGroup();
            List<WriteRequest> requests = requestsByGroup.get(group);
            if (requests == null) {
                requests = Lists.newLinkedList();
                requestsByGroup.put(group, requests);
            }
            requests.add(new WriteRequest(partition, record, 1));
            totalBytes += record.limit();
        }
        return totalBytes;
    }

    /** Writes each partition group's batch, waits for it and reports the writes to the monitor. */
    private void writeCommitRequests(org.joyqueue.network.session.Producer producer, BrokerCommit brokerCommit,
                                     Map<Integer, List<WriteRequest>> requestsByGroup, int totalBytes) throws Exception {
        for (Map.Entry<Integer, List<WriteRequest>> entry : requestsByGroup.entrySet()) {
            int group = entry.getKey();
            List<WriteRequest> requests = entry.getValue();
            PartitionGroupStore groupStore = this.store.getStore(brokerCommit.getTopic(), group, QosLevel.REPLICATION);
            long startTime = SystemClock.now();
            waitFuture(producer, groupStore.asyncWrite(requests.toArray(new WriteRequest[0])));
            long elapsed = SystemClock.now() - startTime;
            // NOTE(review): every request is reported with the byte count of the whole
            // transaction and the elapsed time of the whole group write - confirm this is the
            // intended accounting.
            for (WriteRequest request : requests) {
                this.brokerMonitor.onPutMessage(producer.getTopic(), producer.getApp(), group, request.getPartition(), 1L, totalBytes, elapsed);
            }
        }
    }

    /**
     * Reads the partition stored in a serialized message.
     *
     * @param byteBuffer serialized message
     * @param s          partition to fall back to when the stored value is negative
     * @return the stored partition, or {@code s} when the stored partition is negative
     */
    protected short dispatchPartition(ByteBuffer byteBuffer, short s) {
        short partition = MessageParser.getShort(byteBuffer, MessageParser.PARTITION);
        return partition < 0 ? s : partition;
    }

    /**
     * Blocks until the given store write completes. The wait is bounded by the producer
     * policy's timeout in milliseconds; when no policy is found, or its timeout is 0, the wait
     * is unbounded.
     *
     * @param producer session producer whose policy supplies the timeout
     * @param future   pending write
     * @throws JoyQueueException {@code SE_DISK_FLUSH_SLOW} when the wait is interrupted,
     *                           {@code SE_WRITE_TIMEOUT} when the write fails or times out, or
     *                           {@code CN_CONNECTION_TIMEOUT} for any other failure
     */
    protected void waitFuture(org.joyqueue.network.session.Producer producer, Future<WriteResult> future) throws JoyQueueException {
        try {
            org.joyqueue.domain.Producer.ProducerPolicy policy = this.clusterManager.tryGetProducerPolicy(TopicName.parse(producer.getTopic()), producer.getApp());
            int timeoutMs = policy == null ? 0 : policy.getTimeOut().intValue();
            if (timeoutMs == 0) {
                future.get();
            } else {
                future.get(timeoutMs, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can still see it.
            Thread.currentThread().interrupt();
            throw new JoyQueueException(JoyQueueCode.SE_DISK_FLUSH_SLOW);
        } catch (ExecutionException | TimeoutException e) {
            throw new JoyQueueException(JoyQueueCode.SE_WRITE_TIMEOUT);
        } catch (Exception e) {
            throw new JoyQueueException(JoyQueueCode.CN_CONNECTION_TIMEOUT);
        }
    }

    /**
     * Rolls back an open transaction: drops its records from the transaction store and
     * unregisters it.
     *
     * @param producer       session producer rolling back the transaction
     * @param brokerRollback rollback command received from the client
     * @return the id of the rolled back transaction
     * @throws JoyQueueException {@code CN_TRANSACTION_NOT_EXISTS} when the store or the
     *                           transaction is unknown
     */
    public TransactionId rollback(org.joyqueue.network.session.Producer producer, BrokerRollback brokerRollback) throws JoyQueueException {
        TransactionStore transactionStore = this.store.getTransactionStore(producer.getTopic());
        if (transactionStore == null) {
            logger.error("transaction store not exist, topic: {}", producer.getTopic());
            throw new JoyQueueException(JoyQueueCode.CN_TRANSACTION_NOT_EXISTS);
        }
        TransactionId transaction = this.unCompletedTransactionManager.getTransaction(brokerRollback.getTopic(), brokerRollback.getApp(), brokerRollback.getTxId());
        if (transaction == null) {
            logger.debug("transaction not exist, topic: {}, id: {}", producer.getTopic(), brokerRollback.getTxId());
            throw new JoyQueueException(JoyQueueCode.CN_TRANSACTION_NOT_EXISTS);
        }
        transactionStore.remove(transaction.getStoreId());
        this.unCompletedTransactionManager.removeTransaction(transaction);
        return transaction;
    }

    /**
     * Appends messages to an open transaction.
     *
     * @param producer      session producer sending the messages
     * @param str           transaction id the messages belong to
     * @param byteBufferArr serialized messages
     * @return the pending write to the transaction store
     * @throws JoyQueueException {@code CN_UNKNOWN_ERROR} when {@code str} is null or empty, or
     *                           {@code CN_TRANSACTION_NOT_EXISTS} when the store or the
     *                           transaction is unknown
     */
    public Future<WriteResult> putMessage(org.joyqueue.network.session.Producer producer, String str, ByteBuffer... byteBufferArr) throws JoyQueueException {
        TransactionStore transactionStore = this.store.getTransactionStore(producer.getTopic());
        if (Strings.isNullOrEmpty(str)) {
            logger.error("The current message is not a tx message!");
            throw new JoyQueueException(JoyQueueCode.CN_UNKNOWN_ERROR);
        }
        if (transactionStore == null) {
            // Same handling as prepare/commit/rollback instead of a NullPointerException below.
            logger.error("transaction store not exist, topic: {}", producer.getTopic());
            throw new JoyQueueException(JoyQueueCode.CN_TRANSACTION_NOT_EXISTS);
        }
        TransactionId transaction = this.unCompletedTransactionManager.getTransaction(producer.getTopic(), producer.getApp(), str);
        if (transaction == null) {
            logger.debug("The current tx is not in txManager! txId:{}...", str);
            throw new JoyQueueException(JoyQueueCode.CN_TRANSACTION_NOT_EXISTS);
        }
        return transactionStore.asyncWrite(transaction.getStoreId(), byteBufferArr);
    }

    /**
     * Looks up an open transaction; the three arguments are forwarded unchanged to
     * {@link UnCompletedTransactionManager}, in the same (topic, app, txId) order used by
     * {@link #commit} and {@link #rollback}.
     */
    public TransactionId getTransaction(String str, String str2, String str3) {
        return this.unCompletedTransactionManager.getTransaction(str, str2, str3);
    }

    /** Delegates to {@link UnCompletedTransactionManager}'s {@code getFeedback} with the same arguments. */
    public List<TransactionId> getFeedback(org.joyqueue.network.session.Producer producer, int i) {
        return this.unCompletedTransactionManager.getFeedback(producer, i);
    }

    /** Creates the uncompleted-transaction registry and the recover/cleaner helpers built on it. */
    protected void validate() throws Exception {
        this.unCompletedTransactionManager = new UnCompletedTransactionManager(this.config);
        this.transactionRecover = new TransactionRecover(this.config, this.unCompletedTransactionManager, this.store);
        this.transactionCleaner = new TransactionCleaner(this.config, this.unCompletedTransactionManager, this.store);
    }

    /** Runs transaction recovery, then starts the cleaner. */
    protected void doStart() throws Exception {
        this.transactionRecover.recover();
        this.transactionCleaner.start();
    }

    /** Stops the cleaner if it was ever created. */
    protected void doStop() {
        if (this.transactionCleaner != null) {
            this.transactionCleaner.stop();
        }
    }
}
