package org.joyqueue.broker.retry;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.jd.laf.extension.ExtensionManager;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.BrokerContextAware;
import org.joyqueue.broker.Plugins;
import org.joyqueue.broker.cluster.ClusterManager;
import org.joyqueue.broker.limit.RateLimiter;
import org.joyqueue.broker.monitor.BrokerMonitor;
import org.joyqueue.broker.network.support.BrokerTransportClientFactory;
import org.joyqueue.config.BrokerConfigKey;
import org.joyqueue.domain.Broker;
import org.joyqueue.domain.Consumer;
import org.joyqueue.domain.TopicName;
import org.joyqueue.event.EventType;
import org.joyqueue.event.MetaEvent;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.monitor.PointTracer;
import org.joyqueue.monitor.TraceStat;
import org.joyqueue.network.session.Joint;
import org.joyqueue.network.transport.TransportClient;
import org.joyqueue.network.transport.config.ClientConfig;
import org.joyqueue.network.transport.config.TransportConfigSupport;
import org.joyqueue.nsr.NameService;
import org.joyqueue.nsr.event.UpdateBrokerEvent;
import org.joyqueue.server.retry.NullMessageRetry;
import org.joyqueue.server.retry.api.MessageRetry;
import org.joyqueue.server.retry.api.RetryPolicyProvider;
import org.joyqueue.server.retry.model.RetryMessageModel;
import org.joyqueue.server.retry.remote.RemoteMessageRetry;
import org.joyqueue.server.retry.remote.RemoteRetryProvider;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.joyqueue.toolkit.config.Property;
import org.joyqueue.toolkit.config.PropertySupplier;
import org.joyqueue.toolkit.retry.RetryPolicy;
import org.joyqueue.toolkit.service.Service;
import org.joyqueue.toolkit.time.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/retry/BrokerRetryManager.class */
public class BrokerRetryManager extends Service implements MessageRetry<Long>, BrokerContextAware {
    private static final Logger logger = LoggerFactory.getLogger(BrokerRetryManager.class);
    private MessageRetry delegate;
    private EventListener eventListener = new BrokerRetryEventListener();
    private volatile String retryType;
    private RetryPolicyProvider retryPolicyProvider;
    private RemoteRetryProvider remoteRetryProvider;
    private NameService nameService;
    private ClusterManager clusterManager;
    private PropertySupplier propertySupplier;
    private RetryRateLimiter rateLimiterManager;
    private BrokerMonitor brokerMonitor;
    private PointTracer tracer;

    /* loaded from: input_file:org/joyqueue/broker/retry/BrokerRetryManager$BrokerRetryEventListener.class */
    protected class BrokerRetryEventListener implements EventListener<MetaEvent> {
        protected BrokerRetryEventListener() {
        }

        public void onEvent(MetaEvent metaEvent) {
            MessageRetry loadRetryManager;
            try {
                if (metaEvent.getEventType() == EventType.UPDATE_BROKER) {
                    BrokerRetryManager.logger.info("listen update broker event.");
                    Broker newBroker = ((UpdateBrokerEvent) metaEvent).getNewBroker();
                    String retryType = newBroker != null ? newBroker.getRetryType() : null;
                    if (retryType != null && !retryType.equals(BrokerRetryManager.this.retryType) && (loadRetryManager = BrokerRetryManager.this.loadRetryManager(retryType)) != null) {
                        MessageRetry messageRetry = BrokerRetryManager.this.delegate;
                        if (messageRetry != null) {
                            messageRetry.stop();
                        }
                        BrokerRetryManager.this.delegate = loadRetryManager;
                    }
                    BrokerRetryManager.this.retryType = retryType;
                    BrokerRetryManager.logger.info("Broker Retry Mode is : {}", BrokerRetryManager.this.retryType);
                }
            } catch (Exception e) {
                BrokerRetryManager.logger.error("process broker retry event error.", e);
            }
        }
    }

    /* loaded from: input_file:org/joyqueue/broker/retry/BrokerRetryManager$RetryPolicyProviderImpl.class */
    class RetryPolicyProviderImpl implements RetryPolicyProvider {
        private final Cache<String, RetryPolicy> cache = CacheBuilder.newBuilder().maximumSize(1000000).expireAfterWrite(1, TimeUnit.MINUTES).concurrencyLevel(Runtime.getRuntime().availableProcessors()).recordStats().build();

