package net.jxta.impl.pipe;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.jxta.document.MimeMediaType;
import net.jxta.document.StructuredDocumentFactory;
import net.jxta.document.XMLDocument;
import net.jxta.endpoint.EndpointAddress;
import net.jxta.endpoint.EndpointListener;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.endpoint.TextDocumentMessageElement;
import net.jxta.id.ID;
import net.jxta.impl.id.UUID.UUID;
import net.jxta.impl.id.UUID.UUIDFactory;
import net.jxta.logging.Logging;
import net.jxta.peer.PeerID;
import net.jxta.peergroup.PeerGroup;
import net.jxta.pipe.InputPipe;
import net.jxta.pipe.PipeService;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.rendezvous.RendezVousService;

/* loaded from: input_file:META-INF/lib/shoal-jxta-1.1_12142008.jar:net/jxta/impl/pipe/WirePipe.class */
/**
 * Local representation of one propagate ("wire") pipe.
 *
 * <p>A {@code WirePipe} keeps the set of local input pipes registered for a
 * single {@link PipeAdvertisement}, delivers every incoming or locally sent
 * message to those input pipes, suppresses duplicates by remembering the last
 * {@value #MAX_RECORDED_MSGIDS} message ids, and, when the peer group reports
 * that this peer is a rendezvous, re-propagates received messages with a
 * decremented TTL.
 *
 * <p>Locking: the input pipe registry, the {@code closed} transition and the
 * lazily created re-propagation pipe are guarded by {@code this}; the
 * duplicate-detection state is guarded by the {@code msgIds} list monitor.
 * When both are needed, {@code this} is always taken first.
 */
public class WirePipe implements EndpointListener, InputPipe, PipeRegistrar {
    private static final Logger LOG = Logger.getLogger(WirePipe.class.getName());

    /** Maximum number of message ids remembered for duplicate detection. */
    private static final int MAX_RECORDED_MSGIDS = 250;

    private final PeerGroup peerGroup;
    private final PipeResolver pipeResolver;
    private final WirePipeImpl wireService;
    private final PipeAdvertisement pipeAdv;
    private final RendezVousService rendezvous;
    private final PeerID localPeerId;

    /** Output pipe used by {@link #repropagate}; created on first use, guarded by {@code this}. */
    private NonBlockingWireOutputPipe repropagater;

    private volatile boolean closed = false;

    /** Number of distinct (non-duplicate) message ids recorded so far; guarded by {@code msgIds}. */
    int messagesReceived = 0;

    /**
     * Registered local input pipes. Only the keys matter (every value is
     * {@code null}); keys are weakly referenced. Guarded by {@code this}.
     */
    private final Map<InputPipe, Object> wireinputpipes = new WeakHashMap<InputPipe, Object>();

    /** Ring buffer of recently seen message ids; also serves as the lock for its own state. */
    private final List<UUID> msgIds = new ArrayList<UUID>(MAX_RECORDED_MSGIDS);

    /** Slot of {@code msgIds} that is overwritten next once the buffer is full; guarded by {@code msgIds}. */
    private int nextMsgIdSlot = 0;

    /**
     * Creates the wire pipe for the given advertisement.
     *
     * @param peerGroup         group supplying the local peer id and the rendezvous service
     * @param pipeResolver      resolver this pipe registers with while it has input pipes
     * @param wirePipeImpl      owning wire service
     * @param pipeAdvertisement advertisement of the pipe
     */
    public WirePipe(PeerGroup peerGroup, PipeResolver pipeResolver, WirePipeImpl wirePipeImpl, PipeAdvertisement pipeAdvertisement) {
        this.peerGroup = peerGroup;
        this.pipeResolver = pipeResolver;
        this.wireService = wirePipeImpl;
        this.pipeAdv = pipeAdvertisement;
        this.localPeerId = this.peerGroup.getPeerID();
        this.rendezvous = peerGroup.getRendezVousService();
    }

    /**
     * Closes the pipe if the owner forgot to, logging a warning in that case.
     */
    protected synchronized void finalize() throws Throwable {
        try {
            if (!this.closed && Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Pipe is being finalized without being previously closed. This is likely a bug.");
            }
            close();
        } finally {
            // Run the superclass finalizer even if close() fails.
            super.finalize();
        }
    }

