package org.joyqueue.client.internal.consumer.support;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.joyqueue.client.internal.consumer.MessagePoller;
import org.joyqueue.client.internal.consumer.config.ConsumerConfig;
import org.joyqueue.toolkit.concurrent.NamedThreadFactory;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/client/internal/consumer/support/TopicMessageConsumerScheduler.class */
public class TopicMessageConsumerScheduler extends Service implements Runnable {
    protected static final Logger logger = LoggerFactory.getLogger(TopicMessageConsumerScheduler.class);
    private String topic;
    private ConsumerConfig config;
    private MessagePoller messagePoller;
    private TopicMessageConsumerDispatcher messageConsumerDispatcher;
    private ExecutorService scheduleThreadPool;
    private volatile boolean suspend = false;
    private volatile boolean stopped = false;

    public TopicMessageConsumerScheduler(String str, ConsumerConfig consumerConfig, MessagePoller messagePoller, TopicMessageConsumerDispatcher topicMessageConsumerDispatcher) {
        this.topic = str;
        this.config = consumerConfig;
        this.messagePoller = messagePoller;
        this.messageConsumerDispatcher = topicMessageConsumerDispatcher;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.joyqueue.toolkit.service.Service, org.joyqueue.toolkit.service.Activity
    public void validate() throws Exception {
        this.scheduleThreadPool = Executors.newFixedThreadPool(this.config.getThread(), new NamedThreadFactory(String.format("joyqueue-consumer-scheduler-%s", this.topic), true));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.joyqueue.toolkit.service.Activity
    public void doStart() throws Exception {
        for (int i = 0; i < this.config.getThread(); i++) {
            this.scheduleThreadPool.execute(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.joyqueue.toolkit.service.Activity
    public void doStop() {
        this.stopped = true;
        if (this.scheduleThreadPool != null) {
            this.scheduleThreadPool.shutdown();
        }
    }

    public void suspend() {
        this.suspend = true;
    }

    public boolean isSuspend() {
        return this.suspend;
    }

    public void resume() {
        this.suspend = false;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!this.stopped) {
            try {
                if (this.suspend) {
                    Thread.currentThread();
                    Thread.sleep(this.config.getIdleInterval());
                } else {
                    doSchedule();
                }
            } catch (Exception e) {
                if (!this.stopped) {
                    logger.error("dispatch consumer exception, topic: {}", this.topic, e);
                    try {
                        Thread.currentThread();
                        Thread.sleep(this.config.getIdleInterval());
                    } catch (InterruptedException e2) {
                        logger.debug("dispatch consumer exception, topic: {}", this.topic, e2);
                    }
                }
            }
        }
    }

    protected void doSchedule() throws Exception {
        if (!this.messageConsumerDispatcher.dispatch()) {
            Thread.currentThread();
            Thread.sleep(this.config.getIdleInterval());
        } else if (this.config.getInterval() > 0) {
            Thread.currentThread();
            Thread.sleep(this.config.getInterval());
        }
    }
}
