package org.fabric3.binding.zeromq.runtime.message;

import java.lang.Thread;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.fabric3.api.annotation.management.Management;
import org.fabric3.api.annotation.management.ManagementOperation;
import org.fabric3.api.annotation.management.OperationType;
import org.fabric3.api.binding.zeromq.model.ZeroMQMetadata;
import org.fabric3.binding.zeromq.runtime.MessagingMonitor;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.spi.federation.addressing.SocketAddress;
import org.fabric3.spi.host.Port;
import org.zeromq.ZMQ;

@Management
/* loaded from: input_file:org/fabric3/binding/zeromq/runtime/message/NonReliableQueuedPublisher.class */
public class NonReliableQueuedPublisher implements Publisher, Thread.UncaughtExceptionHandler {
    private static final byte[] SHUTDOWN = new byte[0];
    private ContextManager manager;
    private SocketAddress address;
    private long pollTimeout;
    private ZeroMQMetadata metadata;
    private MessagingMonitor monitor;
    private ZMQ.Socket socket;
    private Dispatcher dispatcher;
    private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/fabric3/binding/zeromq/runtime/message/NonReliableQueuedPublisher$Dispatcher.class */
    public class Dispatcher implements Runnable {
        private AtomicBoolean active;

        private Dispatcher() {
            this.active = new AtomicBoolean(true);
        }

        public void stop() {
            this.active.set(false);
        }

        @Override // java.lang.Runnable
        public void run() {
            Object poll;
            String str = getClass().getName() + ":" + UUID.randomUUID().toString();
            NonReliableQueuedPublisher.this.manager.reserve(str);
            NonReliableQueuedPublisher.this.socket = NonReliableQueuedPublisher.this.manager.getContext().socket(1);
            SocketHelper.configure(NonReliableQueuedPublisher.this.socket, NonReliableQueuedPublisher.this.metadata);
            NonReliableQueuedPublisher.this.address.getPort().bind(Port.TYPE.TCP);
            NonReliableQueuedPublisher.this.socket.bind(NonReliableQueuedPublisher.this.address.toProtocolString());
            while (this.active.get()) {
                try {
                    poll = NonReliableQueuedPublisher.this.queue.poll(NonReliableQueuedPublisher.this.pollTimeout, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (RuntimeException e2) {
                    NonReliableQueuedPublisher.this.manager.release(str);
                    NonReliableQueuedPublisher.this.schedule();
                    throw e2;
                }
                if (NonReliableQueuedPublisher.SHUTDOWN == poll) {
                    close(str);
                    return;
                }
                if (poll != null) {
                    ArrayList arrayList = new ArrayList();
                    arrayList.add(poll);
                    NonReliableQueuedPublisher.this.queue.drainTo(arrayList);
                    for (Object obj : arrayList) {
                        if (obj instanceof byte[]) {
                            NonReliableQueuedPublisher.this.socket.send((byte[]) obj, 0);
                        } else if (obj instanceof byte[][]) {
                            byte[][] bArr = (byte[][]) obj;
                            int length = bArr.length;
                            for (int i = 0; i < length - 1; i++) {
                                NonReliableQueuedPublisher.this.socket.send(bArr[i], 2);
                            }
                            NonReliableQueuedPublisher.this.socket.send(bArr[length - 1], 0);
                        } else {
                            NonReliableQueuedPublisher.this.monitor.error("Unknown object type:" + obj.getClass().getName());
                        }
                    }
                }
            }
            close(str);
        }

        private void close(String str) {
            if (NonReliableQueuedPublisher.this.socket != null) {
                try {
                    NonReliableQueuedPublisher.this.socket.close();
                    NonReliableQueuedPublisher.this.manager.release(str);
                } catch (Throwable th) {
                    NonReliableQueuedPublisher.this.manager.release(str);
                    throw th;
                }
            }
        }
    }

    public NonReliableQueuedPublisher(ContextManager contextManager, SocketAddress socketAddress, ZeroMQMetadata zeroMQMetadata, long j, MessagingMonitor messagingMonitor) {
        this.manager = contextManager;
        this.address = socketAddress;
        this.pollTimeout = j;
        this.metadata = zeroMQMetadata;
        this.monitor = messagingMonitor;
    }

    @Override // org.fabric3.binding.zeromq.runtime.message.Publisher
    @ManagementOperation(type = OperationType.POST)
    public void start() {
        if (this.dispatcher == null) {
            this.dispatcher = new Dispatcher();
            schedule();
        }
    }

    @Override // org.fabric3.binding.zeromq.runtime.message.Publisher
    @ManagementOperation(type = OperationType.POST)
    public void stop() {
        try {
            try {
                if (this.dispatcher != null) {
                    this.dispatcher.stop();
                }
                this.queue.put(SHUTDOWN);
                this.dispatcher = null;
            } catch (InterruptedException e) {
                this.monitor.error(e);
                this.dispatcher = null;
            }
        } catch (Throwable th) {
            this.dispatcher = null;
            throw th;
        }
    }

    @ManagementOperation
    public String getAddress() {
        return this.address.toString();
    }

    @Override // org.fabric3.binding.zeromq.runtime.message.Publisher
    public void publish(byte[] bArr) {
        try {
            this.queue.put(bArr);
        } catch (InterruptedException e) {
            Thread.interrupted();
        }
    }

    @Override // org.fabric3.binding.zeromq.runtime.message.Publisher
    public void publish(byte[][] bArr) {
        try {
            this.queue.put(bArr);
        } catch (InterruptedException e) {
            Thread.interrupted();
        }
    }

    @Override // java.lang.Thread.UncaughtExceptionHandler
    public void uncaughtException(Thread thread, Throwable th) {
        this.monitor.error(th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void schedule() {
        Thread thread = new Thread(this.dispatcher);
        thread.setUncaughtExceptionHandler(this);
        thread.start();
    }
}
