package org.axonframework.axonserver.connector.command;

import io.axoniq.axonserver.grpc.command.Command;
import io.axoniq.axonserver.grpc.command.CommandProviderInbound;
import io.axoniq.axonserver.grpc.command.CommandProviderOutbound;
import io.axoniq.axonserver.grpc.command.CommandResponse;
import io.axoniq.axonserver.grpc.command.CommandServiceGrpc;
import io.axoniq.axonserver.grpc.command.CommandSubscription;
import io.grpc.ClientInterceptor;
import io.grpc.stub.StreamObserver;
import java.util.Comparator;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.DispatchInterceptors;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.PlatformConnectionManager;
import org.axonframework.axonserver.connector.util.ContextAddingInterceptor;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.axonserver.connector.util.FlowControllingStreamObserver;
import org.axonframework.axonserver.connector.util.ProcessingInstructionHelper;
import org.axonframework.axonserver.connector.util.TokenAddingInterceptor;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandExecutionException;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandResultMessage;
import org.axonframework.commandhandling.GenericCommandResultMessage;
import org.axonframework.commandhandling.distributed.RoutingStrategy;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CommandBus} implementation that sends commands to AxonServer over gRPC and executes the commands
 * AxonServer routes back to this node on a local {@link CommandBus} segment.
 * <p>
 * Handler subscriptions are registered both on the local segment and with AxonServer. Handler interceptors are
 * delegated to the local segment, while dispatch interceptors are applied by this bus before a command is
 * serialized and sent.
 */
public class AxonServerCommandBus implements CommandBus {
    private static final Logger logger = LoggerFactory.getLogger(AxonServerCommandBus.class);

    private final CommandBus localSegment;
    private final CommandRouterSubscriber commandRouterSubscriber;
    private final PlatformConnectionManager platformConnectionManager;
    private final RoutingStrategy routingStrategy;
    private final CommandPriorityCalculator priorityCalculator;
    private final CommandSerializer serializer;
    private final AxonServerConfiguration configuration;
    private final ClientInterceptor[] interceptors;
    private final DispatchInterceptors<CommandMessage<?>> dispatchInterceptors = new DispatchInterceptors<>();

    /**
     * Creates a command bus that uses the default behavior of {@link CommandPriorityCalculator}.
     *
     * @param platformConnectionManager provides the channel and command stream towards AxonServer
     * @param axonServerConfiguration   client name, component name, token, context and thread settings
     * @param commandBus                local segment on which incoming commands are executed
     * @param serializer                serializer wrapped in a {@link CommandSerializer}
     * @param routingStrategy           determines the routing key sent along with each command
     */
    public AxonServerCommandBus(PlatformConnectionManager platformConnectionManager,
                                AxonServerConfiguration axonServerConfiguration,
                                CommandBus commandBus,
                                Serializer serializer,
                                RoutingStrategy routingStrategy) {
        this(platformConnectionManager, axonServerConfiguration, commandBus, serializer, routingStrategy,
             new CommandPriorityCalculator() {
             });
    }

    /**
     * Creates a command bus with an explicit priority calculator.
     *
     * @param platformConnectionManager provides the channel and command stream towards AxonServer
     * @param axonServerConfiguration   client name, component name, token, context and thread settings
     * @param commandBus                local segment on which incoming commands are executed
     * @param serializer                serializer wrapped in a {@link CommandSerializer}
     * @param routingStrategy           determines the routing key sent along with each command
     * @param commandPriorityCalculator determines the priority sent along with each command
     */
    public AxonServerCommandBus(PlatformConnectionManager platformConnectionManager,
                                AxonServerConfiguration axonServerConfiguration,
                                CommandBus commandBus,
                                Serializer serializer,
                                RoutingStrategy routingStrategy,
                                CommandPriorityCalculator commandPriorityCalculator) {
        this.localSegment = commandBus;
        this.serializer = new CommandSerializer(serializer, axonServerConfiguration);
        this.platformConnectionManager = platformConnectionManager;
        this.routingStrategy = routingStrategy;
        this.priorityCalculator = commandPriorityCalculator;
        this.configuration = axonServerConfiguration;
        // The interceptors must be assigned BEFORE the subscriber is created: the subscriber's constructor
        // registers connection listeners and starts worker threads, all of which may open the command stream
        // and therefore read this field.
        this.interceptors = new ClientInterceptor[]{
                new TokenAddingInterceptor(axonServerConfiguration.getToken()),
                new ContextAddingInterceptor(axonServerConfiguration.getContext())
        };
        this.commandRouterSubscriber = new CommandRouterSubscriber();
    }

