package org.axonframework.extensions.jgroups.commandhandling;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandResultMessage;
import org.axonframework.commandhandling.GenericCommandResultMessage;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.commandhandling.distributed.CommandBusConnector;
import org.axonframework.commandhandling.distributed.CommandBusConnectorCommunicationException;
import org.axonframework.commandhandling.distributed.CommandCallbackRepository;
import org.axonframework.commandhandling.distributed.CommandCallbackWrapper;
import org.axonframework.commandhandling.distributed.CommandMessageFilter;
import org.axonframework.commandhandling.distributed.CommandRouter;
import org.axonframework.commandhandling.distributed.ConsistentHash;
import org.axonframework.commandhandling.distributed.ConsistentHashChangeListener;
import org.axonframework.commandhandling.distributed.Member;
import org.axonframework.commandhandling.distributed.RoutingStrategy;
import org.axonframework.commandhandling.distributed.ServiceRegistryException;
import org.axonframework.commandhandling.distributed.SimpleMember;
import org.axonframework.commandhandling.distributed.commandfilter.DenyAll;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.common.Registration;
import org.axonframework.lifecycle.ShutdownLatch;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.serialization.Serializer;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.View;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/extensions/jgroups/commandhandling/JGroupsConnector.class */
public class JGroupsConnector implements CommandRouter, Receiver, CommandBusConnector {
    private static final Logger logger = LoggerFactory.getLogger(JGroupsConnector.class);

    // Flags passed as the "local" argument when constructing member descriptions.
    private static final boolean LOCAL_MEMBER = true;
    private static final boolean NON_LOCAL_MEMBER = false;

    // Collaborators supplied through the Builder.
    private final CommandBus localSegment;
    private final JChannel channel;
    private final String clusterName;
    private final Serializer serializer;
    private final RoutingStrategy routingStrategy;
    private final ConsistentHashChangeListener consistentHashChangeListener;

    // Either the executor handed in by the Builder, or one created in connect().
    private ExecutorService executorService;
    // True when the executor came from the Builder; disconnect() then leaves it running.
    private final boolean executorProvided;

    private volatile View currentView;
    // Guards the membership update performed while processing join messages.
    private final Object monitor = new Object();
    private final ShutdownLatch shutdownLatch = new ShutdownLatch();
    // Activity handles of commands awaiting a reply, keyed by command identifier.
    private final Map<String, ShutdownLatch.ActivityHandle> pendingCommands = new ConcurrentHashMap<>();
    private final CommandCallbackRepository<Address> callbackRepository = new CommandCallbackRepository<>();
    private final JoinCondition joinedCondition = new JoinCondition();
    private final Map<Address, VersionedMember> members = new ConcurrentHashMap<>();
    private final AtomicReference<ConsistentHash> consistentHash = new AtomicReference<>(new ConsistentHash());
    // Counters start at zero; the boolean constants above must not be used as numeric initial values.
    private final AtomicInteger membershipVersion = new AtomicInteger(0);
    private volatile int loadFactor = 0;
    private volatile CommandMessageFilter commandFilter = DenyAll.INSTANCE;

    /* loaded from: input_file:org/axonframework/extensions/jgroups/commandhandling/JGroupsConnector$Builder.class */
    /**
     * Builder for a {@link JGroupsConnector}.
     * <p>
     * The routing strategy defaults to an {@link AnnotationRoutingStrategy} and the consistent hash change
     * listener defaults to {@link ConsistentHashChangeListener#noOp()}. The local segment, channel, cluster
     * name and serializer have no default and are checked by {@link #validate()}.
     */
    public static class Builder {
        private CommandBus localSegment;
        private JChannel channel;
        private String clusterName;
        private Serializer serializer;
        private RoutingStrategy routingStrategy = new AnnotationRoutingStrategy();
        private ConsistentHashChangeListener consistentHashChangeListener = ConsistentHashChangeListener.noOp();
        private ExecutorService executorService;

