package org.pipservices4.messaging.queues;

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.pipservices4.components.config.ConfigParams;
import org.pipservices4.components.context.Context;
import org.pipservices4.components.context.IContext;
import org.pipservices4.config.auth.CredentialParams;
import org.pipservices4.config.connect.ConnectionParams;

/* JADX WARN: Classes with same name are omitted:
  input_file:lib/pip-services4-messaging-0.0.1.jar:org/pipservices4/messaging/queues/MemoryMessageQueue.class
  input_file:obj/src/org/pipservices4/messaging/queues/MemoryMessageQueue.class
 */
/* loaded from: input_file:lib/pip-services4-messaging-0.0.1-jar-with-dependencies.jar:org/pipservices4/messaging/queues/MemoryMessageQueue.class */
public class MemoryMessageQueue extends MessageQueue {
    private final List<MessageEnvelope> _messages;
    private int _lockTokenSequence;
    private final Map<Integer, LockedMessage> _lockedMessages;
    private boolean _opened;
    private boolean _cancel;
    private long _listenInterval;

    public MemoryMessageQueue() {
        this(null);
    }

    public MemoryMessageQueue(String str) {
        super(str);
        this._messages = new ArrayList();
        this._lockTokenSequence = 0;
        this._lockedMessages = new HashMap();
        this._opened = false;
        this._cancel = false;
        this._listenInterval = 1000L;
        this._capabilities = new MessagingCapabilities(true, true, true, true, true, true, true, false, true);
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue, org.pipservices4.components.run.IOpenable
    public boolean isOpen() {
        return this._opened;
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue
    public void openWithParams(IContext iContext, ConnectionParams connectionParams, CredentialParams credentialParams) {
        this._logger.trace(iContext, "Opened queue %s", this);
        this._opened = true;
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue, org.pipservices4.components.run.IClosable
    public void close(IContext iContext) {
        synchronized (this._lock) {
            this._cancel = false;
            this._opened = false;
            this._lock.notifyAll();
        }
        this._logger.trace(iContext, "Closed queue %s", this);
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue, org.pipservices4.components.run.ICleanable
    public void clear(IContext iContext) {
        synchronized (this._lock) {
            this._messages.clear();
            this._lockedMessages.clear();
        }
        this._logger.trace(iContext, "Cleared queue %s", this);
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue, org.pipservices4.components.config.IConfigurable
    public void configure(ConfigParams configParams) {
        super.configure(configParams);
        this._listenInterval = configParams.getAsLongWithDefault("listen_interval", this._listenInterval);
        this._listenInterval = configParams.getAsLongWithDefault("options.listen_interval", this._listenInterval);
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue, org.pipservices4.messaging.queues.IMessageQueue
    public int readMessageCount() {
        int size;
        synchronized (this._lock) {
            size = this._messages.size();
        }
        return size;
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue, org.pipservices4.messaging.queues.IMessageQueue
    public void send(IContext iContext, MessageEnvelope messageEnvelope) {
        if (messageEnvelope == null) {
            return;
        }
        synchronized (this._lock) {
            messageEnvelope.setSentTime(ZonedDateTime.now(ZoneOffset.UTC));
            this._messages.add(messageEnvelope);
            this._lock.notify();
        }
        this._counters.incrementOne("queue." + getName() + ".sent_messages");
        this._logger.debug(iContext, "Sent message %s via %s", messageEnvelope, this);
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue, org.pipservices4.messaging.queues.IMessageQueue
    public MessageEnvelope peek(IContext iContext) {
        MessageEnvelope messageEnvelope = null;
        synchronized (this._lock) {
            if (this._messages.size() > 0) {
                messageEnvelope = this._messages.get(0);
            }
        }
        if (messageEnvelope != null) {
            this._logger.trace(iContext, "Peeked message %s on %s", messageEnvelope, this);
        }
        return messageEnvelope;
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue, org.pipservices4.messaging.queues.IMessageQueue
    public List<MessageEnvelope> peekBatch(IContext iContext, int i) {
        ArrayList arrayList = new ArrayList();
        synchronized (this._lock) {
            for (int i2 = 0; i2 < this._messages.size() && i2 < i; i2++) {
                arrayList.add(this._messages.get(i2));
            }
        }
        this._logger.trace(iContext, "Peeked %d messages on %s", Integer.valueOf(arrayList.size()), this);
        return arrayList;
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue, org.pipservices4.messaging.queues.IMessageQueue
    public MessageEnvelope receive(IContext iContext, long j) {
        MessageEnvelope messageEnvelope = null;
        synchronized (this._lock) {
            if (this._messages.size() > 0) {
                messageEnvelope = this._messages.get(0);
                this._messages.remove(0);
            }
        }
        while (0 < j && messageEnvelope == null) {
            synchronized (this._lock) {
                if (messageEnvelope == null) {
                    try {
                        this._lock.wait(100L);
                    } catch (InterruptedException e) {
                        return null;
                    }
                }
                if (messageEnvelope == null && this._messages.size() > 0) {
                    messageEnvelope = this._messages.get(0);
                    this._messages.remove(0);
                }
            }
        }
        if (messageEnvelope == null) {
            return null;
        }
        int i = this._lockTokenSequence;
        this._lockTokenSequence = i + 1;
        messageEnvelope.setReference(Integer.valueOf(i));
        LockedMessage lockedMessage = new LockedMessage();
        lockedMessage.expirationTime = ZonedDateTime.now().plus(j, (TemporalUnit) ChronoUnit.MILLIS);
        lockedMessage.message = messageEnvelope;
        lockedMessage.timeout = Long.valueOf(j);
        this._lockedMessages.put(Integer.valueOf(i), lockedMessage);
        this._counters.incrementOne("queue." + getName() + ".received_messages");
        this._logger.debug(Context.fromTraceId(messageEnvelope.getTraceId()), "Received message %s via %s", messageEnvelope, this);
        return messageEnvelope;
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue, org.pipservices4.messaging.queues.IMessageQueue
    public void renewLock(MessageEnvelope messageEnvelope, long j) {
        if (messageEnvelope == null || messageEnvelope.getReference() == null) {
            return;
        }
        synchronized (this._lock) {
            LockedMessage lockedMessage = this._lockedMessages.get(Integer.valueOf(((Integer) messageEnvelope.getReference()).intValue()));
            if (lockedMessage != null) {
                lockedMessage.expirationTime = ZonedDateTime.now().plus(j, (TemporalUnit) ChronoUnit.MILLIS);
            }
        }
        this._logger.trace(Context.fromTraceId(messageEnvelope.getTraceId()), "Renewed lock for message %s at %s", messageEnvelope, this);
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue, org.pipservices4.messaging.queues.IMessageQueue
    public void abandon(MessageEnvelope messageEnvelope) {
        if (messageEnvelope == null || messageEnvelope.getReference() == null) {
            return;
        }
        synchronized (this._lock) {
            int intValue = ((Integer) messageEnvelope.getReference()).intValue();
            LockedMessage lockedMessage = this._lockedMessages.get(Integer.valueOf(intValue));
            if (lockedMessage != null) {
                this._lockedMessages.remove(Integer.valueOf(intValue));
                messageEnvelope.setReference(null);
                if (lockedMessage.expirationTime.toInstant().toEpochMilli() <= ZonedDateTime.now().toInstant().toEpochMilli()) {
                    return;
                }
                this._logger.trace(Context.fromTraceId(messageEnvelope.getTraceId()), "Abandoned message %s at %s", messageEnvelope, this);
                send(Context.fromTraceId(messageEnvelope.getTraceId()), messageEnvelope);
            }
        }
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue, org.pipservices4.messaging.queues.IMessageQueue
    public void complete(MessageEnvelope messageEnvelope) {
        if (messageEnvelope == null || messageEnvelope.getReference() == null) {
            return;
        }
        synchronized (this._lock) {
            this._lockedMessages.remove(Integer.valueOf(((Integer) messageEnvelope.getReference()).intValue()));
            messageEnvelope.setReference(null);
        }
        this._logger.trace(Context.fromTraceId(messageEnvelope.getTraceId()), "Completed message %s at %s", messageEnvelope, this);
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue, org.pipservices4.messaging.queues.IMessageQueue
    public void moveToDeadLetter(MessageEnvelope messageEnvelope) {
        if (messageEnvelope == null || messageEnvelope.getReference() == null) {
            return;
        }
        synchronized (this._lock) {
            this._lockedMessages.remove(Integer.valueOf(((Integer) messageEnvelope.getReference()).intValue()));
            messageEnvelope.setReference(null);
        }
        this._counters.incrementOne("queue." + getName() + ".dead_messages");
        this._logger.trace(Context.fromTraceId(messageEnvelope.getTraceId()), "Moved to dead message %s at %s", messageEnvelope, this);
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue, org.pipservices4.messaging.queues.IMessageQueue
    public void listen(IContext iContext, IMessageReceiver iMessageReceiver) {
        if (this._cancel) {
            this._logger.error(iContext, "Already listening queue %s", this);
            return;
        }
        this._logger.trace(iContext, "Started listening messages at %s", this);
        this._cancel = true;
        while (this._cancel) {
            MessageEnvelope receive = receive(iContext, this._listenInterval);
            if (this._cancel && receive != null) {
                try {
                    iMessageReceiver.receiveMessage(receive, this);
                } catch (Exception e) {
                    this._logger.error(iContext, e, "Failed to process the message", new Object[0]);
                }
            }
        }
        this._logger.trace(iContext, "Stopped listening messages at %s", this);
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue, org.pipservices4.messaging.queues.IMessageQueue
    public void endListen(IContext iContext) {
        synchronized (this._lock) {
            this._cancel = false;
        }
    }

    @Override // org.pipservices4.messaging.queues.MessageQueue
    public String toString() {
        return "[" + getName() + "]";
    }
}
