package io.scalecube.cluster.metadata;

import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.CorrelationIdGenerator;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/scalecube/cluster/metadata/MetadataStoreImpl.class */
public class MetadataStoreImpl implements MetadataStore {
    private static final Logger LOGGER = LoggerFactory.getLogger(MetadataStoreImpl.class);
    public static final String GET_METADATA_REQ = "sc/metadata/req";
    public static final String GET_METADATA_RESP = "sc/metadata/resp";
    private Object localMetadata;
    private final Member localMember;
    private final Transport transport;
    private final ClusterConfig config;
    private final CorrelationIdGenerator cidGenerator;
    private final Scheduler scheduler;
    private final Map<Member, ByteBuffer> membersMetadata = new ConcurrentHashMap();
    private final Disposable.Composite actionsDisposables = Disposables.composite();

    public MetadataStoreImpl(Member member, Transport transport, Object obj, ClusterConfig clusterConfig, Scheduler scheduler, CorrelationIdGenerator correlationIdGenerator) {
        this.localMember = (Member) Objects.requireNonNull(member);
        this.transport = (Transport) Objects.requireNonNull(transport);
        this.config = (ClusterConfig) Objects.requireNonNull(clusterConfig);
        this.scheduler = (Scheduler) Objects.requireNonNull(scheduler);
        this.cidGenerator = (CorrelationIdGenerator) Objects.requireNonNull(correlationIdGenerator);
        this.localMetadata = Objects.requireNonNull(obj);
    }

    public void start() {
        this.actionsDisposables.add(this.transport.listen().publishOn(this.scheduler).subscribe(this::onMessage, this::onError));
    }

    public void stop() {
        this.actionsDisposables.dispose();
        this.membersMetadata.clear();
    }

    public Object metadata() {
        return this.localMetadata;
    }

    public Optional<ByteBuffer> metadata(Member member) {
        return Optional.ofNullable(this.membersMetadata.get(member)).map((v0) -> {
            return v0.slice();
        });
    }

    public void updateMetadata(Object obj) {
        Objects.requireNonNull(obj, "updateMetadata(): metadata must be not null");
        this.localMetadata = obj;
    }

    public ByteBuffer updateMetadata(Member member, ByteBuffer byteBuffer) {
        if (this.localMember.equals(member)) {
            throw new IllegalArgumentException("removeMetadata must not accept local member");
        }
        if (byteBuffer == null) {
            return removeMetadata(member);
        }
        ByteBuffer slice = byteBuffer.slice();
        ByteBuffer put = this.membersMetadata.put(member, slice);
        if (put == null) {
            LOGGER.debug("Added metadata: {} for member {} [at {}]", new Object[]{Integer.valueOf(slice.remaining()), member, this.localMember});
        } else {
            LOGGER.debug("Updated metadata: {} for member {} [at {}]", new Object[]{Integer.valueOf(slice.remaining()), member, this.localMember});
        }
        return put;
    }

    public ByteBuffer removeMetadata(Member member) {
        if (this.localMember.equals(member)) {
            throw new IllegalArgumentException("removeMetadata must not accept local member");
        }
        ByteBuffer remove = this.membersMetadata.remove(member);
        if (remove == null) {
            return null;
        }
        LOGGER.debug("Removed metadata for member {} [at {}]", member, this.localMember);
        return remove;
    }

    public Mono<ByteBuffer> fetchMetadata(Member member) {
        return Mono.create(monoSink -> {
            LOGGER.debug("Getting metadata for member {} [at {}]", member, this.localMember);
            String nextCid = this.cidGenerator.nextCid();
            Address address = member.address();
            this.transport.requestResponse(address, Message.builder().qualifier(GET_METADATA_REQ).correlationId(nextCid).data(new GetMetadataRequest(member)).build()).timeout(Duration.ofMillis(this.config.metadataTimeout()), this.scheduler).publishOn(this.scheduler).subscribe(message -> {
                LOGGER.debug("Received GetMetadataResp[{}] from {} [at {}]", new Object[]{nextCid, address, this.localMember});
                monoSink.success(((GetMetadataResponse) message.data()).getMetadata());
            }, th -> {
                LOGGER.warn("Failed getting GetMetadataResp[{}] from {} within {} ms [at {}], cause : {}", new Object[]{nextCid, address, Integer.valueOf(this.config.metadataTimeout()), this.localMember, th.toString()});
                monoSink.error(th);
            });
        });
    }

    private void onMessage(Message message) {
        if (GET_METADATA_REQ.equals(message.qualifier())) {
            onMetadataRequest(message);
        }
    }

    private void onError(Throwable th) {
        LOGGER.error("Received unexpected error: ", th);
    }

    private void onMetadataRequest(Message message) {
        LOGGER.debug("Received GetMetadataReq: {} [at {}]", message, this.localMember);
        Member member = ((GetMetadataRequest) message.data()).getMember();
        if (!member.id().equals(this.localMember.id())) {
            LOGGER.warn("Received GetMetadataReq: {} to {}, but local member is {}", new Object[]{message, member, this.localMember});
            return;
        }
        Message build = Message.builder().qualifier(GET_METADATA_RESP).correlationId(message.correlationId()).data(new GetMetadataResponse(this.localMember, this.config.metadataEncoder().encode(this.localMetadata))).build();
        Address sender = message.sender();
        LOGGER.debug("Send GetMetadataResp: {} to {} [at {}]", new Object[]{build, sender, this.localMember});
        this.transport.send(sender, build).subscribe((Consumer) null, th -> {
            LOGGER.debug("Failed to send GetMetadataResp: {} to {} [at {}], cause: {}", new Object[]{build, sender, this.localMember, th.toString()});
        });
    }
}
