package org.apache.pulsar.testclient;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.Recorder;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/testclient/ManagedLedgerWriter.class */
public class ManagedLedgerWriter {
    /** Thread pool on which {@code main} submits one writer task per configured thread. */
    private static final ExecutorService executor =
            Executors.newCachedThreadPool(new DefaultThreadFactory("pulsar-perf-managed-ledger-exec"));

    // Per-interval counters: the periodic report in main() reads them with sumThenReset().
    private static final LongAdder messagesSent = new LongAdder();
    private static final LongAdder bytesSent = new LongAdder();

    // Whole-run counters: only ever read with sum(), never reset.
    private static final LongAdder totalMessagesSent = new LongAdder();
    private static final LongAdder totalBytesSent = new LongAdder();

    /** Upper bound handed to both latency recorders (120,000,000); recorded values are microseconds. */
    private static final long RECORDER_HIGHEST_TRACKABLE_VALUE = TimeUnit.SECONDS.toMillis(120000);

    /** Number of significant value digits handed to both latency recorders. */
    private static final int RECORDER_SIGNIFICANT_DIGITS = 5;

    /** Latencies feeding the periodic report. */
    private static Recorder recorder =
            new Recorder(RECORDER_HIGHEST_TRACKABLE_VALUE, RECORDER_SIGNIFICANT_DIGITS);

    /** Latencies feeding the aggregated report printed by {@code printAggregatedStats}. */
    private static Recorder cumulativeRecorder =
            new Recorder(RECORDER_HIGHEST_TRACKABLE_VALUE, RECORDER_SIGNIFICANT_DIGITS);

    // Number formats used by the periodic and aggregated log lines.
    static final DecimalFormat THROUGHPUTFORMAT = new PaddingDecimalFormat("0.0", 8);
    static final DecimalFormat DEC = new PaddingDecimalFormat("0.000", 7);
    static final DecimalFormat TOTALFORMAT = new DecimalFormat("0.000");
    static final DecimalFormat INTFORMAT = new PaddingDecimalFormat("0", 7);

