package org.apache.pulsar.testclient;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.lang.invoke.SerializedLambda;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/testclient/LoadSimulationClient.class */
public class LoadSimulationClient {
    /** Command code: update the rate and payload size of one topic that is already being traded. */
    public static final byte CHANGE_COMMAND = 0;

    /** Command code: set the stop flag of the trade unit registered for one topic. */
    public static final byte STOP_COMMAND = 1;

    /** Command code: create a trade unit for a topic and start it on the executor. */
    public static final byte TRADE_COMMAND = 2;

    /** Command code: update rate and payload size of every registered topic matching a tenant/group. */
    public static final byte CHANGE_GROUP_COMMAND = 3;

    /** Command code: set the stop flag of every registered topic matching a tenant/group. */
    public static final byte STOP_GROUP_COMMAND = 4;

    /** Command code: reply with a boolean telling whether a topic is registered on this client. */
    public static final byte FIND_COMMAND = 5;

    private final PulsarAdmin admin;
    private final PulsarClient client;

    /** TCP port on which controller connections are accepted. */
    private final int port;

    private static final Logger log = LoggerFactory.getLogger(LoadSimulationClient.class);

    /** Listener shared by all consumers: acknowledges each received message asynchronously. */
    private static final MessageListener<byte[]> ackListener = Consumer::acknowledgeAsync;

    /** Payload buffers keyed by their length, shared between trade units of the same message size. */
    private final Map<Integer, byte[]> payloadCache = new ConcurrentHashMap<>();

    /** Trade units keyed by topic name. */
    private final Map<String, TradeUnit> topicsToTradeUnits = new ConcurrentHashMap<>();

    /** Runs both the controller connection handlers and the trade unit send loops. */
    private final ExecutorService executor = Executors.newCachedThreadPool(new DefaultThreadFactory("test-client"));

    /* loaded from: input_file:org/apache/pulsar/testclient/LoadSimulationClient$MainArguments.class */
    /**
     * Command-line options of the simulation client.
     *
     * <p>An instance is handed to JCommander in {@code main}, which fills the fields from the
     * {@code @Parameter} annotations below.
     */
    private static class MainArguments {

        // Declared to JCommander as the help option (help = true).
        @Parameter(names = {"-h", "--help"}, description = "Help message", help = true)
        boolean help;

        // Port of the server socket opened by run(); mandatory.
        @Parameter(names = {"--port"}, description = "Port to listen on for controller", required = true)
        public int port;

        // Mandatory; the constructor of LoadSimulationClient passes this same value to both the
        // PulsarAdmin builder (serviceHttpUrl) and the PulsarClient builder (serviceUrl).
        @Parameter(names = {"--service-url"}, description = "Pulsar Service URL", required = true)
        public String serviceURL;

        // Only the enclosing class creates instances.
        private MainArguments() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/testclient/LoadSimulationClient$TradeConfiguration.class */
    /**
     * Mutable holder for the options decoded from one controller command.
     *
     * <p>Which fields are filled depends on the command: topic commands set {@code topic},
     * {@code size} and {@code rate}; group commands set {@code tenant} and {@code group}
     * (plus {@code size} and {@code rate} for a group change). Fields that are not sent keep
     * the defaults declared here.
     */
    public static class TradeConfiguration {
        // Topic name as sent by the controller; used as the key of the trade unit map.
        public String topic;
        // Tenant part of the topic-matching expression built for group commands.
        public String tenant;
        // Group part of the topic-matching expression built for group commands.
        public String group;
        // Command byte being processed; -1 until handle(...) stores the received code.
        public byte command = -1;
        // Value handed to the RateLimiter that paces the send loop
        // (presumably messages per second; confirm against the RateLimiter documentation).
        public double rate = 100.0d;
        // Length in bytes of the payload array sent with every message.
        public int size = 1024;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/testclient/LoadSimulationClient$TradeUnit.class */
    public static class TradeUnit {
        Future<Consumer<byte[]>> consumerFuture;
        final AtomicBoolean stop;
        final RateLimiter rateLimiter;
        final AtomicReference<byte[]> payload = new AtomicReference<>();
        final PulsarClient client;
        final String topic;
        final Map<Integer, byte[]> payloadCache;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/pulsar/testclient/LoadSimulationClient$TradeUnit$MutableBoolean.class */
        public class MutableBoolean {
            public volatile boolean value;

