package org.openremote.agent.protocol.io;

import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import dev.failsafe.RetryPolicyBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.EncoderException;
import jakarta.validation.constraints.NotNull;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.openremote.container.Container;
import org.openremote.model.asset.agent.ConnectionStatus;
import org.openremote.model.syslog.SyslogCategory;

/* loaded from: input_file:org/openremote/agent/protocol/io/AbstractNettyIOClient.class */
public abstract class AbstractNettyIOClient<T, U extends SocketAddress> implements NettyIOClient<T> {
    /** Lower bound, in milliseconds, applied to every delay passed to {@link #scheduleDoConnect(long)}. Not final, so it can be reassigned. */
    public static long RECONNECT_DELAY_INITIAL_MILLIS = 1000;
    /** Upper bound, in milliseconds, used for the reconnect backoff in {@link #scheduleDoConnect(long)}. Not final, so it can be reassigned. */
    public static long RECONNECT_DELAY_MAX_MILLIS = 300000;
    private static final Logger LOG = SyslogCategory.getLogger(SyslogCategory.PROTOCOL, AbstractNettyIOClient.class);
    /** Most recently initialised channel; assigned in {@link #initChannel(Channel)} and cleared in {@link #doDisconnect()}. */
    protected Channel channel;
    /** Bootstrap created by {@link #connect()} and dropped by {@link #disconnect()}. */
    protected Bootstrap bootstrap;
    /** Event loop group obtained from {@link #getWorkerGroup()} in {@link #connect()}; shut down in {@link #disconnect()}. */
    protected EventLoopGroup workerGroup;
    /** Future of the currently scheduled connect/retry execution, or {@code null} when none has been scheduled or it was cancelled. */
    protected CompletableFuture<Void> connectRetry;
    /** Optional supplier of the handlers that {@link #addEncodersDecoders(Channel)} appends to each new channel pipeline. */
    protected Supplier<ChannelHandler[]> encoderDecoderProvider;
    /** Consumers notified by {@link #onMessageReceived(Object)}. */
    protected final List<Consumer<T>> messageConsumers = new CopyOnWriteArrayList<>();
    /** Consumers notified by {@link #onConnectionStatusChanged(ConnectionStatus)}. */
    protected final List<Consumer<ConnectionStatus>> connectionStatusConsumers = new CopyOnWriteArrayList<>();
    /** Current connection status; starts out as {@link ConnectionStatus#DISCONNECTED}. */
    protected ConnectionStatus connectionStatus = ConnectionStatus.DISCONNECTED;
    /** Connect timeout in milliseconds, see {@link #getConnectTimeoutMillis()}. */
    protected int connectTimeout = 5000;
    /** Executor on which the connection attempts of {@link #scheduleDoConnect(long)} are run. */
    protected ExecutorService executorService = Container.EXECUTOR;
    protected ScheduledExecutorService scheduledExecutorService = Container.SCHEDULED_EXECUTOR;

    /* loaded from: input_file:org/openremote/agent/protocol/io/AbstractNettyIOClient$ByteToMessageDecoder.class */
    public static class ByteToMessageDecoder<T> extends io.netty.handler.codec.ByteToMessageDecoder {
        protected List<T> messages = new ArrayList(1);
        protected AbstractNettyIOClient<T, ?> client;
        protected BiConsumer<ByteBuf, List<T>> decoder;

