package org.axonframework.extensions.springcloud.commandhandling;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandResultMessage;
import org.axonframework.commandhandling.GenericCommandResultMessage;
import org.axonframework.commandhandling.distributed.CommandBusConnector;
import org.axonframework.commandhandling.distributed.CommandDispatchException;
import org.axonframework.commandhandling.distributed.Member;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.DirectExecutor;
import org.axonframework.common.Registration;
import org.axonframework.lifecycle.ShutdownLatch;
import org.axonframework.lifecycle.StartHandler;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.serialization.Serializer;
import org.axonframework.tracing.NoOpSpanFactory;
import org.axonframework.tracing.Span;
import org.axonframework.tracing.SpanFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestOperations;

/**
 * {@link CommandBusConnector} that sends command messages to other members over HTTP using
 * {@link RestOperations}, and that doubles as the {@link RestController} receiving such messages on
 * {@code /spring-command-bus-connector/command}.
 * <p>
 * Commands destined for a {@link Member#local() local} member are dispatched directly on the local
 * {@link CommandBus}; all other commands are POSTed to the member's {@link URI} connection endpoint.
 * Instances are created through the {@link #builder()}.
 */
@RequestMapping({"/spring-command-bus-connector"})
@RestController
public class SpringHttpCommandBusConnector implements CommandBusConnector {
    private static final Logger logger = LoggerFactory.getLogger(SpringHttpCommandBusConnector.class);
    private static final boolean EXPECT_REPLY = true;
    private static final boolean DO_NOT_EXPECT_REPLY = false;
    private static final String COMMAND_BUS_CONNECTOR_PATH = "/spring-command-bus-connector/command";
    private final CommandBus localCommandBus;
    private final RestOperations restOperations;
    private final Serializer serializer;
    private final Executor executor;
    private final SpanFactory spanFactory;
    private final ShutdownLatch shutdownLatch = new ShutdownLatch();

    /**
     * Builder for a {@link SpringHttpCommandBusConnector}.
     * <p>
     * The local {@link CommandBus}, the {@link RestOperations} and the {@link Serializer} are hard
     * requirements. The {@link Executor} defaults to {@link DirectExecutor#INSTANCE} and the
     * {@link SpanFactory} defaults to {@link NoOpSpanFactory#INSTANCE}.
     */
    public static class Builder {
        private CommandBus localCommandBus;
        private RestOperations restOperations;
        private Serializer serializer;
        private Executor executor = DirectExecutor.INSTANCE;
        private SpanFactory spanFactory = NoOpSpanFactory.INSTANCE;

        /**
         * Sets the local {@link CommandBus} used for commands targeting the local member and for commands
         * received over HTTP.
         *
         * @param commandBus the local command bus, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder localCommandBus(CommandBus commandBus) {
            BuilderUtils.assertNonNull(commandBus, "Local CommandBus may not be null");
            this.localCommandBus = commandBus;
            return this;
        }

        /**
         * Sets the {@link RestOperations} used to POST commands to remote members.
         *
         * @param restOperations the REST client, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder restOperations(RestOperations restOperations) {
            BuilderUtils.assertNonNull(restOperations, "RestOperations may not be null");
            this.restOperations = restOperations;
            return this;
        }

        /**
         * Sets the {@link Serializer} used for the dispatch and reply messages.
         *
         * @param serializer the serializer, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder serializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "Serializer may not be null");
            this.serializer = serializer;
            return this;
        }

        /**
         * Sets the {@link Executor} on which remote sends are executed.
         *
         * @param executor the executor, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder executor(Executor executor) {
            BuilderUtils.assertNonNull(executor, "Executor may not be null");
            this.executor = executor;
            return this;
        }

        /**
         * Sets the {@link SpanFactory} used to create a span around the handling of received commands.
         *
         * @param spanFactory the span factory, may not be {@code null}
         * @return this builder, for fluent interfacing
         */
        public Builder spanFactory(SpanFactory spanFactory) {
            BuilderUtils.assertNonNull(spanFactory, "SpanFactory may not be null");
            this.spanFactory = spanFactory;
            return this;
        }

        /**
         * Creates the connector from this builder. The connector's constructor invokes {@link #validate()}.
         *
         * @return a new {@link SpringHttpCommandBusConnector}
         */
        public SpringHttpCommandBusConnector build() {
            return new SpringHttpCommandBusConnector(this);
        }

