package org.pipservices3.messaging.queues;

import java.util.ArrayList;
import java.util.List;
import org.pipservices3.commons.config.ConfigParams;
import org.pipservices3.commons.errors.InvalidStateException;
import org.pipservices3.commons.run.ICleanable;

/* loaded from: input_file:org/pipservices3/messaging/queues/CachedMessageQueue.class */
public abstract class CachedMessageQueue extends MessageQueue implements ICleanable {
    protected boolean _autoSubscribe;
    protected List<MessageEnvelope> _messages;
    protected IMessageReceiver _receiver;

    public CachedMessageQueue(String str, MessagingCapabilities messagingCapabilities) {
        super(str, messagingCapabilities);
    }

    public CachedMessageQueue() {
    }

    @Override // org.pipservices3.messaging.queues.MessageQueue
    public void configure(ConfigParams configParams) {
        super.configure(configParams);
        this._autoSubscribe = configParams.getAsBooleanWithDefault("options.autosubscribe", this._autoSubscribe);
    }

    @Override // org.pipservices3.messaging.queues.MessageQueue
    public void open(String str) {
        if (isOpen()) {
            return;
        }
        try {
            if (this._autoSubscribe) {
                subscribe(str);
            }
            this._logger.debug(str, "Opened queue " + getName(), new Object[0]);
        } catch (Exception e) {
            close(str);
        }
    }

    @Override // org.pipservices3.messaging.queues.MessageQueue
    public void close(String str) {
        if (isOpen()) {
            try {
                unsubscribe(str);
                synchronized (this._lock) {
                    this._messages = new ArrayList();
                    this._receiver = null;
                }
            } catch (Throwable th) {
                synchronized (this._lock) {
                    this._messages = new ArrayList();
                    this._receiver = null;
                    throw th;
                }
            }
        }
    }

    protected abstract void subscribe(String str);

    protected abstract void unsubscribe(String str);

    @Override // org.pipservices3.messaging.queues.MessageQueue
    public void clear(String str) {
        synchronized (this._lock) {
            this._messages = new ArrayList();
        }
    }

    @Override // org.pipservices3.messaging.queues.MessageQueue, org.pipservices3.messaging.queues.IMessageQueue
    public int readMessageCount() {
        int size;
        synchronized (this._lock) {
            size = this._messages.size();
        }
        return size;
    }

    @Override // org.pipservices3.messaging.queues.MessageQueue, org.pipservices3.messaging.queues.IMessageQueue
    public MessageEnvelope peek(String str) throws InvalidStateException {
        checkOpen(str);
        subscribe(str);
        MessageEnvelope messageEnvelope = null;
        synchronized (this._lock) {
            if (this._messages.size() > 0) {
                messageEnvelope = this._messages.get(0);
            }
        }
        if (messageEnvelope != null) {
            this._logger.trace(messageEnvelope.getCorrelationId(), "Peeked message %s on %s", new Object[]{messageEnvelope, getName()});
        }
        return messageEnvelope;
    }

    @Override // org.pipservices3.messaging.queues.MessageQueue, org.pipservices3.messaging.queues.IMessageQueue
    public List<MessageEnvelope> peekBatch(String str, int i) throws InvalidStateException {
        List<MessageEnvelope> subList;
        checkOpen(str);
        subscribe(str);
        synchronized (this._lock) {
            subList = this._messages.subList(0, i);
        }
        this._logger.trace(str, "Peeked %d messages on %s", new Object[]{Integer.valueOf(subList.size()), getName()});
        return subList;
    }

    @Override // org.pipservices3.messaging.queues.MessageQueue, org.pipservices3.messaging.queues.IMessageQueue
    public MessageEnvelope receive(String str, long j) throws InvalidStateException {
        MessageEnvelope messageEnvelope;
        checkOpen(str);
        subscribe(str);
        int i = 0;
        synchronized (this._lock) {
            messageEnvelope = this._messages.get(0);
            this._messages.remove(0);
        }
        while (i < j && messageEnvelope == null) {
            synchronized (this._lock) {
                try {
                    this._lock.wait(100);
                    i += 100;
                    messageEnvelope = this._messages.get(0);
                    this._messages.remove(0);
                } catch (InterruptedException e) {
                    return null;
                }
            }
        }
        return messageEnvelope;
    }

    protected void sendMessageToReceiver(IMessageReceiver iMessageReceiver, MessageEnvelope messageEnvelope) {
        String correlationId = messageEnvelope != null ? messageEnvelope.getCorrelationId() : null;
        if (messageEnvelope == null || iMessageReceiver == null) {
            this._logger.warn(correlationId, "Message was skipped.", new Object[0]);
            return;
        }
        try {
            this._receiver.receiveMessage(messageEnvelope, this);
        } catch (Exception e) {
            this._logger.error(correlationId, e, "Failed to process the message", new Object[0]);
        }
    }

    @Override // org.pipservices3.messaging.queues.MessageQueue, org.pipservices3.messaging.queues.IMessageQueue
    public void listen(String str, IMessageReceiver iMessageReceiver) {
        if (isOpen()) {
            subscribe(str);
            this._logger.trace((String) null, "Started listening messages at %s", new Object[]{getName()});
            while (isOpen() && this._messages.size() > 0) {
                synchronized (this._lock) {
                    MessageEnvelope messageEnvelope = this._messages.get(0);
                    this._messages.remove(0);
                    if (messageEnvelope != null) {
                        sendMessageToReceiver(iMessageReceiver, messageEnvelope);
                    }
                }
            }
            if (isOpen()) {
                this._receiver = iMessageReceiver;
            }
        }
    }

    @Override // org.pipservices3.messaging.queues.MessageQueue, org.pipservices3.messaging.queues.IMessageQueue
    public void endListen(String str) {
        this._receiver = null;
    }
}
