package org.apache.pulsar.testclient;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.testclient.CmdBase;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

@CommandLine.Command(name = "simulation-controller", description = {"Provides a shell for the user to dictate how simulation clients should incur load."})
/* loaded from: input_file:org/apache/pulsar/testclient/LoadSimulationController.class */
public class LoadSimulationController extends CmdBase {
    private DataInputStream[] inputStreams;
    private DataOutputStream[] outputStreams;
    private String[] clients;
    private Random random;

    @CommandLine.Option(names = {"--cluster"}, description = {"Cluster to test on"}, required = true)
    String cluster;

    @CommandLine.Option(names = {"--clients"}, description = {"Comma separated list of client hostnames"}, required = true)
    String clientHostNames;

    @CommandLine.Option(names = {"--client-port"}, description = {"Port that the clients are listening on"}, required = true)
    int clientPort;
    private static final Logger log = LoggerFactory.getLogger(LoadSimulationController.class);
    private static final ExecutorService threadPool = Executors.newCachedThreadPool();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/testclient/LoadSimulationController$BrokerWatcher.class */
    public class BrokerWatcher implements Watcher {
        private final ZooKeeper zkClient;
        private final Set<String> brokers = new HashSet();
        private final ShellArguments arguments;

        private BrokerWatcher(ZooKeeper zooKeeper, ShellArguments shellArguments) {
            this.zkClient = zooKeeper;
            this.arguments = shellArguments;
            process(null);
        }

