package io.scalecube.cluster;

import io.scalecube.cluster.fdetector.FailureDetectorConfig;
import io.scalecube.cluster.fdetector.FailureDetectorImpl;
import io.scalecube.cluster.gossip.GossipConfig;
import io.scalecube.cluster.gossip.GossipProtocolImpl;
import io.scalecube.cluster.membership.MembershipConfig;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.membership.MembershipProtocolImpl;
import io.scalecube.cluster.metadata.MetadataCodec;
import io.scalecube.cluster.metadata.MetadataStore;
import io.scalecube.cluster.metadata.MetadataStoreImpl;
import io.scalecube.cluster.monitor.ClusterMonitorMBean;
import io.scalecube.cluster.monitor.ClusterMonitorModel;
import io.scalecube.cluster.monitor.JmxClusterMonitorMBean;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.cluster.transport.api.TransportFactory;
import io.scalecube.net.Address;
import io.scalecube.transport.netty.TransportImpl;
import io.scalecube.utils.ServiceLoaderUtil;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/scalecube/cluster/ClusterImpl.class */
public final class ClusterImpl implements Cluster {
    private static final Logger LOGGER = LoggerFactory.getLogger(Cluster.class);

    /** Format the membership namespace must fully match; checked in {@code validateConfiguration}. */
    private static final Pattern NAMESPACE_PATTERN = Pattern.compile("^(\\w+[\\w\\-./]*\\w)+");

    /** Qualifiers that {@code listenMessage} filters out of the transport message stream. */
    private static final Set<String> SYSTEM_MESSAGES =
        Collections.unmodifiableSet(
            Stream.of(
                    FailureDetectorImpl.PING,
                    FailureDetectorImpl.PING_REQ,
                    FailureDetectorImpl.PING_ACK,
                    MembershipProtocolImpl.SYNC,
                    MembershipProtocolImpl.SYNC_ACK,
                    GossipProtocolImpl.GOSSIP_REQ,
                    MetadataStoreImpl.GET_METADATA_REQ,
                    MetadataStoreImpl.GET_METADATA_RESP)
                .collect(Collectors.toSet()));

    /** Qualifiers that {@code listenGossip} filters out of the gossip stream. */
    private static final Set<String> SYSTEM_GOSSIPS =
        Collections.singleton(MembershipProtocolImpl.MEMBERSHIP_GOSSIP);

    // Builder state: non-final because the fluent "with" methods assign them on a fresh copy.
    private ClusterConfig config;
    private Function<Cluster, ? extends ClusterMessageHandler> handler;

    // Membership events from the membership protocol are re-published through this sink.
    private final Sinks.Many<MembershipEvent> sink;
    // Subscriptions created while starting; disposed as a group in dispose().
    private final Disposable.Composite actionsDisposables;

    // Lifecycle signals: start/shutdown are the triggers, onStart/onShutdown report the outcome.
    private final Sinks.One<Void> start;
    private final Sinks.One<Void> onStart;
    private final Sinks.One<Void> shutdown;
    private final Sinks.One<Void> onShutdown;

    // Runtime components; all of them are assigned in doStart0() and stay null until then.
    private Transport transport;
    private Member localMember;
    private FailureDetectorImpl failureDetector;
    private GossipProtocolImpl gossip;
    private MembershipProtocolImpl membership;
    private MetadataStore metadataStore;
    private Scheduler scheduler;
    private CorrelationIdGenerator cidGenerator;
    private ClusterMonitorModel.Builder monitorModelBuilder;

    /* loaded from: input_file:io/scalecube/cluster/ClusterImpl$SenderAwareTransport.class */
    /**
     * {@link Transport} decorator that sets a fixed sender address on every outgoing message and
     * delegates all other operations to the wrapped transport unchanged.
     */
    private static class SenderAwareTransport implements Transport {
        private final Transport transport;
        private final Address address;

        /**
         * @param transport transport to delegate to; must not be null
         * @param address sender address to put on outgoing messages; must not be null
         */
        private SenderAwareTransport(Transport transport, Address address) {
            this.transport = Objects.requireNonNull(transport);
            this.address = Objects.requireNonNull(address);
        }

        /** Returns the wrapped transport's own address (not the sender address set on messages). */
        public Address address() {
            return transport.address();
        }

        public Mono<Transport> start() {
            return transport.start();
        }

        public Mono<Void> stop() {
            return transport.stop();
        }

        public boolean isStopped() {
            return transport.isStopped();
        }

