package org.apache.kafka.tools;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/tools/LineMessageReaderTest.class */
public class LineMessageReaderTest {
    @Test
    public void testLineReader() {
        Properties defaultTestProps = defaultTestProps();
        defaultTestProps.put("parse.headers", "false");
        runTest(defaultTestProps, "key0\tvalue0\nkey1\tvalue1", record("key0", "value0"), record("key1", "value1"));
    }

    @Test
    public void testLineReaderHeader() {
        runTest(defaultTestProps(), "headerKey0:headerValue0,headerKey1:headerValue1\tkey0\tvalue0\n", record("key0", "value0", Arrays.asList(new RecordHeader("headerKey0", "headerValue0".getBytes(StandardCharsets.UTF_8)), new RecordHeader("headerKey1", "headerValue1".getBytes(StandardCharsets.UTF_8)))));
    }

    @Test
    public void testMinimalValidInputWithHeaderKeyAndValue() {
        runTest(defaultTestProps(), ":\t\t", record("", "", Collections.singletonList(new RecordHeader("", "".getBytes(StandardCharsets.UTF_8)))));
    }

    @Test
    public void testKeyMissingValue() {
        Properties defaultTestProps = defaultTestProps();
        defaultTestProps.put("parse.headers", "false");
        runTest(defaultTestProps, "key\t", record("key", ""));
    }

    @Test
    public void testDemarcationsLongerThanOne() {
        Properties defaultTestProps = defaultTestProps();
        defaultTestProps.put("key.separator", "\t\t");
        defaultTestProps.put("headers.delimiter", "\t\t");
        defaultTestProps.put("headers.separator", "---");
        defaultTestProps.put("headers.key.separator", "::::");
        runTest(defaultTestProps, "headerKey0.0::::headerValue0.0---headerKey1.0::::\t\tkey\t\tvalue", record("key", "value", Arrays.asList(new RecordHeader("headerKey0.0", "headerValue0.0".getBytes(StandardCharsets.UTF_8)), new RecordHeader("headerKey1.0", "".getBytes(StandardCharsets.UTF_8)))));
    }

