package org.joyqueue.broker.polling;

import com.google.common.base.Preconditions;
import com.jd.laf.extension.Converts;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.consumer.model.PullResult;
import org.joyqueue.broker.monitor.SessionManager;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.session.Joint;
import org.joyqueue.toolkit.concurrent.NamedThreadFactory;
import org.joyqueue.toolkit.config.Property;
import org.joyqueue.toolkit.config.PropertySupplier;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.service.ServiceThread;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/polling/LongPollingManager.class */
public class LongPollingManager extends Service {
    public static final String LONG_POLLING_QUEUE_SIZE = "broker.consume.long_polling_queue_size";
    public static final int MAX_LONG_POLLING_QUEUE_SIZE = 10000;
    protected static Logger logger = LoggerFactory.getLogger(LongPollingManager.class);
    protected Consume consumeManager;
    protected SessionManager sessionManager;
    protected ExecutorService executorService;
    protected ClusterManager clusterManager;
    protected PropertySupplier propertySupplier;
    protected Queue<LongPolling> longPollingQueue = new LinkedBlockingQueue();
    protected ConcurrentMap<Joint, AtomicInteger> counter = new ConcurrentHashMap();
    protected Thread guardThread = null;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/joyqueue/broker/polling/LongPollingManager$PullMessageTask.class */
    public class PullMessageTask implements Runnable {
        private final LongPolling longPolling;

        public PullMessageTask(LongPolling longPolling) {
            this.longPolling = longPolling;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (LongPollingManager.this.isStarted()) {
                Consumer consumer = this.longPolling.getConsumer();
                AtomicInteger atomicInteger = LongPollingManager.this.counter.get(consumer.getJoint());
                try {
                    PullResult message = LongPollingManager.this.consumeManager.getMessage(consumer, this.longPolling.getCount(), this.longPolling.getAckTimeout());
                    if (message != null && message.getBuffers().size() > 0) {
                        this.longPolling.getLongPollingCallback().onSuccess(consumer, message);
                        if (atomicInteger != null) {
                            atomicInteger.decrementAndGet();
                        }
                    } else if (LongPollingManager.this.isStarted()) {
                        LongPollingManager.this.longPollingQueue.offer(this.longPolling);
                    }
                    if (!message.getCode().equals(JoyQueueCode.SUCCESS)) {
                        LongPollingManager.logger.error("getMessage error, code: {}, consumer: {}", message.getCode(), consumer);
                    }
                } catch (Throwable th) {
                    try {
                        LongPollingManager.logger.error("long pull error.", th);
                        if (atomicInteger != null) {
                            atomicInteger.decrementAndGet();
                        }
                        this.longPolling.getLongPollingCallback().onException(consumer, th);
                    } catch (Exception e) {
                        LongPollingManager.logger.error("ack long pull error.", e);
                    }
                }
            }
        }
    }

    public LongPollingManager(SessionManager sessionManager, ClusterManager clusterManager, Consume consume, PropertySupplier propertySupplier) {
        Preconditions.checkArgument(sessionManager != null, "sessionManager can not be null");
        Preconditions.checkArgument(clusterManager != null, "clusterManager can not be null");
        Preconditions.checkArgument(consume != null, "consumeManager can not be null");
        Preconditions.checkArgument(propertySupplier != null, "propertySupplier can not be null");
        this.sessionManager = sessionManager;
        this.clusterManager = clusterManager;
        this.consumeManager = consume;
        this.propertySupplier = propertySupplier;
        this.executorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("LongPolling"));
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.counter.clear();
        this.guardThread = new Thread((Runnable) new ServiceThread(this, 100L) { // from class: org.joyqueue.broker.polling.LongPollingManager.1
            public boolean onException(Throwable th) {
                LongPollingManager.logger.error(th.getMessage(), th);
                return true;
            }

            protected void execute() throws Exception {
                LongPollingManager.this.processHoldRequest();
            }
        }, "LongPolling-Thread");
        this.guardThread.start();
        logger.info("long polling manager is started");
    }

    protected void doStop() {
        super.doStop();
        if (this.guardThread != null) {
            this.guardThread.interrupt();
        }
        this.counter.clear();
        logger.info("long pull manager is stopped");
    }

    protected AtomicInteger getCount(Consumer consumer) {
        AtomicInteger atomicInteger = this.counter.get(consumer.getJoint());
        if (atomicInteger == null) {
            atomicInteger = new AtomicInteger(0);
            AtomicInteger putIfAbsent = this.counter.putIfAbsent(consumer.getJoint(), atomicInteger);
            if (putIfAbsent != null) {
                atomicInteger = putIfAbsent;
            }
        }
        return atomicInteger;
    }

    public boolean suspend(LongPolling longPolling) {
        logger.debug("longPolling info:[{}], longPollingQueueSize:[{}]", longPolling, Integer.valueOf(this.longPollingQueue.size()));
        Consumer consumer = longPolling.getConsumer();
        if (consumer == null || longPolling.getLongPollingTimeout() == 0 || !isStarted()) {
            return false;
        }
        String topic = consumer.getTopic();
        AtomicInteger count = getCount(consumer);
        if (count.get() >= this.clusterManager.getLocalPartitions(TopicName.parse(topic)).size() || this.longPollingQueue.size() >= getLongPollingQueueSize() || !this.longPollingQueue.offer(longPolling)) {
            return false;
        }
        count.incrementAndGet();
        return true;
    }

    private int getLongPollingQueueSize() {
        Property property = this.propertySupplier.getProperty(LONG_POLLING_QUEUE_SIZE);
        return property == null ? MAX_LONG_POLLING_QUEUE_SIZE : Converts.getInteger(property.getValue()).intValue();
    }

    protected void processHoldRequest() throws Exception {
        int size = this.longPollingQueue.size();
        for (int i = 0; i < size && isStarted(); i++) {
            long now = SystemClock.now();
            LongPolling poll = this.longPollingQueue.poll();
            Consumer consumer = poll.getConsumer();
            AtomicInteger atomicInteger = this.counter.get(consumer.getJoint());
            Consumer consumerById = this.sessionManager.getConsumerById(consumer.getId());
            if (consumerById == null) {
                if (atomicInteger != null) {
                    atomicInteger.decrementAndGet();
                }
                poll.getLongPollingCallback().onExpire(poll.getConsumer());
            } else if (poll.getExpire() <= now) {
                if (atomicInteger != null) {
                    atomicInteger.decrementAndGet();
                }
                poll.getLongPollingCallback().onExpire(consumerById);
            } else if (this.consumeManager.hasFreePartition(consumerById)) {
                this.executorService.execute(new PullMessageTask(poll));
            } else {
                this.longPollingQueue.offer(poll);
            }
        }
    }
}