        /**
         * Sets the local {@link CommandBus} segment.
         *
         * @param commandBus the local segment; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for chaining
         */
        public Builder localSegment(CommandBus commandBus) {
            BuilderUtils.assertNonNull(commandBus, "The localSegment may not be null");
            localSegment = commandBus;
            return this;
        }

        /**
         * Sets the {@link JChannel} to communicate over.
         *
         * @param jChannel the channel; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for chaining
         */
        public Builder channel(JChannel jChannel) {
            BuilderUtils.assertNonNull(jChannel, "JChannel may not be null");
            channel = jChannel;
            return this;
        }

        /**
         * Sets the name of the cluster to connect to.
         *
         * @param str the cluster name; asserted to be neither {@code null} nor empty
         * @return this builder, for chaining
         */
        public Builder clusterName(String str) {
            assertClusterName(str, "The clusterName may not be null or empty");
            clusterName = str;
            return this;
        }

        /**
         * Sets the {@link Serializer} handed to the connector.
         *
         * @param serializer the serializer; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for chaining
         */
        public Builder serializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "Serializer may not be null");
            this.serializer = serializer;
            return this;
        }

        /**
         * Replaces the default routing strategy.
         *
         * @param routingStrategy the strategy; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for chaining
         */
        public Builder routingStrategy(RoutingStrategy routingStrategy) {
            BuilderUtils.assertNonNull(routingStrategy, "RoutingStrategy may not be null");
            this.routingStrategy = routingStrategy;
            return this;
        }

        /**
         * Replaces the default (no-op) consistent hash change listener.
         *
         * @param consistentHashChangeListener the listener; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for chaining
         */
        public Builder consistentHashChangeListener(ConsistentHashChangeListener consistentHashChangeListener) {
            BuilderUtils.assertNonNull(consistentHashChangeListener, "ConsistentHashChangeListener may not be null");
            this.consistentHashChangeListener = consistentHashChangeListener;
            return this;
        }

        /**
         * Sets the executor service the connector should use. The value is stored without any check.
         *
         * @param executorService the executor service to use
         * @return this builder, for chaining
         */
        public Builder executorService(ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        /**
         * Creates a {@link JGroupsConnector} from the current state of this builder.
         *
         * @return the new connector
         */
        public JGroupsConnector build() {
            return new JGroupsConnector(this);
        }

        /**
         * Asserts that every required component has been set.
         */
        protected void validate() {
            BuilderUtils.assertNonNull(localSegment, "The localSegment is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(channel, "The JChannel is a hard requirement and should be provided");
            assertClusterName(clusterName, "The clusterName is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(serializer, "The Serializer is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(routingStrategy, "The RoutingStrategy is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(consistentHashChangeListener, "The ConsistentHashChangeListener is a hard requirement and should be provided");
        }

        // Asserts that the given cluster name is neither null nor the empty string.
        private void assertClusterName(String str, String str2) {
            BuilderUtils.assertThat(str, candidate -> candidate != null && !candidate.isEmpty(), str2);
        }
    }

    /* loaded from: input_file:org/axonframework/extensions/jgroups/commandhandling/JGroupsConnector$JoinCondition.class */
    private static final class JoinCondition {
        private final CountDownLatch joinCountDown;
        private volatile boolean success;

        private JoinCondition() {
            this.joinCountDown = new CountDownLatch(JGroupsConnector.LOCAL_MEMBER);
        }

        public void await() throws InterruptedException {
            this.joinCountDown.await();
        }

        public void await(long j, TimeUnit timeUnit) throws InterruptedException {
            this.joinCountDown.await(j, timeUnit);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void markJoined() {
            this.success = true;
            this.joinCountDown.countDown();
        }

        public boolean isJoined() {
            return this.success;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/extensions/jgroups/commandhandling/JGroupsConnector$VersionedMember.class */
    public static class VersionedMember implements Member {
        private final SimpleMember<Address> member;
        private final int version;

        public VersionedMember(SimpleMember<Address> simpleMember, int i) {
            this.member = simpleMember;
            this.version = i;
        }

        public int order() {
            return this.version;
        }

        public String name() {
            return this.member.name();
        }

        public <T> Optional<T> getConnectionEndpoint(Class<T> cls) {
            return this.member.getConnectionEndpoint(cls);
        }

        public boolean local() {
            return this.member.local();
        }

        public void suspect() {
            this.member.suspect();
        }
    }

    /**
     * Creates a connector from the given builder, after validating it.
     *
     * @param builder the builder holding the components for this connector
     */
    protected JGroupsConnector(Builder builder) {
        builder.validate();

        this.localSegment = builder.localSegment;
        this.channel = builder.channel;
        this.clusterName = builder.clusterName;
        this.serializer = builder.serializer;
        this.routingStrategy = builder.routingStrategy;
        this.consistentHashChangeListener = builder.consistentHashChangeListener;

        // Record whether the executor came from the builder; if not, the field stays unset here.
        ExecutorService configuredExecutor = builder.executorService;
        this.executorProvided = configuredExecutor != null;
        if (configuredExecutor != null) {
            this.executorService = configuredExecutor;
        }
    }

    /**
     * Creates a new {@link Builder} for a {@link JGroupsConnector}.
     *
     * @return a fresh builder instance
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Stores the given load factor and command filter, then broadcasts them using the current
     * membership version, which is incremented afterwards.
     *
     * @param i                    the new load factor
     * @param commandMessageFilter the new command filter
     */
    public void updateMembership(int i, CommandMessageFilter commandMessageFilter) {
        loadFactor = i;
        commandFilter = commandMessageFilter;
        int versionToAnnounce = membershipVersion.getAndIncrement();
        broadCastMembership(versionToAnnounce, false);
    }

    /**
     * Sends this node's configuration to all nodes, provided the channel is connected.
     * Does nothing when the channel is not connected.
     *
     * @param i the membership version to include in the message
     * @param z whether a reply is expected from the receivers
     * @throws ServiceRegistryException if an exception occurs while checking the channel or sending
     */
    protected void broadCastMembership(int i, boolean z) throws ServiceRegistryException {
        try {
            if (!channel.isConnected()) {
                return;
            }
            logger.info("Broadcasting membership from {}", channel.getAddress());
            sendMyConfigurationTo(null, z, i);
        } catch (Exception e) {
            throw new ServiceRegistryException("Could not broadcast local membership details to the cluster", e);
        }
    }

    /**
     * Connects the channel to the configured cluster and registers this node as a local member.
     * <p>
     * If no executor was provided, a cached thread pool is created here.
     *
     * @throws ConnectionFailedException if the channel reports a different, non-null cluster name
     * @throws Exception                 if connecting the channel fails
     */
    public void connect() throws Exception {
        String joinedCluster = channel.getClusterName();
        if (joinedCluster != null && !clusterName.equals(joinedCluster)) {
            throw new ConnectionFailedException("Already joined cluster: " + joinedCluster);
        }
        if (!executorProvided) {
            executorService = Executors.newCachedThreadPool(new AxonThreadFactory("JGroupsConnector(" + clusterName + ")"));
        }
        shutdownLatch.initialize();
        channel.setReceiver(this);
        channel.connect(clusterName);

        // Register ourselves as a local member and add ourselves to the consistent hash.
        Address localAddress = channel.getAddress();
        SimpleMember<Address> localMember = new SimpleMember<>(localAddress.toString(), localAddress, true, null);
        members.put(localAddress, new VersionedMember(localMember, membershipVersion.getAndIncrement()));
        updateConsistentHash(hash -> hash.with(localMember, loadFactor, commandFilter));
    }

    /**
     * Disconnects the channel and shuts down the executor service if this connector created it.
     * An executor supplied through the builder is left running.
     */
    public void disconnect() {
        this.channel.disconnect();
        // The internal executor is only created in connect(); guard against disconnect() being
        // called on a connector that was never connected.
        if (!this.executorProvided && this.executorService != null) {
            this.executorService.shutdown();
        }
    }

    /**
     * Initiates shutdown on this connector's shutdown latch.
     *
     * @return the future returned by the shutdown latch
     */
    public CompletableFuture<Void> initiateShutdown() {
        return shutdownLatch.initiateShutdown();
    }

    /**
     * No-op: nothing is written to the given stream.
     *
     * @param outputStream ignored
     */
    public void getState(OutputStream outputStream) {
    }

    /**
     * No-op: nothing is read from the given stream.
     *
     * @param inputStream ignored
     */
    public void setState(InputStream inputStream) {
    }

    /**
     * Handles a new cluster view.
     * <p>
     * On the first view, this node's configuration is broadcast and the join condition is marked.
     * On later views that differ from the current one, members that left are removed from the
     * consistent hash and the member registry, their pending callbacks are cancelled, and this
     * node's configuration is sent to every newly joined member other than itself.
     *
     * @param view the new view of the cluster
     * @throws MembershipUpdateFailedException if broadcasting the configuration on the first view fails
     */
    public synchronized void viewAccepted(View view) {
        if (this.currentView == null) {
            // First view received: this node has just joined.
            this.currentView = view;
            logger.info("Local segment ({}) joined the cluster. Broadcasting configuration.", this.channel.getAddress());
            try {
                broadCastMembership(this.membershipVersion.get(), true);
                this.joinedCondition.markJoined();
            } catch (Exception e) {
                throw new MembershipUpdateFailedException("Failed to broadcast my settings", e);
            }
        } else if (!view.equals(this.currentView)) {
            // View.diff yields two arrays: index 0 holds joined members, index 1 those that left.
            Address[][] diff = View.diff(this.currentView, view);
            Address[] joined = diff[0];
            Address[] left = diff[1];
            this.currentView = view;
            Address localAddress = this.channel.getAddress();

            // First drop all departed members from the consistent hash...
            for (Address leftMember : left) {
                updateConsistentHash(hash -> {
                    VersionedMember known = this.members.get(leftMember);
                    return known == null ? hash : hash.without(known);
                });
            }
            // ...then forget them and cancel the callbacks still waiting on them.
            for (Address leftMember : left) {
                this.members.remove(leftMember);
                this.callbackRepository.cancelCallbacksForChannel(leftMember).forEach(cancelled ->
                        Optional.ofNullable(this.pendingCommands.remove(cancelled.getMessage().getIdentifier()))
                                .ifPresent(ShutdownLatch.ActivityHandle::end));
            }
            // Introduce ourselves to every newcomer except ourselves.
            for (Address joinedMember : joined) {
                if (!joinedMember.equals(localAddress)) {
                    sendMyConfigurationTo(joinedMember, true, this.membershipVersion.get());
                }
            }
        }
        this.currentView = view;
    }

    /**
     * Logs a warning that the given member is suspect. No other action is taken here.
     *
     * @param address the address of the suspected member
     */
    public void suspect(Address address) {
        logger.warn("Member is suspect: {}", address);
    }

    /**
     * No-op.
     */
    public void block() {
    }

    /**
     * No-op.
     */
    public void unblock() {
    }

    /**
     * Hands the received message to the executor service, where it is processed according to the
     * type of its payload: join, dispatch or reply message. Any other payload, including
     * {@code null}, is logged as unknown.
     *
     * @param message the message received from the channel
     */
    public void receive(Message message) {
        this.executorService.execute(() -> {
            Object payload = message.getObject();
            if (payload instanceof JoinMessage) {
                processJoinMessage(message, (JoinMessage) payload);
            } else if (payload instanceof JGroupsDispatchMessage) {
                processDispatchMessage(message, (JGroupsDispatchMessage) payload);
            } else if (payload instanceof JGroupsReplyMessage) {
                processReplyMessage((JGroupsReplyMessage) payload);
            } else {
                // A null payload fails every instanceof check; avoid dereferencing it here.
                logger.warn("Received unknown message: {}", payload == null ? "null" : payload.getClass().getName());
            }
        });
    }

    /**
     * Reports the result carried by the given reply to the callback registered for its command,
     * and ends the pending-command activity registered under the same identifier.
     * <p>
     * When no callback is registered for the reply, a warning is logged and only the activity
     * clean-up is attempted.
     *
     * @param jGroupsReplyMessage the reply received from another node
     */
    private void processReplyMessage(JGroupsReplyMessage jGroupsReplyMessage) {
        // Callbacks and pending commands are stored under the same command identifier, so the
        // identifier from the reply is used for both; the callback wrapper may be null.
        String commandId = jGroupsReplyMessage.getCommandIdentifier();
        CommandCallbackWrapper callbackWrapper = this.callbackRepository.fetchAndRemove(commandId);
        try {
            if (callbackWrapper == null) {
                logger.warn("Received a callback for a message that has either already received a callback, or which was not sent through this node. Ignoring.");
            } else {
                callbackWrapper.reportResult(jGroupsReplyMessage.getCommandResultMessage(this.serializer));
            }
        } finally {
            // End the activity even if reporting the result throws.
            Optional.ofNullable(this.pendingCommands.remove(commandId)).ifPresent(ShutdownLatch.ActivityHandle::end);
        }
    }

    /**
     * Dispatches the command carried by the given message on the local segment.
     * <p>
     * When a reply is expected, the result (or the exception thrown while dispatching) is sent back
     * to the source of the message. Otherwise a dispatch failure is only logged.
     *
     * @param message                the message as received from the channel
     * @param jGroupsDispatchMessage the payload of {@code message}
     */
    private <C, R> void processDispatchMessage(Message message, JGroupsDispatchMessage jGroupsDispatchMessage) {
        if (jGroupsDispatchMessage.isExpectReply()) {
            try {
                localSegment.dispatch(
                        jGroupsDispatchMessage.getCommandMessage(serializer),
                        (command, result) -> sendReply(message.getSrc(), jGroupsDispatchMessage.getCommandIdentifier(), result));
            } catch (Exception dispatchFailure) {
                sendReply(message.getSrc(),
                          jGroupsDispatchMessage.getCommandIdentifier(),
                          GenericCommandResultMessage.asCommandResultMessage(dispatchFailure));
            }
            return;
        }

        // Fire-and-forget: nobody is waiting for the outcome, so a failure is only logged.
        try {
            localSegment.dispatch(jGroupsDispatchMessage.getCommandMessage(serializer));
        } catch (Exception dispatchFailure) {
            logger.error("Could not dispatch command", dispatchFailure);
        }
    }

    /**
     * Sends the given command result to the given address as a reply to the identified command.
     * <p>
     * If the reply message cannot be created from the result, a reply carrying the exception is
     * sent instead. A failure to send is logged and not propagated.
     *
     * @param address              the address to send the reply to
     * @param str                  the identifier of the command being replied to
     * @param commandResultMessage the result to send back
     */
    private <R> void sendReply(Address address, String str, CommandResultMessage<R> commandResultMessage) {
        JGroupsReplyMessage reply;
        try {
            reply = new JGroupsReplyMessage(str, commandResultMessage, serializer);
        } catch (Exception serializationFailure) {
            logger.warn(String.format("Could not serialize command reply [%s]. Sending back NULL.", commandResultMessage), serializationFailure);
            reply = new JGroupsReplyMessage(str, GenericCommandResultMessage.asCommandResultMessage(serializationFailure), serializer);
        }

        try {
            channel.send(address, reply);
        } catch (Exception sendFailure) {
            logger.error("Could not send reply", sendFailure);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Processes the membership details announced by another node (or by this node itself).
     * <p>
     * Messages from senders outside the channel's current view are ignored. Otherwise the sender is
     * registered under the announced order unless a higher order is already known for it, the
     * consistent hash is updated, and, if the sender expects a reply and is not this node, this
     * node's configuration is sent back.
     *
     * @param message     the message as received from the channel
     * @param joinMessage the payload of {@code message}
     */
    private void processJoinMessage(Message message, JoinMessage joinMessage) {
        Address source = message.getSrc();
        String joinedMember = source.toString();
        View view = this.channel.getView();
        if (view == null || !view.containsMember(source)) {
            logger.warn("Received join message from '{}', but it's not in my current view of the cluster.", joinedMember);
            return;
        }

        int joinedLoadFactor = joinMessage.getLoadFactor();
        CommandMessageFilter joinedFilter = joinMessage.messageFilter();
        // Typed member, so endpoint() yields an Address usable as key of the members map.
        SimpleMember<Address> member = new SimpleMember<>(joinedMember, source, false, suspected -> {
        });

        synchronized (this.monitor) {
            int receivedOrder = joinMessage.getOrder();
            // Keep the entry with the highest order; ties are won by the incoming message.
            int storedOrder = this.members.compute(
                    member.endpoint(),
                    (address, existing) -> (existing == null || existing.order() <= receivedOrder)
                            ? new VersionedMember(member, receivedOrder)
                            : existing
            ).order();
            if (receivedOrder != storedOrder) {
                logger.info("Received outdated update. Discarding it.");
                return;
            }

            updateConsistentHash(hash -> hash.with(member, joinedLoadFactor, joinedFilter));

            if (joinMessage.isExpectReply() && !this.channel.getAddress().equals(source)) {
                sendMyConfigurationTo(member.endpoint(), false, this.membershipVersion.get());
            }

            if (!logger.isInfoEnabled() || source.equals(this.channel.getAddress())) {
                logger.debug("Got my own ({}) join message for load factor: {}", joinedMember, joinedLoadFactor);
            } else {
                logger.info("{} joined with load factor: {}", joinedMember, joinedLoadFactor);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Got a network of members: {}", this.members.values());
            }
        }
    }

    /**
     * Atomically applies the given update to the consistent hash and passes the resulting hash to
     * the consistent hash change listener.
     *
     * @param unaryOperator the function producing the new hash from the current one
     */
    private void updateConsistentHash(UnaryOperator<ConsistentHash> unaryOperator) {
        ConsistentHash updatedHash = consistentHash.updateAndGet(unaryOperator);
        consistentHashChangeListener.onConsistentHashChanged(updatedHash);
    }

    /**
     * Sends a {@link JoinMessage} with this node's load factor and command filter to the given
     * address, flagged as out-of-band. A {@code null} address is logged as "all nodes".
     * <p>
     * Failures are logged, including the causing exception, and not propagated.
     *
     * @param address the destination of the message, or {@code null}
     * @param z       whether the receiver is expected to reply with its own configuration
     * @param i       the membership version to include in the message
     */
    private void sendMyConfigurationTo(Address address, boolean z, int i) {
        // Resolve a readable target once, so the failure log does not print "null" for a broadcast.
        Object target = ObjectUtils.getOrDefault(address, "all nodes");
        try {
            logger.info("Sending my configuration to {}.", target);
            Message message = new Message(address, new JoinMessage(this.loadFactor, this.commandFilter, i, z));
            message.setFlag(Message.Flag.OOB);
            this.channel.send(message);
        } catch (Exception e) {
            // Pass the exception as the trailing argument so its stack trace is logged as well.
            logger.warn("An exception occurred while sending membership information to newly joined member: {}", target, e);
        }
    }

    /**
     * Blocks until the join condition has been marked as joined.
     *
     * @return whether the join condition reports a successful join
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public boolean awaitJoined() throws InterruptedException {
        joinedCondition.await();
        return joinedCondition.isJoined();
    }

    /**
     * Blocks until the join condition has been marked as joined or the given timeout elapses.
     *
     * @param j        the maximum time to wait
     * @param timeUnit the unit of {@code j}
     * @return whether the join condition reports a successful join
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public boolean awaitJoined(long j, TimeUnit timeUnit) throws InterruptedException {
        joinedCondition.await(j, timeUnit);
        return joinedCondition.isJoined();
    }

    /**
     * @return the name reported by the channel
     */
    public String getNodeName() {
        return channel.getName();
    }

    /**
     * @return the consistent hash currently held by this connector
     */
    protected ConsistentHash getConsistentHash() {
        return consistentHash.get();
    }

    /**
     * Sends the given command to the given member without expecting a reply.
     *
     * @param member         the member to send the command to
     * @param commandMessage the command to send
     * @throws Exception if the connector is shutting down, the member exposes no JGroups endpoint,
     *                   or sending fails
     */
    public <C> void send(Member member, CommandMessage<? extends C> commandMessage) throws Exception {
        shutdownLatch.ifShuttingDown("JGroupsConnector is shutting down, no new commands will be sent.");
        Address target = resolveAddress(member);
        JGroupsDispatchMessage dispatchMessage = new JGroupsDispatchMessage(commandMessage, serializer, false);
        channel.send(target, dispatchMessage);
    }

    /**
     * Sends the given command to the given member and registers the callback to be invoked when
     * the reply arrives.
     * <p>
     * The command is tracked as a pending activity on the shutdown latch until it is replied to or
     * cancelled. If sending fails, the registered callback and the pending activity are removed
     * again before the exception is rethrown, so a failed send cannot hold up shutdown.
     *
     * @param member          the member to send the command to
     * @param commandMessage  the command to send
     * @param commandCallback the callback to invoke with the result
     * @throws Exception if the connector is shutting down, the member exposes no JGroups endpoint,
     *                   or sending fails
     */
    public <C, R> void send(Member member, CommandMessage<C> commandMessage, CommandCallback<? super C, R> commandCallback) throws Exception {
        this.shutdownLatch.ifShuttingDown("JGroupsConnector is shutting down, no new commands will be sent.");
        String commandId = commandMessage.getIdentifier();
        CommandCallbackWrapper<Address, C, R> callbackWrapper = new CommandCallbackWrapper<>(
                member.getConnectionEndpoint(Address.class).orElse(this.channel.address()), commandMessage, commandCallback);
        this.pendingCommands.put(commandId, this.shutdownLatch.registerActivity());
        this.callbackRepository.store(commandId, callbackWrapper);
        try {
            this.channel.send(resolveAddress(member), new JGroupsDispatchMessage(commandMessage, this.serializer, true));
        } catch (Exception e) {
            // No reply will ever arrive for a command that was not sent: undo the registration.
            this.callbackRepository.fetchAndRemove(commandId);
            Optional.ofNullable(this.pendingCommands.remove(commandId)).ifPresent(ShutdownLatch.ActivityHandle::end);
            throw e;
        }
    }

    /**
     * Subscribes the given handler on the local segment.
     *
     * @param str            the command name to subscribe the handler to
     * @param messageHandler the handler to subscribe
     * @return the registration returned by the local segment
     */
    public Registration subscribe(String str, MessageHandler<? super CommandMessage<?>> messageHandler) {
        return localSegment.subscribe(str, messageHandler);
    }

    /**
     * Resolves the JGroups {@link Address} of the given member.
     *
     * @param member the member to resolve the address of
     * @return the member's {@link Address} endpoint
     * @throws CommandBusConnectorCommunicationException if the member has no {@link Address} endpoint
     */
    protected Address resolveAddress(Member member) {
        return member.getConnectionEndpoint(Address.class)
                     .orElseThrow(() -> new CommandBusConnectorCommunicationException("The target member doesn't expose a JGroups endpoint"));
    }

    /**
     * Looks up the member that should handle the given command, using the current consistent hash
     * and the routing key the routing strategy produces for the command.
     *
     * @param commandMessage the command to find a destination for
     * @return the member selected by the consistent hash, if any
     */
    public Optional<Member> findDestination(CommandMessage<?> commandMessage) {
        ConsistentHash currentHash = consistentHash.get();
        String routingKey = routingStrategy.getRoutingKey(commandMessage);
        return currentHash.getMember(routingKey, commandMessage);
    }

    /**
     * Registers the given handler interceptor on the local segment.
     *
     * @param messageHandlerInterceptor the interceptor to register
     * @return the registration returned by the local segment
     */
    public Registration registerHandlerInterceptor(MessageHandlerInterceptor<? super CommandMessage<?>> messageHandlerInterceptor) {
        return localSegment.registerHandlerInterceptor(messageHandlerInterceptor);
    }

    /**
     * @return an {@link Optional} containing the local command bus segment
     */
    public Optional<CommandBus> localSegment() {
        return Optional.of(localSegment);
    }
}
