package io.debezium.testing.system.assertions;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.api.Assertions;

/* loaded from: input_file:io/debezium/testing/system/assertions/PlainKafkaAssertions.class */
/**
 * {@link KafkaAssertions} implementation that consumes record keys and values as plain
 * {@link String}s using Kafka's {@link StringDeserializer}.
 */
public class PlainKafkaAssertions implements KafkaAssertions<String, String> {
    /** Consumer configuration used for every consumer created by {@link #getConsumer()}. */
    private final Properties kafkaConsumerProps;

    /**
     * Creates the assertions helper on top of the given consumer configuration.
     *
     * <p>The supplied {@code properties} instance is stored by reference and modified in place:
     * its {@code key.deserializer} and {@code value.deserializer} entries are overwritten with
     * {@link StringDeserializer}.
     *
     * @param properties Kafka consumer configuration; must not be {@code null}
     */
    public PlainKafkaAssertions(Properties properties) {
        this.kafkaConsumerProps = properties;
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
    }

    /**
     * Creates a new consumer from the stored configuration.
     *
     * @return a fresh {@link KafkaConsumer}; the caller is responsible for closing it
     */
    @Override
    public Consumer<String, String> getConsumer() {
        return new KafkaConsumer<>(this.kafkaConsumerProps);
    }

    /**
     * Asserts that at least one record returned by a single poll (of up to 10 seconds) on the
     * given topic has a value containing the expected text.
     *
     * <p>Records with a {@code null} value are skipped instead of failing the check with a
     * {@link NullPointerException}.
     *
     * @param topic           name of the topic to read from
     * @param expectedContent text that must occur in the value of at least one record
     * @throws AssertionError if no polled record value contains {@code expectedContent}
     */
    @Override
    public void assertRecordsContain(String topic, String expectedContent) {
        // try-with-resources closes the consumer on both the success and the failure path.
        try (Consumer<String, String> consumer = getConsumer()) {
            consumer.subscribe(Collections.singleton(topic));
            // NOTE(review): right after subscribe() the assignment may still be empty, in which
            // case this seek has nothing to act on and the start position presumably depends on
            // the consumer's auto.offset.reset setting -- confirm against the test configuration.
            consumer.seekToBeginning(consumer.assignment());

            boolean found = StreamSupport
                    .stream(consumer.poll(Duration.of(10L, ChronoUnit.SECONDS)).records(topic).spliterator(), false)
                    .anyMatch(record -> record.value() != null && record.value().contains(expectedContent));

            Assertions.assertThat(found)
                    .withFailMessage("Topic '%s' doesn't have message containing <%s>.", topic, expectedContent)
                    .isTrue();
        }
    }
}