        /**
         * Asserts that the local {@link CommandBus}, {@link RestOperations} and {@link Serializer} have
         * all been provided.
         */
        protected void validate() {
            BuilderUtils.assertNonNull(this.localCommandBus, "The local CommandBus is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.restOperations, "The RestOperations is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.serializer, "The Serializer is a hard requirement and should be provided");
        }
    }

    /**
     * A {@link CompletableFuture} that is also a {@link CommandCallback}: once the command result arrives
     * the future is completed with the {@link SpringHttpReplyMessage} built from that result.
     *
     * @param <C> the payload type of the command
     * @param <R> the payload type of the command result
     */
    public class SpringHttpReplyFutureCallback<C, R> extends CompletableFuture<SpringHttpReplyMessage<?>> implements CommandCallback<C, R> {
        public SpringHttpReplyFutureCallback() {
        }

        /**
         * Completes this future with a reply message wrapping the given result.
         *
         * @param commandMessage       the command that was handled; its identifier is copied into the reply
         * @param commandResultMessage the result of handling the command
         */
        public void onResult(CommandMessage<? extends C> commandMessage, @Nonnull CommandResultMessage<? extends R> commandResultMessage) {
            super.complete(SpringHttpCommandBusConnector.this.createReply(commandMessage.getIdentifier(), commandResultMessage));
        }
    }

    /**
     * Instantiates the connector from the given {@code builder}, after having it validate its contents.
     *
     * @param builder the builder holding the components of this connector
     */
    protected SpringHttpCommandBusConnector(Builder builder) {
        builder.validate();
        this.localCommandBus = builder.localCommandBus;
        this.restOperations = builder.restOperations;
        this.serializer = builder.serializer;
        this.executor = builder.executor;
        this.spanFactory = builder.spanFactory;
    }

    /**
     * Initializes the shutdown latch so that activities can be registered.
     */
    // -134217728 equals Integer.MIN_VALUE / 16.
    // NOTE(review): presumably a framework lifecycle phase constant inlined by the decompiler - confirm.
    @StartHandler(phase = -134217728)
    public void start() {
        this.shutdownLatch.initialize();
    }

    /**
     * Creates a {@link Builder} to construct a {@link SpringHttpCommandBusConnector} with.
     *
     * @return a new builder
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Sends the given command to the given member without expecting a reply.
     * <p>
     * A local member is served by the local command bus on the calling thread; any other member is
     * contacted over HTTP on the configured {@link Executor}.
     *
     * @param member         the member to send the command to
     * @param commandMessage the command to send
     * @param <C>            the payload type of the command
     */
    public <C> void send(Member member, @Nonnull CommandMessage<? extends C> commandMessage) {
        this.shutdownLatch.ifShuttingDown("SpringHttpCommandBusConnector is shutting down, no new commands will be sent.");
        if (member.local()) {
            this.localCommandBus.dispatch(commandMessage);
        } else {
            this.executor.execute(() -> sendRemotely(member, commandMessage, DO_NOT_EXPECT_REPLY));
        }
    }

    /**
     * Sends the given command to the given member and reports the outcome to the given callback.
     * <p>
     * The send is registered as an activity on the shutdown latch, and that activity is ended exactly
     * once after the callback has been invoked (or has thrown). Any exception raised while sending
     * remotely or while delivering the result is reported to the callback wrapped in a
     * {@link CommandDispatchException}.
     *
     * @param member          the member to send the command to
     * @param commandMessage  the command to send
     * @param commandCallback the callback to notify of the command's result
     * @param <C>             the payload type of the command
     * @param <R>             the payload type of the command result
     */
    public <C, R> void send(Member member, @Nonnull CommandMessage<C> commandMessage, @Nonnull CommandCallback<? super C, R> commandCallback) {
        this.shutdownLatch.ifShuttingDown("SpringHttpCommandBusConnector is shutting down, no new commands will be sent.");
        ShutdownLatch.ActivityHandle activityHandle = this.shutdownLatch.registerActivity();
        if (member.local()) {
            // Explicitly typed so the result type is tied to R instead of being left to inference.
            CommandCallback<C, R> activityEndingCallback = (dispatchedCommand, resultMessage) -> {
                try {
                    commandCallback.onResult(dispatchedCommand, resultMessage);
                } finally {
                    activityHandle.end();
                }
            };
            this.localCommandBus.dispatch(commandMessage, activityEndingCallback);
            return;
        }
        this.executor.execute(() -> {
            try {
                SpringHttpReplyMessage<R> replyMessage =
                        this.<C, R>sendRemotely(member, commandMessage, EXPECT_REPLY).getBody();
                // NOTE(review): when the response carries no body the callback is never invoked - confirm
                // this is intended.
                if (replyMessage != null) {
                    commandCallback.onResult(commandMessage, replyMessage.getCommandResultMessage(this.serializer));
                }
            } catch (Exception e) {
                commandCallback.onResult(commandMessage, GenericCommandResultMessage.asCommandResultMessage(new CommandDispatchException("An exception occurred while dispatching a command or its result", e)));
            } finally {
                // Single exit point for ending the activity, on success and on failure alike.
                activityHandle.end();
            }
        });
    }

