package net.jxta.impl.pipe;

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.jxta.endpoint.EndpointAddress;
import net.jxta.endpoint.EndpointListener;
import net.jxta.endpoint.Message;
import net.jxta.id.ID;
import net.jxta.impl.util.UnbiasedQueue;
import net.jxta.logging.Logging;
import net.jxta.pipe.InputPipe;
import net.jxta.pipe.PipeID;
import net.jxta.pipe.PipeMsgEvent;
import net.jxta.pipe.PipeMsgListener;
import net.jxta.protocol.PipeAdvertisement;

/* loaded from: input_file:META-INF/lib/shoal-jxta-1.1_12142008.jar:net/jxta/impl/pipe/InputPipeImpl.class */
/**
 * Input pipe implementation that hands incoming messages either to a
 * {@link PipeMsgListener} (listener mode) or to an internal bounded queue
 * that callers drain with {@link #poll(int)} / {@link #waitForMessage()}
 * (queue mode).
 *
 * <p>The mode is fixed at construction time: {@link #queue} is non-null
 * exactly when no listener was supplied. Because {@link #close()} clears
 * {@link #listener}, the mode must always be derived from {@link #queue},
 * never from {@link #listener}.
 */
class InputPipeImpl implements EndpointListener, InputPipe {
    private static final Logger LOG = Logger.getLogger(InputPipeImpl.class.getName());

    /** Capacity of the message queue created in queue mode. */
    protected static final int QUEUESIZE = 100;

    /** Registrar this pipe is registered with; set to {@code null} by {@link #close()}. */
    protected PipeRegistrar registrar;

    /** Advertisement this pipe was created from. */
    protected final PipeAdvertisement pipeAdv;

    /** Pipe ID taken from {@link #pipeAdv} at construction time. */
    protected final ID pipeID;

    /** Set to {@code true} once {@link #close()} has run. */
    protected volatile boolean closed = false;

    /** Listener for incoming messages (listener mode); set to {@code null} by {@link #close()}. */
    protected PipeMsgListener listener;

    /** Message queue (queue mode), or {@code null} when the pipe was created with a listener. */
    protected final UnbiasedQueue queue;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the pipe and registers it with the given registrar.
     *
     * @param pipeRegistrar     registrar to register this pipe with
     * @param pipeAdvertisement advertisement supplying the pipe ID, type and name
     * @param pipeMsgListener   listener to receive messages, or {@code null} to
     *                          queue messages for {@link #poll(int)}
     * @throws IOException if the registrar refuses the registration
     */
    public InputPipeImpl(PipeRegistrar pipeRegistrar, PipeAdvertisement pipeAdvertisement, PipeMsgListener pipeMsgListener) throws IOException {
        this.registrar = pipeRegistrar;
        this.pipeAdv = pipeAdvertisement;
        this.listener = pipeMsgListener;
        this.pipeID = pipeAdvertisement.getPipeID();
        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
            LOG.info("Creating InputPipe for " + this.pipeID + " of type " + pipeAdvertisement.getType() + " with " + (null != pipeMsgListener ? "listener" : "queue"));
        }
        if (pipeMsgListener == null) {
            this.queue = UnbiasedQueue.synchronizedQueue(new UnbiasedQueue(QUEUESIZE, true));
        } else {
            this.queue = null;
        }
        if (!this.registrar.register(this)) {
            throw new IOException("Could not register input pipe (already registered) for " + this.pipeID);
        }
    }

    /**
     * Closes the pipe if the owner forgot to, logging a warning in that case.
     *
     * @throws Throwable propagated from {@link #close()} or {@code super.finalize()}
     */
    @Override
    protected synchronized void finalize() throws Throwable {
        try {
            if (!this.closed && Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Pipe is being finalized without being previously closed. This is likely a bug.");
            }
            close();
        } finally {
            // Always chain to the superclass, even if close() fails.
            super.finalize();
        }
    }

    /**
     * Equivalent to {@code poll(0)}.
     *
     * @return the next message, or {@code null} (see {@link #poll(int)})
     * @throws InterruptedException if interrupted while waiting
     */
    @Override // net.jxta.pipe.InputPipe
    public Message waitForMessage() throws InterruptedException {
        return poll(0);
    }

