package org.mmbase.clustering.jgroups;

import java.util.concurrent.BlockingQueue;
import org.jgroups.Address;
import org.jgroups.ChannelException;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.mmbase.clustering.Statistics;
import org.mmbase.core.util.DaemonThread;
import org.mmbase.util.logging.Logger;
import org.mmbase.util.logging.Logging;

/* loaded from: input_file:WEB-INF/lib/mmbase-clustering-1.9.7-rc1.jar:org/mmbase/clustering/jgroups/ChangesSender.class */
public class ChangesSender implements Runnable {
    private static final Logger log = Logging.getLoggerInstance(ChangesSender.class);
    private Thread kicker = null;
    private final BlockingQueue<byte[]> nodesToSend;
    private final JChannel channel;
    private final Statistics send;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChangesSender(JChannel jChannel, BlockingQueue<byte[]> blockingQueue, Statistics statistics) {
        this.send = statistics;
        this.channel = jChannel;
        this.nodesToSend = blockingQueue;
        start();
    }

    private void start() {
        if (this.kicker == null) {
            this.kicker = new DaemonThread(this, "MulticastSender");
            this.kicker.start();
            log.debug("MulticastSender started");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        if (this.kicker == null) {
            log.service("Cannot stop thread, because it is null");
            return;
        }
        this.kicker.interrupt();
        this.kicker.setPriority(1);
        this.kicker = null;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.kicker != null) {
            try {
                if (this.channel == null || !this.channel.isConnected()) {
                    log.warn("Channel " + this.channel + " not connected. Sleeping for 5 s.");
                    try {
                        Thread.sleep(5000L);
                    } catch (InterruptedException e) {
                    }
                } else {
                    byte[] take = this.nodesToSend.take();
                    long currentTimeMillis = System.currentTimeMillis();
                    Message message = new Message((Address) null, (Address) null, take);
                    try {
                        if (log.isDebugEnabled()) {
                            log.debug("SEND=>" + take);
                        }
                        this.channel.send(message);
                    } catch (ChannelException e2) {
                        log.error("Can't send message" + take + ": " + e2.getMessage(), e2);
                    }
                    this.send.count++;
                    this.send.bytes += take.length;
                    this.send.cost += System.currentTimeMillis() - currentTimeMillis;
                }
            } catch (InterruptedException e3) {
                log.debug(Thread.currentThread().getName() + " was interruped.");
                return;
            } catch (Exception e4) {
                log.error(e4);
            }
        }
    }
}
