package org.fabric3.binding.zeromq.runtime.broker;

import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.fabric3.api.annotation.Source;
import org.fabric3.api.annotation.monitor.Monitor;
import org.fabric3.api.binding.zeromq.model.SocketAddressDefinition;
import org.fabric3.api.binding.zeromq.model.ZeroMQMetadata;
import org.fabric3.api.host.Fabric3Exception;
import org.fabric3.api.model.type.contract.DataType;
import org.fabric3.binding.zeromq.runtime.MessagingMonitor;
import org.fabric3.binding.zeromq.runtime.SocketAddress;
import org.fabric3.binding.zeromq.runtime.ZeroMQPubSubBroker;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.binding.zeromq.runtime.handler.PublisherHandler;
import org.fabric3.binding.zeromq.runtime.management.ZeroMQManagementService;
import org.fabric3.binding.zeromq.runtime.message.NonReliableQueuedPublisher;
import org.fabric3.binding.zeromq.runtime.message.NonReliableSingleThreadPublisher;
import org.fabric3.binding.zeromq.runtime.message.NonReliableSubscriber;
import org.fabric3.binding.zeromq.runtime.message.Publisher;
import org.fabric3.binding.zeromq.runtime.message.Subscriber;
import org.fabric3.spi.container.channel.ChannelConnection;
import org.fabric3.spi.container.channel.EventStream;
import org.fabric3.spi.container.channel.EventStreamHandler;
import org.fabric3.spi.container.channel.TransformerHandlerFactory;
import org.fabric3.spi.discovery.ChannelEntry;
import org.fabric3.spi.discovery.DiscoveryAgent;
import org.fabric3.spi.host.Port;
import org.fabric3.spi.host.PortAllocator;
import org.fabric3.spi.model.type.java.JavaType;
import org.fabric3.spi.runtime.event.EventService;
import org.fabric3.spi.runtime.event.Fabric3EventListener;
import org.fabric3.spi.runtime.event.RuntimeStop;
import org.oasisopen.sca.annotation.Init;
import org.oasisopen.sca.annotation.Property;
import org.oasisopen.sca.annotation.Reference;
import org.oasisopen.sca.annotation.Service;

/**
 * {@link ZeroMQPubSubBroker} implementation that keeps one {@link Subscriber} and one {@link Publisher} per channel name and shares them between
 * connections.
 * <p>
 * Subscribers are reference-counted through {@link Subscriber#incrementConnectionCount()} / {@link Subscriber#decrementConnectionCount()}; publishers
 * are tracked by the set of connection ids attached to them. A subscriber or publisher is stopped and removed when its last connection goes away.
 * <p>
 * The broker subscribes itself to {@link RuntimeStop} events and stops every subscriber and publisher when one is received.
 * <p>
 * This class performs no synchronization of its own: the channel maps are plain {@link HashMap}s.
 */
@Service({ZeroMQPubSubBroker.class})
public class ZeroMQPubSubBrokerImpl implements ZeroMQPubSubBroker, Fabric3EventListener<RuntimeStop> {
    private static final JavaType BYTES = new JavaType(byte[].class);
    private static final JavaType TWO_DIMENSIONAL_BYTES = new JavaType(byte[][].class);
    private static final String ZMQ = "zmq";
    private static final String TCP = "tcp";
    private static final String LOCALHOST = "localhost";

    private final ContextManager manager;
    private final DiscoveryAgent discoveryAgent;
    private final PortAllocator allocator;
    private final TransformerHandlerFactory handlerFactory;
    private final ZeroMQManagementService managementService;
    private final EventService eventService;
    private final ExecutorService executorService;
    private final MessagingMonitor monitor;

    private long pollTimeout = 10000;
    private String hostAddress;

    /** Active subscribers keyed by channel name. */
    private final Map<String, Subscriber> subscribers = new HashMap<>();
    /** Active publishers (with their attached connection ids and bound address) keyed by channel name. */
    private final Map<String, PublisherHolder> publishers = new HashMap<>();

    /**
     * Associates a publisher with the address it was created for and the ids of the connections currently attached to it.
     * Declared static because it never needs the enclosing broker instance.
     */
    private static final class PublisherHolder {
        private final List<String> connectionIds = new ArrayList<>();
        private final Publisher publisher;
        private final SocketAddress address;

        private PublisherHolder(Publisher publisher, SocketAddress socketAddress) {
            this.publisher = publisher;
            this.address = socketAddress;
        }

