package com.swiftmq.net.client;

import com.swiftmq.net.protocol.ChunkListener;
import com.swiftmq.net.protocol.OutputListener;
import com.swiftmq.net.protocol.ProtocolInputHandler;
import com.swiftmq.net.protocol.ProtocolOutputHandler;
import com.swiftmq.net.protocol.smqp.SMQPInputHandler;
import com.swiftmq.net.protocol.smqp.SMQPOutputHandler;
import com.swiftmq.tools.prop.SystemProperties;
import com.swiftmq.tools.util.DataByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/* loaded from: input_file:com/swiftmq/net/client/BlockingConnection.class */
public class BlockingConnection extends Thread implements Connection, ChunkListener, OutputListener {
    static final boolean ISDAEMON = Boolean.valueOf(SystemProperties.get("swiftmq.socket.reader.isdaemon", "false")).booleanValue();
    static final boolean SET_SOCKET_OPTIONS = Boolean.valueOf(SystemProperties.get("swiftmq.socket.set.options", "true")).booleanValue();
    static final int MAX_SNDBUFSIZE = Integer.parseInt(SystemProperties.get("swiftmq.socket.max.sendbuffersize", "0"));
    static final int MAX_RCVBUFSIZE = Integer.parseInt(SystemProperties.get("swiftmq.socket.max.receivebuffersize", "0"));
    static final int SO_TIMEOUT = Integer.parseInt(SystemProperties.get("swiftmq.socket.sotimeout", "0"));
    Socket socket;
    int inputBufferSize;
    int inputExtendSize;
    int outputBufferSize;
    int outputExtendSize;
    ProtocolInputHandler inputHandler;
    ProtocolOutputHandler outputHandler;
    DataByteArrayInputStream dis;
    InboundHandler inboundHandler;
    ExceptionHandler exceptionHandler;
    InputStream socketIn;
    OutputStream socketOut;
    String myHostname;
    boolean closed;
    int sndBufferSize;
    AtomicBoolean inputActiveIndicator;
    ReentrantReadWriteLock lock;

    public BlockingConnection(Socket socket, InboundHandler inboundHandler, ExceptionHandler exceptionHandler) throws IOException {
        this(socket, 131072, 65536, 131072, 65536);
        this.inboundHandler = inboundHandler;
        this.exceptionHandler = exceptionHandler;
    }

    public BlockingConnection(Socket socket, int i, int i2, int i3, int i4) throws IOException {
        this.socket = null;
        this.inputBufferSize = 0;
        this.inputExtendSize = 0;
        this.outputBufferSize = 0;
        this.outputExtendSize = 0;
        this.inputHandler = null;
        this.outputHandler = null;
        this.dis = null;
        this.inboundHandler = null;
        this.exceptionHandler = null;
        this.socketIn = null;
        this.socketOut = null;
        this.myHostname = null;
        this.closed = false;
        this.sndBufferSize = 8192;
        this.inputActiveIndicator = null;
        this.lock = new ReentrantReadWriteLock();
        this.socket = socket;
        setDaemon(ISDAEMON);
        if (SET_SOCKET_OPTIONS) {
            int i5 = i3;
            try {
                i5 = MAX_SNDBUFSIZE > 0 ? Math.min(i3, MAX_SNDBUFSIZE) : i5;
                socket.setSendBufferSize(i5);
                this.sndBufferSize = socket.getSendBufferSize();
            } catch (SocketException e) {
                System.err.println("Unable to perform 'socket.setSendBufferSize(" + i5 + ")', exception: " + String.valueOf(e));
            }
            try {
                i5 = MAX_RCVBUFSIZE > 0 ? Math.min(i, MAX_RCVBUFSIZE) : i;
                if (socket.getReceiveBufferSize() != i5) {
                    socket.setReceiveBufferSize(i5);
                }
            } catch (SocketException e2) {
                System.err.println("Unable to perform 'socket.setReceiveBufferSize(" + i5 + ")', exception: " + String.valueOf(e2));
            }
        }
        if (SO_TIMEOUT > 0) {
            try {
                socket.setSoTimeout(SO_TIMEOUT);
            } catch (SocketException e3) {
                System.err.println("Unable to perform 'socket.setSoTimeout(" + SO_TIMEOUT + ")', exception: " + String.valueOf(e3));
            }
        }
        this.inputBufferSize = i;
        this.inputExtendSize = i2;
        this.outputBufferSize = i3;
        this.outputExtendSize = i4;
        this.outputHandler = createOutputHandler(i3, i4);
        this.outputHandler.setOutputListener(this);
        this.inputHandler = createInputHandler();
        this.inputHandler.createInputBuffer(i, i2);
        this.inputHandler.setChunkListener(this);
        this.dis = new DataByteArrayInputStream();
        this.socketIn = socket.getInputStream();
        this.socketOut = socket.getOutputStream();
        try {
            this.myHostname = socket.getLocalAddress().toString();
        } catch (Exception e4) {
            this.myHostname = "unknown";
        }
    }