    private static final Logger log = LoggerFactory.getLogger(ManagedLedgerWriter.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Command-line options of the managed-ledger writer, populated by JCommander.
     *
     * <p>Field declaration order is kept stable because {@code main} logs this object as JSON.
     */
    @Parameters(commandDescription = "Write directly on managed-ledgers")
    public static class Arguments {

        /** Set when usage output was requested. */
        @Parameter(
                names = {"-h", "--help"},
                description = "Help message",
                help = true)
        boolean help;

        /** Deprecated, hidden way of passing the metadata store address. */
        @Deprecated
        @Parameter(
                names = {"-zk", "--zookeeperServers"},
                description = "ZooKeeper connection string",
                hidden = true)
        public String zookeeperServers;

        /** Metadata store service URL; {@code main} falls back to {@link #zookeeperServers} when unset. */
        @Parameter(
                names = {"-md", "--metadata-store"},
                description = "Metadata store service URL. For example: zk:my-zk:2181")
        private String metadataStoreUrl;

        /** Target write rate in msg/s, summed over all managed ledgers. */
        @Parameter(
                names = {"-r", "--rate"},
                description = "Write rate msg/s across managed ledgers")
        public int msgRate = 100;

        /** Size of every written message. */
        @Parameter(
                names = {"-s", "--size"},
                description = "Message size")
        public int msgSize = 1024;

        /** Number of managed ledgers to open; validated as a positive number. */
        @Parameter(
                names = {"-t", "--num-topic"},
                description = "Number of managed ledgers",
                validateWith = PositiveNumberParameterValidator.class)
        public int numManagedLedgers = 1;

        /** Number of writer threads; validated as a positive number. */
        @Parameter(
                names = "--threads",
                description = "Number of threads writing",
                validateWith = PositiveNumberParameterValidator.class)
        public int numThreads = 1;

        /** Cap on in-flight writes. */
        @Parameter(
                names = {"-o", "--max-outstanding"},
                description = "Max number of outstanding requests")
        public int maxOutstanding = 1000;

        /** Cap on TCP connections to a single bookie. */
        @Parameter(
                names = {"-c", "--max-connections"},
                description = "Max number of TCP connections to a single bookie")
        public int maxConnections = 1;

        /** Total number of messages to publish; values {@code <= 0} mean no limit. */
        @Parameter(
                names = {"-m", "--num-messages"},
                description = "Number of messages to publish in total. If <= 0, it will keep publishing")
        public long numMessages = 0;

        /** Ledger ensemble size. */
        @Parameter(
                names = {"-e", "--ensemble-size"},
                description = "Ledger ensemble size")
        public int ensembleSize = 1;

        /** Ledger write quorum. */
        @Parameter(
                names = {"-w", "--write-quorum"},
                description = "Ledger write quorum")
        public int writeQuorum = 1;

        /** Ledger ack quorum. */
        @Parameter(
                names = {"-a", "--ack-quorum"},
                description = "Ledger ack quorum")
        public int ackQuorum = 1;

        /** BookKeeper digest type. */
        @Parameter(
                names = {"-dt", "--digest-type"},
                description = "BookKeeper digest type")
        public DigestType digestType = DigestType.CRC32C;

        /** Test duration in seconds; values {@code <= 0} mean no limit. */
        @Parameter(
                names = {"-time", "--test-duration"},
                description = "Test duration in secs. If <= 0, it will keep publishing")
        public long testTime = 0;

        /** Package-private: instances are created by {@code main} only. */
        Arguments() {
        }
    }

    /**
     * Command-line entry point of the managed-ledger write benchmark.
     *
     * <p>Parses the arguments, opens {@code numManagedLedgers} managed ledgers, spreads them over
     * {@code numThreads} writer tasks and logs throughput/latency every 10 seconds until a writer
     * signals completion or the main thread is interrupted. Aggregated figures are printed from a
     * JVM shutdown hook.
     *
     * @param strArr raw command-line arguments
     * @throws Exception if the configuration cannot be serialized, or the metadata store, the
     *                   ledger factory or one of the managed ledgers cannot be set up
     */
    public static void main(String[] strArr) throws Exception {
        final Arguments arguments = new Arguments();
        JCommander jCommander = new JCommander(arguments);
        jCommander.setProgramName("pulsar-perf managed-ledger");
        try {
            jCommander.parse(strArr);
        } catch (ParameterException e) {
            System.out.println(e.getMessage());
            jCommander.usage();
            PerfClientUtils.exit(1);
        }
        if (arguments.help) {
            jCommander.usage();
            PerfClientUtils.exit(1);
        }
        if (arguments.metadataStoreUrl == null && arguments.zookeeperServers == null) {
            System.err.println("Metadata store address argument is required (--metadata-store)");
            jCommander.usage();
            PerfClientUtils.exit(1);
        }
        PerfClientUtils.printJVMInformation(log);
        log.info("Starting Pulsar managed-ledger perf writer with config: {}",
                new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(arguments));

        // A single payload buffer of msgSize readable bytes is shared by every write.
        final ByteBuf payload = PulsarByteBufAllocator.DEFAULT.directBuffer(arguments.msgSize);
        payload.writerIndex(arguments.msgSize);

        // Random 5-character suffix so separate runs use distinct ledger names.
        final String ledgerNamePrefix =
                "test-" + DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 5);

        // The deprecated ZooKeeper option is only a fallback for the metadata store URL.
        if (arguments.metadataStoreUrl == null) {
            arguments.metadataStoreUrl = arguments.zookeeperServers;
        }

        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setUseV2WireProtocol(true);
        clientConfiguration.setAddEntryTimeout(30);
        clientConfiguration.setReadEntryTimeout(30);
        clientConfiguration.setThrottleValue(0);
        clientConfiguration.setNumChannelsPerBookie(arguments.maxConnections);
        clientConfiguration.setMetadataServiceUri(arguments.metadataStoreUrl);

        ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
        managedLedgerFactoryConfig.setMaxCacheSize(0L);

        try (MetadataStoreExtended metadataStore = MetadataStoreExtended.create(
                arguments.metadataStoreUrl,
                MetadataStoreConfig.builder().metadataStoreName("metadata-store").build())) {
            ManagedLedgerFactoryImpl factory =
                    new ManagedLedgerFactoryImpl(metadataStore, clientConfiguration, managedLedgerFactoryConfig);
            try {
                ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
                managedLedgerConfig.setEnsembleSize(arguments.ensembleSize);
                managedLedgerConfig.setWriteQuorumSize(arguments.writeQuorum);
                managedLedgerConfig.setAckQuorumSize(arguments.ackQuorum);
                managedLedgerConfig.setMinimumRolloverTime(10, TimeUnit.MINUTES);
                managedLedgerConfig.setMetadataEnsembleSize(arguments.ensembleSize);
                managedLedgerConfig.setMetadataWriteQuorumSize(arguments.writeQuorum);
                managedLedgerConfig.setMetadataAckQuorumSize(arguments.ackQuorum);
                managedLedgerConfig.setDigestType(arguments.digestType);
                managedLedgerConfig.setMaxSizePerLedgerMb(2048);

                List<ManagedLedger> managedLedgers = openManagedLedgers(
                        factory, managedLedgerConfig, ledgerNamePrefix, arguments.numManagedLedgers);
                log.info("Created {} managed ledgers", managedLedgers.size());

                final long startTime = System.nanoTime();
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    printAggregatedThroughput(startTime);
                    printAggregatedStats();
                }));

