package org.apache.pulsar.common.protocol;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.api.proto.CommandPing;
import org.apache.pulsar.common.api.proto.CommandPong;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-common-2.9.0-rc-202108262205.jar:org/apache/pulsar/common/protocol/PulsarHandler.class */
public abstract class PulsarHandler extends PulsarDecoder {
    protected ChannelHandlerContext ctx;
    protected SocketAddress remoteAddress;
    private final long keepAliveIntervalSeconds;
    private ScheduledFuture<?> keepAliveTask;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PulsarHandler.class);
    private int remoteEndpointProtocolVersion = ProtocolVersion.v0.getValue();
    private boolean waitingForPingResponse = false;

    public int getRemoteEndpointProtocolVersion() {
        return this.remoteEndpointProtocolVersion;
    }

    protected void setRemoteEndpointProtocolVersion(int i) {
        this.remoteEndpointProtocolVersion = i;
    }

    public PulsarHandler(int i, TimeUnit timeUnit) {
        this.keepAliveIntervalSeconds = timeUnit.toSeconds(i);
    }

    @Override // org.apache.pulsar.common.protocol.PulsarDecoder
    protected final void messageReceived() {
        this.waitingForPingResponse = false;
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.remoteAddress = channelHandlerContext.channel().remoteAddress();
        this.ctx = channelHandlerContext;
        if (log.isDebugEnabled()) {
            log.debug("[{}] Scheduling keep-alive task every {} s", channelHandlerContext.channel(), Long.valueOf(this.keepAliveIntervalSeconds));
        }
        if (this.keepAliveIntervalSeconds > 0) {
            this.keepAliveTask = channelHandlerContext.executor().scheduleAtFixedRate(this::handleKeepAliveTimeout, this.keepAliveIntervalSeconds, this.keepAliveIntervalSeconds, TimeUnit.SECONDS);
        }
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        cancelKeepAliveTask();
    }

    @Override // org.apache.pulsar.common.protocol.PulsarDecoder
    protected final void handlePing(CommandPing commandPing) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Replying back to ping message", this.ctx.channel());
        }
        this.ctx.writeAndFlush(Commands.newPong());
    }

    @Override // org.apache.pulsar.common.protocol.PulsarDecoder
    protected final void handlePong(CommandPong commandPong) {
    }

    private void handleKeepAliveTimeout() {
        if (this.ctx.channel().isOpen()) {
            if (!isHandshakeCompleted()) {
                log.warn("[{}] Pulsar Handshake was not completed within timeout, closing connection", this.ctx.channel());
                this.ctx.close();
                return;
            }
            if (this.waitingForPingResponse && this.ctx.channel().config().isAutoRead()) {
                log.warn("[{}] Forcing connection to close after keep-alive timeout", this.ctx.channel());
                this.ctx.close();
            } else if (getRemoteEndpointProtocolVersion() < ProtocolVersion.v1.getValue()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Peer doesn't support keep-alive", this.ctx.channel());
                }
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Sending ping message", this.ctx.channel());
                }
                this.waitingForPingResponse = true;
                this.ctx.writeAndFlush(Commands.newPing());
            }
        }
    }

    protected void cancelKeepAliveTask() {
        if (this.keepAliveTask != null) {
            this.keepAliveTask.cancel(false);
            this.keepAliveTask = null;
        }
    }

    protected abstract boolean isHandshakeCompleted();
}