        public synchronized void process(WatchedEvent watchedEvent) {
            try {
                for (String str : this.zkClient.getChildren("/loadbalance/brokers", this)) {
                    if (!this.brokers.contains(str)) {
                        new LoadReportWatcher(String.format("%s/%s", "/loadbalance/brokers", str), this.zkClient, this.arguments);
                        this.brokers.add(str);
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/testclient/LoadSimulationController$LoadReportWatcher.class */
    public class LoadReportWatcher implements Watcher {
        private final ZooKeeper zkClient;
        private final String path;
        private final ShellArguments arguments;

        public LoadReportWatcher(String str, ZooKeeper zooKeeper, ShellArguments shellArguments) {
            this.path = str;
            this.zkClient = zooKeeper;
            this.arguments = shellArguments;
            process(null);
        }

        public synchronized void process(WatchedEvent watchedEvent) {
            try {
                for (Map.Entry entry : ((LoadReport) ObjectMapperFactory.getMapper().getObjectMapper().readValue(this.zkClient.getData(this.path, this, (Stat) null), LoadReport.class)).getBundleStats().entrySet()) {
                    String str = (String) entry.getKey();
                    String format = String.format("%s/%s", str.substring(0, str.lastIndexOf(47)), "t");
                    NamespaceBundleStats namespaceBundleStats = (NamespaceBundleStats) entry.getValue();
                    double d = (this.arguments.rateMultiplier * (namespaceBundleStats.msgRateIn + namespaceBundleStats.msgRateOut)) / 2.0d;
                    int ceil = (int) Math.ceil((this.arguments.rateMultiplier * (namespaceBundleStats.msgThroughputIn + namespaceBundleStats.msgThroughputOut)) / (2.0d * d));
                    this.arguments.rate = d;
                    this.arguments.size = ceil;
                    LoadSimulationController.this.changeOrCreate(this.arguments, format);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/testclient/LoadSimulationController$ShellArguments.class */
    public static class ShellArguments {

        @CommandLine.Parameters(description = {"Command arguments:\ntrade tenant namespace topic\nchange tenant namespace topic\nstop tenant namespace topic\ntrade_group tenant group_name num_namespaces\nchange_group tenant group_name\nstop_group tenant group_name\nscript script_name\ncopy tenant_name source_zk target_zk\nstream source_zk\nsimulate zk\n"}, arity = "1")
        List<String> commandArguments;

        @CommandLine.Option(names = {"--rand-rate"}, description = {"Choose message rate uniformly randomly from the next two comma separated values (overrides --rate)"})
        String rangeString = "";

        @CommandLine.Option(names = {"--rate"}, description = {"Messages per second"})
        double rate = 1.0d;

        @CommandLine.Option(names = {"--rate-multiplier"}, description = {"Multiplier to use for copying or streaming rates"})
        double rateMultiplier = 1.0d;

        @CommandLine.Option(names = {"--separation"}, description = {"Separation time in ms for trade_group actions (0 for no separation)"})
        int separation = 0;

        @CommandLine.Option(names = {"--size"}, description = {"Message size in bytes"})
        int size = 1024;

        @CommandLine.Option(names = {"--topics-per-namespace"}, description = {"Number of topics to create per namespace in trade_group (total number of topics is num_namespaces X num_topics)"})
        int topicsPerNamespace = 1;

        private ShellArguments() {
        }
    }

    public LoadSimulationController() throws Exception {
        super("simulation-controller");
    }

    private boolean checkAppArgs(int i, int i2) {
        if (i == i2) {
            return true;
        }
        log.info("ERROR: Wrong number of application arguments (found {}, required {})", Integer.valueOf(i), Integer.valueOf(i2));
        return false;
    }

    private void getResourceQuotas(String str, ZooKeeper zooKeeper, Map<String, ResourceQuota>[] mapArr) throws Exception {
        List children = zooKeeper.getChildren(str, false);
        if (children.isEmpty()) {
            mapArr[this.random.nextInt(this.clients.length)].put(str, (ResourceQuota) ObjectMapperFactory.getMapper().getObjectMapper().readValue(zooKeeper.getData(str, false, (Stat) null), ResourceQuota.class));
            return;
        }
        Iterator it = children.iterator();
        while (it.hasNext()) {
            getResourceQuotas(String.format("%s/%s", str, (String) it.next()), zooKeeper, mapArr);
        }
    }

    private BundleData initializeBundleData(ResourceQuota resourceQuota, ShellArguments shellArguments) {
        double msgRateIn = (resourceQuota.getMsgRateIn() + resourceQuota.getMsgRateOut()) / 2.0d;
        int ceil = (int) Math.ceil((resourceQuota.getBandwidthIn() + resourceQuota.getBandwidthOut()) / (2.0d * msgRateIn));
        shellArguments.rate = msgRateIn * shellArguments.rateMultiplier;
        shellArguments.size = ceil;
        NamespaceBundleStats namespaceBundleStats = new NamespaceBundleStats();
        double d = msgRateIn * shellArguments.rateMultiplier;
        double bandwidthIn = ((resourceQuota.getBandwidthIn() + resourceQuota.getBandwidthOut()) * shellArguments.rateMultiplier) / 2.0d;
        resourceQuota.setMsgRateIn(d);
        resourceQuota.setMsgRateOut(d);
        resourceQuota.setBandwidthIn(bandwidthIn);
        resourceQuota.setBandwidthOut(bandwidthIn);
        resourceQuota.setMemory(resourceQuota.getMemory() * shellArguments.rateMultiplier);
        namespaceBundleStats.msgRateIn = resourceQuota.getMsgRateIn();
        namespaceBundleStats.msgRateOut = resourceQuota.getMsgRateOut();
        namespaceBundleStats.msgThroughputIn = resourceQuota.getBandwidthIn();
        namespaceBundleStats.msgThroughputOut = resourceQuota.getBandwidthOut();
        BundleData bundleData = new BundleData(10, 1000, namespaceBundleStats);
        bundleData.getLongTermData().setNumSamples(1000);
        bundleData.getShortTermData().setNumSamples(10);
        return bundleData;
    }

    private String makeTopic(String str, String str2, String str3) {
        return String.format("persistent://%s/%s/%s/%s", str, this.cluster, str2, str3);
    }

    private void writeProducerOptions(DataOutputStream dataOutputStream, ShellArguments shellArguments, String str) throws Exception {
        if (!shellArguments.rangeString.isEmpty()) {
            String[] split = shellArguments.rangeString.split(",");
            if (split.length != 2) {
                log.error("Argument to --rand-rate should be two comma-separated values");
                return;
            }
            double parseDouble = Double.parseDouble(split[0]);
            double parseDouble2 = Double.parseDouble(split[1]);
            double min = Math.min(parseDouble, parseDouble2);
            shellArguments.rate = (this.random.nextDouble() * (Math.max(parseDouble, parseDouble2) - min)) + min;
        }
        dataOutputStream.writeUTF(str);
        dataOutputStream.writeInt(shellArguments.size);
        dataOutputStream.writeDouble(shellArguments.rate);
    }

    private void change(ShellArguments shellArguments, String str, int i) throws Exception {
        this.outputStreams[i].write(0);
        writeProducerOptions(this.outputStreams[i], shellArguments, str);
        this.outputStreams[i].flush();
    }

    private int changeOrCreate(ShellArguments shellArguments, String str) throws Exception {
        int find = find(str);
        if (find == -1) {
            trade(shellArguments, str, this.random.nextInt(this.clients.length));
        } else {
            change(shellArguments, str, find);
        }
        return find;
    }

    private int changeIfExists(ShellArguments shellArguments, String str) throws Exception {
        int find = find(str);
        if (find != -1) {
            change(shellArguments, str, find);
        }
        return find;
    }

    private int find(String str) throws Exception {
        int i = -1;
        for (int i2 = 0; i2 < this.clients.length; i2++) {
            this.outputStreams[i2].write(5);
            this.outputStreams[i2].writeUTF(str);
        }
        for (int i3 = 0; i3 < this.clients.length; i3++) {
            if (this.inputStreams[i3].readBoolean()) {
                i = i3;
            }
        }
        return i;
    }

    private synchronized void trade(ShellArguments shellArguments, String str, int i) throws Exception {
        this.outputStreams[i].write(2);
        writeProducerOptions(this.outputStreams[i], shellArguments, str);
        this.outputStreams[i].flush();
    }

    private void handleChange(ShellArguments shellArguments) throws Exception {
        List<String> list = shellArguments.commandArguments;
        if (checkAppArgs(list.size() - 1, 3)) {
            String makeTopic = makeTopic(list.get(1), list.get(2), list.get(3));
            if (changeIfExists(shellArguments, makeTopic) == -1) {
                log.info("Topic {} not found", makeTopic);
            }
        }
    }

    private void handleCopy(ShellArguments shellArguments) throws Exception {
        List<String> list = shellArguments.commandArguments;
        if (checkAppArgs(list.size() - 1, 3)) {
            String str = list.get(1);
            String str2 = list.get(2);
            String str3 = list.get(3);
            ZooKeeper zooKeeper = new ZooKeeper(str2, 5000, (Watcher) null);
            ZooKeeper zooKeeper2 = new ZooKeeper(str3, 5000, (Watcher) null);
            Map<String, ResourceQuota>[] mapArr = new Map[this.clients.length];
            for (int i = 0; i < this.clients.length; i++) {
                mapArr[i] = new HashMap();
            }
            getResourceQuotas("/loadbalance/resource-quota", zooKeeper, mapArr);
            ArrayList arrayList = new ArrayList(this.clients.length);
            int i2 = 0;
            log.info("Copying...");
            for (Map<String, ResourceQuota> map : mapArr) {
                int i3 = i2;
                arrayList.add(threadPool.submit(() -> {
                    for (Map.Entry entry : map.entrySet()) {
                        String str4 = (String) entry.getKey();
                        ResourceQuota resourceQuota = (ResourceQuota) entry.getValue();
                        int length = "/loadbalance/resource-quota".length() + 1;
                        int indexOf = str4.indexOf(47, length) + 1;
                        String substring = str4.substring(length, indexOf - 1);
                        int indexOf2 = str4.indexOf(47, indexOf) + 1;
                        String substring2 = str4.substring(indexOf, indexOf2 - 1);
                        String format = String.format("%s-%s", String.format("%s-%s-%s", substring2, substring, str4.substring(str4.lastIndexOf(47) + 1)), str4.substring(indexOf2, str4.lastIndexOf(47)));
                        BundleData initializeBundleData = initializeBundleData(resourceQuota, shellArguments);
                        String format2 = String.format("%s/namespace/%s/%s/%s/0x00000000_0xffffffff", "/loadbalance/bundle-data", str, this.cluster, format);
                        String format3 = String.format("%s/%s/%s/%s/0x00000000_0xffffffff", "/loadbalance/bundle-data", str, this.cluster, format);
                        try {
                            ZkUtils.createFullPathOptimistic(zooKeeper2, format2, ObjectMapperFactory.getMapper().writer().writeValueAsBytes(resourceQuota), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                        } catch (KeeperException.NodeExistsException e) {
                        } catch (Exception e2) {
                            throw new RuntimeException(e2);
                        }
                        try {
                            ZkUtils.createFullPathOptimistic(zooKeeper2, format3, ObjectMapperFactory.getMapper().writer().writeValueAsBytes(initializeBundleData), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                        } catch (KeeperException.NodeExistsException e3) {
                        } catch (Exception e4) {
                            throw new RuntimeException(e4);
                        }
                        try {
                            trade(shellArguments, makeTopic(str, format, "t"), i3);
                        } catch (Exception e5) {
                            throw new RuntimeException(e5);
                        }
                    }
                }));
                i2++;
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                ((Future) it.next()).get();
            }
            zooKeeper.close();
            zooKeeper2.close();
        }
    }

    private void handleSimulate(ShellArguments shellArguments) throws Exception {
        List<String> list = shellArguments.commandArguments;
        checkAppArgs(list.size() - 1, 1);
        ZooKeeper zooKeeper = new ZooKeeper(list.get(1), 5000, (Watcher) null);
        Map<String, ResourceQuota>[] mapArr = new Map[this.clients.length];
        for (int i = 0; i < this.clients.length; i++) {
            mapArr[i] = new HashMap();
        }
        getResourceQuotas("/loadbalance/resource-quota", zooKeeper, mapArr);
        ArrayList arrayList = new ArrayList(this.clients.length);
        int i2 = 0;
        log.info("Simulating...");
        for (Map<String, ResourceQuota> map : mapArr) {
            int i3 = i2;
            arrayList.add(threadPool.submit(() -> {
                for (Map.Entry entry : map.entrySet()) {
                    String str = (String) entry.getKey();
                    String replace = str.replace("/loadbalance/resource-quota", "/loadbalance/bundle-data");
                    ResourceQuota resourceQuota = (ResourceQuota) entry.getValue();
                    String format = String.format("persistent://%s/t", str.substring("/loadbalance/resource-quota".length() + 1));
                    BundleData initializeBundleData = initializeBundleData(resourceQuota, shellArguments);
                    try {
                        ZkUtils.createFullPathOptimistic(zooKeeper, replace, ObjectMapperFactory.getMapper().writer().writeValueAsBytes(initializeBundleData), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException.NodeExistsException e) {
                        try {
                            zooKeeper.setData(replace, ObjectMapperFactory.getMapper().writer().writeValueAsBytes(initializeBundleData), -1);
                        } catch (Exception e2) {
                            throw new RuntimeException(e2);
                        }
                    } catch (Exception e3) {
                        throw new RuntimeException(e3);
                    }
                    try {
                        trade(shellArguments, format, i3);
                    } catch (Exception e4) {
                        throw new RuntimeException(e4);
                    }
                }
            }));
            i2++;
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
        zooKeeper.close();
    }

    private void handleStop(ShellArguments shellArguments) throws Exception {
        List<String> list = shellArguments.commandArguments;
        if (checkAppArgs(list.size() - 1, 3)) {
            String makeTopic = makeTopic(list.get(1), list.get(2), list.get(3));
            for (DataOutputStream dataOutputStream : this.outputStreams) {
                dataOutputStream.write(1);
                dataOutputStream.writeUTF(makeTopic);
                dataOutputStream.flush();
            }
        }
    }

    private void handleStream(ShellArguments shellArguments) throws Exception {
        List<String> list = shellArguments.commandArguments;
        if (checkAppArgs(list.size() - 1, 1)) {
            new BrokerWatcher(new ZooKeeper(list.get(1), 5000, (Watcher) null), shellArguments);
            Thread.currentThread().join();
        }
    }

    private void handleTrade(ShellArguments shellArguments) throws Exception {
        List<String> list = shellArguments.commandArguments;
        if (checkAppArgs(list.size() - 1, 3)) {
            trade(shellArguments, makeTopic(list.get(1), list.get(2), list.get(3)), this.random.nextInt(this.clients.length));
        }
    }

    private void handleGroupChange(ShellArguments shellArguments) throws Exception {
        List<String> list = shellArguments.commandArguments;
        if (checkAppArgs(list.size() - 1, 2)) {
            String str = list.get(1);
            String str2 = list.get(2);
            for (DataOutputStream dataOutputStream : this.outputStreams) {
                dataOutputStream.write(3);
                dataOutputStream.writeUTF(str);
                dataOutputStream.writeUTF(str2);
                dataOutputStream.writeInt(shellArguments.size);
                dataOutputStream.writeDouble(shellArguments.rate);
                dataOutputStream.flush();
            }
        }
    }

    private void handleGroupStop(ShellArguments shellArguments) throws Exception {
        List<String> list = shellArguments.commandArguments;
        if (checkAppArgs(list.size() - 1, 2)) {
            String str = list.get(1);
            String str2 = list.get(2);
            for (DataOutputStream dataOutputStream : this.outputStreams) {
                dataOutputStream.write(4);
                dataOutputStream.writeUTF(str);
                dataOutputStream.writeUTF(str2);
                dataOutputStream.flush();
            }
        }
    }

    private void handleGroupTrade(ShellArguments shellArguments) throws Exception {
        List<String> list = shellArguments.commandArguments;
        if (checkAppArgs(list.size() - 1, 3)) {
            String str = list.get(1);
            String str2 = list.get(2);
            int parseInt = Integer.parseInt(list.get(3));
            for (int i = 0; i < parseInt; i++) {
                for (int i2 = 0; i2 < shellArguments.topicsPerNamespace; i2++) {
                    trade(shellArguments, makeTopic(str, String.format("%s-%d", str2, Integer.valueOf(i)), Integer.toString(i2)), this.random.nextInt(this.clients.length));
                    Thread.sleep(shellArguments.separation);
                }
            }
        }
    }

    private void read(String[] strArr) {
        if (strArr.length > 0) {
            if (strArr.length == 1 && strArr[0].isEmpty()) {
                return;
            }
            ShellArguments shellArguments = new ShellArguments();
            CommandLine commandLine = new CommandLine(shellArguments);
            try {
                commandLine.parseArgs(strArr);
                String str = shellArguments.commandArguments.get(0);
                boolean z = -1;
                switch (str.hashCode()) {
                    case -1995722780:
                        if (str.equals("trade_group")) {
                            z = 3;
                            break;
                        }
                        break;
                    case -1361636432:
                        if (str.equals("change")) {
                            z = true;
                            break;
                        }
                        break;
                    case -1017522174:
                        if (str.equals("stop_group")) {
                            z = 5;
                            break;
                        }
                        break;
                    case -907685685:
                        if (str.equals("script")) {
                            z = 6;
                            break;
                        }
                        break;
                    case -891990144:
                        if (str.equals("stream")) {
                            z = 8;
                            break;
                        }
                        break;
                    case 3059573:
                        if (str.equals("copy")) {
                            z = 7;
                            break;
                        }
                        break;
                    case 3127582:
                        if (str.equals("exit")) {
                            z = 11;
                            break;
                        }
                        break;
                    case 3482191:
                        if (str.equals("quit")) {
                            z = 10;
                            break;
                        }
                        break;
                    case 3540994:
                        if (str.equals("stop")) {
                            z = 2;
                            break;
                        }
                        break;
                    case 110621028:
                        if (str.equals("trade")) {
                            z = false;
                            break;
                        }
                        break;
                    case 239266096:
                        if (str.equals("change_group")) {
                            z = 4;
                            break;
                        }
                        break;
                    case 490275364:
                        if (str.equals("simulate")) {
                            z = 9;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case LoadSimulationClient.CHANGE_COMMAND /* 0 */:
                        handleTrade(shellArguments);
                        break;
                    case LoadSimulationClient.STOP_COMMAND /* 1 */:
                        handleChange(shellArguments);
                        break;
                    case LoadSimulationClient.TRADE_COMMAND /* 2 */:
                        handleStop(shellArguments);
                        break;
                    case LoadSimulationClient.CHANGE_GROUP_COMMAND /* 3 */:
                        handleGroupTrade(shellArguments);
                        break;
                    case LoadSimulationClient.STOP_GROUP_COMMAND /* 4 */:
                        handleGroupChange(shellArguments);
                        break;
                    case LoadSimulationClient.FIND_COMMAND /* 5 */:
                        handleGroupStop(shellArguments);
                        break;
                    case true:
                        List<String> list = shellArguments.commandArguments;
                        checkAppArgs(list.size() - 1, 1);
                        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(Paths.get(list.get(1), new String[0]).toFile())));
                        for (String readLine = bufferedReader.readLine(); readLine != null; readLine = bufferedReader.readLine()) {
                            read(readLine.split("\\s+"));
                        }
                        bufferedReader.close();
                        break;
                    case true:
                        handleCopy(shellArguments);
                        break;
                    case true:
                        handleStream(shellArguments);
                        break;
                    case true:
                        handleSimulate(shellArguments);
                        break;
                    case true:
                    case true:
                        PerfClientUtils.exit(0);
                        break;
                    default:
                        log.info("ERROR: Unknown command \"{}\"", str);
                        break;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } catch (CmdBase.ParameterException e2) {
                System.out.println(e2.getMessage());
                commandLine.usage(commandLine.getOut());
            }
        }
    }

    public void start() throws Exception {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println();
            System.out.print("> ");
            read(bufferedReader.readLine().split("\\s+"));
        }
    }

    @Override // org.apache.pulsar.testclient.CmdBase
    public void run() throws Exception {
        this.random = new Random();
        this.clients = this.clientHostNames.split(",");
        Socket[] socketArr = new Socket[this.clients.length];
        this.inputStreams = new DataInputStream[this.clients.length];
        this.outputStreams = new DataOutputStream[this.clients.length];
        log.info("Found {} clients:", Integer.valueOf(this.clients.length));
        for (int i = 0; i < this.clients.length; i++) {
            socketArr[i] = new Socket(this.clients[i], this.clientPort);
            this.inputStreams[i] = new DataInputStream(socketArr[i].getInputStream());
            this.outputStreams[i] = new DataOutputStream(socketArr[i].getOutputStream());
            log.info("Connected to {}", this.clients[i]);
        }
        start();
    }
}
