package io.questdb.cutlass.line.tcp;

import io.questdb.cairo.AbstractCairoTest;
import io.questdb.cairo.TableReader;
import io.questdb.cairo.TableWriter;
import io.questdb.cairo.pool.PoolListener;
import io.questdb.cairo.pool.ex.EntryLockedException;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cutlass.line.LineProtoSender;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.SOCountDownLatch;
import io.questdb.mp.WorkerPool;
import io.questdb.mp.WorkerPoolConfiguration;
import io.questdb.network.DefaultIODispatcherConfiguration;
import io.questdb.network.IODispatcherConfiguration;
import io.questdb.network.Net;
import io.questdb.network.NetworkError;
import io.questdb.std.CharSequenceHashSet;
import io.questdb.std.Chars;
import io.questdb.std.Misc;
import io.questdb.std.Os;
import io.questdb.std.Rnd;
import io.questdb.std.Unsafe;
import io.questdb.std.datetime.microtime.TimestampFormatUtils;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.PrivateKey;
import java.util.concurrent.locks.LockSupport;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:io/questdb/cutlass/line/tcp/LineTcpServerTest.class */
public class LineTcpServerTest extends AbstractCairoTest {
    private static final String AUTH_KEY_ID1 = "testUser1";
    private static final String AUTH_KEY_ID2 = "testUser2";
    private static final long TEST_TIMEOUT_IN_MS = 120000;
    private int maxMeasurementSize = 50;
    private final WorkerPool sharedWorkerPool = new WorkerPool(new WorkerPoolConfiguration() { // from class: io.questdb.cutlass.line.tcp.LineTcpServerTest.1
        private final int[] affinity = {-1, -1};

        public int[] getWorkerAffinity() {
            return this.affinity;
        }

        public int getWorkerCount() {
            return 2;
        }

        public boolean haltOnError() {
            return true;
        }
    });
    private final int bindPort = 9002;
    private final IODispatcherConfiguration ioDispatcherConfiguration = new DefaultIODispatcherConfiguration() { // from class: io.questdb.cutlass.line.tcp.LineTcpServerTest.2
        public int getBindIPv4Address() {
            return 0;
        }

        public int getBindPort() {
            return 9002;
        }
    };
    private String authKeyId = null;
    private int msgBufferSize = 1024;
    private long minIdleMsBeforeWriterRelease = 30000;
    private final LineTcpReceiverConfiguration lineConfiguration = new DefaultLineTcpReceiverConfiguration() { // from class: io.questdb.cutlass.line.tcp.LineTcpServerTest.3
        static final /* synthetic */ boolean $assertionsDisabled;

        public IODispatcherConfiguration getNetDispatcherConfiguration() {
            return LineTcpServerTest.this.ioDispatcherConfiguration;
        }

        public int getNetMsgBufferSize() {
            return LineTcpServerTest.this.msgBufferSize;
        }

        public int getMaxMeasurementSize() {
            return LineTcpServerTest.this.maxMeasurementSize;
        }

        public int getWriterQueueCapacity() {
            return 4;
        }

        public int getNUpdatesPerLoadRebalance() {
            return 100;
        }

        public double getMaxLoadRatio() {
            return 1.0d;
        }

        public long getMaintenanceInterval() {
            return 25L;
        }

        public String getAuthDbPath() {
            if (null == LineTcpServerTest.this.authKeyId) {
                return null;
            }
            URL resource = getClass().getResource("authDb.txt");
            if ($assertionsDisabled || resource != null) {
                return resource.getFile();
            }
            throw new AssertionError();
        }

        public long getWriterIdleTimeout() {
            return LineTcpServerTest.this.minIdleMsBeforeWriterRelease;
        }

        static {
            $assertionsDisabled = !LineTcpServerTest.class.desiredAssertionStatus();
        }
    };
    private Path path;
    private static final Log LOG = LogFactory.getLog(LineTcpServerTest.class);
    private static final PrivateKey AUTH_PRIVATE_KEY1 = AuthDb.importPrivateKey("5UjEMuA0Pj5pjK8a-fa24dyIf-Es5mYny3oE_Wmus48");
    private static final PrivateKey AUTH_PRIVATE_KEY2 = AuthDb.importPrivateKey("lwJi3TSb4G6UcHxFJmPhOTWa4BLwJOOiK76wT6Uk7pI");

