package io.scalecube.services.discovery;

import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.Member;
import io.scalecube.services.ServiceEndpoint;
import io.scalecube.services.api.Address;
import io.scalecube.services.discovery.api.ServiceDiscovery;
import io.scalecube.services.discovery.api.ServiceDiscoveryConfig;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import io.scalecube.services.registry.api.ServiceRegistry;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/services/discovery/ScalecubeServiceDiscovery.class */
/**
 * {@link ServiceDiscovery} implementation backed by a scalecube {@link Cluster}.
 *
 * <p>On {@link #start(ServiceDiscoveryConfig)} the locally registered service endpoints are
 * encoded into cluster metadata keys (each mapped to the marker value {@link #SERVICE_METADATA})
 * and the node joins the cluster. Metadata entries of other members carrying the same marker are
 * decoded back into {@link ServiceEndpoint}s, applied to the {@link ServiceRegistry}, and
 * published as {@link ServiceDiscoveryEvent}s to subscribers of {@link #listen()}.
 */
public class ScalecubeServiceDiscovery implements ServiceDiscovery {
    // NOTE(review): the logger category is the ServiceDiscovery interface rather than this class;
    // kept as-is because existing logging configuration may key on that name.
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);

    /** Metadata value marking a cluster metadata entry whose key is an encoded service endpoint. */
    public static final String SERVICE_METADATA = "service";

    // The three fields below are assigned during start(...); they are null before that.
    private ServiceRegistry serviceRegistry;
    private Cluster cluster;
    private ServiceEndpoint endpoint;

    private final DirectProcessor<ServiceDiscoveryEvent> subject = DirectProcessor.create();
    private final FluxSink<ServiceDiscoveryEvent> sink = this.subject.serialize().sink();

    /**
     * Reason a member's services are being processed: the member joined ({@link #ADDED}), left
     * ({@link #REMOVED}), or was already present when this node joined ({@link #DISCOVERED}).
     */
    public enum DiscoveryType {
        ADDED,
        REMOVED,
        DISCOVERED
    }

    /**
     * Returns the address of the underlying cluster node, converted to the services API type.
     *
     * @return host and port reported by the cluster
     */
    public Address address() {
        io.scalecube.transport.Address clusterAddress = this.cluster.address();
        return Address.create(clusterAddress.host(), clusterAddress.port());
    }

    /**
     * Returns the service endpoint taken from the configuration passed to {@link #start}.
     *
     * @return the local service endpoint, or {@code null} if {@code start} was not called yet
     */
    public ServiceEndpoint endpoint() {
        return this.endpoint;
    }

    /**
     * Joins the cluster, advertising the locally registered service endpoints as metadata.
     *
     * <p>Once the join succeeds, services of the members already present are loaded into the
     * registry and a membership listener is installed.
     *
     * @param serviceDiscoveryConfig source of the registry, local endpoint and cluster settings
     * @return a {@link Mono} emitting this instance after the cluster join has completed
     */
    public Mono<ServiceDiscovery> start(ServiceDiscoveryConfig serviceDiscoveryConfig) {
        this.serviceRegistry = serviceDiscoveryConfig.serviceRegistry();
        this.endpoint = serviceDiscoveryConfig.endpoint();

        // Key = encoded endpoint, value = marker that identifies the entry as a service.
        Map<String, String> serviceMetadata =
            this.serviceRegistry.listServiceEndpoints().stream()
                .collect(
                    Collectors.toMap(
                        ClusterMetadataDecoder::encodeMetadata, ignored -> SERVICE_METADATA));

        ClusterConfig clusterConfig =
            clusterConfigBuilder(serviceDiscoveryConfig).addMetadata(serviceMetadata).build();

        LOGGER.info("Start scalecube service discovery with config: {}", clusterConfig);

        return Mono.fromFuture(
                Cluster.join(clusterConfig)
                    .whenComplete(
                        (joined, error) -> {
                            // Only wire things up on success; a failure is left to propagate
                            // through the returned Mono.
                            if (error == null) {
                                this.cluster = joined;
                                init(joined);
                            }
                        }))
            .map(joined -> this);
    }

    /**
     * Returns a stream of discovery events.
     *
     * @return a {@link Flux} that first emits a "registered" event for every endpoint currently
     *     in the registry and then continues with the events published by this instance
     */
    public Flux<ServiceDiscoveryEvent> listen() {
        return Flux.fromIterable(this.serviceRegistry.listServiceEndpoints())
            .map(ServiceDiscoveryEvent::registered)
            .concatWith(this.subject);
    }

    /**
     * Completes the event stream and shuts the cluster node down.
     *
     * @return a {@link Mono} tracking the cluster shutdown; empty if the cluster was never joined
     */
    public Mono<Void> shutdown() {
        return Mono.defer(
            () -> {
                this.sink.complete();
                Cluster joined = this.cluster;
                if (joined == null) {
                    return Mono.empty();
                }
                return Mono.fromFuture(joined.shutdown());
            });
    }

    /**
     * Translates the optional settings of the discovery configuration into a cluster config
     * builder. Settings that are {@code null} are left at the builder's defaults.
     */
    private ClusterConfig.Builder clusterConfigBuilder(ServiceDiscoveryConfig serviceDiscoveryConfig) {
        ClusterConfig.Builder builder = ClusterConfig.builder();
        if (serviceDiscoveryConfig.seeds() != null) {
            // Seeds use the services API address type; the cluster expects its transport type.
            builder.seedMembers(
                Arrays.stream(serviceDiscoveryConfig.seeds())
                    .map(seed -> io.scalecube.transport.Address.create(seed.host(), seed.port()))
                    .toArray(io.scalecube.transport.Address[]::new));
        }
        if (serviceDiscoveryConfig.port() != null) {
            builder.port(serviceDiscoveryConfig.port().intValue());
        }
        if (serviceDiscoveryConfig.tags() != null) {
            builder.metadata(serviceDiscoveryConfig.tags());
        }
        if (serviceDiscoveryConfig.memberHost() != null) {
            builder.memberHost(serviceDiscoveryConfig.memberHost());
        }
        if (serviceDiscoveryConfig.memberPort() != null) {
            builder.memberPort(serviceDiscoveryConfig.memberPort());
        }
        return builder;
    }

    /** Loads services of the members already in the cluster, then starts tracking changes. */
    private void init(Cluster cluster) {
        loadClusterServices(cluster);
        listenCluster(cluster);
    }

    /**
     * Subscribes to membership events and applies the services of members that were added or
     * removed. Other event kinds are ignored.
     */
    private void listenCluster(Cluster cluster) {
        cluster
            .listenMembership()
            .subscribe(
                membershipEvent -> {
                    if (membershipEvent.isAdded()) {
                        loadMemberServices(DiscoveryType.ADDED, membershipEvent.member());
                    } else if (membershipEvent.isRemoved()) {
                        loadMemberServices(DiscoveryType.REMOVED, membershipEvent.member());
                    }
                },
                // Without an error consumer a failure of the membership stream has no handler;
                // log it explicitly so the loss of discovery updates is visible.
                error -> LOGGER.error("Cluster membership listener terminated with error", error));
    }

    /** Processes every other member of the cluster as {@link DiscoveryType#DISCOVERED}. */
    private void loadClusterServices(Cluster cluster) {
        cluster.otherMembers().forEach(member -> loadMemberServices(DiscoveryType.DISCOVERED, member));
    }

    /**
     * Decodes the service entries found in the member's metadata and applies each of them to the
     * registry according to the discovery type.
     *
     * @param discoveryType why the member is being processed
     * @param member cluster member whose metadata is inspected
     */
    private void loadMemberServices(DiscoveryType discoveryType, Member member) {
        for (Map.Entry<String, String> entry : member.metadata().entrySet()) {
            // Only entries tagged with the service marker carry an encoded endpoint in the key.
            if (!SERVICE_METADATA.equals(entry.getValue())) {
                continue;
            }
            ServiceEndpoint serviceEndpoint = ClusterMetadataDecoder.decodeMetadata(entry.getKey());
            if (serviceEndpoint == null) {
                continue;
            }
            LOGGER.debug("Member: {} is {} : {}", member, discoveryType, serviceEndpoint);
            applyMemberService(discoveryType, member, serviceEndpoint);
        }
    }

    /**
     * Registers or unregisters a single decoded endpoint and publishes the matching event, but
     * only when the registry reports that it actually changed.
     */
    private void applyMemberService(
        DiscoveryType discoveryType, Member member, ServiceEndpoint serviceEndpoint) {
        boolean joining =
            discoveryType == DiscoveryType.ADDED || discoveryType == DiscoveryType.DISCOVERED;

        if (joining) {
            if (this.serviceRegistry.registerService(serviceEndpoint)) {
                LOGGER.info(
                    "Service Reference was ADDED since new Member has joined the cluster {} : {}",
                    member,
                    serviceEndpoint);
                ServiceDiscoveryEvent registered = ServiceDiscoveryEvent.registered(serviceEndpoint);
                LOGGER.debug("Publish registered: {}", registered);
                this.sink.next(registered);
            }
            return;
        }

        if (discoveryType == DiscoveryType.REMOVED
            && this.serviceRegistry.unregisterService(serviceEndpoint.id()) != null) {
            LOGGER.info(
                "Service Reference was REMOVED since Member have left the cluster {} : {}",
                member,
                serviceEndpoint);
            ServiceDiscoveryEvent unregistered = ServiceDiscoveryEvent.unregistered(serviceEndpoint);
            LOGGER.debug("Publish unregistered: {}", unregistered);
            this.sink.next(unregistered);
        }
    }
}
