package io.scalecube.services.discovery;

import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.MessageCodec;
import io.scalecube.net.Address;
import io.scalecube.services.ServiceEndpoint;
import io.scalecube.services.ServiceGroup;
import io.scalecube.services.discovery.api.ServiceDiscovery;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.UnaryOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/services/discovery/ScalecubeServiceDiscovery.class */
/**
 * {@link ServiceDiscovery} implementation backed by a scalecube {@link Cluster}.
 *
 * <p>The local {@link ServiceEndpoint} is configured as the cluster member metadata (JSON-encoded
 * through {@code DefaultObjectMapper}). Once started, cluster membership events are translated
 * into {@link ServiceDiscoveryEvent}s and published through {@link #listenDiscovery()}. Endpoints
 * that declare a {@link ServiceGroup} are additionally tracked per group, which produces
 * endpoint-added-to-group / endpoint-removed-from-group and group-added / group-removed events.
 *
 * <p>NOTE(review): the group registry is a plain {@link HashMap} and {@code cluster} is a plain
 * field; nothing in this class synchronizes access to them. Presumably membership events are
 * delivered on a single thread -- confirm against the cluster implementation.
 */
public final class ScalecubeServiceDiscovery implements ServiceDiscovery {
    private static final Logger LOGGER = LoggerFactory.getLogger("io.scalecube.services.discovery.ServiceDiscovery");
    private static final Logger LOGGER_GROUP = LoggerFactory.getLogger("io.scalecube.services.discovery.ServiceGroupDiscovery");

    /** Endpoint advertised by this node. */
    private final ServiceEndpoint serviceEndpoint;
    /** Configuration used when the cluster is started; replaced on the copy made by {@link #options}. */
    private ClusterConfig clusterConfig;
    /** Running cluster; {@code null} until {@link #start()} has completed successfully. */
    private Cluster cluster;
    /** Endpoints currently registered per service group. Groups with no endpoints are removed. */
    private final Map<ServiceGroup, Collection<ServiceEndpoint>> groups;
    /** Processor that fans discovery events out to subscribers of {@link #listenDiscovery()}. */
    private final DirectProcessor<ServiceDiscoveryEvent> subject = DirectProcessor.create();
    /** Sink used to push events into {@link #subject}. */
    private final FluxSink<ServiceDiscoveryEvent> sink = this.subject.sink();

    /** Cluster transport codec that (de)serializes {@link Message}s with {@code DefaultObjectMapper}. */
    public static class MessageCodecImpl implements MessageCodec {
        private MessageCodecImpl() {
        }

        /**
         * Reads one {@link Message} from the stream.
         *
         * @param inputStream stream positioned at the serialized message
         * @return the deserialized message
         * @throws Exception whatever the object mapper throws on malformed or unreadable input
         */
        public Message deserialize(InputStream inputStream) throws Exception {
            return DefaultObjectMapper.OBJECT_MAPPER.readValue(inputStream, Message.class);
        }

        /**
         * Writes the message to the stream.
         *
         * @param message message to serialize
         * @param outputStream destination stream
         * @throws Exception whatever the object mapper throws when serialization or writing fails
         */
        public void serialize(Message message, OutputStream outputStream) throws Exception {
            DefaultObjectMapper.OBJECT_MAPPER.writeValue(outputStream, message);
        }
    }

    /**
     * Creates a discovery instance for the given local endpoint.
     *
     * <p>If the endpoint declares a service group, the endpoint is registered in that group right
     * away. The cluster configuration starts from {@code ClusterConfig.defaultLanConfig()} with the
     * endpoint as metadata, {@link MessageCodecImpl} as transport codec and this class's JSON
     * metadata encoder/decoder.
     *
     * @param serviceEndpoint local service endpoint; must not be {@code null}
     */
    public ScalecubeServiceDiscovery(ServiceEndpoint serviceEndpoint) {
        this.serviceEndpoint = serviceEndpoint;
        this.groups = new HashMap<>();

        ServiceGroup localGroup = serviceEndpoint.serviceGroup();
        if (localGroup != null) {
            addToGroup(localGroup, serviceEndpoint);
        }

        this.clusterConfig = ClusterConfig.defaultLanConfig()
            .metadata(serviceEndpoint)
            .transport(transportConfig -> transportConfig.messageCodec(new MessageCodecImpl()))
            .metadataEncoder(this::encode)
            .metadataDecoder(this::decode);
    }

