package io.scalecube.services.discovery;

import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.services.ServiceEndpoint;
import io.scalecube.services.discovery.api.ServiceDiscovery;
import io.scalecube.services.discovery.api.ServiceDiscoveryConfig;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import io.scalecube.services.registry.api.ServiceRegistry;
import io.scalecube.services.transport.api.Address;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/services/discovery/ScalecubeServiceDiscovery.class */
/**
 * {@link ServiceDiscovery} implementation backed by a scalecube {@link Cluster}.
 *
 * <p>On {@link #start(ServiceDiscoveryConfig)} the endpoints currently listed by the configured
 * {@link ServiceRegistry} are encoded into the cluster member metadata and the cluster is joined.
 * Membership events received from the cluster are decoded back into {@link ServiceEndpoint}s,
 * applied to the registry and republished as {@link ServiceDiscoveryEvent}s through
 * {@link #listen()}.
 */
public class ScalecubeServiceDiscovery implements ServiceDiscovery {
    // NOTE(review): the logger is registered under the ServiceDiscovery interface rather than this
    // class; kept as-is because existing logging configuration may depend on that category.
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);

    /** Metadata value that marks a cluster metadata entry as an encoded service endpoint. */
    public static final String SERVICE_METADATA = "service";

    // The three fields below are assigned while the Mono returned by start(...) is being executed.
    private ServiceRegistry serviceRegistry;
    private Cluster cluster;
    private ServiceEndpoint endpoint;

    private final DirectProcessor<ServiceDiscoveryEvent> subject = DirectProcessor.create();
    // All events are emitted through the serialized sink of the processor.
    private final FluxSink<ServiceDiscoveryEvent> sink = this.subject.serialize().sink();

    /**
     * Returns the address of the joined cluster member converted to the services {@link Address}.
     *
     * @return host and port of the cluster member
     * @throws NullPointerException if the cluster has not been joined yet
     */
    public Address address() {
        io.scalecube.transport.Address clusterAddress = this.cluster.address();
        return Address.create(clusterAddress.host(), clusterAddress.port());
    }

    /**
     * Returns the endpoint taken from the config passed to {@link #start(ServiceDiscoveryConfig)}.
     *
     * @return the configured endpoint, or {@code null} if start has not been executed yet
     */
    public ServiceEndpoint endpoint() {
        return this.endpoint;
    }

    /**
     * Joins the cluster using the given config and starts listening for membership events.
     *
     * <p>Nothing happens until the returned {@link Mono} is subscribed to.
     *
     * @param serviceDiscoveryConfig source of the registry, endpoint and cluster settings
     * @return a mono emitting this instance once the cluster has been joined
     */
    public Mono<ServiceDiscovery> start(ServiceDiscoveryConfig serviceDiscoveryConfig) {
        return Mono.defer(() -> {
            this.serviceRegistry = serviceDiscoveryConfig.serviceRegistry();
            this.endpoint = serviceDiscoveryConfig.endpoint();
            ClusterConfig clusterConfig =
                    clusterConfigBuilder(serviceDiscoveryConfig).addMetadata(getMetadata()).build();
            LOGGER.info("Start scalecube service discovery with config: {}", clusterConfig);
            return Cluster.join(clusterConfig)
                    .doOnSuccess(joined -> {
                        // Store the cluster first so it is available once events start flowing.
                        this.cluster = joined;
                        listen(joined);
                    })
                    .thenReturn(this);
        });
    }

    /**
     * Builds the cluster metadata: one entry per registered endpoint, keyed by the encoded
     * endpoint and valued with {@link #SERVICE_METADATA}.
     */
    private Map<String, String> getMetadata() {
        return this.serviceRegistry.listServiceEndpoints().stream()
                .collect(Collectors.toMap(
                        ClusterMetadataDecoder::encodeMetadata, serviceEndpoint -> SERVICE_METADATA));
    }

    /**
     * Subscribes {@link #onMemberEvent(MembershipEvent)} to the membership events of the cluster.
     *
     * <p>An error consumer is supplied so that a failure in the membership stream or in the
     * handler is logged here instead of surfacing as an unhandled-error exception from Reactor.
     */
    private void listen(Cluster cluster) {
        cluster.listenMembership().subscribe(
                this::onMemberEvent,
                error -> LOGGER.error("Failed to process cluster membership events", error));
    }

    /**
     * Returns the stream of discovery events.
     *
     * <p>Each subscriber first receives a {@code registered} event for every endpoint listed by
     * the registry at subscription time, followed by the events published afterwards.
     *
     * @return flux of discovery events
     */
    public Flux<ServiceDiscoveryEvent> listen() {
        return Flux.defer(() ->
                Flux.fromIterable(this.serviceRegistry.listServiceEndpoints())
                        .map(ServiceDiscoveryEvent::registered)
                        .concatWith(this.subject));
    }

    /**
     * Completes the event stream and shuts the cluster down if it was joined.
     *
     * @return the cluster shutdown mono, or an empty mono when no cluster was joined
     */
    public Mono<Void> shutdown() {
        return Mono.defer(() -> {
            this.sink.complete();
            return Optional.ofNullable(this.cluster).map(Cluster::shutdown).orElse(Mono.empty());
        });
    }

    /**
     * Creates a cluster config builder and copies over every setting that is present (non-null)
     * in the discovery config: seeds, port, tags, member host and member port.
     */
    private ClusterConfig.Builder clusterConfigBuilder(ServiceDiscoveryConfig serviceDiscoveryConfig) {
        ClusterConfig.Builder builder = ClusterConfig.builder();
        if (serviceDiscoveryConfig.seeds() != null) {
            // Seeds are given as services addresses; the cluster expects transport addresses.
            builder.seedMembers(
                    Arrays.stream(serviceDiscoveryConfig.seeds())
                            .map(seed -> io.scalecube.transport.Address.create(seed.host(), seed.port()))
                            .toArray(io.scalecube.transport.Address[]::new));
        }
        if (serviceDiscoveryConfig.port() != null) {
            builder.port(serviceDiscoveryConfig.port().intValue());
        }
        if (serviceDiscoveryConfig.tags() != null) {
            builder.metadata(serviceDiscoveryConfig.tags());
        }
        if (serviceDiscoveryConfig.memberHost() != null) {
            builder.memberHost(serviceDiscoveryConfig.memberHost());
        }
        if (serviceDiscoveryConfig.memberPort() != null) {
            builder.memberPort(serviceDiscoveryConfig.memberPort());
        }
        return builder;
    }

    /**
     * Handles a membership event: every metadata entry of the member whose value equals
     * {@link #SERVICE_METADATA} is decoded into an endpoint (entries decoding to {@code null} are
     * skipped) and then registered or unregistered depending on the event type.
     */
    private void onMemberEvent(MembershipEvent membershipEvent) {
        Member member = membershipEvent.member();
        member.metadata().entrySet().stream()
                .filter(entry -> SERVICE_METADATA.equals(entry.getValue()))
                .map(Map.Entry::getKey)
                .map(ClusterMetadataDecoder::decodeMetadata)
                .filter(Objects::nonNull)
                .forEach(serviceEndpoint -> {
                    // An event is published only when the registry actually changed.
                    if (membershipEvent.isAdded()
                            && this.serviceRegistry.registerService(serviceEndpoint)) {
                        LOGGER.info("ServiceEndpoint was ADDED since new Member has joined the cluster {} : {}", member, serviceEndpoint);
                        ServiceDiscoveryEvent registered = ServiceDiscoveryEvent.registered(serviceEndpoint);
                        LOGGER.info("Publish services registered: {}", registered);
                        this.sink.next(registered);
                    }
                    if (membershipEvent.isRemoved()
                            && this.serviceRegistry.unregisterService(serviceEndpoint.id()) != null) {
                        LOGGER.info("ServiceEndpoint was REMOVED since Member have left the cluster {} : {}", member, serviceEndpoint);
                        ServiceDiscoveryEvent unregistered = ServiceDiscoveryEvent.unregistered(serviceEndpoint);
                        LOGGER.info("Publish services unregistered: {}", unregistered);
                        this.sink.next(unregistered);
                    }
                });
    }
}