        public ByteToMessageDecoder(AbstractNettyIOClient<T, ?> abstractNettyIOClient, @NotNull BiConsumer<ByteBuf, List<T>> biConsumer) {
            this.client = abstractNettyIOClient;
            this.decoder = biConsumer;
        }

        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
            this.decoder.accept(byteBuf, this.messages);
            if (this.messages.isEmpty()) {
                return;
            }
            this.messages.forEach(obj -> {
                this.client.onMessageReceived(obj);
            });
            this.messages.clear();
        }
    }

    /* loaded from: input_file:org/openremote/agent/protocol/io/AbstractNettyIOClient$MessageToByteEncoder.class */
    public static class MessageToByteEncoder<T> extends io.netty.handler.codec.MessageToByteEncoder<T> {
        protected AbstractNettyIOClient<T, ?> client;
        protected BiConsumer<T, ByteBuf> encoder;

        public MessageToByteEncoder(Class<? extends T> cls, AbstractNettyIOClient<T, ?> abstractNettyIOClient, BiConsumer<T, ByteBuf> biConsumer) {
            super(cls);
            this.client = abstractNettyIOClient;
            this.encoder = biConsumer;
        }

        protected void encode(ChannelHandlerContext channelHandlerContext, T t, ByteBuf byteBuf) {
            this.encoder.accept(t, byteBuf);
        }
    }

    /* loaded from: input_file:org/openremote/agent/protocol/io/AbstractNettyIOClient$MessageToMessageDecoder.class */
    public static class MessageToMessageDecoder<T> extends SimpleChannelInboundHandler<T> {
        protected AbstractNettyIOClient<T, ?> client;

        public MessageToMessageDecoder(Class<? extends T> cls, AbstractNettyIOClient<T, ?> abstractNettyIOClient) {
            super(cls);
            this.client = abstractNettyIOClient;
        }

        public void channelRead0(ChannelHandlerContext channelHandlerContext, T t) {
            this.client.onMessageReceived(t);
        }
    }

    /**
     * Stores the supplier whose handlers are appended to each new channel pipeline by
     * {@link #addEncodersDecoders(Channel)}.
     *
     * @param supplier handler supplier; may be {@code null}, in which case no handlers are added
     */
    @Override // org.openremote.agent.protocol.io.NettyIOClient
    public void setEncoderDecoderProvider(Supplier<ChannelHandler[]> supplier) throws UnsupportedOperationException {
        encoderDecoderProvider = supplier;
    }

    /**
     * @return the connect timeout in milliseconds; applied as {@code CONNECT_TIMEOUT_MILLIS} in
     *         {@link #configureChannel()} and used by {@link #waitForConnectFuture(Future)}
     */
    public int getConnectTimeoutMillis() {
        return connectTimeout;
    }

    /**
     * Sets the connect timeout. The value is read when {@link #configureChannel()} runs and when a
     * connection attempt is awaited.
     *
     * @param i connect timeout in milliseconds
     */
    public void setConnectTimeoutMillis(int i) {
        connectTimeout = i;
    }

    /**
     * @return the channel implementation class that {@link #connect()} passes to the bootstrap
     */
    protected abstract Class<? extends Channel> getChannelClass();

    /**
     * @return the event loop group that {@link #connect()} assigns to the bootstrap; it is shut down
     *         again by {@link #disconnect()}
     */
    protected abstract EventLoopGroup getWorkerGroup();

    /**
     * Starts a channel; invoked by {@link #doConnect()} for every connection attempt.
     *
     * @return a future that {@link #waitForConnectFuture(Future)} blocks on to decide whether the attempt succeeded
     */
    protected abstract Future<Void> startChannel();

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Applies channel options to the current {@link #bootstrap}; this base implementation only sets
     * the connect timeout. Called from {@link #connect()} after the bootstrap has been created.
     */
    public void configureChannel() {
        final int timeoutMillis = getConnectTimeoutMillis();
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis);
    }

    /**
     * Starts connecting if the client is currently {@link ConnectionStatus#DISCONNECTED}; otherwise
     * only logs and returns. Moves the status to {@code CONNECTING}, builds a fresh bootstrap and
     * schedules the first connection attempt. Runs entirely under this instance's monitor.
     */
    @Override // org.openremote.agent.protocol.io.IOClient
    public void connect() {
        synchronized (this) {
            if (this.connectionStatus == ConnectionStatus.DISCONNECTED) {
                LOG.fine("Connecting IO Client: " + getClientUri());
                onConnectionStatusChanged(ConnectionStatus.CONNECTING);

                // Build the bootstrap; the order of these calls is kept as-is because
                // configureChannel() is overridable and operates on this.bootstrap
                this.workerGroup = getWorkerGroup();
                this.bootstrap = new Bootstrap();
                this.bootstrap.channel(getChannelClass());
                configureChannel();
                this.bootstrap.group(this.workerGroup);
                this.bootstrap.handler(new ChannelInitializer<Channel>() {
                    @Override
                    public void initChannel(Channel channel) throws Exception {
                        AbstractNettyIOClient.this.initChannel(channel);
                    }
                });

                scheduleDoConnect(100L);
            } else {
                LOG.finest("Must be disconnected before calling connect: " + getClientUri());
            }
        }
    }

    /**
     * Schedules asynchronous connection attempts that are retried with backoff, and stores the
     * resulting future in {@link #connectRetry}.
     * <p>
     * The effective initial delay is the larger of {@code j} and {@link #RECONNECT_DELAY_INITIAL_MILLIS};
     * the backoff ceiling is the larger of that delay plus one and {@link #RECONNECT_DELAY_MAX_MILLIS}.
     * Any {@link Exception} thrown by an attempt triggers a retry, up to {@code Integer.MAX_VALUE} retries.
     *
     * @param j requested initial delay in milliseconds
     */
    protected void scheduleDoConnect(long j) {
        // Never go below the configured initial reconnect delay
        long max = Math.max(j, RECONNECT_DELAY_INITIAL_MILLIS);
        // Policy: jitter + backoff; each failed attempt is logged and followed by doDisconnect().
        // Attempts run on executorService; an attempt calls doConnect(), blocks in waitForConnectFuture()
        // and records a null result when that returns without throwing.
        this.connectRetry = Failsafe.with(((RetryPolicyBuilder) RetryPolicy.builder().withJitter(Duration.ofMillis(max)).withBackoff(Duration.ofMillis(max), Duration.ofMillis(Math.max(max + 1, RECONNECT_DELAY_MAX_MILLIS))).handle(Exception.class)).onRetryScheduled(executionScheduledEvent -> {
            LOG.info("Re-connection scheduled in '" + executionScheduledEvent.getDelay() + "' for: " + getClientUri());
        }).onFailedAttempt(executionAttemptedEvent -> {
            LOG.info("Connection attempt failed '" + executionAttemptedEvent.getAttemptCount() + "' for: " + getClientUri() + ", error=" + (executionAttemptedEvent.getLastException() != null ? executionAttemptedEvent.getLastException().getMessage() : null));
            // Tear down whatever channel the failed attempt left behind before the next retry
            doDisconnect();
        }).withMaxRetries(Integer.MAX_VALUE).build(), new RetryPolicy[0]).with(this.executorService).runAsyncExecution(asyncExecution -> {
            LOG.fine("Connection attempt '" + (asyncExecution.getAttemptCount() + 1) + "' for: " + getClientUri());
            waitForConnectFuture(doConnect());
            asyncExecution.recordResult((Object) null);
        }).whenComplete((BiConsumer) (r4, th) -> {
            // The future completed exceptionally: fall back to a full disconnect
            if (th != null) {
                disconnect();
                return;
            }
            // Only promote to CONNECTED if the status is still CONNECTING
            synchronized (this) {
                if (this.connectionStatus == ConnectionStatus.CONNECTING) {
                    LOG.fine("Connection attempt success: " + getClientUri());
                    onConnectionStatusChanged(ConnectionStatus.CONNECTED);
                }
            }
        });
    }

    /**
     * Blocks until the given connect future completes, waiting at most the configured connect timeout
     * plus one second.
     *
     * @param future future returned by {@link #doConnect()}
     * @return the future's result
     * @throws Exception whatever {@link Future#get(long, TimeUnit)} throws, including
     *                   {@link java.util.concurrent.TimeoutException} when the wait elapses
     */
    protected Void waitForConnectFuture(Future<Void> future) throws Exception {
        // Add the grace period in long arithmetic: with an int sum, a timeout close to
        // Integer.MAX_VALUE would wrap to a negative value and time out immediately
        final long waitMillis = getConnectTimeoutMillis() + 1000L;
        return future.get(waitMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Performs a single connection attempt by logging it and delegating to {@link #startChannel()}.
     *
     * @return the future produced by {@link #startChannel()}
     */
    protected Future<Void> doConnect() {
        final String target = getClientUri();
        LOG.info("Establishing connection: " + target);
        return startChannel();
    }

    /**
     * Fully disconnects the client unless it is already disconnected or disconnecting. Under this
     * instance's monitor it moves to {@code DISCONNECTING}, cancels any pending connect/retry future,
     * closes the channel, shuts down the worker group, drops the bootstrap and finally moves to
     * {@code DISCONNECTED}.
     */
    @Override // org.openremote.agent.protocol.io.IOClient
    public void disconnect() {
        synchronized (this) {
            final ConnectionStatus current = this.connectionStatus;
            if (current == ConnectionStatus.DISCONNECTING || current == ConnectionStatus.DISCONNECTED) {
                LOG.finest("Already disconnected or disconnecting: " + getClientUri());
                return;
            }

            LOG.fine("Disconnecting IO client: " + getClientUri());
            onConnectionStatusChanged(ConnectionStatus.DISCONNECTING);

            // Stop any scheduled or running connection attempt first
            try {
                final CompletableFuture<Void> pendingRetry = this.connectRetry;
                if (pendingRetry != null) {
                    pendingRetry.cancel(true);
                    this.connectRetry = null;
                }
            } catch (Exception ignored) {
            }

            doDisconnect();

            // Release the event loop group obtained in connect()
            try {
                final EventLoopGroup group = this.workerGroup;
                if (group != null) {
                    group.shutdownGracefully();
                    this.workerGroup = null;
                }
            } catch (Exception ignored) {
            }

            this.bootstrap = null;
            onConnectionStatusChanged(ConnectionStatus.DISCONNECTED);
        }
    }

    /**
     * Closes the current channel and schedules a new round of connection attempts with a requested
     * initial delay of 5000 ms. Invoked from the channel close listener set up in
     * {@link #initChannel(Channel)}.
     */
    protected void doReconnect() {
        final long reconnectDelayMillis = 5000L;
        doDisconnect();
        scheduleDoConnect(reconnectDelayMillis);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Disconnects and closes the current {@link #channel}, if there is one, waiting for each operation
     * to finish, and then clears the field. The teardown is best effort: failures are logged at
     * {@code FINEST} and never propagated.
     * <p>
     * If the calling thread is interrupted while waiting, the interrupt status is restored so that the
     * caller can still observe it; the close is still initiated in that case.
     */
    public void doDisconnect() {
        LOG.finest("Performing disconnect: " + getClientUri());
        if (this.channel != null) {
            try {
                this.channel.disconnect().await();
            } catch (InterruptedException e) {
                // Do not consume the interrupt: re-assert it for the caller
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // Best effort only, e.g. await() refuses to block when called on the event loop thread
                LOG.log(Level.FINEST, "Failed to disconnect gracefully: " + getClientUri(), e);
            }
            try {
                this.channel.close().await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                LOG.log(Level.FINEST, "Failed to disconnect gracefully: " + getClientUri(), e);
            }
            this.channel = null;
        }
        LOG.finest("Disconnect done: " + getClientUri());
    }

    /**
     * Writes and flushes a message on the current channel. When there is no channel the message is
     * dropped. The write is asynchronous, so its outcome is logged from a listener on the write future
     * rather than immediately after the call.
     *
     * @param t message to send
     */
    @Override // org.openremote.agent.protocol.io.IOClient
    public void sendMessage(T t) {
        // Snapshot the field: doDisconnect() can null it between the check and the write
        final Channel target = this.channel;
        if (target == null) {
            LOG.finest("Message not sent as there is no channel: " + getClientUri());
            return;
        }
        try {
            target.writeAndFlush(t).addListener((ChannelFutureListener) writeFuture -> {
                if (writeFuture.isSuccess()) {
                    LOG.finest("Message sent to server: " + getClientUri());
                } else {
                    LOG.log(Level.INFO, "Message send failed: " + getClientUri(), writeFuture.cause());
                }
            });
        } catch (Exception e) {
            // Synchronous failures raised while submitting the write
            LOG.log(Level.INFO, "Message send failed: " + getClientUri(), e);
        }
    }

    /**
     * @return the status last set through {@link #onConnectionStatusChanged(ConnectionStatus)}
     */
    @Override // org.openremote.agent.protocol.io.IOClient
    public ConnectionStatus getConnectionStatus() {
        return connectionStatus;
    }

    /**
     * Registers a connection status consumer unless it is already registered.
     *
     * @param consumer consumer to notify on status changes
     */
    @Override // org.openremote.agent.protocol.io.IOClient
    public void addConnectionStatusConsumer(Consumer<ConnectionStatus> consumer) {
        final boolean alreadyRegistered = connectionStatusConsumers.contains(consumer);
        if (!alreadyRegistered) {
            connectionStatusConsumers.add(consumer);
        }
    }

    /**
     * Unregisters a connection status consumer; does nothing if it was not registered.
     *
     * @param consumer consumer to remove
     */
    @Override // org.openremote.agent.protocol.io.IOClient
    public void removeConnectionStatusConsumer(Consumer<ConnectionStatus> consumer) {
        connectionStatusConsumers.remove(consumer);
    }

    /** Unregisters every connection status consumer. */
    @Override // org.openremote.agent.protocol.io.IOClient
    public void removeAllConnectionStatusConsumers() {
        connectionStatusConsumers.clear();
    }

    /**
     * Registers a message consumer unless it is already registered.
     *
     * @param consumer consumer to notify about received messages
     */
    @Override // org.openremote.agent.protocol.io.IOClient
    public void addMessageConsumer(Consumer<T> consumer) {
        final boolean alreadyRegistered = messageConsumers.contains(consumer);
        if (!alreadyRegistered) {
            messageConsumers.add(consumer);
        }
    }

    /**
     * Unregisters a message consumer; does nothing if it was not registered.
     *
     * @param consumer consumer to remove
     */
    @Override // org.openremote.agent.protocol.io.IOClient
    public void removeMessageConsumer(Consumer<T> consumer) {
        messageConsumers.remove(consumer);
    }

    /** Unregisters every message consumer. */
    @Override // org.openremote.agent.protocol.io.IOClient
    public void removeAllMessageConsumers() {
        messageConsumers.clear();
    }

    /**
     * Prepares a newly created channel: remembers it in {@link #channel}, installs the configured
     * encoders/decoders, registers a close listener that triggers a reconnect when the client was
     * {@code CONNECTED}, and appends the in-bound exception handler and the out-bound write handler.
     *
     * @param channel the channel being initialised
     * @throws Exception if installing the encoders/decoders fails
     */
    protected void initChannel(Channel channel) throws Exception {
        this.channel = channel;
        addEncodersDecoders(channel);

        channel.closeFuture().addListener(closed -> {
            if (!closed.isSuccess() && closed.cause() != null) {
                LOG.info("Connection closed with exception on '" + getClientUri() + "': " + closed.cause().getMessage());
            }

            // Decide under the lock, but run the reconnect outside of it
            boolean reconnect = false;
            synchronized (this) {
                if (this.connectionStatus == ConnectionStatus.CONNECTED) {
                    onConnectionStatusChanged(ConnectionStatus.CONNECTING);
                    reconnect = true;
                }
            }
            if (reconnect) {
                doReconnect();
            }
        });

        // Routes codec exceptions to the client callbacks; anything else closes the context
        ChannelHandler exceptionRouter = new ChannelInboundHandlerAdapter() {
            @Override
            public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
                if (th instanceof DecoderException) {
                    AbstractNettyIOClient.this.onDecodeException(channelHandlerContext, (DecoderException) th);
                } else if (th instanceof EncoderException) {
                    AbstractNettyIOClient.this.onEncodeException(channelHandlerContext, (EncoderException) th);
                } else {
                    channelHandlerContext.close();
                }
            }
        };
        channel.pipeline().addLast(exceptionRouter);

        // Makes failed writes surface as pipeline exceptions
        ChannelHandler writeFailureNotifier = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
                channelPromise.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
                super.write(channelHandlerContext, obj, channelPromise);
            }
        };
        channel.pipeline().addLast(writeFailureNotifier);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Appends the handlers supplied by {@link #encoderDecoderProvider} to the end of the channel's
     * pipeline, one at a time and in array order. Does nothing when no provider is set or the provider
     * returns {@code null}.
     *
     * @param channel channel whose pipeline receives the handlers
     */
    public void addEncodersDecoders(Channel channel) throws Exception {
        if (this.encoderDecoderProvider == null) {
            return;
        }
        final ChannelHandler[] handlers = this.encoderDecoderProvider.get();
        if (handlers == null) {
            return;
        }
        for (ChannelHandler handler : handlers) {
            // Added individually (single-element array), exactly one addLast call per handler
            channel.pipeline().addLast(new ChannelHandler[]{handler});
        }
    }

    /**
     * Delivers a received message to every registered message consumer. An exception thrown by one
     * consumer is logged, including its stack trace, and does not prevent delivery to the others.
     *
     * @param t the received message
     */
    protected void onMessageReceived(T t) {
        LOG.finest("Message received notifying consumers: " + getClientUri());
        for (Consumer<T> consumer : this.messageConsumers) {
            try {
                consumer.accept(t);
            } catch (Exception e) {
                // Pass the exception itself so the stack trace is not lost
                LOG.log(Level.WARNING, "Exception occurred in message handler '" + e.getMessage() + "':" + getClientUri(), e);
            }
        }
    }

    /**
     * Called by the pipeline's exception handler when an in-bound message could not be decoded; this
     * base implementation only logs the exception at {@code FINE}.
     *
     * @param channelHandlerContext context in which the exception was caught
     * @param decoderException      the decoding failure
     */
    protected void onDecodeException(ChannelHandlerContext channelHandlerContext, DecoderException decoderException) {
        final String logMessage = "Decoder exception occurred on in-bound message '" + decoderException.getMessage() + "': " + getClientUri();
        LOG.log(Level.FINE, logMessage, decoderException);
    }

    /**
     * Called by the pipeline's exception handler when an out-bound message could not be encoded; this
     * base implementation only logs the exception at {@code FINE}.
     *
     * @param channelHandlerContext context in which the exception was caught
     * @param encoderException      the encoding failure
     */
    protected void onEncodeException(ChannelHandlerContext channelHandlerContext, EncoderException encoderException) {
        final String logMessage = "Encoder exception occurred on out-bound message '" + encoderException.getMessage() + "': " + getClientUri();
        LOG.log(Level.FINE, logMessage, encoderException);
    }

    /**
     * Records a new connection status and notifies all connection status consumers. Nothing happens
     * when the status is unchanged. An exception thrown by one consumer is logged and does not stop
     * the remaining consumers from being notified.
     *
     * @param connectionStatus the new status
     */
    protected void onConnectionStatusChanged(ConnectionStatus connectionStatus) {
        if (this.connectionStatus != connectionStatus) {
            this.connectionStatus = connectionStatus;

            if (!this.connectionStatusConsumers.isEmpty()) {
                LOG.finest("Notifying connection status consumers: count=" + this.connectionStatusConsumers.size());
            }

            for (Consumer<ConnectionStatus> listener : this.connectionStatusConsumers) {
                try {
                    listener.accept(connectionStatus);
                } catch (Exception e) {
                    LOG.log(Level.INFO, "Connection status change handler threw an exception: " + getClientUri(), e);
                }
            }
        }
    }

    /**
     * @return the client URI, as returned by {@code getClientUri()}
     */
    @Override
    public String toString() {
        final String uri = getClientUri();
        return uri;
    }

    /**
     * Adapts a future to a {@link CompletableFuture}. A {@code CompletableFuture} is returned as-is; a
     * {@link ChannelFuture} is bridged through a listener that mirrors its cancellation, failure or
     * success onto a new {@code CompletableFuture}.
     *
     * @param future the future to adapt
     * @return a completable view of {@code future}
     * @throws UnsupportedOperationException if {@code future} is neither a {@code CompletableFuture}
     *                                       nor a {@code ChannelFuture}
     */
    public static CompletableFuture<Void> toCompletableFuture(Future<Void> future) throws UnsupportedOperationException {
        if (future instanceof CompletableFuture) {
            return (CompletableFuture<Void>) future;
        }

        if (future instanceof ChannelFuture) {
            final ChannelFuture source = (ChannelFuture) future;
            final CompletableFuture<Void> bridged = new CompletableFuture<>();
            try {
                source.addListener(done -> {
                    if (done.isCancelled()) {
                        bridged.cancel(true);
                    } else if (done.cause() != null) {
                        bridged.completeExceptionally(done.cause());
                    } else if (done.isSuccess()) {
                        bridged.complete(null);
                    } else {
                        bridged.completeExceptionally(new RuntimeException("Unknown connection failure occurred"));
                    }
                });
            } catch (Exception e) {
                // Registering the listener itself failed
                bridged.completeExceptionally(e);
            }
            return bridged;
        }

        throw new UnsupportedOperationException("Future must be a ChannelFuture or already a CompletableFuture");
    }
}
