package com.alipay.sofa.rpc.event;

import com.alipay.sofa.rpc.common.RpcConfigs;
import com.alipay.sofa.rpc.common.RpcOptions;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.context.AsyncRuntime;
import com.alipay.sofa.rpc.context.RpcInternalContext;
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

/* loaded from: input_file:com/alipay/sofa/rpc/event/EventBus.class */
/**
 * Static publish/subscribe hub that routes {@link Event} instances to the
 * {@link Subscriber}s registered for the event's exact runtime class.
 *
 * <p>The bus is switched on or off by the {@code RpcOptions.EVENT_BUS_ENABLE}
 * configuration value, which is read once when this class is initialized.
 * Subscribers reporting {@code isSync() == true} are invoked on the posting
 * thread; all others are handed to the pool returned by
 * {@link AsyncRuntime#getAsyncThreadPool()}.
 */
public class EventBus {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventBus.class);

    /** Global switch, resolved once from configuration at class-initialization time. */
    private static final boolean EVENT_BUS_ENABLE = RpcConfigs.getBooleanValue(RpcOptions.EVENT_BUS_ENABLE);

    /** Subscribers keyed by the concrete event class they were registered for. */
    private static final ConcurrentMap<Class<? extends Event>, CopyOnWriteArraySet<Subscriber>> SUBSCRIBER_MAP =
        new ConcurrentHashMap<>();

    /**
     * Tells whether the event bus is enabled by configuration.
     *
     * @return {@code true} when the event bus is switched on
     */
    public static boolean isEnable() {
        return EVENT_BUS_ENABLE;
    }

    /**
     * Tells whether posting an event of the given class could reach any subscriber.
     *
     * @param cls event class to check
     * @return {@code true} when the bus is enabled and the subscriber set stored for
     *         {@code cls} passes {@code CommonUtils.isNotEmpty}
     */
    public static boolean isEnable(Class<? extends Event> cls) {
        return EVENT_BUS_ENABLE && CommonUtils.isNotEmpty(SUBSCRIBER_MAP.get(cls));
    }

    /**
     * Adds a subscriber for the given event class, creating the subscriber set on first use.
     *
     * @param cls        event class the subscriber is interested in
     * @param subscriber subscriber to add
     */
    public static void register(Class<? extends Event> cls, Subscriber subscriber) {
        CopyOnWriteArraySet<Subscriber> subscribers = SUBSCRIBER_MAP.get(cls);
        if (subscribers == null) {
            CopyOnWriteArraySet<Subscriber> created = new CopyOnWriteArraySet<>();
            // putIfAbsent returns the set installed by a concurrent caller, if any; use that one.
            CopyOnWriteArraySet<Subscriber> existing = SUBSCRIBER_MAP.putIfAbsent(cls, created);
            subscribers = existing != null ? existing : created;
        }
        subscribers.add(subscriber);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Register subscriber: {} of event: {}.", subscriber, cls);
        }
    }

    /**
     * Removes a subscriber from the given event class. Does nothing when no subscriber
     * set exists for {@code cls}. The (possibly empty) set itself stays in the map.
     *
     * @param cls        event class the subscriber was registered for
     * @param subscriber subscriber to remove
     */
    public static void unRegister(Class<? extends Event> cls, Subscriber subscriber) {
        CopyOnWriteArraySet<Subscriber> subscribers = SUBSCRIBER_MAP.get(cls);
        if (subscribers == null) {
            return;
        }
        subscribers.remove(subscriber);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("UnRegister subscriber: {} of event: {}.", subscriber, cls);
        }
    }

    /**
     * Delivers an event to every subscriber registered for the event's exact class.
     *
     * <p>Nothing happens when the bus is disabled or no subscriber is registered.
     * Synchronous subscribers run on the calling thread; the rest are submitted to the
     * async thread pool. A rejected submission is logged and that subscriber is skipped.
     *
     * @param event event to deliver
     */
    public static void post(final Event event) {
        if (!isEnable()) {
            return;
        }
        CopyOnWriteArraySet<Subscriber> subscribers = SUBSCRIBER_MAP.get(event.getClass());
        if (!CommonUtils.isNotEmpty(subscribers)) {
            return;
        }
        for (Subscriber subscriber : subscribers) {
            if (subscriber.isSync()) {
                handleEvent(subscriber, event);
            } else {
                postAsync(subscriber, event);
            }
        }
    }

    /**
     * Submits delivery of one event to one subscriber onto the async thread pool,
     * making the poster's {@link RpcInternalContext} current inside the worker.
     */
    private static void postAsync(final Subscriber subscriber, final Event event) {
        // Capture the poster's context here, on the posting thread, so the worker can install it.
        final RpcInternalContext callerContext = RpcInternalContext.peekContext();
        ThreadPoolExecutor asyncThreadPool = AsyncRuntime.getAsyncThreadPool();
        try {
            asyncThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Pass the captured context; a qualified "this" is not valid in this scope.
                        RpcInternalContext.setContext(callerContext);
                        handleEvent(subscriber, event);
                    } finally {
                        // Always clear the context so it does not linger on the pooled thread.
                        RpcInternalContext.removeContext();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            LOGGER.warn("This queue is full when post event to async execute, queue size is " + asyncThreadPool.getQueue().size() + ", please optimize this async thread pool of eventbus.");
        }
    }

    /**
     * Invokes a subscriber with an event, logging (at warn level) and suppressing
     * anything the subscriber throws so remaining subscribers still receive the event.
     */
    private static void handleEvent(Subscriber subscriber, Event event) {
        try {
            subscriber.onEvent(event);
        } catch (Throwable th) {
            if (LOGGER.isWarnEnabled()) {
                LOGGER.warn("Handle " + event.getClass() + " error", th);
            }
        }
    }
}
