package org.zeromq.jms.protocol;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import org.zeromq.jms.ZmqException;
import org.zeromq.jms.protocol.event.ZmqEventHandler;
import org.zeromq.jms.protocol.filter.ZmqFilterPolicy;

/* loaded from: input_file:org/zeromq/jms/protocol/ZmqSocketSession.class */
/**
 * Runnable worker that owns one ZMQ socket for its whole life-cycle: it binds or
 * connects the socket, loops sending and/or receiving while the shared
 * {@code process} flag is set, and finally unbinds/disconnects and closes it.
 *
 * <p>Outgoing events are pulled from, and incoming events are pushed to, the
 * {@link ZmqSocketListener}; conversion between events and wire messages is
 * delegated to the {@link ZmqEventHandler}.
 */
public class ZmqSocketSession implements Runnable {
    private static final Logger LOGGER = Logger.getLogger(ZmqSocketSession.class.getCanonicalName());

    /** Current life-cycle state; written by the session loop and by {@link #pause()}. */
    private volatile ZmqSocketStatus status = ZmqSocketStatus.STOPPED;

    /** {@link System#nanoTime()} reading taken at construction and after each received message. */
    private volatile long lastReceiveTime = System.nanoTime();

    /** {@link System#nanoTime()} reading taken at construction and after each successful send. */
    private volatile long lastSendTime = System.nanoTime();

    private final AtomicBoolean process;
    private final ZMQ.Socket socket;
    private final ZmqSocketType socketType;
    private final String socketAddr;
    private final boolean socketBound;
    private final boolean socketIncoming;
    private final boolean socketOutgoing;
    private final int socketFlags;
    private final int socketWaitTime;
    private final boolean socketHeartbeat;
    private final boolean socketAcknowledge;
    private final ZmqSocketListener socketListener;
    private final ZmqSocketMetrics metrics;
    private final ZmqEventHandler handler;
    private final ZmqFilterPolicy filter;

    /**
     * Creates a session around an already constructed socket. Nothing is opened
     * until {@link #run()} is invoked.
     *
     * @param atomicBoolean     shared flag; the session loop runs while it is {@code true}
     * @param socket            the ZMQ socket driven by this session
     * @param zmqSocketType     socket type handed to the event handler
     * @param str               address the socket binds or connects to
     * @param z                 {@code true} to bind to the address, {@code false} to connect
     * @param z2                {@code true} when the session receives messages
     * @param z3                {@code true} when the session sends messages
     * @param i                 flags used for the first receive of each receive pass
     * @param i2                receive time-out applied to sockets that are not outgoing
     * @param z4                heart-beat indicator (see {@link #isHeartbeat()})
     * @param z5                acknowledge indicator (see {@link #isAcknowledge()})
     * @param zmqSocketListener callback supplying outgoing and consuming incoming events;
     *                          may be {@code null}, in which case no events are exchanged
     * @param zmqFilterPolicy   filter policy handed to the event handler when building messages
     * @param zmqEventHandler   converter between events and ZMQ messages
     * @param zmqSocketMetrics  sink for send/receive counters and status
     */
    public ZmqSocketSession(AtomicBoolean atomicBoolean, ZMQ.Socket socket, ZmqSocketType zmqSocketType, String str, boolean z, boolean z2, boolean z3, int i, int i2, boolean z4, boolean z5, ZmqSocketListener zmqSocketListener, ZmqFilterPolicy zmqFilterPolicy, ZmqEventHandler zmqEventHandler, ZmqSocketMetrics zmqSocketMetrics) {
        this.process = atomicBoolean;
        this.socket = socket;
        this.socketType = zmqSocketType;
        this.socketAddr = str;
        this.socketBound = z;
        this.socketIncoming = z2;
        this.socketOutgoing = z3;
        this.socketFlags = i;
        this.socketWaitTime = i2;
        this.socketHeartbeat = z4;
        this.socketAcknowledge = z5;
        this.socketListener = zmqSocketListener;
        this.filter = zmqFilterPolicy;
        this.handler = zmqEventHandler;
        this.metrics = zmqSocketMetrics;
    }

    /** @return the address this session binds or connects to */
    public String getAddr() {
        return this.socketAddr;
    }

    /** @return {@code true} when the socket is bound, {@code false} when it is connected */
    public boolean isBound() {
        return this.socketBound;
    }

    /** @return {@code true} when this session receives messages */
    public boolean isIncoming() {
        return this.socketIncoming;
    }

    /** @return {@code true} when this session sends messages */
    public boolean isOutgoing() {
        return this.socketOutgoing;
    }

