package org.enodeframework.queue.command;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.net.NetServer;
import io.vertx.core.parsetools.RecordParser;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.enodeframework.commanding.CommandResult;
import org.enodeframework.commanding.CommandReturnType;
import org.enodeframework.commanding.CommandStatus;
import org.enodeframework.commanding.ICommand;
import org.enodeframework.common.SysProperties;
import org.enodeframework.common.exception.DuplicateRegisterException;
import org.enodeframework.common.scheduling.IScheduleService;
import org.enodeframework.common.scheduling.Worker;
import org.enodeframework.common.utilities.RemoteReply;
import org.enodeframework.queue.domainevent.DomainEventHandledMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/enodeframework/queue/command/DefaultCommandResultProcessor.class */
/**
 * Collects command replies arriving over a Vert.x {@link NetServer} and completes the
 * {@link CompletableFuture} that was registered for the corresponding command id.
 *
 * <p>Pending registrations live in a Guava {@link Cache} configured with
 * {@code expireAfterWrite(completionSourceTimeout)}. When an entry is evicted because it
 * expired, its future is completed with a {@link CommandStatus#Failed} result. A periodic
 * task registered with the {@link IScheduleService} calls {@link Cache#cleanUp()} so that
 * expiry is processed even when the cache is otherwise idle.
 *
 * <p>Incoming replies are handed to two local queues (one per reply kind), each drained by
 * its own {@link Worker}.
 */
public class DefaultCommandResultProcessor extends AbstractVerticle implements ICommandResultProcessor {
    private static final Logger logger = LoggerFactory.getLogger(DefaultCommandResultProcessor.class);
    private final int port;
    private final String scanExpireCommandTaskName;
    private final IScheduleService scheduleService;
    private final int completionSourceTimeout;
    private final Cache<String, CommandTaskCompletionSource> commandTaskDict;
    private final BlockingQueue<CommandResult> commandExecutedMessageLocalQueue;
    private final BlockingQueue<DomainEventHandledMessage> domainEventHandledMessageLocalQueue;
    private final Worker commandExecutedMessageWorker;
    private final Worker domainEventHandledMessageWorker;
    private InetSocketAddress bindAddress;
    private NetServer netServer;
    private boolean started;

    /**
     * Creates a processor that uses {@link SysProperties#COMPLETION_SOURCE_TIMEOUT} as the
     * completion timeout.
     *
     * @param scheduleService service used to run the periodic cache clean-up task
     * @param port            TCP port the reply server listens on
     */
    public DefaultCommandResultProcessor(IScheduleService scheduleService, int port) {
        this(scheduleService, port, SysProperties.COMPLETION_SOURCE_TIMEOUT);
    }

    /**
     * Creates a processor with an explicit completion timeout.
     *
     * @param scheduleService         service used to run the periodic cache clean-up task
     * @param port                    TCP port the reply server listens on
     * @param completionSourceTimeout milliseconds after registration at which a pending command
     *                                expires; also used as delay and period of the clean-up task
     */
    public DefaultCommandResultProcessor(IScheduleService scheduleService, int port, int completionSourceTimeout) {
        this.scheduleService = scheduleService;
        this.port = port;
        this.completionSourceTimeout = completionSourceTimeout;
        // Timestamp plus a random suffix keeps the task name distinct per instance.
        this.scanExpireCommandTaskName = "CleanTimeoutCommandTask_" + System.currentTimeMillis()
                + new Random().nextInt(SysProperties.COMPLETION_SOURCE_TIMEOUT);
        this.commandTaskDict = CacheBuilder.newBuilder().removalListener(notification -> {
            // Only expiry counts as a timeout; explicit removals have already been completed by the remover.
            if (notification.getCause().equals(RemovalCause.EXPIRED)) {
                processTimeoutCommand((String) notification.getKey(), (CommandTaskCompletionSource) notification.getValue());
            }
        }).expireAfterWrite(completionSourceTimeout, TimeUnit.MILLISECONDS).build();
        this.commandExecutedMessageLocalQueue = new LinkedBlockingQueue<>();
        this.domainEventHandledMessageLocalQueue = new LinkedBlockingQueue<>();
        // Each worker action blocks on take() until a message is available.
        this.commandExecutedMessageWorker = new Worker("ProcessExecutedCommandMessage", () -> {
            processExecutedCommandMessage(this.commandExecutedMessageLocalQueue.take());
        });
        this.domainEventHandledMessageWorker = new Worker("ProcessDomainEventHandledMessage", () -> {
            processDomainEventHandledMessage(this.domainEventHandledMessageLocalQueue.take());
        });
    }

