package io.activej.net.socket.tcp;

import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufPool;
import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.MemSize;
import io.activej.common.Utils;
import io.activej.common.exception.AsyncTimeoutException;
import io.activej.common.inspector.AbstractInspector;
import io.activej.common.inspector.BaseInspector;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.NioChannelEventHandler;
import io.activej.eventloop.net.SocketSettings;
import io.activej.eventloop.schedule.ScheduledRunnable;
import io.activej.eventloop.util.RunnableWithContext;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.stats.EventStats;
import io.activej.jmx.stats.ExceptionStats;
import io.activej.jmx.stats.ValueStats;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/net/socket/tcp/AsyncTcpSocketNio.class */
public final class AsyncTcpSocketNio implements AsyncTcpSocket, NioChannelEventHandler {
    private static final boolean CHECK;
    public static final int DEFAULT_READ_BUFFER_SIZE;
    public static final AsyncTimeoutException TIMEOUT_EXCEPTION;
    public static final int NO_TIMEOUT = 0;
    private static final AtomicInteger CONNECTION_COUNT;
    private final Eventloop eventloop;
    private final InetSocketAddress remoteAddress;

    @Nullable
    private SocketChannel channel;

    @Nullable
    private ByteBuf readBuf;
    private boolean readEndOfStream;

    @Nullable
    private ByteBuf writeBuf;
    private boolean writeEndOfStream;

    @Nullable
    private SettablePromise<ByteBuf> read;

    @Nullable
    private SettablePromise<Void> write;
    private SelectionKey key;
    private byte ops;
    private int readTimeout = 0;
    private int writeTimeout = 0;
    private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;

    @Nullable
    private ScheduledRunnable scheduledReadTimeout;

    @Nullable
    private ScheduledRunnable scheduledWriteTimeout;

    @Nullable
    private Inspector inspector;

    @Nullable
    private Object userData;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:io/activej/net/socket/tcp/AsyncTcpSocketNio$Inspector.class */
    public interface Inspector extends BaseInspector<Inspector> {
        void onConnect(AsyncTcpSocketNio asyncTcpSocketNio);

        void onReadTimeout(AsyncTcpSocketNio asyncTcpSocketNio);

        void onRead(AsyncTcpSocketNio asyncTcpSocketNio, ByteBuf byteBuf);

        void onReadEndOfStream(AsyncTcpSocketNio asyncTcpSocketNio);

        void onReadError(AsyncTcpSocketNio asyncTcpSocketNio, IOException iOException);

        void onWriteTimeout(AsyncTcpSocketNio asyncTcpSocketNio);

        void onWrite(AsyncTcpSocketNio asyncTcpSocketNio, ByteBuf byteBuf, int i);

        void onWriteError(AsyncTcpSocketNio asyncTcpSocketNio, IOException iOException);

        void onDisconnect(AsyncTcpSocketNio asyncTcpSocketNio);
    }

    /* loaded from: input_file:io/activej/net/socket/tcp/AsyncTcpSocketNio$JmxInspector.class */
    public static class JmxInspector extends AbstractInspector<Inspector> implements Inspector {
        public static final Duration SMOOTHING_WINDOW = Duration.ofMinutes(1);
        private final EventStats connects = EventStats.create(SMOOTHING_WINDOW);
        private final ValueStats reads = ValueStats.create(SMOOTHING_WINDOW).withUnit("bytes").withRate();
        private final EventStats readEndOfStreams = EventStats.create(SMOOTHING_WINDOW);
        private final ExceptionStats readErrors = ExceptionStats.create();
        private final EventStats readTimeouts = EventStats.create(SMOOTHING_WINDOW);
        private final ValueStats writes = ValueStats.create(SMOOTHING_WINDOW).withUnit("bytes").withRate();
        private final ExceptionStats writeErrors = ExceptionStats.create();
        private final EventStats writeTimeouts = EventStats.create(SMOOTHING_WINDOW);
        private final EventStats writeOverloaded = EventStats.create(SMOOTHING_WINDOW);
        private final EventStats disconnects = EventStats.create(SMOOTHING_WINDOW);

        @Override // io.activej.net.socket.tcp.AsyncTcpSocketNio.Inspector
        public void onConnect(AsyncTcpSocketNio asyncTcpSocketNio) {
            this.connects.recordEvent();
        }

