package org.mmbase.clustering;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.mmbase.core.event.AllEventListener;
import org.mmbase.core.event.Event;
import org.mmbase.core.event.EventManager;
import org.mmbase.core.event.NodeEvent;
import org.mmbase.core.event.RelationEvent;
import org.mmbase.core.event.SystemEvent;
import org.mmbase.core.util.DaemonThread;
import org.mmbase.module.core.MMBase;
import org.mmbase.module.core.MMObjectBuilder;
import org.mmbase.module.core.MMObjectNode;
import org.mmbase.util.ThreadPools;
import org.mmbase.util.logging.Logger;
import org.mmbase.util.logging.Logging;

/* loaded from: input_file:WEB-INF/lib/mmbase-clustering-1.9.7-rc1.jar:org/mmbase/clustering/ClusterManager.class */
public abstract class ClusterManager implements AllEventListener, Runnable {
    private static final Logger log = Logging.getLoggerInstance(ClusterManager.class);
    protected final Statistics receive = new Statistics();
    protected final Statistics send = new Statistics();
    protected BlockingQueue<byte[]> nodesToSend = new LinkedBlockingQueue(64);
    protected BlockingQueue<byte[]> nodesToSpawn = new LinkedBlockingQueue(64);
    protected Thread kicker = null;
    protected boolean spawnThreads = true;
    protected boolean compatible17 = false;
    protected int follownr = 1;
    protected int lastRecievedMessage;