        /** Sends {@code message} to {@code address}; the sender is attached at subscription time. */
        public Mono<Void> send(Address address, Message message) {
            // The parameter shadows the field here: "address" below is the destination.
            return Mono.defer(() -> transport.send(address, enhanceWithSender(message)));
        }

        /** Request-response variant of {@link #send(Address, Message)} with the same sender handling. */
        public Mono<Message> requestResponse(Address address, Message message) {
            return Mono.defer(() -> transport.requestResponse(address, enhanceWithSender(message)));
        }

        public Flux<Message> listen() {
            return transport.listen();
        }

        /** Builds a copy of {@code message} whose sender is this decorator's configured address. */
        private Message enhanceWithSender(Message message) {
            return Message.with(message).sender(this.address).build();
        }
    }

    /** Creates a not-yet-started cluster using {@code ClusterConfig.defaultConfig()}. */
    public ClusterImpl() {
        this(ClusterConfig.defaultConfig());
    }

    /**
     * Creates a not-yet-started cluster with the given configuration and wires its lifecycle
     * pipelines.
     *
     * @param clusterConfig cluster configuration; must not be null
     * @throws NullPointerException if {@code clusterConfig} is null
     */
    public ClusterImpl(ClusterConfig clusterConfig) {
        // Default handler: an anonymous ClusterMessageHandler that overrides nothing.
        handler = cluster -> new ClusterMessageHandler() {};
        sink = Sinks.many().multicast().directBestEffort();
        actionsDisposables = Disposables.composite();
        start = Sinks.one();
        onStart = Sinks.one();
        shutdown = Sinks.one();
        onShutdown = Sinks.one();
        config = Objects.requireNonNull(clusterConfig);
        initLifecycle();
    }

    /**
     * Copy constructor behind the fluent configuration methods: clones the source's config,
     * reuses its handler function, and creates fresh sinks and disposables for the copy.
     *
     * @param clusterImpl instance to copy configuration and handler from
     */
    private ClusterImpl(ClusterImpl clusterImpl) {
        sink = Sinks.many().multicast().directBestEffort();
        actionsDisposables = Disposables.composite();
        start = Sinks.one();
        onStart = Sinks.one();
        shutdown = Sinks.one();
        onShutdown = Sinks.one();
        config = clusterImpl.config.clone();
        handler = clusterImpl.handler;
        initLifecycle();
    }

    /**
     * Subscribes the two lifecycle pipelines. Each one waits on its trigger sink ({@code start} /
     * {@code shutdown}) before running the corresponding work and reporting the outcome through
     * {@code onStart} / {@code onShutdown}.
     */
    private void initLifecycle() {
        // Start pipeline: success completes onStart, failure is forwarded to onStart and logged.
        start
            .asMono()
            .then(doStart())
            .doOnSuccess(cluster -> onStart.tryEmitEmpty())
            .doOnError(onStart::tryEmitError)
            .subscribe(
                null,
                th -> LOGGER.error("[{}][doStart] Exception occurred:", localMember, th));

        // Shutdown pipeline: onShutdown is completed whatever the termination signal is.
        shutdown
            .asMono()
            .then(doShutdown())
            .doFinally(signalType -> onShutdown.tryEmitEmpty())
            .subscribe(
                null,
                th ->
                    LOGGER.warn(
                        "[{}][doShutdown] Exception occurred: {}", localMember, th.toString()));
    }

    /**
     * Returns a new instance whose configuration is the result of applying the operator to this
     * instance's configuration; this instance's fields are not reassigned.
     *
     * @param unaryOperator configuration transformer; must not be null
     * @return new {@code ClusterImpl} carrying the transformed configuration
     */
    public ClusterImpl config(UnaryOperator<ClusterConfig> unaryOperator) {
        Objects.requireNonNull(unaryOperator);
        ClusterImpl copy = new ClusterImpl(this);
        copy.config = unaryOperator.apply(config);
        return copy;
    }

    /**
     * Returns a new instance whose configuration is {@code config.transport(unaryOperator)}.
     *
     * @param unaryOperator transport configuration transformer; must not be null
     * @return new {@code ClusterImpl} carrying the updated configuration
     */
    public ClusterImpl transport(UnaryOperator<TransportConfig> unaryOperator) {
        Objects.requireNonNull(unaryOperator);
        ClusterImpl copy = new ClusterImpl(this);
        copy.config = config.transport(unaryOperator);
        return copy;
    }

