package org.restcomm.protocols.ss7.sccp.impl;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.restcomm.protocols.ss7.sccp.impl.SccpStackImpl;
import org.restcomm.protocols.ss7.sccp.impl.message.MessageUtil;
import org.restcomm.protocols.ss7.sccp.impl.message.SccpConnSegmentableMessageImpl;
import org.restcomm.protocols.ss7.sccp.message.SccpConnMessage;
import org.restcomm.protocols.ss7.sccp.parameter.LocalReference;
import org.restcomm.protocols.ss7.sccp.parameter.ProtocolClass;
import org.restcomm.protocols.ss7.sccp.parameter.RefusalCause;
import org.restcomm.protocols.ss7.sccp.parameter.ReleaseCause;
import org.restcomm.protocols.ss7.sccp.parameter.ResetCause;
import org.restcomm.protocols.ss7.scheduler.Scheduler;
import org.restcomm.protocols.ss7.scheduler.Task;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/restcomm/protocols/ss7/sccp/impl/SccpConnectionWithTransmitQueueImpl.class */
public abstract class SccpConnectionWithTransmitQueueImpl extends SccpConnectionBaseImpl {
    private static final int SLEEP_DELAY = 15;
    private static final int OUTGOING_SIZE_LIMIT = 10000;
    private MessageSender messageSender;
    private final ConcurrentLinkedQueue<SccpConnMessage> outgoing;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/restcomm/protocols/ss7/sccp/impl/SccpConnectionWithTransmitQueueImpl$MessageSender.class */
    public class MessageSender extends Task {
        public MessageSender(Scheduler scheduler) {
            super(scheduler);
        }

        public int getQueueNumber() {
            return Scheduler.L4WRITE_QUEUE.intValue();
        }

        public void submit() {
            this.scheduler.submit(this, Integer.valueOf(getQueueNumber()));
        }

        public long perform() {
            while (!SccpConnectionWithTransmitQueueImpl.this.outgoing.isEmpty() && SccpConnectionWithTransmitQueueImpl.this.isCanSendData()) {
                SccpConnMessage sccpConnMessage = (SccpConnMessage) SccpConnectionWithTransmitQueueImpl.this.outgoing.poll();
                if (SccpConnectionWithTransmitQueueImpl.this.logger.isDebugEnabled()) {
                    SccpConnectionWithTransmitQueueImpl.this.logger.debug("Polling another message from queue: " + sccpConnMessage.toString());
                }
                try {
                    SccpConnectionWithTransmitQueueImpl.super.sendMessage(sccpConnMessage);
                } catch (Exception e) {
                    SccpConnectionWithTransmitQueueImpl.this.logger.error("IOException when sending the message: " + e.getMessage(), e);
                    throw new RuntimeException(e);
                }
            }
            if (SccpConnectionWithTransmitQueueImpl.this.outgoing.isEmpty()) {
                return 0L;
            }
            SccpConnectionWithTransmitQueueImpl.this.logger.debug("Queue not empty, retrying");
            try {
                Thread.sleep(15L);
                submit();
                return 0L;
            } catch (InterruptedException e2) {
                SccpConnectionWithTransmitQueueImpl.this.logger.error(e2);
                throw new RuntimeException(e2);
            }
        }
    }

    public SccpConnectionWithTransmitQueueImpl(int i, int i2, LocalReference localReference, ProtocolClass protocolClass, SccpStackImpl sccpStackImpl, SccpRoutingControl sccpRoutingControl) {
        super(i, i2, localReference, protocolClass, sccpStackImpl, sccpRoutingControl);
        this.outgoing = new ConcurrentLinkedQueue<>();
        this.messageSender = new MessageSender(this.stack.scheduler);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.restcomm.protocols.ss7.sccp.impl.SccpConnectionBaseImpl
    public void sendMessage(SccpConnMessage sccpConnMessage) throws Exception {
        if (this.stack.state != SccpStackImpl.State.RUNNING) {
            this.logger.error("Trying to send SCCP message from SCCP user but SCCP stack is not RUNNING");
            return;
        }
        if (!(sccpConnMessage instanceof SccpConnSegmentableMessageImpl)) {
            super.sendMessage(sccpConnMessage);
            return;
        }
        if (MessageUtil.getDln(sccpConnMessage) == null) {
            this.logger.error(String.format("Message doesn't have DLN set: ", sccpConnMessage));
            throw new IllegalStateException();
        }
        if (this.outgoing.size() > OUTGOING_SIZE_LIMIT) {
            this.logger.error(String.format("Outgoing messages queue overloaded, already reached the limit %d", Integer.valueOf(OUTGOING_SIZE_LIMIT)));
            throw new IllegalStateException(String.format("Outgoing messages queue overloaded, already reached the limit %d", Integer.valueOf(OUTGOING_SIZE_LIMIT)));
        }
        this.outgoing.add(sccpConnMessage);
        this.messageSender.submit();
    }

    @Override // org.restcomm.protocols.ss7.sccp.impl.SccpConnectionBaseImpl
    public void reset(ResetCause resetCause) throws Exception {
        super.reset(resetCause);
        clearTransmitQueue();
    }

    @Override // org.restcomm.protocols.ss7.sccp.impl.SccpConnectionBaseImpl
    public void disconnect(ReleaseCause releaseCause, byte[] bArr) throws Exception {
        super.disconnect(releaseCause, bArr);
        clearTransmitQueue();
    }

    @Override // org.restcomm.protocols.ss7.sccp.impl.SccpConnectionBaseImpl
    public void refuse(RefusalCause refusalCause, byte[] bArr) throws Exception {
        super.refuse(refusalCause, bArr);
        clearTransmitQueue();
    }

    protected int getTransmitQueueSize() {
        return this.outgoing.size();
    }

    private void clearTransmitQueue() {
        this.outgoing.clear();
    }
}