    @Test
    public void testLineReaderHeaderNoKey() {
        Properties defaultTestProps = defaultTestProps();
        defaultTestProps.put("parse.key", "false");
        runTest(defaultTestProps, "headerKey:headerValue\tvalue\n", record(null, "value", Collections.singletonList(new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8)))));
    }

    @Test
    public void testLineReaderOnlyValue() {
        Properties defaultTestProps = defaultTestProps();
        defaultTestProps.put("parse.key", "false");
        defaultTestProps.put("parse.headers", "false");
        runTest(defaultTestProps, "value\n", record(null, "value"));
    }

    @Test
    public void testParseHeaderEnabledWithCustomDelimiterAndVaryingNumberOfKeyValueHeaderPairs() {
        Properties defaultTestProps = defaultTestProps();
        defaultTestProps.put("key.separator", "#");
        defaultTestProps.put("headers.delimiter", "!");
        defaultTestProps.put("headers.separator", "&");
        defaultTestProps.put("headers.key.separator", ":");
        runTest(defaultTestProps, "headerKey0.0:headerValue0.0&headerKey0.1:headerValue0.1!key0#value0\nheaderKey1.0:headerValue1.0!key1#value1", record("key0", "value0", Arrays.asList(new RecordHeader("headerKey0.0", "headerValue0.0".getBytes(StandardCharsets.UTF_8)), new RecordHeader("headerKey0.1", "headerValue0.1".getBytes(StandardCharsets.UTF_8)))), record("key1", "value1", Collections.singletonList(new RecordHeader("headerKey1.0", "headerValue1.0".getBytes(StandardCharsets.UTF_8)))));
    }

    @Test
    public void testMissingKeySeparator() {
        LineMessageReader lineMessageReader = new LineMessageReader();
        lineMessageReader.configure(Utils.propsToStringMap(defaultTestProps()));
        Iterator readRecords = lineMessageReader.readRecords(new ByteArrayInputStream("headerKey0.0:headerValue0.0,headerKey0.1:headerValue0.1\tkey0\tvalue0\nheaderKey1.0:headerValue1.0\tkey1[MISSING-DELIMITER]value1".getBytes()));
        readRecords.next();
        Objects.requireNonNull(readRecords);
        Assertions.assertEquals("No key separator found on line number 2: 'headerKey1.0:headerValue1.0\tkey1[MISSING-DELIMITER]value1'", Assertions.assertThrows(KafkaException.class, readRecords::next).getMessage());
    }

    @Test
    public void testMissingHeaderKeySeparator() {
        LineMessageReader lineMessageReader = new LineMessageReader();
        lineMessageReader.configure(Utils.propsToStringMap(defaultTestProps()));
        Iterator readRecords = lineMessageReader.readRecords(new ByteArrayInputStream("key[MISSING-DELIMITER]val\tkey0\tvalue0\n".getBytes()));
        Objects.requireNonNull(readRecords);
        Assertions.assertEquals("No header key separator found in pair 'key[MISSING-DELIMITER]val' on line number 1", Assertions.assertThrows(KafkaException.class, readRecords::next).getMessage());
    }

    @Test
    public void testHeaderDemarcationCollision() {
        Properties defaultTestProps = defaultTestProps();
        defaultTestProps.put("headers.delimiter", "\t");
        defaultTestProps.put("headers.separator", "\t");
        defaultTestProps.put("headers.key.separator", "\t");
        assertThrowsOnInvalidPatternConfig(defaultTestProps, "headers.delimiter and headers.separator may not be equal");
        defaultTestProps.put("headers.separator", ",");
        assertThrowsOnInvalidPatternConfig(defaultTestProps, "headers.delimiter and headers.key.separator may not be equal");
        defaultTestProps.put("headers.key.separator", ",");
        assertThrowsOnInvalidPatternConfig(defaultTestProps, "headers.separator and headers.key.separator may not be equal");
    }

    @Test
    public void testIgnoreErrorInInput() {
        Properties defaultTestProps = defaultTestProps();
        defaultTestProps.put("ignore.error", "true");
        runTest(defaultTestProps, "headerKey0.0:headerValue0.0\tkey0\tvalue0\nheaderKey1.0:headerValue1.0,headerKey1.1:headerValue1.1[MISSING-HEADER-DELIMITER]key1\tvalue1\nheaderKey2.0:headerValue2.0\tkey2[MISSING-KEY-DELIMITER]value2\nheaderKey3.0:headerValue3.0[MISSING-HEADER-DELIMITER]key3[MISSING-KEY-DELIMITER]value3\n", record("key0", "value0", Collections.singletonList(new RecordHeader("headerKey0.0", "headerValue0.0".getBytes(StandardCharsets.UTF_8)))), record(null, "value1", Arrays.asList(new RecordHeader("headerKey1.0", "headerValue1.0".getBytes(StandardCharsets.UTF_8)), new RecordHeader("headerKey1.1", "headerValue1.1[MISSING-HEADER-DELIMITER]key1".getBytes(StandardCharsets.UTF_8)))), record(null, "key2[MISSING-KEY-DELIMITER]value2", Collections.singletonList(new RecordHeader("headerKey2.0", "headerValue2.0".getBytes(StandardCharsets.UTF_8)))), record(null, "headerKey3.0:headerValue3.0[MISSING-HEADER-DELIMITER]key3[MISSING-KEY-DELIMITER]value3", Collections.emptyList()));
    }

    @Test
    public void testMalformedHeaderIgnoreError() {
        Properties defaultTestProps = defaultTestProps();
        defaultTestProps.put("ignore.error", "true");
        runTest(defaultTestProps, "key-val\tkey0\tvalue0\n", record("key0", "value0", Collections.singletonList(new RecordHeader("key-val", (byte[]) null))));
    }

    @Test
    public void testNullMarker() {
        Properties defaultTestProps = defaultTestProps();
        defaultTestProps.put("null.marker", "<NULL>");
        defaultTestProps.put("parse.headers", "false");
        runTest(defaultTestProps, "key\t\nkey\t<NULL>\nkey\t<NULL>value\n<NULL>\tvalue\n<NULL>\t<NULL>", record("key", ""), record("key", null), record("key", "<NULL>value"), record(null, "value"), record(null, null));
        defaultTestProps.remove("null.marker");
        runTest(defaultTestProps, "key\t\nkey\t<NULL>\nkey\t<NULL>value\n<NULL>\tvalue\n<NULL>\t<NULL>", record("key", ""), record("key", "<NULL>"), record("key", "<NULL>value"), record("<NULL>", "value"), record("<NULL>", "<NULL>"));
    }

    @Test
    public void testNullMarkerWithHeaders() {
        Header recordHeader = new RecordHeader("h1", "v1".getBytes(StandardCharsets.UTF_8));
        Properties defaultTestProps = defaultTestProps();
        defaultTestProps.put("null.marker", "<NULL>");
        runTest(defaultTestProps, "h0:v0,h1:v1\t<NULL>\tvalue\n<NULL>\tkey\t<NULL>\nh0:,h1:v1\t<NULL>\t<NULL>\nh0:<NULL>,h1:v1\tkey\t<NULL>\nh0:<NULL>,h1:<NULL>value\tkey\t<NULL>\n", record(null, "value", Arrays.asList(new RecordHeader("h0", "v0".getBytes(StandardCharsets.UTF_8)), recordHeader)), record("key", null), record(null, null, Arrays.asList(new RecordHeader("h0", "".getBytes(StandardCharsets.UTF_8)), recordHeader)), record("key", null, Arrays.asList(new RecordHeader("h0", (byte[]) null), recordHeader)), record("key", null, Arrays.asList(new RecordHeader("h0", (byte[]) null), new RecordHeader("h1", "<NULL>value".getBytes(StandardCharsets.UTF_8)))));
        LineMessageReader lineMessageReader = new LineMessageReader();
        defaultTestProps.remove("null.marker");
        lineMessageReader.configure(Utils.propsToStringMap(defaultTestProps));
        Iterator readRecords = lineMessageReader.readRecords(new ByteArrayInputStream("h0:v0,h1:v1\t<NULL>\tvalue\n<NULL>\tkey\t<NULL>\nh0:,h1:v1\t<NULL>\t<NULL>\nh0:<NULL>,h1:v1\tkey\t<NULL>\nh0:<NULL>,h1:<NULL>value\tkey\t<NULL>\n".getBytes()));
        assertRecordEquals(record("<NULL>", "value", Arrays.asList(new RecordHeader("h0", "v0".getBytes(StandardCharsets.UTF_8)), recordHeader)), (ProducerRecord) readRecords.next());
        Objects.requireNonNull(readRecords);
        Assertions.assertEquals("No header key separator found in pair '<NULL>' on line number 2", Assertions.assertThrows(KafkaException.class, readRecords::next).getMessage());
        assertRecordEquals(record("<NULL>", "<NULL>", Arrays.asList(new RecordHeader("h0", "".getBytes(StandardCharsets.UTF_8)), recordHeader)), (ProducerRecord) readRecords.next());
        assertRecordEquals(record("key", "<NULL>", Arrays.asList(new RecordHeader("h0", "<NULL>".getBytes(StandardCharsets.UTF_8)), recordHeader)), (ProducerRecord) readRecords.next());
        assertRecordEquals(record("key", "<NULL>", Arrays.asList(new RecordHeader("h0", "<NULL>".getBytes(StandardCharsets.UTF_8)), new RecordHeader("h1", "<NULL>value".getBytes(StandardCharsets.UTF_8)))), (ProducerRecord) readRecords.next());
    }

    @Test
    public void testNullMarkerHeaderKeyThrows() {
        Properties defaultTestProps = defaultTestProps();
        defaultTestProps.put("null.marker", "<NULL>");
        LineMessageReader lineMessageReader = new LineMessageReader();
        lineMessageReader.configure(Utils.propsToStringMap(defaultTestProps));
        Iterator readRecords = lineMessageReader.readRecords(new ByteArrayInputStream("<NULL>:v0,h1:v1\tkey\tvalue\n".getBytes()));
        Objects.requireNonNull(readRecords);
        Assertions.assertEquals("Header keys should not be equal to the null marker '<NULL>' as they can't be null", Assertions.assertThrows(KafkaException.class, readRecords::next).getMessage());
        defaultTestProps.remove("null.marker");
        runTest(defaultTestProps, "<NULL>:v0,h1:v1\tkey\tvalue\n", record("key", "value", Arrays.asList(new RecordHeader("<NULL>", "v0".getBytes(StandardCharsets.UTF_8)), new RecordHeader("h1", "v1".getBytes(StandardCharsets.UTF_8)))));
    }

    @Test
    public void testInvalidNullMarker() {
        Properties defaultTestProps = defaultTestProps();
        defaultTestProps.put("headers.delimiter", "-");
        defaultTestProps.put("headers.separator", ":");
        defaultTestProps.put("headers.key.separator", "/");
        defaultTestProps.put("null.marker", "-");
        assertThrowsOnInvalidPatternConfig(defaultTestProps, "null.marker and headers.delimiter may not be equal");
        defaultTestProps.put("null.marker", ":");
        assertThrowsOnInvalidPatternConfig(defaultTestProps, "null.marker and headers.separator may not be equal");
        defaultTestProps.put("null.marker", "/");
        assertThrowsOnInvalidPatternConfig(defaultTestProps, "null.marker and headers.key.separator may not be equal");
    }

    private Properties defaultTestProps() {
        Properties properties = new Properties();
        properties.put("topic", "topic");
        properties.put("parse.key", "true");
        properties.put("parse.headers", "true");
        return properties;
    }

    private void assertThrowsOnInvalidPatternConfig(Properties properties, String str) {
        Assertions.assertEquals(str, Assertions.assertThrows(KafkaException.class, () -> {
            new LineMessageReader().configure(Utils.propsToStringMap(properties));
        }).getMessage());
    }

    @SafeVarargs
    private final void runTest(Properties properties, String str, ProducerRecord<String, String>... producerRecordArr) {
        LineMessageReader lineMessageReader = new LineMessageReader();
        lineMessageReader.configure(Utils.propsToStringMap(properties));
        Iterator readRecords = lineMessageReader.readRecords(new ByteArrayInputStream(str.getBytes()));
        for (ProducerRecord<String, String> producerRecord : producerRecordArr) {
            assertRecordEquals(producerRecord, (ProducerRecord) readRecords.next());
        }
        Assertions.assertFalse(readRecords.hasNext());
        Objects.requireNonNull(readRecords);
        Assertions.assertThrows(NoSuchElementException.class, readRecords::next);
    }

    private <K, V> void assertRecordEquals(ProducerRecord<K, V> producerRecord, ProducerRecord<byte[], byte[]> producerRecord2) {
        Assertions.assertEquals(producerRecord.key(), producerRecord2.key() == null ? null : new String((byte[]) producerRecord2.key()));
        Assertions.assertEquals(producerRecord.value(), producerRecord2.value() == null ? null : new String((byte[]) producerRecord2.value()));
        Assertions.assertArrayEquals(producerRecord.headers().toArray(), producerRecord2.headers().toArray());
    }

    private <K, V> ProducerRecord<K, V> record(K k, V v, List<Header> list) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>("topic", k, v);
        list.forEach(header -> {
            producerRecord.headers().add(header.key(), header.value());
        });
        return producerRecord;
    }

    private <K, V> ProducerRecord<K, V> record(K k, V v) {
        return new ProducerRecord<>("topic", k, v);
    }
}
