package io.scalecube.cluster;

import io.scalecube.cluster.fdetector.FailureDetectorImpl;
import io.scalecube.cluster.gossip.GossipProtocolImpl;
import io.scalecube.cluster.membership.IdGenerator;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.membership.MembershipProtocolImpl;
import io.scalecube.transport.Address;
import io.scalecube.transport.Message;
import io.scalecube.transport.NetworkEmulator;
import io.scalecube.transport.Transport;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/scalecube/cluster/ClusterImpl.class */
public final class ClusterImpl implements Cluster {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterImpl.class);
    private static final Set<String> SYSTEM_MESSAGES = Collections.unmodifiableSet((Set) Stream.of((Object[]) new String[]{FailureDetectorImpl.PING, FailureDetectorImpl.PING_REQ, FailureDetectorImpl.PING_ACK, MembershipProtocolImpl.SYNC, MembershipProtocolImpl.SYNC_ACK, GossipProtocolImpl.GOSSIP_REQ}).collect(Collectors.toSet()));
    private static final Set<String> SYSTEM_GOSSIPS = Collections.singleton(MembershipProtocolImpl.MEMBERSHIP_GOSSIP);
    private final ClusterConfig config;
    private final ConcurrentMap<String, Member> members = new ConcurrentHashMap();
    private final ConcurrentMap<Address, String> memberAddressIndex = new ConcurrentHashMap();
    private final DirectProcessor<MembershipEvent> membershipEvents = DirectProcessor.create();
    private final FluxSink<MembershipEvent> membershipSink = this.membershipEvents.sink();
    private final Disposable.Composite actionsDisposables = Disposables.composite();
    private Transport transport;
    private FailureDetectorImpl failureDetector;
    private GossipProtocolImpl gossip;
    private MembershipProtocolImpl membership;
    private Scheduler scheduler;

    public ClusterImpl(ClusterConfig clusterConfig) {
        this.config = (ClusterConfig) Objects.requireNonNull(clusterConfig);
    }

    public Mono<Cluster> join0() {
        return Transport.bind(this.config.getTransportConfig()).flatMap(transport -> {
            this.transport = transport;
            Member createLocalMember = createLocalMember();
            onMemberAdded(createLocalMember);
            AtomicReference atomicReference = new AtomicReference(createLocalMember);
            this.scheduler = Schedulers.newSingle("sc-cluster-" + createLocalMember.address().port(), true);
            atomicReference.getClass();
            this.failureDetector = new FailureDetectorImpl(atomicReference::get, this.transport, this.membershipEvents.onBackpressureBuffer(), this.config, this.scheduler);
            atomicReference.getClass();
            this.gossip = new GossipProtocolImpl(atomicReference::get, this.transport, this.membershipEvents.onBackpressureBuffer(), this.config, this.scheduler);
            this.membership = new MembershipProtocolImpl(atomicReference, this.transport, this.failureDetector, this.gossip, this.config, this.scheduler);
            this.actionsDisposables.add(this.membership.listen().subscribe(membershipEvent -> {
                onMemberEvent(membershipEvent, this.membershipSink);
            }, this::onError));
            this.failureDetector.start();
            this.gossip.start();
            return this.membership.start();
        }).then(Mono.just(this));
    }

    private Member createLocalMember() {
        String generateId = IdGenerator.generateId();
        InetAddress localIpAddress = Address.getLocalIpAddress();
        int port = this.transport.address().port();
        String memberHost = this.config.getMemberHost();
        Integer memberPort = this.config.getMemberPort();
        return new Member(generateId, (Address) Optional.ofNullable(memberHost).map(str -> {
            return Address.create(str, ((Integer) Optional.ofNullable(memberPort).orElse(Integer.valueOf(port))).intValue());
        }).orElseGet(() -> {
            return Address.create(localIpAddress.getHostAddress(), port);
        }), this.config.getMetadata());
    }

    private void onMemberEvent(MembershipEvent membershipEvent, FluxSink<MembershipEvent> fluxSink) {
        Member member = membershipEvent.member();
        if (membershipEvent.isAdded()) {
            onMemberAdded(member);
        }
        if (membershipEvent.isRemoved()) {
            this.members.remove(member.id());
            this.memberAddressIndex.remove(member.address());
        }
        if (membershipEvent.isUpdated()) {
            this.members.put(member.id(), member);
        }
        fluxSink.next(membershipEvent);
    }

    private void onMemberAdded(Member member) {
        this.memberAddressIndex.put(member.address(), member.id());
        this.members.put(member.id(), member);
    }

    private void onError(Throwable th) {
        LOGGER.error("Received unexpected error: ", th);
    }

    @Override // io.scalecube.cluster.Cluster
    public Address address() {
        return member().address();
    }

    @Override // io.scalecube.cluster.Cluster
    public Mono<Void> send(Member member, Message message) {
        return send(member.address(), message);
    }

    @Override // io.scalecube.cluster.Cluster
    public Mono<Void> send(Address address, Message message) {
        return this.transport.send(address, message);
    }

    @Override // io.scalecube.cluster.Cluster
    public Flux<Message> listen() {
        return this.transport.listen().filter(message -> {
            return !SYSTEM_MESSAGES.contains(message.qualifier());
        });
    }

    @Override // io.scalecube.cluster.Cluster
    public Mono<String> spreadGossip(Message message) {
        return this.gossip.spread(message);
    }

    @Override // io.scalecube.cluster.Cluster
    public Flux<Message> listenGossips() {
        return this.gossip.listen().filter(message -> {
            return !SYSTEM_GOSSIPS.contains(message.qualifier());
        });
    }

    @Override // io.scalecube.cluster.Cluster
    public Collection<Member> members() {
        return Collections.unmodifiableCollection(this.members.values());
    }

    @Override // io.scalecube.cluster.Cluster
    public Member member() {
        return this.membership.member();
    }

    @Override // io.scalecube.cluster.Cluster
    public Optional<Member> member(String str) {
        return Optional.ofNullable(this.members.get(str));
    }

    @Override // io.scalecube.cluster.Cluster
    public Optional<Member> member(Address address) {
        return Optional.ofNullable(this.memberAddressIndex.get(address)).flatMap(str -> {
            return Optional.ofNullable(this.members.get(str));
        });
    }

    @Override // io.scalecube.cluster.Cluster
    public Collection<Member> otherMembers() {
        ArrayList arrayList = new ArrayList(this.members.values());
        arrayList.remove(this.membership.member());
        return Collections.unmodifiableCollection(arrayList);
    }

    @Override // io.scalecube.cluster.Cluster
    public Mono<Void> updateMetadata(Map<String, String> map) {
        return this.membership.updateMetadata(map);
    }

    @Override // io.scalecube.cluster.Cluster
    public Mono<Void> updateMetadataProperty(String str, String str2) {
        return Mono.defer(() -> {
            HashMap hashMap = new HashMap(this.membership.member().metadata());
            hashMap.put(str, str2);
            return this.membership.updateMetadata(hashMap);
        });
    }

    @Override // io.scalecube.cluster.Cluster
    public Flux<MembershipEvent> listenMembership() {
        return Flux.defer(() -> {
            return Flux.fromIterable(otherMembers()).map(MembershipEvent::createAdded).concatWith(this.membershipEvents).onBackpressureBuffer();
        });
    }

    @Override // io.scalecube.cluster.Cluster
    public Mono<Void> shutdown() {
        return this.membership.leave().doOnSubscribe(subscription -> {
            LOGGER.info("Cluster member {} is shutting down", this.membership.member());
        }).flatMap(str -> {
            LOGGER.info("Cluster member notified about his leaving and shutting down {}", this.membership.member());
            this.actionsDisposables.dispose();
            this.membership.stop();
            this.gossip.stop();
            this.failureDetector.stop();
            this.scheduler.dispose();
            return this.transport.stop();
        }).doOnSuccess(r5 -> {
            LOGGER.info("Cluster member has shut down {}", this.membership.member());
        });
    }

    @Override // io.scalecube.cluster.Cluster
    public NetworkEmulator networkEmulator() {
        return this.transport.networkEmulator();
    }

    @Override // io.scalecube.cluster.Cluster
    public boolean isShutdown() {
        return this.transport.isStopped();
    }
}