    /**
     * Dispatches the command without reporting the outcome to the caller; both success and failure are ignored.
     *
     * @param commandMessage the command to dispatch
     * @param <C>            the payload type of the command
     */
    @Override
    public <C> void dispatch(CommandMessage<C> commandMessage) {
        dispatch(commandMessage, new CommandCallback<C, Object>() {
            @Override
            public void onSuccess(CommandMessage<? extends C> message, CommandResultMessage<?> result) {
                // fire-and-forget: the result is intentionally ignored
            }

            @Override
            public void onFailure(CommandMessage<? extends C> message, Throwable cause) {
                // fire-and-forget: the failure is intentionally ignored
            }
        });
    }

    /**
     * Runs the dispatch interceptors on the command, serializes it together with its routing key and priority,
     * and sends it to AxonServer. The callback is invoked when a response or a transport error is received.
     * <p>
     * A response carrying a message is reported as a failure wrapping a {@link RemoteCommandException}. A payload
     * that cannot be deserialized is logged and reported as a successful result with a {@code null} payload.
     *
     * @param commandMessage  the command to dispatch
     * @param commandCallback the callback notified of the outcome
     * @param <C>             the payload type of the command
     * @param <R>             the expected result type
     */
    @Override
    public <C, R> void dispatch(CommandMessage<C> commandMessage,
                                final CommandCallback<? super C, ? super R> commandCallback) {
        logger.debug("Dispatch with callback: {}", commandMessage.getCommandName());
        final CommandMessage<?> intercept = this.dispatchInterceptors.intercept(commandMessage);
        CommandServiceGrpc.CommandServiceStub stub =
                CommandServiceGrpc.newStub(this.platformConnectionManager.getChannel())
                                  .withInterceptors(this.interceptors);
        Command request = this.serializer.serialize(intercept,
                                                    this.routingStrategy.getRoutingKey(intercept),
                                                    this.priorityCalculator.determinePriority(intercept));
        stub.dispatch(request, new StreamObserver<CommandResponse>() {
            @Override
            public void onNext(CommandResponse commandResponse) {
                if (commandResponse.hasMessage()) {
                    commandCallback.onFailure(intercept, new CommandExecutionException(
                            commandResponse.getMessage().getMessage(),
                            new RemoteCommandException(commandResponse.getErrorCode(),
                                                       commandResponse.getMessage())));
                    return;
                }
                logger.debug("response received - {}", commandResponse);
                Object payload = null;
                if (commandResponse.hasPayload()) {
                    try {
                        payload = serializer.deserialize(commandResponse);
                    } catch (Exception e) {
                        // The exception is not guaranteed to carry a cause; fall back to the exception itself
                        // so that logging cannot throw and prevent the callback from being invoked.
                        Throwable reason = e.getCause() != null ? e.getCause() : e;
                        logger.info("Failed to deserialize payload - {} - {}",
                                    commandResponse.getPayload().getData(), reason.getMessage());
                    }
                }
                commandCallback.onSuccess(intercept, new GenericCommandResultMessage(payload));
            }

            @Override
            public void onError(Throwable th) {
                commandCallback.onFailure(intercept, th);
            }

            @Override
            public void onCompleted() {
                // nothing to do: the outcome has already been reported through onNext or onError
            }
        });
    }

    /**
     * Announces the command name to AxonServer and subscribes the handler on the local segment.
     *
     * @param commandName    the name of the command the handler processes
     * @param messageHandler the handler to subscribe
     * @return a registration combining the local subscription with an unsubscribe request towards AxonServer
     */
    @Override
    public Registration subscribe(String commandName, MessageHandler<? super CommandMessage<?>> messageHandler) {
        logger.debug("Subscribe: {}", commandName);
        this.commandRouterSubscriber.subscribe(commandName);
        return new AxonServerRegistration(this.localSegment.subscribe(commandName, messageHandler),
                                          () -> this.commandRouterSubscriber.unsubscribe(commandName));
    }

    /**
     * Registers the handler interceptor on the local segment.
     *
     * @param messageHandlerInterceptor the interceptor to register
     * @return the registration returned by the local segment
     */
    public Registration registerHandlerInterceptor(
            MessageHandlerInterceptor<? super CommandMessage<?>> messageHandlerInterceptor) {
        return this.localSegment.registerHandlerInterceptor(messageHandlerInterceptor);
    }

    /**
     * Completes the outbound command stream towards AxonServer, if one is currently open.
     */
    public void disconnect() {
        this.commandRouterSubscriber.disconnect();
    }

    /**
     * Registers an interceptor that is applied to every command dispatched through this bus.
     *
     * @param messageDispatchInterceptor the interceptor to register
     * @return the registration for the interceptor
     */
    public Registration registerDispatchInterceptor(
            MessageDispatchInterceptor<? super CommandMessage<?>> messageDispatchInterceptor) {
        return this.dispatchInterceptors.registerDispatchInterceptor(messageDispatchInterceptor);
    }