    public final void shutdown() {
        log.info("Shutting down clustering");
        stopCommunicationThreads();
        this.kicker.setPriority(1);
        this.kicker = null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void readConfiguration(Map<String, String> map) {
        String str = map.get("spawnthreads");
        if (str == null || str.equals("")) {
            return;
        }
        this.spawnThreads = !"false".equalsIgnoreCase(str);
    }

    protected abstract void startCommunicationThreads();

    protected abstract void stopCommunicationThreads();

    public void notify(Event event) {
        if (!event.getMachine().equals(MMBase.getMMBase().getMachineName())) {
            log.trace("Ignoring remote event from " + event.getMachine() + " it will not be propagated");
            return;
        }
        byte[] createMessage = createMessage(event);
        if (createMessage == null) {
            log.debug("MEssage was null");
            return;
        }
        if (createMessage.length > 5000) {
            log.warn("Sending large event to the cluster. Serialization of  " + event + " is " + createMessage.length + " long!");
        } else {
            log.debug("Sending an event to the cluster");
        }
        this.nodesToSend.offer(createMessage);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void start() {
        if (this.kicker == null) {
            this.kicker = new DaemonThread(this, "ClusterManager");
            this.kicker.start();
            try {
                this.kicker.setPriority(6);
            } catch (NullPointerException e) {
                log.warn("Could not set thread priority of Cluster Manager");
            }
            startCommunicationThreads();
        }
    }

    protected byte[] createMessage(Event event) {
        NodeEvent nodeEvent;
        if (log.isDebugEnabled()) {
            log.debug("Serializing " + event);
        }
        try {
            long currentTimeMillis = System.currentTimeMillis();
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            if (this.compatible17 && ((event instanceof NodeEvent) || (event instanceof RelationEvent))) {
                if (event instanceof RelationEvent) {
                    RelationEvent relationEvent = (RelationEvent) event;
                    nodeEvent = relationEvent.getNodeEvent();
                    ByteArrayOutputStream byteArrayOutputStream2 = new ByteArrayOutputStream();
                    byte[] bytes = createMessage(relationEvent.getMachine(), relationEvent.getRelationSourceNumber(), relationEvent.getRelationSourceType(), "r").getBytes();
                    byteArrayOutputStream2.write(bytes, 0, bytes.length);
                    byteArrayOutputStream2.write(44);
                    byteArrayOutputStream2.write(0);
                    this.nodesToSend.offer(byteArrayOutputStream2.toByteArray());
                    ByteArrayOutputStream byteArrayOutputStream3 = new ByteArrayOutputStream();
                    byte[] bytes2 = createMessage(relationEvent.getMachine(), relationEvent.getRelationDestinationNumber(), relationEvent.getRelationDestinationType(), "r").getBytes();
                    byteArrayOutputStream3.write(bytes2, 0, bytes2.length);
                    byteArrayOutputStream3.write(44);
                    byteArrayOutputStream3.write(0);
                    this.nodesToSend.offer(byteArrayOutputStream3.toByteArray());
                } else {
                    nodeEvent = (NodeEvent) event;
                }
                byte[] bytes3 = createMessage(nodeEvent.getMachine(), nodeEvent.getNodeNumber(), nodeEvent.getBuilderName(), NodeEvent.newTypeToOldType(nodeEvent.getType())).getBytes();
                byteArrayOutputStream.write(bytes3, 0, bytes3.length);
            }
            byteArrayOutputStream.write(44);
            byteArrayOutputStream.write(0);
            new ObjectOutputStream(byteArrayOutputStream).writeObject(event);
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            this.send.parseCost += currentTimeMillis2;
            this.send.cost += currentTimeMillis2;
            return byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            log.error(e.getMessage(), e);
            return null;
        }
    }

    protected String createMessage(String str, int i, String str2, String str3) {
        StringBuilder append = new StringBuilder().append(str).append(',');
        int i2 = this.follownr;
        this.follownr = i2 + 1;
        return append.append(i2).append(',').append(i).append(',').append(str2).append(',').append(str3).toString();
    }

    protected Event parseMessage(byte[] bArr) {
        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bArr);
            for (int i = 1; i > 0; i = byteArrayInputStream.read()) {
            }
            Event event = (Event) new ObjectInputStream(byteArrayInputStream).readObject();
            if (log.isDebugEnabled()) {
                log.debug("Unserialized " + event);
            }
            return event;
        } catch (EOFException e) {
            NodeEvent parseMessageBackwardCompatible = parseMessageBackwardCompatible(new String(bArr));
            if (log.isDebugEnabled()) {
                log.debug("Old style message " + parseMessageBackwardCompatible);
            }
            return parseMessageBackwardCompatible;
        } catch (StreamCorruptedException e2) {
            log.debug(e2.getMessage() + ". Supposing old style message.");
            NodeEvent parseMessageBackwardCompatible2 = parseMessageBackwardCompatible(new String(bArr));
            if (log.isDebugEnabled()) {
                log.debug("Old style message " + parseMessageBackwardCompatible2);
            }
            return parseMessageBackwardCompatible2;
        } catch (IOException e3) {
            log.error(e3);
            return null;
        } catch (ClassNotFoundException e4) {
            log.error(e4);
            return null;
        }
    }