                Collections.shuffle(managedLedgers);
                final AtomicBoolean isDone = new AtomicBoolean();
                Map<Integer, List<ManagedLedger>> ledgersByThread =
                        allocateToThreads(managedLedgers, arguments.numThreads);

                // Integer division: the remainder is dropped, and a quota of 0 (numMessages <= 0 or
                // numMessages < numThreads) is treated by the writers as "no message limit".
                final long messagesPerThread = arguments.numMessages / arguments.numThreads;
                for (int thread = 0; thread < arguments.numThreads; thread++) {
                    final List<ManagedLedger> threadLedgers = ledgersByThread.get(thread);
                    executor.submit(() -> runWriter(arguments, threadLedgers, messagesPerThread, payload, isDone));
                }

                reportUntilDone(isDone);
            } finally {
                // Shut the factory down on failures too, not only after a clean run.
                factory.shutdown();
            }
        }
    }

    /**
     * Opens {@code count} managed ledgers named {@code <namePrefix>-NNN} and blocks until every
     * open has completed; a failed open surfaces as the exception thrown by
     * {@link CompletableFuture#join()}.
     */
    private static List<ManagedLedger> openManagedLedgers(ManagedLedgerFactoryImpl factory,
                                                          ManagedLedgerConfig config,
                                                          String namePrefix,
                                                          int count) {
        List<CompletableFuture<ManagedLedger>> pending = new ArrayList<>();
        for (int index = 0; index < count; index++) {
            String ledgerName = String.format("%s-%03d", namePrefix, index);
            final CompletableFuture<ManagedLedger> opened = new CompletableFuture<>();
            pending.add(opened);
            factory.asyncOpen(ledgerName, config, new AsyncCallbacks.OpenLedgerCallback() {
                @Override
                public void openLedgerComplete(ManagedLedger managedLedger, Object ctx) {
                    opened.complete(managedLedger);
                }

                @Override
                public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
                    opened.completeExceptionally(exception);
                }
            }, (Supplier) null, (Object) null);
        }
        return pending.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    /**
     * Body of one writer task: cycles over {@code ledgers}, writing {@code payload} to each at the
     * per-thread share of the configured rate, until the test duration or {@code messageQuota}
     * (when positive) is reached; it then flags {@code isDone}, waits 5 seconds and exits the
     * process. Any error is logged and ends this task only.
     */
    private static void runWriter(Arguments arguments,
                                  List<ManagedLedger> ledgers,
                                  long messageQuota,
                                  ByteBuf payload,
                                  AtomicBoolean isDone) {
        try {
            final int payloadSize = arguments.msgSize;
            // Floating-point division keeps fractional per-thread rates (e.g. 100 msg/s over 3 threads).
            final double threadRate = arguments.msgRate / (double) arguments.numThreads;
            RateLimiter rateLimiter = RateLimiter.create(threadRate);
            // Pre-acquire about one second's worth of permits before the first write; at least 1,
            // because the rate limiter rejects a request for 0 permits.
            rateLimiter.acquire(Math.max(1, (int) threadRate));

            final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(arguments.testTime);
            final Semaphore outstanding = new Semaphore(arguments.maxOutstanding);
            AsyncCallbacks.AddEntryCallback onWrite = new AsyncCallbacks.AddEntryCallback() {
                @Override
                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                    // ctx carries the System.nanoTime() value captured when the write was issued.
                    long sendTime = (Long) ctx;
                    messagesSent.increment();
                    bytesSent.add(payloadSize);
                    totalMessagesSent.increment();
                    totalBytesSent.add(payloadSize);
                    long latencyMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - sendTime);
                    recorder.recordValue(latencyMicros);
                    cumulativeRecorder.recordValue(latencyMicros);
                    outstanding.release();
                }

                @Override
                public void addFailed(ManagedLedgerException exception, Object ctx) {
                    log.warn("Write error on message", exception);
                    PerfClientUtils.exit(1);
                }
            };

            long attempted = 0;
            while (true) {
                for (ManagedLedger ledger : ledgers) {
                    // nanoTime values are compared by difference, as its contract requires.
                    if (arguments.testTime > 0 && System.nanoTime() - deadline > 0) {
                        log.info("------------- DONE (reached the maximum duration: [{} seconds] of production) --------------",
                                arguments.testTime);
                        isDone.set(true);
                        Thread.sleep(5000L);
                        PerfClientUtils.exit(0);
                    }
                    if (messageQuota > 0 && attempted++ >= messageQuota) {
                        log.info("------------- DONE (reached the maximum number: [{}] of production) --------------",
                                messageQuota);
                        isDone.set(true);
                        Thread.sleep(5000L);
                        PerfClientUtils.exit(0);
                    }
                    outstanding.acquire();
                    rateLimiter.acquire();
                    ledger.asyncAddEntry(payload, onWrite, System.nanoTime());
                }
            }
        } catch (Throwable th) {
            log.error("Got error", th);
        }
    }

    /**
     * Logs interval throughput and latency every 10 seconds until {@code isDone} is set or the
     * calling thread is interrupted.
     */
    private static void reportUntilDone(AtomicBoolean isDone) {
        long intervalStart = System.nanoTime();
        Histogram interval = null;
        while (true) {
            try {
                Thread.sleep(10000L);
            } catch (InterruptedException e) {
                // Stop reporting instead of swallowing the interrupt and looping forever. The flag is
                // deliberately left cleared so the caller's cleanup is not aborted by it.
                break;
            }
            if (isDone.get()) {
                break;
            }
            long now = System.nanoTime();
            double elapsedSeconds = (now - intervalStart) / 1.0E9d;
            interval = recorder.getIntervalHistogram(interval);
            log.info("Throughput produced: {} msg --- {}  msg/s --- {} Mbit/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
                    INTFORMAT.format(totalMessagesSent.sum()),
                    THROUGHPUTFORMAT.format(messagesSent.sumThenReset() / elapsedSeconds),
                    THROUGHPUTFORMAT.format((((bytesSent.sumThenReset() / elapsedSeconds) / 1024.0d) / 1024.0d) * 8.0d),
                    DEC.format(interval.getMean() / 1000.0d),
                    DEC.format(interval.getValueAtPercentile(50.0d) / 1000.0d),
                    DEC.format(interval.getValueAtPercentile(95.0d) / 1000.0d),
                    DEC.format(interval.getValueAtPercentile(99.0d) / 1000.0d),
                    DEC.format(interval.getValueAtPercentile(99.9d) / 1000.0d),
                    DEC.format(interval.getValueAtPercentile(99.99d) / 1000.0d),
                    DEC.format(interval.getMaxValue() / 1000.0d));
            interval.reset();
            intervalStart = now;
        }
    }

    /**
     * Distributes items over worker threads in round-robin order.
     *
     * <p>The returned map always has exactly the keys {@code 0 .. i-1}. When there are at least as
     * many items as threads, item {@code k} goes to thread {@code k % i}, so every item is used
     * once. When there are fewer items than threads, thread {@code t} gets the single item at
     * index {@code t % list.size()}, so items are shared between threads. An empty {@code list}
     * yields an empty list for every thread.
     *
     * @param list items to distribute; must not be {@code null}
     * @param i    number of threads; must be positive
     * @param <T>  item type
     * @return a mutable map from thread index to the mutable list of items assigned to it
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public static <T> Map<Integer, List<T>> allocateToThreads(List<T> list, int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("Number of threads must be positive, got " + i);
        }
        Map<Integer, List<T>> allocation = new HashMap<>();
        for (int thread = 0; thread < i; thread++) {
            allocation.put(thread, new ArrayList<>());
        }

        int itemCount = list.size();
        if (itemCount == 0) {
            // Nothing to hand out; avoids indexing into (and taking a modulo by) an empty list.
            return allocation;
        }

        if (itemCount >= i) {
            int thread = 0;
            for (T item : list) {
                allocation.get(thread).add(item);
                thread = (thread + 1) % i;
            }
        } else {
            for (int thread = 0; thread < i; thread++) {
                allocation.get(thread).add(list.get(thread % itemCount));
            }
        }
        return allocation;
    }

    /**
     * Logs the whole-run message count together with the average message rate and bandwidth.
     *
     * @param j {@link System#nanoTime()} reading taken at the start of the measured period
     */
    private static void printAggregatedThroughput(long j) {
        final double elapsedSeconds = (System.nanoTime() - j) / 1.0E9d;
        final String messagesPerSecond = TOTALFORMAT.format(totalMessagesSent.sum() / elapsedSeconds);
        final double bytesPerSecond = totalBytesSent.sum() / elapsedSeconds;
        // bytes/s -> Mbit/s
        final String megabitsPerSecond = TOTALFORMAT.format(((bytesPerSecond / 1024.0d) / 1024.0d) * 8.0d);
        log.info("Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s",
                totalMessagesSent, messagesPerSecond, megabitsPerSecond);
    }

    /**
     * Logs the latency distribution accumulated in {@code cumulativeRecorder}.
     *
     * <p>Recorded values are microseconds; they are divided by 1000 so the log line is in
     * milliseconds. The interval histogram is fetched once and reused for every figure.
     */
    private static void printAggregatedStats() {
        Histogram latencies = cumulativeRecorder.getIntervalHistogram();
        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
                DEC.format(latencies.getMean() / 1000.0d),
                DEC.format(latencies.getValueAtPercentile(50.0d) / 1000.0d),
                DEC.format(latencies.getValueAtPercentile(95.0d) / 1000.0d),
                DEC.format(latencies.getValueAtPercentile(99.0d) / 1000.0d),
                DEC.format(latencies.getValueAtPercentile(99.9d) / 1000.0d),
                DEC.format(latencies.getValueAtPercentile(99.99d) / 1000.0d),
                DEC.format(latencies.getValueAtPercentile(99.999d) / 1000.0d),
                DEC.format(latencies.getMaxValue() / 1000.0d));
    }
}
