package org.joyqueue.broker.kafka.session;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.kafka.command.ApiVersionsRequest;
import org.joyqueue.broker.kafka.command.FetchRequest;
import org.joyqueue.broker.kafka.command.FindCoordinatorRequest;
import org.joyqueue.broker.kafka.command.ProduceRequest;
import org.joyqueue.network.session.Language;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.TransportHelper;
import org.joyqueue.network.transport.command.Command;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
/* loaded from: input_file:org/joyqueue/broker/kafka/session/KafkaConnectionHandler.class */
public class KafkaConnectionHandler extends ChannelDuplexHandler {
    protected static final Logger logger = LoggerFactory.getLogger(KafkaConnectionHandler.class);
    private KafkaConnectionManager kafkaConnectionManager;

    public KafkaConnectionHandler(KafkaConnectionManager kafkaConnectionManager) {
        this.kafkaConnectionManager = kafkaConnectionManager;
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!(obj instanceof Command) || handleConnection(channelHandlerContext, (Command) obj)) {
            super.channelRead(channelHandlerContext, obj);
        } else {
            channelHandlerContext.channel().close();
        }
    }

    protected boolean handleConnection(ChannelHandlerContext channelHandlerContext, Command command) {
        Channel channel = channelHandlerContext.channel();
        Object payload = command.getPayload();
        Transport transport = TransportHelper.getTransport(channel);
        if (payload instanceof FetchRequest) {
            FetchRequest fetchRequest = (FetchRequest) payload;
            if (!this.kafkaConnectionManager.addConnection(transport, fetchRequest.getClientId(), String.valueOf((int) fetchRequest.getVersion()))) {
                return false;
            }
            Iterator<Map.Entry<String, List<FetchRequest.PartitionRequest>>> it = fetchRequest.getPartitionRequests().entrySet().iterator();
            while (it.hasNext()) {
                this.kafkaConnectionManager.addConsumer(transport, it.next().getKey());
            }
            return true;
        }
        if (payload instanceof ProduceRequest) {
            ProduceRequest produceRequest = (ProduceRequest) payload;
            if (!this.kafkaConnectionManager.addConnection(transport, produceRequest.getClientId(), String.valueOf((int) produceRequest.getVersion()))) {
                return false;
            }
            Iterator<Map.Entry<String, List<ProduceRequest.PartitionRequest>>> it2 = produceRequest.getPartitionRequests().entrySet().iterator();
            while (it2.hasNext()) {
                this.kafkaConnectionManager.addProducer(transport, it2.next().getKey());
            }
            return true;
        }
        if ((payload instanceof FindCoordinatorRequest) || !(payload instanceof ApiVersionsRequest)) {
            return true;
        }
        ApiVersionsRequest apiVersionsRequest = (ApiVersionsRequest) payload;
        if (StringUtils.isBlank(apiVersionsRequest.getClientSoftwareVersion())) {
            return true;
        }
        String replace = StringUtils.replace(apiVersionsRequest.getClientSoftwareName(), "apache-kafka-", "");
        String clientSoftwareVersion = apiVersionsRequest.getClientSoftwareVersion();
        if (Language.parse(replace).equals(Language.OTHER)) {
            clientSoftwareVersion = apiVersionsRequest.getClientSoftwareName() + "-" + apiVersionsRequest.getClientSoftwareVersion();
        }
        return this.kafkaConnectionManager.addConnection(transport, apiVersionsRequest.getClientId(), clientSoftwareVersion, Language.parse(replace));
    }
}