    /** @return the heart-beat indicator supplied at construction */
    public boolean isHeartbeat() {
        return this.socketHeartbeat;
    }

    /** @return the acknowledge indicator supplied at construction */
    public boolean isAcknowledge() {
        return this.socketAcknowledge;
    }

    /** @return the current life-cycle status of the session */
    public ZmqSocketStatus getStatus() {
        return this.status;
    }

    /**
     * Moves a {@code RUNNING} session to {@code WAITING}; any other state is left
     * untouched. While waiting, {@link #sendSocket} sends at most one event per pass.
     */
    public void pause() {
        if (this.status == ZmqSocketStatus.RUNNING) {
            this.status = ZmqSocketStatus.WAITING;
            LOGGER.warning("Socket paused: " + this);
        }
    }

    /** @return the {@link System#nanoTime()} reading of the last received message */
    public long getLastReceiveTime() {
        return this.lastReceiveTime;
    }

    /** @return the {@link System#nanoTime()} reading of the last successful send */
    public long getLastSendTime() {
        return this.lastSendTime;
    }

    /**
     * Opens the socket, loops over send/receive passes while the shared process
     * flag is set, then closes the socket.
     */
    @Override // java.lang.Runnable
    public void run() {
        openSocket(this);
        // Outgoing sessions receive with a zero time-out; all others wait up to socketWaitTime.
        // NOTE(review): presumably so a receive never delays the next send pass - confirm.
        if (this.socketOutgoing) {
            this.socket.setReceiveTimeOut(0);
        } else {
            this.socket.setReceiveTimeOut(this.socketWaitTime);
        }
        LOGGER.info("Started socket: " + this);
        while (this.process.get()) {
            if (this.socketOutgoing) {
                sendSocket(this);
            }
            if (this.socketIncoming) {
                receiveSocket(this);
            }
            this.metrics.setStatus(this.status);
        }
        LOGGER.info("Stopped socket: " + this);
        // Heart-beating bidirectional sessions get one more send pass after the loop.
        // NOTE(review): looks like a final flush of pending/heart-beat events - confirm.
        if (this.socketHeartbeat && this.socketOutgoing && this.socketIncoming) {
            this.socket.setReceiveTimeOut(this.socketWaitTime);
            sendSocket(this);
        }
        closeSocket(this);
    }

    /**
     * Binds or connects the socket, notifies the listener (when present) and marks
     * the session {@code RUNNING}.
     *
     * @param zmqSocketSession session whose socket and address are used
     * @throws RuntimeException whatever the bind/connect call raised, after logging it
     */
    protected void openSocket(ZmqSocketSession zmqSocketSession) {
        String str = zmqSocketSession.socketAddr;
        ZMQ.Socket socket = zmqSocketSession.socket;
        if (this.socketBound) {
            try {
                socket.bind(str);
                LOGGER.info("Bind socket successful: " + this);
            } catch (Exception e) {
                // Pass the cause so the stack trace is logged, as on the connect path.
                LOGGER.log(Level.SEVERE, "Socket binding failure: " + this, e);
                throw e;
            }
        } else {
            try {
                socket.connect(str);
                LOGGER.info("Connect socket successful: " + this);
            } catch (Exception e2) {
                LOGGER.log(Level.SEVERE, "Socket connect failure: " + this, e2);
                throw e2;
            }
        }
        // The send/receive paths tolerate a missing listener, so do the same here.
        if (this.socketListener != null) {
            this.socketListener.open(this);
        }
        this.status = ZmqSocketStatus.RUNNING;
    }

    /**
     * Unbinds or disconnects the socket and closes it, then notifies the listener
     * (when present) and marks the session {@code STOPPED}. The socket is closed
     * even when the unbind/disconnect call fails.
     *
     * @param zmqSocketSession session whose socket and address are used
     * @throws RuntimeException whatever the unbind/disconnect/close call raised, after logging it
     */
    protected void closeSocket(ZmqSocketSession zmqSocketSession) {
        String str = zmqSocketSession.socketAddr;
        ZMQ.Socket socket = zmqSocketSession.socket;
        if (this.socketBound) {
            try {
                try {
                    socket.unbind(str);
                } finally {
                    // Always release the socket, even if unbind failed.
                    socket.close();
                }
                LOGGER.info("Unbind socket successful: " + this);
            } catch (Exception e) {
                LOGGER.log(Level.SEVERE, "Socketing unbind failure: " + this, e);
                throw e;
            }
        } else {
            try {
                try {
                    socket.disconnect(str);
                } finally {
                    // Always release the socket, even if disconnect failed.
                    socket.close();
                }
                LOGGER.info("Disconnect socket successful: " + this);
            } catch (Exception e2) {
                LOGGER.log(Level.SEVERE, "Socket disconnect failure: " + this, e2);
                throw e2;
            }
        }
        // The send/receive paths tolerate a missing listener, so do the same here.
        if (this.socketListener != null) {
            this.socketListener.close(this);
        }
        this.status = ZmqSocketStatus.STOPPED;
    }

