package edu.iu.dsc.tws.common.net.tcp;

import edu.iu.dsc.tws.api.config.Config;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/common/net/tcp/BaseNetworkChannel.class */
/**
 * Base class for a framed message channel on top of a {@link SocketChannel}.
 *
 * <p>Every outgoing message is preceded by an {@value #HEADER_SIZE}-byte header made of two
 * ints: the message length followed by the edge. Outgoing messages are queued in
 * {@link #pendingSends} and written by {@link #write()}; the write side keeps its progress in
 * {@link #writeStatus} so that a message that could not be written in one call is resumed on
 * the next call. How incoming messages are decoded is left to subclasses through
 * {@link #readRequest(SocketChannel)}.
 */
public abstract class BaseNetworkChannel {
    private static final Logger LOG = Logger.getLogger(BaseNetworkChannel.class.getName());

    /** Size in bytes of the message header: an int length followed by an int edge. */
    private static final int HEADER_SIZE = 8;

    /** Capacity of the outgoing message queue. */
    private static final int MAX_PENDING_SENDS = 1024;

    protected final SocketChannel socketChannel;
    protected Progress looper;
    protected SelectHandler selectHandler;
    protected TCPMessage readingRequest;
    protected int readEdge;
    protected int readMessageSize;
    protected ChannelHandler channelHandler;

    /** Messages waiting to be written, in submission order. */
    protected BlockingQueue<TCPMessage> pendingSends = new ArrayBlockingQueue<>(MAX_PENDING_SENDS);

    /** Receive requests, keyed by an integer (presumably the edge; confirm against subclasses). */
    protected Map<Integer, BlockingQueue<TCPMessage>> pendingReceives = new HashMap<>();

    protected ByteBuffer readHeader = ByteBuffer.allocate(HEADER_SIZE);
    protected ByteBuffer writeHeader = ByteBuffer.allocate(HEADER_SIZE);
    protected DataStatus readStatus = DataStatus.INIT;
    protected DataStatus writeStatus = DataStatus.INIT;

    /**
     * Creates a channel bound to the given socket.
     *
     * @param config configuration; not used by this base class
     * @param progress the loop used to register and unregister read/write interest
     * @param selectHandler handler registered with the loop and notified of channel errors
     * @param socketChannel the underlying socket channel
     * @param channelHandler callback notified when a send or receive completes
     */
    public BaseNetworkChannel(Config config, Progress progress, SelectHandler selectHandler,
                              SocketChannel socketChannel, ChannelHandler channelHandler) {
        this.socketChannel = socketChannel;
        this.selectHandler = selectHandler;
        this.looper = progress;
        this.channelHandler = channelHandler;
    }

    /**
     * Reads complete messages for as long as receive requests are registered and
     * {@link #readRequest(SocketChannel)} keeps returning a message. Each message is marked
     * complete and handed to the channel handler.
     */
    public void read() {
        while (this.pendingReceives.size() > 0) {
            TCPMessage received = readRequest(this.socketChannel);
            if (received == null) {
                // nothing complete is available right now
                break;
            }
            received.setComplete();
            this.channelHandler.onReceiveComplete(this.socketChannel, received);
        }
    }

    /**
     * Reads the next message from the channel.
     *
     * @param socketChannel the channel to read from
     * @return a message, or {@code null} to make {@link #read()} stop
     */
    public abstract TCPMessage readRequest(SocketChannel socketChannel);

    /** Drops every pending receive request and every queued outgoing message. */
    public void clear() {
        this.pendingReceives.clear();
        this.pendingSends.clear();
    }

    /**
     * Registers a receive request. This base implementation accepts nothing.
     *
     * @param tCPMessage the receive request
     * @return always {@code false} in this class
     */
    public boolean addReadRequest(TCPMessage tCPMessage) {
        return false;
    }

    /**
     * Queues a message for sending.
     *
     * @param tCPMessage the message to send
     * @return {@code true} if the message was queued, {@code false} if the queue is full
     * @throws RuntimeException if the message length is 0 or its buffer has nothing remaining
     */
    public boolean addWriteRequest(TCPMessage tCPMessage) {
        ByteBuffer payload = tCPMessage.getByteBuffer();
        if (tCPMessage.getLength() == 0) {
            throw new RuntimeException("Cannot send a message with 0 length");
        }
        if (payload.remaining() == 0) {
            throw new RuntimeException("Cannot send a message with 0 length");
        }
        return this.pendingSends.offer(tCPMessage);
    }

    /**
     * Writes queued messages until the queue is empty or a message cannot be written completely.
     *
     * <p>A fully written message is removed from the queue, marked complete and reported to the
     * channel handler; write interest is dropped once the queue becomes empty. On a write error
     * the message is marked as failed, the select handler is notified and the method returns.
     */
    public void write() {
        while (this.pendingSends.size() > 0) {
            TCPMessage head = this.pendingSends.peek();
            if (head == null) {
                break;
            }

            int outstanding = writeRequest(this.socketChannel, head);
            if (outstanding > 0) {
                // partial write: keep the message at the head and resume on the next call
                break;
            }
            if (outstanding < 0) {
                LOG.severe("Something bad happened while writing to channel");
                head.setError();
                this.selectHandler.handleError(this.socketChannel);
                return;
            }

            this.pendingSends.poll();
            head.setComplete();
            this.channelHandler.onSendComplete(this.socketChannel, head);
            if (this.pendingSends.size() == 0) {
                disableWriting();
            }
        }
    }

