package org.apache.kafka.tools.other;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.imageio.ImageIO;
import kafka.server.BrokerServer;
import kafka.server.KafkaBroker;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.server.quota.QuotaType;
import org.apache.kafka.tools.reassign.ReassignPartitionsCommand;
import org.apache.logging.log4j.core.config.Configurator;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartFrame;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.plot.PlotOrientation;
import org.jfree.data.xy.XYSeries;
import org.jfree.data.xy.XYSeriesCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/* loaded from: input_file:org/apache/kafka/tools/other/ReplicationQuotasTestRig.class */
public class ReplicationQuotasTestRig {
    /** Logger used for the per-broker replication rate messages. */
    public static final Logger LOGGER = LoggerFactory.getLogger(ReplicationQuotasTestRig.class);
    /** The value one million. NOTE(review): not referenced anywhere in the visible code — confirm it is still needed. */
    public static final int K = 1000000;
    /** Directory that receives this run's journal and chart images; assigned once in the static initializer. */
    private static final String DIR;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/tools/other/ReplicationQuotasTestRig$Experiment.class */
    public static class Experiment {
        /** Name of the single topic each experiment creates, fills and reassigns. */
        static final String TOPIC_NAME = "my-topic";
        /** Name of the experiment; used in chart titles and chart image file names. Set by {@code run}. */
        String experimentName = "unset";
        /** Leader replication byte-rate samples, keyed by broker node id; one sample is added per metrics poll. */
        Map<Integer, List<Double>> leaderRates = new HashMap();
        /** Follower replication byte-rate samples, keyed by broker node id; one sample is added per metrics poll. */
        Map<Integer, List<Double>> followerRates = new HashMap();
        /** The in-process test cluster; assigned by {@code startBrokers} and closed by {@code tearDown}. */
        KafkaClusterTestKit cluster;
        /** Admin client connected to {@link #cluster}; assigned by {@code startBrokers}. */
        Admin adminClient;

        /** Creates an experiment with no cluster running yet. */
        Experiment() {
        }

        /**
         * Builds, formats and starts a test cluster with one controller node and
         * {@code i} broker nodes, waits for the brokers to be ready, then creates
         * the admin client from the cluster's client properties.
         *
         * @param i number of broker nodes to start
         * @throws RuntimeException wrapping any failure raised while starting the cluster
         */
        void startBrokers(int i) {
            System.out.println("Starting Brokers");
            try {
                TestKitNodes nodes = new TestKitNodes.Builder()
                    .setNumControllerNodes(1)
                    .setNumBrokerNodes(i)
                    .build();
                // Assign the field before format()/startup() so tearDown() can still close a half-started cluster.
                cluster = new KafkaClusterTestKit.Builder(nodes).build();
                cluster.format();
                cluster.startup();
                cluster.waitForReadyBrokers();
                adminClient = Admin.create(cluster.clientProperties());
            } catch (Exception startupFailure) {
                throw new RuntimeException("Failed to start test Kafka cluster", startupFailure);
            }
        }

        /**
         * Releases the admin client and the test cluster.
         *
         * <p>Safe to call when {@code startBrokers} failed before the cluster was
         * built: in that case {@link #cluster} is still {@code null} and there is
         * nothing to close. Previously this path threw a
         * {@link NullPointerException} that hid the original startup failure.
         *
         * @throws RuntimeException if closing the cluster fails
         */
        public void tearDown() {
            Utils.closeQuietly(this.adminClient, "adminClient");
            if (this.cluster == null) {
                // The cluster was never created, so there is nothing to shut down.
                return;
            }
            try {
                this.cluster.close();
            } catch (Exception e) {
                throw new RuntimeException("Failed to close test Kafka cluster", e);
            }
        }

