package com.swiftmq.amqp.v100.client;

import com.swiftmq.amqp.v100.client.po.POFillCache;
import com.swiftmq.amqp.v100.client.po.POSendDisposition;
import com.swiftmq.amqp.v100.generated.messaging.delivery_state.DeliveryStateIF;
import com.swiftmq.amqp.v100.generated.transactions.coordination.TxnIdIF;
import com.swiftmq.amqp.v100.generated.transport.performatives.TransferFrame;
import com.swiftmq.amqp.v100.messaging.AMQPMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:com/swiftmq/amqp/v100/client/Consumer.class */
public class Consumer extends Link {
    private static final int DEFAULT_LINKCREDIT = 500;
    String source;
    List cache;
    Lock cacheLock;
    Condition cacheEmpty;
    volatile int linkCredit;
    int currentLinkCredit;
    AtomicLong deliveryCount;
    boolean acquireMode;
    volatile TxnIdIF currentTx;
    boolean firstFillCache;
    TransferFrame currentMessage;
    MessageAvailabilityListener messageAvailabilityListener;

    /* JADX INFO: Access modifiers changed from: protected */
    public Consumer(Session session, String str, String str2, int i, int i2, DeliveryMemory deliveryMemory) {
        super(session, str2, i2, deliveryMemory);
        this.cache = new ArrayList();
        this.cacheLock = new ReentrantLock();
        this.cacheEmpty = null;
        this.linkCredit = 0;
        this.currentLinkCredit = 0;
        this.deliveryCount = null;
        this.acquireMode = false;
        this.currentTx = null;
        this.firstFillCache = true;
        this.currentMessage = null;
        this.messageAvailabilityListener = null;
        this.source = str;
        this.linkCredit = i;
        this.cacheEmpty = this.cacheLock.newCondition();
        fillCache(-1L);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Consumer(Session session, String str, String str2, int i, DeliveryMemory deliveryMemory) {
        super(session, str2, i, deliveryMemory);
        this.cache = new ArrayList();
        this.cacheLock = new ReentrantLock();
        this.cacheEmpty = null;
        this.linkCredit = 0;
        this.currentLinkCredit = 0;
        this.deliveryCount = null;
        this.acquireMode = false;
        this.currentTx = null;
        this.firstFillCache = true;
        this.currentMessage = null;
        this.messageAvailabilityListener = null;
        this.source = str;
        this.linkCredit = 0;
        this.acquireMode = true;
        this.cacheEmpty = this.cacheLock.newCondition();
    }

    public String getSource() {
        return this.source;
    }

    public int getLinkCredit() {
        return this.linkCredit;
    }

    public void setLinkCredit(int i) {
        this.linkCredit = i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public TransferFrame getCurrentMessage() {
        return this.currentMessage;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setCurrentMessage(TransferFrame transferFrame) {
        this.currentMessage = transferFrame;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.swiftmq.amqp.v100.client.Link
    public void setDeliveryCount(long j) {
        this.deliveryCount = new AtomicLong(j);
    }

    protected void fillCache(long j) {
        try {
            this.cacheLock.lock();
            if (this.linkCredit == 0) {
                this.linkCredit = 500;
            }
            this.currentLinkCredit = this.linkCredit;
            this.mySession.dispatch(new POFillCache(this, this.linkCredit, j, this.currentTx));
            this.firstFillCache = false;
        } finally {
            this.cacheLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addToCache(AMQPMessage aMQPMessage) {
        try {
            this.cacheLock.lock();
            this.cache.add(aMQPMessage);
            if (this.cache.size() == 1) {
                if (this.messageAvailabilityListener != null) {
                    this.messageAvailabilityListener.messageAvailable(this);
                    this.messageAvailabilityListener = null;
                }
                this.cacheEmpty.signal();
            }
        } finally {
            this.cacheLock.unlock();
        }
    }

    public void sendDisposition(AMQPMessage aMQPMessage, DeliveryStateIF deliveryStateIF) {
        this.mySession.dispatch(new POSendDisposition(this, aMQPMessage.getDeliveryId(), aMQPMessage.getDeliveryTag(), deliveryStateIF));
    }

    public void acquire(int i, TxnIdIF txnIdIF) throws LinkClosedException {
        verifyState();
        this.linkCredit = i;
        this.currentTx = txnIdIF;
        fillCache(this.firstFillCache ? -1L : this.deliveryCount.get());
    }

    public AMQPMessage receive(long j) {
        return receive(j, null);
    }

    private AMQPMessage receive(long j, MessageAvailabilityListener messageAvailabilityListener) {
        if (this.closed) {
            return null;
        }
        try {
            this.cacheLock.lock();
            if (this.cache.size() == 0) {
                if (j > 0) {
                    try {
                        this.cacheEmpty.await(j, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                    }
                } else if (j == 0) {
                    this.cacheEmpty.awaitUninterruptibly();
                } else if (j == -1) {
                    this.messageAvailabilityListener = messageAvailabilityListener;
                }
            }
            if (this.cache.size() == 0) {
                return null;
            }
            AMQPMessage aMQPMessage = (AMQPMessage) this.cache.remove(0);
            aMQPMessage.setConsumer(this);
            long incrementAndGet = this.deliveryCount.incrementAndGet();
            this.currentLinkCredit--;
            if (!this.acquireMode && this.currentLinkCredit == 0) {
                fillCache(incrementAndGet);
            }
            this.cacheLock.unlock();
            return aMQPMessage;
        } finally {
            this.cacheLock.unlock();
        }
    }

    public AMQPMessage receive() {
        return receive(0L);
    }

    public AMQPMessage receiveNoWait() {
        return receive(-1L);
    }

    public AMQPMessage receiveNoWait(MessageAvailabilityListener messageAvailabilityListener) {
        return receive(-1L, messageAvailabilityListener);
    }

    @Override // com.swiftmq.amqp.v100.client.Link
    public void close() throws AMQPException {
        try {
            this.cacheLock.lock();
            this.cacheEmpty.signal();
            super.close();
        } finally {
            this.cacheLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.swiftmq.amqp.v100.client.Link
    public void cancel() {
        super.cancel();
        try {
            this.cacheLock.lock();
            this.cacheEmpty.signal();
        } finally {
            this.cacheLock.unlock();
        }
    }
}