    /**
     * Adds a local input pipe. The first registered input pipe also registers
     * this wire pipe with the pipe resolver.
     *
     * @param inputPipe the input pipe to add
     * @return {@code false} if this pipe is closed; the resolver's result when
     *         this was the first input pipe; {@code true} otherwise
     */
    @Override // net.jxta.impl.pipe.PipeRegistrar
    public synchronized boolean register(InputPipe inputPipe) {
        if (this.closed) {
            return false;
        }
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Registering input pipe for " + this.pipeAdv.getPipeID());
        }
        this.wireinputpipes.put(inputPipe, null);
        if (1 != this.wireinputpipes.size()) {
            return true;
        }
        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
            LOG.info("Registering " + this.pipeAdv.getPipeID() + " with pipe resolver.");
        }
        return this.pipeResolver.register(this);
    }

    /**
     * Removes a local input pipe. Whenever no input pipes remain afterwards,
     * this wire pipe is deregistered from the pipe resolver.
     *
     * @param inputPipe the input pipe to remove
     * @return {@code true} if the input pipe was registered and has been removed
     */
    @Override // net.jxta.impl.pipe.PipeRegistrar
    public synchronized boolean forget(InputPipe inputPipe) {
        // Entries are stored with a null value, so Map.remove() cannot tell
        // "removed" from "absent"; removing through the key set can.
        boolean removed = this.wireinputpipes.keySet().remove(inputPipe);
        if (this.wireinputpipes.isEmpty()) {
            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
                LOG.info("Deregistering wire pipe with pipe resolver");
            }
            this.pipeResolver.forget(this);
        }
        if (removed && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Removed input pipe for " + this.pipeAdv.getPipeID());
        }
        return removed;
    }

    /**
     * Not supported by this implementation.
     *
     * @return always {@code null}
     */
    @Override // net.jxta.pipe.InputPipe
    public Message waitForMessage() throws InterruptedException {
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("This method is not really supported.");
        }
        return null;
    }

    /**
     * Not supported by this implementation.
     *
     * @param i ignored
     * @return always {@code null}
     */
    @Override // net.jxta.pipe.InputPipe
    public Message poll(int i) throws InterruptedException {
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("This method is not really supported.");
        }
        return null;
    }

    /**
     * Closes this pipe: closes the re-propagation pipe and every registered
     * input pipe, discards the duplicate-detection history and tells the wire
     * service to forget this pipe. Calling it again has no effect.
     */
    @Override // net.jxta.pipe.InputPipe
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        if (null != this.repropagater) {
            this.repropagater.close();
            this.repropagater = null;
        }
        // Iterate over a copy so the registry may be modified while the
        // input pipes are being closed.
        for (InputPipe inputPipe : new ArrayList<InputPipe>(this.wireinputpipes.keySet())) {
            inputPipe.close();
        }
        this.wireinputpipes.clear();
        // recordSeenMessage() mutates the history under the msgIds monitor
        // only, so it has to be cleared under that same monitor.
        synchronized (this.msgIds) {
            this.msgIds.clear();
            this.nextMsgIdSlot = 0;
        }
        this.wireService.forgetWirePipe(this.pipeAdv.getPipeID());
    }

    /** @return the type taken from the pipe advertisement */
    @Override // net.jxta.pipe.InputPipe
    public String getType() {
        return this.pipeAdv.getType();
    }

    /** @return the pipe id taken from the pipe advertisement */
    @Override // net.jxta.pipe.InputPipe
    public ID getPipeID() {
        return this.pipeAdv.getPipeID();
    }

    /** @return the name taken from the pipe advertisement */
    @Override // net.jxta.pipe.InputPipe
    public String getName() {
        return this.pipeAdv.getName();
    }

    /** @return the pipe advertisement this wire pipe was created with */
    @Override // net.jxta.pipe.InputPipe
    public PipeAdvertisement getAdvertisement() {
        return this.pipeAdv;
    }

    /**
     * Endpoint entry point: extracts the {@code JxtaWireHeader} element of
     * the message and hands the message on for processing. Messages without
     * that element are discarded; any exception raised while parsing or
     * processing is logged as a warning and not rethrown.
     *
     * @param message          the received message
     * @param endpointAddress  source address, passed through to the listeners
     * @param endpointAddress2 destination address, passed through to the listeners
     */
    @Override // net.jxta.endpoint.EndpointListener
    public void processIncomingMessage(Message message, EndpointAddress endpointAddress, EndpointAddress endpointAddress2) {
        MessageElement headerElement = message.getMessageElement("jxta", "JxtaWireHeader");
        if (null == headerElement) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("No JxtaWireHeader element. Discarding " + message);
            }
            return;
        }
        try {
            WireHeader header = new WireHeader((XMLDocument) StructuredDocumentFactory.newStructuredDocument(headerElement));
            processIncomingMessage(message, header, endpointAddress, endpointAddress2);
        } catch (Exception e) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "bad wire header", (Throwable) e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Processes a message whose wire header has already been parsed:
     * duplicates are dropped, everything else is delivered to the local
     * input pipes and, if the peer group reports this peer as a rendezvous,
     * re-propagated.
     *
     * @param message          the received message
     * @param wireHeader       the parsed wire header of {@code message}
     * @param endpointAddress  source address, passed through to the listeners
     * @param endpointAddress2 destination address, passed through to the listeners
     */
    public void processIncomingMessage(Message message, WireHeader wireHeader, EndpointAddress endpointAddress, EndpointAddress endpointAddress2) {
        if (recordSeenMessage(wireHeader.getMsgId())) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Discarding duplicate " + message);
            }
            return;
        }
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Processing " + message + " from " + endpointAddress + " on " + this.pipeAdv.getPipeID());
        }
        callLocalListeners(message, endpointAddress, endpointAddress2);
        if (this.peerGroup.isRendezvous()) {
            repropagate(message, wireHeader);
        }
    }

    /**
     * Takes a consistent copy of the registered input pipes.
     *
     * <p>The registry is only ever modified while holding {@code this}, so
     * the copy is made under the same lock. The raw copy mirrors the element
     * type the delivery loop has always iterated with; an input pipe that is
     * not an {@code InputPipeImpl} fails with a {@code ClassCastException}
     * when the loop reaches it.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private synchronized List<InputPipeImpl> snapshotInputPipes() {
        return new ArrayList(this.wireinputpipes.keySet());
    }

    /**
     * Delivers a separate clone of the message to every registered input
     * pipe. A failure in one listener is logged and does not prevent
     * delivery to the others.
     */
    private void callLocalListeners(Message message, EndpointAddress endpointAddress, EndpointAddress endpointAddress2) {
        List<InputPipeImpl> listeners = snapshotInputPipes();
        if (listeners.isEmpty()) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("No local listeners for " + this.pipeAdv.getPipeID());
            }
            return;
        }
        int delivered = 0;
        for (InputPipeImpl listener : listeners) {
            try {
                listener.processIncomingMessage(message.m61clone(), endpointAddress, endpointAddress2);
                delivered++;
            } catch (Throwable th) {
                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
                    LOG.log(Level.SEVERE, "Uncaught Throwable during callback (" + listener + ") for " + listener.getPipeID(), th);
                }
            }
        }
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Called " + delivered + " of " + listeners.size() + " local listeners for " + this.pipeAdv.getPipeID());
        }
    }

    /**
     * Re-sends a received message with its TTL reduced by one. Nothing is
     * sent when the pipe is closed or when the header's TTL is 1 or less.
     * Note that the TTL of the supplied {@code wireHeader} is modified.
     *
     * @param message    the message to re-propagate; a clone is sent
     * @param wireHeader the wire header of {@code message}
     */
    void repropagate(Message message, WireHeader wireHeader) {
        if (this.closed) {
            return;
        }
        if (wireHeader.getTTL() <= 1) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("No TTL remaining - discarding " + message + " on " + wireHeader.getPipeID());
            }
            return;
        }
        Message outgoing = message.m61clone();
        wireHeader.setTTL(wireHeader.getTTL() - 1);
        outgoing.replaceMessageElement("jxta", new TextDocumentMessageElement("JxtaWireHeader", (XMLDocument) wireHeader.getDocument(MimeMediaType.XMLUTF8), null));
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Repropagating " + outgoing + " on " + wireHeader.getPipeID());
        }
        synchronized (this) {
            // Re-check under the lock: close() may have run since the test above.
            if (this.closed) {
                return;
            }
            if (null == this.repropagater) {
                this.repropagater = this.wireService.createOutputPipe(this.pipeAdv, Collections.EMPTY_SET);
            }
            try {
                if (!this.repropagater.sendUnModified(outgoing, wireHeader) && Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.warning("Failure repropagating " + outgoing + " on " + wireHeader.getPipeID() + ". Could not queue message.");
                }
            } catch (IOException e) {
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.log(Level.WARNING, "Failure repropagating " + outgoing + " on " + wireHeader.getPipeID(), (Throwable) e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends a message on this pipe.
     *
     * <p>A clone of the message is delivered to the local input pipes when
     * {@code set} is empty or contains the local peer id, unless the message
     * id has already been seen. Then:
     * <ul>
     * <li>if {@code set} is not empty, the clone is propagated to exactly
     *     those peers and the method returns;</li>
     * <li>otherwise, on a rendezvous, it is propagated to the connected peers
     *     that the SRDI index lists for this pipe id; on any other peer it is
     *     propagated to the neighbors;</li>
     * <li>in both of the latter cases the clone is finally walked through the
     *     rendezvous service.</li>
     * </ul>
     *
     * @param message    the message to send; it is cloned, not modified
     * @param set        destination peer ids; empty means no explicit destinations
     * @param wireHeader the wire header carrying the message id
     * @throws IOException if the rendezvous service fails to send
     */
    public void sendMessage(Message message, Set<? extends ID> set, WireHeader wireHeader) throws IOException {
        Message outgoing = message.m61clone();
        if ((set.isEmpty() || set.contains(this.localPeerId)) && !recordSeenMessage(wireHeader.getMsgId())) {
            callLocalListeners(outgoing, new EndpointAddress(this.localPeerId, (String) null, (String) null), new EndpointAddress(this.pipeAdv.getPipeID(), (String) null, (String) null));
        }
        if (!set.isEmpty()) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Propagating " + outgoing + " to " + set.size() + " peers.");
            }
            this.rendezvous.propagate(Collections.enumeration(set), outgoing, "jxta.service.wirepipe", this.wireService.getServiceParameter(), 1);
            return;
        }
        if (this.peerGroup.isRendezvous()) {
            List<PeerID> subscribers = this.pipeResolver.getSrdiIndex().query(PipeService.PropagateType, PipeAdvertisement.IdTag, getPipeID().toString(), RendezVousService.DEFAULT_TTL);
            // Keep only the subscribers this rendezvous is currently connected to.
            subscribers.retainAll(this.rendezvous.getConnectedPeerIDs());
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Propagating " + outgoing + " to " + subscribers.size() + " subscriber peers.");
            }
            this.rendezvous.propagate(Collections.enumeration(subscribers), outgoing, "jxta.service.wirepipe", this.wireService.getServiceParameter(), 1);
        } else {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Propagating " + outgoing + " to whole network.");
            }
            this.rendezvous.propagateToNeighbors(outgoing, "jxta.service.wirepipe", this.wireService.getServiceParameter(), RendezVousService.DEFAULT_TTL);
        }
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Walking " + outgoing + " through peerview.");
        }
        this.rendezvous.walk(outgoing, "jxta.service.wirepipe", this.wireService.getServiceParameter(), RendezVousService.DEFAULT_TTL);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a new message id.
     *
     * @return the string form of a freshly generated sequential UUID
     */
    public static String createMsgId() {
        return UUIDFactory.newSeqUUID().toString();
    }

    /**
     * Records a message id and reports whether it had been seen before.
     *
     * <p>The id is parsed as a UUID; if that fails it is parsed as a
     * {@code long}, and failing that its string hash code is used, to derive
     * a hash UUID. At most {@value #MAX_RECORDED_MSGIDS} ids are remembered;
     * once full, the oldest entry is overwritten.
     *
     * @param str the message id from the wire header
     * @return {@code true} if the id is a duplicate, {@code false} if it was
     *         newly recorded
     */
    private boolean recordSeenMessage(String str) {
        UUID msgId;
        try {
            msgId = new UUID(str);
        } catch (IllegalArgumentException notUuid) {
            try {
                msgId = UUIDFactory.newHashUUID(Long.parseLong(str), 0L);
            } catch (NumberFormatException notNumeric) {
                msgId = UUIDFactory.newHashUUID(str.hashCode(), 0L);
            }
        }
        synchronized (this.msgIds) {
            if (this.msgIds.contains(msgId)) {
                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                    LOG.fine("duplicate " + msgId);
                }
                return true;
            }
            if (this.msgIds.size() < MAX_RECORDED_MSGIDS) {
                this.msgIds.add(msgId);
            } else {
                // A dedicated cursor stays within [0, MAX) forever; indexing
                // with messagesReceived % MAX turns negative once that int
                // counter overflows.
                this.msgIds.set(this.nextMsgIdSlot, msgId);
                this.nextMsgIdSlot = (this.nextMsgIdSlot + 1) % MAX_RECORDED_MSGIDS;
            }
            this.messagesReceived++;
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("added " + msgId);
            }
            return false;
        }
    }
}
