package org.fabric3.binding.zeromq.runtime.message;

import java.lang.Thread;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.fabric3.api.annotation.management.Management;
import org.fabric3.api.binding.zeromq.model.ZeroMQMetadata;
import org.fabric3.binding.zeromq.runtime.MessagingMonitor;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.spi.container.invocation.Message;
import org.fabric3.spi.container.invocation.MessageCache;
import org.fabric3.spi.container.wire.InvocationChain;
import org.fabric3.spi.federation.addressing.SocketAddress;
import org.oasisopen.sca.ServiceRuntimeException;
import org.zeromq.ZMQ;

@Management
/* loaded from: input_file:org/fabric3/binding/zeromq/runtime/message/NonReliableRequestReplyReceiver.class */
public class NonReliableRequestReplyReceiver extends AbstractReceiver implements Thread.UncaughtExceptionHandler {
    private static final Response SHUTDOWN = new Response(null, null);
    private LinkedBlockingQueue<Response> queue;
    private final long pollTimeout;

    /* loaded from: input_file:org/fabric3/binding/zeromq/runtime/message/NonReliableRequestReplyReceiver$Response.class */
    private static class Response {
        private byte[] clientId;
        private byte[] body;

        private Response(byte[] bArr, byte[] bArr2) {
            this.clientId = bArr;
            this.body = bArr2;
        }
    }

    public NonReliableRequestReplyReceiver(ContextManager contextManager, SocketAddress socketAddress, List<InvocationChain> list, ExecutorService executorService, long j, ZeroMQMetadata zeroMQMetadata, MessagingMonitor messagingMonitor) {
        super(contextManager, socketAddress, list, 6, zeroMQMetadata, executorService, messagingMonitor);
        this.pollTimeout = j;
        this.queue = new LinkedBlockingQueue<>();
    }

    /* JADX WARN: Type inference failed for: r0v4, types: [byte[], byte[][]] */
    @Override // org.fabric3.binding.zeromq.runtime.message.AbstractReceiver
    protected boolean invoke(ZMQ.Socket socket) {
        final byte[] recv = socket.recv(1);
        if (recv == null) {
            return false;
        }
        final ?? r0 = new byte[3];
        int i = 1;
        r0[0] = socket.recv(0);
        while (socket.hasReceiveMore()) {
            if (i > 2) {
                this.monitor.error("Invalid message: received more than three frames");
                return false;
            }
            r0[i] = socket.recv(0);
            i++;
        }
        this.executorService.execute(new Runnable() { // from class: org.fabric3.binding.zeromq.runtime.message.NonReliableRequestReplyReceiver.1
            @Override // java.lang.Runnable
            public void run() {
                Message andResetMessage = MessageCache.getAndResetMessage();
                try {
                    andResetMessage.setBody(r0[0]);
                    int i2 = ByteBuffer.wrap(r0[1]).getInt();
                    andResetMessage.setWorkContext(NonReliableRequestReplyReceiver.this.setWorkContext(r0[2]));
                    Object body = NonReliableRequestReplyReceiver.this.interceptors[i2].invoke(andResetMessage).getBody();
                    if (!(body instanceof byte[])) {
                        throw new ServiceRuntimeException("Return value not serialized");
                    }
                    try {
                        NonReliableRequestReplyReceiver.this.queue.put(new Response(recv, (byte[]) body));
                    } catch (InterruptedException e) {
                        Thread.interrupted();
                    }
                } finally {
                    andResetMessage.reset();
                }
            }
        });
        return true;
    }

    @Override // org.fabric3.binding.zeromq.runtime.message.AbstractReceiver
    protected void response(ZMQ.Socket socket) {
        Response response;
        try {
            Response poll = this.queue.poll(this.pollTimeout, TimeUnit.MICROSECONDS);
            if (poll == null || SHUTDOWN == poll) {
                return;
            }
            ArrayList arrayList = new ArrayList();
            arrayList.add(poll);
            this.queue.drainTo(arrayList);
            Iterator it = arrayList.iterator();
            while (it.hasNext() && SHUTDOWN != (response = (Response) it.next())) {
                socket.send(response.clientId, 2);
                socket.send(response.body, 0);
            }
        } catch (InterruptedException e) {
            Thread.interrupted();
        }
    }
}
