package com.intel.jndn.utils.pubsub;

import com.intel.jndn.utils.Cancellation;
import com.intel.jndn.utils.Client;
import com.intel.jndn.utils.On;
import com.intel.jndn.utils.Subscriber;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.named_data.jndn.Data;
import net.named_data.jndn.Face;
import net.named_data.jndn.Interest;
import net.named_data.jndn.Name;
import net.named_data.jndn.encoding.EncodingException;
import net.named_data.jndn.util.Blob;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/intel/jndn/utils/pubsub/NdnSubscriber.class */
public class NdnSubscriber implements Subscriber {
    private static final Logger LOGGER = Logger.getLogger(NdnSubscriber.class.getName());
    private final Face face;
    private final Name prefix;
    private final On<Blob> onMessage;
    private final On<Exception> onError;
    private final AnnouncementService announcementService;
    private final Client client;
    private final Map<Long, Subscription> subscriptions = new ConcurrentHashMap();
    private Cancellation newAnnouncementCancellation;
    private Cancellation existingAnnouncementsCancellation;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/intel/jndn/utils/pubsub/NdnSubscriber$Subscription.class */
    public class Subscription implements Cancellation {
        final long publisherId;
        long messageId;
        CompletableFuture<Data> currentRequest;

        Subscription(long j) {
            this.publisherId = j;
        }

        @Override // com.intel.jndn.utils.Cancellation
        public synchronized void cancel() {
            if (this.currentRequest != null) {
                this.currentRequest.cancel(true);
            }
        }

        synchronized void subscribe() {
            this.currentRequest = NdnSubscriber.this.client.getAsync(NdnSubscriber.this.face, buildLatestInterest(this.publisherId));
            this.currentRequest.handle(this::handleResponse);
        }

        private Interest buildLatestInterest(long j) {
            Interest interest = new Interest(PubSubNamespace.toPublisherName(NdnSubscriber.this.prefix, j));
            interest.setChildSelector(1);
            return interest;
        }

        synchronized void next(long j, long j2) {
            this.currentRequest = NdnSubscriber.this.client.getAsync(NdnSubscriber.this.face, buildNextInterest(j, j2));
            this.currentRequest.handle(this::handleResponse);
        }

        private Interest buildNextInterest(long j, long j2) {
            return new Interest(PubSubNamespace.toMessageName(NdnSubscriber.this.prefix, j, j2 + 1));
        }

        private Void handleResponse(Data data, Throwable th) {
            if (th != null) {
                NdnSubscriber.this.onError.on((Exception) th);
                return null;
            }
            try {
                Response parseResponse = PubSubNamespace.parseResponse(data);
                this.messageId = parseResponse.messageId();
                NdnSubscriber.this.onMessage.on(parseResponse.content());
            } catch (EncodingException e) {
                NdnSubscriber.this.onError.on(e);
            }
            next(this.publisherId, this.messageId);
            return null;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && this.publisherId == ((Subscription) obj).publisherId;
        }

        public int hashCode() {
            return (int) (this.publisherId ^ (this.publisherId >>> 32));
        }

        public String toString() {
            return "Subscription{publisherId=" + this.publisherId + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NdnSubscriber(Face face, Name name, On<Blob> on, On<Exception> on2, AnnouncementService announcementService, Client client) {
        this.face = face;
        this.prefix = name;
        this.onMessage = on;
        this.onError = on2;
        this.announcementService = announcementService;
        this.client = client;
    }

    @Override // com.intel.jndn.utils.Subscriber
    public Set<Long> knownPublishers() {
        return this.subscriptions.keySet();
    }

    @Override // com.intel.jndn.utils.Subscriber
    public void open() throws IOException {
        LOGGER.log(Level.INFO, "Starting subscriber: {0}", this.prefix);
        this.existingAnnouncementsCancellation = this.announcementService.discoverExistingAnnouncements((v1) -> {
            addPublisher(v1);
        }, null, exc -> {
            close();
        });
        this.newAnnouncementCancellation = this.announcementService.observeNewAnnouncements((v1) -> {
            addPublisher(v1);
        }, (v1) -> {
            removePublisher(v1);
        }, exc2 -> {
            close();
        });
    }

    void addPublisher(long j) {
        if (this.subscriptions.containsKey(Long.valueOf(j))) {
            LOGGER.log(Level.WARNING, "Duplicate publisher ID {} received from announcement service; this should not happen and will be ignored", Long.valueOf(j));
            return;
        }
        Subscription subscription = new Subscription(j);
        this.subscriptions.put(Long.valueOf(j), subscription);
        subscription.subscribe();
    }

    void removePublisher(long j) {
        this.subscriptions.remove(Long.valueOf(j)).cancel();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        LOGGER.log(Level.INFO, "Stopping subscriber, knows of {0} publishers: {1} ", new Object[]{Integer.valueOf(this.subscriptions.size()), this.subscriptions});
        if (this.newAnnouncementCancellation != null) {
            this.newAnnouncementCancellation.cancel();
        }
        if (this.existingAnnouncementsCancellation != null) {
            this.existingAnnouncementsCancellation.cancel();
        }
        Iterator<Subscription> it = this.subscriptions.values().iterator();
        while (it.hasNext()) {
            it.next().cancel();
        }
    }
}