    /**
     * Returns a new instance whose transport configuration uses the factory obtained from the
     * supplier.
     *
     * @param supplier source of the transport factory; must not be null
     * @return new {@code ClusterImpl} carrying the updated configuration
     */
    public ClusterImpl transportFactory(Supplier<TransportFactory> supplier) {
        Objects.requireNonNull(supplier);
        ClusterImpl copy = new ClusterImpl(this);
        copy.config =
            config.transport(transportConfig -> transportConfig.transportFactory(supplier.get()));
        return copy;
    }

    /**
     * Returns a new instance whose configuration is {@code config.failureDetector(unaryOperator)}.
     *
     * @param unaryOperator failure detector configuration transformer; must not be null
     * @return new {@code ClusterImpl} carrying the updated configuration
     */
    public ClusterImpl failureDetector(UnaryOperator<FailureDetectorConfig> unaryOperator) {
        Objects.requireNonNull(unaryOperator);
        ClusterImpl copy = new ClusterImpl(this);
        copy.config = config.failureDetector(unaryOperator);
        return copy;
    }

    /**
     * Returns a new instance whose configuration is {@code config.gossip(unaryOperator)}.
     *
     * @param unaryOperator gossip configuration transformer; must not be null
     * @return new {@code ClusterImpl} carrying the updated configuration
     */
    public ClusterImpl gossip(UnaryOperator<GossipConfig> unaryOperator) {
        Objects.requireNonNull(unaryOperator);
        ClusterImpl copy = new ClusterImpl(this);
        copy.config = config.gossip(unaryOperator);
        return copy;
    }

    /**
     * Returns a new instance whose configuration is {@code config.membership(unaryOperator)}.
     *
     * @param unaryOperator membership configuration transformer; must not be null
     * @return new {@code ClusterImpl} carrying the updated configuration
     */
    public ClusterImpl membership(UnaryOperator<MembershipConfig> unaryOperator) {
        Objects.requireNonNull(unaryOperator);
        ClusterImpl copy = new ClusterImpl(this);
        copy.config = config.membership(unaryOperator);
        return copy;
    }

    /**
     * Returns a new instance that uses the given function to create its message handler; the
     * function is applied in {@code startHandler()}.
     *
     * @param function factory receiving the cluster and returning its handler; must not be null
     * @return new {@code ClusterImpl} carrying the handler factory
     */
    public ClusterImpl handler(Function<Cluster, ClusterMessageHandler> function) {
        Objects.requireNonNull(function);
        ClusterImpl copy = new ClusterImpl(this);
        copy.handler = function;
        return copy;
    }

    /**
     * Triggers the start pipeline on subscription and returns a {@code Mono} that emits this
     * cluster once {@code onStart} completes, or the error forwarded to {@code onStart}.
     *
     * @return deferred start signal
     */
    public Mono<Cluster> start() {
        return Mono.defer(
            () -> {
                // Completing the trigger sink lets the pipeline built in initLifecycle() proceed.
                start.tryEmitEmpty();
                return onStart.asMono().thenReturn(this);
            });
    }

    /**
     * Blocking variant of {@link #start()}.
     *
     * @return the value produced by blocking on {@code start()}
     */
    public Cluster startAwait() {
        return start().block();
    }

    /** Validates the configuration first and only then, lazily, assembles the actual start. */
    private Mono<Cluster> doStart() {
        Mono<Void> validation = Mono.fromRunnable(this::validateConfiguration);
        return validation.then(Mono.defer(this::doStart0));
    }