        RetryPolicyProviderImpl() {
        }

        public RetryPolicy getPolicy(TopicName topicName, String str) {
            String key = getKey(topicName, str);
            RetryPolicy retryPolicy = (RetryPolicy) this.cache.getIfPresent(key);
            if (retryPolicy != null) {
                return retryPolicy;
            }
            Consumer consumerByTopicAndApp = BrokerRetryManager.this.nameService.getConsumerByTopicAndApp(topicName, str);
            if (consumerByTopicAndApp == null) {
                BrokerRetryManager.logger.debug("nameService.getConsumerByTopicAndApp is null by topic:[{}], app:[{}]", topicName, str);
                RetryPolicy retryPolicy2 = new RetryPolicy();
                this.cache.put(key, retryPolicy2);
                return retryPolicy2;
            }
            RetryPolicy retryPolicy3 = consumerByTopicAndApp.getRetryPolicy();
            if (retryPolicy3 != null) {
                BrokerRetryManager.logger.debug("Get RetryPolicy:[{}] by topic:[{}], app:[{}], ", new Object[]{retryPolicy3.toString(), topicName.getFullName(), str});
                this.cache.put(key, retryPolicy3);
                return retryPolicy3;
            }
            BrokerRetryManager.logger.debug("consumerByTopicAndApp.getRetryPolicy() is null by topic:[{}], app:[{}]", topicName, str);
            RetryPolicy retryPolicy4 = new RetryPolicy();
            this.cache.put(key, retryPolicy4);
            return retryPolicy4;
        }

        private String getKey(TopicName topicName, String str) {
            return topicName.getFullName() + ":" + str;
        }
    }

    public BrokerRetryManager(BrokerContext brokerContext) {
        setBrokerContext(brokerContext);
        this.rateLimiterManager = new BrokerRetryRateLimiterManager(brokerContext);
    }

    protected void validate() throws Exception {
        super.validate();
        if (this.retryPolicyProvider == null) {
            this.retryPolicyProvider = new RetryPolicyProviderImpl();
        }
        if (this.remoteRetryProvider == null) {
            this.remoteRetryProvider = new RemoteRetryProvider() { // from class: org.joyqueue.broker.retry.BrokerRetryManager.1
                public Set<String> getUrls() {
                    List<Broker> localRetryBroker = BrokerRetryManager.this.clusterManager.getLocalRetryBroker();
                    BrokerRetryManager.logger.info("broker list:{}", Arrays.toString(localRetryBroker.toArray()));
                    HashSet hashSet = new HashSet();
                    for (Broker broker : localRetryBroker) {
                        hashSet.add(broker.getIp() + ":" + broker.getBackEndPort());
                    }
                    return hashSet;
                }

                public TransportClient createTransportClient() {
                    ClientConfig buildClientConfig = TransportConfigSupport.buildClientConfig(BrokerRetryManager.this.propertySupplier, "retry.remote.client");
                    buildClientConfig.setIoThreadName("joyqueue-retry-io-eventLoop");
                    return new BrokerTransportClientFactory().create(buildClientConfig);
                }
            };
        }
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.clusterManager.addListener(this.eventListener);
        this.clusterManager.addListener(this.rateLimiterManager);
        this.retryType = this.clusterManager.getBroker().getRetryType();
        this.delegate = loadRetryManager(this.retryType);
    }

    public void setRetryPolicyProvider(RetryPolicyProvider retryPolicyProvider) {
        this.retryPolicyProvider = retryPolicyProvider;
    }

