package org.apache.pulsar.testclient;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData;
import org.apache.pulsar.testclient.utils.FixedColumnLengthTableMaker;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/testclient/BrokerMonitor.class */
public class BrokerMonitor {
    private static final String BROKER_ROOT = "/loadbalance/brokers";
    private static final int ZOOKEEPER_TIMEOUT_MILLIS = 30000;
    private static final int GLOBAL_STATS_PRINT_PERIOD_MILLIS = 60000;
    private ZooKeeper zkClient;
    private Map<String, Object> loadData;
    private static final FixedColumnLengthTableMaker globalTableMaker;
    private TableView<BrokerLoadData> brokerLoadDataTableView;
    private static final Logger log = LoggerFactory.getLogger(BrokerMonitor.class);
    private static final Gson gson = new Gson();
    private static final List<Object> MESSAGE_FIELDS = Arrays.asList("MSG/S IN", "MSG/S OUT", "TOTAL", "KB/S IN", "KB/S OUT", "TOTAL");
    private static final List<Object> SYSTEM_FIELDS = Arrays.asList("CPU %", "MEMORY %", "DIRECT %", "BW IN %", "BW OUT %", "MAX %");
    private static final Object[] SYSTEM_ROW = makeSystemRow("SYSTEM");
    private static final Object[] COUNT_ROW = {"COUNT", "TOPIC", "BUNDLE", "PRODUCER", "CONSUMER", "BUNDLE +", "BUNDLE -"};
    private static final Object[] LATEST_ROW = makeMessageRow("LATEST");
    private static final Object[] SHORT_ROW = makeMessageRow("SHORT");
    private static final Object[] LONG_ROW = makeMessageRow("LONG");
    private static final Object[] RAW_SYSTEM_ROW = makeSystemRow("RAW SYSTEM");
    private static final Object[] ALLOC_SYSTEM_ROW = makeSystemRow("ALLOC SYSTEM");
    private static final Object[] RAW_MESSAGE_ROW = makeMessageRow("RAW MSG");
    private static final Object[] ALLOC_MESSAGE_ROW = makeMessageRow("ALLOC MSG");
    private static final Object[] GLOBAL_HEADER = {"BROKER", "BUNDLE", "MSG/S", "LONG/S", "KB/S", "MAX %"};
    private static final FixedColumnLengthTableMaker localTableMaker = new FixedColumnLengthTableMaker();

    @Parameters(commandDescription = "Monitors brokers and prints to the console information about their system resource usages, \ntheir topic and bundle counts, their message rates, and other metrics.")
    /* loaded from: input_file:org/apache/pulsar/testclient/BrokerMonitor$Arguments.class */
    private static class Arguments {

        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
        boolean help;

        @Parameter(names = {"--connect-string"}, description = "Zookeeper or broker connect string", required = true)
        public String connectString = null;

        @Parameter(names = {"--extensions"}, description = "true to monitor Load Balance Extensions.")
        boolean extensions = false;

        private Arguments() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/testclient/BrokerMonitor$BrokerDataWatcher.class */
    public class BrokerDataWatcher implements Watcher {
        private final ZooKeeper zkClient;

        private BrokerDataWatcher(ZooKeeper zooKeeper) {
            this.zkClient = zooKeeper;
        }

        private String brokerNameFromPath(String str) {
            return str.substring(str.lastIndexOf(47) + 1);
        }

