package io.scalecube.cluster.membership;

import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterMath;
import io.scalecube.cluster.CorrelationIdGenerator;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.fdetector.FailureDetector;
import io.scalecube.cluster.fdetector.FailureDetectorConfig;
import io.scalecube.cluster.fdetector.FailureDetectorEvent;
import io.scalecube.cluster.gossip.GossipProtocol;
import io.scalecube.cluster.metadata.MetadataStore;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/scalecube/cluster/membership/MembershipProtocolImpl.class */
public final class MembershipProtocolImpl implements MembershipProtocol {
    private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocol.class);
    private static final Logger LOGGER_MEMBERSHIP = LoggerFactory.getLogger("io.scalecube.cluster.Membership");
    public static final String SYNC = "sc/membership/sync";
    public static final String SYNC_ACK = "sc/membership/syncAck";
    public static final String MEMBERSHIP_GOSSIP = "sc/membership/gossip";
    private final Member localMember;
    private final Transport transport;
    private final MembershipConfig membershipConfig;
    private final FailureDetectorConfig failureDetectorConfig;
    private final List<Address> seedMembers;
    private final FailureDetector failureDetector;
    private final GossipProtocol gossipProtocol;
    private final MetadataStore metadataStore;
    private final CorrelationIdGenerator cidGenerator;
    private final Scheduler scheduler;
    private final Map<String, MembershipRecord> membershipTable = new HashMap();
    private final Map<String, Member> members = new HashMap();
    private final FluxProcessor<MembershipEvent, MembershipEvent> subject = DirectProcessor.create().serialize();
    private final FluxSink<MembershipEvent> sink = this.subject.sink();
    private final Disposable.Composite actionsDisposables = Disposables.composite();
    private final Map<String, Disposable> suspicionTimeoutTasks = new HashMap();

    /* loaded from: input_file:io/scalecube/cluster/membership/MembershipProtocolImpl$JmxMonitorMBean.class */
    public static class JmxMonitorMBean implements MonitorMBean {
        public static final int REMOVED_MEMBERS_HISTORY_SIZE = 42;
        private final MembershipProtocolImpl membershipProtocol;
        private final ReplayProcessor<MembershipEvent> removedMembersHistory = ReplayProcessor.create(42);

        private JmxMonitorMBean(MembershipProtocolImpl membershipProtocolImpl) {
            this.membershipProtocol = membershipProtocolImpl;
            membershipProtocolImpl.listen().filter((v0) -> {
                return v0.isRemoved();
            }).subscribe(this.removedMembersHistory);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static JmxMonitorMBean start(MembershipProtocolImpl membershipProtocolImpl) throws Exception {
            JmxMonitorMBean jmxMonitorMBean = new JmxMonitorMBean(membershipProtocolImpl);
            ManagementFactory.getPlatformMBeanServer().registerMBean(new StandardMBean(jmxMonitorMBean, MonitorMBean.class), new ObjectName("io.scalecube.cluster:name=Membership@" + membershipProtocolImpl.localMember.id()));
            return jmxMonitorMBean;
        }

        @Override // io.scalecube.cluster.membership.MembershipProtocolImpl.MonitorMBean
        public int getIncarnation() {
            return ((MembershipRecord) this.membershipProtocol.membershipTable.get(this.membershipProtocol.localMember.id())).incarnation();
        }

        @Override // io.scalecube.cluster.membership.MembershipProtocolImpl.MonitorMBean
        public List<String> getAliveMembers() {
            return findRecordsByCondition((v0) -> {
                return v0.isAlive();
            });
        }

        @Override // io.scalecube.cluster.membership.MembershipProtocolImpl.MonitorMBean
        public String getAliveMembersAsString() {
            return (String) getAliveMembers().stream().collect(Collectors.joining(",", "[", "]"));
        }

        @Override // io.scalecube.cluster.membership.MembershipProtocolImpl.MonitorMBean
        public List<String> getSuspectedMembers() {
            return findRecordsByCondition((v0) -> {
                return v0.isSuspect();
            });
        }

        @Override // io.scalecube.cluster.membership.MembershipProtocolImpl.MonitorMBean
        public String getSuspectedMembersAsString() {
            return (String) getSuspectedMembers().stream().collect(Collectors.joining(",", "[", "]"));
        }

        @Override // io.scalecube.cluster.membership.MembershipProtocolImpl.MonitorMBean
        public List<String> getDeadMembers() {
            ArrayList arrayList = new ArrayList();
            Flux map = this.removedMembersHistory.map((v0) -> {
                return v0.toString();
            });
            arrayList.getClass();
            map.subscribe((v1) -> {
                r1.add(v1);
            });
            return arrayList;
        }

        @Override // io.scalecube.cluster.membership.MembershipProtocolImpl.MonitorMBean
        public String getDeadMembersAsString() {
            return (String) getDeadMembers().stream().collect(Collectors.joining(",", "[", "]"));
        }

        private List<String> findRecordsByCondition(Predicate<MembershipRecord> predicate) {
            return (List) this.membershipProtocol.getMembershipRecords().stream().filter(predicate).map(membershipRecord -> {
                return new Member(membershipRecord.id(), membershipRecord.address());
            }).map((v0) -> {
                return v0.toString();
            }).collect(Collectors.toList());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/scalecube/cluster/membership/MembershipProtocolImpl$MembershipUpdateReason.class */
    public enum MembershipUpdateReason {
        FAILURE_DETECTOR_EVENT,
        MEMBERSHIP_GOSSIP,
        SYNC,
        INITIAL_SYNC,
        SUSPICION_TIMEOUT
    }

    /* loaded from: input_file:io/scalecube/cluster/membership/MembershipProtocolImpl$MonitorMBean.class */
    public interface MonitorMBean {
        int getIncarnation();

        List<String> getAliveMembers();

        String getAliveMembersAsString();

        List<String> getSuspectedMembers();

        String getSuspectedMembersAsString();

        List<String> getDeadMembers();

        String getDeadMembersAsString();
    }

    public MembershipProtocolImpl(Member member, Transport transport, FailureDetector failureDetector, GossipProtocol gossipProtocol, MetadataStore metadataStore, ClusterConfig clusterConfig, Scheduler scheduler, CorrelationIdGenerator correlationIdGenerator) {
        this.transport = (Transport) Objects.requireNonNull(transport);
        this.failureDetector = (FailureDetector) Objects.requireNonNull(failureDetector);
        this.gossipProtocol = (GossipProtocol) Objects.requireNonNull(gossipProtocol);
        this.metadataStore = (MetadataStore) Objects.requireNonNull(metadataStore);
        this.localMember = (Member) Objects.requireNonNull(member);
        this.scheduler = (Scheduler) Objects.requireNonNull(scheduler);
        this.cidGenerator = (CorrelationIdGenerator) Objects.requireNonNull(correlationIdGenerator);
        this.membershipConfig = ((ClusterConfig) Objects.requireNonNull(clusterConfig)).membershipConfig();
        this.failureDetectorConfig = ((ClusterConfig) Objects.requireNonNull(clusterConfig)).failureDetectorConfig();
        this.seedMembers = cleanUpSeedMembers(this.membershipConfig.seedMembers());
        this.membershipTable.put(member.id(), new MembershipRecord(member, MemberStatus.ALIVE, 0));
        this.members.put(member.id(), member);
        this.actionsDisposables.addAll(Arrays.asList(transport.listen().publishOn(scheduler).subscribe(this::onMessage, this::onError), failureDetector.listen().publishOn(scheduler).subscribe(this::onFailureDetectorEvent, this::onError), gossipProtocol.listen().publishOn(scheduler).subscribe(this::onMembershipGossip, this::onError)));
    }

    private List<Address> cleanUpSeedMembers(Collection<Address> collection) {
        return (List) new LinkedHashSet(collection).stream().filter(address -> {
            return !address.equals(this.localMember.address());
        }).filter(address2 -> {
            return !address2.equals(this.transport.address());
        }).collect(Collectors.toList());
    }

    @Override // io.scalecube.cluster.membership.MembershipProtocol
    public Flux<MembershipEvent> listen() {
        return this.subject.onBackpressureBuffer();
    }

    public Mono<Void> updateIncarnation() {
        return Mono.defer(() -> {
            MembershipRecord membershipRecord = new MembershipRecord(this.localMember, MemberStatus.ALIVE, this.membershipTable.get(this.localMember.id()).incarnation() + 1);
            this.membershipTable.put(this.localMember.id(), membershipRecord);
            return spreadMembershipGossip(membershipRecord);
        });
    }

    public Mono<Void> leaveCluster() {
        return Mono.defer(() -> {
            MembershipRecord membershipRecord = new MembershipRecord(this.localMember, MemberStatus.DEAD, this.membershipTable.get(this.localMember.id()).incarnation() + 1);
            this.membershipTable.put(this.localMember.id(), membershipRecord);
            return spreadMembershipGossip(membershipRecord);
        });
    }

    @Override // io.scalecube.cluster.membership.MembershipProtocol
    public Mono<Void> start() {
        return Mono.create(this::start0).then(Mono.fromCallable(() -> {
            return JmxMonitorMBean.start(this);
        })).then();
    }

    private void start0(MonoSink<Object> monoSink) {
        if (this.seedMembers.isEmpty()) {
            schedulePeriodicSync();
            monoSink.success();
        } else {
            LOGGER.debug("Making initial Sync to all seed members: {}", this.seedMembers);
            Mono[] monoArr = (Mono[]) this.seedMembers.stream().map(address -> {
                return this.transport.requestResponse(address, prepareSyncDataMsg(SYNC, this.cidGenerator.nextCid())).filter(this::checkSyncGroup);
            }).toArray(i -> {
                return new Mono[i];
            });
            Flux.mergeDelayError(monoArr.length, monoArr).take(1L).timeout(Duration.ofMillis(this.membershipConfig.syncTimeout()), this.scheduler).publishOn(this.scheduler).flatMap(message -> {
                return onSyncAck(message, true);
            }).doFinally(signalType -> {
                schedulePeriodicSync();
                monoSink.success();
            }).subscribe((Consumer) null, th -> {
                LOGGER.debug("Exception on initial SyncAck, cause: {}", th.toString());
            });
        }
    }

    @Override // io.scalecube.cluster.membership.MembershipProtocol
    public void stop() {
        this.actionsDisposables.dispose();
        Iterator<String> it = this.suspicionTimeoutTasks.keySet().iterator();
        while (it.hasNext()) {
            Disposable disposable = this.suspicionTimeoutTasks.get(it.next());
            if (disposable != null && !disposable.isDisposed()) {
                disposable.dispose();
            }
        }
        this.suspicionTimeoutTasks.clear();
        this.sink.complete();
    }

    @Override // io.scalecube.cluster.membership.MembershipProtocol
    public Collection<Member> members() {
        return new ArrayList(this.members.values());
    }

    @Override // io.scalecube.cluster.membership.MembershipProtocol
    public Collection<Member> otherMembers() {
        return (Collection) new ArrayList(this.members.values()).stream().filter(member -> {
            return !member.equals(this.localMember);
        }).collect(Collectors.toList());
    }

    @Override // io.scalecube.cluster.membership.MembershipProtocol
    public Member member() {
        return this.localMember;
    }

    @Override // io.scalecube.cluster.membership.MembershipProtocol
    public Optional<Member> member(String str) {
        return Optional.ofNullable(this.members.get(str));
    }

    @Override // io.scalecube.cluster.membership.MembershipProtocol
    public Optional<Member> member(Address address) {
        return new ArrayList(this.members.values()).stream().filter(member -> {
            return member.address().equals(address);
        }).findFirst();
    }

    private void doSync() {
        Optional<Address> selectSyncAddress = selectSyncAddress();
        if (selectSyncAddress.isPresent()) {
            Address address = selectSyncAddress.get();
            Message prepareSyncDataMsg = prepareSyncDataMsg(SYNC, null);
            LOGGER.debug("Send Sync: {} to {}", prepareSyncDataMsg, address);
            this.transport.send(address, prepareSyncDataMsg).subscribe((Consumer) null, th -> {
                LOGGER.debug("Failed to send Sync: {} to {}, cause: {}", new Object[]{prepareSyncDataMsg, address, th.toString()});
            });
        }
    }

    private void onMessage(Message message) {
        if (checkSyncGroup(message)) {
            if (SYNC.equals(message.qualifier())) {
                onSync(message).subscribe((Consumer) null, this::onError);
            }
            if (SYNC_ACK.equals(message.qualifier()) && message.correlationId() == null) {
                onSyncAck(message, false).subscribe((Consumer) null, this::onError);
            }
        }
    }

    private Mono<Void> onSyncAck(Message message, boolean z) {
        return Mono.defer(() -> {
            LOGGER.debug("Received SyncAck: {}", message);
            return syncMembership((SyncData) message.data(), z);
        });
    }

    private Mono<Void> onSync(Message message) {
        return Mono.defer(() -> {
            LOGGER.debug("Received Sync: {}", message);
            return syncMembership((SyncData) message.data(), false).doOnSuccess(r7 -> {
                Message prepareSyncDataMsg = prepareSyncDataMsg(SYNC_ACK, message.correlationId());
                Address sender = message.sender();
                this.transport.send(sender, prepareSyncDataMsg).subscribe((Consumer) null, th -> {
                    LOGGER.debug("Failed to send SyncAck: {} to {}, cause: {}", new Object[]{prepareSyncDataMsg, sender, th.toString()});
                });
            });
        });
    }

    private void onFailureDetectorEvent(FailureDetectorEvent failureDetectorEvent) {
        MembershipRecord membershipRecord = this.membershipTable.get(failureDetectorEvent.member().id());
        if (membershipRecord == null || membershipRecord.status() == failureDetectorEvent.status()) {
            return;
        }
        LOGGER.debug("Received status change on failure detector event: {}", failureDetectorEvent);
        if (failureDetectorEvent.status() != MemberStatus.ALIVE) {
            updateMembership(new MembershipRecord(membershipRecord.member(), failureDetectorEvent.status(), membershipRecord.incarnation()), MembershipUpdateReason.FAILURE_DETECTOR_EVENT).subscribe((Consumer) null, this::onError);
            return;
        }
        Message prepareSyncDataMsg = prepareSyncDataMsg(SYNC, null);
        Address address = failureDetectorEvent.member().address();
        this.transport.send(address, prepareSyncDataMsg).subscribe((Consumer) null, th -> {
            LOGGER.debug("Failed to send {} to {}, cause: {}", new Object[]{prepareSyncDataMsg, address, th.toString()});
        });
    }

    private void onMembershipGossip(Message message) {
        if (MEMBERSHIP_GOSSIP.equals(message.qualifier())) {
            MembershipRecord membershipRecord = (MembershipRecord) message.data();
            LOGGER.debug("Received membership gossip: {}", membershipRecord);
            updateMembership(membershipRecord, MembershipUpdateReason.MEMBERSHIP_GOSSIP).subscribe((Consumer) null, this::onError);
        }
    }

    private Optional<Address> selectSyncAddress() {
        List list = (List) Stream.concat(this.seedMembers.stream(), otherMembers().stream().map((v0) -> {
            return v0.address();
        })).collect(Collectors.collectingAndThen(Collectors.toSet(), (v1) -> {
            return new ArrayList(v1);
        }));
        Collections.shuffle(list);
        return list.isEmpty() ? Optional.empty() : Optional.of(list.get(ThreadLocalRandom.current().nextInt(list.size())));
    }

    private void onError(Throwable th) {
        LOGGER.error("Received unexpected error: ", th);
    }

    private void onErrorIgnore(Throwable th) {
    }

    private boolean checkSyncGroup(Message message) {
        if (!(message.data() instanceof SyncData)) {
            return false;
        }
        return this.membershipConfig.syncGroup().equals(((SyncData) message.data()).getSyncGroup());
    }

    private void schedulePeriodicSync() {
        int syncInterval = this.membershipConfig.syncInterval();
        this.actionsDisposables.add(this.scheduler.schedulePeriodically(this::doSync, syncInterval, syncInterval, TimeUnit.MILLISECONDS));
    }

    private Message prepareSyncDataMsg(String str, String str2) {
        return Message.withData(new SyncData(new ArrayList(this.membershipTable.values()), this.membershipConfig.syncGroup())).qualifier(str).correlationId(str2).build();
    }

    private Mono<Void> syncMembership(SyncData syncData, boolean z) {
        return Mono.defer(() -> {
            MembershipUpdateReason membershipUpdateReason = z ? MembershipUpdateReason.INITIAL_SYNC : MembershipUpdateReason.SYNC;
            return Mono.whenDelayError((Publisher[]) syncData.getMembership().stream().map(membershipRecord -> {
                return updateMembership(membershipRecord, membershipUpdateReason);
            }).toArray(i -> {
                return new Mono[i];
            }));
        });
    }

    private Mono<Void> updateMembership(MembershipRecord membershipRecord, MembershipUpdateReason membershipUpdateReason) {
        return Mono.defer(() -> {
            Objects.requireNonNull(membershipRecord, "Membership record can't be null");
            MembershipRecord membershipRecord2 = this.membershipTable.get(membershipRecord.id());
            if (membershipRecord.equals(membershipRecord2) || !membershipRecord.isOverrides(membershipRecord2)) {
                LOGGER_MEMBERSHIP.debug("(update reason: {}) skipping update, can't override r0: {} with received r1: {}", new Object[]{membershipUpdateReason, membershipRecord2, membershipRecord});
                return Mono.empty();
            }
            if (membershipRecord.member().address().equals(this.localMember.address())) {
                return membershipRecord.member().id().equals(this.localMember.id()) ? onSelfMemberDetected(membershipRecord2, membershipRecord, membershipUpdateReason) : Mono.empty();
            }
            if (membershipRecord.isDead()) {
                return onDeadMemberDetected(membershipRecord);
            }
            if (membershipRecord.isSuspect()) {
                this.membershipTable.put(membershipRecord.id(), membershipRecord);
                scheduleSuspicionTimeoutTask(membershipRecord);
                spreadMembershipGossipUnlessGossiped(membershipRecord, membershipUpdateReason);
            }
            return (!membershipRecord.isAlive() || (membershipRecord2 != null && membershipRecord2.incarnation() >= membershipRecord.incarnation())) ? Mono.empty() : this.metadataStore.fetchMetadata(membershipRecord.member()).doOnError(th -> {
                LOGGER_MEMBERSHIP.debug("(update reason: {}) skipping to add/update member: {}, due to failed fetchMetadata call (cause: {})", new Object[]{membershipUpdateReason, membershipRecord, th.toString()});
            }).doOnSuccess(byteBuffer -> {
                cancelSuspicionTimeoutTask(membershipRecord.id());
                spreadMembershipGossipUnlessGossiped(membershipRecord, membershipUpdateReason);
                onAliveMemberDetected(membershipRecord, this.metadataStore.updateMetadata(membershipRecord.member(), byteBuffer), byteBuffer);
            }).onErrorResume(Exception.class, exc -> {
                return Mono.empty();
            }).then();
        });
    }

    private Mono<Void> onSelfMemberDetected(MembershipRecord membershipRecord, MembershipRecord membershipRecord2, MembershipUpdateReason membershipUpdateReason) {
        return Mono.fromRunnable(() -> {
            MembershipRecord membershipRecord3 = new MembershipRecord(this.localMember, membershipRecord.status(), Math.max(membershipRecord.incarnation(), membershipRecord2.incarnation()) + 1);
            this.membershipTable.put(this.localMember.id(), membershipRecord3);
            LOGGER_MEMBERSHIP.debug("(update reason: {}) updating incarnation, local record r0: {} to received r1: {}, spreading with increased incarnation r2: {}", new Object[]{membershipUpdateReason, membershipRecord, membershipRecord2, membershipRecord3});
            spreadMembershipGossip(membershipRecord3).doOnError(this::onErrorIgnore).subscribe();
        });
    }

    private Mono<Void> onDeadMemberDetected(MembershipRecord membershipRecord) {
        return Mono.fromRunnable(() -> {
            cancelSuspicionTimeoutTask(membershipRecord.id());
            if (this.members.containsKey(membershipRecord.id())) {
                this.members.remove(membershipRecord.id());
                this.membershipTable.remove(membershipRecord.id());
                MembershipEvent createRemoved = MembershipEvent.createRemoved(membershipRecord.member(), this.metadataStore.removeMetadata(membershipRecord.member()));
                LOGGER_MEMBERSHIP.debug("Emitting membership event {}", createRemoved);
                this.sink.next(createRemoved);
            }
        });
    }

    private void onAliveMemberDetected(MembershipRecord membershipRecord, ByteBuffer byteBuffer, ByteBuffer byteBuffer2) {
        Member member = membershipRecord.member();
        MembershipEvent membershipEvent = null;
        if (!this.members.containsKey(member.id())) {
            membershipEvent = MembershipEvent.createAdded(member, byteBuffer2);
        } else if (!byteBuffer2.equals(byteBuffer)) {
            membershipEvent = MembershipEvent.createUpdated(member, byteBuffer, byteBuffer2);
        }
        this.members.put(member.id(), member);
        this.membershipTable.put(member.id(), membershipRecord);
        if (membershipEvent != null) {
            LOGGER_MEMBERSHIP.debug("Emitting membership event {}", membershipEvent);
            this.sink.next(membershipEvent);
        }
    }

    private void cancelSuspicionTimeoutTask(String str) {
        Disposable remove = this.suspicionTimeoutTasks.remove(str);
        if (remove == null || remove.isDisposed()) {
            return;
        }
        LOGGER.debug("Cancelled SuspicionTimeoutTask for {}", str);
        remove.dispose();
    }

    private void scheduleSuspicionTimeoutTask(MembershipRecord membershipRecord) {
        long suspicionTimeout = ClusterMath.suspicionTimeout(this.membershipConfig.suspicionMult(), this.membershipTable.size(), this.failureDetectorConfig.pingInterval());
        this.suspicionTimeoutTasks.computeIfAbsent(membershipRecord.id(), str -> {
            LOGGER.debug("Scheduled SuspicionTimeoutTask for {}, suspicionTimeout {}", str, Long.valueOf(suspicionTimeout));
            return this.scheduler.schedule(() -> {
                onSuspicionTimeout(str);
            }, suspicionTimeout, TimeUnit.MILLISECONDS);
        });
    }

    private void onSuspicionTimeout(String str) {
        this.suspicionTimeoutTasks.remove(str);
        MembershipRecord membershipRecord = this.membershipTable.get(str);
        if (membershipRecord != null) {
            LOGGER.debug("Declare SUSPECTED member {} as DEAD by timeout", membershipRecord);
            updateMembership(new MembershipRecord(membershipRecord.member(), MemberStatus.DEAD, membershipRecord.incarnation()), MembershipUpdateReason.SUSPICION_TIMEOUT).subscribe((Consumer) null, this::onError);
        }
    }

    private void spreadMembershipGossipUnlessGossiped(MembershipRecord membershipRecord, MembershipUpdateReason membershipUpdateReason) {
        if (membershipUpdateReason == MembershipUpdateReason.MEMBERSHIP_GOSSIP || membershipUpdateReason == MembershipUpdateReason.INITIAL_SYNC) {
            return;
        }
        spreadMembershipGossip(membershipRecord).doOnError(this::onErrorIgnore).subscribe();
    }

    private Mono<Void> spreadMembershipGossip(MembershipRecord membershipRecord) {
        return Mono.defer(() -> {
            Message build = Message.withData(membershipRecord).qualifier(MEMBERSHIP_GOSSIP).build();
            LOGGER.debug("Spead membreship: {} with gossip", build);
            return this.gossipProtocol.spread(build).doOnError(th -> {
                LOGGER.debug("Failed to spread membership: {} with gossip, cause: {}", build, th.toString());
            }).then();
        });
    }

    FailureDetector getFailureDetector() {
        return this.failureDetector;
    }

    GossipProtocol getGossipProtocol() {
        return this.gossipProtocol;
    }

    Transport getTransport() {
        return this.transport;
    }

    MetadataStore getMetadataStore() {
        return this.metadataStore;
    }

    List<MembershipRecord> getMembershipRecords() {
        return Collections.unmodifiableList(new ArrayList(this.membershipTable.values()));
    }
}