    /**
     * Binds the transport, creates the local member and all protocol components, then starts them
     * in order: failure detector, gossip, metadata store, message handler, membership, JMX monitor.
     *
     * <p>Membership events from the membership protocol are forwarded into {@code sink}; that
     * subscription is tracked in {@code actionsDisposables}.
     *
     * @return a {@code Mono} emitting this cluster once every start step has completed
     */
    private Mono<Cluster> doStart0() {
        return TransportImpl.bind(config.transportConfig())
            .flatMap(
                boundTransport -> {
                    localMember = createLocalMember(boundTransport.address());
                    transport = new SenderAwareTransport(boundTransport, localMember.address());
                    cidGenerator = new CorrelationIdGenerator(localMember.id());
                    scheduler =
                        Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true);
                    monitorModelBuilder = new ClusterMonitorModel.Builder();

                    failureDetector =
                        new FailureDetectorImpl(
                            localMember,
                            transport,
                            sink.asFlux().onBackpressureBuffer(),
                            config.failureDetectorConfig(),
                            scheduler,
                            cidGenerator);
                    gossip =
                        new GossipProtocolImpl(
                            localMember,
                            transport,
                            sink.asFlux().onBackpressureBuffer(),
                            config.gossipConfig(),
                            scheduler);
                    metadataStore =
                        new MetadataStoreImpl(
                            localMember,
                            transport,
                            config.metadata(),
                            config,
                            scheduler,
                            cidGenerator);
                    membership =
                        new MembershipProtocolImpl(
                            localMember,
                            transport,
                            failureDetector,
                            gossip,
                            metadataStore,
                            config,
                            scheduler,
                            cidGenerator,
                            monitorModelBuilder);

                    // Forward membership events, and stream completion, into the shared sink.
                    actionsDisposables.add(
                        membership
                            .listen()
                            .subscribe(sink::tryEmitNext, this::onError, sink::tryEmitComplete));

                    return Mono.fromRunnable(() -> failureDetector.start())
                        .then(Mono.fromRunnable(() -> gossip.start()))
                        .then(Mono.fromRunnable(() -> metadataStore.start()))
                        .then(Mono.fromRunnable(this::startHandler))
                        .then(membership.start())
                        .then(Mono.fromRunnable(this::startJmxMonitor))
                        .then();
                })
            // localMember is only assigned inside flatMap above, so this line logs whatever the
            // field holds at subscription time.
            .doOnSubscribe(
                subscription ->
                    LOGGER.info("[{}][doStart] Starting, config: {}", localMember, config))
            .doOnSuccess(avoid -> LOGGER.info("[{}][doStart] Started", localMember))
            .thenReturn(this);
    }

    /**
     * Checks the configuration before start.
     *
     * @throws IllegalArgumentException if no {@code MetadataCodec} is found via
     *     {@code ServiceLoaderUtil} and the configured metadata is non-null but not
     *     {@link Serializable}, or if the namespace does not match {@code NAMESPACE_PATTERN}
     * @throws NullPointerException if the transport message codec or the namespace is null
     */
    private void validateConfiguration() {
        MetadataCodec discoveredCodec =
            ServiceLoaderUtil.findFirst(MetadataCodec.class).orElse(null);
        if (discoveredCodec == null) {
            // Metadata is only inspected when no codec was discovered.
            Object metadata = config.metadata();
            if (metadata != null && !(metadata instanceof Serializable)) {
                throw new IllegalArgumentException(
                    "Invalid cluster configuration: metadata must be Serializable");
            }
        }

        Objects.requireNonNull(
            config.transportConfig().messageCodec(),
            "Invalid cluster configuration: transport.messageCodec must be specified");
        Objects.requireNonNull(
            config.membershipConfig().namespace(),
            "Invalid cluster configuration: membership.namespace must be specified");

        boolean namespaceValid =
            NAMESPACE_PATTERN.matcher(config.membershipConfig().namespace()).matches();
        if (!namespaceValid) {
            throw new IllegalArgumentException(
                "Invalid cluster config: membership.namespace format is invalid");
        }
    }

    /**
     * Creates the message handler from the configured factory and subscribes it to user-level
     * transport messages, membership events and gossips. Errors on any of the three streams go to
     * {@code onError}; the subscriptions are tracked in {@code actionsDisposables}.
     */
    private void startHandler() {
        ClusterMessageHandler messageHandler = handler.apply(this);
        actionsDisposables.add(
            listenMessage().subscribe(messageHandler::onMessage, this::onError));
        actionsDisposables.add(
            listenMembership().subscribe(messageHandler::onMembershipEvent, this::onError));
        actionsDisposables.add(
            listenGossip().subscribe(messageHandler::onGossip, this::onError));
    }

    /**
     * Registers a JMX monitor bean for this cluster on the platform MBean server. The object name
     * combines the local member id with {@code System.nanoTime()}.
     *
     * <p>Any exception is rethrown through {@code Exceptions.propagate}.
     */
    private void startJmxMonitor() {
        try {
            javax.management.MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ClusterMonitorMBean monitorBean =
                new JmxClusterMonitorMBean(
                    monitorModelBuilder.config(config).cluster(this).build());
            StandardMBean standardMBean = new StandardMBean(monitorBean, ClusterMonitorMBean.class);
            ObjectName objectName =
                new ObjectName(
                    "io.scalecube.cluster:name=" + member().id() + "@" + System.nanoTime());
            server.registerMBean(standardMBean, objectName);
        } catch (Exception e) {
            throw Exceptions.propagate(e);
        }
    }

    /** Error callback shared by the internal subscriptions: logs the throwable at error level. */
    private void onError(Throwable th) {
        LOGGER.error("[{}] Received unexpected error:", localMember, th);
    }

    /** Transport messages, excluding those whose qualifier is listed in {@code SYSTEM_MESSAGES}. */
    private Flux<Message> listenMessage() {
        return transport.listen().filter(message -> !SYSTEM_MESSAGES.contains(message.qualifier()));
    }

    /** Gossip messages, excluding those whose qualifier is listed in {@code SYSTEM_GOSSIPS}. */
    private Flux<Message> listenGossip() {
        return gossip.listen().filter(message -> !SYSTEM_GOSSIPS.contains(message.qualifier()));
    }

    /** Membership events published through {@code sink}, buffered on backpressure. */
    private Flux<MembershipEvent> listenMembership() {
        Flux<MembershipEvent> events = sink.asFlux();
        return events.onBackpressureBuffer();
    }

    /**
     * Builds the local member with a random UUID as id, the configured alias and namespace, and an
     * address in which the configured external host/port, when non-null, replace the values of the
     * given address.
     *
     * @param address address to fall back to for host and port
     * @return the new local member
     */
    private Member createLocalMember(Address address) {
        int port = Optional.ofNullable(config.externalPort()).orElse(address.port());
        Address memberAddress =
            Optional.ofNullable(config.externalHost())
                .map(host -> Address.create(host, port))
                .orElseGet(() -> Address.create(address.host(), port));
        return new Member(
            UUID.randomUUID().toString(),
            config.memberAlias(),
            memberAddress,
            config.membershipConfig().namespace());
    }

    /** Returns the address of the local member, i.e. {@code member().address()}. */
    public Address address() {
        return member().address();
    }

    /**
     * Sends a message to the given member by delegating to {@link #send(Address, Message)} with
     * the member's address.
     *
     * @param member destination member
     * @param message message to send
     * @return the {@code Mono} returned by the address-based overload
     */
    public Mono<Void> send(Member member, Message message) {
        return send(member.address(), message);
    }

    /**
     * Sends a message to the given address through the cluster transport.
     *
     * @param address destination address
     * @param message message to send
     * @return the {@code Mono} returned by the transport
     */
    public Mono<Void> send(Address address, Message message) {
        return transport.send(address, message);
    }

    /**
     * Performs a request-response exchange with the given address through the cluster transport.
     *
     * @param address destination address
     * @param message request message
     * @return the {@code Mono} returned by the transport
     */
    public Mono<Message> requestResponse(Address address, Message message) {
        return transport.requestResponse(address, message);
    }

    /**
     * Performs a request-response exchange with the given member's address through the cluster
     * transport.
     *
     * @param member destination member
     * @param message request message
     * @return the {@code Mono} returned by the transport
     */
    public Mono<Message> requestResponse(Member member, Message message) {
        Address destination = member.address();
        return transport.requestResponse(destination, message);
    }

    /**
     * Hands the message to the gossip protocol for spreading.
     *
     * @param message gossip message
     * @return the {@code Mono} returned by {@code gossip.spread(message)}
     */
    public Mono<String> spreadGossip(Message message) {
        return gossip.spread(message);
    }

    /** Returns the members reported by the membership protocol's {@code members()}. */
    public Collection<Member> members() {
        return membership.members();
    }

    /** Returns the members reported by the membership protocol's {@code otherMembers()}. */
    public Collection<Member> otherMembers() {
        return membership.otherMembers();
    }

    /**
     * Returns the metadata held by the metadata store for the local member.
     *
     * @param <T> metadata type expected by the caller
     * @return the value of {@code metadataStore.metadata()}
     */
    public <T> Optional<T> metadata() {
        return metadataStore.metadata();
    }

    /**
     * Returns the metadata of the given member. For the local member this is {@link #metadata()};
     * for any other member the bytes held by the metadata store are deserialized with the
     * configured metadata codec.
     *
     * @param member member whose metadata is requested
     * @param <T> metadata type expected by the caller
     * @return the metadata, or empty when the store has none for that member
     */
    public <T> Optional<T> metadata(Member member) {
        if (member().equals(member)) {
            return metadata();
        }
        return metadataStore.metadata(member).map(this::toMetadata);
    }

    /**
     * Deserializes raw metadata bytes with the configured metadata codec.
     *
     * @param byteBuffer serialized metadata
     * @param <T> type the caller expects; the cast is not checked at runtime
     * @return the deserialized value
     */
    @SuppressWarnings("unchecked")
    private <T> T toMetadata(ByteBuffer byteBuffer) {
        Object decoded = config.metadataCodec().deserialize(byteBuffer);
        return (T) decoded;
    }

    /** Returns the local member; the field is assigned in {@code doStart0()}, null before that. */
    public Member member() {
        return localMember;
    }

    /**
     * Looks a member up through the membership protocol by the given string key.
     *
     * @param str lookup key passed to {@code membership.member(String)}
     * @return the lookup result
     */
    public Optional<Member> member(String str) {
        return membership.member(str);
    }

    /**
     * Looks a member up through the membership protocol by address.
     *
     * @param address address passed to {@code membership.member(Address)}
     * @return the lookup result
     */
    public Optional<Member> member(Address address) {
        return membership.member(address);
    }

    /**
     * Stores new local metadata and then asks the membership protocol to update its incarnation;
     * the returned pipeline is subscribed on the cluster scheduler.
     *
     * @param t new metadata value
     * @param <T> metadata type
     * @return completion signal for both steps
     */
    public <T> Mono<Void> updateMetadata(T t) {
        Mono<Void> storeUpdate = Mono.fromRunnable(() -> metadataStore.updateMetadata(t));
        return storeUpdate.then(membership.updateIncarnation()).subscribeOn(scheduler);
    }

    /**
     * Triggers the shutdown pipeline wired in {@code initLifecycle()} by completing the
     * {@code shutdown} sink; does not wait for the pipeline to finish.
     */
    public void shutdown() {
        shutdown.tryEmitEmpty();
    }

    /**
     * Leaves the cluster, disposes the internal components and stops the transport, in that
     * order. Errors are delayed until all three steps have been attempted; the scheduler is
     * disposed once the sequence terminates.
     */
    private Mono<Void> doShutdown() {
        return Mono.defer(
            () -> {
                LOGGER.info("[{}][doShutdown] Shutting down", localMember);
                return Flux.concatDelayError(leaveCluster(), dispose(), transport.stop())
                    .then()
                    .doFinally(signalType -> scheduler.dispose())
                    .doOnSuccess(avoid -> LOGGER.info("[{}][doShutdown] Shutdown", localMember));
            });
    }

    /**
     * Asks the membership protocol to leave the cluster on the cluster scheduler, logging the
     * start, the success, and (at warn level) any failure of the attempt.
     */
    private Mono<Void> leaveCluster() {
        return membership
            .leaveCluster()
            .subscribeOn(scheduler)
            .doOnSubscribe(
                subscription -> LOGGER.info("[{}][leaveCluster] Leaving cluster", localMember))
            .doOnSuccess(avoid -> LOGGER.info("[{}][leaveCluster] Left cluster", localMember))
            .doOnError(
                th ->
                    LOGGER.warn(
                        "[{}][leaveCluster] Exception occurred: {}", localMember, th.toString()))
            .then();
    }

    /**
     * On subscription, disposes the tracked subscriptions and then stops the metadata store,
     * membership, gossip and failure detector, in that order.
     */
    private Mono<Void> dispose() {
        return Mono.fromRunnable(
            () -> {
                actionsDisposables.dispose();
                metadataStore.stop();
                membership.stop();
                gossip.stop();
                failureDetector.stop();
            });
    }

    /** Returns a {@code Mono} view of the {@code onShutdown} sink. */
    public Mono<Void> onShutdown() {
        Mono<Void> shutdownSignal = onShutdown.asMono();
        return shutdownSignal;
    }

    /**
     * Reports whether the {@code onShutdown} sink has already terminated.
     *
     * <p>The check subscribes a fresh future to the sink on every call. When the sink has not
     * terminated yet, that probe is cancelled so repeated polling does not leave subscriptions
     * attached to the sink.
     *
     * @return {@code true} if the shutdown signal has been emitted, {@code false} otherwise
     */
    public boolean isShutdown() {
        java.util.concurrent.CompletableFuture<Void> probe = onShutdown.asMono().toFuture();
        if (probe.isDone()) {
            return true;
        }
        // Cancelling is a no-op if the sink terminates in between; the answer for this call
        // stays "not yet".
        probe.cancel(false);
        return false;
    }
}