    protected NodeEvent parseMessageBackwardCompatible(String str) {
        if (log.isDebugEnabled()) {
            log.debug("RECEIVE=>" + str);
        }
        StringTokenizer stringTokenizer = new StringTokenizer(str, ",");
        if (!stringTokenizer.hasMoreTokens()) {
            log.error(str + ": 'machine' could not be extracted from this string!");
            return null;
        }
        String nextToken = stringTokenizer.nextToken();
        if (!stringTokenizer.hasMoreTokens()) {
            log.error(str + ": 'vnr' could not be extracted from this string!");
            return null;
        }
        int intValue = Integer.valueOf(stringTokenizer.nextToken()).intValue();
        int i = this.lastRecievedMessage + 1;
        if (intValue != i) {
            log.info("Expected message " + i + ", but message " + intValue + " was recieved ");
        }
        this.lastRecievedMessage = intValue;
        if (!stringTokenizer.hasMoreTokens()) {
            log.error(str + ": 'id' could not be extracted from this string!");
            return null;
        }
        String nextToken2 = stringTokenizer.nextToken();
        if (!stringTokenizer.hasMoreTokens()) {
            log.error(str + ": 'tb' could not be extracted from this string!");
            return null;
        }
        String nextToken3 = stringTokenizer.nextToken();
        if (!stringTokenizer.hasMoreTokens()) {
            log.error(str + ": 'ctype' could not be extracted from this string!");
            return null;
        }
        String nextToken4 = stringTokenizer.nextToken();
        if (nextToken4.equals("s")) {
            log.error("XML messages not suppported any more");
            return null;
        }
        MMBase mMBase = MMBase.getMMBase();
        MMObjectBuilder builder = mMBase.getBuilder(nextToken3);
        if (builder == null) {
            builder = mMBase.getBuilder("object");
        }
        MMObjectNode node = builder.getNode(nextToken2);
        if (node != null) {
            return new NodeEvent(nextToken, nextToken3, node.getNumber(), node.getOldValues(), node.getValues(), NodeEvent.oldTypeToNewType(nextToken4));
        }
        try {
            return new NodeEvent(nextToken, nextToken3, Integer.valueOf(nextToken2).intValue(), (Map) null, (Map) null, NodeEvent.oldTypeToNewType(nextToken4));
        } catch (NumberFormatException e) {
            log.error(str + ": colud not parse " + nextToken2 + " to a node number.");
            return null;
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.kicker != null) {
            try {
                byte[] take = this.nodesToSpawn.take();
                if (take != null) {
                    long currentTimeMillis = System.currentTimeMillis();
                    if (log.isDebugEnabled()) {
                        log.trace("RECEIVED =>" + take.length + " bytes");
                    }
                    this.receive.count++;
                    this.receive.bytes += take.length;
                    Event parseMessage = parseMessage(take);
                    this.receive.parseCost += System.currentTimeMillis() - currentTimeMillis;
                    if (parseMessage != null) {
                        handleEvent(parseMessage);
                    } else {
                        log.warn("Could not handle event, it is null");
                    }
                    this.receive.cost += System.currentTimeMillis() - currentTimeMillis;
                }
            } catch (InterruptedException e) {
                log.debug(Thread.currentThread().getName() + " was interruped.");
                return;
            } catch (Throwable th) {
                log.error(th.getMessage(), th);
            }
        }
    }

    protected void handleEvent(final Event event) {
        MMObjectBuilder builder;
        MMBase mMBase = MMBase.getMMBase();
        if (mMBase == null || !mMBase.getState()) {
            if (log.isDebugEnabled()) {
                log.debug("Ignoring event " + event + ", mmbase is not up " + mMBase);
                return;
            }
            return;
        }
        if (mMBase.getMachineName().equals(event.getMachine())) {
            if (log.isDebugEnabled()) {
                log.debug("Ignoring event " + event + " it is from this (" + event.getMachine() + ") mmbase");
                return;
            }
            return;
        }
        if (event instanceof SystemEvent.Shutdown) {
            log.warn("Can not handle system shutdown event when sent by other server." + event);
            return;
        }
        if ((event instanceof NodeEvent) && (builder = mMBase.getBuilder(((NodeEvent) event).getBuilderName())) != null && !builder.broadcastChanges()) {
            log.info("Ignoring node-event for node type " + builder + " because broad cast changes is false");
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Handling event " + event + " for " + event.getMachine());
        }
        if (this.spawnThreads) {
            ThreadPools.jobsExecutor.execute(new Runnable() { // from class: org.mmbase.clustering.ClusterManager.1
                @Override // java.lang.Runnable
                public void run() {
                    long currentTimeMillis = System.currentTimeMillis();
                    EventManager.getInstance().propagateEvent(event);
                    ClusterManager.this.receive.cost += System.currentTimeMillis() - currentTimeMillis;
                }
            });
            return;
        }
        try {
            EventManager.getInstance().propagateEvent(event);
        } catch (Throwable th) {
            log.error("Exception during propagation of event: " + event + ": " + th.getMessage(), th);
        }
    }
}
