package io.activej.net.socket.udp;

import io.activej.async.process.AsyncCloseable;
import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufPool;
import io.activej.common.Check;
import io.activej.common.MemSize;
import io.activej.common.Preconditions;
import io.activej.common.api.Recyclable;
import io.activej.common.inspector.AbstractInspector;
import io.activej.common.inspector.BaseInspector;
import io.activej.common.tuple.Tuple2;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.NioChannelEventHandler;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.stats.EventStats;
import io.activej.jmx.stats.ValueStats;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.time.Duration;
import java.util.ArrayDeque;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/net/socket/udp/AsyncUdpSocketNio.class */
public final class AsyncUdpSocketNio implements AsyncUdpSocket, NioChannelEventHandler {
    private static final boolean CHECK = Check.isEnabled(AsyncUdpSocketNio.class);
    private static final MemSize DEFAULT_UDP_BUFFER_SIZE = MemSize.kilobytes(16);
    public static final int OP_POSTPONED = 128;
    private final Eventloop eventloop;

    @Nullable
    private SelectionKey key;
    private final DatagramChannel channel;

    @Nullable
    private Inspector inspector;
    private int receiveBufferSize = DEFAULT_UDP_BUFFER_SIZE.toInt();
    private final ArrayDeque<SettablePromise<UdpPacket>> readQueue = new ArrayDeque<>();
    private final ArrayDeque<UdpPacket> readBuffer = new ArrayDeque<>();
    private final ArrayDeque<Tuple2<UdpPacket, SettablePromise<Void>>> writeQueue = new ArrayDeque<>();
    private int ops = 0;

    /* loaded from: input_file:io/activej/net/socket/udp/AsyncUdpSocketNio$ForwardingInspector.class */
    public static abstract class ForwardingInspector implements Inspector {

        @Nullable
        protected final Inspector next;

        public ForwardingInspector(@Nullable Inspector inspector) {
            this.next = inspector;
        }

        @Override // io.activej.net.socket.udp.AsyncUdpSocketNio.Inspector
        public void onReceive(UdpPacket udpPacket) {
            if (this.next != null) {
                this.next.onReceive(udpPacket);
            }
        }

        @Override // io.activej.net.socket.udp.AsyncUdpSocketNio.Inspector
        public void onReceiveError(IOException iOException) {
            if (this.next != null) {
                this.next.onReceiveError(iOException);
            }
        }

        @Override // io.activej.net.socket.udp.AsyncUdpSocketNio.Inspector
        public void onSend(UdpPacket udpPacket) {
            if (this.next != null) {
                this.next.onSend(udpPacket);
            }
        }

        @Override // io.activej.net.socket.udp.AsyncUdpSocketNio.Inspector
        public void onSendError(IOException iOException) {
            if (this.next != null) {
                this.next.onSendError(iOException);
            }
        }

        @Nullable
        /* renamed from: lookup, reason: merged with bridge method [inline-methods] */
        public <T extends Inspector> T m6lookup(Class<T> cls) {
            if (cls.isAssignableFrom(getClass())) {
                return this;
            }
            if (this.next != null) {
                return (T) this.next.lookup(cls);
            }
            return null;
        }
    }

    /* loaded from: input_file:io/activej/net/socket/udp/AsyncUdpSocketNio$Inspector.class */
    public interface Inspector extends BaseInspector<Inspector> {
        void onReceive(UdpPacket udpPacket);

        void onReceiveError(IOException iOException);

        void onSend(UdpPacket udpPacket);

        void onSendError(IOException iOException);
    }

    /* loaded from: input_file:io/activej/net/socket/udp/AsyncUdpSocketNio$JmxInspector.class */
    public static class JmxInspector extends AbstractInspector<Inspector> implements Inspector {
        private final ValueStats receives;
        private final EventStats receiveErrors;
        private final ValueStats sends;
        private final EventStats sendErrors;

        public JmxInspector(Duration duration) {
            this.receives = ValueStats.create(duration).withUnit("bytes").withRate();
            this.receiveErrors = EventStats.create(duration);
            this.sends = ValueStats.create(duration).withUnit("bytes").withRate();
            this.sendErrors = EventStats.create(duration);
        }

        @Override // io.activej.net.socket.udp.AsyncUdpSocketNio.Inspector
        public void onReceive(UdpPacket udpPacket) {
            this.receives.recordValue(udpPacket.getBuf().readRemaining());
        }

        @Override // io.activej.net.socket.udp.AsyncUdpSocketNio.Inspector
        public void onReceiveError(IOException iOException) {
            this.receiveErrors.recordEvent();
        }

        @Override // io.activej.net.socket.udp.AsyncUdpSocketNio.Inspector
        public void onSend(UdpPacket udpPacket) {
            this.sends.recordValue(udpPacket.getBuf().readRemaining());
        }

        @Override // io.activej.net.socket.udp.AsyncUdpSocketNio.Inspector
        public void onSendError(IOException iOException) {
            this.sendErrors.recordEvent();
        }

        @JmxAttribute(description = "Received packet size")
        public ValueStats getReceives() {
            return this.receives;
        }

        @JmxAttribute
        public EventStats getReceiveErrors() {
            return this.receiveErrors;
        }

        @JmxAttribute(description = "Sent packet size")
        public ValueStats getSends() {
            return this.sends;
        }