    /**
     * Maintains the command stream with AxonServer: sends (un)subscribe requests, queues the commands received
     * from the server by priority and executes them on the local segment using a fixed pool of worker threads.
     */
    public class CommandRouterSubscriber {
        private final CopyOnWriteArraySet<String> subscribedCommands = new CopyOnWriteArraySet<>();
        private final PriorityBlockingQueue<Command> commandQueue;
        private final ExecutorService executor;
        private volatile boolean subscribing;
        private volatile StreamObserver<CommandProviderOutbound> subscriberStreamObserver;

        CommandRouterSubscriber() {
            int threadCount = configuration.getCommandThreads().intValue();
            this.executor = Executors.newFixedThreadPool(threadCount);
            platformConnectionManager.addReconnectListener(this::resubscribe);
            platformConnectionManager.addDisconnectListener(this::unsubscribeAll);
            // The queue hands out its smallest element first, so the priority is negated to have commands
            // with the highest priority value polled first.
            this.commandQueue = new PriorityBlockingQueue<>(1000, Comparator.comparingLong(
                    command -> -ProcessingInstructionHelper.priority(command.getProcessingInstructionsList())));
            // One long-running polling task per pool thread.
            for (int i = 0; i < threadCount; i++) {
                this.executor.submit(this::commandExecutor);
            }
        }

        /**
         * Worker loop: polls the queue and processes commands until the thread is interrupted.
         */
        private void commandExecutor() {
            logger.debug("Starting command Executor");
            while (true) {
                try {
                    Command command = this.commandQueue.poll(10L, TimeUnit.SECONDS);
                    if (command != null) {
                        logger.debug("Received command: {}", command);
                        processCommand(command);
                    }
                } catch (InterruptedException e) {
                    logger.warn("Interrupted commandExecutor", e);
                    // Restore the flag so the executor can observe the interruption.
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    // processCommand may fail outside its own error handling (e.g. when the command stream
                    // cannot be obtained). Without this guard the worker would terminate silently and the
                    // pool would permanently lose a thread.
                    logger.warn("Unexpected failure while processing command - {}", e.getMessage(), e);
                }
            }
        }

        /**
         * Re-announces all known subscriptions; skipped when there are none or a subscribe call is in progress.
         */
        private void resubscribe() {
            if (this.subscribedCommands.isEmpty() || this.subscribing) {
                return;
            }
            try {
                StreamObserver<CommandProviderOutbound> observer = getSubscriberObserver();
                for (String commandName : this.subscribedCommands) {
                    observer.onNext(subscribeRequest(commandName));
                }
            } catch (Exception e) {
                logger.warn("Error while resubscribing - {}", e.getMessage());
            }
        }

        /**
         * Records the command name and sends a subscribe request to AxonServer. A failure to send is logged
         * only; the name stays recorded so that it is included in a later resubscribe.
         *
         * @param commandName the name of the command to subscribe to
         */
        public void subscribe(String commandName) {
            this.subscribing = true;
            this.subscribedCommands.add(commandName);
            try {
                getSubscriberObserver().onNext(subscribeRequest(commandName));
            } catch (Exception e) {
                logger.warn("Subscribe at AxonServer platform failed - {}, trying again at later moment",
                            e.getMessage());
            } finally {
                this.subscribing = false;
            }
        }

        /**
         * Deserializes the command and dispatches it on the local segment; any failure while doing so is
         * logged and reported back to AxonServer as an error response.
         */
        private void processCommand(Command command) {
            StreamObserver<CommandProviderOutbound> observer = getSubscriberObserver();
            try {
                dispatchLocal(serializer.deserialize(command), observer);
            } catch (Throwable th) {
                logger.error("Error while dispatching command {} - {}", command.getName(), th.getMessage(), th);
                observer.onNext(errorResponse(command.getMessageIdentifier(), th));
            }
        }

        /**
         * Returns the outbound command stream, opening a new one (and sending the initial permits) when none
         * is currently held. The held stream is cleared when the server reports an error or completion.
         */
        private synchronized StreamObserver<CommandProviderOutbound> getSubscriberObserver() {
            if (this.subscriberStreamObserver == null) {
                logger.info("Create new subscriber");
                StreamObserver<CommandProviderInbound> inbound = new StreamObserver<CommandProviderInbound>() {
                    @Override
                    public void onNext(CommandProviderInbound commandProviderInbound) {
                        logger.debug("Received from server: {}", commandProviderInbound);
                        switch (commandProviderInbound.getRequestCase()) {
                            case COMMAND:
                                commandQueue.add(commandProviderInbound.getCommand());
                                return;
                            default:
                                // other request types are ignored
                                return;
                        }
                    }

                    @Override
                    public void onError(Throwable th) {
                        logger.warn("Received error from server: {}", th.getMessage());
                        subscriberStreamObserver = null;
                    }

                    @Override
                    public void onCompleted() {
                        logger.debug("Received completed from server");
                        subscriberStreamObserver = null;
                    }
                };
                this.subscriberStreamObserver = new FlowControllingStreamObserver<>(
                        platformConnectionManager.getCommandStream(inbound, interceptors),
                        configuration,
                        flowControl -> CommandProviderOutbound.newBuilder().setFlowControl(flowControl).build(),
                        outbound -> outbound.getRequestCase()
                                            .equals(CommandProviderOutbound.RequestCase.COMMANDRESPONSE))
                        .sendInitialPermits();
            }
            return this.subscriberStreamObserver;
        }

