package dev.getelements.elements.remote.jeromq;

import dev.getelements.elements.rt.AsyncConnection;
import dev.getelements.elements.rt.AsyncConnectionPool;
import dev.getelements.elements.rt.PayloadReader;
import dev.getelements.elements.rt.PayloadWriter;
import dev.getelements.elements.rt.exception.InternalException;
import dev.getelements.elements.rt.remote.Invocation;
import dev.getelements.elements.rt.remote.InvocationError;
import dev.getelements.elements.rt.remote.InvocationResult;
import dev.getelements.elements.rt.remote.LocalInvocationDispatcher;
import dev.getelements.elements.rt.remote.MessageType;
import dev.getelements.elements.rt.remote.RequestHeader;
import dev.getelements.elements.rt.remote.ResponseHeader;
import dev.getelements.elements.rt.remote.jeromq.IdentityUtil;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/* loaded from: input_file:dev/getelements/elements/remote/jeromq/JeroMQNodeInvocation.class */
public class JeroMQNodeInvocation {
    private static final Logger logger = LoggerFactory.getLogger(JeroMQNodeInvocation.class);
    private final LocalInvocationDispatcher localInvocationDispatcher;
    private final PayloadWriter payloadWriter;
    private final PayloadReader payloadReader;
    private final AsyncConnectionPool<ZContext, ZMQ.Socket> outbound;
    private final ZMsg identity;
    private final AtomicBoolean sync = new AtomicBoolean();
    private final AtomicInteger remaining;
    private final Consumer<InvocationResult> syncInvocationResultConsumer;
    private final Consumer<InvocationError> syncInvocationErrorConsumer;
    private final List<Consumer<InvocationResult>> asyncInvocationResultConsumerList;
    private final Consumer<InvocationError> asyncInvocationErrorConsumer;
    private final byte[] payload;

    public JeroMQNodeInvocation(ZMsg zMsg, LocalInvocationDispatcher localInvocationDispatcher, PayloadReader payloadReader, PayloadWriter payloadWriter, AsyncConnectionPool<ZContext, ZMQ.Socket> asyncConnectionPool) {
        this.localInvocationDispatcher = localInvocationDispatcher;
        this.payloadWriter = payloadWriter;
        this.payloadReader = payloadReader;
        this.outbound = asyncConnectionPool;
        this.identity = IdentityUtil.popIdentity(zMsg);
        RequestHeader requestHeader = new RequestHeader();
        requestHeader.getByteBuffer().put(zMsg.remove().getData());
        int i = requestHeader.additionalParts.get();
        this.remaining = new AtomicInteger(i);
        this.payload = zMsg.remove().getData();
        this.syncInvocationResultConsumer = invocationResult -> {
            if (this.sync.getAndSet(true)) {
                logger.error("Already set sync response.  Ignoring {}", invocationResult);
            } else {
                sendResult(invocationResult, 0);
            }
        };
        this.syncInvocationErrorConsumer = invocationError -> {
            if (this.sync.getAndSet(true)) {
                logger.error("Already set sync response.  Ignoring {}", invocationError);
            } else {
                sendError(invocationError, 0);
            }
        };
        this.asyncInvocationErrorConsumer = invocationError2 -> {
            if (this.remaining.getAndSet(0) <= 0) {
                logger.error("Suppressing invocation error.  Already sent.", invocationError2.getThrowable());
            } else {
                sendError(invocationError2, 1);
            }
        };
        this.asyncInvocationResultConsumerList = (List) IntStream.range(0, i).map(i2 -> {
            return i2 + 1;
        }).mapToObj(i3 -> {
            return invocationResult2 -> {
                if (this.remaining.getAndDecrement() <= 0) {
                    logger.debug("Ignoring invocation result {} because of previous errors.", invocationResult2);
                } else {
                    sendResult(invocationResult2, i3);
                }
            };
        }).collect(Collectors.toList());
    }

    public void dispatch() {
        try {
            this.localInvocationDispatcher.dispatch((Invocation) this.payloadReader.read(Invocation.class, this.payload), this.syncInvocationResultConsumer, this.syncInvocationErrorConsumer, this.asyncInvocationResultConsumerList, this.asyncInvocationErrorConsumer);
            if (!this.sync.get()) {
                throw new InternalException("Sync callback was not made.");
            }
        } catch (IOException e) {
            InvocationError invocationError = new InvocationError();
            invocationError.setThrowable(e);
            sendError(invocationError, 0);
        }
    }

    private void sendResult(InvocationResult invocationResult, int i) {
        ResponseHeader responseHeader = new ResponseHeader();
        responseHeader.type.set(MessageType.INVOCATION_RESULT);
        responseHeader.part.set(i);
        byte[] bArr = new byte[responseHeader.size()];
        responseHeader.getByteBuffer().get(bArr);
        try {
            byte[] write = this.payloadWriter.write(invocationResult);
            ZMsg duplicate = this.identity.duplicate();
            duplicate.addLast(IdentityUtil.EMPTY_DELIMITER);
            duplicate.addLast(bArr);
            duplicate.addLast(write);
            write(duplicate);
        } catch (IOException e) {
            logger.error("Could not write payload to byte stream.  Sending empty.", e);
            InvocationError invocationError = new InvocationError();
            invocationError.setThrowable(e);
            sendError(invocationError, i);
        }
    }

    private void sendError(InvocationError invocationError, int i) {
        byte[] bArr;
        ResponseHeader responseHeader = new ResponseHeader();
        responseHeader.type.set(MessageType.INVOCATION_ERROR);
        responseHeader.part.set(i);
        byte[] bArr2 = new byte[responseHeader.size()];
        responseHeader.getByteBuffer().get(bArr2);
        try {
            bArr = this.payloadWriter.write(invocationError);
        } catch (Exception e) {
            logger.error("Could not write payload to byte stream.  Sending empty payload.", e);
            bArr = new byte[0];
        }
        ZMsg duplicate = this.identity.duplicate();
        duplicate.addLast(IdentityUtil.EMPTY_DELIMITER);
        duplicate.addLast(bArr2);
        duplicate.addLast(bArr);
        write(duplicate);
    }

    private void write(ZMsg zMsg) {
        this.outbound.acquireNextAvailableConnection(asyncConnection -> {
            asyncConnection.setEvents(new AsyncConnection.Event[]{AsyncConnection.Event.WRITE, AsyncConnection.Event.ERROR});
            asyncConnection.onWrite(asyncConnection -> {
                zMsg.send((ZMQ.Socket) asyncConnection.socket());
                asyncConnection.recycle();
            });
            asyncConnection.onError(asyncConnection2 -> {
                logger.error("Got error writing response to connection {} (errno: {})", asyncConnection, Integer.valueOf(((ZMQ.Socket) asyncConnection.socket()).errno()));
                asyncConnection.close();
            });
        });
    }
}
