package org.enodeframework.queue;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.enodeframework.commanding.CommandResult;
import org.enodeframework.commanding.CommandReturnType;
import org.enodeframework.common.SysProperties;
import org.enodeframework.common.serializing.ISerializeService;
import org.enodeframework.common.utilities.InetUtil;
import org.enodeframework.common.utilities.ReplyMessage;
import org.enodeframework.queue.domainevent.DomainEventHandledMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/enodeframework/queue/DefaultSendReplyService.class */
public class DefaultSendReplyService extends AbstractVerticle implements ISendReplyService {
    private static final Logger logger = LoggerFactory.getLogger(DefaultSendReplyService.class);
    private final ConcurrentHashMap<String, CompletableFuture<NetSocket>> socketMap = new ConcurrentHashMap<>();
    private final ISerializeService serializeService;
    private boolean started;
    private boolean stoped;
    private NetClient netClient;

    public DefaultSendReplyService(ISerializeService iSerializeService) {
        this.serializeService = iSerializeService;
    }

    public void start() {
        if (this.started) {
            return;
        }
        this.netClient = this.vertx.createNetClient(new NetClientOptions());
        this.started = true;
    }

    public void stop() {
        if (this.stoped) {
            return;
        }
        this.netClient.close();
        this.stoped = true;
    }

    @Override // org.enodeframework.queue.ISendReplyService
    public CompletableFuture<Void> sendCommandReply(CommandResult commandResult, InetSocketAddress inetSocketAddress) {
        ReplyMessage replyMessage = new ReplyMessage();
        replyMessage.setCode(CommandReturnType.CommandExecuted.getValue());
        replyMessage.setCommandResult(commandResult);
        return sendReply(replyMessage, inetSocketAddress);
    }

    @Override // org.enodeframework.queue.ISendReplyService
    public CompletableFuture<Void> sendEventReply(DomainEventHandledMessage domainEventHandledMessage, InetSocketAddress inetSocketAddress) {
        ReplyMessage replyMessage = new ReplyMessage();
        replyMessage.setCode(CommandReturnType.EventHandled.getValue());
        replyMessage.setEventHandledMessage(domainEventHandledMessage);
        return sendReply(replyMessage, inetSocketAddress);
    }

    public CompletableFuture<Void> sendReply(ReplyMessage replyMessage, InetSocketAddress inetSocketAddress) {
        String str = this.serializeService.serialize(replyMessage) + SysProperties.DELIMITED;
        String stringAddress = InetUtil.toStringAddress(inetSocketAddress);
        SocketAddress inetSocketAddress2 = SocketAddress.inetSocketAddress(inetSocketAddress.getPort(), inetSocketAddress.getAddress().getHostAddress());
        CompletableFuture<NetSocket> completableFuture = new CompletableFuture<>();
        if (this.socketMap.putIfAbsent(stringAddress, completableFuture) == null) {
            this.netClient.connect(inetSocketAddress2, asyncResult -> {
                if (!asyncResult.succeeded()) {
                    completableFuture.completeExceptionally(asyncResult.cause());
                    logger.error("Failed to connect NetServer, key: {}", stringAddress, asyncResult.cause());
                } else {
                    NetSocket netSocket = (NetSocket) asyncResult.result();
                    netSocket.endHandler(r3 -> {
                        netSocket.close();
                    }).exceptionHandler(th -> {
                        this.socketMap.remove(stringAddress);
                        logger.error("NetSocket occurs unexpected error", th);
                        netSocket.close();
                    }).handler(buffer -> {
                    }).closeHandler(r6 -> {
                        this.socketMap.remove(stringAddress);
                        logger.info("NetClient socket closed: {}", stringAddress);
                    });
                    completableFuture.complete(netSocket);
                }
            });
        }
        return this.socketMap.get(stringAddress).thenAccept(netSocket -> {
            netSocket.write(str);
        }).exceptionally(th -> {
            logger.error("Send command reply has exception, key: {}", stringAddress, th);
            return null;
        });
    }
}
