package io.scalecube.cluster.gossip;

import io.scalecube.cluster.ClusterMath;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.transport.Message;
import io.scalecube.transport.Transport;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/scalecube/cluster/gossip/GossipProtocolImpl.class */
/**
 * Gossip dissemination component built on top of a {@link Transport}.
 *
 * <p>Locally created gossips (see {@link #spread(Message)}) and gossips received from peers are
 * kept in an in-memory table and periodically pushed to a rotating subset of the known remote
 * members. Gossips that have outlived the sweep window are dropped from the table.
 *
 * <p>All mutable state in this class is accessed from work published on, or scheduled with, the
 * {@link Scheduler} handed to the constructor; no other synchronization is used.
 * NOTE(review): this presumably relies on that scheduler being single-threaded — confirm with the
 * caller that creates it.
 */
public final class GossipProtocolImpl implements GossipProtocol {
    private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocolImpl.class);

    /** Qualifier of transport messages that carry a {@link GossipRequest}. */
    public static final String GOSSIP_REQ = "sc/gossip/req";

    private final Transport transport;
    private final Supplier<Member> memberSupplier;
    private final GossipConfig config;

    /** Number of gossip rounds executed so far; incremented at the start of every round. */
    private long period = 0;

    /** Sequence used as the suffix of locally generated gossip ids. */
    private long gossipCounter = 0;

    /** Known gossips (local and received), keyed by gossip id. */
    private final Map<String, GossipState> gossips = new HashMap<>();

    /** Sinks of pending {@link #spread(Message)} calls, keyed by gossip id. */
    private final Map<String, MonoSink<String>> futures = new HashMap<>();

    /** Remote members as maintained from membership events. */
    private final List<Member> remoteMembers = new ArrayList<>();

    /** Start of the next fan-out window inside {@link #remoteMembers}; negative until first use. */
    private int remoteMembersIndex = -1;

    private final Disposable.Composite actionsDisposables = Disposables.composite();

    // The explicit <Message> witness is required: without it the element type is inferred as
    // Object and the result is not assignable to FluxProcessor<Message, Message>.
    private final FluxProcessor<Message, Message> subject =
        DirectProcessor.<Message>create().serialize();
    private final FluxSink<Message> sink = this.subject.sink();

    private final Scheduler scheduler;
    private Disposable spreadGossipTask;

    /**
     * Creates the protocol instance and subscribes to membership events and incoming gossip
     * requests. Periodic spreading does not begin until {@link #start()} is called.
     *
     * @param supplier supplies the local member; must not be {@code null}
     * @param transport transport used to send and receive gossip requests; must not be {@code null}
     * @param flux stream of membership events used to track remote members
     * @param gossipConfig gossip settings (interval, fanout, repeat multiplier); must not be
     *     {@code null}
     * @param scheduler scheduler on which all protocol work is executed; must not be {@code null}
     * @throws NullPointerException if any of the arguments documented as non-null is {@code null}
     */
    public GossipProtocolImpl(Supplier<Member> supplier, Transport transport, Flux<MembershipEvent> flux, GossipConfig gossipConfig, Scheduler scheduler) {
        this.transport = Objects.requireNonNull(transport);
        this.config = Objects.requireNonNull(gossipConfig);
        this.memberSupplier = Objects.requireNonNull(supplier);
        this.scheduler = Objects.requireNonNull(scheduler);

        Disposable membershipSubscription =
            flux.publishOn(scheduler).subscribe(this::onMemberEvent, this::onError);
        Disposable gossipReqSubscription =
            transport.listen()
                .publishOn(scheduler)
                .filter(this::isGossipReq)
                .subscribe(this::onGossipReq, this::onError);
        this.actionsDisposables.addAll(Arrays.asList(membershipSubscription, gossipReqSubscription));
    }

    /** Schedules the periodic gossip round using the configured gossip interval (milliseconds). */
    @Override // io.scalecube.cluster.gossip.GossipProtocol
    public void start() {
        long interval = this.config.getGossipInterval();
        this.spreadGossipTask =
            this.scheduler.schedulePeriodically(
                this::doSpreadGossip, interval, interval, TimeUnit.MILLISECONDS);
    }

    /**
     * Disposes the event subscriptions and the periodic task (if started) and completes the
     * stream returned by {@link #listen()}. Sinks still held in {@link #futures} are not signaled
     * by this method.
     */
    @Override // io.scalecube.cluster.gossip.GossipProtocol
    public void stop() {
        this.actionsDisposables.dispose();
        if (this.spreadGossipTask != null && !this.spreadGossipTask.isDisposed()) {
            this.spreadGossipTask.dispose();
        }
        this.sink.complete();
    }

    /**
     * Registers the message as a new gossip to be disseminated.
     *
     * @param message payload to gossip
     * @return a mono that emits the generated gossip id once the gossip has been swept from the
     *     local gossip table
     */
    @Override // io.scalecube.cluster.gossip.GossipProtocol
    public Mono<String> spread(Message message) {
        return Mono.just(message)
            .publishOn(this.scheduler)
            .flatMap(msg ->
                Mono.<String>create(
                    completion -> this.futures.put(createAndPutGossip(msg), completion)));
    }

    /**
     * Returns the stream of messages carried by gossips seen for the first time by this node.
     *
     * @return buffered view of the received gossip messages
     */
    @Override // io.scalecube.cluster.gossip.GossipProtocol
    public Flux<Message> listen() {
        return this.subject.onBackpressureBuffer();
    }

    /** Runs one gossip round: advance the period, push gossips to selected members, then sweep. */
    private void doSpreadGossip() {
        this.period++;
        if (this.gossips.isEmpty()) {
            return;
        }
        try {
            selectGossipMembers().forEach(this::spreadGossipsTo);
            sweepGossips();
        } catch (Exception e) {
            // Keep the periodic task alive: a failure in one round must not cancel the schedule.
            LOGGER.error("Exception on sending GossipReq[{}] exception: {}", this.period, e.getMessage(), e);
        }
    }

    /** Wraps the message into a new gossip, stores it in the gossip table and returns its id. */
    private String createAndPutGossip(Message message) {
        Gossip gossip = new Gossip(generateGossipId(), message);
        String gossipId = gossip.gossipId();
        this.gossips.put(gossipId, new GossipState(gossip, this.period));
        return gossipId;
    }

    /**
     * Handles an incoming gossip request: stores and emits gossips not seen before, and records
     * the sender as infected for every gossip in the request.
     */
    private void onGossipReq(Message message) {
        GossipRequest gossipRequest = message.data();
        for (Gossip gossip : gossipRequest.gossips()) {
            String gossipId = gossip.gossipId();
            GossipState state = this.gossips.get(gossipId);
            if (state == null) {
                // First sighting: remember it and hand the payload to listeners.
                state = new GossipState(gossip, this.period);
                this.gossips.put(gossipId, state);
                this.sink.next(gossip.message());
            }
            state.addToInfected(gossipRequest.from());
        }
    }

    /** Keeps {@link #remoteMembers} in sync with membership removals and additions. */
    private void onMemberEvent(MembershipEvent membershipEvent) {
        Member member = membershipEvent.member();
        if (membershipEvent.isRemoved()) {
            this.remoteMembers.removeIf(known -> known.id().equals(member.id()));
        }
        if (membershipEvent.isAdded()) {
            this.remoteMembers.add(member);
        }
    }

    /** Error callback for the membership and transport subscriptions. */
    private void onError(Throwable th) {
        LOGGER.error("Received unexpected error: ", th);
    }

    /** Tells whether the message carries the {@link #GOSSIP_REQ} qualifier. */
    private boolean isGossipReq(Message message) {
        return GOSSIP_REQ.equals(message.qualifier());
    }

    /** Builds the next gossip id as {@code <local member id>-<counter>}. */
    private String generateGossipId() {
        return this.memberSupplier.get().id() + "-" + this.gossipCounter++;
    }

    /** Sends to the given member every gossip it still needs, if there are any. */
    private void spreadGossipsTo(Member member) {
        List<Gossip> gossipsToSend = selectGossipsToSend(member);
        if (gossipsToSend.isEmpty()) {
            return;
        }
        // Subscribe with an explicit error consumer: a failed send must not surface as an
        // "error callback not implemented" / dropped error inside Reactor.
        this.transport
            .send(member.address(), buildGossipRequestMessage(gossipsToSend))
            .subscribe(
                null,
                ex ->
                    LOGGER.debug(
                        "Failed to send GossipReq[{}] to {}, cause: {}",
                        this.period, member.address(), ex.toString()));
    }

    /**
     * Selects gossips that are still inside their spread window and that the given member is not
     * yet recorded as infected with.
     */
    private List<Gossip> selectGossipsToSend(Member member) {
        int periodsToSpread =
            ClusterMath.gossipPeriodsToSpread(
                this.config.getGossipRepeatMult(), this.remoteMembers.size() + 1);
        return this.gossips.values().stream()
            .filter(state -> state.infectionPeriod() + periodsToSpread >= this.period)
            .filter(state -> !state.isInfected(member.id()))
            .map(GossipState::gossip)
            .collect(Collectors.toList());
    }

    /**
     * Picks the members to gossip to in the current round.
     *
     * <p>With fewer remote members than the configured fanout, all of them are returned.
     * Otherwise a window of {@code fanout} members is taken from the member list; the list is
     * reshuffled and the window reset whenever the next window would run past the end.
     *
     * @return the live member list or a view of it; callers must not keep it across rounds
     */
    private List<Member> selectGossipMembers() {
        int fanout = this.config.getGossipFanout();
        if (this.remoteMembers.size() < fanout) {
            return this.remoteMembers;
        }
        boolean windowExhausted =
            this.remoteMembersIndex < 0
                || this.remoteMembersIndex + fanout > this.remoteMembers.size();
        if (windowExhausted) {
            Collections.shuffle(this.remoteMembers);
            this.remoteMembersIndex = 0;
        }
        List<Member> selected;
        if (fanout == 1) {
            selected = Collections.singletonList(this.remoteMembers.get(this.remoteMembersIndex));
        } else {
            selected = this.remoteMembers.subList(this.remoteMembersIndex, this.remoteMembersIndex + fanout);
        }
        this.remoteMembersIndex += fanout;
        return selected;
    }

    /** Wraps the gossips into a {@link GossipRequest} message stamped with the local member id. */
    private Message buildGossipRequestMessage(List<Gossip> list) {
        GossipRequest request = new GossipRequest(list, this.memberSupplier.get().id());
        return Message.withData(request).qualifier(GOSSIP_REQ).build();
    }

    /**
     * Removes gossips whose sweep window has elapsed and signals success, with the gossip id, to
     * the matching pending {@link #spread(Message)} sink when one exists.
     */
    private void sweepGossips() {
        int periodsToSweep =
            ClusterMath.gossipPeriodsToSweep(
                this.config.getGossipRepeatMult(), this.remoteMembers.size() + 1);
        Set<GossipState> expired =
            this.gossips.values().stream()
                .filter(state -> this.period > state.infectionPeriod() + periodsToSweep)
                .collect(Collectors.toSet());
        if (expired.isEmpty()) {
            return;
        }
        LOGGER.debug("Sweep gossips: {}", expired);
        for (GossipState state : expired) {
            String gossipId = state.gossip().gossipId();
            this.gossips.remove(gossipId);
            MonoSink<String> pending = this.futures.remove(gossipId);
            if (pending != null) {
                pending.success(gossipId);
            }
        }
    }

    /** Returns the transport this protocol sends and listens on (package-private accessor). */
    Transport getTransport() {
        return this.transport;
    }

    /** Returns the current local member as provided by the member supplier. */
    Member getMember() {
        return this.memberSupplier.get();
    }
}