            private MutableBoolean() {
                this.value = true;
            }
        }

        public TradeUnit(TradeConfiguration tradeConfiguration, PulsarClient pulsarClient, Map<Integer, byte[]> map) throws Exception {
            this.consumerFuture = pulsarClient.newConsumer().topic(new String[]{tradeConfiguration.topic}).subscriptionName("Subscriber-" + tradeConfiguration.topic).messageListener(LoadSimulationClient.ackListener).subscribeAsync();
            this.payloadCache = map;
            this.client = pulsarClient;
            this.topic = tradeConfiguration.topic;
            this.payload.set(map.computeIfAbsent(Integer.valueOf(tradeConfiguration.size), i -> {
                return new byte[i];
            }));
            this.rateLimiter = RateLimiter.create(tradeConfiguration.rate);
            this.stop = new AtomicBoolean(false);
        }

        public void change(TradeConfiguration tradeConfiguration) {
            this.rateLimiter.setRate(tradeConfiguration.rate);
            this.payload.set(this.payloadCache.computeIfAbsent(Integer.valueOf(tradeConfiguration.size), i -> {
                return new byte[i];
            }));
        }

        private Producer<byte[]> getNewProducer() throws Exception {
            while (true) {
                try {
                    return this.client.newProducer().topic(this.topic).sendTimeout(0, TimeUnit.SECONDS).create();
                } catch (Exception e) {
                    Thread.sleep(10000L);
                }
            }
        }

        public void start() throws Exception {
            Producer<byte[]> newProducer = getNewProducer();
            Consumer<byte[]> consumer = this.consumerFuture.get();
            while (!this.stop.get()) {
                MutableBoolean mutableBoolean = new MutableBoolean();
                Function function = th -> {
                    mutableBoolean.value = false;
                    return null;
                };
                while (!this.stop.get() && mutableBoolean.value) {
                    newProducer.sendAsync(this.payload.get()).exceptionally(function);
                    this.rateLimiter.acquire();
                }
                newProducer.closeAsync();
                if (this.stop.get()) {
                    consumer.closeAsync();
                } else {
                    newProducer = getNewProducer();
                }
            }
        }
    }

    /**
     * Serves one controller connection: reads command bytes until the peer closes the stream and
     * dispatches each of them together with the connection's streams.
     *
     * <p>The socket and both streams are closed when this method returns or throws.
     *
     * @param socket accepted controller connection; closed by this method
     * @throws Exception if reading from the socket or executing a command fails
     */
    private void handle(Socket socket) throws Exception {
        try (Socket connection = socket;
                DataInputStream input = new DataInputStream(connection.getInputStream());
                DataOutputStream output = new DataOutputStream(connection.getOutputStream())) {
            int command;
            // read() yields -1 once the controller has closed its side of the connection.
            while ((command = input.read()) != -1) {
                handle((byte) command, input, output);
            }
        }
    }

    /**
     * Reads the per-topic options of a command from the stream into the given configuration.
     *
     * <p>The fields are consumed in wire order: topic (modified UTF-8 as read by
     * {@code readUTF}), payload size (int), rate (double).
     *
     * @param tradeConfiguration receives {@code topic}, {@code size} and {@code rate}
     * @param dataInputStream stream positioned right after the command byte
     * @throws Exception if the stream ends early or cannot be read
     */
    private void decodeProducerOptions(TradeConfiguration tradeConfiguration, DataInputStream dataInputStream) throws Exception {
        tradeConfiguration.topic = dataInputStream.readUTF();
        tradeConfiguration.size = dataInputStream.readInt();
        tradeConfiguration.rate = dataInputStream.readDouble();
    }

    /**
     * Reads the group selector of a command from the stream into the given configuration.
     *
     * <p>The fields are consumed in wire order: tenant, then group, both as read by
     * {@code readUTF}.
     *
     * @param tradeConfiguration receives {@code tenant} and {@code group}
     * @param dataInputStream stream positioned right after the command byte
     * @throws Exception if the stream ends early or cannot be read
     */
    private void decodeGroupOptions(TradeConfiguration tradeConfiguration, DataInputStream dataInputStream) throws Exception {
        tradeConfiguration.tenant = dataInputStream.readUTF();
        tradeConfiguration.group = dataInputStream.readUTF();
    }