    /**
     * Takes the next message from the queue. In listener mode this does
     * nothing except log a warning and return {@code null}.
     *
     * @param i timeout handed unchanged to {@code UnbiasedQueue.pop}
     *          (presumably milliseconds, with 0 meaning "wait indefinitely" —
     *          confirm against UnbiasedQueue)
     * @return the popped message, or {@code null} if the queue yielded none or
     *         the pipe is in listener mode
     * @throws InterruptedException if interrupted while waiting on the queue
     */
    @Override // net.jxta.pipe.InputPipe
    public Message poll(int i) throws InterruptedException {
        // Decide on the queue, not the listener: close() nulls the listener,
        // which previously made a closed listener-mode pipe dereference the
        // null queue here.
        if (this.queue != null) {
            return (Message) this.queue.pop(i);
        }
        if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
            LOG.warning("poll() has no effect in listener mode.");
        }
        return null;
    }

    /**
     * Closes the pipe: marks it closed, closes the queue (queue mode), drops
     * the listener and unregisters from the registrar. Calling it again is a
     * no-op.
     */
    @Override // net.jxta.pipe.InputPipe
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        // The queue may be null in listener mode, or when finalize() runs on
        // an instance whose constructor failed before assigning it.
        if (this.queue != null) {
            this.queue.close();
        }
        this.listener = null;
        // The registrar can only be null here if construction failed; guard so
        // that finalize() of such an instance does not throw.
        if (this.registrar != null && !this.registrar.forget(this) && Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
            LOG.warning("close() : pipe was not registered with registrar.");
        }
        this.registrar = null;
        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
            LOG.info("Closed " + this.pipeID);
        }
    }

    /**
     * Delivers an incoming message to the listener (listener mode) or pushes
     * it onto the queue (queue mode). Messages arriving after the pipe is
     * closed are dropped.
     *
     * @param message          the received message
     * @param endpointAddress  source address (only used for logging)
     * @param endpointAddress2 destination address (unused)
     */
    @Override // net.jxta.endpoint.EndpointListener
    public void processIncomingMessage(Message message, EndpointAddress endpointAddress, EndpointAddress endpointAddress2) {
        if (this.closed) {
            return;
        }
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine("Received " + message + " from " + endpointAddress + " for " + this.pipeID);
        }
        if (null == this.queue) {
            // Snapshot the field: close() may null it concurrently.
            PipeMsgListener pipeMsgListener = this.listener;
            if (null == pipeMsgListener) {
                return;
            }
            try {
                pipeMsgListener.pipeMsgEvent(new PipeMsgEvent(this, message, (PipeID) this.pipeID));
            } catch (Throwable th) {
                // A misbehaving listener must not take down the delivering thread.
                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
                    LOG.log(Level.SEVERE, "Uncaught Throwable in listener for : " + this.pipeID + "(" + pipeMsgListener.getClass().getName() + ")", th);
                }
            }
            return;
        }
        boolean queued = false;
        boolean interrupted = false;
        try {
            // Keep retrying in one-second slices until the message is accepted
            // or the queue gets closed.
            while (!queued && !this.queue.isClosed()) {
                try {
                    queued = this.queue.push(message, 1000L);
                } catch (InterruptedException e) {
                    // Keep trying to deliver, but remember the request so it
                    // is not lost for the caller.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                // Restore the interrupt status that was consumed above.
                Thread.currentThread().interrupt();
            }
        }
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            synchronized (this) {
                LOG.fine("Queued " + message + " for " + this.pipeID + "\n\tqueue closed : " + this.queue.isClosed() + "\tnumber in queue : " + this.queue.getCurrentInQueue() + "\tnumber queued : " + this.queue.getNumEnqueued() + "\tnumber dequeued : " + this.queue.getNumDequeued());
            }
        }
    }

    /** @return the pipe type reported by the advertisement */
    @Override // net.jxta.pipe.InputPipe
    public String getType() {
        return this.pipeAdv.getType();
    }

    /** @return the pipe ID captured from the advertisement at construction time */
    @Override // net.jxta.pipe.InputPipe
    public ID getPipeID() {
        return this.pipeID;
    }

    /** @return the pipe name reported by the advertisement */
    @Override // net.jxta.pipe.InputPipe
    public String getName() {
        return this.pipeAdv.getName();
    }

    /** @return the advertisement this pipe was created from */
    @Override // net.jxta.pipe.InputPipe
    public PipeAdvertisement getAdvertisement() {
        return this.pipeAdv;
    }
}