        /** Returns the live, mutable list of attached connection ids. */
        public List<String> getConnectionIds() {
            return connectionIds;
        }

        public Publisher getPublisher() {
            return publisher;
        }

        public SocketAddress getAddress() {
            return address;
        }
    }

    /**
     * Creates the broker.
     *
     * @param discoveryAgent optional; when {@code null}, channels without explicitly configured socket addresses cannot be subscribed to and
     *                       publishers are not advertised
     * @throws UnknownHostException if the local host address, used as the default host address, cannot be resolved
     */
    public ZeroMQPubSubBrokerImpl(@Reference ContextManager contextManager, @Reference(required = false) DiscoveryAgent discoveryAgent, @Reference PortAllocator portAllocator, @Reference TransformerHandlerFactory transformerHandlerFactory, @Reference ZeroMQManagementService zeroMQManagementService, @Reference EventService eventService, @Reference(name = "executorService") ExecutorService executorService, @Monitor MessagingMonitor messagingMonitor) throws UnknownHostException {
        this.manager = contextManager;
        this.discoveryAgent = discoveryAgent;
        this.allocator = portAllocator;
        this.handlerFactory = transformerHandlerFactory;
        this.managementService = zeroMQManagementService;
        this.eventService = eventService;
        this.executorService = executorService;
        this.monitor = messagingMonitor;
        // Default host address; may be overridden later through setHost(..).
        this.hostAddress = InetAddress.getLocalHost().getHostAddress();
    }

    /**
     * Overrides the poll timeout handed to queued publishers created by {@link #connect}. Defaults to {@code 10000}.
     */
    @Source("$systemConfig//f3:zeromq.binding/@poll.timeout")
    @Property(required = false)
    public void setPollTimeout(long pollTimeout) {
        this.pollTimeout = pollTimeout;
    }

    /**
     * Overrides the host address used for dynamically allocated publisher sockets and substituted for {@code "localhost"} in configured socket
     * addresses.
     */
    @Source("$systemConfig/f3:runtime/@host.address")
    @Property(required = false)
    public void setHost(String hostAddress) {
        this.hostAddress = hostAddress;
    }

    /** Registers this broker for {@link RuntimeStop} events. */
    @Init
    public void init() {
        eventService.subscribe(RuntimeStop.class, this);
    }

    /**
     * Subscribes a connection to the channel named by the metadata.
     * <p>
     * If a subscriber already exists for the channel only its connection count is incremented. Otherwise a new {@link NonReliableSubscriber} is
     * created, started and registered with the management service. Its socket addresses are taken from the metadata when configured there, or from
     * the discovery agent otherwise; in the latter case the subscriber is also registered as a channel listener with the discovery agent.
     *
     * @throws Fabric3Exception if the metadata configures no socket addresses and no discovery agent is available
     */
    @Override // org.fabric3.binding.zeromq.runtime.ZeroMQPubSubBroker
    public void subscribe(URI uri, ZeroMQMetadata zeroMQMetadata, ChannelConnection channelConnection, ClassLoader classLoader) throws Fabric3Exception {
        String channelName = zeroMQMetadata.getChannelName();
        Subscriber existing = subscribers.get(channelName);
        if (existing != null) {
            existing.incrementConnectionCount();
        } else {
            String subscriberId = uri.toString();
            EventStreamHandler head = createSubscriberHandlers(channelConnection, classLoader);
            head.setNext(channelConnection.getEventStream().getHeadHandler());

            List<SocketAddressDefinition> configured = zeroMQMetadata.getSocketAddresses();
            boolean dynamicAddresses = configured == null;
            List<SocketAddress> addresses = dynamicAddresses ? discoverAddresses(channelName) : toSocketAddresses(configured);

            NonReliableSubscriber created = new NonReliableSubscriber(subscriberId, manager, addresses, head, zeroMQMetadata, executorService);
            created.incrementConnectionCount();
            created.start();
            if (dynamicAddresses) {
                // discoverAddresses(..) has already verified that the discovery agent is present
                discoveryAgent.registerChannelListener(channelName, created);
            }
            subscribers.put(channelName, created);
            managementService.register(channelName, uri, created);
        }
        monitor.onSubscribe(monitorName(uri));
    }

