package org.usergrid.mq.cassandra.io;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.SliceQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.usergrid.locking.Lock;
import org.usergrid.locking.LockManager;
import org.usergrid.locking.exception.UGLockException;
import org.usergrid.mq.Message;
import org.usergrid.mq.QueueQuery;
import org.usergrid.mq.QueueResults;
import org.usergrid.mq.cassandra.CassandraMQUtils;
import org.usergrid.mq.cassandra.QueuesCF;
import org.usergrid.mq.cassandra.io.AbstractSearch;
import org.usergrid.mq.cassandra.io.NoTransactionSearch;
import org.usergrid.persistence.cassandra.CassandraService;
import org.usergrid.persistence.exceptions.QueueException;
import org.usergrid.persistence.exceptions.TransactionNotFoundException;
import org.usergrid.utils.UUIDUtils;

/* loaded from: input_file:org/usergrid/mq/cassandra/io/ConsumerTransaction.class */
public class ConsumerTransaction extends NoTransactionSearch {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerTransaction.class);
    private static final int MAX_READ = 10000;
    private LockManager lockManager;
    private UUID applicationId;
    protected CassandraService cass;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/usergrid/mq/cassandra/io/ConsumerTransaction$TransactionPointer.class */
    public static class TransactionPointer {
        private UUID expiration;
        private UUID targetMessage;

        public TransactionPointer(UUID uuid, UUID uuid2) {
            this.expiration = uuid;
            this.targetMessage = uuid2;
        }

        public String toString() {
            return "TransactionPointer [expiration=" + this.expiration + ", targetMessage=" + this.targetMessage + "]";
        }
    }

    public ConsumerTransaction(UUID uuid, Keyspace keyspace, LockManager lockManager, CassandraService cassandraService) {
        super(keyspace);
        this.applicationId = uuid;
        this.lockManager = lockManager;
        this.cass = cassandraService;
    }

    public UUID renewTransaction(String str, UUID uuid, QueueQuery queueQuery) throws TransactionNotFoundException {
        long currentTimeMillis = System.currentTimeMillis();
        if (queueQuery == null) {
            queueQuery = new QueueQuery();
        }
        UUID queueId = CassandraMQUtils.getQueueId(str);
        UUID consumerId = CassandraMQUtils.getConsumerId(queueId, queueQuery);
        ByteBuffer queueClientTransactionKey = CassandraMQUtils.getQueueClientTransactionKey(queueId, consumerId);
        SliceQuery createSliceQuery = HFactory.createSliceQuery(this.ko, be, ue, ue);
        createSliceQuery.setColumnFamily(QueuesCF.CONSUMER_QUEUE_TIMEOUTS.getColumnFamily());
        createSliceQuery.setKey(queueClientTransactionKey);
        createSliceQuery.setColumnNames(new UUID[]{uuid});
        HColumn columnByName = ((ColumnSlice) createSliceQuery.execute().get()).getColumnByName(uuid);
        if (columnByName == null) {
            throw new TransactionNotFoundException(String.format("No transaction with id %s exists", uuid));
        }
        UUID uuid2 = (UUID) columnByName.getName();
        UUID uuid3 = (UUID) columnByName.getValue();
        UUID newTimeUUID = UUIDUtils.newTimeUUID(currentTimeMillis + queueQuery.getTimeout());
        logger.debug("Writing new timeout at '{}' for message '{}'", newTimeUUID, uuid3);
        Mutator createMutator = HFactory.createMutator(this.ko, be);
        createMutator.addInsertion(queueClientTransactionKey, QueuesCF.CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(), HFactory.createColumn(newTimeUUID, uuid3, this.cass.createTimestamp(), ue, ue));
        createMutator.execute();
        deleteTransaction(queueId, consumerId, uuid2);
        return newTimeUUID;
    }

    public void deleteTransaction(String str, UUID uuid, QueueQuery queueQuery) {
        if (queueQuery == null) {
            queueQuery = new QueueQuery();
        }
        UUID queueId = CassandraMQUtils.getQueueId(str);
        deleteTransaction(queueId, CassandraMQUtils.getConsumerId(queueId, queueQuery), uuid);
    }

    private void deleteTransaction(UUID uuid, UUID uuid2, UUID uuid3) {
        Mutator createMutator = HFactory.createMutator(this.ko, be);
        createMutator.addDeletion(CassandraMQUtils.getQueueClientTransactionKey(uuid, uuid2), QueuesCF.CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(), uuid3, ue, this.cass.createTimestamp());
        createMutator.execute();
    }

    @Override // org.usergrid.mq.cassandra.io.NoTransactionSearch, org.usergrid.mq.cassandra.io.QueueSearch
    public QueueResults getResults(String str, QueueQuery queueQuery) {
        UUID queueId = CassandraMQUtils.getQueueId(str);
        UUID consumerId = CassandraMQUtils.getConsumerId(queueId, queueQuery);
        if (queueQuery.getLimit() > 10000) {
            throw new IllegalArgumentException(String.format("You specified a size of %d, you cannot specify a size larger than %d when using transations", Integer.valueOf(queueQuery.getLimit(1)), 10000));
        }
        Lock createLock = this.lockManager.createLock(this.applicationId, queueId.toString(), consumerId.toString());
        try {
            try {
                createLock.lock();
                long currentTimeMillis = System.currentTimeMillis();
                UUID newTimeUUID = UUIDUtils.newTimeUUID(currentTimeMillis, 0);
                AbstractSearch.QueueBounds queueBounds = getQueueBounds(queueId);
                if (queueBounds == null) {
                    QueueResults createResults = createResults(new ArrayList(0), str, queueId, consumerId);
                    try {
                        createLock.unlock();
                        return createResults;
                    } catch (UGLockException e) {
                        logger.error("Unable to release lock", e);
                        throw new QueueException("Unable to release lock", e);
                    }
                }
                AbstractSearch.QueueBounds queueBounds2 = new AbstractSearch.QueueBounds(queueBounds.getOldest(), newTimeUUID);
                NoTransactionSearch.SearchParam params = getParams(queueId, consumerId, queueQuery);
                List<UUID> queueRange = getQueueRange(queueId, queueBounds2, params);
                List<TransactionPointer> consumerIds = getConsumerIds(queueId, consumerId, params, newTimeUUID);
                int i = -1;
                for (int i2 = 0; i2 < consumerIds.size(); i2++) {
                    TransactionPointer transactionPointer = consumerIds.get(i2);
                    int binarySearch = Collections.binarySearch(queueRange, transactionPointer.expiration);
                    if (binarySearch <= (params.limit * (-1)) - 1) {
                        break;
                    }
                    queueRange.add((binarySearch + 1) * (-1), transactionPointer.targetMessage);
                    i = i2;
                }
                if (queueRange.size() > params.limit) {
                    queueRange = queueRange.subList(0, params.limit);
                }
                List<Message> loadMessages = loadMessages(queueRange, params.reversed);
                writeTransactions(loadMessages, queueQuery.getTimeout() + currentTimeMillis, queueId, consumerId);
                deleteTransactionPointers(consumerIds, i + 1, queueId, consumerId);
                QueueResults createResults2 = createResults(loadMessages, str, queueId, consumerId);
                writeClientPointer(queueId, consumerId, UUIDUtils.max(i == -1 ? null : consumerIds.get(i).expiration, loadMessages.size() == 0 ? null : loadMessages.get(loadMessages.size() - 1).getUuid()));
                try {
                    createLock.unlock();
                    return createResults2;
                } catch (UGLockException e2) {
                    logger.error("Unable to release lock", e2);
                    throw new QueueException("Unable to release lock", e2);
                }
            } catch (UGLockException e3) {
                logger.error("Unable to acquire lock", e3);
                throw new QueueException("Unable to acquire lock", e3);
            }
        } catch (Throwable th) {
            try {
                createLock.unlock();
                throw th;
            } catch (UGLockException e4) {
                logger.error("Unable to release lock", e4);
                throw new QueueException("Unable to release lock", e4);
            }
        }
    }

    protected List<TransactionPointer> getConsumerIds(UUID uuid, UUID uuid2, NoTransactionSearch.SearchParam searchParam, UUID uuid3) {
        SliceQuery createSliceQuery = HFactory.createSliceQuery(this.ko, be, ue, ue);
        createSliceQuery.setColumnFamily(QueuesCF.CONSUMER_QUEUE_TIMEOUTS.getColumnFamily());
        createSliceQuery.setKey(CassandraMQUtils.getQueueClientTransactionKey(uuid, uuid2));
        createSliceQuery.setRange(searchParam.startId, uuid3, false, searchParam.limit + 1);
        List<HColumn> columns = ((ColumnSlice) createSliceQuery.execute().get()).getColumns();
        ArrayList arrayList = new ArrayList(searchParam.limit);
        for (HColumn hColumn : columns) {
            if (logger.isDebugEnabled()) {
                logger.debug("Adding uuid '{}' for original message '{}' to results for queue '{}' and consumer '{}'", new Object[]{hColumn.getName(), hColumn.getValue(), uuid, uuid2});
                logger.debug("Max timeuuid : '{}', Current timeuuid : '{}', comparison '{}'", new Object[]{uuid3, hColumn.getName(), Integer.valueOf(UUIDUtils.compare(uuid3, (UUID) hColumn.getName()))});
            }
            arrayList.add(new TransactionPointer((UUID) hColumn.getName(), (UUID) hColumn.getValue()));
        }
        return arrayList;
    }

    protected void deleteTransactionPointers(List<TransactionPointer> list, int i, UUID uuid, UUID uuid2) {
        Mutator createMutator = HFactory.createMutator(this.ko, be);
        ByteBuffer queueClientTransactionKey = CassandraMQUtils.getQueueClientTransactionKey(uuid, uuid2);
        for (int i2 = 0; i2 < i && i2 < list.size(); i2++) {
            UUID uuid3 = list.get(i2).expiration;
            if (logger.isDebugEnabled()) {
                logger.debug("Removing transaction pointer '{}' for queue '{}' and consumer '{}'", new Object[]{uuid3, uuid, uuid2});
            }
            createMutator.addDeletion(queueClientTransactionKey, QueuesCF.CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(), uuid3, ue, this.cass.createTimestamp());
        }
        createMutator.execute();
    }

    protected void writeTransactions(List<Message> list, long j, UUID uuid, UUID uuid2) {
        Mutator createMutator = HFactory.createMutator(this.ko, be);
        ByteBuffer queueClientTransactionKey = CassandraMQUtils.getQueueClientTransactionKey(uuid, uuid2);
        int i = 0;
        long createTimestamp = this.cass.createTimestamp();
        for (Message message : list) {
            UUID newTimeUUID = UUIDUtils.newTimeUUID(j, i);
            UUID uuid3 = message.getUuid();
            logger.debug("Writing new timeout at '{}' for message '{}'", newTimeUUID, uuid3);
            createMutator.addInsertion(queueClientTransactionKey, QueuesCF.CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(), HFactory.createColumn(newTimeUUID, uuid3, createTimestamp, ue, ue));
            message.setTransaction(newTimeUUID);
            i++;
        }
        createMutator.execute();
    }
}
