package org.zeromq.jms.protocol;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.zeromq.ZMQ;
import org.zeromq.jms.ZmqException;
import org.zeromq.jms.ZmqMessage;
import org.zeromq.jms.protocol.ZmqGateway;
import org.zeromq.jms.protocol.ZmqSocketSession;
import org.zeromq.jms.protocol.event.ZmqEventHandler;
import org.zeromq.jms.protocol.filter.ZmqFilterPolicy;
import org.zeromq.jms.protocol.redelivery.ZmqRedeliveryPolicy;
import org.zeromq.jms.protocol.store.ZmqFileJounralStore;
import org.zeromq.jms.protocol.store.ZmqJournalEntry;
import org.zeromq.jms.protocol.store.ZmqJournalStore;
import org.zeromq.jms.selector.ZmqMessageSelector;
import org.zeromq.jms.util.Stopwatch;

/* loaded from: input_file:org/zeromq/jms/protocol/AbstractZmqGateway.class */
public abstract class AbstractZmqGateway implements ZmqGateway {
    /** Class-wide logger, named after this class. */
    private static final Logger LOGGER = Logger.getLogger(AbstractZmqGateway.class.getCanonicalName());

    // Timing and sizing constants. NOTE(review): several of these are not referenced by name in
    // the methods of this class (equal numeric literals appear instead) - presumably the result
    // of constant inlining; confirm against the original source before changing a value.
    private static final int HEARTBEAT_RATE_MILLI_SECOND = 1000;
    private static final int AUTO_PAUSE_IDLE_MILLI_SECOND = 3000;
    private static final int SOCKET_STATUS_TIMEOUT_MILLI_SECOND = 5000;
    private static final int SOCKET_WAIT_MILLI_SECOND = 500;
    /** Passed to every new ZmqSocketMetrics created in open(). */
    private static final int SOCKET_METRIC_BUCKET_COUNT = 360;
    /** Passed to every new ZmqSocketMetrics created in open(). */
    private static final int SOCKET_METRIC_BUCKET_INTERVAL_MILLI_SECOND = 10000;
    /** Size of the listener (and proxy) thread pools created in open(). */
    private static final int LISTENER_THREAD_POOL = 1;
    private static final int LISTENER_WAIT_MILLI_SECOND = 500;

    // Immutable configuration captured by the constructor.
    private final String name;
    private final ZmqSocketType type;
    private final boolean bound;
    /** Comma separated list of socket addresses; split by getSocketAddrs(). */
    private final String addr;
    private final int flags;
    /** Created in open(), closed in close(). */
    private ZMQ.Context context;
    /** Created in open() only when the socket context is flagged as a proxy; closed in close(). */
    private ZMQ.Context proxyContext;
    private final boolean transacted;
    private final boolean acknowledge;
    private final boolean heartbeat;
    private final ZmqGateway.Direction direction;
    /** Private copy of the socket context supplied to the constructor. */
    private final ZmqSocketContext socketContext;
    private final ZmqRedeliveryPolicy redelivery;
    private final ZmqFilterPolicy filterPolicy;
    private final ZmqEventHandler eventHandler;
    private final ZmqMessageSelector messageSelector;
    /** Optional journal; every use in this class is guarded by a null check. */
    private final ZmqJournalStore journalStore;
    /** Optional asynchronous listener served by ListenerThread. */
    private ZmqGatewayListener listener;
    /** Set to true by open() and to false by close(); also handed to every socket/proxy session. */
    private AtomicBoolean active = new AtomicBoolean(false);
    private ZmqProxySession proxySession = null;

    // Thread pools, all created in open() and shut down in close().
    private ExecutorService socketExecutor = null;
    private ExecutorService listenerExecutor = null;
    private ExecutorService proxyExecutor = null;

    // Message hand-off queues plus the per-transaction snapshots that commit()/rollback() flush.
    // The snapshot queues are guarded by synchronizing on the snapshot instance itself.
    private final TransferQueue<ZmqSendEvent> incomingQueue = new LinkedTransferQueue();
    private final Queue<ZmqSendEvent> incomingSnapshot = new LinkedList();
    private final TransferQueue<ZmqSendEvent> outgoingQueue = new LinkedTransferQueue();
    private final Queue<ZmqSendEvent> outgoingSnapshot = new LinkedList();

    /** Instant at which this gateway instance was constructed. */
    private final Date startDateTime = new Date();
    /** One metrics object per socket address, appended by open(). */
    private final List<ZmqSocketMetrics> metrics = Collections.synchronizedList(new LinkedList());
    /** Socket sessions keyed by their individual socket address. */
    private final Map<String, ZmqSocketSession> socketSessions = Collections.synchronizedMap(new HashMap());