        /**
         * Runs one experiment end to end: starts the cluster, creates the topic
         * with a single replica per partition, writes the configured number of
         * messages, generates and executes a throttled reassignment across all
         * brokers, validates the resulting log end offsets, and records the
         * results (journal entry, leader/follower charts, console summary).
         *
         * @param experimentDef the experiment parameters
         * @param journal       the journal that receives the experiment entry and chart links
         * @param z             when {@code true}, charts are also displayed on screen
         * @throws Exception if any step fails
         */
        public void run(final ExperimentDef experimentDef, Journal journal, boolean z) throws Exception {
            experimentName = experimentDef.name;

            // Each partition gets exactly one replica; the hosting broker advances by one per
            // partition, starting (1 + half the broker count) positions round the broker ring.
            final int ringOffset = Math.round(experimentDef.brokers / 2.0f);
            Map<Integer, List<Integer>> initialReplicas = new HashMap<>();
            for (int partition = 0; partition < experimentDef.partitions; partition++) {
                int brokerId = (partition + 1 + ringOffset) % experimentDef.brokers;
                initialReplicas.put(partition, Collections.singletonList(brokerId));
            }

            startBrokers(experimentDef.brokers);
            adminClient.createTopics(Collections.singleton(new NewTopic(TOPIC_NAME, initialReplicas))).all().get();

            // Wait until every broker's metadata image shows the topic with a leader on every partition.
            TestUtils.waitUntilTrue(
                () -> cluster.brokers().values().stream().allMatch(broker -> {
                    TopicImage image = broker.metadataCache().currentImage().topics().getTopic(TOPIC_NAME);
                    if (image == null) {
                        return false;
                    }
                    return image.partitions().values().stream().allMatch(partition -> partition.hasLeader());
                }),
                () -> "Timed out waiting for topic listing",
                15000L,
                500L);

            System.out.println("Writing Data");
            try (KafkaProducer<byte[], byte[]> producer = createProducer()) {
                for (int message = 0; message < experimentDef.msgsPerPartition; message++) {
                    for (int partition = 0; partition < experimentDef.partitions; partition++) {
                        producer.send(new ProducerRecord<>(TOPIC_NAME, partition, null, new byte[experimentDef.msgSize]));
                    }
                }
            }

            System.out.println("Generating Reassignment");
            String topicsJson = json(TOPIC_NAME);
            String brokerList = cluster.brokers().values().stream()
                .map(broker -> String.valueOf(broker.replicaManager().localBrokerId()))
                .collect(Collectors.joining(","));
            Map<TopicPartition, List<Integer>> proposed =
                ReassignPartitionsCommand.generateAssignment(adminClient, topicsJson, brokerList, true).getKey();

            System.out.println("Starting Reassignment");
            long startMs = System.currentTimeMillis();
            String reassignmentJson = ReassignPartitionsCommand.formatAsReassignmentJson(proposed, Collections.emptyMap());
            ReassignPartitionsCommand.executeAssignment(
                adminClient, false, reassignmentJson, experimentDef.throttle, -1L, 10000L, Time.SYSTEM);
            waitForReassignmentToComplete();
            long elapsedSeconds = (System.currentTimeMillis() - startMs) / 1000;
            System.out.println("Reassignment took " + elapsedSeconds + "s");

            validateAllOffsetsMatch(experimentDef);
            journal.appendToJournal(experimentDef);
            renderChart(leaderRates, "Leader", journal, z);
            renderChart(followerRates, "Follower", journal, z);
            logOutput(experimentDef, initialReplicas, proposed);
            System.out.println("Output can be found here: " + journal.path());
        }

        /**
         * Checks, for every broker and every partition index, that a locally
         * present log has a log end offset equal to the number of messages written
         * per partition. Brokers with no log for a partition are skipped.
         *
         * @param experimentDef supplies the partition count and expected messages per partition
         * @throws RuntimeException if a present log's end offset differs from the expected count
         */
        void validateAllOffsetsMatch(ExperimentDef experimentDef) {
            for (KafkaBroker broker : cluster.brokers().values()) {
                for (int partition = 0; partition < experimentDef.partitions; partition++) {
                    TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
                    // -1 marks "this broker has no log for the partition".
                    long endOffset = ((Long) broker.logManager().getLog(topicPartition, false)
                        .map(log -> log.logEndOffset())
                        .getOrElse(() -> -1L)).longValue();
                    boolean logPresent = endOffset >= 0;
                    if (!logPresent || endOffset == experimentDef.msgsPerPartition) {
                        continue;
                    }
                    throw new RuntimeException("Run failed as offsets did not match for partition " + partition
                        + " on broker " + broker.config().nodeId()
                        + ". Expected " + experimentDef.msgsPerPartition
                        + " but was " + endOffset + ".");
                }
            }
        }