    /**
     * Copy constructor used by {@link #options}.
     *
     * <p>The copy gets its own event processor and sink, but shares the endpoint, cluster
     * configuration, cluster reference and -- deliberately left as in the original code -- the very
     * same group registry map as the source instance.
     *
     * @param scalecubeServiceDiscovery instance to copy from
     */
    private ScalecubeServiceDiscovery(ScalecubeServiceDiscovery scalecubeServiceDiscovery) {
        this.serviceEndpoint = scalecubeServiceDiscovery.serviceEndpoint;
        this.clusterConfig = scalecubeServiceDiscovery.clusterConfig;
        this.cluster = scalecubeServiceDiscovery.cluster;
        this.groups = scalecubeServiceDiscovery.groups;
    }

    /**
     * Returns a copy of this instance whose cluster configuration is the result of applying the
     * operator to the current configuration. This instance is left unchanged.
     *
     * @param unaryOperator transformation of the current {@link ClusterConfig}
     * @return new discovery instance carrying the transformed configuration
     */
    public ScalecubeServiceDiscovery options(UnaryOperator<ClusterConfig> unaryOperator) {
        ScalecubeServiceDiscovery copy = new ScalecubeServiceDiscovery(this);
        copy.clusterConfig = unaryOperator.apply(this.clusterConfig);
        return copy;
    }

    /**
     * Returns the address of the running cluster.
     *
     * @return cluster address
     * @throws NullPointerException if no cluster has been assigned yet, i.e. {@link #start()} has
     *     not completed
     */
    public Address address() {
        return this.cluster.address();
    }

    /**
     * Returns the local service endpoint this instance was created with.
     *
     * @return local service endpoint
     */
    public ServiceEndpoint serviceEndpoint() {
        return this.serviceEndpoint;
    }

    /**
     * Starts a new cluster with the current configuration and wires its membership events into
     * this discovery instance. Nothing happens until the returned {@link Mono} is subscribed.
     *
     * @return mono that emits this instance once the cluster has started
     */
    public Mono<ServiceDiscovery> start() {
        return Mono.defer(() -> {
            return new ClusterImpl()
                .config(defaults -> this.clusterConfig)
                .handler(startingCluster -> {
                    return new ClusterMessageHandler() {
                        public void onMembershipEvent(MembershipEvent membershipEvent) {
                            ScalecubeServiceDiscovery.this.onMembershipEvent(membershipEvent, ScalecubeServiceDiscovery.this.sink);
                        }
                    };
                })
                .start()
                .doOnSuccess(startedCluster -> {
                    this.cluster = startedCluster;
                    LOGGER.debug("Started {} with config -- {}", startedCluster, this.clusterConfig);
                })
                .thenReturn(this);
        });
    }

    /**
     * Returns the stream of discovery events, buffering when the subscriber is slower than the
     * producer.
     *
     * @return flux of discovery events
     */
    public Flux<ServiceDiscoveryEvent> listenDiscovery() {
        return this.subject.onBackpressureBuffer();
    }

    /**
     * Shuts the cluster down (if one was started) and completes the discovery event stream.
     * Nothing happens until the returned {@link Mono} is subscribed.
     *
     * @return mono that terminates when the cluster's shutdown signal terminates, or immediately
     *     when no cluster was started
     */
    public Mono<Void> shutdown() {
        return Mono.defer(() -> {
            if (this.cluster == null) {
                this.sink.complete();
                return Mono.empty();
            }
            this.cluster.shutdown();
            return this.cluster.onShutdown().doFinally(signalType -> this.sink.complete());
        });
    }