    /* loaded from: input_file:org/zeromq/jms/protocol/AbstractZmqGateway$ListenerThread.class */
    /**
     * Background task that repeatedly polls the gateway for incoming messages and forwards
     * them (or any {@link ZmqException} raised while receiving) to the registered listener.
     *
     * <p>The task ends as soon as the gateway is no longer active or no listener is registered.
     */
    private class ListenerThread implements Runnable {
        private ListenerThread() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (AbstractZmqGateway.this.active.get()) {
                // Take a single snapshot of the listener for this iteration. The field can be
                // cleared by setListener(null) on another thread; re-reading it for each call
                // could dereference null between the loop test and the callback.
                final ZmqGatewayListener current = AbstractZmqGateway.this.listener;
                if (current == null) {
                    return;
                }
                try {
                    final ZmqMessage message = AbstractZmqGateway.this.receive(LISTENER_WAIT_MILLI_SECOND);
                    if (message != null) {
                        current.onMessage(message);
                    }
                } catch (ZmqException e) {
                    current.onException(e);
                }
            }
        }
    }

    /**
     * Creates a gateway; no ZMQ resources are allocated until {@link #open(int)} is called.
     *
     * @param str                 the gateway name
     * @param zmqSocketContext    the socket settings; a private copy is kept, and the socket type,
     *                            bind flag, address list and receive flags are read from it
     * @param zmqFilterPolicy     the filter policy, may be null
     * @param zmqEventHandler     the factory used to create protocol events
     * @param zmqGatewayListener  the asynchronous message listener, may be null
     * @param zmqJournalStore     the journal store, may be null
     * @param zmqMessageSelector  the message selector, may be null
     * @param zmqRedeliveryPolicy the re-delivery policy, may be null
     * @param z                   whether the gateway is transacted
     * @param z2                  whether messages are acknowledged
     * @param z3                  whether heartbeats are exchanged
     * @param direction           the direction of message flow
     */
    public AbstractZmqGateway(String str, ZmqSocketContext zmqSocketContext, ZmqFilterPolicy zmqFilterPolicy, ZmqEventHandler zmqEventHandler, ZmqGatewayListener zmqGatewayListener, ZmqJournalStore zmqJournalStore, ZmqMessageSelector zmqMessageSelector, ZmqRedeliveryPolicy zmqRedeliveryPolicy, boolean z, boolean z2, boolean z3, ZmqGateway.Direction direction) {
        // Identity.
        name = str;

        // Values derived from the supplied socket context (same read order as before).
        type = zmqSocketContext.getType();
        socketContext = new ZmqSocketContext(zmqSocketContext);
        bound = zmqSocketContext.isBindFlag().booleanValue();
        addr = zmqSocketContext.getAddr();
        flags = zmqSocketContext.getRecieveMsgFlag().intValue();

        // Collaborators.
        filterPolicy = zmqFilterPolicy;
        eventHandler = zmqEventHandler;
        listener = zmqGatewayListener;
        journalStore = zmqJournalStore;
        messageSelector = zmqMessageSelector;
        redelivery = zmqRedeliveryPolicy;

        // Behavioural switches.
        transacted = z;
        acknowledge = z2;
        heartbeat = z3;
        this.direction = direction;
    }

    /**
     * Splits the configured address string on commas.
     *
     * @return the individual socket addresses
     */
    protected String[] getSocketAddrs() {
        final String[] addresses = addr.split(",");
        return addresses;
    }

    /**
     * Waits until every socket session reports one of the given statuses, or the timeout elapses.
     *
     * @param j       the maximum time to wait in milliseconds
     * @param enumSet the set of acceptable statuses
     * @return true when all sessions were in an acceptable status on the last check
     *         (trivially true when there are no sessions)
     */
    protected boolean waitOnStatus(long j, EnumSet<ZmqSocketStatus> enumSet) {
        final Stopwatch stopwatch = new Stopwatch();

        // Interval between two status checks.
        long pollInterval = SOCKET_WAIT_MILLI_SECOND;
        if (j < 0) {
            pollInterval = SOCKET_STATUS_TIMEOUT_MILLI_SECOND;
        } else if (j < SOCKET_WAIT_MILLI_SECOND) {
            // Short timeouts get at least one re-check inside the window.
            pollInterval = j / 2;
        }

        boolean allMatch;
        do {
            // The flag is a boolean; it was previously (and invalidly) seeded from an int constant.
            allMatch = true;
            for (ZmqSocketSession session : this.socketSessions.values()) {
                if (!enumSet.contains(session.getStatus())) {
                    allMatch = false;
                    break;
                }
            }
            if (allMatch) {
                break;
            }
            stopwatch.sleep(pollInterval);
        } while (stopwatch.before(j));
        return allMatch;
    }

    /**
     * Opens the gateway: creates the ZMQ context, opens the journal store (when present), starts
     * the listener thread (when a listener is registered), starts one socket session per configured
     * address and, when the socket context is flagged as a proxy, the proxy session. Finally waits
     * for all socket sessions to report RUNNING, PAUSED or ERROR.
     *
     * <p>Calling this method on an already active gateway is a no-op.
     *
     * @param i the maximum time, in milliseconds, to wait for the sockets to settle
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public void open(int i) {
        if (this.active.get()) {
            return;
        }
        this.context = ZMQ.context(this.socketContext.getIOThreads());
        this.active.set(true);
        if (this.journalStore != null) {
            try {
                this.journalStore.open();
            } catch (ZmqException e) {
                LOGGER.log(Level.SEVERE, "Unable to journal store: " + this.journalStore, (Throwable) e);
                // NOTE(review): on this path the gateway stays flagged active and the context
                // stays open; callers are expected to close() the gateway to release them.
                return;
            }
        }
        this.listenerExecutor = Executors.newFixedThreadPool(LISTENER_THREAD_POOL);
        if (this.listener != null) {
            this.listenerExecutor.execute(new ListenerThread());
        }
        final String[] socketAddrs = getSocketAddrs();
        this.socketExecutor = Executors.newFixedThreadPool(socketAddrs.length);
        this.proxyExecutor = this.socketContext.isProxy() ? Executors.newFixedThreadPool(LISTENER_THREAD_POOL) : null;

        // Heartbeats and acknowledgements need traffic in both directions.
        final boolean outgoing = this.direction == ZmqGateway.Direction.OUTGOING || this.heartbeat || this.acknowledge;
        final boolean incoming = this.direction == ZmqGateway.Direction.INCOMING || this.heartbeat || this.acknowledge;

        for (String socketAddr : socketAddrs) {
            final ZMQ.Socket socket = getSocket(this.context, this.socketContext);
            if (this.type == ZmqSocketType.SUB && this.filterPolicy != null) {
                final String[] subscribeTags = this.filterPolicy.getSubscirbeTags();
                if (subscribeTags != null) {
                    for (String tag : subscribeTags) {
                        socket.subscribe(tag.getBytes());
                    }
                }
            }

            // Re-use the metrics of an earlier session for the same socket address. Sessions are
            // keyed by the individual address, so the lookup must use it too; looking up the
            // complete comma separated list never matches when several addresses are configured.
            final ZmqSocketSession previousSession = this.socketSessions.get(socketAddr);
            ZmqSocketMetrics socketMetrics = previousSession != null ? previousSession.getMetrics() : null;
            if (socketMetrics == null) {
                socketMetrics = new ZmqSocketMetrics(socketAddr, SOCKET_METRIC_BUCKET_COUNT, SOCKET_METRIC_BUCKET_INTERVAL_MILLI_SECOND, outgoing, incoming);
                this.metrics.add(socketMetrics);
            }

            final ZmqSocketSession session = new ZmqSocketSession(this.name, this.active, socket, this.type, socketAddr, this.bound, incoming, outgoing, this.flags, SOCKET_WAIT_MILLI_SECOND, this.heartbeat, this.acknowledge, getSocketListener(socketAddr, incoming, outgoing), this.filterPolicy, this.eventHandler, socketMetrics);
            this.socketSessions.put(socketAddr, session);
            this.socketExecutor.execute(session);

            if (session.isBound()) {
                // Wait for the bound socket to leave its start-up states before starting the next
                // one. The loop ends once the status is neither STOPPED nor PENDING; without that
                // exit it would spin forever.
                ZmqSocketStatus status = session.getStatus();
                while (status == ZmqSocketStatus.STOPPED || status == ZmqSocketStatus.PENDING) {
                    try {
                        Thread.sleep(SOCKET_WAIT_MILLI_SECOND);
                    } catch (InterruptedException e2) {
                        LOGGER.warning("Binding sleep interrupted: " + this);
                        // Keep the interrupt visible to the caller and stop waiting on this socket.
                        Thread.currentThread().interrupt();
                        break;
                    }
                    status = session.getStatus();
                }
            }
        }

        if (this.socketContext.isProxy()) {
            this.proxyContext = ZMQ.context(this.socketContext.getIOThreads());
            final String proxyName = "proxy(" + this.name + ")";
            final String proxyAddr = this.socketContext.getProxyAddr();
            final ZmqSocketType proxyType = this.socketContext.getProxyType() == null ? ZmqSocketType.ROUTER : this.socketContext.getProxyType();
            // NOTE(review): both proxy sockets are created from the main context, not from
            // proxyContext - confirm this is intended.
            final ZMQ.Socket proxySocket = this.context.socket(proxyType.getType());
            final ZmqSocketType outProxyType = this.socketContext.getOutProxyType() == null ? ZmqSocketType.DEALER : this.socketContext.getOutProxyType();
            final ZMQ.Socket outProxySocket = this.context.socket(outProxyType.getType());
            this.proxySession = new ZmqProxySession(proxyName, this.active, proxySocket, proxyType, proxyAddr, true, outProxySocket, outProxyType, this.addr, true);
            this.proxyExecutor.execute(this.proxySession);
        }

        waitOnStatus(i, EnumSet.of(ZmqSocketStatus.RUNNING, ZmqSocketStatus.PAUSED, ZmqSocketStatus.ERROR));
        LOGGER.info("Gateway openned: " + toString());
    }

    /**
     * Creates a socket of the configured type and copies matching options from the socket context
     * onto it by reflection: the non-null result of every public, no-argument {@code getXyz()}
     * method of the context is passed to each public, single-argument {@code setXyz(..)} method
     * of the socket with the same suffix.
     *
     * <p>The send time-out is set to 0 before the options are applied. Getters or setters that
     * cannot be invoked (for example because of an argument type mismatch) are logged as
     * warnings and skipped.
     *
     * @param context          the ZMQ context used to create the socket
     * @param zmqSocketContext the source of the socket type and option values
     * @return the newly created, configured socket
     */
    public ZMQ.Socket getSocket(ZMQ.Context context, ZmqSocketContext zmqSocketContext) {
        final ZMQ.Socket socket = context.socket(zmqSocketContext.getType().getType());
        socket.setSendTimeOut(0);

        // Collect candidate option values, keyed by the getter name without its "get" prefix.
        final Map<String, Object> options = new HashMap<>();
        for (Method getter : zmqSocketContext.getClass().getMethods()) {
            final String getterName = getter.getName();
            // Method.getReturnType() never returns null; a void method reports void.class.
            if (getter.getParameterTypes().length != 0 || getter.getReturnType() == void.class || !getterName.startsWith("get")) {
                continue;
            }
            try {
                final Object value = getter.invoke(zmqSocketContext);
                if (value != null) {
                    options.put(getterName.substring(3), value);
                }
            } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
                LOGGER.log(Level.WARNING, "Ignoring 'getter' as potential 'setter': " + getterName, e);
            }
        }

        // Apply each collected value through the setter with the same suffix. A setter is
        // recognised by name and arity only; the previous "return type == null" test could never
        // be true, so no option was ever applied.
        for (Method setter : socket.getClass().getMethods()) {
            final String setterName = setter.getName();
            if (setter.getParameterTypes().length != 1 || !setterName.startsWith("set")) {
                continue;
            }
            final Object value = options.get(setterName.substring(3));
            if (value == null) {
                continue;
            }
            try {
                setter.invoke(socket, value);
            } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e2) {
                LOGGER.log(Level.WARNING, "Ignoring 'setting' of socket context': " + setterName, e2);
            }
        }
        return socket;
    }

    /**
     * Builds the callback object handed to a socket session. Every callback simply forwards to
     * the matching {@code socketXxx} method of this gateway.
     *
     * @param str the socket address (not used by this implementation)
     * @param z   first direction flag supplied by open() (not used by this implementation)
     * @param z2  second direction flag supplied by open() (not used by this implementation)
     * @return a listener delegating to this gateway
     */
    protected ZmqSocketListener getSocketListener(String str, boolean z, boolean z2) {
        final AbstractZmqGateway gateway = this;
        return new ZmqSocketListener() { // from class: org.zeromq.jms.protocol.AbstractZmqGateway.1
            @Override // org.zeromq.jms.protocol.ZmqSocketListener
            public boolean open(ZmqSocketSession session) {
                final boolean allowed = gateway.socketOpen(session);
                return allowed;
            }

            @Override // org.zeromq.jms.protocol.ZmqSocketListener
            public ZmqEvent send(ZmqSocketSession session) {
                final ZmqEvent next = gateway.socketSend(session);
                return next;
            }

            @Override // org.zeromq.jms.protocol.ZmqSocketListener
            public void error(ZmqSocketSession session, ZmqEvent event) {
                gateway.socketError(session, event);
            }

            @Override // org.zeromq.jms.protocol.ZmqSocketListener
            public ZmqEvent receive(ZmqSocketSession session, ZmqEvent event) {
                final ZmqEvent reply = gateway.socketReceive(session, event);
                return reply;
            }

            @Override // org.zeromq.jms.protocol.ZmqSocketListener
            public boolean close(ZmqSocketSession session) {
                final boolean allowed = gateway.socketClose(session);
                return allowed;
            }
        };
    }

    /**
     * @return true between a call to open() and the following call to close()
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public boolean isActive() {
        final boolean running = active.get();
        return running;
    }

    /**
     * Closes the gateway: flags it inactive, closes the proxy context, gives outstanding
     * acknowledgements a short grace period, waits for the sockets to stop, closes the journal
     * store, shuts down the thread pools, reports un-acknowledged messages as lost and finally
     * closes the ZMQ context.
     *
     * <p>The shutdown sequence always runs to completion. If the calling thread is interrupted
     * while waiting, the interrupt status is restored just before the method returns.
     *
     * @param i the maximum time, in milliseconds, to wait for the sockets to stop
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public void close(int i) {
        // Remember interrupts instead of swallowing them; re-asserted at the very end so the
        // remaining clean-up steps are not cut short.
        boolean interrupted = false;

        this.active.set(false);
        if (this.proxyContext != null) {
            this.proxyContext.close();
        }
        if (this.acknowledge) {
            int pendingAcks = 0;
            for (ZmqSocketSession zmqSocketSession : this.socketSessions.values()) {
                int trackedCount = zmqSocketSession.trackedCount();
                if (trackedCount > 0) {
                    LOGGER.info("Gateway [" + this.name + "@" + zmqSocketSession.getAddr() + "] waiting for acknowledgement message(s): " + trackedCount);
                }
                pendingAcks += trackedCount;
            }
            if (pendingAcks > 0) {
                // Short grace period for acknowledgements that are still in flight.
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOGGER.throwing(AbstractZmqGateway.class.getCanonicalName(), "close()", e);
                }
            }
        }
        waitOnStatus(i, EnumSet.of(ZmqSocketStatus.STOPPED));
        if (this.journalStore != null) {
            try {
                this.journalStore.close();
            } catch (ZmqException e2) {
                LOGGER.log(Level.SEVERE, "Gateway [" + this.name + "] unable to close the journal store: " + this.journalStore, (Throwable) e2);
            }
        }
        if (this.listenerExecutor != null) {
            try {
                this.listenerExecutor.shutdown();
                if (!this.listenerExecutor.awaitTermination(3L, TimeUnit.SECONDS)) {
                    LOGGER.severe("Gateway [" + this.name + "] listener threads failed to stop: " + toString());
                }
            } catch (InterruptedException e3) {
                interrupted = true;
                LOGGER.log(Level.SEVERE, "Gateway [" + this.name + "] listener threads failed to stop: " + toString(), (Throwable) e3);
            }
        }
        if (this.proxyExecutor != null) {
            try {
                this.proxyExecutor.shutdown();
                if (!this.proxyExecutor.awaitTermination(3L, TimeUnit.SECONDS)) {
                    LOGGER.warning("Proxy thread fails to stop, until context terminated (ZMQ issue): " + toString());
                }
            } catch (InterruptedException e4) {
                interrupted = true;
                LOGGER.log(Level.SEVERE, "Proxy threads failed to stop: " + toString(), (Throwable) e4);
            }
        }
        if (this.socketExecutor != null) {
            try {
                this.socketExecutor.shutdown();
                if (!this.socketExecutor.awaitTermination(3L, TimeUnit.SECONDS)) {
                    LOGGER.severe("Socket threads failed to stop: " + toString());
                }
            } catch (InterruptedException e5) {
                interrupted = true;
                LOGGER.log(Level.SEVERE, "Socket threads failed to stop: " + toString(), (Throwable) e5);
            }
        }
        if (this.acknowledge) {
            // Anything still tracked at this point was never acknowledged by the peer.
            for (ZmqSocketSession zmqSocketSession : this.socketSessions.values()) {
                for (ZmqSocketSession.TrackEvent trackEvent : zmqSocketSession.untrackAll()) {
                    if (trackEvent.getEvent() instanceof ZmqSendEvent) {
                        LOGGER.warning("Gateway [" + this.name + "] has un-acknowledged message (LOST): " + trackEvent);
                    }
                }
            }
        }
        // The context only exists once open() has been called.
        if (this.context != null) {
            this.context.close();
        }
        LOGGER.info("Gateway closed: " + toString());

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Decides whether a socket session may open its socket. Unbound (connecting) sessions may
     * always open; a bound session may only open while no session of this gateway is RUNNING.
     *
     * @param zmqSocketSession the session asking for permission to open
     * @return true when the session may open its socket
     */
    protected boolean socketOpen(ZmqSocketSession zmqSocketSession) {
        if (!zmqSocketSession.isBound()) {
            return true;
        }
        // This callback runs on a socket thread while open() may still be adding sessions on the
        // caller's thread. Views of a Collections.synchronizedMap must be traversed while holding
        // the map's own lock, otherwise the iteration can fail with a
        // ConcurrentModificationException.
        synchronized (this.socketSessions) {
            for (ZmqSocketSession session : this.socketSessions.values()) {
                if (session.getStatus() == ZmqSocketStatus.RUNNING) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Callback invoked through the socket listener when a session closes. This implementation
     * ignores the session and always returns true.
     *
     * @param zmqSocketSession the session being closed (unused)
     * @return always true
     */
    protected boolean socketClose(ZmqSocketSession zmqSocketSession) {
        return true;
    }

    /**
     * Callback invoked through the socket listener to obtain the next event a session should send.
     *
     * <p>For a RUNNING session the next journal entry that is not already tracked is preferred;
     * otherwise the outgoing queue is polled for up to 500 ms. When nothing is available on an
     * outgoing session with heartbeats enabled, either a heartbeat event is produced or the
     * session is paused (see the inline comments). When acknowledgements are enabled the returned
     * event is tracked on the session.
     *
     * @param zmqSocketSession the session asking for an event
     * @return the event to send, or null when there is nothing to send
     */
    protected ZmqEvent socketSend(ZmqSocketSession zmqSocketSession) {
        String addr = zmqSocketSession.getAddr();
        boolean isOutgoing = zmqSocketSession.isOutgoing();
        ZmqEvent zmqEvent = null;
        if (zmqSocketSession.getStatus() == ZmqSocketStatus.RUNNING) {
            if (this.journalStore != null) {
                try {
                    // Prefer a journalled message, unless this session is already tracking it.
                    ZmqJournalEntry read = this.journalStore.read();
                    if (read != null && !zmqSocketSession.isTracked(read.getMessageId())) {
                        zmqEvent = this.eventHandler.createSendEvent(read.getMessageId(), read.getMessage());
                    }
                } catch (ZmqException e) {
                    LOGGER.log(Level.WARNING, "Socket [" + this.name + "@" + addr + "] failed to read from the journal store", (Throwable) e);
                }
            }
            if (zmqEvent == null) {
                try {
                    // Blocks for at most 500 ms, which also paces the caller when the queue is empty.
                    zmqEvent = this.outgoingQueue.poll(500L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e2) {
                    LOGGER.log(Level.WARNING, "Socket [" + this.name + "@" + addr + "] polling of outgoing queue interrupted", (Throwable) e2);
                }
            }
        }
        if (this.heartbeat && isOutgoing && zmqEvent == null) {
            // Idle time since the last receive (j) and the last send (j2); the division converts
            // nanoseconds to milliseconds - assumes both session timestamps are System.nanoTime()
            // based; TODO confirm.
            long lastReceiveTime = zmqSocketSession.getLastReceiveTime();
            long lastSendTime = zmqSocketSession.getLastSendTime();
            long nanoTime = System.nanoTime();
            long j = (nanoTime - lastReceiveTime) / 1000000;
            long j2 = (nanoTime - lastSendTime) / 1000000;
            ZmqSocketStatus status = zmqSocketSession.getStatus();
            if (j > 1000 && j2 > 1000) {
                // NOTE(review): the idle limit is taken from a journal-store constant; the unused
                // AUTO_PAUSE_IDLE_MILLI_SECOND of this class looks like the intended limit -
                // confirm both have the same value.
                if (j <= ZmqFileJounralStore.JOUNRAL_SWEEP_PERIOD_MILLISECONDS || status != ZmqSocketStatus.RUNNING) {
                    zmqEvent = this.eventHandler.createHeartbeatEvent();
                    if (LOGGER.isLoggable(Level.FINEST)) {
                        LOGGER.log(Level.FINEST, "Socket [" + this.name + "@" + addr + "] send heartbeat: " + zmqEvent);
                    }
                } else {
                    // A RUNNING session has received nothing for longer than the idle limit: pause
                    // it and put its tracked send events back on the outgoing queue (socketError).
                    zmqSocketSession.pause();
                    for (ZmqSocketSession.TrackEvent trackEvent : zmqSocketSession.untrackAll()) {
                        if (trackEvent.getEvent() instanceof ZmqSendEvent) {
                            socketError(zmqSocketSession, trackEvent.getEvent());
                        }
                    }
                }
            }
        }
        if (zmqEvent != null && isOutgoing && this.acknowledge) {
            // Track the event on the session; socketReceive() untracks it when the ACK arrives.
            zmqSocketSession.track(zmqEvent);
            if (LOGGER.isLoggable(Level.FINEST)) {
                LOGGER.log(Level.FINEST, "Socket [" + this.name + "@" + addr + "] tacking event: " + zmqEvent);
            }
        }
        if (LOGGER.isLoggable(Level.FINEST) && zmqEvent != null) {
            LOGGER.log(Level.FINEST, "Socket [" + this.name + "@" + addr + "] send event: " + zmqEvent);
        }
        return zmqEvent;
    }

    /**
     * Callback invoked when a session failed to deliver an event. Send events are put back on the
     * outgoing queue; every other kind of event is ignored.
     *
     * @param zmqSocketSession the session reporting the failure
     * @param zmqEvent         the event that could not be delivered
     */
    public void socketError(ZmqSocketSession zmqSocketSession, ZmqEvent zmqEvent) {
        if (!(zmqEvent instanceof ZmqSendEvent)) {
            return;
        }
        final ZmqSendEvent requeued = (ZmqSendEvent) zmqEvent;
        try {
            this.outgoingQueue.put(requeued);
            if (LOGGER.isLoggable(Level.FINEST)) {
                LOGGER.log(Level.FINEST, "Socket [" + zmqSocketSession.getAddr() + "] send event: " + requeued);
            }
        } catch (InterruptedException e) {
            LOGGER.log(Level.SEVERE, "Socket [" + this.name + "@" + zmqSocketSession.getAddr() + "] was unable to re-send event: " + zmqEvent, (Throwable) e);
        }
    }

    /**
     * Callback invoked when a session received an event.
     *
     * <ul>
     *   <li>Send events are journalled (when a journal store is present) and placed on the
     *       incoming queue.</li>
     *   <li>Heartbeat events are answered with an ACK event when the session is an incoming,
     *       acknowledging session.</li>
     *   <li>ACK events untrack the corresponding message on the session.</li>
     * </ul>
     *
     * @param zmqSocketSession the session that received the event
     * @param zmqEvent         the received event
     * @return the event to send back in reply, or null when no reply is required
     */
    public ZmqEvent socketReceive(ZmqSocketSession zmqSocketSession, ZmqEvent zmqEvent) {
        String addr = zmqSocketSession.getAddr();
        if (LOGGER.isLoggable(Level.FINEST)) {
            LOGGER.log(Level.FINEST, "Socket [" + this.name + "@" + addr + "] consume event: " + zmqEvent);
        }
        if (zmqEvent instanceof ZmqSendEvent) {
            try {
                if (this.journalStore != null) {
                    this.journalStore.create(zmqEvent.getMessageId(), ((ZmqSendEvent) zmqEvent).getMessage());
                }
                this.incomingQueue.put((ZmqSendEvent) zmqEvent);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the socket thread that invoked this callback.
                Thread.currentThread().interrupt();
                LOGGER.log(Level.SEVERE, "Socket [" + this.name + "@" + addr + "] for gateway " + this.name + " cannot consume message due to intenral error: " + zmqEvent, (Throwable) e);
                return null;
            } catch (ZmqException e2) {
                LOGGER.log(Level.SEVERE, "Socket [" + this.name + "@" + addr + "] for gateway " + this.name + " cannot store messahe due to intenral error: " + zmqEvent, (Throwable) e2);
                return null;
            }
        }
        ZmqAckEvent replyEvent = null;
        if (zmqEvent instanceof ZmqHeartbeatEvent) {
            try {
                if (zmqSocketSession.isAcknowledge() && zmqSocketSession.isIncoming()) {
                    replyEvent = this.eventHandler.createAckEvent(zmqEvent);
                }
            } catch (ZmqException e3) {
                LOGGER.log(Level.SEVERE, "Socket [" + this.name + "@" + addr + "] received corrupt event: " + zmqEvent, (Throwable) e3);
            }
        } else if (zmqEvent instanceof ZmqAckEvent) {
            Object messageId = ((ZmqAckEvent) zmqEvent).getMessageId();
            if (messageId == null) {
                LOGGER.log(Level.SEVERE, "Socket [" + this.name + "@" + addr + "] received corrupt event: " + zmqEvent);
            } else {
                LOGGER.log(Level.INFO, "Socket [" + this.name + "@" + addr + "] received ACK event: " + zmqEvent);
                if (zmqSocketSession.untrack(messageId) == null) {
                    LOGGER.log(Level.WARNING, "Socket [" + this.name + "@" + addr + "] received ACK for untracked event: " + zmqEvent);
                }
            }
        }
        if (replyEvent != null && LOGGER.isLoggable(Level.FINEST)) {
            // Trace the reply that is about to be sent; the received event was already traced above.
            LOGGER.log(Level.FINEST, "Socket [" + this.name + "@" + addr + "] reply event: " + replyEvent);
        }
        return replyEvent;
    }

    /**
     * Intentionally empty: this implementation performs no work on start.
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public void start() {
    }

    /**
     * Intentionally empty: this implementation performs no work on stop.
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public void stop() {
    }

    /**
     * Commits the current transaction: every message sent since the last commit/rollback is moved
     * to the outgoing queue (and journalled when a journal store is present), and every message
     * received in the transaction is reported as delivered to the re-delivery policy and removed
     * from the journal.
     *
     * @throws ZmqException when the gateway is closed, is not transacted, or the journal store fails
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public void commit() throws ZmqException {
        final boolean open = this.active.get();
        if (!open) {
            throw new ZmqException("The gateway has been close: " + toString());
        }
        if (!this.transacted) {
            throw new ZmqException("No transacion started: " + toString());
        }

        final boolean journalled = this.journalStore != null;

        // Flush the sends of this transaction.
        synchronized (this.outgoingSnapshot) {
            final Iterator<ZmqSendEvent> pendingSends = this.outgoingSnapshot.iterator();
            while (pendingSends.hasNext()) {
                final ZmqSendEvent sendEvent = pendingSends.next();
                this.outgoingQueue.add(sendEvent);
                if (journalled) {
                    this.journalStore.create(sendEvent.getMessageId(), sendEvent.getMessage());
                }
            }
            this.outgoingSnapshot.clear();
        }

        // Settle the receives of this transaction.
        synchronized (this.incomingSnapshot) {
            if (this.redelivery != null) {
                this.redelivery.delivered(this.incomingSnapshot);
            }
            if (journalled) {
                for (ZmqSendEvent receivedEvent : this.incomingSnapshot) {
                    this.journalStore.delete(receivedEvent.getMessageId());
                }
            }
            this.incomingSnapshot.clear();
        }

        LOGGER.fine("Transaction committed: " + toString());
    }

    /**
     * Rolls back the current transaction: the messages sent in the transaction are discarded and
     * the messages received in it are handed to the re-delivery policy (when one is configured).
     *
     * @throws ZmqException when the gateway is closed or is not transacted
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public void rollback() throws ZmqException {
        final boolean open = this.active.get();
        if (!open) {
            throw new ZmqException("The gateway has been close: " + toString());
        }
        final boolean inTransaction = this.transacted;
        if (!inTransaction) {
            throw new ZmqException("No transacion started: " + toString());
        }

        // Drop everything that was sent but never committed.
        synchronized (this.outgoingSnapshot) {
            this.outgoingSnapshot.clear();
        }

        // Offer the received messages for re-delivery, then forget them.
        synchronized (this.incomingSnapshot) {
            final ZmqRedeliveryPolicy policy = this.redelivery;
            if (policy != null) {
                policy.redeliver(this.incomingSnapshot);
            }
            this.incomingSnapshot.clear();
        }

        LOGGER.fine("Transaction rolledback: " + toString());
    }

    /**
     * Sends a message. In a transacted gateway the message is held back until commit();
     * otherwise it is queued for sending straight away and journalled when a journal store is
     * present.
     *
     * @param zmqMessage the message to send
     * @throws ZmqException when the send event cannot be created or journalled
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public void send(ZmqMessage zmqMessage) throws ZmqException {
        final ZmqSendEvent sendEvent = this.eventHandler.createSendEvent(zmqMessage);

        if (!this.transacted) {
            this.outgoingQueue.add(sendEvent);
            if (this.journalStore != null) {
                this.journalStore.create(sendEvent.getMessageId(), sendEvent.getMessage());
            }
            return;
        }

        synchronized (this.outgoingSnapshot) {
            this.outgoingSnapshot.add(sendEvent);
        }
    }

    /**
     * Checks a message against the configured message selector.
     *
     * @param zmqMessage the message to check
     * @return true when no selector is configured, otherwise the result of evaluating the
     *         selector against the message properties
     */
    protected boolean isValidMessage(ZmqMessage zmqMessage) {
        final ZmqMessageSelector selector = this.messageSelector;
        return selector == null || selector.evaluate(zmqMessage.getProperties());
    }

    /**
     * Blocks until a message is available, polling in 500 ms slices so that a close of the
     * gateway is noticed.
     *
     * @return the received message, never null
     * @throws ZmqException when the gateway is (or becomes) closed, or receiving fails
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public ZmqMessage receive() throws ZmqException {
        ZmqMessage message = receive(500);
        while (message == null && this.active.get()) {
            message = receive(500);
        }
        if (message != null) {
            return message;
        }
        throw new ZmqException("Receive request, buy gateway has been close: " + toString());
    }

    /**
     * Receives the next message, waiting at most the given time.
     *
     * <p>Sources are consulted in this order: the re-delivery policy, the journal store, and
     * finally the incoming queue. Messages from the incoming queue that are rejected by the
     * message selector are dropped and the wait continues for the remaining time.
     *
     * @param i the maximum time to wait in milliseconds
     * @return the received message, or null when the wait timed out
     * @throws ZmqException when the gateway is closed, the journal store fails, or the calling
     *                      thread is interrupted while waiting (its interrupt status is restored)
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public ZmqMessage receive(int i) throws ZmqException {
        // The stopwatch is only needed for FINER timing traces.
        Stopwatch stopwatch = null;
        if (LOGGER.isLoggable(Level.FINER)) {
            stopwatch = new Stopwatch();
        }
        if (!this.active.get()) {
            throw new ZmqException("Receive request, buy gateway has been close: " + toString());
        }

        // 1. Messages waiting for re-delivery.
        if (this.redelivery != null) {
            final ZmqSendEvent redeliverEvent = this.redelivery.getNextRedeliver();
            if (redeliverEvent != null) {
                final ZmqMessage redelivered = redeliverEvent.getMessage();
                if (this.transacted) {
                    synchronized (this.incomingSnapshot) {
                        this.incomingSnapshot.add(redeliverEvent);
                    }
                }
                if (stopwatch != null) {
                    LOGGER.log(Level.FINER, "Receive re-delivery message: " + stopwatch.lapsedTime() + " (msec) :" + toString());
                }
                return redelivered;
            }
        }

        // 2. Messages held in the journal store.
        if (this.journalStore != null) {
            final ZmqJournalEntry entry = this.journalStore.read();
            if (entry != null) {
                if (this.transacted) {
                    final ZmqSendEvent journalEvent = this.eventHandler.createSendEvent(entry.getMessageId(), entry.getMessage());
                    // NOTE(review): a message being received is recorded in the OUTGOING
                    // snapshot, so commit() will queue it for sending - confirm this is intended
                    // (incomingSnapshot looks like the expected target).
                    synchronized (this.outgoingSnapshot) {
                        this.outgoingSnapshot.add(journalEvent);
                    }
                } else {
                    this.journalStore.delete(entry.getMessageId());
                }
                return entry.getMessage();
            }
        }

        // 3. Messages arriving from the sockets.
        final long pollStart = System.currentTimeMillis();
        try {
            final ZmqSendEvent polled = this.incomingQueue.poll(i, TimeUnit.MILLISECONDS);
            if (polled != null) {
                final ZmqMessage message = polled.getMessage();
                if (isValidMessage(message)) {
                    if (this.transacted) {
                        synchronized (this.incomingSnapshot) {
                            this.incomingSnapshot.add(polled);
                        }
                    } else if (this.journalStore != null) {
                        this.journalStore.create(polled.getMessageId(), message);
                    }
                    if (stopwatch != null) {
                        LOGGER.log(Level.FINER, "Gateway [" + this.name + "] receive incoming message: " + stopwatch.lapsedTime() + " (msec)");
                    }
                    return message;
                }
            }

            // Nothing usable yet: keep waiting for whatever is left of the timeout.
            final long elapsed = System.currentTimeMillis() - pollStart;
            if (elapsed >= i) {
                return null;
            }
            final ZmqMessage late = receive((int) (i - elapsed));
            if (stopwatch != null) {
                // The two traces were attached to the wrong outcomes: null means the wait timed out.
                if (late == null) {
                    LOGGER.log(Level.FINER, "Gatewau  [" + this.name + "] receive no message (Timeout): " + stopwatch.lapsedTime() + " (msec)");
                } else {
                    LOGGER.log(Level.FINER, "Gateway  [" + this.name + "] receive incoming message (Wait): " + stopwatch.lapsedTime() + " (msec)");
                }
            }
            return late;
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller before translating it.
            Thread.currentThread().interrupt();
            throw new ZmqException("Unable to poll internal queue: " + toString(), e);
        }
    }

    /**
     * Registers (or, with null, removes) the asynchronous message listener.
     *
     * <p>When a listener is registered on an open gateway that had none, the listener thread is
     * started. On a gateway that is not open nothing is started here; open() starts the thread
     * for whatever listener is registered at that time.
     *
     * @param zmqGatewayListener the new listener, or null to remove the current one
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public void setListener(ZmqGatewayListener zmqGatewayListener) {
        final boolean needsThread = zmqGatewayListener != null && this.listener == null;

        // Publish the listener BEFORE starting the thread: the thread leaves its loop as soon as
        // it sees a null listener, so starting it first could make it exit immediately.
        this.listener = zmqGatewayListener;

        // The executor only exists (and accepts tasks) while the gateway is open.
        if (needsThread && this.listenerExecutor != null && this.active.get()) {
            this.listenerExecutor.execute(new ListenerThread());
        }
    }

    /**
     * @return the live (synchronized) list of per-socket metrics; not a copy
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public List<ZmqSocketMetrics> getMetrics() {
        return metrics;
    }

    /**
     * @return the gateway name supplied at construction
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public String getName() {
        return name;
    }

    /**
     * @return the configured address string (comma separated when several addresses are used)
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public String getAddr() {
        return addr;
    }

    /**
     * @return the socket type taken from the socket context at construction
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public ZmqSocketType getType() {
        return type;
    }

    /**
     * @return the gateway's private copy of the socket context
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public ZmqSocketContext getSocketContext() {
        return socketContext;
    }

    /**
     * @return the bind flag taken from the socket context at construction
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public boolean isBound() {
        return bound;
    }

    /**
     * @return true when sends and receives are grouped into transactions (commit/rollback)
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public boolean isTransacted() {
        return transacted;
    }

    /**
     * @return the acknowledge flag supplied at construction
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public boolean isAcknowledged() {
        return acknowledge;
    }

    /**
     * @return the heartbeat flag supplied at construction
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public boolean isHeartbeat() {
        return heartbeat;
    }

    /**
     * @return the message flow direction supplied at construction
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public ZmqGateway.Direction getDirection() {
        return direction;
    }

    /**
     * @return the instant at which this gateway instance was constructed (the internal
     *         instance, not a copy)
     */
    @Override // org.zeromq.jms.protocol.ZmqGateway
    public Date getStartTime() {
        return startDateTime;
    }

    /**
     * @return the ZMQ context created by open(), or null when the gateway was never opened
     */
    protected ZMQ.Context getContext() {
        return context;
    }

    /**
     * @return the filter policy supplied at construction, possibly null
     */
    protected ZmqFilterPolicy getFilterPolicy() {
        return filterPolicy;
    }

    /**
     * @return the event handler supplied at construction
     */
    protected ZmqEventHandler getEventHandler() {
        return eventHandler;
    }

    /**
     * @return the message selector supplied at construction, possibly null
     */
    protected ZmqMessageSelector getMessageSelector() {
        return messageSelector;
    }

    /**
     * @return a one-line description of the gateway: concrete class name followed by its main
     *         configuration values
     */
    public String toString() {
        final StringBuilder text = new StringBuilder();
        text.append(getClass().getCanonicalName());
        text.append(" [active=").append(this.active);
        text.append(", name=").append(this.name);
        text.append(", type=").append(this.type);
        text.append(", isBound=").append(this.bound);
        text.append(", addr=").append(this.addr);
        text.append(", proxyAddr=").append(this.socketContext.getProxyAddr());
        text.append(", transacted=").append(this.transacted);
        text.append(", acknowleged=").append(this.acknowledge);
        text.append(", heartbeat=").append(this.heartbeat);
        text.append(", direction=").append(this.direction);
        text.append(", eventHandler=").append(this.eventHandler);
        text.append(", journalStore=").append(this.journalStore);
        text.append("]");
        return text.toString();
    }
}
