package io.datakernel.rpc.protocol.stream;

import com.google.common.base.Preconditions;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.eventloop.NioEventloop;
import io.datakernel.eventloop.SocketConnection;
import io.datakernel.jmx.CompositeDataBuilder;
import io.datakernel.jmx.MBeanFormat;
import io.datakernel.rpc.protocol.RpcMessage;
import io.datakernel.rpc.protocol.RpcMessageSerializer;
import io.datakernel.rpc.protocol.RpcProtocol;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.net.TcpStreamSocketConnection;
import io.datakernel.stream.processor.StreamBinaryDeserializer;
import io.datakernel.stream.processor.StreamBinarySerializer;
import io.datakernel.stream.processor.StreamDeserializer;
import io.datakernel.stream.processor.StreamLZ4Compressor;
import io.datakernel.stream.processor.StreamLZ4Decompressor;
import io.datakernel.stream.processor.StreamSerializer;
import java.nio.channels.SocketChannel;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.SimpleType;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/datakernel/rpc/protocol/stream/RpcStreamProtocol.class */
/**
 * {@link RpcProtocol} implementation that exchanges {@link RpcMessage}s over a
 * {@link TcpStreamSocketConnection}.
 *
 * <p>When the connection is wired, the following chains are assembled:
 * <ul>
 *   <li>inbound: socket producer -> [LZ4 decompressor] -> deserializer -> {@link Receiver}</li>
 *   <li>outbound: {@link Sender} -> serializer -> [LZ4 compressor] -> socket consumer</li>
 * </ul>
 * The LZ4 stages are present only when compression is enabled in the
 * {@link RpcStreamProtocolSettings} passed to the constructor.
 *
 * <p>Subclasses receive lifecycle notifications through {@link #onWired()},
 * {@link #onReceiveMessage(RpcMessage)} and {@link #onClosed()}.
 */
public abstract class RpcStreamProtocol implements RpcProtocol {
    private final Sender sender;
    private final Receiver receiver;
    /** LZ4 compressor of the outbound chain; {@code null} when compression is disabled. */
    private final StreamLZ4Compressor compressor;
    /** LZ4 decompressor of the inbound chain; {@code null} when compression is disabled. */
    private final StreamLZ4Decompressor decompressor;
    private final StreamSerializer<RpcMessage> serializer;
    private final StreamDeserializer<RpcMessage> deserializer;
    private final boolean compression;
    private final TcpStreamSocketConnection connection;
    private boolean monitoring;
    /** Wall-clock millis at which monitoring was (re)started; {@code 0} means "not monitoring". */
    private long timeMonitoring;

    /**
     * Terminal consumer of the inbound chain: hands every decoded message to
     * {@link RpcStreamProtocol#onReceiveMessage(RpcMessage)} and mirrors end-of-stream
     * and error events onto the {@link Sender}.
     */
    /* loaded from: input_file:io/datakernel/rpc/protocol/stream/RpcStreamProtocol$Receiver.class */
    private class Receiver extends AbstractStreamConsumer<RpcMessage> implements StreamDataReceiver<RpcMessage> {
        public Receiver(Eventloop eventloop) {
            super(eventloop);
        }

        /** This consumer acts as its own data receiver. */
        public StreamDataReceiver<RpcMessage> getDataReceiver() {
            return this;
        }

        /** Forwards a received message to the enclosing protocol. */
        public void onData(RpcMessage rpcMessage) {
            RpcStreamProtocol.this.onReceiveMessage(rpcMessage);
        }

        /** Inbound stream ended: end the outbound stream as well. */
        public void onEndOfStream() {
            RpcStreamProtocol.this.sender.sendEndOfStream();
        }

        /** Inbound stream failed: close the outbound stream with the same error. */
        public void onError(Exception exc) {
            RpcStreamProtocol.this.sender.closeWithError(exc);
        }
    }

    /**
     * Head producer of the outbound chain: pushes messages downstream and mirrors
     * its own close events onto the {@link Receiver}.
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/datakernel/rpc/protocol/stream/RpcStreamProtocol$Sender.class */
    public class Sender extends AbstractStreamProducer<RpcMessage> {
        public Sender(Eventloop eventloop) {
            super(eventloop);
        }