    @Test
    public void testGoodAuthenticated() throws Exception {
        test(AUTH_KEY_ID1, AUTH_PRIVATE_KEY1, 768, 1000);
    }

    @Test(expected = NetworkError.class)
    public void testInvalidSignature() throws Exception {
        test(AUTH_KEY_ID1, AUTH_PRIVATE_KEY2, 768, 100);
    }

    @Test(expected = NetworkError.class)
    public void testInvalidUser() throws Exception {
        test(AUTH_KEY_ID2, AUTH_PRIVATE_KEY2, 768, 100);
    }

    @Test
    public void testUnauthenticated() throws Exception {
        test(null, null, 200, 1000);
    }

    @Test
    public void testWriterRelease1() throws Exception {
        runInContext(() -> {
            send("weather,location=us-midwest temperature=82 1465839830100400200\nweather,location=us-midwest temperature=83 1465839830100500200\nweather,location=us-eastcoast temperature=81 1465839830101400200\n", "weather");
            send("weather,location=us-midwest temperature=85 1465839830102300200\nweather,location=us-eastcoast temperature=89 1465839830102400200\nweather,location=us-westcost temperature=82 1465839830102500200\n", "weather");
            assertTable("location\ttemperature\ttimestamp\nus-midwest\t82.0\t2016-06-13T17:43:50.100400Z\nus-midwest\t83.0\t2016-06-13T17:43:50.100500Z\nus-eastcoast\t81.0\t2016-06-13T17:43:50.101400Z\nus-midwest\t85.0\t2016-06-13T17:43:50.102300Z\nus-eastcoast\t89.0\t2016-06-13T17:43:50.102400Z\nus-westcost\t82.0\t2016-06-13T17:43:50.102500Z\n", "weather");
        });
    }