    /**
     * Removes one connection from the channel's subscriber. When no connections remain, the subscriber is unregistered from the discovery agent (if
     * any), removed and stopped.
     *
     * @throws IllegalStateException if no subscriber exists for the channel
     */
    @Override // org.fabric3.binding.zeromq.runtime.ZeroMQPubSubBroker
    public void unsubscribe(URI uri, ZeroMQMetadata zeroMQMetadata) {
        String channelName = zeroMQMetadata.getChannelName();
        Subscriber subscriber = subscribers.get(channelName);
        if (subscriber == null) {
            throw new IllegalStateException("Subscriber not found: " + uri);
        }
        subscriber.decrementConnectionCount();
        if (!subscriber.hasConnections()) {
            // Only drop the discovery listener together with the subscriber itself: the subscriber is shared, so removing the
            // listener while other connections remain would cut them off from address updates.
            if (discoveryAgent != null) {
                discoveryAgent.unregisterChannelListener(channelName, subscriber);
            }
            subscribers.remove(channelName);
            subscriber.stop();
        }
        managementService.unregister(channelName, uri);
        monitor.onUnsubscribe(monitorName(uri));
    }

    /**
     * Attaches a producer connection to the channel's publisher, creating the publisher on first use.
     * <p>
     * A new publisher is bound to the single socket address configured in the metadata, or to a dynamically allocated port on the host address when
     * none is configured. If a discovery agent is available the publisher's address is registered with it before the publisher is started.
     *
     * @param connectionId        id recorded against the publisher and later passed to {@link #release}
     * @param singleThreadPublisher {@code true} to create a {@link NonReliableSingleThreadPublisher}, {@code false} to create a
     *                            {@link NonReliableQueuedPublisher}; only consulted when the publisher does not exist yet
     * @throws Fabric3Exception if the metadata configures more than one socket address
     */
    @Override // org.fabric3.binding.zeromq.runtime.ZeroMQPubSubBroker
    public void connect(String connectionId, ZeroMQMetadata zeroMQMetadata, boolean singleThreadPublisher, ChannelConnection channelConnection, ClassLoader classLoader) throws Fabric3Exception {
        String channelName = zeroMQMetadata.getChannelName();
        PublisherHolder existing = publishers.get(channelName);
        if (existing != null) {
            attachConnection(channelConnection, existing.getPublisher(), classLoader);
            existing.getConnectionIds().add(connectionId);
            return;
        }

        SocketAddress address = allocatePublisherAddress(channelName, zeroMQMetadata.getSocketAddresses());
        Publisher publisher;
        if (singleThreadPublisher) {
            publisher = new NonReliableSingleThreadPublisher(manager, address, zeroMQMetadata);
        } else {
            publisher = new NonReliableQueuedPublisher(manager, address, zeroMQMetadata, pollTimeout, monitor);
        }
        attachConnection(channelConnection, publisher, classLoader);

        if (discoveryAgent != null) {
            ChannelEntry entry = new ChannelEntry();
            entry.setName(channelName);
            entry.setAddress(address.getAddress());
            entry.setPort(address.getPort().getNumber());
            entry.setTransport(TCP);
            discoveryAgent.register(entry);
        }
        publisher.start();

        PublisherHolder holder = new PublisherHolder(publisher, address);
        holder.getConnectionIds().add(connectionId);
        publishers.put(channelName, holder);
        managementService.register(channelName, publisher);
    }

    /**
     * Detaches a connection id from the channel's publisher. When the last connection is detached the publisher is removed and stopped, unregistered
     * from the discovery agent (if any) and the management service, and its port is handed back to the allocator.
     *
     * @throws Fabric3Exception if no publisher exists for the channel
     */
    @Override // org.fabric3.binding.zeromq.runtime.ZeroMQPubSubBroker
    public void release(String connectionId, ZeroMQMetadata zeroMQMetadata) throws Fabric3Exception {
        String channelName = zeroMQMetadata.getChannelName();
        PublisherHolder holder = publishers.get(channelName);
        if (holder == null) {
            throw new Fabric3Exception("Publisher not found for " + channelName);
        }
        holder.getConnectionIds().remove(connectionId);
        if (!holder.getConnectionIds().isEmpty()) {
            // Other connections still share the publisher, so its socket (and therefore its port) stays in use.
            return;
        }
        publishers.remove(channelName);
        holder.getPublisher().stop();
        if (discoveryAgent != null) {
            discoveryAgent.unregisterChannel(channelName);
        }
        managementService.unregister(channelName);
        // Release the port only now that the publisher bound to it has been stopped.
        allocator.release(channelName);
    }