        /**
         * Prints a console summary of the experiment: the initial replica layout,
         * the replica assignment currently reported by the admin client, the
         * proposed assignment, and the experiment parameters.
         *
         * @param experimentDef the experiment parameters
         * @param map           initial replica assignment, partition index to broker ids
         * @param map2          proposed assignment produced by the reassignment generator
         * @throws Exception if describing the topic fails
         */
        void logOutput(ExperimentDef experimentDef, Map<Integer, List<Integer>> map, Map<TopicPartition, List<Integer>> map2) throws Exception {
            TopicDescription description = adminClient.describeTopics(Collections.singleton(TOPIC_NAME))
                .allTopicNames()
                .get()
                .get(TOPIC_NAME);
            // Partition index -> ids of the brokers currently holding a replica.
            Map<Integer, List<Integer>> actual = description.partitions().stream().collect(Collectors.toMap(
                info -> info.partition(),
                info -> info.replicas().stream().map(node -> node.id()).collect(Collectors.toList())));

            // One line per partition, sorted by partition index.
            StringBuilder replicaLines = new StringBuilder();
            for (Map.Entry<Integer, List<Integer>> entry : new TreeMap<>(map).entrySet()) {
                replicaLines.append("\n").append(entry);
            }

            System.out.println("The replicas are " + replicaLines);
            System.out.println("This is the current replica assignment:\n" + actual);
            System.out.println("proposed assignment is: \n" + map2);
            System.out.println("This is the assignment we ended up with " + actual);
            System.out.println("numBrokers: " + experimentDef.brokers);
            System.out.println("numPartitions: " + experimentDef.partitions);
            System.out.println("throttle: " + experimentDef.throttle);
            System.out.println("numMessagesPerPartition: " + experimentDef.msgsPerPartition);
            System.out.println("msgSize: " + experimentDef.msgSize);
            System.out.println("We will write " + experimentDef.targetBytesPerBrokerMB + "MB of data per broker");
            long worstCase = experimentDef.targetBytesPerBrokerMB * 1000 * 1000 / experimentDef.throttle;
            System.out.println("Worst case duration is " + worstCase);
        }