    /**
     * Creates the Vert.x net server, installs a delimiter-based record parser on every accepted
     * socket, and starts listening on the given port.
     *
     * @param port TCP port to listen on
     */
    public void startServer(int port) {
        this.netServer = this.vertx.createNetServer();
        this.netServer.connectHandler(socket -> {
            RecordParser.newDelimited(SysProperties.DELIMITED, socket).endHandler(ignored -> {
                socket.close();
            }).exceptionHandler(throwable -> {
                // NOTE(review): this handler is attached to the per-socket parser, so the message text
                // ("Failed to start NetServer") looks misleading; kept unchanged for log compatibility.
                logger.error("Failed to start NetServer port:{}", port, throwable);
                socket.close();
            }).handler(buffer -> {
                // Each delimited record is expected to be a JSON-encoded RemoteReply.
                processRequestInternal(buffer.toJsonObject().mapTo(RemoteReply.class));
            });
        });
        this.bindAddress = new InetSocketAddress(port);
        this.netServer.listen(port);
    }

    /**
     * Registers a future to be completed when the reply for the given command arrives.
     *
     * @param command           the command being sent
     * @param commandReturnType which reply kind completes the future
     * @param completableFuture future to complete with the command result
     * @throws DuplicateRegisterException if a registration for the same command id is still pending
     */
    @Override // org.enodeframework.queue.command.ICommandResultProcessor
    public void registerProcessingCommand(ICommand command, CommandReturnType commandReturnType, CompletableFuture<CommandResult> completableFuture) {
        String commandId = command.getId();
        CommandTaskCompletionSource candidate = new CommandTaskCompletionSource(command.getAggregateRootId(), commandReturnType, completableFuture);
        // putIfAbsent makes the duplicate check and the insert one atomic step, so two concurrent
        // registrations of the same id cannot both succeed and overwrite each other's future.
        if (this.commandTaskDict.asMap().putIfAbsent(commandId, candidate) != null) {
            throw new DuplicateRegisterException(String.format("Duplicate processing command registration, type:%s, id:%s", command.getClass().getName(), commandId));
        }
    }

    /**
     * Starts the reply server, both queue workers and the periodic cache clean-up task.
     * Does nothing if this processor has already been started.
     */
    public void start() {
        if (this.started) {
            return;
        }
        startServer(this.port);
        this.commandExecutedMessageWorker.start();
        this.domainEventHandledMessageWorker.start();
        this.scheduleService.startTask(this.scanExpireCommandTaskName, this.commandTaskDict::cleanUp, this.completionSourceTimeout, this.completionSourceTimeout);
        this.started = true;
    }

    /**
     * Stops the clean-up task and both workers, and closes the reply server if one was created.
     */
    public void stop() {
        this.scheduleService.stopTask(this.scanExpireCommandTaskName);
        this.commandExecutedMessageWorker.stop();
        this.domainEventHandledMessageWorker.stop();
        // netServer is only assigned in startServer(); guard against stop() without a prior start().
        if (this.netServer != null) {
            this.netServer.close();
        }
    }

    /**
     * Returns the address created in {@link #startServer(int)} (wildcard IP with the listening port).
     *
     * @return the bind address, or {@code null} if the server has not been started yet
     */
    @Override // org.enodeframework.queue.command.ICommandResultProcessor
    public InetSocketAddress getBindAddress() {
        return this.bindAddress;
    }