        /**
         * Forgets the command name and sends an unsubscribe request to AxonServer on a best-effort basis.
         *
         * @param commandName the name of the command to unsubscribe from
         */
        public void unsubscribe(String commandName) {
            this.subscribedCommands.remove(commandName);
            try {
                getSubscriberObserver().onNext(unsubscribeRequest(commandName));
            } catch (Exception e) {
                // Best effort: the subscription is already removed locally.
                logger.debug("Unsubscribe of command {} at AxonServer failed - {}", commandName, e.getMessage());
            }
        }

        /**
         * Sends an unsubscribe request for every recorded command name (best effort) and drops the current
         * stream. The recorded names are kept.
         */
        void unsubscribeAll() {
            for (String commandName : this.subscribedCommands) {
                try {
                    getSubscriberObserver().onNext(unsubscribeRequest(commandName));
                } catch (Exception e) {
                    // Best effort: continue with the remaining commands.
                    logger.debug("Unsubscribe of command {} at AxonServer failed - {}",
                                 commandName, e.getMessage());
                }
            }
            this.subscriberStreamObserver = null;
        }

        /**
         * Dispatches the command on the local segment and sends the result, or an error response when the
         * handler fails, to the given stream.
         */
        private <C> void dispatchLocal(final CommandMessage<C> commandMessage,
                                       final StreamObserver<CommandProviderOutbound> streamObserver) {
            logger.debug("DispatchLocal: {}", commandMessage.getCommandName());
            localSegment.dispatch(commandMessage, new CommandCallback<C, Object>() {
                @Override
                public void onSuccess(CommandMessage<? extends C> message, CommandResultMessage<?> result) {
                    logger.debug("DispatchLocal: done {}", commandMessage.getCommandName());
                    streamObserver.onNext(serializer.serialize(result.getPayload(),
                                                               commandMessage.getIdentifier()));
                }

                @Override
                public void onFailure(CommandMessage<? extends C> message, Throwable th) {
                    streamObserver.onNext(errorResponse(commandMessage.getIdentifier(), th));
                    logger.info("DispatchLocal: failure {} - {}",
                                commandMessage.getCommandName(), th.getMessage(), th);
                }
            });
        }

        /**
         * Completes the outbound stream if one is currently held.
         */
        public void disconnect() {
            // Read the volatile field once: another thread may clear it between a null check and the call.
            StreamObserver<CommandProviderOutbound> observer = this.subscriberStreamObserver;
            if (observer != null) {
                observer.onCompleted();
            }
        }

        /** Builds the subscribe request for the given command name. */
        private CommandProviderOutbound subscribeRequest(String commandName) {
            return CommandProviderOutbound.newBuilder()
                                          .setSubscribe(CommandSubscription.newBuilder()
                                                                           .setCommand(commandName)
                                                                           .setComponentName(configuration.getComponentName())
                                                                           .setClientName(configuration.getClientName())
                                                                           .setMessageId(UUID.randomUUID().toString())
                                                                           .build())
                                          .build();
        }

        /** Builds the unsubscribe request for the given command name. */
        private CommandProviderOutbound unsubscribeRequest(String commandName) {
            return CommandProviderOutbound.newBuilder()
                                          .setUnsubscribe(CommandSubscription.newBuilder()
                                                                             .setCommand(commandName)
                                                                             .setClientName(configuration.getClientName())
                                                                             .setMessageId(UUID.randomUUID().toString())
                                                                             .build())
                                          .build();
        }

        /** Builds an error response for the request with the given identifier. */
        private CommandProviderOutbound errorResponse(String requestIdentifier, Throwable cause) {
            return CommandProviderOutbound.newBuilder()
                                          .setCommandResponse(CommandResponse.newBuilder()
                                                                             .setMessageIdentifier(UUID.randomUUID().toString())
                                                                             .setRequestIdentifier(requestIdentifier)
                                                                             .setErrorCode(ErrorCode.resolve(cause).errorCode())
                                                                             .setMessage(ExceptionSerializer.serialize(
                                                                                     configuration.getClientName(), cause)))
                                          .build();
        }
    }
}
