package org.joyqueue.broker.store;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.jd.laf.extension.ExtensionManager;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.config.BrokerStoreConfig;
import org.joyqueue.broker.consumer.position.PositionManager;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.store.StoreService;
import org.joyqueue.toolkit.concurrent.NamedThreadFactory;
import org.joyqueue.toolkit.config.PropertySupplier;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/store/StoreCleanManager.class */
public class StoreCleanManager extends Service {
    private static final Logger LOG = LoggerFactory.getLogger(StoreCleanManager.class);
    private static final int SCHEDULE_EXECUTOR_THREADS = 16;
    private PropertySupplier propertySupplier;
    private BrokerStoreConfig brokerStoreConfig;
    private StoreService storeService;
    private ClusterManager clusterManager;
    private PositionManager positionManager;
    private Map<String, StoreCleaningStrategy> cleaningStrategyMap;
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(SCHEDULE_EXECUTOR_THREADS, new NamedThreadFactory("StoreCleaning-Scheduled-Executor"));
    private ScheduledFuture cleanFuture;

    public StoreCleanManager(PropertySupplier propertySupplier, StoreService storeService, ClusterManager clusterManager, PositionManager positionManager) {
        this.propertySupplier = propertySupplier;
        this.brokerStoreConfig = new BrokerStoreConfig(propertySupplier);
        this.storeService = storeService;
        this.clusterManager = clusterManager;
        this.positionManager = positionManager;
    }

    protected void validate() throws Exception {
        super.validate();
        Preconditions.checkArgument(this.propertySupplier != null, "property supplier can not be null");
        Preconditions.checkArgument(this.storeService != null, "store service can not be null");
        Preconditions.checkArgument(this.positionManager != null, "position manager can not be null");
        List<StoreCleaningStrategy> initStoreCleaningStrategyList = initStoreCleaningStrategyList();
        Preconditions.checkArgument(initStoreCleaningStrategyList.size() != 0, "load cleaning strategy list can not be null");
        this.cleaningStrategyMap = new HashMap(initStoreCleaningStrategyList.size());
        for (StoreCleaningStrategy storeCleaningStrategy : initStoreCleaningStrategyList) {
            storeCleaningStrategy.setSupplier(this.propertySupplier);
            this.cleaningStrategyMap.put(storeCleaningStrategy.getClass().getSimpleName(), storeCleaningStrategy);
        }
    }

    private List<StoreCleaningStrategy> initStoreCleaningStrategyList() {
        return Lists.newArrayList(ExtensionManager.getOrLoadExtensions(StoreCleaningStrategy.class));
    }

    public void start() throws Exception {
        super.start();
        this.cleanFuture = this.scheduledExecutorService.scheduleWithFixedDelay(this::clean, ThreadLocalRandom.current().nextLong(this.brokerStoreConfig.getStoreCleanScheduleBegin(), this.brokerStoreConfig.getStoreCleanScheduleEnd()), ThreadLocalRandom.current().nextLong(this.brokerStoreConfig.getStoreCleanScheduleBegin(), this.brokerStoreConfig.getStoreCleanScheduleEnd()), TimeUnit.MILLISECONDS);
    }

    public void stop() {
        super.stop();
        try {
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
            return;
        }
        if (this.cleanFuture != null) {
            long now = SystemClock.now();
            while (!this.cleanFuture.isDone()) {
                if (SystemClock.now() - now > 5000) {
                    throw new TimeoutException("Wait for async store clean job timeout!");
                }
                this.cleanFuture.cancel(true);
                try {
                    Thread.sleep(50L);
                } catch (InterruptedException e) {
                    LOG.warn("Exception: ", e);
                }
                LOG.error(th.getMessage(), th);
                return;
            }
        }
    }

    private void clean() {
        long j;
        if (LOG.isDebugEnabled()) {
            LOG.info("Start scheduled StoreCleaningStrategy task use class: <{}>!!!", this.brokerStoreConfig.getCleanStrategyClass());
        }
        long j2 = 0;
        long now = SystemClock.now();
        do {
            j = 0;
            List<TopicConfig> topics = this.clusterManager.getTopics();
            if (topics != null && topics.size() > 0) {
                for (TopicConfig topicConfig : topics) {
                    List<PartitionGroup> topicPartitionGroups = this.clusterManager.getTopicPartitionGroups(topicConfig.getName());
                    if (CollectionUtils.isNotEmpty(topicPartitionGroups)) {
                        for (PartitionGroup partitionGroup : topicPartitionGroups) {
                            try {
                                Set<Short> partitions = partitionGroup.getPartitions();
                                if (CollectionUtils.isNotEmpty(partitions)) {
                                    List<String> appByTopic = this.clusterManager.getAppByTopic(topicConfig.getName());
                                    HashMap hashMap = new HashMap(partitions.size());
                                    for (Short sh : partitions) {
                                        long j3 = Long.MAX_VALUE;
                                        if (CollectionUtils.isNotEmpty(appByTopic)) {
                                            Iterator<String> it = appByTopic.iterator();
                                            while (it.hasNext()) {
                                                j3 = Math.min(j3, this.positionManager.getLastMsgAckIndex(topicConfig.getName(), it.next(), sh.shortValue()));
                                            }
                                        }
                                        hashMap.put(sh, Long.valueOf(j3));
                                    }
                                    StoreCleaningStrategy storeCleaningStrategy = this.cleaningStrategyMap.get(this.brokerStoreConfig.getCleanStrategyClass());
                                    if (storeCleaningStrategy != null) {
                                        if (LOG.isDebugEnabled()) {
                                            LOG.info("Begin store clean topic: <{}>, partition group: <{}>, partition ack map: <{}>", new Object[]{topicConfig.getName().getFullName(), Integer.valueOf(partitionGroup.getGroup()), hashMap});
                                        }
                                        j = storeCleaningStrategy.deleteIfNeeded(this.storeService.getStore(topicConfig.getName().getFullName(), partitionGroup.getGroup()), hashMap);
                                    }
                                }
                            } catch (Throwable th) {
                                LOG.error("Error to clean store for topic <{}>, partition group <{}>, exception: {}", new Object[]{topicConfig, Integer.valueOf(partitionGroup.getGroup()), th});
                            }
                        }
                    }
                }
            }
            j2 += j;
        } while (j > 0);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Round total clean storage size {},elapsed time {}ms", Long.valueOf(j2), Long.valueOf(SystemClock.now() - now));
        }
    }
}