        public synchronized void process(WatchedEvent watchedEvent) {
            try {
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
                    printData(watchedEvent.getPath());
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private double percentUsage(double d, double d2) {
            if (d2 <= 0.0d || d < 0.0d) {
                return 0.0d;
            }
            return 100.0d * Math.min(1.0d, d / d2);
        }

        private synchronized void printData(String str) {
            String brokerNameFromPath = brokerNameFromPath(str);
            try {
                String str2 = new String(this.zkClient.getData(str, this, (Stat) null));
                if (str2.contains("allocated")) {
                    printLoadReport(brokerNameFromPath, (LoadReport) BrokerMonitor.gson.fromJson(str2, LoadReport.class));
                    return;
                }
                try {
                    printBrokerData(brokerNameFromPath, (LocalBrokerData) BrokerMonitor.gson.fromJson(str2, LocalBrokerData.class), (TimeAverageBrokerData) BrokerMonitor.gson.fromJson(new String(this.zkClient.getData("/loadbalance/broker-time-average/" + brokerNameFromPath, false, (Stat) null)), TimeAverageBrokerData.class));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            } catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v5, types: [java.lang.Object[], java.lang.Object[][]] */
        private synchronized void printLoadReport(String str, LoadReport loadReport) {
            BrokerMonitor.this.loadData.put(str, loadReport);
            BrokerMonitor.initRow(r0[1], Integer.valueOf(loadReport.getNumTopics()), Integer.valueOf(loadReport.getNumBundles()), Integer.valueOf(loadReport.getNumProducers()), Integer.valueOf(loadReport.getNumConsumers()), Integer.valueOf(loadReport.getBundleGains().size()), Integer.valueOf(loadReport.getBundleLosses().size()));
            SystemResourceUsage systemResourceUsage = loadReport.getSystemResourceUsage();
            ResourceUsage cpu = systemResourceUsage.getCpu();
            ResourceUsage memory = systemResourceUsage.getMemory();
            ResourceUsage directMemory = systemResourceUsage.getDirectMemory();
            ResourceUsage bandwidthIn = systemResourceUsage.getBandwidthIn();
            ResourceUsage bandwidthOut = systemResourceUsage.getBandwidthOut();
            BrokerMonitor.initRow(r0[3], Float.valueOf(cpu.percentUsage()), Float.valueOf(memory.percentUsage()), Float.valueOf(directMemory.percentUsage()), Float.valueOf(bandwidthIn.percentUsage()), Float.valueOf(bandwidthOut.percentUsage()), Double.valueOf(Math.max(Math.max(Math.max(cpu.percentUsage(), memory.percentUsage()), Math.max(directMemory.percentUsage(), bandwidthIn.percentUsage())), bandwidthOut.percentUsage())));
            double percentUsage = percentUsage(loadReport.getAllocatedCPU(), cpu.limit);
            double percentUsage2 = percentUsage(loadReport.getAllocatedMemory(), memory.limit);
            double percentUsage3 = percentUsage(loadReport.getAllocatedBandwidthIn(), bandwidthIn.limit);
            double percentUsage4 = percentUsage(loadReport.getAllocatedBandwidthOut(), bandwidthOut.limit);
            BrokerMonitor.initRow(r0[5], Double.valueOf(percentUsage), Double.valueOf(percentUsage2), null, Double.valueOf(percentUsage3), Double.valueOf(percentUsage4), Double.valueOf(Math.max(Math.max(Math.max(percentUsage, percentUsage2), percentUsage3), percentUsage4)));
            BrokerMonitor.initMessageRow(r0[7], loadReport.getMsgRateIn(), loadReport.getMsgRateOut(), bandwidthIn.usage, bandwidthOut.usage);
            ?? r0 = {BrokerMonitor.COUNT_ROW, new Object[BrokerMonitor.COUNT_ROW.length], BrokerMonitor.RAW_SYSTEM_ROW, new Object[BrokerMonitor.RAW_SYSTEM_ROW.length], BrokerMonitor.ALLOC_SYSTEM_ROW, new Object[BrokerMonitor.ALLOC_SYSTEM_ROW.length], BrokerMonitor.RAW_MESSAGE_ROW, new Object[BrokerMonitor.RAW_MESSAGE_ROW.length], BrokerMonitor.ALLOC_MESSAGE_ROW, new Object[BrokerMonitor.ALLOC_MESSAGE_ROW.length]};
            BrokerMonitor.initMessageRow(r0[9], loadReport.getAllocatedMsgRateIn(), loadReport.getAllocatedMsgRateOut(), loadReport.getAllocatedBandwidthIn(), loadReport.getAllocatedBandwidthOut());
            BrokerMonitor.log.info("\nLoad Report for {}:\n{}\n", str, BrokerMonitor.localTableMaker.make(r0));
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v5, types: [java.lang.Object[], java.lang.Object[][]] */
        private synchronized void printBrokerData(String str, LocalBrokerData localBrokerData, TimeAverageBrokerData timeAverageBrokerData) {
            BrokerMonitor.this.loadData.put(str, localBrokerData);
            BrokerMonitor.initRow(r0[1], Float.valueOf(localBrokerData.getCpu().percentUsage()), Float.valueOf(localBrokerData.getMemory().percentUsage()), Float.valueOf(localBrokerData.getDirectMemory().percentUsage()), Float.valueOf(localBrokerData.getBandwidthIn().percentUsage()), Float.valueOf(localBrokerData.getBandwidthOut().percentUsage()), Double.valueOf(localBrokerData.getMaxResourceUsage() * 100.0d));
            BrokerMonitor.initRow(r0[3], Integer.valueOf(localBrokerData.getNumTopics()), Integer.valueOf(localBrokerData.getNumBundles()), Integer.valueOf(localBrokerData.getNumProducers()), Integer.valueOf(localBrokerData.getNumConsumers()), Integer.valueOf(localBrokerData.getLastBundleGains().size()), Integer.valueOf(localBrokerData.getLastBundleLosses().size()));
            BrokerMonitor.initMessageRow(r0[5], localBrokerData.getMsgRateIn(), localBrokerData.getMsgRateOut(), localBrokerData.getMsgThroughputIn(), localBrokerData.getMsgThroughputOut());
            BrokerMonitor.initMessageRow(r0[7], timeAverageBrokerData.getShortTermMsgRateIn(), timeAverageBrokerData.getShortTermMsgRateOut(), timeAverageBrokerData.getShortTermMsgThroughputIn(), timeAverageBrokerData.getShortTermMsgThroughputOut());
            ?? r0 = {BrokerMonitor.SYSTEM_ROW, new Object[BrokerMonitor.SYSTEM_ROW.length], BrokerMonitor.COUNT_ROW, new Object[BrokerMonitor.COUNT_ROW.length], BrokerMonitor.LATEST_ROW, new Object[BrokerMonitor.LATEST_ROW.length], BrokerMonitor.SHORT_ROW, new Object[BrokerMonitor.SHORT_ROW.length], BrokerMonitor.LONG_ROW, new Object[BrokerMonitor.LONG_ROW.length]};
            BrokerMonitor.initMessageRow(r0[9], timeAverageBrokerData.getLongTermMsgRateIn(), timeAverageBrokerData.getLongTermMsgRateOut(), timeAverageBrokerData.getLongTermMsgThroughputIn(), timeAverageBrokerData.getLongTermMsgThroughputOut());
            BrokerMonitor.log.info("\nBroker Data for {}:\n{}\n", str, BrokerMonitor.localTableMaker.make(r0));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/testclient/BrokerMonitor$BrokerWatcher.class */
    public class BrokerWatcher implements Watcher {
        private final ZooKeeper zkClient;
        private Set<String> brokers = Collections.emptySet();

        private BrokerWatcher(ZooKeeper zooKeeper) {
            this.zkClient = zooKeeper;
        }

        public synchronized void process(WatchedEvent watchedEvent) {
            try {
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                    updateBrokers(watchedEvent.getPath());
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private synchronized void updateBrokers(String str) {
            HashSet<String> hashSet = new HashSet();
            try {
                hashSet.addAll(this.zkClient.getChildren(str, this));
                for (String str2 : this.brokers) {
                    if (!hashSet.contains(str2)) {
                        BrokerMonitor.log.info("Lost broker: " + str2);
                        synchronized (BrokerMonitor.this.loadData) {
                            BrokerMonitor.this.loadData.remove(str2);
                        }
                    }
                }
                for (String str3 : hashSet) {
                    if (!this.brokers.contains(str3)) {
                        BrokerMonitor.log.info("Gained broker: " + str3);
                        new BrokerDataWatcher(this.zkClient).printData(str + "/" + str3);
                    }
                }
                this.brokers = hashSet;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private static Object[] makeMessageRow(String str) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(str);
        arrayList.addAll(MESSAGE_FIELDS);
        return arrayList.toArray();
    }

    private static Object[] makeSystemRow(String str) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(str);
        arrayList.addAll(SYSTEM_FIELDS);
        return arrayList.toArray();
    }

    private static void initRow(Object[] objArr, Object... objArr2) {
        System.arraycopy(objArr2, 0, objArr, 1, objArr2.length);
    }

    private static void initMessageRow(Object[] objArr, double d, double d2, double d3, double d4) {
        initRow(objArr, Double.valueOf(d), Double.valueOf(d2), Double.valueOf(d + d2), Double.valueOf(d3 / 1024.0d), Double.valueOf(d4 / 1024.0d), Double.valueOf((d3 + d4) / 1024.0d));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v8, types: [java.lang.Object[], java.lang.Object[][]] */
    private void printGlobalData() {
        int numBundles;
        double msgRateIn;
        double longTermMsgRateIn;
        double msgThroughputIn;
        double maxResourceUsage;
        synchronized (this.loadData) {
            ?? r0 = new Object[this.loadData.size() + 2];
            r0[0] = GLOBAL_HEADER;
            int i = 0;
            double d = 0.0d;
            double d2 = 0.0d;
            double d3 = 0.0d;
            double d4 = 0.0d;
            int i2 = 1;
            for (Map.Entry<String, Object> entry : this.loadData.entrySet()) {
                String key = entry.getKey();
                Object value = entry.getValue();
                r0[i2] = new Object[GLOBAL_HEADER.length];
                r0[i2][0] = key;
                if (value instanceof LoadReport) {
                    LoadReport loadReport = (LoadReport) value;
                    numBundles = loadReport.getNumBundles();
                    msgRateIn = loadReport.getMsgRateIn() + loadReport.getMsgRateOut();
                    longTermMsgRateIn = loadReport.getAllocatedMsgRateIn() + loadReport.getAllocatedMsgRateOut();
                    msgThroughputIn = (loadReport.getAllocatedBandwidthIn() + loadReport.getAllocatedBandwidthOut()) / 1024.0d;
                    SystemResourceUsage systemResourceUsage = loadReport.getSystemResourceUsage();
                    maxResourceUsage = Math.max(Math.max(Math.max(systemResourceUsage.getCpu().percentUsage(), systemResourceUsage.getMemory().percentUsage()), Math.max(systemResourceUsage.getDirectMemory().percentUsage(), systemResourceUsage.getBandwidthIn().percentUsage())), systemResourceUsage.getBandwidthOut().percentUsage());
                } else {
                    if (!(value instanceof LocalBrokerData)) {
                        throw new AssertionError("Unreachable code");
                    }
                    LocalBrokerData localBrokerData = (LocalBrokerData) value;
                    numBundles = localBrokerData.getNumBundles();
                    msgRateIn = localBrokerData.getMsgRateIn() + localBrokerData.getMsgRateOut();
                    try {
                        TimeAverageBrokerData timeAverageBrokerData = (TimeAverageBrokerData) gson.fromJson(new String(this.zkClient.getData("/loadbalance/broker-time-average/" + key, false, (Stat) null)), TimeAverageBrokerData.class);
                        longTermMsgRateIn = timeAverageBrokerData.getLongTermMsgRateIn() + timeAverageBrokerData.getLongTermMsgRateOut();
                        msgThroughputIn = (localBrokerData.getMsgThroughputIn() + localBrokerData.getMsgThroughputOut()) / 1024.0d;
                        maxResourceUsage = localBrokerData.getMaxResourceUsage();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                r0[i2][1] = Integer.valueOf(numBundles);
                r0[i2][2] = Double.valueOf(msgRateIn);
                r0[i2][3] = Double.valueOf(msgThroughputIn);
                r0[i2][4] = Double.valueOf(longTermMsgRateIn);
                r0[i2][5] = Double.valueOf(maxResourceUsage);
                i += numBundles;
                d2 += msgRateIn;
                d3 += longTermMsgRateIn;
                d += msgThroughputIn;
                d4 = Math.max(maxResourceUsage, d4);
                i2++;
            }
            int size = this.loadData.size() + 1;
            r0[size] = new Object[GLOBAL_HEADER.length];
            r0[size][0] = "TOTAL";
            r0[size][1] = Integer.valueOf(i);
            r0[size][2] = Double.valueOf(d2);
            r0[size][3] = Double.valueOf(d3);
            r0[size][4] = Double.valueOf(d);
            r0[size][5] = Double.valueOf(d4);
            log.info("Overall Broker Data:\n{}", globalTableMaker.make(r0));
        }
    }

    public BrokerMonitor(ZooKeeper zooKeeper) {
        this.loadData = new ConcurrentHashMap();
        this.zkClient = zooKeeper;
    }

    public void start() {
        try {
            new BrokerWatcher(this.zkClient).updateBrokers(BROKER_ROOT);
            while (true) {
                Thread.sleep(60000L);
                printGlobalData();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private BrokerMonitor(String str) {
        try {
            this.brokerLoadDataTableView = PulsarClient.builder().memoryLimit(0L, SizeUnit.BYTES).serviceUrl(str).connectionsPerBroker(4).ioThreads(Runtime.getRuntime().availableProcessors()).statsInterval(0L, TimeUnit.SECONDS).build().newTableView(Schema.JSON(BrokerLoadData.class)).topic(ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC).create();
        } catch (Throwable th) {
            log.info("Failed to start BrokerMonitor", th);
            throw new RuntimeException(th);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    private synchronized void printBrokerLoadData(String str, BrokerLoadData brokerLoadData) {
        initRow(r0[1], Float.valueOf(brokerLoadData.getCpu().percentUsage()), Float.valueOf(brokerLoadData.getMemory().percentUsage()), Float.valueOf(brokerLoadData.getDirectMemory().percentUsage()), Float.valueOf(brokerLoadData.getBandwidthIn().percentUsage()), Float.valueOf(brokerLoadData.getBandwidthOut().percentUsage()), Double.valueOf(brokerLoadData.getMaxResourceUsage() * 100.0d));
        initRow(r0[3], null, Integer.valueOf(brokerLoadData.getBundleCount()), null, null, null, null);
        ?? r0 = {SYSTEM_ROW, new Object[SYSTEM_ROW.length], COUNT_ROW, new Object[COUNT_ROW.length], LATEST_ROW, new Object[LATEST_ROW.length]};
        initMessageRow(r0[5], brokerLoadData.getMsgRateIn(), brokerLoadData.getMsgRateOut(), brokerLoadData.getMsgThroughputIn(), brokerLoadData.getMsgThroughputOut());
        log.info("\nBroker Data for {}:\n{}\n", str, localTableMaker.make(r0));
    }

    private synchronized void printBrokerLoadDataStore() {
        this.brokerLoadDataTableView.forEach(this::printBrokerLoadData);
    }

    private void startBrokerLoadDataStoreMonitor() {
        while (true) {
            try {
                Thread.sleep(60000L);
                printBrokerLoadDataStore();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] strArr) throws Exception {
        Arguments arguments = new Arguments();
        JCommander jCommander = new JCommander(arguments);
        jCommander.setProgramName("pulsar-perf monitor-brokers");
        try {
            jCommander.parse(strArr);
        } catch (ParameterException e) {
            System.out.println(e.getMessage());
            jCommander.usage();
            PerfClientUtils.exit(1);
        }
        if (arguments.extensions) {
            new BrokerMonitor(arguments.connectString).startBrokerLoadDataStoreMonitor();
        } else {
            new BrokerMonitor(new ZooKeeper(arguments.connectString, ZOOKEEPER_TIMEOUT_MILLIS, (Watcher) null)).start();
        }
    }

    static {
        localTableMaker.elementLength = 14;
        localTableMaker.decimalFormatter = "%.2f";
        globalTableMaker = new FixedColumnLengthTableMaker();
        globalTableMaker.decimalFormatter = "%.2f";
        globalTableMaker.topBorder = '*';
        globalTableMaker.bottomBorder = '*';
        globalTableMaker.lengthFunction = num -> {
            return Integer.valueOf(num.intValue() == 0 ? 60 : 12);
        };
    }
}