        @Override // io.activej.net.socket.tcp.AsyncTcpSocketNio.Inspector
        public void onReadTimeout(AsyncTcpSocketNio asyncTcpSocketNio) {
            this.readTimeouts.recordEvent();
        }

        @Override // io.activej.net.socket.tcp.AsyncTcpSocketNio.Inspector
        public void onRead(AsyncTcpSocketNio asyncTcpSocketNio, ByteBuf byteBuf) {
            this.reads.recordValue(byteBuf.readRemaining());
        }

        @Override // io.activej.net.socket.tcp.AsyncTcpSocketNio.Inspector
        public void onReadEndOfStream(AsyncTcpSocketNio asyncTcpSocketNio) {
            this.readEndOfStreams.recordEvent();
        }

        @Override // io.activej.net.socket.tcp.AsyncTcpSocketNio.Inspector
        public void onReadError(AsyncTcpSocketNio asyncTcpSocketNio, IOException iOException) {
            this.readErrors.recordException(iOException, asyncTcpSocketNio.getRemoteAddress());
        }

        @Override // io.activej.net.socket.tcp.AsyncTcpSocketNio.Inspector
        public void onWriteTimeout(AsyncTcpSocketNio asyncTcpSocketNio) {
            this.writeTimeouts.recordEvent();
        }

        @Override // io.activej.net.socket.tcp.AsyncTcpSocketNio.Inspector
        public void onWrite(AsyncTcpSocketNio asyncTcpSocketNio, ByteBuf byteBuf, int i) {
            this.writes.recordValue(i);
            if (byteBuf.readRemaining() != i) {
                this.writeOverloaded.recordEvent();
            }
        }

        @Override // io.activej.net.socket.tcp.AsyncTcpSocketNio.Inspector
        public void onWriteError(AsyncTcpSocketNio asyncTcpSocketNio, IOException iOException) {
            this.writeErrors.recordException(iOException, asyncTcpSocketNio.getRemoteAddress());
        }

        @Override // io.activej.net.socket.tcp.AsyncTcpSocketNio.Inspector
        public void onDisconnect(AsyncTcpSocketNio asyncTcpSocketNio) {
            this.disconnects.recordEvent();
        }

        @JmxAttribute
        public EventStats getReadTimeouts() {
            return this.readTimeouts;
        }

        @JmxAttribute
        public ValueStats getReads() {
            return this.reads;
        }

        @JmxAttribute
        public EventStats getReadEndOfStreams() {
            return this.readEndOfStreams;
        }

        @JmxAttribute
        public ExceptionStats getReadErrors() {
            return this.readErrors;
        }

        @JmxAttribute
        public EventStats getWriteTimeouts() {
            return this.writeTimeouts;
        }

        @JmxAttribute
        public ValueStats getWrites() {
            return this.writes;
        }

        @JmxAttribute
        public ExceptionStats getWriteErrors() {
            return this.writeErrors;
        }

        @JmxAttribute
        public EventStats getWriteOverloaded() {
            return this.writeOverloaded;
        }

        @JmxAttribute
        public EventStats getConnects() {
            return this.connects;
        }

        @JmxAttribute
        public EventStats getDisconnects() {
            return this.disconnects;
        }

        @JmxAttribute
        public long getActiveSockets() {
            return this.connects.getTotalCount() - this.disconnects.getTotalCount();
        }
    }

    public static AsyncTcpSocketNio wrapChannel(Eventloop eventloop, SocketChannel socketChannel, @NotNull InetSocketAddress inetSocketAddress, @Nullable SocketSettings socketSettings) throws IOException {
        AsyncTcpSocketNio asyncTcpSocketNio = new AsyncTcpSocketNio(eventloop, socketChannel, inetSocketAddress);
        if (socketSettings == null) {
            return asyncTcpSocketNio;
        }
        socketSettings.applySettings(socketChannel);
        if (socketSettings.hasImplReadTimeout()) {
            asyncTcpSocketNio.readTimeout = (int) socketSettings.getImplReadTimeoutMillis();
        }
        if (socketSettings.hasImplWriteTimeout()) {
            asyncTcpSocketNio.writeTimeout = (int) socketSettings.getImplWriteTimeoutMillis();
        }
        if (socketSettings.hasReadBufferSize()) {
            asyncTcpSocketNio.readBufferSize = socketSettings.getImplReadBufferSizeBytes();
        }
        return asyncTcpSocketNio;
    }