    /** Starts every registered subscriber, then every registered publisher. */
    @Override // org.fabric3.binding.zeromq.runtime.ZeroMQPubSubBroker
    public void startAll() {
        for (Subscriber subscriber : subscribers.values()) {
            subscriber.start();
        }
        for (PublisherHolder holder : publishers.values()) {
            holder.getPublisher().start();
        }
    }

    /** Stops every registered subscriber, then every registered publisher. They stay registered and can be restarted with {@link #startAll()}. */
    @Override // org.fabric3.binding.zeromq.runtime.ZeroMQPubSubBroker
    public void stopAll() {
        for (Subscriber subscriber : subscribers.values()) {
            subscriber.stop();
        }
        for (PublisherHolder holder : publishers.values()) {
            holder.getPublisher().stop();
        }
    }

    /** Stops all subscribers and publishers when the runtime stops. */
    @Override
    public void onEvent(RuntimeStop runtimeStop) {
        stopAll();
    }

    /** Converts configured socket address definitions to TCP socket addresses on their specified ports. */
    private List<SocketAddress> toSocketAddresses(List<SocketAddressDefinition> definitions) {
        List<SocketAddress> addresses = new ArrayList<>();
        for (SocketAddressDefinition definition : definitions) {
            SpecifiedPort port = new SpecifiedPort(definition.getPort());
            addresses.add(new SocketAddress(TCP, resolveHost(definition.getHost()), port));
        }
        return addresses;
    }

    /** Builds socket addresses from the channel entries currently known to the discovery agent; fails if there is no discovery agent. */
    private List<SocketAddress> discoverAddresses(String channelName) throws Fabric3Exception {
        if (discoveryAgent == null) {
            throw new Fabric3Exception("Discovery extension must be installed for dynamic channel addresses");
        }
        return discoveryAgent.getChannelEntries(channelName)
                .stream()
                .map(entry -> new SocketAddress(entry.getTransport(), entry.getAddress(), new SpecifiedPort(entry.getPort())))
                .collect(Collectors.toList());
    }

    /** Reserves the single configured port for a new publisher, or allocates one on the host address when nothing is configured. */
    private SocketAddress allocatePublisherAddress(String channelName, List<SocketAddressDefinition> definitions) throws Fabric3Exception {
        if (definitions == null || definitions.isEmpty()) {
            return new SocketAddress(TCP, hostAddress, allocator.allocate(channelName, ZMQ));
        }
        if (definitions.size() != 1) {
            throw new Fabric3Exception("Invalid number of socket addresses: " + definitions.size());
        }
        SocketAddressDefinition definition = definitions.get(0);
        Port reserved = allocator.reserve(channelName, ZMQ, definition.getPort());
        return new SocketAddress(TCP, resolveHost(definition.getHost()), reserved);
    }

    /** Substitutes the configured host address for the literal host name {@code "localhost"}; any other value is returned unchanged. */
    private String resolveHost(String host) {
        return LOCALHOST.equals(host) ? hostAddress : host;
    }

    /** Builds the name reported to the monitor: the URI path without its first character, a slash, then the URI fragment. */
    private static String monitorName(URI uri) {
        return uri.getPath().substring(1) + "/" + uri.getFragment();
    }

    /** Appends to the connection's event stream a handler converting events to the wire type, followed by a handler that feeds the publisher. */
    private void attachConnection(ChannelConnection channelConnection, Publisher publisher, ClassLoader classLoader) throws Fabric3Exception {
        EventStream eventStream = channelConnection.getEventStream();
        DataType eventType = getEventType(eventStream);
        eventStream.addHandler(handlerFactory.createHandler(eventType, wireTypeFor(eventType), Collections.emptyList(), classLoader));
        eventStream.addHandler(new PublisherHandler(publisher));
    }

    /** Creates the handler that converts from the wire type back to the connection's event type. */
    private EventStreamHandler createSubscriberHandlers(ChannelConnection channelConnection, ClassLoader classLoader) throws Fabric3Exception {
        DataType eventType = getEventType(channelConnection.getEventStream());
        return handlerFactory.createHandler(wireTypeFor(eventType), eventType, Collections.emptyList(), classLoader);
    }

    /** Selects the wire type: {@code byte[][]} events are carried as {@code byte[][]}, everything else as {@code byte[]}. */
    private static JavaType wireTypeFor(DataType eventType) {
        return eventType.getType().equals(byte[][].class) ? TWO_DIMENSIONAL_BYTES : BYTES;
    }

    private DataType getEventType(EventStream eventStream) {
        return new JavaType(eventStream.getEventType());
    }
}