    /**
     * Advances the write state machine (INIT -> HEADER -> BODY -> INIT) for one message.
     *
     * @param socketChannel the channel to write to
     * @param tCPMessage the message being written
     * @return 0 when the whole message has been written, a positive number of bytes still
     *     outstanding in the part being written, or a negative value on error
     */
    private int writeRequest(SocketChannel socketChannel, TCPMessage tCPMessage) {
        ByteBuffer body = tCPMessage.getByteBuffer();
        int outstanding = 0;

        if (this.writeStatus == DataStatus.INIT) {
            // start of a new message: build its header
            this.writeHeader.clear();
            this.writeStatus = DataStatus.HEADER;
            this.writeHeader.putInt(tCPMessage.getLength());
            this.writeHeader.putInt(tCPMessage.getEdge());
            this.writeHeader.flip();
        }

        if (this.writeStatus == DataStatus.HEADER) {
            outstanding = writeToChannel(socketChannel, this.writeHeader);
            if (outstanding < 0) {
                return outstanding;
            }
            if (outstanding == 0) {
                this.writeStatus = DataStatus.BODY;
            }
        }

        if (this.writeStatus == DataStatus.BODY) {
            outstanding = writeToChannel(socketChannel, body);
            if (outstanding < 0) {
                return outstanding;
            }
            if (outstanding == 0) {
                LOG.finest(String.format("WRITE BODY %d", body.limit()));
                this.writeStatus = DataStatus.INIT;
                return outstanding;
            }
        }
        return outstanding;
    }

    /**
     * Writes as much of the buffer as the channel accepts.
     *
     * @param socketChannel the channel to write to
     * @param byteBuffer the buffer to write; must have bytes remaining
     * @return the number of bytes still remaining in the buffer, or -1 on an I/O error
     */
    private int writeToChannel(SocketChannel socketChannel, ByteBuffer byteBuffer) {
        int remaining = byteBuffer.remaining();
        assert remaining > 0;
        try {
            int written = socketChannel.write(byteBuffer);
            LOG.finest("Wrote " + written);
            return remaining - written;
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Error writing to channel ", e);
            return -1;
        }
    }

    /**
     * Reads from the channel into the buffer.
     *
     * @param socketChannel the channel to read from
     * @param byteBuffer the buffer to fill
     * @return the number of bytes still free in the buffer after the read, the negative value
     *     returned by the channel when it reports end of stream, or -1 on an I/O error
     */
    public int readFromChannel(SocketChannel socketChannel, ByteBuffer byteBuffer) {
        int remaining = byteBuffer.remaining();
        try {
            int read = socketChannel.read(byteBuffer);
            if (read < 0) {
                return read;
            }
            return remaining - read;
        } catch (ClosedByInterruptException e) {
            LOG.warning("ClosedByInterruptException thrown. Probably the Channel is closed by the user program intentionally.");
            return -1;
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Error in channel.read ", e);
            return -1;
        }
    }

    /**
     * Tries to write every queued message immediately, stopping at the first message that
     * cannot be written completely. No completion callbacks are invoked.
     *
     * <p>A message is removed from the queue only after it has been written in full. Removing
     * it earlier would lose the message on a partial write while {@link #writeStatus} still
     * pointed into it, so the next message would be sent with a stale header.
     */
    public void forceFlush() {
        while (!this.pendingSends.isEmpty()) {
            TCPMessage head = this.pendingSends.peek();
            if (head == null || writeRequest(this.socketChannel, head) != 0) {
                return;
            }
            this.pendingSends.poll();
        }
    }

    /**
     * Tells whether any work is outstanding on this channel.
     *
     * @return {@code true} if a message is queued for sending or any receive queue is non-empty
     */
    public boolean isPending() {
        if (this.pendingSends.size() > 0) {
            return true;
        }
        // every receive queue counts, not just the last one iterated
        for (BlockingQueue<TCPMessage> receives : this.pendingReceives.values()) {
            if (receives.size() > 0) {
                return true;
            }
        }
        return false;
    }

    /**
     * Registers read interest for the socket unless it is already registered. A closed channel
     * is reported to the select handler.
     */
    public void enableReading() {
        if (this.looper.isReadRegistered(this.socketChannel)) {
            return;
        }
        try {
            this.looper.registerRead(this.socketChannel, this.selectHandler);
        } catch (ClosedChannelException e) {
            this.selectHandler.handleError(this.socketChannel);
        }
    }

    /** Removes read interest for the socket if it is registered. */
    public void disableReading() {
        if (this.looper.isReadRegistered(this.socketChannel)) {
            this.looper.unregisterRead(this.socketChannel);
        }
    }

    /**
     * Registers write interest for the socket unless it is already registered. A closed channel
     * is reported to the select handler.
     */
    public void enableWriting() {
        if (this.looper.isWriteRegistered(this.socketChannel)) {
            return;
        }
        try {
            this.looper.registerWrite(this.socketChannel, this.selectHandler);
        } catch (ClosedChannelException e) {
            this.selectHandler.handleError(this.socketChannel);
        }
    }

    /** Removes write interest for the socket if it is registered. */
    public void disableWriting() {
        if (this.looper.isWriteRegistered(this.socketChannel)) {
            this.looper.unregisterWrite(this.socketChannel);
        }
    }
}