    /**
     * Initiates the shutdown of this connector by delegating to the shutdown latch.
     *
     * @return the future returned by the shutdown latch
     */
    public CompletableFuture<Void> initiateShutdown() {
        return this.shutdownLatch.initiateShutdown();
    }

    /**
     * POSTs the given command, wrapped in a {@link SpringHttpDispatchMessage}, to the command endpoint
     * derived from the member's {@link URI} connection endpoint.
     *
     * @param member         the member to contact
     * @param commandMessage the command to send
     * @param expectReply    whether the receiving side should answer with the command's result
     * @param <C>            the payload type of the command
     * @param <R>            the payload type of the command result
     * @return the HTTP response holding the reply message
     * @throws IllegalArgumentException if the member exposes no {@link URI} connection endpoint
     */
    private <C, R> ResponseEntity<SpringHttpReplyMessage<R>> sendRemotely(Member member, CommandMessage<? extends C> commandMessage, boolean expectReply) {
        Optional<URI> connectionEndpoint = member.getConnectionEndpoint(URI.class);
        if (!connectionEndpoint.isPresent()) {
            String errorMessage = String.format("No Connection Endpoint found in Member [%s] for protocol [%s] to send the command message [%s] to", member, URI.class, commandMessage);
            logger.error(errorMessage);
            throw new IllegalArgumentException(errorMessage);
        }
        URI endpoint = connectionEndpoint.get();
        URI destination = buildURIForPath(endpoint.getScheme(), endpoint.getUserInfo(), endpoint.getHost(), endpoint.getPort(), endpoint.getPath());
        HttpEntity<SpringHttpDispatchMessage<C>> requestEntity =
                new HttpEntity<>(new SpringHttpDispatchMessage<>(commandMessage, this.serializer, expectReply));
        return this.restOperations.exchange(destination, HttpMethod.POST, requestEntity, new ParameterizedTypeReference<SpringHttpReplyMessage<R>>() {
        });
    }