    public static AsyncTcpSocketNio wrapChannel(Eventloop eventloop, SocketChannel socketChannel, @Nullable SocketSettings socketSettings) throws IOException {
        return wrapChannel(eventloop, socketChannel, (InetSocketAddress) socketChannel.getRemoteAddress(), socketSettings);
    }

    public static Promise<AsyncTcpSocketNio> connect(InetSocketAddress inetSocketAddress) {
        return connect(inetSocketAddress, (Duration) null, (SocketSettings) null);
    }

    public static Promise<AsyncTcpSocketNio> connect(InetSocketAddress inetSocketAddress, @Nullable Duration duration, @Nullable SocketSettings socketSettings) {
        return connect(inetSocketAddress, duration == null ? 0L : duration.toMillis(), socketSettings);
    }

    public static Promise<AsyncTcpSocketNio> connect(InetSocketAddress inetSocketAddress, long j, @Nullable SocketSettings socketSettings) {
        Eventloop currentEventloop = Eventloop.getCurrentEventloop();
        return Promise.ofCallback(settablePromise -> {
            currentEventloop.connect(inetSocketAddress, j, settablePromise);
        }).then(socketChannel -> {
            try {
                return Promise.of(wrapChannel(currentEventloop, socketChannel, inetSocketAddress, socketSettings));
            } catch (IOException e) {
                currentEventloop.closeChannel(socketChannel, (SelectionKey) null);
                return Promise.ofException(e);
            }
        });
    }

    public void setInspector(@Nullable Inspector inspector) {
        this.inspector = inspector;
    }

    private AsyncTcpSocketNio(Eventloop eventloop, @NotNull SocketChannel socketChannel, InetSocketAddress inetSocketAddress) {
        this.eventloop = eventloop;
        this.channel = socketChannel;
        this.remoteAddress = inetSocketAddress;
    }

    public static int getConnectionCount() {
        return CONNECTION_COUNT.get();
    }

    public InetSocketAddress getRemoteAddress() {
        return this.remoteAddress;
    }

    @Nullable
    public Object getUserData() {
        return this.userData;
    }

    public void setUserData(@Nullable Object obj) {
        this.userData = obj;
    }

    private void scheduleReadTimeout() {
        if (!$assertionsDisabled && (this.scheduledReadTimeout != null || this.readTimeout == 0)) {
            throw new AssertionError();
        }
        this.scheduledReadTimeout = this.eventloop.delayBackground(this.readTimeout, RunnableWithContext.wrapContext(this, () -> {
            if (this.inspector != null) {
                this.inspector.onReadTimeout(this);
            }
            this.scheduledReadTimeout = null;
            closeEx(TIMEOUT_EXCEPTION);
        }));
    }

    private void scheduleWriteTimeout() {
        if (!$assertionsDisabled && (this.scheduledWriteTimeout != null || this.writeTimeout == 0)) {
            throw new AssertionError();
        }
        this.scheduledWriteTimeout = this.eventloop.delayBackground(this.writeTimeout, RunnableWithContext.wrapContext(this, () -> {
            if (this.inspector != null) {
                this.inspector.onWriteTimeout(this);
            }
            this.scheduledWriteTimeout = null;
            closeEx(TIMEOUT_EXCEPTION);
        }));
    }

    private void updateInterests() {
        if (!$assertionsDisabled && (isClosed() || this.ops < 0)) {
            throw new AssertionError();
        }
        byte b = (byte) (((this.readBuf != null || this.readEndOfStream) ? 0 : 1) | ((this.writeBuf == null || this.writeEndOfStream) ? 0 : 4));
        if (this.key != null) {
            if (this.ops != b) {
                this.ops = b;
                this.key.interestOps(this.ops);
                return;
            }
            return;
        }
        this.ops = b;
        try {
            this.key = this.channel.register(this.eventloop.ensureSelector(), this.ops, this);
            CONNECTION_COUNT.incrementAndGet();
        } catch (ClosedChannelException e) {
            closeEx(e);
        }
    }

