package com.geektcp.common.mq.service.metric;

import cn.hutool.core.collection.CollectionUtil;
import com.geektcp.common.mq.model.CreateTopicUo;
import com.geektcp.common.mq.model.TopicVo;
import com.geektcp.common.mq.service.TopicService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

/**
 * {@link TopicService} implementation that lists, creates and deletes Kafka topics
 * through the injected {@link KafkaAdminClient}.
 *
 * <p>The admin client is injected by the container and reused by every method of this
 * service, so it is deliberately never closed here; closing it after one call would
 * leave every later call working against a closed client.
 */
@Service
public class TopicServiceImpl implements TopicService {
    private static final Logger log = LoggerFactory.getLogger(TopicServiceImpl.class);

    @Autowired
    private KafkaAdminClient kafkaAdminClient;

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * Lists all non-internal topics together with their partition counts.
     *
     * @return one {@link TopicVo} per topic name; the partition count is only set when
     *         the broker returned a description for that topic
     * @throws ExecutionException   if listing or describing the topics fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // com.geektcp.common.mq.service.TopicService
    public List<TopicVo> list() throws ExecutionException, InterruptedException {
        Set<String> topicNames = listTopicToSet();
        Map<String, TopicDescription> descriptions = this.kafkaAdminClient.describeTopics(topicNames).all().get();
        List<TopicVo> topics = new ArrayList<>(topicNames.size());
        for (String topicName : topicNames) {
            TopicVo topicVo = new TopicVo();
            topicVo.setName(topicName);
            TopicDescription description = descriptions.get(topicName);
            if (description != null) {
                topicVo.setPartitions(Integer.valueOf(description.partitions().size()));
            }
            topics.add(topicVo);
        }
        return topics;
    }

    /**
     * Creates a topic from the given request.
     *
     * <p>A partition count {@code <= 0} falls back to 1, and a missing or non-positive
     * replication factor falls back to 1.
     *
     * @param createTopicUo the creation request; {@code null} or a blank name is rejected
     * @return {@code true} if the topic was created; {@code false} if the request is
     *         invalid, the topic already exists, or the operation failed
     */
    @Override // com.geektcp.common.mq.service.TopicService
    public boolean createTopic(CreateTopicUo createTopicUo) {
        if (createTopicUo == null) {
            return false;
        }
        String name = createTopicUo.getName();
        if (StringUtils.isBlank(name)) {
            return false;
        }
        try {
            if (listTopicToSet().contains(name)) {
                log.error("已存在topic={}", name);
                return false;
            }
            int paratitionCount = createTopicUo.getParatitionCount();
            if (paratitionCount <= 0) {
                paratitionCount = 1;
            }
            Short requestedReplication = createTopicUo.getReplicationFactor();
            short replicationFactor = (short) 1;
            if (requestedReplication != null && requestedReplication.shortValue() > 0) {
                replicationFactor = requestedReplication.shortValue();
            }
            NewTopic newTopic = new NewTopic(name, paratitionCount, replicationFactor);
            // get() returns normally only when the creation succeeded; failures surface as exceptions.
            this.kafkaAdminClient.createTopics(Arrays.asList(newTopic)).all().get();
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can react to it.
            Thread.currentThread().interrupt();
            log.error("Interrupted while creating topic={}", name, e);
            return false;
        } catch (Exception e) {
            log.error("Failed to create topic={}", name, e);
            return false;
        }
    }

    /**
     * Deletes the named topic and, on success, stops the first running listener
     * container that has a partition of that topic assigned.
     *
     * @param str the topic name; {@code null} or blank is rejected
     * @return {@code true} if the topic existed and was deleted; {@code false} if the
     *         name is invalid, the topic does not exist, or any step failed
     */
    @Override // com.geektcp.common.mq.service.TopicService
    public boolean deleteTopic(String str) {
        if (StringUtils.isBlank(str)) {
            return false;
        }
        try {
            if (!listTopicToSet().contains(str)) {
                return false;
            }
            // get() returns normally only when the deletion succeeded; failures surface as exceptions.
            this.kafkaAdminClient.deleteTopics(Arrays.asList(str)).all().get();
            stopFirstListenerAssignedTo(str);
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can react to it.
            Thread.currentThread().interrupt();
            log.error("Interrupted while deleting topic={}", str, e);
            return false;
        } catch (Exception e) {
            log.error("Failed to delete topic={}", str, e);
            return false;
        }
    }

    /**
     * Stops the first running listener container that has a partition of the topic assigned.
     *
     * <p>NOTE(review): only the first matching container is stopped, as before; confirm
     * whether every container consuming the topic should be stopped instead.
     */
    private void stopFirstListenerAssignedTo(String topic) {
        Iterable<MessageListenerContainer> containers = this.registry.getAllListenerContainers();
        if (containers == null) {
            return;
        }
        for (MessageListenerContainer container : containers) {
            if (container.isRunning() && isAssignedTo(container, topic)) {
                container.stop();
                return;
            }
        }
    }

    /** Returns whether the container currently has at least one partition of the topic assigned. */
    private boolean isAssignedTo(MessageListenerContainer container, String topic) {
        Iterable<TopicPartition> partitions = container.getAssignedPartitions();
        if (partitions == null) {
            // No assignment information available; treat as "not assigned" instead of failing with an NPE.
            return false;
        }
        for (TopicPartition partition : partitions) {
            if (StringUtils.equals(partition.topic(), topic)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Fetches the names of all topics, excluding internal ones.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if the listing request fails
     */
    private Set<String> listTopicToSet() throws InterruptedException, ExecutionException {
        ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
        listTopicsOptions.listInternal(false);
        return this.kafkaAdminClient.listTopics(listTopicsOptions).names().get();
    }
}