        @JmxAttribute
        public EventStats getSendErrors() {
            return this.sendErrors;
        }
    }

    private AsyncUdpSocketNio(@NotNull Eventloop eventloop, @NotNull DatagramChannel datagramChannel) throws IOException {
        this.eventloop = eventloop;
        this.channel = datagramChannel;
        this.key = datagramChannel.register(eventloop.ensureSelector(), 0, this);
    }

    public static Promise<AsyncUdpSocketNio> connect(Eventloop eventloop, DatagramChannel datagramChannel) {
        try {
            return Promise.of(new AsyncUdpSocketNio(eventloop, datagramChannel));
        } catch (IOException e) {
            return Promise.ofException(e);
        }
    }

    public AsyncUdpSocketNio withInspector(Inspector inspector) {
        this.inspector = inspector;
        return this;
    }

    public void setReceiveBufferSize(int i) {
        this.receiveBufferSize = i;
    }

    public boolean isOpen() {
        return this.key != null;
    }

    @Override // io.activej.net.socket.udp.AsyncUdpSocket
    public Promise<UdpPacket> receive() {
        if (CHECK) {
            Preconditions.checkState(this.eventloop.inEventloopThread());
        }
        if (!isOpen()) {
            return Promise.ofException(AsyncCloseable.CLOSE_EXCEPTION);
        }
        UdpPacket poll = this.readBuffer.poll();
        return poll != null ? Promise.of(poll) : Promise.ofCallback(settablePromise -> {
            this.readQueue.add(settablePromise);
            readInterest(true);
        });
    }

    public void onReadReady() {
        while (isOpen()) {
            ByteBuf allocate = ByteBufPool.allocate(this.receiveBufferSize);
            ByteBuffer writeByteBuffer = allocate.toWriteByteBuffer();
            InetSocketAddress inetSocketAddress = null;
            try {
                inetSocketAddress = (InetSocketAddress) this.channel.receive(writeByteBuffer);
            } catch (IOException e) {
                if (this.inspector != null) {
                    this.inspector.onReceiveError(e);
                }
            }
            if (inetSocketAddress == null) {
                allocate.recycle();
                return;
            }
            allocate.ofWriteByteBuffer(writeByteBuffer);
            UdpPacket of = UdpPacket.of(allocate, inetSocketAddress);
            if (this.inspector != null) {
                this.inspector.onReceive(of);
            }
            SettablePromise<UdpPacket> poll = this.readQueue.poll();
            if (poll != null) {
                poll.set(of);
                return;
            }
            this.readBuffer.add(of);
        }
    }

    @Override // io.activej.net.socket.udp.AsyncUdpSocket
    public Promise<Void> send(UdpPacket udpPacket) {
        if (CHECK) {
            Preconditions.checkState(this.eventloop.inEventloopThread());
        }
        return !isOpen() ? Promise.ofException(AsyncCloseable.CLOSE_EXCEPTION) : Promise.ofCallback(settablePromise -> {
            this.writeQueue.add(new Tuple2<>(udpPacket, settablePromise));
            onWriteReady();
        });
    }

    public void onWriteReady() {
        while (true) {
            Tuple2<UdpPacket, SettablePromise<Void>> peek = this.writeQueue.peek();
            if (peek == null) {
                break;
            }
            UdpPacket udpPacket = (UdpPacket) peek.getValue1();
            try {
                if (this.channel.send(udpPacket.getBuf().toReadByteBuffer(), udpPacket.getSocketAddress()) == 0) {
                    break;
                }
                ((SettablePromise) peek.getValue2()).set((Object) null);
                if (this.inspector != null) {
                    this.inspector.onSend(udpPacket);
                }
                this.writeQueue.poll();
                udpPacket.recycle();
            } catch (IOException e) {
                if (this.inspector != null) {
                    this.inspector.onSendError(e);
                }
            }
        }
        writeInterest(!this.writeQueue.isEmpty());
    }

    private void interests(int i) {
        if (this.ops != i) {
            this.ops = i;
            if ((this.ops & OP_POSTPONED) != 0 || this.key == null) {
                return;
            }
            this.key.interestOps(this.ops);
        }
    }

    private void readInterest(boolean z) {
        interests(z ? this.ops | 1 : this.ops & (-2));
    }

    private void writeInterest(boolean z) {
        interests(z ? this.ops | 4 : this.ops & (-5));
    }

    @Override // io.activej.net.socket.udp.AsyncUdpSocket
    public void close() {
        if (CHECK) {
            Preconditions.checkState(this.eventloop.inEventloopThread());
        }
        SelectionKey selectionKey = this.key;
        if (selectionKey == null) {
            return;
        }
        this.key = null;
        this.eventloop.closeChannel(this.channel, selectionKey);
        Recyclable.deepRecycle(this.writeQueue);
    }

    public String toString() {
        return isOpen() ? "UDP socket: " + getRemoteSocketAddress() : "closed UDP socket";
    }

    private InetSocketAddress getRemoteSocketAddress() {
        try {
            return (InetSocketAddress) this.channel.getRemoteAddress();
        } catch (ClosedChannelException e) {
            throw new AssertionError("Channel is closed");
        } catch (IOException e2) {
            throw new AssertionError(e2);
        }
    }
}