        /** Intentionally a no-op. */
        public void onSuspended() {
        }

        /** Intentionally a no-op. */
        public void onResumed() {
        }

        /** Outbound side closed: close the inbound side's upstream too. */
        public void onClosed() {
            RpcStreamProtocol.this.receiver.closeUpstream();
        }

        /** Outbound side failed: propagate the error to the inbound side's upstream. */
        public void onClosedWithError(Exception exc) {
            RpcStreamProtocol.this.receiver.closeUpstreamWithError(exc);
        }

        /**
         * Reports whether this producer is in any state other than status {@code 0}.
         *
         * <p>NOTE(review): {@code status} is inherited (it is not declared in this file);
         * {@code 0} is presumably the superclass's "ready" state - confirm and replace the
         * literal with the named constant if one is accessible.
         */
        public boolean isOverloaded() {
            return this.status != 0;
        }

        /** Hands the message directly to the downstream data receiver. */
        public void sendMessage(RpcMessage rpcMessage) throws Exception {
            this.downstreamDataReceiver.onData(rpcMessage);
        }
    }

    /**
     * Builds the sender/receiver endpoints, the (de)serializer stages, the optional LZ4
     * stages and the socket connection that wires them together.
     *
     * @param nioEventloop              event loop passed to every created stream stage and to the connection
     * @param socketChannel             channel backing the connection
     * @param rpcMessageSerializer      supplies the serializer used by both codec stages; must not be null
     * @param rpcStreamProtocolSettings packet sizes and compression flag; must not be null
     * @throws NullPointerException if {@code rpcMessageSerializer} or {@code rpcStreamProtocolSettings} is null
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public RpcStreamProtocol(NioEventloop nioEventloop, SocketChannel socketChannel, RpcMessageSerializer rpcMessageSerializer, RpcStreamProtocolSettings rpcStreamProtocolSettings) {
        // Validate once, before any collaborator is constructed, so a bad argument fails fast
        // with a descriptive message instead of surfacing half-way through initialisation.
        Preconditions.checkNotNull(rpcMessageSerializer, "rpcMessageSerializer must not be null");
        Preconditions.checkNotNull(rpcStreamProtocolSettings, "rpcStreamProtocolSettings must not be null");

        this.sender = new Sender(nioEventloop);
        this.receiver = new Receiver(nioEventloop);
        // Explicit type arguments instead of raw types: avoids unchecked conversions on assignment.
        this.serializer = new StreamBinarySerializer<RpcMessage>(
                nioEventloop,
                rpcMessageSerializer.getSerializer(),
                rpcStreamProtocolSettings.getDefaultPacketSize(),
                rpcStreamProtocolSettings.getMaxPacketSize(),
                0,
                true);
        this.deserializer = new StreamBinaryDeserializer<RpcMessage>(
                nioEventloop,
                rpcMessageSerializer.getSerializer(),
                rpcStreamProtocolSettings.getMaxPacketSize());
        this.compression = rpcStreamProtocolSettings.isCompression();
        if (this.compression) {
            this.compressor = StreamLZ4Compressor.fastCompressor(nioEventloop);
            this.decompressor = new StreamLZ4Decompressor(nioEventloop);
        } else {
            this.compressor = null;
            this.decompressor = null;
        }
        this.connection = new TcpStreamSocketConnection(nioEventloop, socketChannel) { // from class: io.datakernel.rpc.protocol.stream.RpcStreamProtocol.1
            /** Connects the socket's byte streams to the codec chains, then notifies the protocol. */
            protected void wire(StreamProducer<ByteBuf> streamProducer, StreamConsumer<ByteBuf> streamConsumer) {
                if (RpcStreamProtocol.this.compression) {
                    // Compressed: the LZ4 stages sit between the socket and the codecs.
                    streamProducer.streamTo(RpcStreamProtocol.this.decompressor);
                    RpcStreamProtocol.this.decompressor.streamTo(RpcStreamProtocol.this.deserializer);
                    RpcStreamProtocol.this.serializer.streamTo(RpcStreamProtocol.this.compressor);
                    RpcStreamProtocol.this.compressor.streamTo(streamConsumer);
                } else {
                    // Uncompressed: the codecs talk to the socket directly.
                    streamProducer.streamTo(RpcStreamProtocol.this.deserializer);
                    RpcStreamProtocol.this.serializer.streamTo(streamConsumer);
                }
                RpcStreamProtocol.this.deserializer.streamTo(RpcStreamProtocol.this.receiver);
                RpcStreamProtocol.this.sender.streamTo(RpcStreamProtocol.this.serializer);
                RpcStreamProtocol.this.onWired();
            }