    /**
     * Decodes the arguments of one controller command and executes it.
     *
     * <p>Arguments read after the command byte:
     * <ul>
     *   <li>{@code CHANGE_COMMAND}, {@code TRADE_COMMAND}: topic, size, rate</li>
     *   <li>{@code STOP_COMMAND}, {@code FIND_COMMAND}: topic</li>
     *   <li>{@code CHANGE_GROUP_COMMAND}: tenant, group, size, rate</li>
     *   <li>{@code STOP_GROUP_COMMAND}: tenant, group</li>
     * </ul>
     * Only {@code FIND_COMMAND} writes a reply (one boolean). Change and stop commands that name
     * an unknown topic are ignored.
     *
     * @param b command code, one of the {@code *_COMMAND} constants
     * @param dataInputStream source of the command arguments
     * @param dataOutputStream destination of the reply, if the command has one
     * @throws IllegalArgumentException if the command code is not recognized
     * @throws Exception if the arguments cannot be read or the command fails
     */
    private void handle(byte b, DataInputStream dataInputStream, DataOutputStream dataOutputStream) throws Exception {
        TradeConfiguration tradeConfiguration = new TradeConfiguration();
        tradeConfiguration.command = b;
        switch (b) {
            case CHANGE_COMMAND: {
                decodeProducerOptions(tradeConfiguration, dataInputStream);
                TradeUnit existing = this.topicsToTradeUnits.get(tradeConfiguration.topic);
                if (existing != null) {
                    existing.change(tradeConfiguration);
                }
                return;
            }
            case STOP_COMMAND: {
                tradeConfiguration.topic = dataInputStream.readUTF();
                TradeUnit existing = this.topicsToTradeUnits.get(tradeConfiguration.topic);
                if (existing != null) {
                    existing.stop.set(true);
                }
                return;
            }
            case TRADE_COMMAND: {
                decodeProducerOptions(tradeConfiguration, dataInputStream);
                TradeUnit tradeUnit = new TradeUnit(tradeConfiguration, this.client, this.payloadCache);
                // NOTE(review): a unit already registered for this topic is replaced in the map
                // without being stopped - confirm whether the controller can send TRADE twice.
                this.topicsToTradeUnits.put(tradeConfiguration.topic, tradeUnit);
                this.executor.submit(() -> {
                    try {
                        String topicName = tradeConfiguration.topic;
                        // The namespace is the part between the "persistent://" prefix and the
                        // last '/' of the topic name.
                        String namespace = topicName.substring("persistent://".length(), topicName.lastIndexOf('/'));
                        try {
                            this.admin.namespaces().createNamespace(namespace);
                        } catch (PulsarAdminException.ConflictException ignored) {
                            // Deliberately ignored: presumably the namespace already exists.
                        }
                        tradeUnit.start();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
                return;
            }
            case CHANGE_GROUP_COMMAND: {
                decodeGroupOptions(tradeConfiguration, dataInputStream);
                tradeConfiguration.size = dataInputStream.readInt();
                tradeConfiguration.rate = dataInputStream.readDouble();
                Pattern groupTopics = groupTopicPattern(tradeConfiguration.tenant, tradeConfiguration.group);
                for (Map.Entry<String, TradeUnit> entry : this.topicsToTradeUnits.entrySet()) {
                    if (groupTopics.matcher(entry.getKey()).matches()) {
                        entry.getValue().change(tradeConfiguration);
                    }
                }
                return;
            }
            case STOP_GROUP_COMMAND: {
                decodeGroupOptions(tradeConfiguration, dataInputStream);
                Pattern groupTopics = groupTopicPattern(tradeConfiguration.tenant, tradeConfiguration.group);
                for (Map.Entry<String, TradeUnit> entry : this.topicsToTradeUnits.entrySet()) {
                    if (groupTopics.matcher(entry.getKey()).matches()) {
                        entry.getValue().stop.set(true);
                    }
                }
                return;
            }
            case FIND_COMMAND: {
                dataOutputStream.writeBoolean(this.topicsToTradeUnits.containsKey(dataInputStream.readUTF()));
                dataOutputStream.flush();
                return;
            }
            default:
                throw new IllegalArgumentException("Unrecognized command code received: " + ((int) b));
        }
    }

    /**
     * Builds the pattern matching every topic of the form
     * {@code <scheme>://<tenant>/<anything>/<group>-<anything>/<anything>}.
     *
     * <p>Tenant and group are quoted so that they are matched literally; they arrive over the
     * network and may contain characters that are special in regular expressions.
     */
    private static Pattern groupTopicPattern(String tenant, String group) {
        return Pattern.compile(".*://" + Pattern.quote(tenant) + "/.*/" + Pattern.quote(group) + "-.*/.*");
    }

    public LoadSimulationClient(MainArguments mainArguments) throws Exception {
        this.admin = PulsarAdmin.builder().serviceHttpUrl(mainArguments.serviceURL).build();
        this.client = PulsarClient.builder().serviceUrl(mainArguments.serviceURL).connectionsPerBroker(4).ioThreads(Runtime.getRuntime().availableProcessors()).statsInterval(0L, TimeUnit.SECONDS).build();
        this.port = mainArguments.port;
    }

    /**
     * Entry point: parses the command line, then creates the client and runs its accept loop.
     *
     * <p>Invalid arguments print the error and the usage text and exit with status -1. When the
     * help option is given, only the usage text is printed.
     *
     * @param strArr command-line arguments
     * @throws Exception if the client cannot be created or the accept loop fails
     */
    public static void main(String[] strArr) throws Exception {
        MainArguments mainArguments = new MainArguments();
        JCommander jCommander = new JCommander(mainArguments);
        jCommander.setProgramName("pulsar-perf simulation-client");
        try {
            jCommander.parse(strArr);
        } catch (ParameterException e) {
            System.out.println(e.getMessage());
            jCommander.usage();
            System.exit(-1);
        }
        if (mainArguments.help) {
            // The help flag was declared but never consulted, so asking for help went on to
            // start a client, possibly without the mandatory options having been supplied.
            jCommander.usage();
            return;
        }
        new LoadSimulationClient(mainArguments).run();
    }

    /**
     * Accepts controller connections forever and serves each one on the executor.
     *
     * <p>The server socket is closed if the accept loop is left by an exception. A failure while
     * serving one connection is logged and does not affect the accept loop.
     *
     * @throws Exception if the server socket cannot be opened or accepting a connection fails
     */
    public void run() throws Exception {
        try (ServerSocket serverSocket = new ServerSocket(this.port)) {
            while (true) {
                log.info("Listening for controller command...");
                Socket accept = serverSocket.accept();
                log.info("Connected to {}", accept.getInetAddress().getHostName());
                this.executor.submit(() -> {
                    try {
                        handle(accept);
                    } catch (Exception e) {
                        // The Future returned by submit() is discarded, so an exception thrown
                        // from this task would never be seen; report it here instead.
                        log.error("Error while handling controller connection from {}", accept.getInetAddress(), e);
                    }
                });
            }
        }
    }

    /**
     * Recreates the acknowledging message listener from its serialized form.
     *
     * <p>The descriptor must name {@code Consumer.acknowledgeAsync(Message)} as the implementation
     * of {@code MessageListener.received(Consumer, Message)}; anything else is rejected.
     *
     * <p>NOTE(review): this looks like the decompiled form of the method the Java compiler
     * generates for serializable lambdas. The compiler presumably emits its own copy, in which
     * case this declaration should be deleted - confirm before building from this source.
     *
     * @param serializedLambda descriptor of the lambda being deserialized
     * @return a listener that acknowledges every received message asynchronously
     * @throws IllegalArgumentException if the descriptor does not describe that listener
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        // Kind 9 is the value of java.lang.invoke.MethodHandleInfo.REF_invokeInterface.
        boolean describesAckListener = "acknowledgeAsync".equals(serializedLambda.getImplMethodName())
                && serializedLambda.getImplMethodKind() == 9
                && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener")
                && serializedLambda.getFunctionalInterfaceMethodName().equals("received")
                && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")
                && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/Consumer")
                && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/pulsar/client/api/Message;)Ljava/util/concurrent/CompletableFuture;");
        if (describesAckListener) {
            // A lambda needs a functional-interface target type; it cannot be returned as a bare
            // Object, so it is bound to a typed local first.
            MessageListener<byte[]> listener = (consumer, message) -> consumer.acknowledgeAsync(message);
            return listener;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
