package org.apache.pulsar.testclient;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.lang.invoke.SerializedLambda;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramLogWriter;
import org.HdrHistogram.Recorder;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/testclient/PerformanceConsumer.class */
public class PerformanceConsumer {
    private static final LongAdder messagesReceived = new LongAdder();
    private static final LongAdder bytesReceived = new LongAdder();
    private static final DecimalFormat intFormat = new PaddingDecimalFormat("0", 7);
    private static final DecimalFormat dec = new DecimalFormat("0.000");
    private static final LongAdder totalMessagesReceived = new LongAdder();
    private static final LongAdder totalBytesReceived = new LongAdder();
    private static final LongAdder totalNumTxnOpenFail = new LongAdder();
    private static final LongAdder totalNumTxnOpenSuccess = new LongAdder();
    private static final LongAdder totalMessageAck = new LongAdder();
    private static final LongAdder totalMessageAckFailed = new LongAdder();
    private static final LongAdder messageAck = new LongAdder();
    private static final LongAdder totalEndTxnOpFailNum = new LongAdder();
    private static final LongAdder totalEndTxnOpSuccessNum = new LongAdder();
    private static final LongAdder numTxnOpSuccess = new LongAdder();
    private static final Recorder recorder = new Recorder(TimeUnit.DAYS.toMillis(10), 5);
    private static final Recorder cumulativeRecorder = new Recorder(TimeUnit.DAYS.toMillis(10), 5);
    private static final Logger log = LoggerFactory.getLogger(PerformanceConsumer.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    @Parameters(commandDescription = "Test pulsar consumer performance.")
    /* loaded from: input_file:org/apache/pulsar/testclient/PerformanceConsumer$Arguments.class */
    public static class Arguments {

        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
        boolean help;

        @Parameter(names = {"--conf-file"}, description = "Configuration file")
        public String confFile;

        @Parameter(description = "persistent://prop/ns/my-topic", required = true)
        public List<String> topic;

        @Parameter(names = {"-s", "--subscriber-name"}, description = "Subscriber name prefix", hidden = true)
        public String subscriberName;

        @Parameter(names = {"-u", "--service-url"}, description = "Pulsar Service URL")
        public String serviceURL;

        @Parameter(names = {"--auth_plugin"}, description = "Authentication plugin class name", hidden = true)
        public String deprecatedAuthPluginClassName;

        @Parameter(names = {"--auth-plugin"}, description = "Authentication plugin class name")
        public String authPluginClassName;

        @Parameter(names = {"--auth-params"}, description = "Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example \"key1:val1,key2:val2\" or \"{\"key1\":\"val1\",\"key2\":\"val2\"}.")
        public String authParams;

        @Parameter(names = {"-t", "--num-topics"}, description = "Number of topics", validateWith = {PositiveNumberParameterValidator.class})
        public int numTopics = 1;

        @Parameter(names = {"-n", "--num-consumers"}, description = "Number of consumers (per subscription), only one consumer is allowed when subscriptionType is Exclusive", validateWith = {PositiveNumberParameterValidator.class})
        public int numConsumers = 1;

        @Parameter(names = {"-ns", "--num-subscriptions"}, description = "Number of subscriptions (per topic)", validateWith = {PositiveNumberParameterValidator.class})
        public int numSubscriptions = 1;

        @Parameter(names = {"-ss", "--subscriptions"}, description = "A list of subscriptions to consume (for example, sub1,sub2)")
        public List<String> subscriptions = Collections.singletonList("sub");

        @Parameter(names = {"-st", "--subscription-type"}, description = "Subscription type")
        public SubscriptionType subscriptionType = SubscriptionType.Exclusive;

        @Parameter(names = {"-sp", "--subscription-position"}, description = "Subscription position")
        private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;

        @Parameter(names = {"-r", "--rate"}, description = "Simulate a slow message consumer (rate in msg/s)")
        public double rate = 0.0d;

        @Parameter(names = {"-q", "--receiver-queue-size"}, description = "Size of the receiver queue")
        public int receiverQueueSize = 1000;

        @Parameter(names = {"-p", "--receiver-queue-size-across-partitions"}, description = "Max total size of the receiver queue across partitions")
        public int maxTotalReceiverQueueSizeAcrossPartitions = 50000;

        @Parameter(names = {"--replicated"}, description = "Whether the subscription status should be replicated")
        public boolean replicatedSubscription = false;

        @Parameter(names = {"--acks-delay-millis"}, description = "Acknowledgements grouping delay in millis")
        public int acknowledgmentsGroupingDelayMillis = 100;

        @Parameter(names = {"-m", "--num-messages"}, description = "Number of messages to consume in total. If <= 0, it will keep consuming")
        public long numMessages = 0;

        @Parameter(names = {"-c", "--max-connections"}, description = "Max number of TCP connections to a single broker")
        public int maxConnections = 100;

        @Parameter(names = {"-i", "--stats-interval-seconds"}, description = "Statistics Interval Seconds. If 0, statistics will be disabled")
        public long statsIntervalSeconds = 0;

        @Parameter(names = {"--listener-name"}, description = "Listener name for the broker.")
        String listenerName = null;

        @Parameter(names = {"-mc", "--max_chunked_msg"}, description = "Max pending chunk messages")
        private int maxPendingChunkedMessage = 0;

        @Parameter(names = {"-ac", "--auto_ack_chunk_q_full"}, description = "Auto ack for oldest message on queue is full")
        private boolean autoAckOldestChunkedMessageOnQueueFull = false;

        @Parameter(names = {"-e", "--expire_time_incomplete_chunked_messages"}, description = "Expire time in ms for incomplete chunk messages")
        private long expireTimeOfIncompleteChunkedMessageMs = 0;

        @Parameter(names = {"--trust-cert-file"}, description = "Path for the trusted TLS certificate file")
        public String tlsTrustCertsFilePath = "";

        @Parameter(names = {"--tls-allow-insecure"}, description = "Allow insecure TLS connection")
        public Boolean tlsAllowInsecureConnection = null;

        @Parameter(names = {"-v", "--encryption-key-value-file"}, description = "The file which contains the private key to decrypt payload")
        public String encKeyFile = null;

        @Parameter(names = {"-time", "--test-duration"}, description = "Test duration in secs. If <= 0, it will keep consuming")
        public long testTime = 0;

        @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be used for handling connections to brokers. The default value is 1.")
        public int ioThreads = 1;

        @Parameter(names = {"-lt", "--num-listener-threads"}, description = "Set the number of threads to be used for message listeners")
        public int listenerThreads = 1;

        @Parameter(names = {"--batch-index-ack"}, description = "Enable or disable the batch index acknowledgment")
        public boolean batchIndexAck = false;

        @Parameter(names = {"-pm", "--pool-messages"}, description = "Use the pooled message", arity = LoadSimulationClient.STOP_COMMAND)
        private boolean poolMessages = true;

        @Parameter(names = {"-bw", "--busy-wait"}, description = "Enable Busy-Wait on the Pulsar client")
        public boolean enableBusyWait = false;

        @Parameter(names = {"-tto", "--txn-timeout"}, description = "Set the time value of transaction timeout, and the time unit is second. (After --txn-enable setting to true, --txn-timeout takes effect)")
        public long transactionTimeout = 10;

        @Parameter(names = {"-nmt", "--numMessage-perTransaction"}, description = "The number of messages acknowledged by a transaction. (After --txn-enable setting to true, -numMessage-perTransaction takes effect")
        public int numMessagesPerTransaction = 50;

        @Parameter(names = {"-txn", "--txn-enable"}, description = "Enable or disable the transaction")
        public boolean isEnableTransaction = false;

        @Parameter(names = {"-ntxn"}, description = "The number of opened transactions, 0 means keeping open.(After --txn-enable setting to true, -ntxn takes effect.)")
        public long totalNumTxn = 0;

        @Parameter(names = {"-abort"}, description = "Abort the transaction. (After --txn-enable setting to true, -abort takes effect)")
        public boolean isAbortTransaction = false;

        @Parameter(names = {"--histogram-file"}, description = "HdrHistogram output file")
        public String histogramFile = null;

        Arguments() {
        }
    }

    public static void main(String[] strArr) throws Exception {
        Arguments arguments = new Arguments();
        JCommander jCommander = new JCommander(arguments);
        jCommander.setProgramName("pulsar-perf consume");
        try {
            jCommander.parse(strArr);
        } catch (ParameterException e) {
            System.out.println(e.getMessage());
            jCommander.usage();
            PerfClientUtils.exit(-1);
        }
        if (arguments.help) {
            jCommander.usage();
            PerfClientUtils.exit(-1);
        }
        if (StringUtils.isBlank(arguments.authPluginClassName) && !StringUtils.isBlank(arguments.deprecatedAuthPluginClassName)) {
            arguments.authPluginClassName = arguments.deprecatedAuthPluginClassName;
        }
        if (arguments.topic != null && arguments.topic.size() != arguments.numTopics) {
            if (arguments.topic.size() == 1) {
                String topicName = TopicName.get(arguments.topic.get(0)).toString();
                ArrayList newArrayList = Lists.newArrayList();
                for (int i = 0; i < arguments.numTopics; i++) {
                    newArrayList.add(String.format("%s-%d", topicName, Integer.valueOf(i)));
                }
                arguments.topic = newArrayList;
            } else {
                System.out.println("The size of topics list should be equal to --num-topics");
                jCommander.usage();
                PerfClientUtils.exit(-1);
            }
        }
        if (arguments.subscriptionType == SubscriptionType.Exclusive && arguments.numConsumers > 1) {
            System.out.println("Only one consumer is allowed when subscriptionType is Exclusive");
            jCommander.usage();
            PerfClientUtils.exit(-1);
        }
        if (arguments.subscriptions != null && arguments.subscriptions.size() != arguments.numSubscriptions) {
            if (arguments.subscriptions.size() == 1) {
                if (arguments.subscriberName == null) {
                    arguments.subscriberName = arguments.subscriptions.get(0);
                }
                ArrayList newArrayList2 = Lists.newArrayList();
                for (int i2 = 0; i2 < arguments.numSubscriptions; i2++) {
                    newArrayList2.add(String.format("%s-%d", arguments.subscriberName, Integer.valueOf(i2)));
                }
                arguments.subscriptions = newArrayList2;
            } else {
                System.out.println("The size of subscriptions list should be equal to --num-subscriptions");
                jCommander.usage();
                PerfClientUtils.exit(-1);
            }
        }
        if (arguments.confFile != null) {
            Properties properties = new Properties(System.getProperties());
            properties.load(new FileInputStream(arguments.confFile));
            if (arguments.serviceURL == null) {
                arguments.serviceURL = properties.getProperty("brokerServiceUrl");
            }
            if (arguments.serviceURL == null) {
                arguments.serviceURL = properties.getProperty("webServiceUrl");
            }
            if (arguments.serviceURL == null) {
                arguments.serviceURL = properties.getProperty("serviceUrl", "http://localhost:8080/");
            }
            if (arguments.authPluginClassName == null) {
                arguments.authPluginClassName = properties.getProperty("authPlugin", null);
            }
            if (arguments.authParams == null) {
                arguments.authParams = properties.getProperty("authParams", null);
            }
            if (StringUtils.isBlank(arguments.tlsTrustCertsFilePath)) {
                arguments.tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath", "");
            }
            if (arguments.tlsAllowInsecureConnection == null) {
                arguments.tlsAllowInsecureConnection = Boolean.valueOf(Boolean.parseBoolean(properties.getProperty("tlsAllowInsecureConnection", "")));
            }
        }
        PerfClientUtils.printJVMInformation(log);
        log.info("Starting Pulsar performance consumer with config: {}", new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(arguments));
        RateLimiter create = arguments.rate > 0.0d ? RateLimiter.create(arguments.rate) : null;
        long nanoTime = System.nanoTime() + ((long) (arguments.testTime * 1.0E9d));
        ClientBuilder tlsTrustCertsFilePath = PulsarClient.builder().enableTransaction(arguments.isEnableTransaction).serviceUrl(arguments.serviceURL).connectionsPerBroker(arguments.maxConnections).statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS).ioThreads(arguments.ioThreads).listenerThreads(arguments.listenerThreads).enableBusyWait(arguments.enableBusyWait).tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
        if (StringUtils.isNotBlank(arguments.authPluginClassName)) {
            tlsTrustCertsFilePath.authentication(arguments.authPluginClassName, arguments.authParams);
        }
        if (arguments.tlsAllowInsecureConnection != null) {
            tlsTrustCertsFilePath.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection.booleanValue());
        }
        if (StringUtils.isNotBlank(arguments.listenerName)) {
            tlsTrustCertsFilePath.listenerName(arguments.listenerName);
        }
        PulsarClient build = tlsTrustCertsFilePath.build();
        AtomicReference atomicReference = arguments.isEnableTransaction ? new AtomicReference(build.newTransaction().withTransactionTimeout(arguments.transactionTimeout, TimeUnit.SECONDS).build().get()) : new AtomicReference(null);
        AtomicLong atomicLong = new AtomicLong();
        Semaphore semaphore = new Semaphore(arguments.numMessagesPerTransaction);
        Thread currentThread = Thread.currentThread();
        AtomicReference atomicReference2 = atomicReference;
        MessageListener messageListener = (consumer, message) -> {
            if (arguments.testTime > 0 && System.nanoTime() > nanoTime) {
                log.info("------------------- DONE -----------------------");
                printAggregatedStats();
                PerfClientUtils.exit(0);
                currentThread.interrupt();
            }
            if (arguments.totalNumTxn > 0 && totalEndTxnOpFailNum.sum() + totalEndTxnOpSuccessNum.sum() >= arguments.totalNumTxn) {
                log.info("------------------- DONE -----------------------");
                printAggregatedStats();
                PerfClientUtils.exit(0);
                currentThread.interrupt();
            }
            messagesReceived.increment();
            bytesReceived.add(message.size());
            totalMessagesReceived.increment();
            totalBytesReceived.add(message.size());
            if (create != null) {
                create.acquire();
            }
            long currentTimeMillis = System.currentTimeMillis() - message.getPublishTime();
            if (currentTimeMillis >= 0) {
                recorder.recordValue(currentTimeMillis);
                cumulativeRecorder.recordValue(currentTimeMillis);
            }
            if (arguments.isEnableTransaction) {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e2) {
                    log.error("Got error: ", e2);
                }
                consumer.acknowledgeAsync(message.getMessageId(), (Transaction) atomicReference2.get()).thenRun(() -> {
                    totalMessageAck.increment();
                    messageAck.increment();
                }).exceptionally(th -> {
                    log.error("Ack message {} failed with exception", message, th);
                    totalMessageAckFailed.increment();
                    return null;
                });
            } else {
                consumer.acknowledgeAsync(message).thenRun(() -> {
                    totalMessageAck.increment();
                    messageAck.increment();
                }).exceptionally(th2 -> {
                    log.error("Ack message {} failed with exception", message, th2);
                    totalMessageAckFailed.increment();
                    return null;
                });
            }
            if (arguments.poolMessages) {
                message.release();
            }
            if (!arguments.isEnableTransaction || atomicLong.incrementAndGet() != arguments.numMessagesPerTransaction) {
                return;
            }
            Transaction transaction = (Transaction) atomicReference2.get();
            if (arguments.isAbortTransaction) {
                transaction.abort().thenRun(() -> {
                    log.info("Abort transaction {}", transaction.getTxnID().toString());
                    totalEndTxnOpSuccessNum.increment();
                    numTxnOpSuccess.increment();
                }).exceptionally(th3 -> {
                    log.error("Commit transaction {} failed with exception", transaction.getTxnID().toString(), th3);
                    totalEndTxnOpFailNum.increment();
                    return null;
                });
            } else {
                transaction.commit().thenRun(() -> {
                    totalEndTxnOpSuccessNum.increment();
                    numTxnOpSuccess.increment();
                }).exceptionally(th4 -> {
                    log.error("Commit transaction failed with exception : ", th4);
                    totalEndTxnOpFailNum.increment();
                    return null;
                });
            }
            while (true) {
                try {
                    atomicReference2.compareAndSet(transaction, (Transaction) build.newTransaction().withTransactionTimeout(arguments.transactionTimeout, TimeUnit.SECONDS).build().get());
                    totalNumTxnOpenSuccess.increment();
                    atomicLong.set(0L);
                    semaphore.release(arguments.numMessagesPerTransaction);
                    return;
                } catch (Exception e3) {
                    log.error("Failed to new transaction with exception:", e3);
                    totalNumTxnOpenFail.increment();
                }
            }
        };
        ArrayList newArrayList3 = Lists.newArrayList();
        ConsumerBuilder replicateSubscriptionState = build.newConsumer(Schema.BYTEBUFFER).messageListener(messageListener).receiverQueueSize(arguments.receiverQueueSize).maxTotalReceiverQueueSizeAcrossPartitions(arguments.maxTotalReceiverQueueSizeAcrossPartitions).acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS).subscriptionType(arguments.subscriptionType).subscriptionInitialPosition(arguments.subscriptionInitialPosition).autoAckOldestChunkedMessageOnQueueFull(arguments.autoAckOldestChunkedMessageOnQueueFull).enableBatchIndexAcknowledgment(arguments.batchIndexAck).poolMessages(arguments.poolMessages).replicateSubscriptionState(arguments.replicatedSubscription);
        if (arguments.maxPendingChunkedMessage > 0) {
            replicateSubscriptionState.maxPendingChunkedMessage(arguments.maxPendingChunkedMessage);
        }
        if (arguments.expireTimeOfIncompleteChunkedMessageMs > 0) {
            replicateSubscriptionState.expireTimeOfIncompleteChunkedMessage(arguments.expireTimeOfIncompleteChunkedMessageMs, TimeUnit.MILLISECONDS);
        }
        if (StringUtils.isNotBlank(arguments.encKeyFile)) {
            replicateSubscriptionState.defaultCryptoKeyReader(arguments.encKeyFile);
        }
        for (int i3 = 0; i3 < arguments.numTopics; i3++) {
            TopicName topicName2 = TopicName.get(arguments.topic.get(i3));
            log.info("Adding {} consumers per subscription on topic {}", Integer.valueOf(arguments.numConsumers), topicName2);
            for (int i4 = 0; i4 < arguments.numSubscriptions; i4++) {
                String str = arguments.subscriptions.get(i4);
                for (int i5 = 0; i5 < arguments.numConsumers; i5++) {
                    newArrayList3.add(replicateSubscriptionState.clone().topic(new String[]{topicName2.toString()}).subscriptionName(str).subscribeAsync());
                }
            }
        }
        Iterator it = newArrayList3.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
        log.info("Start receiving from {} consumers per subscription on {} topics", Integer.valueOf(arguments.numConsumers), Integer.valueOf(arguments.numTopics));
        long nanoTime2 = System.nanoTime();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            printAggregatedThroughput(nanoTime2, arguments);
            printAggregatedStats();
        }));
        long nanoTime3 = System.nanoTime();
        Histogram histogram = null;
        HistogramLogWriter histogramLogWriter = null;
        if (arguments.histogramFile != null) {
            String str2 = arguments.histogramFile;
            log.info("Dumping latency stats to {}", str2);
            histogramLogWriter = new HistogramLogWriter(new PrintStream((OutputStream) new FileOutputStream(str2), false));
            histogramLogWriter.outputLogFormatVersion();
            histogramLogWriter.outputLegend();
        }
        while (true) {
            try {
                Thread.sleep(10000L);
                long nanoTime4 = System.nanoTime();
                double d = (nanoTime4 - nanoTime3) / 1.0E9d;
                long sum = totalMessagesReceived.sum();
                double sumThenReset = messagesReceived.sumThenReset() / d;
                double sumThenReset2 = (((bytesReceived.sumThenReset() / d) * 8.0d) / 1024.0d) / 1024.0d;
                double sumThenReset3 = messageAck.sumThenReset() / d;
                histogram = recorder.getIntervalHistogram(histogram);
                if (arguments.isEnableTransaction) {
                    log.info("--- Transaction: {} transaction end successfully --- {} transaction end failed --- {}  Txn/s --- AckRate: {} msg/s", new Object[]{Long.valueOf(totalEndTxnOpSuccessNum.sum()), Long.valueOf(totalEndTxnOpFailNum.sum()), dec.format(numTxnOpSuccess.sumThenReset() / d), dec.format(sumThenReset3)});
                }
                log.info("Throughput received: {} msg --- {}  msg/s --- {} Mbit/s  --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}", new Object[]{intFormat.format(sum), dec.format(sumThenReset), dec.format(sumThenReset2), dec.format(histogram.getMean()), Long.valueOf(histogram.getValueAtPercentile(50.0d)), Long.valueOf(histogram.getValueAtPercentile(95.0d)), Long.valueOf(histogram.getValueAtPercentile(99.0d)), Long.valueOf(histogram.getValueAtPercentile(99.9d)), Long.valueOf(histogram.getValueAtPercentile(99.99d)), Long.valueOf(histogram.getMaxValue())});
                if (histogramLogWriter != null) {
                    histogramLogWriter.outputIntervalHistogram(histogram);
                }
                histogram.reset();
                nanoTime3 = nanoTime4;
            } catch (InterruptedException e2) {
                build.close();
                return;
            }
        }
    }

    private static void printAggregatedThroughput(long j, Arguments arguments) {
        double nanoTime = (System.nanoTime() - j) / 1.0E9d;
        double sum = totalMessagesReceived.sum() / nanoTime;
        double sum2 = (((totalBytesReceived.sum() / nanoTime) * 8.0d) / 1024.0d) / 1024.0d;
        long j2 = 0;
        double sum3 = totalMessageAck.sum() / nanoTime;
        if (arguments.isEnableTransaction) {
            long sum4 = totalEndTxnOpSuccessNum.sum();
            long sum5 = totalEndTxnOpFailNum.sum();
            double d = (sum4 + sum5) / nanoTime;
            j2 = totalMessageAckFailed.sum();
            log.info("-- Transaction: {}  transaction end successfully --- {} transaction end failed --- {} transaction open successfully --- {} transaction open failed --- {} Txn/s ", new Object[]{Long.valueOf(sum4), Long.valueOf(sum5), Long.valueOf(totalNumTxnOpenSuccess.sum()), Long.valueOf(totalNumTxnOpenFail.sum()), dec.format(d)});
        }
        log.info("Aggregated throughput stats --- {} records received --- {} msg/s --- {} Mbit/s --- AckRate: {}  msg/s --- ack failed {} msg", new Object[]{Long.valueOf(totalMessagesReceived.sum()), dec.format(sum), dec.format(sum2), Double.valueOf(sum3), Long.valueOf(j2)});
    }

    private static void printAggregatedStats() {
        Histogram intervalHistogram = cumulativeRecorder.getIntervalHistogram();
        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}", new Object[]{dec.format(intervalHistogram.getMean()), Long.valueOf(intervalHistogram.getValueAtPercentile(50.0d)), Long.valueOf(intervalHistogram.getValueAtPercentile(95.0d)), Long.valueOf(intervalHistogram.getValueAtPercentile(99.0d)), Long.valueOf(intervalHistogram.getValueAtPercentile(99.9d)), Long.valueOf(intervalHistogram.getValueAtPercentile(99.99d)), Long.valueOf(intervalHistogram.getValueAtPercentile(99.999d)), Long.valueOf(intervalHistogram.getMaxValue())});
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1241618484:
                if (implMethodName.equals("lambda$main$7f0a272f$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case LoadSimulationClient.CHANGE_COMMAND /* 0 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/testclient/PerformanceConsumer") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/pulsar/testclient/PerformanceConsumer$Arguments;JLjava/lang/Thread;Lcom/google/common/util/concurrent/RateLimiter;Ljava/util/concurrent/Semaphore;Ljava/util/concurrent/atomic/AtomicReference;Ljava/util/concurrent/atomic/AtomicLong;Lorg/apache/pulsar/client/api/PulsarClient;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    Arguments arguments = (Arguments) serializedLambda.getCapturedArg(0);
                    long longValue = ((Long) serializedLambda.getCapturedArg(1)).longValue();
                    Thread thread = (Thread) serializedLambda.getCapturedArg(2);
                    RateLimiter rateLimiter = (RateLimiter) serializedLambda.getCapturedArg(3);
                    Semaphore semaphore = (Semaphore) serializedLambda.getCapturedArg(4);
                    AtomicReference atomicReference = (AtomicReference) serializedLambda.getCapturedArg(5);
                    AtomicLong atomicLong = (AtomicLong) serializedLambda.getCapturedArg(6);
                    PulsarClient pulsarClient = (PulsarClient) serializedLambda.getCapturedArg(7);
                    return (consumer, message) -> {
                        if (arguments.testTime > 0 && System.nanoTime() > longValue) {
                            log.info("------------------- DONE -----------------------");
                            printAggregatedStats();
                            PerfClientUtils.exit(0);
                            thread.interrupt();
                        }
                        if (arguments.totalNumTxn > 0 && totalEndTxnOpFailNum.sum() + totalEndTxnOpSuccessNum.sum() >= arguments.totalNumTxn) {
                            log.info("------------------- DONE -----------------------");
                            printAggregatedStats();
                            PerfClientUtils.exit(0);
                            thread.interrupt();
                        }
                        messagesReceived.increment();
                        bytesReceived.add(message.size());
                        totalMessagesReceived.increment();
                        totalBytesReceived.add(message.size());
                        if (rateLimiter != null) {
                            rateLimiter.acquire();
                        }
                        long currentTimeMillis = System.currentTimeMillis() - message.getPublishTime();
                        if (currentTimeMillis >= 0) {
                            recorder.recordValue(currentTimeMillis);
                            cumulativeRecorder.recordValue(currentTimeMillis);
                        }
                        if (arguments.isEnableTransaction) {
                            try {
                                semaphore.acquire();
                            } catch (InterruptedException e2) {
                                log.error("Got error: ", e2);
                            }
                            consumer.acknowledgeAsync(message.getMessageId(), (Transaction) atomicReference.get()).thenRun(() -> {
                                totalMessageAck.increment();
                                messageAck.increment();
                            }).exceptionally(th -> {
                                log.error("Ack message {} failed with exception", message, th);
                                totalMessageAckFailed.increment();
                                return null;
                            });
                        } else {
                            consumer.acknowledgeAsync(message).thenRun(() -> {
                                totalMessageAck.increment();
                                messageAck.increment();
                            }).exceptionally(th2 -> {
                                log.error("Ack message {} failed with exception", message, th2);
                                totalMessageAckFailed.increment();
                                return null;
                            });
                        }
                        if (arguments.poolMessages) {
                            message.release();
                        }
                        if (!arguments.isEnableTransaction || atomicLong.incrementAndGet() != arguments.numMessagesPerTransaction) {
                            return;
                        }
                        Transaction transaction = (Transaction) atomicReference.get();
                        if (arguments.isAbortTransaction) {
                            transaction.abort().thenRun(() -> {
                                log.info("Abort transaction {}", transaction.getTxnID().toString());
                                totalEndTxnOpSuccessNum.increment();
                                numTxnOpSuccess.increment();
                            }).exceptionally(th3 -> {
                                log.error("Commit transaction {} failed with exception", transaction.getTxnID().toString(), th3);
                                totalEndTxnOpFailNum.increment();
                                return null;
                            });
                        } else {
                            transaction.commit().thenRun(() -> {
                                totalEndTxnOpSuccessNum.increment();
                                numTxnOpSuccess.increment();
                            }).exceptionally(th4 -> {
                                log.error("Commit transaction failed with exception : ", th4);
                                totalEndTxnOpFailNum.increment();
                                return null;
                            });
                        }
                        while (true) {
                            try {
                                atomicReference.compareAndSet(transaction, (Transaction) pulsarClient.newTransaction().withTransactionTimeout(arguments.transactionTimeout, TimeUnit.SECONDS).build().get());
                                totalNumTxnOpenSuccess.increment();
                                atomicLong.set(0L);
                                semaphore.release(arguments.numMessagesPerTransaction);
                                return;
                            } catch (Exception e3) {
                                log.error("Failed to new transaction with exception:", e3);
                                totalNumTxnOpenFail.increment();
                            }
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
