package org.fabric3.binding.zeromq.runtime.message;

import java.io.IOException;
import java.lang.Thread;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.fabric3.api.binding.zeromq.model.ZeroMQMetadata;
import org.fabric3.binding.zeromq.runtime.MessagingMonitor;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.spi.container.invocation.CallbackReferenceSerializer;
import org.fabric3.spi.container.invocation.WorkContext;
import org.fabric3.spi.federation.addressing.SocketAddress;
import org.oasisopen.sca.ServiceRuntimeException;
import org.oasisopen.sca.ServiceUnavailableException;
import org.zeromq.ZMQ;

/* loaded from: input_file:org/fabric3/binding/zeromq/runtime/message/NonReliableRequestReplySender.class */
public class NonReliableRequestReplySender implements RequestReplySender, Thread.UncaughtExceptionHandler {
    /**
     * Placeholder task body handed to every {@link Request}'s {@code FutureTask} constructor. It only
     * returns {@code null}; the dispatcher supplies outcomes through {@code set}/{@code setException}.
     */
    private static final Callable<byte[]> CALLABLE = new Callable<byte[]>() {
        @Override
        public byte[] call() throws Exception {
            return null;
        }
    };

    /** Sentinel request; the dispatcher compares against it by identity to recognise a shutdown. */
    private static final Request SHUTDOWN = new Request(null, 0, null);

    private final String id;
    private final ContextManager manager;
    /** Replaced by {@code onUpdate}; handed to the multiplexer when the dispatcher reconnects. */
    private List<SocketAddress> addresses;
    /** Timeout used both for polling the request queue and for polling a socket for a response. */
    private final long pollTimeout;
    private final MessagingMonitor monitor;
    /** {@code null} while the sender is not started. */
    private Dispatcher dispatcher;
    private final RoundRobinSocketMultiplexer multiplexer;
    /** Requests waiting to be picked up by the dispatcher thread. */
    private final LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>();
    /** One poller per data socket; rebuilt by the dispatcher whenever the addresses are refreshed. */
    private final Map<ZMQ.Socket, ZMQ.Poller> pollers = new ConcurrentHashMap<>();

    /**
     * Worker that takes requests off the sender's queue, writes each one to a socket obtained from the
     * multiplexer and completes the request with the response (or with an exception when the socket
     * poll reports a negative result).
     * <p>
     * The loop ends when {@link #stop()} has been called, when the shutdown sentinel is dequeued, when
     * the control socket delivers a message, or when the thread is interrupted. In all of those cases
     * the multiplexer and the control socket are closed before the method returns.
     */
    public class Dispatcher implements Runnable {
        private final AtomicBoolean active = new AtomicBoolean(true);
        // Starts as true so that the first loop iteration creates the control socket and the pollers.
        private final AtomicBoolean doRefresh = new AtomicBoolean(true);
        private ZMQ.Socket controlSocket;

        private Dispatcher() {
        }

        /** Requests that the socket set be rebuilt from the current addresses on the next iteration. */
        public void refresh() {
            doRefresh.set(true);
        }

        /** Asks the loop to finish; it is honoured the next time the loop condition is evaluated. */
        public void stop() {
            active.set(false);
        }

        @Override
        public void run() {
            while (active.get()) {
                Request head;
                try {
                    reconnect();
                    head = queue.poll(pollTimeout, TimeUnit.MICROSECONDS);
                } catch (InterruptedException e) {
                    // Restore the flag and leave: staying in the loop with the flag set would make
                    // every subsequent queue.poll() throw immediately and spin.
                    Thread.currentThread().interrupt();
                    break;
                } catch (RuntimeException e) {
                    // Start a replacement thread for this dispatcher before this one dies.
                    // NOTE(review): only reconnect()/poll failures are covered here; failures while
                    // sending or receiving propagate without rescheduling - confirm this is intended.
                    schedule();
                    throw e;
                }
                if (SHUTDOWN == head) {
                    break;
                }
                if (!multiplexer.isAvailable()) {
                    // Non-reliable sender: anything polled in this iteration is dropped.
                    monitor.dropMessage();
                    continue;
                }
                if (head == null) {
                    // Queue poll timed out; nothing to send.
                    continue;
                }
                List<Request> batch = new ArrayList<>();
                batch.add(head);
                queue.drainTo(batch);
                for (Request request : batch) {
                    // drainTo may have pulled in the shutdown sentinel; it carries a null payload and
                    // must end the loop rather than be written to a socket.
                    if (SHUTDOWN == request || !exchange(request)) {
                        closeSockets();
                        return;
                    }
                }
            }
            closeSockets();
        }