    /**
     * Translates a cluster membership event into discovery events and pushes them to the sink.
     *
     * <p>Only added/removed events whose metadata decodes to a {@link ServiceEndpoint} produce
     * output; everything else is logged (at most) and dropped. After the endpoint-level event, any
     * resulting group-level events are emitted as well.
     *
     * @param membershipEvent cluster membership event
     * @param fluxSink sink receiving the resulting discovery events
     */
    public void onMembershipEvent(MembershipEvent membershipEvent, FluxSink<ServiceDiscoveryEvent> fluxSink) {
        boolean added = membershipEvent.isAdded();
        boolean removed = membershipEvent.isRemoved();

        if (added) {
            LOGGER.debug("Member {} has joined the cluster", membershipEvent.member());
        }
        if (removed) {
            LOGGER.debug("Member {} has left the cluster", membershipEvent.member());
        }

        ServiceEndpoint endpoint = getServiceEndpoint(membershipEvent);
        if (endpoint == null) {
            // Neither added nor removed, or the metadata was missing / could not be decoded.
            return;
        }

        ServiceDiscoveryEvent discoveryEvent = null;
        if (added) {
            LOGGER.info("Service endpoint {} is about to be added, since member {} has joined the cluster", endpoint.id(), membershipEvent.member());
            discoveryEvent = ServiceDiscoveryEvent.newEndpointAdded(endpoint);
        }
        if (removed) {
            LOGGER.info("Service endpoint {} is about to be removed, since member {} have left the cluster", endpoint.id(), membershipEvent.member());
            discoveryEvent = ServiceDiscoveryEvent.newEndpointRemoved(endpoint);
        }

        if (discoveryEvent != null) {
            fluxSink.next(discoveryEvent);
            onDiscoveryEvent(discoveryEvent, fluxSink);
        }
    }

    /**
     * Updates the group registry for an endpoint-level event and emits the matching group-level
     * events. Endpoints without a service group are ignored.
     */
    private void onDiscoveryEvent(ServiceDiscoveryEvent serviceDiscoveryEvent, FluxSink<ServiceDiscoveryEvent> fluxSink) {
        ServiceEndpoint endpoint = serviceDiscoveryEvent.serviceEndpoint();
        ServiceGroup serviceGroup = endpoint.serviceGroup();
        if (serviceGroup == null) {
            LOGGER_GROUP.debug("Discovered service endpoint {}, but not registering it (serviceGroup is null)", endpoint.id());
            return;
        }

        if (serviceDiscoveryEvent.isEndpointAdded()) {
            onGroupEndpointAdded(serviceGroup, endpoint, fluxSink);
        }
        if (serviceDiscoveryEvent.isEndpointRemoved()) {
            onGroupEndpointRemoved(serviceGroup, endpoint, fluxSink);
        }
    }

    /** Registers the endpoint in its group; emits added-to-group and, once the group is full, group-added. */
    private void onGroupEndpointAdded(ServiceGroup serviceGroup, ServiceEndpoint endpoint, FluxSink<ServiceDiscoveryEvent> fluxSink) {
        String groupId = serviceGroup.id();
        if (!addToGroup(serviceGroup, endpoint)) {
            LOGGER_GROUP.warn("Failed to add service endpoint {} to group {}, group is full aready", endpoint.id(), groupId);
            return;
        }

        // Snapshot taken after the addition, so buffered events are not affected by later changes.
        Collection<ServiceEndpoint> members = getEndpointsFromGroup(serviceGroup);
        fluxSink.next(ServiceDiscoveryEvent.newEndpointAddedToGroup(groupId, endpoint, members));
        LOGGER_GROUP.debug("Added service endpoint {} to group {} (size now {})", endpoint.id(), groupId, members.size());

        if (members.size() == serviceGroup.size()) {
            LOGGER_GROUP.info("Service group {} added to the cluster", serviceGroup);
            fluxSink.next(ServiceDiscoveryEvent.newGroupAdded(groupId, members));
        }
    }

    /** Unregisters the endpoint from its group; emits removed-from-group and, once the group is empty, group-removed. */
    private void onGroupEndpointRemoved(ServiceGroup serviceGroup, ServiceEndpoint endpoint, FluxSink<ServiceDiscoveryEvent> fluxSink) {
        String groupId = serviceGroup.id();
        if (!removeFromGroup(serviceGroup, endpoint)) {
            LOGGER_GROUP.warn("Failed to remove service endpoint {} from group {}, there were no such group or service endpoint was never registered in group", endpoint.id(), groupId);
            return;
        }

        // Snapshot taken after the removal; empty when the group itself has been dropped.
        Collection<ServiceEndpoint> members = getEndpointsFromGroup(serviceGroup);
        fluxSink.next(ServiceDiscoveryEvent.newEndpointRemovedFromGroup(groupId, endpoint, members));
        LOGGER_GROUP.debug("Removed service endpoint {} from group {} (size now {})", endpoint.id(), groupId, members.size());

        if (members.isEmpty()) {
            LOGGER_GROUP.info("Service group {} removed from the cluster", serviceGroup);
            fluxSink.next(ServiceDiscoveryEvent.newGroupRemoved(groupId));
        }
    }