    /**
     * Builds the URI of the command endpoint by appending the connector path to the given path.
     *
     * @param str  the URI scheme
     * @param str2 the user info
     * @param str3 the host
     * @param i    the port
     * @param str4 the base path the connector path is appended to
     * @return the URI of the remote command endpoint
     * @throws IllegalArgumentException if the components do not form a valid URI
     */
    private URI buildURIForPath(String str, String str2, String str3, int i, String str4) {
        try {
            return new URI(str, str2, str3, i, str4 + COMMAND_BUS_CONNECTOR_PATH, null, null);
        } catch (URISyntaxException e) {
            // The trailing exception is not bound to a placeholder; it is passed last so the logger can
            // report it as the failure cause.
            logger.error("Failed to build URI for [{}{}{}], with user info [{}] and path [{}]", str, str3, i, str2, COMMAND_BUS_CONNECTOR_PATH, e);
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Subscribes the given handler to the given command name on the local command bus.
     *
     * @param str            the name of the command to subscribe the handler to
     * @param messageHandler the handler to subscribe
     * @return the registration returned by the local command bus
     */
    public Registration subscribe(@Nonnull String str, @Nonnull MessageHandler<? super CommandMessage<?>> messageHandler) {
        return this.localCommandBus.subscribe(str, messageHandler);
    }

    /**
     * Returns the local command bus this connector dispatches local commands on.
     *
     * @return an {@link Optional} containing the local {@link CommandBus}
     */
    public Optional<CommandBus> localSegment() {
        return Optional.of(this.localCommandBus);
    }

    /**
     * HTTP endpoint receiving a command sent by another member and dispatching it on the local command
     * bus, inside a handler span named {@code SpringHttpCommandBusConnector.handle}.
     * <p>
     * When a reply is expected, the returned future completes with the {@link SpringHttpReplyMessage}
     * carrying the command result; otherwise it is completed with an empty string as soon as the command
     * has been dispatched. Dispatch failures are logged, recorded on the span and returned as a reply
     * wrapping the exception. If the command cannot even be extracted from the dispatch message, the
     * reply uses the identifier {@code "UNKNOWN"} (reply expected) or the returned future is completed
     * exceptionally (no reply expected).
     *
     * @param springHttpDispatchMessage the message holding the serialized command
     * @param <C>                       the payload type of the command
     * @param <R>                       the payload type of the command result
     * @return a future holding the response body
     */
    @PostMapping({"/command"})
    public <C, R> CompletableFuture<?> receiveCommand(@RequestBody SpringHttpDispatchMessage<C> springHttpDispatchMessage) {
        try {
            CommandMessage<C> commandMessage = springHttpDispatchMessage.getCommandMessage(this.serializer);
            Span span = this.spanFactory.createChildHandlerSpan(() -> "SpringHttpCommandBusConnector.handle", commandMessage);
            return span.<CompletableFuture<?>>runSupplier(() -> {
                try {
                    if (springHttpDispatchMessage.isExpectReply()) {
                        SpringHttpReplyFutureCallback<C, R> replyFuture = new SpringHttpReplyFutureCallback<C, R>();
                        this.localCommandBus.dispatch(commandMessage, replyFuture);
                        return replyFuture;
                    }
                    this.localCommandBus.dispatch(commandMessage);
                    return CompletableFuture.completedFuture("");
                } catch (Exception e) {
                    logger.error("Could not dispatch command", e);
                    span.recordException(e);
                    return CompletableFuture.completedFuture(createReply(commandMessage.getIdentifier(), GenericCommandResultMessage.asCommandResultMessage(e)));
                }
            });
        } catch (Exception e) {
            logger.error("Could not dispatch command", e);
            if (springHttpDispatchMessage.isExpectReply()) {
                return CompletableFuture.completedFuture(createReply("UNKNOWN", GenericCommandResultMessage.asCommandResultMessage(e)));
            }
            return exceptionallyCompleted(e);
        }
    }

    /**
     * Creates the reply message for the given command identifier and result. If building the reply
     * fails, a reply wrapping that failure is built instead.
     *
     * @param str                  the identifier of the command the reply belongs to
     * @param commandResultMessage the result to wrap in the reply
     * @return the reply message
     */
    public SpringHttpReplyMessage<?> createReply(String str, CommandResultMessage<?> commandResultMessage) {
        try {
            return new SpringHttpReplyMessage<>(str, commandResultMessage, this.serializer);
        } catch (Exception e) {
            logger.warn("Could not serialize command reply [{}]. Sending back the serialization failure instead.", commandResultMessage, e);
            return new SpringHttpReplyMessage<>(str, GenericCommandResultMessage.asCommandResultMessage(e), this.serializer);
        }
    }

    /**
     * Creates a future that is already completed exceptionally with the given exception.
     *
     * @param exc the exception to complete the future with
     * @return the exceptionally completed future
     */
    private static CompletableFuture<Object> exceptionallyCompleted(Exception exc) {
        CompletableFuture<Object> completableFuture = new CompletableFuture<>();
        completableFuture.completeExceptionally(exc);
        return completableFuture;
    }

    /**
     * Registers the given handler interceptor on the local command bus.
     *
     * @param messageHandlerInterceptor the interceptor to register
     * @return the registration returned by the local command bus
     */
    public Registration registerHandlerInterceptor(@Nonnull MessageHandlerInterceptor<? super CommandMessage<?>> messageHandlerInterceptor) {
        return this.localCommandBus.registerHandlerInterceptor(messageHandlerInterceptor);
    }
}