    /**
     * Routes a decoded reply to the queue matching its code; replies with an unknown code are logged
     * and dropped.
     *
     * @param remoteReply the reply received from the network
     */
    public void processRequestInternal(RemoteReply remoteReply) {
        int code = remoteReply.getCode();
        if (code == CommandReturnType.CommandExecuted.getValue()) {
            this.commandExecutedMessageLocalQueue.add(remoteReply.getCommandResult());
        } else if (code == CommandReturnType.EventHandled.getValue()) {
            this.domainEventHandledMessageLocalQueue.add(remoteReply.getEventHandledMessage());
        } else {
            logger.error("Invalid remoting reply: {}", remoteReply);
        }
    }

    /**
     * Handles a "command executed" reply. A caller waiting for {@code CommandExecuted} is completed
     * immediately. A caller waiting for {@code EventHandled} is completed here only when the status
     * is {@code Failed} or {@code NothingChanged}; otherwise the registration stays pending.
     */
    private void processExecutedCommandMessage(CommandResult commandResult) {
        CommandTaskCompletionSource source = this.commandTaskDict.asMap().get(commandResult.getCommandId());
        this.commandTaskDict.cleanUp();
        if (source == null) {
            if (logger.isDebugEnabled()) {
                logger.debug("Command result return, {}, but commandTaskCompletionSource maybe timeout expired.", commandResult);
            }
            return;
        }
        if (source.getCommandReturnType().equals(CommandReturnType.CommandExecuted)) {
            this.commandTaskDict.asMap().remove(commandResult.getCommandId());
            if (source.getTaskCompletionSource().complete(commandResult) && logger.isDebugEnabled()) {
                logger.debug("Command result return CommandExecuted, {}", commandResult);
            }
            return;
        }
        if (source.getCommandReturnType().equals(CommandReturnType.EventHandled)) {
            boolean finished = commandResult.getStatus().equals(CommandStatus.Failed)
                    || commandResult.getStatus().equals(CommandStatus.NothingChanged);
            if (finished) {
                this.commandTaskDict.asMap().remove(commandResult.getCommandId());
                if (source.getTaskCompletionSource().complete(commandResult) && logger.isDebugEnabled()) {
                    logger.debug("Command result return EventHandled, {}", commandResult);
                }
            }
        }
    }

    /**
     * Completes the future of an expired registration with a {@code Failed} result. Invoked from the
     * cache removal listener for entries removed with cause {@code EXPIRED}.
     */
    private void processTimeoutCommand(String commandId, CommandTaskCompletionSource source) {
        if (source == null) {
            return;
        }
        logger.error("Wait command notify timeout, commandId:{}", commandId);
        source.getTaskCompletionSource().complete(new CommandResult(CommandStatus.Failed, commandId, source.getAggregateRootId(), "Wait command notify timeout.", String.class.getName()));
    }

    /**
     * Removes the pending registration of a command that could not be sent and completes its future
     * with a {@code Failed} result. Does nothing if no registration exists.
     *
     * @param command the command whose sending failed
     */
    @Override // org.enodeframework.queue.command.ICommandResultProcessor
    public void processFailedSendingCommand(ICommand command) {
        CommandTaskCompletionSource source = this.commandTaskDict.asMap().remove(command.getId());
        if (source == null) {
            return;
        }
        source.getTaskCompletionSource().complete(new CommandResult(CommandStatus.Failed, command.getId(), command.getAggregateRootId(), "Failed to send the command.", String.class.getName()));
    }

    /**
     * Handles an "event handled" reply: removes the pending registration and completes its future
     * with a {@code Success} result carrying the message's command result.
     */
    private void processDomainEventHandledMessage(DomainEventHandledMessage message) {
        CommandTaskCompletionSource source = this.commandTaskDict.asMap().remove(message.getCommandId());
        if (source == null) {
            return;
        }
        // The result type name is only set when there is an actual result payload.
        CommandResult commandResult = new CommandResult(CommandStatus.Success, message.getCommandId(), message.getAggregateRootId(), message.getCommandResult(), message.getCommandResult() != null ? String.class.getName() : null);
        if (source.getTaskCompletionSource().complete(commandResult) && logger.isDebugEnabled()) {
            logger.debug("Command result return, {}", commandResult);
        }
    }
}
