package org.joyqueue.nsr.message.support;

import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.domain.Broker;
import org.joyqueue.event.MetaEvent;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.network.command.BooleanAck;
import org.joyqueue.network.transport.TransportServer;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.CommandCallback;
import org.joyqueue.network.transport.command.JoyQueueCommand;
import org.joyqueue.network.transport.exception.TransportException;
import org.joyqueue.nsr.config.MessengerConfig;
import org.joyqueue.nsr.exception.MessengerException;
import org.joyqueue.nsr.message.MessageListener;
import org.joyqueue.nsr.message.Messenger;
import org.joyqueue.nsr.message.support.network.command.MessengerPublishRequest;
import org.joyqueue.nsr.message.support.network.transport.MessengerTransportServerFactory;
import org.joyqueue.nsr.message.support.session.MessengerSessionManager;
import org.joyqueue.toolkit.concurrent.EventBus;
import org.joyqueue.toolkit.config.PropertySupplier;
import org.joyqueue.toolkit.config.PropertySupplierAware;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/nsr/message/support/DefaultMessenger.class */
public class DefaultMessenger extends Service implements Messenger<MetaEvent>, PropertySupplierAware {
    protected static final Logger logger = LoggerFactory.getLogger(DefaultMessenger.class);
    private final EventBus eventBus = new EventBus("joyqueue-messenger-eventBus");
    private MessengerConfig config;
    private MessengerSessionManager messengerSessionManager;
    private TransportServer messengerTransportServer;

    protected void validate() throws Exception {
        this.messengerSessionManager = new MessengerSessionManager(this.config);
    }

    protected void doStart() throws Exception {
        this.config.getServerConfig().setIoThread(1);
        this.config.getServerConfig().setPort(this.config.getPort());
        this.config.getServerConfig().setIoThreadName("joyqueue-messenger-io-eventLoop");
        this.config.getServerConfig().setAcceptThreadName("joyqueue-messenger-accept-eventLoop");
        this.messengerTransportServer = new MessengerTransportServerFactory(this.config, this.eventBus).bind(this.config.getServerConfig());
        this.messengerTransportServer.start();
        this.messengerSessionManager.start();
        this.eventBus.start();
    }

    protected void doStop() {
        this.messengerSessionManager.stop();
        this.messengerTransportServer.stop();
        this.eventBus.stop();
    }

    @Override // org.joyqueue.nsr.message.Messenger
    public void publish(final MetaEvent metaEvent, List<Broker> list) {
        if (!this.config.getPublishEnable() || CollectionUtils.isEmpty(list)) {
            return;
        }
        final boolean[] zArr = {true};
        final CountDownLatch countDownLatch = new CountDownLatch(list.size());
        for (final Broker broker : list) {
            try {
                this.messengerSessionManager.getOrCreateSession(broker.getId().intValue(), broker.getIp(), broker.getMessengerPort()).async(new JoyQueueCommand(new MessengerPublishRequest(metaEvent)), this.config.getSessionTimeout(), new CommandCallback() { // from class: org.joyqueue.nsr.message.support.DefaultMessenger.1
                    public void onSuccess(Command command, Command command2) {
                        BooleanAck booleanAck = (BooleanAck) command2.getPayload();
                        if (booleanAck.getHeader().getStatus() != JoyQueueCode.SUCCESS.getCode()) {
                            DefaultMessenger.logger.warn("messenger publish error, event: {}, id: {}, ip: {}, port: {}, code: {}", new Object[]{metaEvent, broker.getId(), broker.getIp(), Integer.valueOf(broker.getMessengerPort()), Integer.valueOf(booleanAck.type())});
                            zArr[0] = false;
                        }
                        countDownLatch.countDown();
                    }

                    public void onException(Command command, Throwable th) {
                        DefaultMessenger.logger.warn("messenger publish error, event: {}, id: {}, ip: {}, port: {}", new Object[]{metaEvent, broker.getId(), broker.getIp(), Integer.valueOf(broker.getMessengerPort()), th});
                        boolean z = false;
                        if ((th instanceof TransportException.RequestErrorException) && DefaultMessenger.this.config.getPublishIgnoreConnectionError()) {
                            z = true;
                        }
                        zArr[0] = z;
                        countDownLatch.countDown();
                    }
                });
            } catch (Exception e) {
                if (!this.config.getPublishIgnoreConnectionError()) {
                    zArr[0] = false;
                    logger.warn("create session exception, event: {}, brokerId: {}, brokerIp: {}, brokerPort: {}", new Object[]{metaEvent, broker.getId(), broker.getIp(), Integer.valueOf(broker.getMessengerPort()), e});
                }
                countDownLatch.countDown();
            }
        }
        try {
            if (!countDownLatch.await(this.config.getPublishTimeout(), TimeUnit.MILLISECONDS)) {
                logger.warn("messenger publish timeout, event: {}, brokers: {}, timeout: {}", new Object[]{metaEvent, list, Integer.valueOf(this.config.getPublishTimeout())});
                if (!this.config.getPublishForce()) {
                    throw new MessengerException("messenger publish timeout");
                }
            }
            if (zArr[0] || this.config.getPublishForce()) {
                return;
            }
            logger.warn("messenger publish failed, event: {}, brokers: {}", metaEvent, list);
            throw new MessengerException("messenger publish failed");
        } catch (InterruptedException e2) {
            throw new MessengerException("messenger publish exception", e2);
        }
    }

    @Override // org.joyqueue.nsr.message.Messenger
    public void publish(MetaEvent metaEvent, Broker... brokerArr) {
        publish(metaEvent, Lists.newArrayList(brokerArr));
    }

    @Override // org.joyqueue.nsr.message.Messenger
    public void fastPublish(MetaEvent metaEvent, List<Broker> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        for (Broker broker : list) {
            try {
                this.messengerSessionManager.getOrCreateSession(broker.getId().intValue(), broker.getIp(), broker.getMessengerPort()).oneway(new JoyQueueCommand(new MessengerPublishRequest(metaEvent)), this.config.getSessionTimeout());
            } catch (Exception e) {
                logger.warn("messenger fastPublish failed, event: {}, broker: {}", metaEvent, broker);
            }
        }
    }

    @Override // org.joyqueue.nsr.message.Messenger
    public void fastPublish(MetaEvent metaEvent, Broker... brokerArr) {
        fastPublish(metaEvent, Lists.newArrayList(brokerArr));
    }

    @Override // org.joyqueue.nsr.message.Messenger
    public void addListener(MessageListener messageListener) {
        this.eventBus.addListener(messageListener);
    }

    public void setSupplier(PropertySupplier propertySupplier) {
        this.config = new MessengerConfig(propertySupplier);
    }

    /* renamed from: type, reason: merged with bridge method [inline-methods] */
    public String m8type() {
        return "default";
    }
}