            /** Relays the connection's close notification to the enclosing protocol. */
            public void onClosed() {
                RpcStreamProtocol.this.onClosed();
            }
        };
    }

    /**
     * Sends a message through the outbound chain.
     *
     * @param rpcMessage message to send
     * @throws Exception whatever the downstream data receiver throws
     */
    @Override // io.datakernel.rpc.protocol.RpcProtocol
    public void sendMessage(RpcMessage rpcMessage) throws Exception {
        this.sender.sendMessage(rpcMessage);
    }

    /** @return the overload state reported by the {@link Sender} */
    @Override // io.datakernel.rpc.protocol.RpcProtocol
    public boolean isOverloaded() {
        return this.sender.isOverloaded();
    }

    /** @return the socket connection created by the constructor */
    @Override // io.datakernel.rpc.protocol.RpcProtocol
    public SocketConnection getSocketConnection() {
        return this.connection;
    }

    /** Invoked at the end of wiring, once all stream stages are connected. */
    protected abstract void onWired();

    /** Invoked for every message delivered to the {@link Receiver}. */
    protected abstract void onReceiveMessage(RpcMessage rpcMessage);

    /** Invoked when the underlying connection reports that it has been closed. */
    protected abstract void onClosed();

    /** Ends the outbound stream and closes the inbound stream's upstream. */
    @Override // io.datakernel.rpc.protocol.RpcProtocol
    public void close() {
        this.sender.sendEndOfStream();
        this.receiver.closeUpstream();
    }

    /** Turns monitoring on and records the current time as its starting point. */
    @Override // io.datakernel.rpc.protocol.RpcProtocol
    public void startMonitoring() {
        this.monitoring = true;
        this.timeMonitoring = System.currentTimeMillis();
    }

    /** Turns monitoring off and clears the recorded starting point. */
    @Override // io.datakernel.rpc.protocol.RpcProtocol
    public void stopMonitoring() {
        this.monitoring = false;
        this.timeMonitoring = 0L;
    }

    /** @return {@code true} while monitoring is switched on */
    public boolean isMonitoring() {
        return this.monitoring;
    }

    /** Restarts the monitoring clock; does nothing when monitoring is off. */
    @Override // io.datakernel.rpc.protocol.RpcProtocol
    public void reset() {
        if (isMonitoring()) {
            this.timeMonitoring = System.currentTimeMillis();
        }
    }

    /**
     * Formats the time elapsed since monitoring was (re)started.
     *
     * @return the formatted duration, or {@code null} when no start time is recorded
     */
    private String getMonitoringTime() {
        // Read the field once so the sentinel check and the subtraction use the same value,
        // even if stopMonitoring()/reset() changes it in between.
        long startedAt = this.timeMonitoring;
        if (startedAt == 0) {
            return null;
        }
        return MBeanFormat.formatDuration(System.currentTimeMillis() - startedAt);
    }

    /**
     * Builds a JMX snapshot of this connection: overload flag, compression flag,
     * monitoring flag, formatted monitoring time (may be {@code null}) and channel info.
     *
     * @throws OpenDataException if the composite data cannot be built
     */
    @Override // io.datakernel.rpc.protocol.RpcProtocol
    public CompositeData getConnectionDetails() throws OpenDataException {
        return CompositeDataBuilder.builder(RpcStreamProtocol.class.getSimpleName(), "Rpc stream connection details")
                .add("Overloaded", SimpleType.BOOLEAN, Boolean.valueOf(isOverloaded()))
                .add("Compression", SimpleType.BOOLEAN, Boolean.valueOf(this.compression))
                .add("Monitoring", SimpleType.BOOLEAN, Boolean.valueOf(isMonitoring()))
                .add("MonitoringTime", SimpleType.STRING, getMonitoringTime())
                .add("ChannelInfo", SimpleType.STRING, this.connection.getChannelInfo())
                .build();
    }
}
