package org.apache.kafka.connect.integration;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Filter;
import org.apache.kafka.connect.transforms.predicates.HasHeaderKey;
import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
import org.apache.kafka.connect.transforms.predicates.TopicNameMatches;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests that run connectors configured with the {@link Filter} transformation and a
 * predicate ({@link TopicNameMatches}, {@link RecordIsTombstone} or {@link HasHeaderKey}) on an
 * embedded Connect cluster, and verify which records make it through the transformation chain.
 */
@Category({IntegrationTest.class})
public class TransformationIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(TransformationIntegrationTest.class);

    private static final int NUM_RECORDS_PRODUCED = 2000;
    private static final int NUM_TOPIC_PARTITIONS = 3;
    private static final int NUM_TASKS = 1;
    private static final int NUM_WORKERS = 3;
    private static final int OFFSET_FLUSH_INTERVAL_MS = 5000;
    private static final String CONNECTOR_NAME = "simple-conn";
    private static final long RECORD_TRANSFER_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
    private static final long OBSERVED_RECORDS_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
    private static final String SINK_CONNECTOR_CLASS_NAME = MonitorableSinkConnector.class.getSimpleName();
    private static final String SOURCE_CONNECTOR_CLASS_NAME = MonitorableSourceConnector.class.getSimpleName();

    private EmbeddedConnectCluster connect;
    private ConnectorHandle connectorHandle;

    /**
     * Builds and starts the embedded Connect cluster (one broker, {@link #NUM_WORKERS} workers) and
     * obtains the monitoring handle for {@link #CONNECTOR_NAME}.
     */
    @Before
    public void setup() {
        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("offset.flush.interval.ms", String.valueOf(OFFSET_FLUSH_INTERVAL_MS));

        // Topics must be created explicitly by the tests (or by the connector's topic-creation config).
        Properties brokerProps = new Properties();
        brokerProps.put("auto.create.topics.enable", "false");

        this.connect = new EmbeddedConnectCluster.Builder()
                .name("connect-cluster")
                .numWorkers(NUM_WORKERS)
                .numBrokers(1)
                .workerProps(workerProps)
                .brokerProps(brokerProps)
                .build();
        this.connect.start();

        this.connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
    }

    /**
     * Removes the connector handle and stops the embedded cluster. The cluster is stopped even when
     * removing the handle fails, and nothing is stopped if {@link #setup()} never created a cluster.
     */
    @After
    public void close() {
        try {
            RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
        } finally {
            if (this.connect != null) {
                this.connect.stop();
            }
        }
    }

    /**
     * Runs a sink connector over two topics with a {@link Filter} guarded by a
     * {@link TopicNameMatches} predicate on {@code bar-.*}, and verifies that only the records of
     * {@code foo-topic} are observed by the sink task.
     */
    @Test
    public void testFilterOnTopicNameWithSinkConnector() throws Exception {
        Map<String, Long> observedRecords = observeRecords();

        String fooTopic = "foo-topic";
        String barTopic = "bar-topic";
        this.connect.kafka().createTopic(fooTopic, NUM_TOPIC_PARTITIONS);
        this.connect.kafka().createTopic(barTopic, NUM_TOPIC_PARTITIONS);

        Map<String, String> props = filterConnectorProps(SINK_CONNECTOR_CLASS_NAME, "barPredicate", TopicNameMatches.class);
        props.put("topics", String.join(",", fooTopic, barTopic));
        props.put("predicates.barPredicate.pattern", "bar-.*");

        // Only the foo-topic records are expected to reach the task.
        this.connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
        this.connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);

        this.connect.configureConnector(CONNECTOR_NAME, props);
        assertConnectorRunning();

        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
            this.connect.kafka().produce(barTopic, i % NUM_TOPIC_PARTITIONS, "key", "simple-message-value-" + i);
        }
        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
            this.connect.kafka().produce(fooTopic, i % NUM_TOPIC_PARTITIONS, "key", "simple-message-value-" + i);
        }

        // Sanity check: both topics hold everything that was produced.
        Assert.assertEquals("Unexpected number of records consumed", NUM_RECORDS_PRODUCED,
                this.connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, fooTopic).count());
        Assert.assertEquals("Unexpected number of records consumed", NUM_RECORDS_PRODUCED,
                this.connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, barTopic).count());

        this.connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
        this.connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);

        assertObservedRecords(observedRecords, Collections.singletonMap(fooTopic, (long) NUM_RECORDS_PRODUCED));

        this.connect.deleteConnector(CONNECTOR_NAME);
    }

    /**
     * Waits until the connector and at least {@link #NUM_TASKS} of its tasks are running.
     */
    private void assertConnectorRunning() throws InterruptedException {
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(
                CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time.");
    }

    /**
     * Polls until the observed per-topic counts equal the expected ones, failing after
     * {@link #OBSERVED_RECORDS_DURATION_MS}.
     *
     * @param map  the live map of observed counts, as returned by {@link #observeRecords()}
     * @param map2 the expected per-topic counts
     */
    private void assertObservedRecords(Map<String, Long> map, Map<String, Long> map2) throws InterruptedException {
        TestUtils.waitForCondition(
            () -> map2.equals(map),
            OBSERVED_RECORDS_DURATION_MS,
            () -> "The observed records should be " + map2 + " but was " + map);
    }

    /**
     * Registers a consumer on the connector's first task that counts every record it is handed,
     * keyed by the record's topic.
     *
     * @return a live map from topic name to the number of records observed so far
     */
    private Map<String, Long> observeRecords() {
        // NOTE(review): the callback presumably runs on the task's thread while the test thread
        // polls the map, hence the synchronized wrapper - confirm against ConnectorHandle/TaskHandle.
        Map<String, Long> observedRecords = Collections.synchronizedMap(new HashMap<>());
        this.connectorHandle.taskHandle(CONNECTOR_NAME + "-0",
            sinkRecord -> observedRecords.merge(sinkRecord.topic(), 1L, Long::sum));
        return observedRecords;
    }

    /**
     * Runs a sink connector with a {@link Filter} guarded by a {@link RecordIsTombstone} predicate.
     * Every second produced record has a {@code null} value, and the test verifies that only the
     * other half is observed by the sink task.
     */
    @Test
    public void testFilterOnTombstonesWithSinkConnector() throws Exception {
        Map<String, Long> observedRecords = observeRecords();

        String topic = "foo-topic";
        this.connect.kafka().createTopic(topic, NUM_TOPIC_PARTITIONS);

        Map<String, String> props = filterConnectorProps(SINK_CONNECTOR_CLASS_NAME, "barPredicate", RecordIsTombstone.class);
        props.put("topics", String.join(",", topic));

        // Half of the produced records are tombstones and are expected to be dropped.
        this.connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED / 2);
        this.connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED / 2);

        this.connect.configureConnector(CONNECTOR_NAME, props);
        assertConnectorRunning();

        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
            // Even indices carry a value, odd indices are tombstones (null value).
            String value = i % 2 == 0 ? "simple-message-value-" + i : null;
            this.connect.kafka().produce(topic, i % NUM_TOPIC_PARTITIONS, "key", value);
        }

        // Sanity check: the topic holds everything that was produced, tombstones included.
        Assert.assertEquals("Unexpected number of records consumed", NUM_RECORDS_PRODUCED,
                this.connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, topic).count());

        this.connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
        this.connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);

        assertObservedRecords(observedRecords, Collections.singletonMap(topic, (long) (NUM_RECORDS_PRODUCED / 2)));

        this.connect.deleteConnector(CONNECTOR_NAME);
    }

    /**
     * Runs a source connector with a negated {@link Filter} guarded by a {@link HasHeaderKey}
     * predicate on {@code header-8}, relying on the connector's topic-creation settings to create
     * the output topic, and verifies that consumed records carry that header.
     */
    @Test
    public void testFilterOnHasHeaderKeyWithSourceConnectorAndTopicCreation() throws Exception {
        String topic = "test-topic";
        String headerKey = "header-8";

        Map<String, String> props = filterConnectorProps(SOURCE_CONNECTOR_CLASS_NAME, "headerPredicate", HasHeaderKey.class);
        props.put(MonitorableSourceConnector.TOPIC_CONFIG, topic);
        props.put(MonitorableSourceConnector.MAX_MESSAGES_PER_SECOND_CONFIG, String.valueOf(500));
        // Negated: the filter applies to records for which the predicate is false.
        props.put("transforms.filter.negate", "true");
        props.put("predicates.headerPredicate.name", headerKey);
        // NOTE(review): -1 presumably means "use the broker default" - confirm.
        props.put("topic.creation.default.replication.factor", String.valueOf(-1));
        props.put("topic.creation.default.partitions", String.valueOf(NUM_TOPIC_PARTITIONS));

        this.connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
        this.connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);

        this.connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
                SOURCE_CONNECTOR_CLASS_NAME, props, 0,
                "Validating connector configuration produced an unexpected number or errors.");

        this.connect.configureConnector(CONNECTOR_NAME, props);
        assertConnectorRunning();

        this.connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
        this.connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);

        for (ConsumerRecord<?, ?> consumed : this.connect.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, topic)) {
            Assert.assertNotNull("Expected header to exist", consumed.headers().lastHeader(headerKey));
        }

        this.connect.deleteConnector(CONNECTOR_NAME);
    }

    /**
     * Builds the connector properties shared by all tests: connector name and class, task count,
     * string converters, and a single {@code filter} transformation bound to one predicate.
     *
     * @param connectorClass the value for {@code connector.class}
     * @param predicateName  the alias under which the predicate is registered
     * @param predicateType  the predicate class; its simple name is used as the predicate type
     * @return a mutable map that callers extend with test-specific settings
     */
    private Map<String, String> filterConnectorProps(String connectorClass, String predicateName, Class<?> predicateType) {
        Map<String, String> props = new HashMap<>();
        props.put("name", CONNECTOR_NAME);
        props.put("connector.class", connectorClass);
        props.put("tasks.max", String.valueOf(NUM_TASKS));
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", StringConverter.class.getName());
        props.put("transforms", "filter");
        props.put("transforms.filter.type", Filter.class.getSimpleName());
        props.put("transforms.filter.predicate", predicateName);
        props.put("predicates", predicateName);
        props.put("predicates." + predicateName + ".type", predicateType.getSimpleName());
        return props;
    }
}