    protected ProtocolOutputHandler createOutputHandler(int i, int i2) {
        return new SMQPOutputHandler(i, i2) { // from class: com.swiftmq.net.client.BlockingConnection.1
            @Override // com.swiftmq.net.protocol.ProtocolOutputHandler, java.io.OutputStream, java.io.Flushable
            public void flush() throws IOException {
                super.flush();
                invokeOutputListener();
            }
        };
    }

    protected ProtocolInputHandler createInputHandler() {
        return new SMQPInputHandler();
    }

    @Override // com.swiftmq.net.client.Connection
    public void setInputActiveIndicator(AtomicBoolean atomicBoolean) {
        this.inputActiveIndicator = atomicBoolean;
    }

    @Override // com.swiftmq.net.protocol.ChunkListener
    public void chunkCompleted(byte[] bArr, int i, int i2) {
        this.lock.writeLock().lock();
        try {
            this.dis.setBuffer(bArr, i, i2);
            this.inboundHandler.dataAvailable(this.dis);
            this.lock.writeLock().unlock();
        } catch (Throwable th) {
            this.lock.writeLock().unlock();
            throw th;
        }
    }

    @Override // com.swiftmq.net.protocol.OutputListener
    public int performWrite(byte[] bArr, int i, int i2) throws IOException {
        this.socketOut.write(bArr, i, i2);
        this.socketOut.flush();
        return i2;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        int read;
        while (!this.closed) {
            try {
                byte[] buffer = this.inputHandler.getBuffer();
                int offset = this.inputHandler.getOffset();
                try {
                    read = this.socketIn.read(buffer, offset, buffer.length - offset);
                    if (read > 0) {
                        if (this.inputActiveIndicator != null) {
                            this.inputActiveIndicator.set(true);
                        }
                        this.inputHandler.setBytesWritten(read);
                    }
                } catch (SocketTimeoutException e) {
                }
                if (read == -1) {
                    throw new IOException("End-of-Stream reached");
                    break;
                }
            } catch (IOException e2) {
                if (this.closed || this.exceptionHandler == null) {
                    return;
                }
                this.exceptionHandler.onException(e2);
                return;
            }
        }
    }

    @Override // com.swiftmq.net.client.Connection
    public void setInboundHandler(InboundHandler inboundHandler) {
        this.lock.writeLock().lock();
        try {
            this.inboundHandler = inboundHandler;
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override // com.swiftmq.net.client.Connection
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

    @Override // com.swiftmq.net.client.Connection
    public String getLocalHostname() {
        return this.myHostname;
    }

    @Override // com.swiftmq.net.client.Connection
    public String getHostname() {
        return this.socket.getInetAddress().getHostName();
    }

    @Override // com.swiftmq.net.client.Connection
    public int getPort() {
        return this.socket.getPort();
    }

    @Override // com.swiftmq.net.client.Connection
    public OutputStream getOutputStream() {
        return this.outputHandler;
    }

    @Override // com.swiftmq.net.client.Connection
    public void close() {
        this.closed = true;
        try {
            this.socket.close();
        } catch (IOException e) {
        }
    }

    @Override // java.lang.Thread
    public String toString() {
        return "[BlockingConnection, socket=" + String.valueOf(this.socket) + "]";
    }
}