    @Test
    public void testWriterRelease2() throws Exception {
        runInContext(() -> {
            send("weather,location=us-midwest temperature=82 1465839830100400200\nweather,location=us-midwest temperature=83 1465839830100500200\nweather,location=us-eastcoast temperature=81 1465839830101400200\n", "weather");
            TableWriter writer = engine.getWriter(AllowAllCairoSecurityContext.INSTANCE, "weather");
            Throwable th = null;
            try {
                try {
                    writer.truncate();
                    if (writer != null) {
                        if (0 != 0) {
                            try {
                                writer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            writer.close();
                        }
                    }
                    send("weather,location=us-midwest temperature=85 1465839830102300200\nweather,location=us-eastcoast temperature=89 1465839830102400200\nweather,location=us-westcost temperature=82 1465839830102500200\n", "weather");
                    assertTable("location\ttemperature\ttimestamp\nus-midwest\t85.0\t2016-06-13T17:43:50.102300Z\nus-eastcoast\t89.0\t2016-06-13T17:43:50.102400Z\nus-westcost\t82.0\t2016-06-13T17:43:50.102500Z\n", "weather");
                } finally {
                }
            } catch (Throwable th3) {
                if (writer != null) {
                    if (th != null) {
                        try {
                            writer.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        writer.close();
                    }
                }
                throw th3;
            }
        });
    }

    @Test
    public void testWriterRelease3() throws Exception {
        runInContext(() -> {
            send("weather,location=us-midwest temperature=82 1465839830100400200\nweather,location=us-midwest temperature=83 1465839830100500200\nweather,location=us-eastcoast temperature=81 1465839830101400200\n", "weather");
            engine.remove(AllowAllCairoSecurityContext.INSTANCE, this.path, "weather");
            send("weather,location=us-midwest temperature=85 1465839830102300200\nweather,location=us-eastcoast temperature=89 1465839830102400200\nweather,location=us-westcost temperature=82 1465839830102500200\n", "weather");
            assertTable("location\ttemperature\ttimestamp\nus-midwest\t85.0\t2016-06-13T17:43:50.102300Z\nus-eastcoast\t89.0\t2016-06-13T17:43:50.102400Z\nus-westcost\t82.0\t2016-06-13T17:43:50.102500Z\n", "weather");
        });
    }

    @Test
    public void testWriterRelease4() throws Exception {
        runInContext(() -> {
            send("weather,location=us-midwest temperature=82 1465839830100400200\nweather,location=us-midwest temperature=83 1465839830100500200\nweather,location=us-eastcoast temperature=81 1465839830101400200\n", "weather");
            engine.remove(AllowAllCairoSecurityContext.INSTANCE, this.path, "weather");
            send("weather,loc=us-midwest temp=85 1465839830102300200\nweather,loc=us-eastcoast temp=89 1465839830102400200\nweather,loc=us-westcost temp=82 1465839830102500200\n", "weather");
            assertTable("loc\ttemp\ttimestamp\nus-midwest\t85.0\t2016-06-13T17:43:50.102300Z\nus-eastcoast\t89.0\t2016-06-13T17:43:50.102400Z\nus-westcost\t82.0\t2016-06-13T17:43:50.102500Z\n", "weather");
        });
    }

    @Test
    public void testWriterRelease5() throws Exception {
        runInContext(() -> {
            send("weather,location=us-midwest temperature=82 1465839830100400200\nweather,location=us-midwest temperature=83 1465839830100500200\nweather,location=us-eastcoast temperature=81 1465839830101400200\n", "weather");
            engine.remove(AllowAllCairoSecurityContext.INSTANCE, this.path, "weather");
            send("weather,location=us-midwest,source=sensor1 temp=85 1465839830102300200\nweather,location=us-eastcoast,source=sensor2 temp=89 1465839830102400200\nweather,location=us-westcost,source=sensor1 temp=82 1465839830102500200\n", "weather");
            assertTable("location\tsource\ttemp\ttimestamp\nus-midwest\tsensor1\t85.0\t2016-06-13T17:43:50.102300Z\nus-eastcoast\tsensor2\t89.0\t2016-06-13T17:43:50.102400Z\nus-westcost\tsensor1\t82.0\t2016-06-13T17:43:50.102500Z\n", "weather");
        });
    }

    @Test
    public void testWriter17Fields() throws Exception {
        int i = this.maxMeasurementSize;
        String str = "tableCRASH,tag_n_1=1,tag_n_2=2,tag_n_3=3,tag_n_4=4,tag_n_5=5,tag_n_6=6,tag_n_7=7,tag_n_8=8,tag_n_9=9,tag_n_10=10,tag_n_11=11,tag_n_12=12,tag_n_13=13,tag_n_14=14,tag_n_15=15,tag_n_16=16,tag_n_17=17 value=42.4 1619509249714000000\n";
        try {
            this.maxMeasurementSize = "tableCRASH,tag_n_1=1,tag_n_2=2,tag_n_3=3,tag_n_4=4,tag_n_5=5,tag_n_6=6,tag_n_7=7,tag_n_8=8,tag_n_9=9,tag_n_10=10,tag_n_11=11,tag_n_12=12,tag_n_13=13,tag_n_14=14,tag_n_15=15,tag_n_16=16,tag_n_17=17 value=42.4 1619509249714000000\n".length();
            runInContext(() -> {
                send(str, "tableCRASH");
                assertTable("tag_n_1\ttag_n_2\ttag_n_3\ttag_n_4\ttag_n_5\ttag_n_6\ttag_n_7\ttag_n_8\ttag_n_9\ttag_n_10\ttag_n_11\ttag_n_12\ttag_n_13\ttag_n_14\ttag_n_15\ttag_n_16\ttag_n_17\tvalue\ttimestamp\n1\t2\t3\t4\t5\t6\t7\t8\t9\t10\t11\t12\t13\t14\t15\t16\t17\t42.400000000000006\t2021-04-27T07:40:49.714000Z\n", "tableCRASH");
            });
        } finally {
            this.maxMeasurementSize = i;
        }
    }

    private void assertTable(CharSequence charSequence, CharSequence charSequence2) {
        TableReader reader = engine.getReader(AllowAllCairoSecurityContext.INSTANCE, charSequence2);
        Throwable th = null;
        try {
            try {
                assertCursorTwoPass(charSequence, reader.getCursor(), reader.getMetadata());
                if (reader != null) {
                    if (0 == 0) {
                        reader.close();
                        return;
                    }
                    try {
                        reader.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (reader != null) {
                if (th != null) {
                    try {
                        reader.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    reader.close();
                }
            }
            throw th4;
        }
    }

    private void runInContext(Runnable runnable) throws Exception {
        this.minIdleMsBeforeWriterRelease = 250L;
        assertMemoryLeak(() -> {
            this.path = new Path(4096);
            try {
                LineTcpServer create = LineTcpServer.create(this.lineConfiguration, this.sharedWorkerPool, LOG, engine);
                this.sharedWorkerPool.start(LOG);
                runnable.run();
                this.sharedWorkerPool.halt();
                Misc.free(create);
            } finally {
                Misc.free(this.path);
            }
        });
    }

    private void send(String str, String str2) {
        SOCountDownLatch sOCountDownLatch = new SOCountDownLatch(1);
        engine.setPoolListener((b, j, charSequence, s, s2, s3) -> {
            if (b == 1 && s == 1 && Chars.equals(str2, charSequence)) {
                sOCountDownLatch.countDown();
            }
        });
        try {
            int parseIPv4 = Net.parseIPv4("127.0.0.1");
            long sockaddr = Net.sockaddr(parseIPv4, 9002);
            long socketTcp = Net.socketTcp(true);
            if (Net.connect(socketTcp, sockaddr) != 0) {
                throw NetworkError.instance(Os.errno(), "could not connect to ").ip(parseIPv4);
            }
            byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
            long malloc = Unsafe.malloc(bytes.length);
            for (int i = 0; i < bytes.length; i++) {
                Unsafe.getUnsafe().putByte(malloc + i, bytes[i]);
            }
            int send = Net.send(socketTcp, malloc, bytes.length);
            Unsafe.free(malloc, bytes.length);
            Net.close(socketTcp);
            Net.freeSockAddr(sockaddr);
            Assert.assertEquals(bytes.length, send);
            sOCountDownLatch.await();
            engine.setPoolListener((PoolListener) null);
        } catch (Throwable th) {
            engine.setPoolListener((PoolListener) null);
            throw th;
        }
    }

    private void test(String str, PrivateKey privateKey, int i, int i2) throws Exception {
        this.authKeyId = str;
        this.msgBufferSize = i;
        assertMemoryLeak(() -> {
            int i3;
            String[] strArr = {"london", "paris", "rome"};
            CharSequenceHashSet charSequenceHashSet = new CharSequenceHashSet();
            charSequenceHashSet.add("weather1");
            charSequenceHashSet.add("weather2");
            charSequenceHashSet.add("weather3");
            SOCountDownLatch sOCountDownLatch = new SOCountDownLatch();
            sOCountDownLatch.setCount(charSequenceHashSet.size());
            Rnd rnd = new Rnd();
            StringBuilder[] sbArr = new StringBuilder[charSequenceHashSet.size()];
            engine.setPoolListener((b, j, charSequence, s, s2, s3) -> {
                if (b == 1 && s == 1 && charSequenceHashSet.contains(charSequence)) {
                    sOCountDownLatch.countDown();
                }
            });
            this.minIdleMsBeforeWriterRelease = 100L;
            try {
                try {
                    LineTcpServer create = LineTcpServer.create(this.lineConfiguration, this.sharedWorkerPool, LOG, engine);
                    Throwable th = null;
                    long currentTimeMillis = System.currentTimeMillis();
                    this.sharedWorkerPool.assignCleaner(Path.CLEANER);
                    this.sharedWorkerPool.start(LOG);
                    try {
                        LineProtoSender[] lineProtoSenderArr = new LineProtoSender[charSequenceHashSet.size()];
                        for (int i4 = 0; i4 < lineProtoSenderArr.length; i4++) {
                            if (null != str) {
                                AuthenticatedLineTCPProtoSender authenticatedLineTCPProtoSender = new AuthenticatedLineTCPProtoSender(str, privateKey, Net.parseIPv4("127.0.0.1"), 9002, 4096);
                                authenticatedLineTCPProtoSender.authenticate();
                                lineProtoSenderArr[i4] = authenticatedLineTCPProtoSender;
                            } else {
                                lineProtoSenderArr[i4] = new LineTCPProtoSender(Net.parseIPv4("127.0.0.1"), 9002, 4096);
                            }
                            StringBuilder sb = new StringBuilder((i2 + 1) * this.lineConfiguration.getMaxMeasurementSize());
                            sb.append("location\ttemp\ttimestamp\n");
                            sbArr[i4] = sb;
                        }
                        long currentTimeMicros = Os.currentTimeMicros();
                        StringSink stringSink = new StringSink();
                        int i5 = 0;
                        while (i5 < i2) {
                            int nextInt = i5 < charSequenceHashSet.size() ? i5 : rnd.nextInt(charSequenceHashSet.size());
                            LineProtoSender lineProtoSender = lineProtoSenderArr[nextInt];
                            StringBuilder sb2 = sbArr[nextInt];
                            lineProtoSender.metric(charSequenceHashSet.get(nextInt));
                            String str2 = strArr[rnd.nextInt(strArr.length)];
                            sb2.append(str2);
                            sb2.append('\t');
                            lineProtoSender.tag("location", str2);
                            int nextInt2 = rnd.nextInt(100);
                            sb2.append(nextInt2);
                            sb2.append('\t');
                            lineProtoSender.field("temp", nextInt2);
                            stringSink.clear();
                            TimestampFormatUtils.appendDateTimeUSec(stringSink, currentTimeMicros);
                            sb2.append((CharSequence) stringSink);
                            sb2.append('\n');
                            lineProtoSender.$(currentTimeMicros * 1000);
                            lineProtoSender.flush();
                            currentTimeMicros += rnd.nextInt(1000);
                            i5++;
                        }
                        for (LineProtoSender lineProtoSender2 : lineProtoSenderArr) {
                            lineProtoSender2.close();
                        }
                        Assert.assertTrue(sOCountDownLatch.await(120000000000L));
                        loop3: while (true) {
                            int i6 = 0;
                            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                            if (currentTimeMillis2 > TEST_TIMEOUT_IN_MS) {
                                LOG.error().$("after ").$(currentTimeMillis2).$("ms tables only had ").$(0).$(" rows out of ").$(i2).$();
                                break;
                            }
                            Thread.yield();
                            i3 = 0;
                            while (i3 < charSequenceHashSet.size()) {
                                CharSequence charSequence2 = charSequenceHashSet.get(i3);
                                while (true) {
                                    try {
                                        TableReader reader = engine.getReader(AllowAllCairoSecurityContext.INSTANCE, charSequence2);
                                        Throwable th2 = null;
                                        try {
                                            try {
                                                while (reader.getCursor().hasNext()) {
                                                    i6++;
                                                }
                                                break;
                                            } catch (Throwable th3) {
                                                th2 = th3;
                                                throw th3;
                                                break loop3;
                                            }
                                        } catch (Throwable th4) {
                                            if (reader != null) {
                                                if (th2 != null) {
                                                    try {
                                                        reader.close();
                                                    } catch (Throwable th5) {
                                                        th2.addSuppressed(th5);
                                                    }
                                                } else {
                                                    reader.close();
                                                }
                                            }
                                            throw th4;
                                            break loop3;
                                        }
                                    } catch (EntryLockedException e) {
                                        LOG.info().$("retrying read for ").$(charSequence2).$();
                                        LockSupport.parkNanos(1L);
                                    }
                                }
                            }
                            if (i6 >= i2) {
                                break;
                            }
                        }
                        engine.setPoolListener((PoolListener) null);
                        for (int i7 = 0; i7 < charSequenceHashSet.size(); i7++) {
                            CharSequence charSequence3 = charSequenceHashSet.get(i7);
                            LOG.info().$("checking table ").$(charSequence3).$();
                            assertTable(sbArr[i7], charSequence3);
                        }
                        return;
                        i3++;
                    } catch (Throwable th6) {
                        this.sharedWorkerPool.halt();
                        throw th6;
                    }
                } finally {
                }
            } catch (Throwable th7) {
                engine.setPoolListener((PoolListener) null);
                throw th7;
            }
        });
    }
}
