package org.apache.pulsar.testclient;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramLogWriter;
import org.HdrHistogram.Recorder;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionBuilder;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/testclient/PerformanceProducer.class */
public class PerformanceProducer {
    private static final ExecutorService executor = Executors.newCachedThreadPool(new DefaultThreadFactory("pulsar-perf-producer-exec"));
    private static final LongAdder messagesSent = new LongAdder();
    private static final LongAdder messagesFailed = new LongAdder();
    private static final LongAdder bytesSent = new LongAdder();
    private static final LongAdder totalNumTxnOpenTxnFail = new LongAdder();
    private static final LongAdder totalNumTxnOpenTxnSuccess = new LongAdder();
    private static final LongAdder totalMessagesSent = new LongAdder();
    private static final LongAdder totalBytesSent = new LongAdder();
    private static final Recorder recorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
    private static final Recorder cumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
    private static final LongAdder totalEndTxnOpSuccessNum = new LongAdder();
    private static final LongAdder totalEndTxnOpFailNum = new LongAdder();
    private static final LongAdder numTxnOpSuccess = new LongAdder();
    private static IMessageFormatter messageFormatter = null;
    static final DecimalFormat THROUGHPUTFORMAT = new PaddingDecimalFormat("0.0", 8);
    static final DecimalFormat DEC = new PaddingDecimalFormat("0.000", 7);
    static final DecimalFormat INTFORMAT = new PaddingDecimalFormat("0", 7);
    static final DecimalFormat TOTALFORMAT = new DecimalFormat("0.000");
    private static final Logger log = LoggerFactory.getLogger(PerformanceProducer.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    @Parameters(commandDescription = "Test pulsar producer performance.")
    /* loaded from: input_file:org/apache/pulsar/testclient/PerformanceProducer$Arguments.class */
    public static class Arguments {

        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
        boolean help;

        @Parameter(names = {"-cf", "--conf-file"}, description = "Configuration file")
        public String confFile;

        @Parameter(description = "persistent://prop/ns/my-topic", required = true)
        public List<String> topics;

        @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
        public String serviceURL;

        @Parameter(names = {"-au", "--admin-url"}, description = "Pulsar Admin URL")
        public String adminURL;

        @Parameter(names = {"--auth_plugin"}, description = "Authentication plugin class name", hidden = true)
        public String deprecatedAuthPluginClassName;

        @Parameter(names = {"--auth-plugin"}, description = "Authentication plugin class name")
        public String authPluginClassName;

        @Parameter(names = {"--auth-params"}, description = "Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example \"key1:val1,key2:val2\" or \"{\"key1\":\"val1\",\"key2\":\"val2\"}.")
        public String authParams;

        @Parameter(names = {"-threads", "--num-test-threads"}, description = "Number of test threads", validateWith = {PositiveNumberParameterValidator.class})
        public int numTestThreads = 1;

        @Parameter(names = {"-r", "--rate"}, description = "Publish rate msg/s across topics")
        public int msgRate = 100;

        @Parameter(names = {"-s", "--size"}, description = "Message size (bytes)")
        public int msgSize = 1024;

        @Parameter(names = {"-t", "--num-topic"}, description = "Number of topics", validateWith = {PositiveNumberParameterValidator.class})
        public int numTopics = 1;

        @Parameter(names = {"-n", "--num-producers"}, description = "Number of producers (per topic)", validateWith = {PositiveNumberParameterValidator.class})
        public int numProducers = 1;

        @Parameter(names = {"--separator"}, description = "Separator between the topic and topic number")
        public String separator = "-";

        @Parameter(names = {"--send-timeout"}, description = "Set the sendTimeout value default 0 to keep compatibility with previous version of pulsar-perf")
        public int sendTimeout = 0;

        @Parameter(names = {"-pn", "--producer-name"}, description = "Producer Name")
        public String producerName = null;

        @Parameter(names = {"--listener-name"}, description = "Listener name for the broker.")
        String listenerName = null;

        @Parameter(names = {"-ch", "--chunking"}, description = "Should split the message and publish in chunks if message size is larger than allowed max size")
        private boolean chunkingAllowed = false;

        @Parameter(names = {"-o", "--max-outstanding"}, description = "Max number of outstanding messages")
        public int maxOutstanding = 0;

        @Parameter(names = {"-p", "--max-outstanding-across-partitions"}, description = "Max number of outstanding messages across partitions")
        public int maxPendingMessagesAcrossPartitions = 0;

        @Parameter(names = {"-np", "--partitions"}, description = "Create partitioned topics with the given number of partitions, set 0 to not try to create the topic")
        public Integer partitions = null;

        @Parameter(names = {"-c", "--max-connections"}, description = "Max number of TCP connections to a single broker")
        public int maxConnections = 100;

        @Parameter(names = {"-m", "--num-messages"}, description = "Number of messages to publish in total. If <= 0, it will keep publishing")
        public long numMessages = 0;

        @Parameter(names = {"-i", "--stats-interval-seconds"}, description = "Statistics Interval Seconds. If 0, statistics will be disabled")
        public long statsIntervalSeconds = 0;

        @Parameter(names = {"-z", "--compression"}, description = "Compress messages payload")
        public CompressionType compression = CompressionType.NONE;

        @Parameter(names = {"-f", "--payload-file"}, description = "Use payload from an UTF-8 encoded text file and a payload will be randomly selected when publishing messages")
        public String payloadFilename = null;

        @Parameter(names = {"-e", "--payload-delimiter"}, description = "The delimiter used to split lines when using payload from a file")
        public String payloadDelimiter = "\\n";

        @Parameter(names = {"-b", "--batch-time-window"}, description = "Batch messages in 'x' ms window (Default: 1ms)")
        public double batchTimeMillis = 1.0d;

        @Parameter(names = {"-bm", "--batch-max-messages"}, description = "Maximum number of messages per batch")
        public int batchMaxMessages = 1000;

        @Parameter(names = {"-bb", "--batch-max-bytes"}, description = "Maximum number of bytes per batch")
        public int batchMaxBytes = 4194304;

        @Parameter(names = {"-time", "--test-duration"}, description = "Test duration in secs. If <= 0, it will keep publishing")
        public long testTime = 0;

        @Parameter(names = {"--warmup-time"}, description = "Warm-up time in seconds (Default: 1 sec)")
        public double warmupTimeSeconds = 1.0d;

        @Parameter(names = {"--trust-cert-file"}, description = "Path for the trusted TLS certificate file")
        public String tlsTrustCertsFilePath = "";

        @Parameter(names = {"--tls-allow-insecure"}, description = "Allow insecure TLS connection")
        public Boolean tlsAllowInsecureConnection = null;

        @Parameter(names = {"-k", "--encryption-key-name"}, description = "The public key name to encrypt payload")
        public String encKeyName = null;

        @Parameter(names = {"-v", "--encryption-key-value-file"}, description = "The file which contains the public key to encrypt payload")
        public String encKeyFile = null;

        @Parameter(names = {"-d", "--delay"}, description = "Mark messages with a given delay in seconds")
        public long delay = 0;

        @Parameter(names = {"-ef", "--exit-on-failure"}, description = "Exit from the process on publish failure (default: disable)")
        public boolean exitOnFailure = false;

        @Parameter(names = {"-mk", "--message-key-generation-mode"}, description = "The generation mode of message key, valid options are: [autoIncrement, random]")
        public String messageKeyGenerationMode = null;

        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be used for handling connections to brokers. The default value is 1.")
        public int ioThreads = 1;

        @Parameter(names = {"-bw", "--busy-wait"}, description = "Enable Busy-Wait on the Pulsar client")
        public boolean enableBusyWait = false;

        @Parameter(names = {"-am", "--access-mode"}, description = "Producer access mode")
        public ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared;

        @Parameter(names = {"-fp", "--format-payload"}, description = "Format %i as a message index in the stream from producer and/or %t as the timestamp nanoseconds.")
        public boolean formatPayload = false;

        @Parameter(names = {"-fc", "--format-class"}, description = "Custom Formatter class name")
        public String formatterClass = "org.apache.pulsar.testclient.DefaultMessageFormatter";

        @Parameter(names = {"-tto", "--txn-timeout"}, description = "Set the time value of transaction timeout, and the time unit is second. (After --txn-enable setting to true, --txn-timeout takes effect)")
        public long transactionTimeout = 10;

        @Parameter(names = {"-nmt", "--numMessage-perTransaction"}, description = "The number of messages sent by a transaction. (After --txn-enable setting to true, -nmt takes effect)")
        public int numMessagesPerTransaction = 50;

        @Parameter(names = {"-txn", "--txn-enable"}, description = "Enable or disable the transaction")
        public boolean isEnableTransaction = false;

        @Parameter(names = {"-abort"}, description = "Abort the transaction. (After --txn-enable setting to true, -abort takes effect)")
        public boolean isAbortTransaction = false;

        @Parameter(names = {"--histogram-file"}, description = "HdrHistogram output file")
        public String histogramFile = null;

        Arguments() {
        }
    }

    /* loaded from: input_file:org/apache/pulsar/testclient/PerformanceProducer$MessageKeyGenerationMode.class */
    public enum MessageKeyGenerationMode {
        autoIncrement,
        random
    }

    public static void main(String[] strArr) throws Exception {
        Arguments arguments = new Arguments();
        JCommander jCommander = new JCommander(arguments);
        jCommander.setProgramName("pulsar-perf produce");
        try {
            jCommander.parse(strArr);
        } catch (ParameterException e) {
            System.out.println(e.getMessage());
            jCommander.usage();
            PerfClientUtils.exit(-1);
        }
        if (arguments.help) {
            jCommander.usage();
            PerfClientUtils.exit(-1);
        }
        if (StringUtils.isBlank(arguments.authPluginClassName) && !StringUtils.isBlank(arguments.deprecatedAuthPluginClassName)) {
            arguments.authPluginClassName = arguments.deprecatedAuthPluginClassName;
        }
        if (arguments.topics != null && arguments.topics.size() != arguments.numTopics) {
            if (arguments.topics.size() == 1) {
                String str = arguments.topics.get(0);
                ArrayList arrayList = new ArrayList();
                for (int i = 0; i < arguments.numTopics; i++) {
                    arrayList.add(String.format("%s%s%d", str, arguments.separator, Integer.valueOf(i)));
                }
                arguments.topics = arrayList;
            } else {
                System.out.println("The size of topics list should be equal to --num-topic");
                jCommander.usage();
                PerfClientUtils.exit(-1);
            }
        }
        if (arguments.confFile != null) {
            Properties properties = new Properties(System.getProperties());
            properties.load(new FileInputStream(arguments.confFile));
            if (arguments.serviceURL == null) {
                arguments.serviceURL = properties.getProperty("brokerServiceUrl");
            }
            if (arguments.serviceURL == null) {
                arguments.serviceURL = properties.getProperty("webServiceUrl");
            }
            if (arguments.serviceURL == null) {
                arguments.serviceURL = properties.getProperty("serviceUrl", "http://localhost:8080/");
            }
            if (arguments.adminURL == null) {
                arguments.adminURL = properties.getProperty("webServiceUrl");
            }
            if (arguments.adminURL == null) {
                arguments.adminURL = properties.getProperty("adminURL", "http://localhost:8080/");
            }
            if (arguments.authPluginClassName == null) {
                arguments.authPluginClassName = properties.getProperty("authPlugin", null);
            }
            if (arguments.authParams == null) {
                arguments.authParams = properties.getProperty("authParams", null);
            }
            if (StringUtils.isBlank(arguments.tlsTrustCertsFilePath)) {
                arguments.tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath", "");
            }
            if (StringUtils.isBlank(arguments.messageKeyGenerationMode)) {
                arguments.messageKeyGenerationMode = properties.getProperty("messageKeyGenerationMode", null);
            }
            if (arguments.tlsAllowInsecureConnection == null) {
                arguments.tlsAllowInsecureConnection = Boolean.valueOf(Boolean.parseBoolean(properties.getProperty("tlsAllowInsecureConnection", "")));
            }
        }
        PerfClientUtils.printJVMInformation(log);
        log.info("Starting Pulsar perf producer with config: {}", new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(arguments));
        byte[] bArr = new byte[arguments.msgSize];
        Random random = new Random(0L);
        ArrayList arrayList2 = new ArrayList();
        if (arguments.payloadFilename != null) {
            Path path = Paths.get(arguments.payloadFilename, new String[0]);
            if (Files.notExists(path, new LinkOption[0]) || Files.size(path) == 0) {
                throw new IllegalArgumentException("Payload file doesn't exist or it is empty.");
            }
            String[] split = new String(Files.readAllBytes(path), StandardCharsets.UTF_8).split(arguments.payloadDelimiter.equals("\\n") ? "\n" : arguments.payloadDelimiter);
            log.info("Reading payloads from {} and {} records read", path.toAbsolutePath(), Integer.valueOf(split.length));
            for (String str2 : split) {
                arrayList2.add(str2.getBytes(StandardCharsets.UTF_8));
            }
            if (arguments.formatPayload) {
                messageFormatter = getMessageFormatter(arguments.formatterClass);
            }
        } else {
            for (int i2 = 0; i2 < bArr.length; i2++) {
                bArr[i2] = (byte) (random.nextInt(26) + 65);
            }
        }
        long nanoTime = System.nanoTime();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            printAggregatedThroughput(nanoTime, arguments);
            printAggregatedStats();
        }));
        if (arguments.partitions != null) {
            PulsarAdminBuilder tlsTrustCertsFilePath = PulsarAdmin.builder().serviceHttpUrl(arguments.adminURL).tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
            if (StringUtils.isNotBlank(arguments.authPluginClassName)) {
                tlsTrustCertsFilePath.authentication(arguments.authPluginClassName, arguments.authParams);
            }
            if (arguments.tlsAllowInsecureConnection != null) {
                tlsTrustCertsFilePath.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection.booleanValue());
            }
            PulsarAdmin build = tlsTrustCertsFilePath.build();
            Throwable th = null;
            try {
                try {
                    for (String str3 : arguments.topics) {
                        log.info("Creating partitioned topic {} with {} partitions", str3, arguments.partitions);
                        try {
                            build.topics().createPartitionedTopic(str3, arguments.partitions.intValue());
                        } catch (PulsarAdminException.ConflictException e2) {
                            if (log.isDebugEnabled()) {
                                log.debug("Topic {} already exists: {}", str3, e2);
                            }
                            PartitionedTopicMetadata partitionedTopicMetadata = build.topics().getPartitionedTopicMetadata(str3);
                            if (partitionedTopicMetadata.partitions != arguments.partitions.intValue()) {
                                log.error("Topic {} already exists but it has a wrong number of partitions: {}, expecting {}", new Object[]{str3, Integer.valueOf(partitionedTopicMetadata.partitions), arguments.partitions});
                                PerfClientUtils.exit(-1);
                            }
                        }
                    }
                    if (build != null) {
                        if (0 != 0) {
                            try {
                                build.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            build.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (build != null) {
                    if (th != null) {
                        try {
                            build.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        build.close();
                    }
                }
                throw th4;
            }
        }
        CountDownLatch countDownLatch = new CountDownLatch(arguments.numTestThreads);
        long j = arguments.numMessages / arguments.numTestThreads;
        int i3 = arguments.msgRate / arguments.numTestThreads;
        for (int i4 = 0; i4 < arguments.numTestThreads; i4++) {
            int i5 = i4;
            executor.submit(() -> {
                log.info("Started performance test thread {}", Integer.valueOf(i5));
                runProducer(i5, arguments, j, i3, arrayList2, bArr, random, countDownLatch);
            });
        }
        long nanoTime2 = System.nanoTime();
        Histogram histogram = null;
        HistogramLogWriter histogramLogWriter = null;
        if (arguments.histogramFile != null) {
            String str4 = arguments.histogramFile;
            log.info("Dumping latency stats to {}", str4);
            histogramLogWriter = new HistogramLogWriter(new PrintStream((OutputStream) new FileOutputStream(str4), false));
            histogramLogWriter.outputLogFormatVersion();
            histogramLogWriter.outputLegend();
        }
        while (true) {
            try {
                Thread.sleep(10000L);
                if (countDownLatch.getCount() <= 0) {
                    return;
                }
                long nanoTime3 = System.nanoTime();
                double d = (nanoTime3 - nanoTime2) / 1.0E9d;
                long sum = totalMessagesSent.sum();
                double sumThenReset = messagesSent.sumThenReset() / d;
                double sumThenReset2 = messagesFailed.sumThenReset() / d;
                double sumThenReset3 = (((bytesSent.sumThenReset() / d) / 1024.0d) / 1024.0d) * 8.0d;
                histogram = recorder.getIntervalHistogram(histogram);
                if (arguments.isEnableTransaction) {
                    log.info("--- Transaction : {} transaction end successfully --- {} transaction end failed --- {} Txn/s", new Object[]{Long.valueOf(totalEndTxnOpSuccessNum.sum()), Long.valueOf(totalEndTxnOpFailNum.sum()), TOTALFORMAT.format(numTxnOpSuccess.sumThenReset() / d)});
                }
                log.info("Throughput produced: {} msg --- {} msg/s --- {} Mbit/s  --- failure {} msg/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}", new Object[]{INTFORMAT.format(sum), THROUGHPUTFORMAT.format(sumThenReset), THROUGHPUTFORMAT.format(sumThenReset3), THROUGHPUTFORMAT.format(sumThenReset2), DEC.format(histogram.getMean() / 1000.0d), DEC.format(histogram.getValueAtPercentile(50.0d) / 1000.0d), DEC.format(histogram.getValueAtPercentile(95.0d) / 1000.0d), DEC.format(histogram.getValueAtPercentile(99.0d) / 1000.0d), DEC.format(histogram.getValueAtPercentile(99.9d) / 1000.0d), DEC.format(histogram.getValueAtPercentile(99.99d) / 1000.0d), DEC.format(histogram.getMaxValue() / 1000.0d)});
                if (histogramLogWriter != null) {
                    histogramLogWriter.outputIntervalHistogram(histogram);
                }
                histogram.reset();
                nanoTime2 = nanoTime3;
            } catch (InterruptedException e3) {
                return;
            }
        }
    }

    static IMessageFormatter getMessageFormatter(String str) {
        try {
            return (IMessageFormatter) PerformanceProducer.class.getClassLoader().loadClass(str).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        } catch (Exception e) {
            return null;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r3v1 */
    /* JADX WARN: Type inference failed for: r3v12 */
    /* JADX WARN: Type inference failed for: r3v15 */
    /* JADX WARN: Type inference failed for: r3v9, types: [java.lang.Object] */
    private static void runProducer(int i, Arguments arguments, long j, int i2, List<byte[]> list, byte[] bArr, Random random, CountDownLatch countDownLatch) {
        AtomicReference atomicReference;
        TypedMessageBuilder value;
        long j2;
        PulsarClient pulsarClient = null;
        try {
            try {
                ArrayList arrayList = new ArrayList();
                ClientBuilder tlsTrustCertsFilePath = PulsarClient.builder().enableTransaction(arguments.isEnableTransaction).serviceUrl(arguments.serviceURL).connectionsPerBroker(arguments.maxConnections).ioThreads(arguments.ioThreads).statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS).enableBusyWait(arguments.enableBusyWait).tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
                if (StringUtils.isNotBlank(arguments.authPluginClassName)) {
                    tlsTrustCertsFilePath.authentication(arguments.authPluginClassName, arguments.authParams);
                }
                if (arguments.tlsAllowInsecureConnection != null) {
                    tlsTrustCertsFilePath.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection.booleanValue());
                }
                if (StringUtils.isNotBlank(arguments.listenerName)) {
                    tlsTrustCertsFilePath.listenerName(arguments.listenerName);
                }
                PulsarClient build = tlsTrustCertsFilePath.build();
                ProducerBuilder messageRoutingMode = build.newProducer().sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS).compressionType(arguments.compression).maxPendingMessages(arguments.maxOutstanding).accessMode(arguments.producerAccessMode).messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
                if (arguments.maxPendingMessagesAcrossPartitions > 0) {
                    messageRoutingMode.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions);
                }
                if (arguments.isEnableTransaction) {
                    messageRoutingMode.sendTimeout(0, TimeUnit.SECONDS);
                    TransactionBuilder newTransaction = build.newTransaction();
                    j2 = arguments.transactionTimeout;
                    atomicReference = new AtomicReference(newTransaction.withTransactionTimeout(j2, TimeUnit.SECONDS).build().get());
                } else {
                    atomicReference = new AtomicReference(null);
                }
                ?? r3 = j2;
                if (arguments.producerName != null) {
                    r3 = 2;
                    messageRoutingMode.producerName(String.format("%s%s%d", arguments.producerName, arguments.separator, Integer.valueOf(i)));
                }
                if (arguments.batchTimeMillis > 0.0d || arguments.batchMaxMessages > 0) {
                    messageRoutingMode.batchingMaxPublishDelay((long) (arguments.batchTimeMillis * 1000.0d), TimeUnit.MICROSECONDS).enableBatching(true);
                } else {
                    messageRoutingMode.enableBatching(false);
                }
                if (arguments.batchMaxMessages > 0) {
                    messageRoutingMode.batchingMaxMessages(arguments.batchMaxMessages);
                }
                if (arguments.batchMaxBytes > 0) {
                    messageRoutingMode.batchingMaxBytes(arguments.batchMaxBytes);
                }
                messageRoutingMode.blockIfQueueFull(true);
                if (StringUtils.isNotBlank(arguments.encKeyName) && StringUtils.isNotBlank(arguments.encKeyFile)) {
                    messageRoutingMode.addEncryptionKey(arguments.encKeyName);
                    messageRoutingMode.defaultCryptoKeyReader(arguments.encKeyFile);
                }
                int i3 = 0;
                long j3 = r3;
                while (i3 < arguments.numTopics) {
                    String str = arguments.topics.get(i3);
                    ?? r32 = str;
                    log.info("Adding {} publishers on topic {}", Integer.valueOf(arguments.numProducers), (Object) r32);
                    for (int i4 = 0; i4 < arguments.numProducers; i4++) {
                        ProducerBuilder producerBuilder = messageRoutingMode.clone().topic(str);
                        if (arguments.chunkingAllowed) {
                            producerBuilder.enableChunking(true);
                            producerBuilder.enableBatching(false);
                        }
                        arrayList.add(producerBuilder.createAsync());
                    }
                    i3++;
                    j3 = r32;
                }
                ArrayList<Producer> arrayList2 = new ArrayList(arrayList.size());
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    arrayList2.add(((Future) it.next()).get());
                }
                Collections.shuffle(arrayList2);
                log.info("Created {} producers", Integer.valueOf(arrayList2.size()));
                RateLimiter create = RateLimiter.create(i2);
                long nanoTime = System.nanoTime();
                long j4 = nanoTime + ((long) (arguments.warmupTimeSeconds * 1.0E9d));
                long j5 = nanoTime + ((long) (arguments.testTime * 1.0E9d));
                MessageKeyGenerationMode messageKeyGenerationMode = null;
                if (StringUtils.isNotBlank(arguments.messageKeyGenerationMode)) {
                    try {
                        messageKeyGenerationMode = MessageKeyGenerationMode.valueOf(arguments.messageKeyGenerationMode);
                    } catch (IllegalArgumentException e) {
                        throw new IllegalArgumentException("messageKeyGenerationMode only support [autoIncrement, random]");
                    }
                }
                long j6 = 0;
                AtomicLong atomicLong = new AtomicLong(0L);
                Semaphore semaphore = new Semaphore(arguments.numMessagesPerTransaction);
                long j7 = j3;
                while (true) {
                    long j8 = j7;
                    for (Producer producer : arrayList2) {
                        if (arguments.testTime > 0 && System.nanoTime() > j5) {
                            log.info("------------- DONE (reached the maximum duration: [{} seconds] of production) --------------", Long.valueOf(arguments.testTime));
                            countDownLatch.countDown();
                            Thread.sleep(5000L);
                            PerfClientUtils.exit(0);
                        }
                        if (j > 0) {
                            long j9 = j6;
                            j6 = j8 + 1;
                            if (j9 >= j) {
                                log.info("------------- DONE (reached the maximum number: {} of production) --------------", Long.valueOf(j));
                                countDownLatch.countDown();
                                Thread.sleep(5000L);
                                PerfClientUtils.exit(0);
                            }
                        }
                        create.acquire();
                        Transaction transaction = (Transaction) atomicReference.get();
                        long nanoTime2 = System.nanoTime();
                        byte[] formatMessage = arguments.payloadFilename != null ? messageFormatter != null ? messageFormatter.formatMessage(arguments.producerName, j6, list.get(random.nextInt(list.size()))) : list.get(random.nextInt(list.size())) : bArr;
                        if (arguments.isEnableTransaction) {
                            if (arguments.numMessagesPerTransaction > 0) {
                                try {
                                    semaphore.acquire();
                                } catch (InterruptedException e2) {
                                    log.error("Get exception: ", e2);
                                }
                            }
                            value = producer.newMessage(transaction).value(formatMessage);
                        } else {
                            value = producer.newMessage().value(formatMessage);
                        }
                        if (arguments.delay > 0) {
                            value.deliverAfter(arguments.delay, TimeUnit.SECONDS);
                        }
                        if (messageKeyGenerationMode == MessageKeyGenerationMode.random) {
                            value.key(String.valueOf(random.nextInt()));
                        } else if (messageKeyGenerationMode == MessageKeyGenerationMode.autoIncrement) {
                            value.key(String.valueOf(j6));
                        }
                        byte[] bArr2 = formatMessage;
                        long j10 = nanoTime2;
                        value.sendAsync().thenRun(() -> {
                            bytesSent.add(bArr2.length);
                            messagesSent.increment();
                            totalMessagesSent.increment();
                            totalBytesSent.add(bArr2.length);
                            long nanoTime3 = System.nanoTime();
                            if (nanoTime3 > j4) {
                                long micros = TimeUnit.NANOSECONDS.toMicros(nanoTime3 - j10);
                                recorder.recordValue(micros);
                                cumulativeRecorder.recordValue(micros);
                            }
                        }).exceptionally(th -> {
                            if (th.getCause() instanceof ArrayIndexOutOfBoundsException) {
                                return null;
                            }
                            log.warn("Write message error with exception", th);
                            messagesFailed.increment();
                            if (!arguments.exitOnFailure) {
                                return null;
                            }
                            PerfClientUtils.exit(-1);
                            return null;
                        });
                        if (arguments.isEnableTransaction && atomicLong.incrementAndGet() == arguments.numMessagesPerTransaction) {
                            if (arguments.isAbortTransaction) {
                                transaction.abort().thenRun(() -> {
                                    if (log.isDebugEnabled()) {
                                        log.debug("Abort transaction {}", transaction.getTxnID().toString());
                                    }
                                    totalEndTxnOpSuccessNum.increment();
                                    numTxnOpSuccess.increment();
                                }).exceptionally(th2 -> {
                                    log.error("Abort transaction {} failed with exception", transaction.getTxnID().toString(), th2);
                                    totalEndTxnOpFailNum.increment();
                                    return null;
                                });
                            } else {
                                transaction.commit().thenRun(() -> {
                                    if (log.isDebugEnabled()) {
                                        log.debug("Committed transaction {}", transaction.getTxnID().toString());
                                    }
                                    totalEndTxnOpSuccessNum.increment();
                                    numTxnOpSuccess.increment();
                                }).exceptionally(th3 -> {
                                    log.error("Commit transaction failed with exception : ", th3);
                                    totalEndTxnOpFailNum.increment();
                                    return null;
                                });
                            }
                            while (true) {
                                try {
                                    atomicReference.compareAndSet(transaction, (Transaction) build.newTransaction().withTransactionTimeout(arguments.transactionTimeout, TimeUnit.SECONDS).build().get());
                                    atomicLong.set(0L);
                                    semaphore.release(arguments.numMessagesPerTransaction);
                                    totalNumTxnOpenTxnSuccess.increment();
                                    break;
                                } catch (Exception e3) {
                                    totalNumTxnOpenTxnFail.increment();
                                    log.error("Failed to new transaction with exception: ", e3);
                                }
                            }
                        }
                        j8 = j10;
                    }
                    j7 = j8;
                }
            } catch (Throwable th4) {
                if (0 != 0) {
                    try {
                        pulsarClient.close();
                        PerfClientUtils.exit(-1);
                    } catch (PulsarClientException e4) {
                        log.error("Failed to close test client", e4);
                    }
                }
                throw th4;
            }
        } catch (Throwable th5) {
            log.error("Got error", th5);
            if (0 != 0) {
                try {
                    pulsarClient.close();
                    PerfClientUtils.exit(-1);
                } catch (PulsarClientException e5) {
                    log.error("Failed to close test client", e5);
                }
            }
        }
    }

    private static void printAggregatedThroughput(long j, Arguments arguments) {
        double nanoTime = (System.nanoTime() - j) / 1.0E9d;
        double sum = totalMessagesSent.sum() / nanoTime;
        double sum2 = (((totalBytesSent.sum() / nanoTime) / 1024.0d) / 1024.0d) * 8.0d;
        if (arguments.isEnableTransaction) {
            double d = nanoTime / (r0 + r0);
            log.info("--- Transaction : {} transaction end successfully --- {} transaction end failed --- {} transaction open successfully --- {} transaction open failed --- {} Txn/s", new Object[]{Long.valueOf(totalEndTxnOpSuccessNum.sum()), Long.valueOf(totalEndTxnOpFailNum.sum()), Long.valueOf(totalNumTxnOpenTxnSuccess.sum()), Long.valueOf(totalNumTxnOpenTxnFail.sum()), TOTALFORMAT.format(d)});
        }
        log.info("Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s ", new Object[]{Long.valueOf(totalMessagesSent.sum()), TOTALFORMAT.format(sum), TOTALFORMAT.format(sum2)});
    }

    private static void printAggregatedStats() {
        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}", new Object[]{DEC.format(cumulativeRecorder.getIntervalHistogram().getMean() / 1000.0d), DEC.format(r0.getValueAtPercentile(50.0d) / 1000.0d), DEC.format(r0.getValueAtPercentile(95.0d) / 1000.0d), DEC.format(r0.getValueAtPercentile(99.0d) / 1000.0d), DEC.format(r0.getValueAtPercentile(99.9d) / 1000.0d), DEC.format(r0.getValueAtPercentile(99.99d) / 1000.0d), DEC.format(r0.getValueAtPercentile(99.999d) / 1000.0d), DEC.format(r0.getMaxValue() / 1000.0d)});
    }
}