    @Override // io.activej.net.socket.tcp.AsyncTcpSocket
    @NotNull
    public Promise<ByteBuf> read() {
        if (CHECK) {
            Checks.checkState(this.eventloop.inEventloopThread());
        }
        if (isClosed()) {
            return Promise.ofException(CLOSE_EXCEPTION);
        }
        this.read = null;
        if (this.readBuf != null || this.readEndOfStream) {
            ByteBuf byteBuf = this.readBuf;
            this.readBuf = null;
            return Promise.of(byteBuf);
        }
        SettablePromise<ByteBuf> settablePromise = new SettablePromise<>();
        this.read = settablePromise;
        if (this.scheduledReadTimeout == null && this.readTimeout != 0) {
            scheduleReadTimeout();
        }
        if (this.ops >= 0) {
            updateInterests();
        }
        return settablePromise;
    }

    public void onReadReady() {
        this.ops = (byte) (this.ops | 128);
        try {
            doRead();
            if (this.read != null && (this.readBuf != null || this.readEndOfStream)) {
                SettablePromise<ByteBuf> settablePromise = this.read;
                ByteBuf byteBuf = this.readBuf;
                this.read = null;
                this.readBuf = null;
                settablePromise.set(byteBuf);
            }
            if (isClosed()) {
                return;
            }
            this.ops = (byte) (this.ops & Byte.MAX_VALUE);
            updateInterests();
        } catch (IOException e) {
            closeEx(e);
        }
    }

    private void doRead() throws IOException {
        if (!$assertionsDisabled && this.channel == null) {
            throw new AssertionError();
        }
        ByteBuf allocate = ByteBufPool.allocate(this.readBufferSize);
        ByteBuffer writeByteBuffer = allocate.toWriteByteBuffer();
        try {
            int read = this.channel.read(writeByteBuffer);
            allocate.ofWriteByteBuffer(writeByteBuffer);
            if (read == 0) {
                if (this.inspector != null) {
                    this.inspector.onRead(this, allocate);
                }
                allocate.recycle();
                return;
            }
            this.scheduledReadTimeout = (ScheduledRunnable) Utils.nullify(this.scheduledReadTimeout, (v0) -> {
                v0.cancel();
            });
            if (read == -1) {
                allocate.recycle();
                if (this.inspector != null) {
                    this.inspector.onReadEndOfStream(this);
                }
                this.readEndOfStream = true;
                if (this.writeEndOfStream && this.writeBuf == null) {
                    doClose();
                    return;
                }
                return;
            }
            if (this.inspector != null) {
                this.inspector.onRead(this, allocate);
            }
            if (this.readBuf == null) {
                this.readBuf = allocate;
                return;
            }
            this.readBuf = ByteBufPool.ensureWriteRemaining(this.readBuf, allocate.readRemaining());
            this.readBuf.put(allocate.array(), allocate.head(), allocate.readRemaining());
            allocate.recycle();
        } catch (IOException e) {
            allocate.recycle();
            if (this.inspector != null) {
                this.inspector.onReadError(this, e);
            }
            throw e;
        }
    }

    @Override // io.activej.net.socket.tcp.AsyncTcpSocket
    @NotNull
    public Promise<Void> write(@Nullable ByteBuf byteBuf) {
        if (CHECK) {
            Checks.checkState(this.eventloop.inEventloopThread());
            Checks.checkState(!this.writeEndOfStream, "End of stream has already been sent");
        }
        if (isClosed()) {
            if (byteBuf != null) {
                byteBuf.recycle();
            }
            return Promise.ofException(CLOSE_EXCEPTION);
        }
        this.writeEndOfStream |= byteBuf == null;
        if (this.writeBuf == null) {
            if (byteBuf != null && !byteBuf.canRead()) {
                byteBuf.recycle();
                return Promise.complete();
            }
            this.writeBuf = byteBuf;
        } else if (byteBuf != null) {
            this.writeBuf = ByteBufPool.ensureWriteRemaining(this.writeBuf, byteBuf.readRemaining());
            this.writeBuf.put(byteBuf.array(), byteBuf.head(), byteBuf.readRemaining());
            byteBuf.recycle();
        }
        if (this.write != null) {
            return this.write;
        }
        try {
            doWrite();
            if (this.writeBuf == null) {
                return Promise.complete();
            }
            SettablePromise<Void> settablePromise = new SettablePromise<>();
            this.write = settablePromise;
            if (this.scheduledWriteTimeout == null && this.writeTimeout != 0) {
                scheduleWriteTimeout();
            }
            if (this.ops >= 0) {
                updateInterests();
            }
            return settablePromise;
        } catch (IOException e) {
            closeEx(e);
            return Promise.ofException(e);
        }
    }

