package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.AbstractRecordsProducerTest;
import io.debezium.data.VerifyRecord;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/postgresql/RecordsSnapshotProducerIT.class */
public class RecordsSnapshotProducerIT extends AbstractRecordsProducerTest {
    private RecordsSnapshotProducer snapshotProducer;
    private PostgresTaskContext context;

    @Before
    public void before() throws SQLException {
        TestHelper.dropAllSchemas();
        PostgresConnectorConfig postgresConnectorConfig = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
        this.context = new PostgresTaskContext(postgresConnectorConfig, new PostgresSchema(postgresConnectorConfig));
    }

    @After
    public void after() throws Exception {
        if (this.snapshotProducer != null) {
            this.snapshotProducer.stop();
        }
    }

    @Test
    public void shouldGenerateSnapshotsForDefaultDatatypes() throws Exception {
        this.snapshotProducer = new RecordsSnapshotProducer(this.context, new SourceInfo("test_server"), false);
        TestHelper.executeDDL("postgres_create_tables.ddl");
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(ALL_STMTS.size());
        TestHelper.execute(((String) ALL_STMTS.stream().collect(Collectors.joining(";" + System.lineSeparator()))) + ";");
        this.snapshotProducer.start(testConsumer);
        testConsumer.await(2L, TimeUnit.SECONDS);
        Map<String, List<AbstractRecordsProducerTest.SchemaAndValueField>> schemaAndValuesByTableName = super.schemaAndValuesByTableName();
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, schemaAndValuesByTableName);
        });
        while (!testConsumer.isEmpty()) {
            assertRecordOffset(testConsumer.remove(), true, testConsumer.isEmpty());
        }
    }

    @Test
    public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
        TestHelper.execute("CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);");
        this.snapshotProducer = new RecordsSnapshotProducer(this.context, new SourceInfo("test_server"), true);
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(2);
        this.snapshotProducer.start(testConsumer);
        testConsumer.await(2L, TimeUnit.SECONDS);
        testConsumer.clear();
        TestHelper.execute("INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);");
        testConsumer.expects(2);
        testConsumer.await(2L, TimeUnit.SECONDS);
        SourceRecord remove = testConsumer.remove();
        VerifyRecord.isValidInsert(remove, "pk", 2);
        Assert.assertEquals(TestHelper.topicName("s1.a"), remove.topic());
        assertRecordOffset(remove, false, false);
        SourceRecord remove2 = testConsumer.remove();
        VerifyRecord.isValidInsert(remove2, "pk", 2);
        Assert.assertEquals(TestHelper.topicName("s2.a"), remove2.topic());
        assertRecordOffset(remove2, false, false);
        this.snapshotProducer.stop();
        TestHelper.execute("INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);");
        int i = 6;
        AbstractRecordsProducerTest.TestConsumer testConsumer2 = testConsumer(6);
        this.snapshotProducer = new RecordsSnapshotProducer(this.context, new SourceInfo("test_server"), true);
        this.snapshotProducer.start(testConsumer2);
        testConsumer2.await(2L, TimeUnit.SECONDS);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        testConsumer2.process(sourceRecord -> {
            int andIncrement = atomicInteger.getAndIncrement();
            VerifyRecord.isValidRead(sourceRecord, "pk", (andIncrement % 3) + 1);
            assertRecordOffset(sourceRecord, true, andIncrement == i - 1);
        });
        testConsumer2.clear();
        TestHelper.execute("INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);");
        testConsumer2.expects(2);
        testConsumer2.await(2L, TimeUnit.SECONDS);
        SourceRecord remove3 = testConsumer2.remove();
        VerifyRecord.isValidInsert(remove3, "pk", 4);
        assertRecordOffset(remove3, false, false);
        SourceRecord remove4 = testConsumer2.remove();
        VerifyRecord.isValidInsert(remove4, "pk", 4);
        assertRecordOffset(remove4, false, false);
    }

    private void assertReadRecord(SourceRecord sourceRecord, Map<String, List<AbstractRecordsProducerTest.SchemaAndValueField>> map) {
        VerifyRecord.isValidRead(sourceRecord, "pk", 1);
        String str = sourceRecord.topic();
        String str2 = TestHelper.topicName("public.");
        Assert.assertTrue("Invalid topic name for records", str.startsWith(str2));
        String replace = str.replace(str2, "");
        List<AbstractRecordsProducerTest.SchemaAndValueField> list = map.get(replace);
        Assert.assertNotNull("No expected values for " + replace + " found", list);
        assertRecordSchemaAndValues(list, sourceRecord, "after");
    }
}