        /**
         * Polls until the admin client reports no in-progress partition
         * reassignments, sampling the replication rate metrics on every poll.
         *
         * <p>If the polling thread is interrupted while waiting on the admin
         * result, its interrupt status is restored before the failure is rethrown,
         * so callers further up can still observe the interruption.
         *
         * @throws RuntimeException wrapping an {@link InterruptedException} or
         *         {@link ExecutionException} from the admin call
         */
        void waitForReassignmentToComplete() {
            // NOTE(review): the numeric arguments are presumably timeout and poll interval in ms — confirm against TestUtils.waitUntilTrue.
            TestUtils.waitUntilTrue(() -> {
                printRateMetrics();
                try {
                    return this.adminClient.listPartitionReassignments().reassignments().get().isEmpty();
                } catch (InterruptedException e) {
                    // Do not lose the interruption when converting to an unchecked exception.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }, () -> "Partition reassignments didn't complete.", 3600000L, 1000L);
        }

        /**
         * Builds a chart from the recorded rates, writes it to a PNG file that is
         * linked from the journal, and optionally displays it on screen.
         *
         * @param map     rate samples keyed by broker node id
         * @param str     chart label, e.g. {@code "Leader"} or {@code "Follower"}
         * @param journal journal that receives the chart link
         * @param z       when {@code true}, the chart is also displayed on screen
         * @throws Exception if the chart image cannot be written
         */
        void renderChart(Map<Integer, List<Double>> map, String str, Journal journal, boolean z) throws Exception {
            XYSeriesCollection dataset = addDataToChart(map);
            JFreeChart chart = createChart(str, dataset);
            writeToFile(str, journal, chart);
            maybeDisplayOnScreen(z, chart);
            System.out.println("Chart generated for " + str);
        }

        /**
         * Displays the chart in a window titled with the experiment name, but only
         * when requested.
         *
         * @param z          whether to display the chart
         * @param jFreeChart the chart to display
         */
        void maybeDisplayOnScreen(boolean z, JFreeChart jFreeChart) {
            if (!z) {
                return;
            }
            ChartFrame window = new ChartFrame(experimentName, jFreeChart);
            window.pack();
            window.setVisible(true);
        }

        /**
         * Renders the chart as a 1000x700 PNG in the run directory and appends a
         * link to it in the journal.
         *
         * @param str        chart label; forms part of the file name
         * @param journal    journal that receives the chart link
         * @param jFreeChart the chart to render
         * @throws Exception if the image cannot be written
         */
        void writeToFile(String str, Journal journal, JFreeChart jFreeChart) throws Exception {
            String fileName = experimentName + "-" + str + ".png";
            File target = new File(ReplicationQuotasTestRig.DIR, fileName);
            ImageIO.write(jFreeChart.createBufferedImage(1000, 700), "png", target);
            boolean isLeaderChart = str.equals("Leader");
            journal.appendChart(target.getAbsolutePath(), isLeaderChart);
        }

        /**
         * Creates a vertical XY line chart of throughput over time for the given dataset.
         *
         * @param str               chart label, included in the title
         * @param xYSeriesCollection the data to plot
         * @return the chart
         */
        JFreeChart createChart(String str, XYSeriesCollection xYSeriesCollection) {
            String title = experimentName + " - " + str + " Throttling Performance";
            return ChartFactory.createXYLineChart(
                title,
                "Time (s)",
                "Throttle Throughput (B/s)",
                xYSeriesCollection,
                PlotOrientation.VERTICAL,
                false,
                true,
                false);
        }

        /**
         * Converts the recorded rates into a dataset with one series per broker.
         * The x value of each point is the sample's index in that broker's list.
         *
         * @param map rate samples keyed by broker node id
         * @return the dataset holding one {@code "Broker:<id>"} series per map entry
         */
        XYSeriesCollection addDataToChart(Map<Integer, List<Double>> map) {
            XYSeriesCollection dataset = new XYSeriesCollection();
            for (Map.Entry<Integer, List<Double>> entry : map.entrySet()) {
                XYSeries series = new XYSeries("Broker:" + entry.getKey());
                int sampleIndex = 0;
                // Unbox each sample so the (double, double) overload of add is used.
                for (double rate : entry.getValue()) {
                    series.add(sampleIndex, rate);
                    sampleIndex++;
                }
                dataset.addSeries(series);
            }
            return dataset;
        }

        /**
         * Appends a rate sample to the list kept for the given broker, creating
         * the list on first use.
         *
         * @param map the rate table to update
         * @param i   broker node id
         * @param d   the sample to append
         */
        void record(Map<Integer, List<Double>> map, int i, Double d) {
            map.computeIfAbsent(i, brokerId -> new ArrayList<>()).add(d);
        }

        /**
         * Samples the leader and follower replication byte rates of every broker,
         * records them for charting, and logs them.
         *
         * <p>The leader rate of the broker with node id 0 is logged at info level
         * as a progress indicator. The message now reports the actual node id; it
         * previously hard-coded "1" although the condition selects node 0. Positive
         * rates of every broker are logged at trace level.
         */
        void printRateMetrics() {
            for (BrokerServer broker : this.cluster.brokers().values()) {
                int nodeId = broker.config().nodeId();

                double leaderRate = measuredRate(broker, QuotaType.LEADER_REPLICATION);
                if (nodeId == 0) {
                    LOGGER.info("waiting... Leader rate on {} is {}", nodeId, leaderRate);
                }
                record(this.leaderRates, nodeId, leaderRate);
                if (leaderRate > 0.0d) {
                    LOGGER.trace("Leader Rate on {} is {}", nodeId, leaderRate);
                }

                double followerRate = measuredRate(broker, QuotaType.FOLLOWER_REPLICATION);
                record(this.followerRates, nodeId, followerRate);
                if (followerRate > 0.0d) {
                    LOGGER.trace("Follower Rate on {} is {}", nodeId, followerRate);
                }
            }
        }

        /**
         * Reads the current {@code byte-rate} metric of the given quota type from a broker.
         *
         * @param kafkaBroker the broker whose metrics are read
         * @param quotaType   the quota type; its string form is used as the metric group
         * @return the metric value, or {@code -1.0} when the broker has no such metric registered
         */
        private double measuredRate(KafkaBroker kafkaBroker, QuotaType quotaType) {
            MetricName rateName = kafkaBroker.metrics().metricName("byte-rate", quotaType.toString());
            boolean registered = kafkaBroker.metrics().metrics().containsKey(rateName);
            if (!registered) {
                return -1.0d;
            }
            KafkaMetric metric = (KafkaMetric) kafkaBroker.metrics().metrics().get(rateName);
            return ((Double) metric.metricValue()).doubleValue();
        }

        /**
         * Builds the topics-to-move JSON document listing the given topic names.
         *
         * @param strArr topic names to include, in order
         * @return a document of the form
         *         {@code {"topics": [{"topic": "a"},{"topic": "b"}],"version":1}}
         */
        String json(String... strArr) {
            StringBuilder document = new StringBuilder("{\"topics\": [");
            for (int idx = 0; idx < strArr.length; idx++) {
                if (idx > 0) {
                    document.append(",");
                }
                document.append("{\"topic\": \"").append(strArr[idx]).append("\"}");
            }
            return document.append("],\"version\":1}").toString();
        }

        /**
         * Creates a byte-array producer connected to the test cluster's bootstrap servers.
         *
         * @return a new producer; the caller is responsible for closing it
         */
        KafkaProducer<byte[], byte[]> createProducer() {
            String bootstrapServers = cluster.bootstrapServers();
            // The helper takes purely positional arguments. The labels below are presumed from
            // TestUtils.createProducer's parameter order — TODO confirm against that signature.
            return TestUtils.createProducer(
                bootstrapServers,
                1,                          // presumably acks
                60000L,                     // presumably max block ms
                1048576L,                   // presumably buffer size
                Integer.MAX_VALUE,          // presumably retries
                30000,                      // presumably delivery timeout ms
                0,                          // presumably linger ms
                16384,                      // presumably batch size
                "none",                     // presumably compression type
                20000,                      // presumably request timeout ms
                SecurityProtocol.PLAINTEXT,
                Option.empty(),
                Option.empty(),
                new ByteArraySerializer(),
                new ByteArraySerializer(),
                false);                     // presumably enable idempotence
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/tools/other/ReplicationQuotasTestRig$ExperimentDef.class */
    /**
     * Parameters of a single replication-quota experiment.
     */
    public static class ExperimentDef {
        /** Experiment name, used in the journal and chart titles. */
        String name;
        /** Number of broker nodes to start. */
        int brokers;
        /** Number of partitions of the test topic. */
        int partitions;
        /** Replication throttle handed to the reassignment command. */
        long throttle;
        /** Number of messages written to each partition. */
        int msgsPerPartition;
        /** Size in bytes of each message payload. */
        int msgSize;
        /** Total bytes written, divided by the broker count and by one million. */
        final long targetBytesPerBrokerMB;

        /**
         * Creates an experiment definition.
         *
         * @param str experiment name
         * @param i   number of brokers
         * @param i2  number of partitions
         * @param j   replication throttle
         * @param i3  messages per partition
         * @param i4  message size in bytes
         * @throws ArithmeticException if {@code i} is zero
         */
        public ExperimentDef(String str, int i, int i2, long j, int i3, int i4) {
            this.name = str;
            this.brokers = i;
            this.partitions = i2;
            this.throttle = j;
            this.msgsPerPartition = i3;
            this.msgSize = i4;
            // Widen to long before multiplying: the byte total exceeds Integer.MAX_VALUE for
            // realistic inputs (e.g. 1000 msgs * 100000 bytes * 50 partitions = 5e9).
            this.targetBytesPerBrokerMB = (long) i3 * i4 * i2 / i / 1000000;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/tools/other/ReplicationQuotasTestRig$Journal.class */
    /**
     * Append-only HTML log ({@code Log.html} in the run directory) that collects
     * one entry and the chart images for every experiment.
     */
    public static class Journal {
        /** The HTML file every call appends to. */
        File log = new File(ReplicationQuotasTestRig.DIR, "Log.html");

        /** Creates the journal and writes the HTML header. */
        public Journal() {
            header();
        }

        /**
         * Appends a summary of the experiment's parameters.
         *
         * @param experimentDef the experiment to describe
         */
        void appendToJournal(ExperimentDef experimentDef) {
            DecimalFormat decimalFormat = new DecimalFormat("###,###.###");
            // The throttle is a bytes-per-second figure (the chart axis is labelled B/s as well),
            // so it is labelled "B/s" here rather than the former, incorrect "MB/s".
            append("\n\n<h3>" + experimentDef.name + "</h3><p>- BrokerCount: " + experimentDef.brokers + "<p>- PartitionCount: " + experimentDef.partitions + "<p>- Throttle: " + decimalFormat.format(experimentDef.throttle) + " B/s<p>- MsgCount: " + decimalFormat.format(experimentDef.msgsPerPartition) + " <p>- MsgSize: " + decimalFormat.format(experimentDef.msgSize) + "<p>- TargetBytesPerBrokerMB: " + experimentDef.targetBytesPerBrokerMB + "<p>");
        }

        /**
         * Appends an image tag for a chart file.
         *
         * @param str path of the chart image, used as the image source
         * @param z   {@code true} to emit the paragraph breaks before the image,
         *            {@code false} to emit them after it
         */
        void appendChart(String str, boolean z) {
            StringBuilder sb = new StringBuilder();
            if (z) {
                sb.append("<p><p>");
            }
            // The style attribute is closed before align; the previous markup nested the quotes
            // (style="...align="middle"") and produced a malformed tag.
            sb.append("<img src=\"" + str + "\" alt=\"Chart\" style=\"width:600px;height:400px;\" align=\"middle\">");
            if (!z) {
                sb.append("<p><p>");
            }
            append(sb.toString());
        }

        /** Appends the opening HTML and the page heading. */
        void header() {
            append("<html><head><h1>Replication Quotas Test Rig</h1></head><body>");
        }

        /** Appends the closing HTML tags. */
        void footer() {
            append("</body></html>");
        }

        /**
         * Appends text to the journal file, creating the file if necessary.
         *
         * @param str the text to append
         * @throws RuntimeException if the file cannot be opened or the write fails
         */
        void append(String str) {
            try (PrintWriter printWriter = new PrintWriter(Files.newOutputStream(this.log.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND))) {
                printWriter.append((CharSequence) str);
                // PrintWriter never throws on write errors; ask explicitly so a failed write
                // (e.g. a full disk) is reported instead of silently truncating the journal.
                if (printWriter.checkError()) {
                    throw new RuntimeException("Failed to write to journal " + this.log);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * @return the absolute path of the journal file
         */
        String path() {
            return this.log.getAbsolutePath();
        }
    }

    /**
     * Runs the five built-in experiments in order, writing all results to one journal.
     *
     * @param strArr command-line arguments; a first argument of {@code show-gui}
     *               displays the charts on screen and skips the explicit exit
     */
    public static void main(String[] strArr) {
        final boolean showGui = strArr.length > 0 && "show-gui".equals(strArr[0]);
        final Journal journal = new Journal();

        List<ExperimentDef> experiments = Arrays.asList(
            new ExperimentDef("Experiment1", 5, 20, 1000000L, 500, 100000),
            new ExperimentDef("Experiment2", 5, 50, 10000000L, 1000, 100000),
            new ExperimentDef("Experiment3", 50, 50, 2000000L, 1000, 100000),
            new ExperimentDef("Experiment4", 25, 100, 4000000L, 1000, 100000),
            new ExperimentDef("Experiment5", 5, 50, 50000000L, 4000, 100000));
        for (ExperimentDef experiment : experiments) {
            run(experiment, journal, showGui);
        }

        // With the GUI requested, return without the explicit exit call.
        if (!showGui) {
            Exit.exit(0);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs a single experiment and always tears its cluster down afterwards.
     *
     * <p>A failure of the experiment itself is printed and swallowed, so the
     * caller can continue with the next experiment. The cluster is torn down
     * exactly once, in a {@code finally} block; the previous nested try/catch
     * re-invoked {@code tearDown} up to three times when {@code tearDown}
     * itself failed.
     *
     * @param experimentDef the experiment parameters
     * @param journal       the journal that receives the results
     * @param z             when {@code true}, charts are also displayed on screen
     * @throws RuntimeException if tearing down the experiment fails
     */
    public static void run(ExperimentDef experimentDef, Journal journal, boolean z) {
        Experiment experiment = new Experiment();
        try {
            experiment.run(experimentDef, journal, z);
            journal.footer();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            experiment.tearDown();
        }
    }

    static {
        Configurator.reconfigure();
        new File("Experiments").mkdir();
        DIR = "Experiments/Run" + Long.valueOf(System.currentTimeMillis()).toString().substring(8);
        new File(DIR).mkdir();
    }
}