    public void onWriteReady() {
        if (!$assertionsDisabled && this.write == null) {
            throw new AssertionError();
        }
        this.ops = (byte) (this.ops | 128);
        try {
            doWrite();
            if (this.writeBuf == null) {
                SettablePromise<Void> settablePromise = this.write;
                this.write = null;
                settablePromise.set((Object) null);
            }
            if (isClosed()) {
                return;
            }
            this.ops = (byte) (this.ops & Byte.MAX_VALUE);
            updateInterests();
        } catch (IOException e) {
            closeEx(e);
        }
    }

    private void doWrite() throws IOException {
        if (!$assertionsDisabled && this.channel == null) {
            throw new AssertionError();
        }
        if (this.writeBuf != null) {
            ByteBuf byteBuf = this.writeBuf;
            ByteBuffer readByteBuffer = byteBuf.toReadByteBuffer();
            try {
                this.channel.write(readByteBuffer);
                if (this.inspector != null) {
                    this.inspector.onWrite(this, byteBuf, readByteBuffer.position() - byteBuf.head());
                }
                byteBuf.ofReadByteBuffer(readByteBuffer);
                if (byteBuf.canRead()) {
                    return;
                }
                byteBuf.recycle();
                this.writeBuf = null;
            } catch (IOException e) {
                if (this.inspector != null) {
                    this.inspector.onWriteError(this, e);
                }
                throw e;
            }
        }
        this.scheduledWriteTimeout = (ScheduledRunnable) Utils.nullify(this.scheduledWriteTimeout, (v0) -> {
            v0.cancel();
        });
        if (this.writeEndOfStream) {
            if (this.readEndOfStream) {
                doClose();
            } else {
                this.channel.shutdownOutput();
            }
        }
    }

    public void closeEx(@NotNull Throwable th) {
        if (CHECK) {
            Checks.checkState(this.eventloop.inEventloopThread());
        }
        if (isClosed()) {
            return;
        }
        doClose();
        this.readBuf = (ByteBuf) Utils.nullify(this.readBuf, (v0) -> {
            v0.recycle();
        });
        this.writeBuf = (ByteBuf) Utils.nullify(this.writeBuf, (v0) -> {
            v0.recycle();
        });
        this.scheduledReadTimeout = (ScheduledRunnable) Utils.nullify(this.scheduledReadTimeout, (v0) -> {
            v0.cancel();
        });
        this.scheduledWriteTimeout = (ScheduledRunnable) Utils.nullify(this.scheduledWriteTimeout, (v0) -> {
            v0.cancel();
        });
        this.read = (SettablePromise) Utils.nullify(this.read, (v0, v1) -> {
            v0.setException(v1);
        }, th);
        this.write = (SettablePromise) Utils.nullify(this.write, (v0, v1) -> {
            v0.setException(v1);
        }, th);
    }

    private void doClose() {
        this.eventloop.closeChannel(this.channel, this.key);
        this.channel = null;
        CONNECTION_COUNT.decrementAndGet();
        if (this.inspector != null) {
            this.inspector.onDisconnect(this);
        }
    }

    @Override // io.activej.net.socket.tcp.AsyncTcpSocket
    public boolean isClosed() {
        return this.channel == null;
    }

    @Nullable
    public SocketChannel getSocketChannel() {
        return this.channel;
    }

    public String toString() {
        return "AsyncTcpSocketImpl{channel=" + (this.channel != null ? this.channel : "") + ", readBuf=" + this.readBuf + ", writeBuf=" + this.writeBuf + ", readEndOfStream=" + this.readEndOfStream + ", writeEndOfStream=" + this.writeEndOfStream + ", read=" + this.read + ", write=" + this.write + ", ops=" + ((int) this.ops) + "}";
    }

    static {
        $assertionsDisabled = !AsyncTcpSocketNio.class.desiredAssertionStatus();
        CHECK = Checks.isEnabled(AsyncTcpSocketNio.class);
        DEFAULT_READ_BUFFER_SIZE = ApplicationSettings.getMemSize(AsyncTcpSocketNio.class, "readBufferSize", MemSize.kilobytes(16L)).toInt();
        TIMEOUT_EXCEPTION = new AsyncTimeoutException(AsyncTcpSocketNio.class, "timed out");
        CONNECTION_COUNT = new AtomicInteger(0);
    }
}