    public void addRetry(List<RetryMessageModel> list) throws JoyQueueException {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        String topic = list.get(0).getTopic();
        String app = list.get(0).getApp();
        Consumer consumerByTopicAndApp = this.clusterManager.getNameService().getConsumerByTopicAndApp(TopicName.parse(topic), app);
        if (consumerByTopicAndApp != null && consumerByTopicAndApp.getConsumerPolicy() != null && consumerByTopicAndApp.getConsumerPolicy().getRetry() != null && !consumerByTopicAndApp.getConsumerPolicy().getRetry().booleanValue()) {
            throw new JoyQueueException(JoyQueueCode.RETRY_TOKEN_LIMIT, new Object[0]);
        }
        if (!retryTokenAvailable(retryConsumers(list))) {
            throw new JoyQueueException(JoyQueueCode.RETRY_TOKEN_LIMIT, new Object[0]);
        }
        TraceStat begin = this.tracer.begin("BrokerRetryManager.addRetry");
        TraceStat begin2 = this.tracer.begin(String.format("BrokerRetryManager.addRetry.%s.%s", app.replace(".", "_"), topic));
        try {
            long now = SystemClock.now();
            this.delegate.addRetry(list);
            this.brokerMonitor.onAddRetry(topic, app, list.size(), SystemClock.now() - now);
            this.tracer.end(begin);
            this.tracer.end(begin2);
        } catch (Exception e) {
            this.tracer.error(begin);
            this.tracer.error(begin2);
            throw e;
        }
    }

    public boolean retryTokenAvailable(Set<Joint> set) {
        for (Joint joint : set) {
            RateLimiter orCreate = this.rateLimiterManager.getOrCreate(joint.getTopic(), joint.getApp());
            if (orCreate == null || orCreate.tryAcquireTps()) {
                return true;
            }
        }
        return false;
    }

    public Set<Joint> retryConsumers(List<RetryMessageModel> list) {
        HashSet hashSet = new HashSet(list.size());
        for (RetryMessageModel retryMessageModel : list) {
            hashSet.add(new Joint(retryMessageModel.getTopic(), retryMessageModel.getApp()));
        }
        return hashSet;
    }

    public void retrySuccess(String str, String str2, Long[] lArr) throws JoyQueueException {
        if (lArr == null) {
            return;
        }
        this.delegate.retrySuccess(str, str2, lArr);
        this.brokerMonitor.onRetrySuccess(str, str2, lArr.length);
    }

    public void retryError(String str, String str2, Long[] lArr) throws JoyQueueException {
        if (lArr == null) {
            return;
        }
        this.delegate.retryError(str, str2, lArr);
        this.brokerMonitor.onRetryFailure(str, str2, lArr.length);
    }

    public void retryExpire(String str, String str2, Long[] lArr) throws JoyQueueException {
        this.delegate.retryExpire(str, str2, lArr);
    }

    public List<RetryMessageModel> getRetry(String str, String str2, short s, long j) throws JoyQueueException {
        return this.delegate.getRetry(str, str2, s, j);
    }

    public int countRetry(String str, String str2) throws JoyQueueException {
        return this.delegate.countRetry(str, str2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public MessageRetry loadRetryManager(String str) throws Exception {
        Property property = this.propertySupplier.getProperty("retry.enable");
        NullMessageRetry nullMessageRetry = !(null == property ? false : property.getBoolean(false).booleanValue()) ? new NullMessageRetry() : str.equals("RemoteRetry") ? new RemoteMessageRetry(this.remoteRetryProvider) : (MessageRetry) ExtensionManager.getOrLoadExtension(MessageRetry.class, str);
        if (nullMessageRetry == null) {
            throw new RuntimeException("No such implementation found." + str);
        }
        nullMessageRetry.setSupplier(this.propertySupplier);
        nullMessageRetry.setRetryPolicyProvider(this.retryPolicyProvider);
        nullMessageRetry.start();
        return nullMessageRetry;
    }

    @Override // org.joyqueue.broker.BrokerContextAware
    public void setBrokerContext(BrokerContext brokerContext) {
        this.nameService = brokerContext.getNameService();
        this.clusterManager = brokerContext.getClusterManager();
        this.propertySupplier = brokerContext.getPropertySupplier();
        this.brokerMonitor = brokerContext.getBrokerMonitor();
        this.tracer = (PointTracer) Plugins.TRACERERVICE.get(PropertySupplier.getValue(this.propertySupplier, BrokerConfigKey.TRACER_TYPE));
    }

    public void setSupplier(PropertySupplier propertySupplier) {
        this.propertySupplier = propertySupplier;
    }
}
