package at.qubic.api.network;

import at.qubic.api.domain.MessageType;
import at.qubic.api.domain.QubicMessage;
import at.qubic.api.domain.QubicRequest;
import at.qubic.api.exception.InvalidResponseException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import lombok.Generated;
import org.apache.commons.codec.binary.Hex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;

/* loaded from: input_file:at/qubic/api/network/BasicNodeOperations.class */
public class BasicNodeOperations {

    @Generated
    private static final Logger log;
    static final /* synthetic */ boolean $assertionsDisabled;

    public Mono<QubicMessage> sendAndReceiveNext(Node node, Mono<? extends QubicRequest> mono, MessageType messageType) {
        return node.getClient().connect().flatMap(connection -> {
            return exchangeMessage(node, mono, connection).filter(isType(messageType)).next();
        }).switchIfEmpty(Mono.defer(() -> {
            return Mono.error(new InvalidResponseException("Did not receive expected message of type: " + String.valueOf(messageType)));
        })).timeout(Duration.ofSeconds(2L)).doOnError(th -> {
            logNodeCallError(node, th);
        });
    }

    public Flux<QubicMessage> sendAndReceive(Node node, Mono<? extends QubicRequest> mono, MessageType messageType) {
        return node.getClient().connect().flatMapMany(connection -> {
            return exchangeMessage(node, mono, connection).takeUntil(isType(MessageType.END_RESPONSE));
        }).collectList().flatMapMany(list -> {
            if (list.isEmpty() || ((QubicMessage) list.getLast()).getHeader().getType() != MessageType.END_RESPONSE) {
                throw new InvalidResponseException("No end response received");
            }
            return Flux.fromIterable(list);
        }).doOnNext(qubicMessage -> {
            log.debug("Received [{}].", qubicMessage.getHeader());
        }).filter(messageType == null ? isType(MessageType.END_RESPONSE).negate() : isType(messageType)).timeout(Duration.ofSeconds(2L)).doOnError(th -> {
            logNodeCallError(node, th);
        });
    }

    public Mono<? extends Connection> send(Node node, Mono<QubicRequest> mono) {
        Mono doOnNext = mono.doOnNext(qubicRequest -> {
            log.debug("[{}] Sending message of type: [{}].", node.getName(), qubicRequest.getMessageType());
        }).map(qubicRequest2 -> {
            return qubicRequest2.toMessage().toBytes();
        }).doOnNext(bArr -> {
            log.trace("Sending bytes: [{}]", Hex.encodeHexString(bArr));
        });
        return node.getClient().handle((nettyInbound, nettyOutbound) -> {
            return nettyOutbound.sendByteArray(doOnNext);
        }).connect().switchIfEmpty(Mono.defer(() -> {
            return Mono.error(new InvalidResponseException("Unexpected 'empty' result when sending to node: " + node.getName()));
        })).doOnError(th -> {
            logNodeCallError(node, th);
        });
    }

    private Flux<QubicMessage> exchangeMessage(Node node, Mono<? extends QubicRequest> mono, Connection connection) {
        Flux map = exchangeMessages(connection, getRequestBytes(mono)).map(this::readMessage);
        Objects.requireNonNull(node);
        return map.doOnNext(node::updatePublicPeers).skipWhile(isType(MessageType.EXCHANGE_PUBLIC_PEERS));
    }

    private Mono<byte[]> getRequestBytes(Mono<? extends QubicRequest> mono) {
        return convertToMessageBytes(mono.doOnNext(qubicRequest -> {
            log.debug("Sending [{}].", qubicRequest.toMessage().getHeader().getType());
        }));
    }

    private void logNodeCallError(Node node, Throwable th) {
        if ((th instanceof TimeoutException) || (th instanceof ConnectException) || (th instanceof NoRouteToHostException)) {
            log.warn("Node [{}] call errored: {}", node.getName(), th.getMessage());
        } else {
            log.error("Node [{}] call errored.", node.getName(), th);
        }
    }

    private static Flux<ByteBuffer> exchangeMessages(Connection connection, Mono<byte[]> mono) {
        Mono then = connection.outbound().sendByteArray(mono).then();
        Flux asByteBuffer = connection.inbound().receive().asByteBuffer();
        Objects.requireNonNull(connection);
        return then.thenMany(asByteBuffer.doOnTerminate(connection::dispose));
    }

    private QubicMessage readMessage(ByteBuffer byteBuffer) {
        QubicMessage fromBytes = QubicMessage.fromBytes(byteBuffer);
        log.trace("Payload: {}", fromBytes.getPayload());
        if (byteBuffer.hasRemaining()) {
            log.warn("Received [{}] bytes more than expected. Message: {}, remaining: {}.", new Object[]{Integer.valueOf(byteBuffer.remaining()), fromBytes, byteBuffer.get(new byte[byteBuffer.remaining()])});
            if (!$assertionsDisabled && byteBuffer.hasRemaining()) {
                throw new AssertionError();
            }
        }
        if (isType(MessageType.END_RESPONSE).test(fromBytes)) {
            log.debug("Received header with type [{}].", MessageType.END_RESPONSE);
        }
        return fromBytes;
    }

    private Mono<byte[]> convertToMessageBytes(Mono<? extends QubicRequest> mono) {
        return mono.map(qubicRequest -> {
            return qubicRequest.toMessage().toBytes();
        }).doOnNext(bArr -> {
            log.trace("Sending bytes [{}]", Hex.encodeHexString(bArr));
        });
    }

    private static Predicate<QubicMessage> isType(MessageType messageType) {
        return qubicMessage -> {
            return qubicMessage.getHeader().getType() == messageType;
        };
    }

    static {
        $assertionsDisabled = !BasicNodeOperations.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(BasicNodeOperations.class);
    }
}