    /**
     * Pulls events from the listener and sends them. When the session was
     * {@code RUNNING} at entry it keeps sending until the listener has nothing
     * more; otherwise at most one event is sent. A failed send moves the session
     * to {@code WAITING} and reports the event through the listener's error callback.
     *
     * @param zmqSocketSession unused; the method operates on {@code this}
     */
    protected void sendSocket(ZmqSocketSession zmqSocketSession) {
        if (this.socketListener == null) {
            return;
        }
        // Snapshot taken once: a status change during the pass does not extend or cut the loop.
        boolean drainAll = this.status == ZmqSocketStatus.RUNNING;
        // Declared outside the try so the catch block can report the event being processed.
        ZmqEvent zmqEvent = null;
        try {
            do {
                zmqEvent = this.socketListener.send(this);
                if (zmqEvent == null) {
                    return;
                }
                if (!this.handler.createMsg(this.socketType, this.filter, zmqEvent).send(this.socket, true)) {
                    LOGGER.log(Level.SEVERE, "Unable to send message: " + this);
                    this.status = ZmqSocketStatus.WAITING;
                    this.socketListener.error(this, zmqEvent);
                    return;
                }
                this.metrics.incrementSend();
                this.lastSendTime = System.nanoTime();
            } while (drainAll);
        } catch (ZmqException e) {
            LOGGER.log(Level.SEVERE, "Unable to send message due to internal error: " + this, e);
            this.socketListener.error(this, zmqEvent);
        }
    }

    /**
     * Receives messages until a receive returns {@code null}, converting each to
     * an event and handing it to the listener. When the listener returns a reply
     * event it is sent back on the same socket. Does nothing while {@code STOPPED}.
     *
     * @param zmqSocketSession unused; the method operates on {@code this}
     */
    protected void receiveSocket(ZmqSocketSession zmqSocketSession) {
        if (LOGGER.isLoggable(Level.FINEST)) {
            LOGGER.log(Level.FINEST, "Receive and wait (" + this.socket.getReceiveTimeOut() + ") : " + this);
        }
        if (this.status == ZmqSocketStatus.STOPPED) {
            return;
        }
        ZMsg zMsg = ZMsg.recvMsg(this.socket, this.socketFlags);
        while (zMsg != null) {
            this.metrics.incrementReceive();
            this.lastReceiveTime = System.nanoTime();
            try {
                ZmqEvent createEvent = this.handler.createEvent(this.socketType, zMsg);
                ZmqEvent receive = null;
                if (createEvent != null && this.socketListener != null) {
                    receive = this.socketListener.receive(this, createEvent);
                }
                if (receive != null) {
                    // NOTE(review): the guard tests socketIncoming while the message below talks
                    // about the outgoing state; left as-is because the intent cannot be confirmed here.
                    if (this.socketIncoming) {
                        // Count the reply only when it was actually sent, matching sendSocket.
                        if (this.handler.createMsg(this.socketType, this.filter, receive).send(this.socket, true)) {
                            this.metrics.incrementSend();
                            this.lastSendTime = System.nanoTime();
                        } else {
                            LOGGER.log(Level.SEVERE, "Unable to send message: " + this);
                        }
                    } else {
                        LOGGER.log(Level.SEVERE, "Socketing has not outgoing state: " + this);
                    }
                }
            } catch (ZmqException e) {
                LOGGER.log(Level.SEVERE, "Socketing incoming failure: " + this, e);
            } finally {
                // Release the frames even when an unexpected runtime exception escapes.
                zMsg.destroy();
            }
            // NOTE(review): follow-up receives do not pass socketFlags, unlike the first one - confirm intended.
            zMsg = ZMsg.recvMsg(this.socket);
        }
    }

    /** @return a diagnostic description listing the socket configuration of this session */
    @Override
    public String toString() {
        return "ZmqSocketSession [socketType=" + this.socketType + ", socketAddr=" + this.socketAddr + ", socketBound=" + this.socketBound + ", socketIncoming=" + this.socketIncoming + ", socketOutgoing=" + this.socketOutgoing + ", socketFlags=" + this.socketFlags + ", socketWaitTime=" + this.socketWaitTime + ", socketHeartbeat=" + this.socketHeartbeat + ", socketAcknowledge=" + this.socketAcknowledge + "]";
    }
}