    /**
     * Returns the endpoints currently registered for the given group.
     *
     * <p>The result is a read-only snapshot: it does not change when the registry changes later,
     * and it cannot be used to modify the registry.
     *
     * @param serviceGroup group to look up
     * @return snapshot of the group's endpoints; empty if the group is not registered
     */
    public Collection<ServiceEndpoint> getEndpointsFromGroup(ServiceGroup serviceGroup) {
        Collection<ServiceEndpoint> members = this.groups.get(serviceGroup);
        if (members == null) {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(new ArrayList<>(members));
    }

    /**
     * Adds the endpoint to the group, creating the group entry if needed.
     *
     * @return {@code false} if the group already holds {@code serviceGroup.size()} endpoints
     */
    private boolean addToGroup(ServiceGroup serviceGroup, ServiceEndpoint serviceEndpoint) {
        Collection<ServiceEndpoint> members = this.groups.computeIfAbsent(serviceGroup, key -> new ArrayList<>());
        return members.size() < serviceGroup.size() && members.add(serviceEndpoint);
    }

    /**
     * Removes every endpoint with the same id from the group and drops the group entry once it is
     * empty.
     *
     * @return {@code false} if the group is unknown or contained no endpoint with that id
     */
    private boolean removeFromGroup(ServiceGroup serviceGroup, ServiceEndpoint serviceEndpoint) {
        Collection<ServiceEndpoint> members = this.groups.get(serviceGroup);
        if (members == null) {
            return false;
        }
        boolean removed = members.removeIf(candidate -> candidate.id().equals(serviceEndpoint.id()));
        if (removed && members.isEmpty()) {
            this.groups.remove(serviceGroup);
        }
        return removed;
    }

    /**
     * Extracts the service endpoint carried by a membership event: the new metadata for an added
     * member, the old metadata for a removed one.
     *
     * @return decoded endpoint, or {@code null} for any other event type, when the event carries
     *     no metadata, or when decoding fails
     */
    private ServiceEndpoint getServiceEndpoint(MembershipEvent membershipEvent) {
        ByteBuffer metadata = null;
        if (membershipEvent.isAdded()) {
            metadata = membershipEvent.newMetadata();
        }
        if (membershipEvent.isRemoved()) {
            metadata = membershipEvent.oldMetadata();
        }
        // Guard against events without metadata instead of failing inside the membership handler.
        return metadata == null ? null : (ServiceEndpoint) decode(metadata);
    }

    /**
     * Decodes JSON metadata into a {@link ServiceEndpoint}.
     *
     * @param byteBuffer buffer holding the encoded endpoint; its position is left untouched
     * @return decoded endpoint, or {@code null} if the content could not be read (the failure is
     *     logged)
     */
    private Object decode(ByteBuffer byteBuffer) {
        try {
            // Read through a duplicate so the caller's buffer position is not advanced.
            return DefaultObjectMapper.OBJECT_MAPPER.readValue(new ByteBufferBackedInputStream(byteBuffer.duplicate()), ServiceEndpoint.class);
        } catch (IOException e) {
            // Trailing throwable argument makes the logger record the stack trace as well.
            LOGGER.error("Failed to read metadata: {}", e.toString(), e);
            return null;
        }
    }

    /**
     * Encodes a {@link ServiceEndpoint} as UTF-8 JSON.
     *
     * @param obj endpoint to encode; must be a {@link ServiceEndpoint}
     * @return buffer wrapping the encoded bytes
     * @throws RuntimeException (via {@code Exceptions.propagate}) if serialization fails
     */
    private ByteBuffer encode(Object obj) {
        try {
            String json = DefaultObjectMapper.OBJECT_MAPPER.writeValueAsString((ServiceEndpoint) obj);
            return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            // Trailing throwable argument makes the logger record the stack trace as well.
            LOGGER.error("Failed to write metadata: {}", e.toString(), e);
            throw Exceptions.propagate(e);
        }
    }
}