        /**
         * Sends one request as a multipart message (payload, 4-byte index, optional work context) and
         * completes it with the outcome.
         *
         * @param request the request to send
         * @return {@code false} if the control socket delivered a message while waiting, meaning the
         *         dispatcher must shut down; {@code true} otherwise
         */
        private boolean exchange(Request request) {
            ZMQ.Socket socket = multiplexer.get();
            // Flag 2 on all but the last frame - presumably ZMQ.SNDMORE; confirm against the binding.
            socket.send(request.getPayload(), 2);
            byte[] indexFrame = ByteBuffer.allocate(4).putInt(request.getIndex()).array();
            byte[] workContext = request.getWorkContext();
            if (workContext == null || workContext.length == 0) {
                socket.send(indexFrame, 0);
            } else {
                socket.send(indexFrame, 2);
                socket.send(workContext, 0);
            }
            // NOTE(review): only a negative poll result is treated as a timeout; confirm whether the
            // ZMQ binding reports an elapsed timeout as 0, in which case recv(0) below would block.
            if (pollers.get(socket).poll(pollTimeout) < 0) {
                request.setException(new ServiceUnavailableException("Timeout waiting on response"));
                request.run();
                return true;
            }
            // Flag 1 - presumably a non-blocking receive; any message on the control socket ends the loop.
            if (controlSocket.recv(1) != null) {
                return false;
            }
            request.set(socket.recv(0));
            request.run();
            return true;
        }

        /** Closes the multiplexer and, if it was ever created, the control socket. */
        private void closeSockets() {
            multiplexer.close();
            if (controlSocket != null) {
                controlSocket.close();
            }
        }

        /**
         * When a refresh is pending, updates the multiplexer with the current addresses and rebuilds
         * the poller map so that every data socket has a poller watching it and the control socket.
         */
        private void reconnect() {
            if (!doRefresh.getAndSet(false)) {
                return;
            }
            if (controlSocket == null) {
                controlSocket = manager.createControlSocket();
            }
            multiplexer.update(addresses);
            Collection<ZMQ.Socket> sockets = multiplexer.getAll();
            pollers.clear();
            for (ZMQ.Socket socket : sockets) {
                ZMQ.Poller poller = manager.getContext().poller();
                // Event mask 1 - presumably POLLIN; confirm against the ZMQ binding.
                poller.register(socket, 1);
                poller.register(controlSocket, 1);
                pollers.put(socket, poller);
            }
        }
    }

    /**
     * A queued request: the payload, index and optional serialized work context to send, combined
     * with a {@link FutureTask} through which the dispatcher hands the response (or a failure) back
     * to the thread waiting in {@code sendAndReply}.
     */
    private static class Request extends FutureTask<byte[]> {
        private final byte[] payload;
        private final byte[] workContext;
        private final int index;

        /**
         * @param bArr  the message payload
         * @param i     the index written as a 4-byte frame after the payload
         * @param bArr2 the serialized work context, or {@code null} if there is none
         */
        public Request(byte[] bArr, int i, byte[] bArr2) {
            super(NonReliableRequestReplySender.CALLABLE);
            this.payload = bArr;
            this.index = i;
            this.workContext = bArr2;
        }

        public byte[] getPayload() {
            return this.payload;
        }

        public int getIndex() {
            return this.index;
        }

        public byte[] getWorkContext() {
            return this.workContext;
        }

        /** Completes the request with the given response; widened to public so the dispatcher can call it. */
        @Override
        public void set(byte[] bArr) {
            // The response is passed through unchanged; a byte[] cannot be cast to Request.
            super.set(bArr);
        }

        /** Completes the request exceptionally; re-declared so the dispatcher can call it. */
        @Override
        protected void setException(Throwable th) {
            super.setException(th);
        }
    }

    /**
     * Creates a sender; no thread is started until {@code start()} is called.
     *
     * @param str              the identifier returned by {@code getId()}
     * @param contextManager   supplies the control socket and pollers, and is passed to the multiplexer
     * @param list             the initial target addresses handed to the multiplexer on first connect
     * @param j                the timeout used for polling the request queue and the response sockets
     * @param zeroMQMetadata   metadata passed through to the socket multiplexer
     * @param messagingMonitor receives errors and dropped-message notifications
     */
    public NonReliableRequestReplySender(String str, ContextManager contextManager, List<SocketAddress> list, long j, ZeroMQMetadata zeroMQMetadata, MessagingMonitor messagingMonitor) {
        id = str;
        addresses = list;
        pollTimeout = j;
        monitor = messagingMonitor;
        manager = contextManager;
        // Socket type 5 - presumably the ZMQ XREQ/DEALER type; confirm against the ZMQ binding.
        multiplexer = new RoundRobinSocketMultiplexer(contextManager, 5, zeroMQMetadata);
    }

    /** Creates the dispatcher and schedules it on a new thread; does nothing if one already exists. */
    @Override
    public void start() {
        if (dispatcher != null) {
            return;
        }
        dispatcher = new Dispatcher();
        schedule();
    }

    /**
     * Deactivates the dispatcher and enqueues the shutdown sentinel so that a dispatcher blocked on
     * the queue wakes up. Does nothing when no dispatcher is present (never started, or already
     * stopped). The dispatcher reference is always cleared afterwards.
     */
    @Override
    public void stop() {
        Dispatcher current = dispatcher;
        if (current == null) {
            return;
        }
        try {
            current.stop();
            queue.put(SHUTDOWN);
        } catch (InterruptedException e) {
            monitor.error(e);
            // Keep the interruption visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
        } finally {
            dispatcher = null;
        }
    }

    /**
     * @return the identifier supplied to the constructor
     */
    public String getId() {
        return id;
    }

    /**
     * Replaces the target addresses and asks the running dispatcher, if any, to reconnect.
     * <p>
     * When no dispatcher exists the addresses are only stored; a newly created dispatcher starts with
     * its refresh flag set and therefore reads them on its first iteration.
     *
     * @param list the new target addresses
     */
    public void onUpdate(List<SocketAddress> list) {
        addresses = list;
        // Read the field once so a concurrent stop() cannot null it between the check and the call.
        Dispatcher current = dispatcher;
        if (current != null) {
            current.refresh();
        }
    }

    /**
     * Queues a request for the dispatcher and blocks until it has been completed or 100000
     * milliseconds have elapsed.
     *
     * @param bArr        the message payload
     * @param i           the index sent along with the payload
     * @param workContext the work context whose callback references are serialized and sent, if any
     * @return the response bytes set by the dispatcher
     * @throws ServiceUnavailableException if serializing the work context fails or the wait times out
     * @throws ServiceRuntimeException     if the calling thread is interrupted or the request was
     *                                     completed with an exception
     */
    @Override
    public byte[] sendAndReply(byte[] bArr, int i, WorkContext workContext) {
        try {
            Request request = new Request(bArr, i, serialize(workContext));
            queue.put(request);
            // TODO make the reply timeout configurable
            return request.get(100000L, TimeUnit.MILLISECONDS);
        } catch (IOException | TimeoutException e) {
            throw new ServiceUnavailableException(e);
        } catch (InterruptedException e) {
            // The exception already cleared the flag; set it again so callers can observe the interrupt.
            Thread.currentThread().interrupt();
            throw new ServiceRuntimeException(e);
        } catch (ExecutionException e) {
            throw new ServiceRuntimeException(e);
        }
    }

    /**
     * Reports a throwable that escaped a dispatcher thread to the monitor.
     *
     * @param thread the thread that terminated
     * @param th     the throwable that terminated it
     */
    @Override
    public void uncaughtException(Thread thread, Throwable th) {
        monitor.error(th);
    }

    /**
     * Starts a new thread running the current dispatcher, with this sender installed as the thread's
     * uncaught-exception handler.
     */
    public void schedule() {
        Thread worker = new Thread(dispatcher);
        worker.setUncaughtExceptionHandler(this);
        worker.start();
    }

    /**
     * Serializes the callback references held by the work context.
     *
     * @param workContext the work context to read the callback references from
     * @return the serialized references, or {@code null} when the context has none
     * @throws IOException if serialization fails
     */
    private byte[] serialize(WorkContext workContext) throws IOException {
        List callbackReferences = workContext.getCallbackReferences();
        boolean nothingToSend = callbackReferences == null || callbackReferences.isEmpty();
        return nothingToSend ? null : CallbackReferenceSerializer.serializeToBytes(callbackReferences);
    }
}
