package tech.harmonysoft.oss.kafka.service;

import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.inject.Named;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.TuplesKt;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.collections.MapsKt;
import kotlin.concurrent.ThreadsKt;
import kotlin.io.CloseableKt;
import kotlin.jdk7.AutoCloseableKt;
import kotlin.jvm.JvmClassMappingKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.Reflection;
import kotlin.reflect.KClass;
import kotlin.text.Charsets;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import tech.harmonysoft.oss.common.ProcessingResult;
import tech.harmonysoft.oss.json.JsonApi;
import tech.harmonysoft.oss.kafka.config.TestKafkaConfig;
import tech.harmonysoft.oss.kafka.config.TestKafkaConfigProvider;
import tech.harmonysoft.oss.kafka.fixture.KafkaFixtureContext;
import tech.harmonysoft.oss.kafka.fixture.KafkaTestFixture;
import tech.harmonysoft.oss.test.TestAware;
import tech.harmonysoft.oss.test.binding.DynamicBindingContext;
import tech.harmonysoft.oss.test.fixture.FixtureDataHelper;
import tech.harmonysoft.oss.test.json.CommonJsonUtil;
import tech.harmonysoft.oss.test.match.TestMatchResult;
import tech.harmonysoft.oss.test.util.VerificationUtil;

/* compiled from: TestKafkaManager.kt */
@Metadata(mv = {1, 7, 1}, k = 1, xi = 48, d1 = {"��~\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010!\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\b\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0010\n\u0002\u0018\u0002\n\u0002\u0010\u000b\n\u0002\b\u0006\n\u0002\u0018\u0002\n��\b\u0007\u0018��2\u00020\u0001B-\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u0012\u0006\u0010\n\u001a\u00020\u000b¢\u0006\u0002\u0010\fJ\u0016\u0010\u0015\u001a\u00020\u00162\u0006\u0010\u0017\u001a\u00020\u000f2\u0006\u0010\u0018\u001a\u00020\u000fJ\u0006\u0010\u0019\u001a\u00020\u0016J\u000e\u0010\u001a\u001a\u00020\u00162\u0006\u0010\u0017\u001a\u00020\u000fJ\u001a\u0010\u001b\u001a\u000e\u0012\u0004\u0012\u00020\u000f\u0012\u0004\u0012\u00020\u000f0\u00112\u0006\u0010\u001c\u001a\u00020\u000fJb\u0010\u001b\u001a\u000e\u0012\u0004\u0012\u0002H\u001d\u0012\u0004\u0012\u0002H\u001e0\u0011\"\b\b��\u0010\u001d*\u00020\u001f\"\b\b\u0001\u0010\u001e*\u00020\u001f2\u0006\u0010\u001c\u001a\u00020\u000f2\u0006\u0010 \u001a\u00020!2\u0014\u0010\"\u001a\u0010\u0012\f\b\u0001\u0012\b\u0012\u0004\u0012\u0002H\u001d0$0#2\u0014\u0010%\u001a\u0010\u0012\f\b\u0001\u0012\b\u0012\u0004\u0012\u0002H\u001e0$0#J\u000e\u0010&\u001a\u00020\u00162\u0006\u0010\u001c\u001a\u00020\u000fJ\u0016\u0010&\u001a\u00020\u00162\u0006\u0010 \u001a\u00020!2\u0006\u0010\u001c\u001a\u00020\u000fJ\u000e\u0010'\u001a\u00020(2\u0006\u0010 \u001a\u00020!J\b\u0010)\u001a\u00020\u0016H\u0016J\u0016\u0010*\u001a\u00020\u000f2\u000e\u0010+\u001a\n\u0012\u0002\b\u0003\u0012\u0002\b\u00030\u0014J\u0016\u0010,\u001a\u00020\u00162\u0006\u0010\u001c\u001a\u00020\u000f2\u0006\u0010-\u001a\u00020\u000fJ\u001e\u0010,\u001a\u00020\u00162\u0006\u0010 \u001a\u00020!2\u0006\u0010\u001c\u001a\u00020\u000f2\u0006\u0010-\u001a\u00020\u000fJ\u000e\u0010.\u001a\u00020\u00162\u0006\u0010\u001c\u001a\u00020\u000fJ\u001e\u0010/\u001a\u00020\u00162\u0006\u00100\u001a\u00020\u000f2\u000e\u00101\u001a\n\u0012\u0002\b\u0003\u0012\u0002\b\u00030\u0011J&\u0010/\u001a\u00020\u00162\u0006\u0010 \u001a\u00020!2\u0006\u00100\u001a\u00020\u000f2\u000e\u00101\u001a\n\u0012\u0002\b\u0003\u0012\u0002\b\u00030\u0011J\u0016\u00102\u001a\u00020\u00162\u0006\u00103\u001a\u00020\u000f2\u0006\u0010\u001c\u001a\u00020\u000fJ\u0016\u00104\u001a\u00020\u00162\u0006\u00105\u001a\u00020\u000f2\u0006\u0010\u001c\u001a\u00020\u000fJ\u0016\u00106\u001a\u00020\u00162\u0006\u00103\u001a\u00020\u000f2\u0006\u0010\u001c\u001a\u00020\u000fJ\u0016\u00107\u001a\u00020\u00162\u0006\u00103\u001a\u00020\u000f2\u0006\u0010\u001c\u001a\u00020\u000fJ6\u00107\u001a\u00020\u00162\u0006\u00103\u001a\u00020\u000f2\u0006\u0010\u001c\u001a\u00020\u000f2\u001e\u00108\u001a\u001a\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u000f\u0012\u0004\u0012\u00020\u000f0\u0014\u0012\u0004\u0012\u00020:09J\u001e\u0010;\u001a\u00020\u00162\u0006\u0010\u001c\u001a\u00020\u000f2\u0006\u0010<\u001a\u00020\u000f2\u0006\u0010=\u001a\u00020\u000fJ\u000e\u0010>\u001a\u00020\u00162\u0006\u0010\u001c\u001a\u00020\u000fJ&\u0010?\u001a\u00020\u00162\u001e\u0010@\u001a\u001a\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u000f\u0012\u0004\u0012\u00020\u000f0A\u0012\u0004\u0012\u00020\u001f09J.\u0010?\u001a\u00020\u00162\u0006\u0010 \u001a\u00020!2\u001e\u0010@\u001a\u001a\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u000f\u0012\u0004\u0012\u00020\u000f0A\u0012\u0004\u0012\u00020\u001f09R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\r\u001a\u000e\u0012\u0004\u0012\u00020\u000f\u0012\u0004\u0012\u00020\u000f0\u000eX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\n\u001a\u00020\u000bX\u0082\u0004¢\u0006\u0002\n��R&\u0010\u0010\u001a\u001a\u0012\u0004\u0012\u00020\u000f\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u000f\u0012\u0004\u0012\u00020\u000f0\u00110\u000eX\u0082\u0004¢\u0006\u0002\n��R,\u0010\u0012\u001a \u0012\u0004\u0012\u00020\u000f\u0012\u0016\u0012\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u000f\u0012\u0004\u0012\u00020\u000f0\u00140\u00130\u000eX\u0082\u0004¢\u0006\u0002\n��¨\u0006B"}, d2 = {"Ltech/harmonysoft/oss/kafka/service/TestKafkaManager;", "Ltech/harmonysoft/oss/test/TestAware;", "configProvider", "Ltech/harmonysoft/oss/kafka/config/TestKafkaConfigProvider;", "fixtureHelper", "Ltech/harmonysoft/oss/test/fixture/FixtureDataHelper;", "jsonApi", "Ltech/harmonysoft/oss/json/JsonApi;", "dynamicContext", "Ltech/harmonysoft/oss/test/binding/DynamicBindingContext;", "logger", "Lorg/slf4j/Logger;", "(Ltech/harmonysoft/oss/kafka/config/TestKafkaConfigProvider;Ltech/harmonysoft/oss/test/fixture/FixtureDataHelper;Ltech/harmonysoft/oss/json/JsonApi;Ltech/harmonysoft/oss/test/binding/DynamicBindingContext;Lorg/slf4j/Logger;)V", "headers", "Ljava/util/concurrent/ConcurrentHashMap;", "", "topic2consumer", "Lorg/apache/kafka/clients/consumer/KafkaConsumer;", "topic2messages", "", "Lorg/apache/kafka/clients/consumer/ConsumerRecord;", "addHeader", "", "key", "value", "cleanAllHeaders", "cleanHeader", "createConsumer", "topic", "K", "V", "", "config", "Ltech/harmonysoft/oss/kafka/config/TestKafkaConfig;", "keyDeserializer", "Lkotlin/reflect/KClass;", "Lorg/apache/kafka/common/serialization/Deserializer;", "valueDeserializer", "ensureTopicExists", "getAdmin", "Lorg/apache/kafka/clients/admin/Admin;", "onTestEnd", "recordToString", "record", "sendMessage", "message", "subscribe", "verifyConsumerGroupIsStableAfterConsumerAddition", "group", "consumer", "verifyJsonMessageIsNotReceived", "expected", "verifyJsonMessageIsReceived", "expectedJson", "verifyMessageIsNotReceived", "verifyMessageIsReceived", "checker", "Lkotlin/Function1;", "", "verifyMessageWithTargetHeaderValueIsReceived", "headerKey", "expectedHeaderValue", "verifyNoMessageIsReceived", "withProducer", "action", "Lorg/apache/kafka/clients/producer/KafkaProducer;", "harmonysoft-kafka-test"})
@Named
/* loaded from: input_file:tech/harmonysoft/oss/kafka/service/TestKafkaManager.class */
public final class TestKafkaManager implements TestAware {

    @NotNull
    private final TestKafkaConfigProvider configProvider;

    @NotNull
    private final FixtureDataHelper fixtureHelper;

    @NotNull
    private final JsonApi jsonApi;

    @NotNull
    private final DynamicBindingContext dynamicContext;

    @NotNull
    private final Logger logger;

    @NotNull
    private final ConcurrentHashMap<String, List<ConsumerRecord<String, String>>> topic2messages;

    @NotNull
    private final ConcurrentHashMap<String, KafkaConsumer<String, String>> topic2consumer;

    @NotNull
    private final ConcurrentHashMap<String, String> headers;

    public TestKafkaManager(@NotNull TestKafkaConfigProvider testKafkaConfigProvider, @NotNull FixtureDataHelper fixtureDataHelper, @NotNull JsonApi jsonApi, @NotNull DynamicBindingContext dynamicBindingContext, @NotNull Logger logger) {
        Intrinsics.checkNotNullParameter(testKafkaConfigProvider, "configProvider");
        Intrinsics.checkNotNullParameter(fixtureDataHelper, "fixtureHelper");
        Intrinsics.checkNotNullParameter(jsonApi, "jsonApi");
        Intrinsics.checkNotNullParameter(dynamicBindingContext, "dynamicContext");
        Intrinsics.checkNotNullParameter(logger, "logger");
        this.configProvider = testKafkaConfigProvider;
        this.fixtureHelper = fixtureDataHelper;
        this.jsonApi = jsonApi;
        this.dynamicContext = dynamicBindingContext;
        this.logger = logger;
        this.topic2messages = new ConcurrentHashMap<>();
        this.topic2consumer = new ConcurrentHashMap<>();
        ThreadsKt.thread$default(false, true, (ClassLoader) null, (String) null, 0, new Function0<Unit>() { // from class: tech.harmonysoft.oss.kafka.service.TestKafkaManager.1
            {
                super(0);
            }

            public final void invoke() {
                Duration ofMillis = Duration.ofMillis(300L);
                while (true) {
                    for (Map.Entry entry : TestKafkaManager.this.topic2consumer.entrySet()) {
                        String str = (String) entry.getKey();
                        try {
                            ConsumerRecords poll = ((KafkaConsumer) entry.getValue()).poll(ofMillis);
                            if (!poll.isEmpty()) {
                                Iterator it = poll.iterator();
                                while (it.hasNext()) {
                                    ConsumerRecord<?, ?> consumerRecord = (ConsumerRecord) it.next();
                                    Logger logger2 = TestKafkaManager.this.logger;
                                    TestKafkaManager testKafkaManager = TestKafkaManager.this;
                                    Intrinsics.checkNotNullExpressionValue(consumerRecord, "record");
                                    logger2.info("received new kafka message from topic '{}': {}", str, testKafkaManager.recordToString(consumerRecord));
                                    Object computeIfAbsent = TestKafkaManager.this.topic2messages.computeIfAbsent(str, AnonymousClass1::m2invoke$lambda0);
                                    Intrinsics.checkNotNullExpressionValue(computeIfAbsent, "topic2messages.computeIf… CopyOnWriteArrayList() }");
                                    ((Collection) computeIfAbsent).add(consumerRecord);
                                }
                            }
                        } catch (Exception e) {
                            TestKafkaManager.this.logger.warn("Unexpected exception occurred on attempt to get kafka messages from topic '{}'", str, e);
                        }
                    }
                }
            }

            /* renamed from: invoke$lambda-0, reason: not valid java name */
            private static final List m2invoke$lambda0(String str) {
                Intrinsics.checkNotNullParameter(str, "it");
                return new CopyOnWriteArrayList();
            }

            /* renamed from: invoke, reason: collision with other method in class */
            public /* bridge */ /* synthetic */ Object m3invoke() {
                invoke();
                return Unit.INSTANCE;
            }
        }, 29, (Object) null);
        this.headers = new ConcurrentHashMap<>();
    }

    public void onTestEnd() {
        cleanAllHeaders();
        this.topic2messages.clear();
    }

    @NotNull
    public final String recordToString(@NotNull ConsumerRecord<?, ?> consumerRecord) {
        Intrinsics.checkNotNullParameter(consumerRecord, "record");
        Iterable headers = consumerRecord.headers();
        Intrinsics.checkNotNullExpressionValue(headers, "record.headers()");
        return "headers=" + CollectionsKt.joinToString$default(headers, (CharSequence) null, (CharSequence) null, (CharSequence) null, 0, (CharSequence) null, new Function1<Header, CharSequence>() { // from class: tech.harmonysoft.oss.kafka.service.TestKafkaManager$recordToString$headers$1
            @NotNull
            public final CharSequence invoke(Header header) {
                StringBuilder append = new StringBuilder().append(header.key()).append('=');
                byte[] value = header.value();
                Intrinsics.checkNotNullExpressionValue(value, "it.value()");
                return append.append(new String(value, Charsets.UTF_8)).toString();
            }
        }, 31, (Object) null) + ", value=" + consumerRecord.value();
    }

    @NotNull
    public final Admin getAdmin(@NotNull TestKafkaConfig testKafkaConfig) {
        Intrinsics.checkNotNullParameter(testKafkaConfig, "config");
        Admin create = Admin.create(MapsKt.mapOf(TuplesKt.to("bootstrap.servers", testKafkaConfig.getHost() + ':' + testKafkaConfig.getPort())));
        Intrinsics.checkNotNullExpressionValue(create, "create(mapOf(\n          …{config.port}\"\n        ))");
        return create;
    }

    public final void withProducer(@NotNull Function1<? super KafkaProducer<String, String>, ? extends Object> function1) {
        Intrinsics.checkNotNullParameter(function1, "action");
        Object data = this.configProvider.getData();
        Intrinsics.checkNotNullExpressionValue(data, "configProvider.data");
        withProducer((TestKafkaConfig) data, function1);
    }

    public final void withProducer(@NotNull TestKafkaConfig testKafkaConfig, @NotNull Function1<? super KafkaProducer<String, String>, ? extends Object> function1) {
        Intrinsics.checkNotNullParameter(testKafkaConfig, "config");
        Intrinsics.checkNotNullParameter(function1, "action");
        KafkaProducer kafkaProducer = (Closeable) new KafkaProducer(MapsKt.mapOf(new Pair[]{TuplesKt.to("bootstrap.servers", testKafkaConfig.getHost() + ':' + testKafkaConfig.getPort()), TuplesKt.to("client.id", "test"), TuplesKt.to("key.serializer", StringSerializer.class), TuplesKt.to("value.serializer", StringSerializer.class)}));
        Throwable th = null;
        try {
            try {
                function1.invoke(kafkaProducer);
                CloseableKt.closeFinally(kafkaProducer, (Throwable) null);
            } finally {
            }
        } catch (Throwable th2) {
            CloseableKt.closeFinally(kafkaProducer, th);
            throw th2;
        }
    }

    public final void addHeader(@NotNull String str, @NotNull String str2) {
        Intrinsics.checkNotNullParameter(str, "key");
        Intrinsics.checkNotNullParameter(str2, "value");
        this.headers.put(str, str2);
        this.logger.info("registered kafka message header {}={}", str, str2);
    }

    public final void cleanHeader(@NotNull String str) {
        Intrinsics.checkNotNullParameter(str, "key");
        this.headers.remove(str);
        this.logger.info("cleaned kafka message header '{}'", str);
    }

    public final void cleanAllHeaders() {
        this.headers.clear();
        this.logger.info("cleaned all kafka message headers");
    }

    public final void sendMessage(@NotNull String str, @NotNull String str2) {
        Intrinsics.checkNotNullParameter(str, "topic");
        Intrinsics.checkNotNullParameter(str2, "message");
        Object data = this.configProvider.getData();
        Intrinsics.checkNotNullExpressionValue(data, "configProvider.data");
        sendMessage((TestKafkaConfig) data, str, str2);
    }

    public final void sendMessage(@NotNull TestKafkaConfig testKafkaConfig, @NotNull final String str, @NotNull final String str2) {
        Intrinsics.checkNotNullParameter(testKafkaConfig, "config");
        Intrinsics.checkNotNullParameter(str, "topic");
        Intrinsics.checkNotNullParameter(str2, "message");
        withProducer(testKafkaConfig, new Function1<KafkaProducer<String, String>, Object>() { // from class: tech.harmonysoft.oss.kafka.service.TestKafkaManager$sendMessage$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(1);
            }

            @NotNull
            public final Object invoke(@NotNull KafkaProducer<String, String> kafkaProducer) {
                FixtureDataHelper fixtureDataHelper;
                ConcurrentHashMap concurrentHashMap;
                ConcurrentHashMap concurrentHashMap2;
                Intrinsics.checkNotNullParameter(kafkaProducer, "it");
                fixtureDataHelper = TestKafkaManager.this.fixtureHelper;
                String obj = fixtureDataHelper.prepareTestData(KafkaTestFixture.INSTANCE.getTYPE(), new KafkaFixtureContext(str), str2).toString();
                Logger logger = TestKafkaManager.this.logger;
                concurrentHashMap = TestKafkaManager.this.headers;
                logger.info("sending kafka message to topic '{}'%nheaders: {}%ncontent: {}", new Object[]{str, concurrentHashMap, str2});
                String str3 = str;
                concurrentHashMap2 = TestKafkaManager.this.headers;
                ConcurrentHashMap concurrentHashMap3 = concurrentHashMap2;
                ArrayList arrayList = new ArrayList(concurrentHashMap3.size());
                for (Map.Entry entry : concurrentHashMap3.entrySet()) {
                    String str4 = (String) entry.getKey();
                    byte[] bytes = ((String) entry.getValue()).getBytes(Charsets.UTF_8);
                    Intrinsics.checkNotNullExpressionValue(bytes, "this as java.lang.String).getBytes(charset)");
                    arrayList.add(new RecordHeader(str4, bytes));
                }
                Future send = kafkaProducer.send(new ProducerRecord(str3, (Integer) null, (Long) null, (Object) null, obj, arrayList));
                Intrinsics.checkNotNullExpressionValue(send, "it.send(ProducerRecord(\n…Array()) }\n            ))");
                return send;
            }
        });
    }

    @NotNull
    public final KafkaConsumer<String, String> createConsumer(@NotNull String str) {
        Intrinsics.checkNotNullParameter(str, "topic");
        Object data = this.configProvider.getData();
        Intrinsics.checkNotNullExpressionValue(data, "configProvider.data");
        return createConsumer(str, (TestKafkaConfig) data, Reflection.getOrCreateKotlinClass(StringDeserializer.class), Reflection.getOrCreateKotlinClass(StringDeserializer.class));
    }

    @NotNull
    public final <K, V> KafkaConsumer<K, V> createConsumer(@NotNull String str, @NotNull TestKafkaConfig testKafkaConfig, @NotNull KClass<? extends Deserializer<K>> kClass, @NotNull KClass<? extends Deserializer<V>> kClass2) {
        Intrinsics.checkNotNullParameter(str, "topic");
        Intrinsics.checkNotNullParameter(testKafkaConfig, "config");
        Intrinsics.checkNotNullParameter(kClass, "keyDeserializer");
        Intrinsics.checkNotNullParameter(kClass2, "valueDeserializer");
        String uuid = UUID.randomUUID().toString();
        Intrinsics.checkNotNullExpressionValue(uuid, "randomUUID().toString()");
        KafkaConsumer<K, V> kafkaConsumer = new KafkaConsumer<>(MapsKt.mapOf(new Pair[]{TuplesKt.to("bootstrap.servers", testKafkaConfig.getHost() + ':' + testKafkaConfig.getPort()), TuplesKt.to("group.id", uuid), TuplesKt.to("auto.offset.reset", "earliest"), TuplesKt.to("key.deserializer", JvmClassMappingKt.getJavaClass(kClass)), TuplesKt.to("value.deserializer", JvmClassMappingKt.getJavaClass(kClass2))}));
        kafkaConsumer.subscribe(CollectionsKt.listOf(str));
        verifyConsumerGroupIsStableAfterConsumerAddition(testKafkaConfig, uuid, kafkaConsumer);
        return kafkaConsumer;
    }

    public final void ensureTopicExists(@NotNull String str) {
        Intrinsics.checkNotNullParameter(str, "topic");
        Object data = this.configProvider.getData();
        Intrinsics.checkNotNullExpressionValue(data, "configProvider.data");
        ensureTopicExists((TestKafkaConfig) data, str);
    }

    public final void ensureTopicExists(@NotNull TestKafkaConfig testKafkaConfig, @NotNull String str) {
        boolean z;
        Intrinsics.checkNotNullParameter(testKafkaConfig, "config");
        Intrinsics.checkNotNullParameter(str, "topic");
        this.logger.info("start checking if kafka topic '{}' exists", str);
        Admin admin = getAdmin(testKafkaConfig);
        try {
            Object obj = admin.listTopics().listings().get(3000L, TimeUnit.MILLISECONDS);
            Intrinsics.checkNotNullExpressionValue(obj, "admin.listTopics().listi…0, TimeUnit.MILLISECONDS)");
            Iterable iterable = (Iterable) obj;
            if (!(iterable instanceof Collection) || !((Collection) iterable).isEmpty()) {
                Iterator it = iterable.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        z = false;
                        break;
                    } else if (Intrinsics.areEqual(((TopicListing) it.next()).name(), str)) {
                        z = true;
                        break;
                    }
                }
            } else {
                z = false;
            }
            if (z) {
                this.logger.info("kafka topic '{}' already exists", str);
                admin.close(Duration.ofMillis(500L));
                return;
            }
            KafkaFuture kafkaFuture = (KafkaFuture) admin.createTopics(CollectionsKt.listOf(new NewTopic(str, 1, (short) 1))).values().get(str);
            if (kafkaFuture != null) {
            }
            this.logger.info("created kafka topic '{}'", str);
            admin.close(Duration.ofMillis(500L));
        } catch (Throwable th) {
            admin.close(Duration.ofMillis(500L));
            throw th;
        }
    }

    public final void verifyConsumerGroupIsStableAfterConsumerAddition(@NotNull String str, @NotNull KafkaConsumer<?, ?> kafkaConsumer) {
        Intrinsics.checkNotNullParameter(str, "group");
        Intrinsics.checkNotNullParameter(kafkaConsumer, "consumer");
        Object data = this.configProvider.getData();
        Intrinsics.checkNotNullExpressionValue(data, "configProvider.data");
        verifyConsumerGroupIsStableAfterConsumerAddition((TestKafkaConfig) data, str, kafkaConsumer);
    }

    public final void verifyConsumerGroupIsStableAfterConsumerAddition(@NotNull TestKafkaConfig testKafkaConfig, @NotNull final String str, @NotNull final KafkaConsumer<?, ?> kafkaConsumer) {
        Intrinsics.checkNotNullParameter(testKafkaConfig, "config");
        Intrinsics.checkNotNullParameter(str, "group");
        Intrinsics.checkNotNullParameter(kafkaConsumer, "consumer");
        final Duration ofMillis = Duration.ofMillis(100L);
        Admin admin = (AutoCloseable) getAdmin(testKafkaConfig);
        Throwable th = null;
        try {
            try {
                final Admin admin2 = admin;
                VerificationUtil.verifyConditionHappens$default(VerificationUtil.INSTANCE, "kafka consumer group '" + str + "' is ready after consumer addition", 60L, 0L, new Function0<ProcessingResult<Unit, String>>() { // from class: tech.harmonysoft.oss.kafka.service.TestKafkaManager$verifyConsumerGroupIsStableAfterConsumerAddition$1$1
                    /* JADX INFO: Access modifiers changed from: package-private */
                    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                    {
                        super(0);
                    }

                    @NotNull
                    /* renamed from: invoke, reason: merged with bridge method [inline-methods] */
                    public final ProcessingResult<Unit, String> m5invoke() {
                        Object obj;
                        ProcessingResult<Unit, String> failure;
                        kafkaConsumer.poll(ofMillis);
                        Collection collection = (Collection) admin2.listConsumerGroups().all().get();
                        Intrinsics.checkNotNullExpressionValue(collection, "consumerGroups");
                        Collection collection2 = collection;
                        String str2 = str;
                        Iterator it = collection2.iterator();
                        while (true) {
                            if (!it.hasNext()) {
                                obj = null;
                                break;
                            }
                            Object next = it.next();
                            if (Intrinsics.areEqual(((ConsumerGroupListing) next).groupId(), str2)) {
                                obj = next;
                                break;
                            }
                        }
                        ConsumerGroupListing consumerGroupListing = (ConsumerGroupListing) obj;
                        if (consumerGroupListing != null) {
                            Optional state = consumerGroupListing.state();
                            if (state.isPresent()) {
                                Object obj2 = state.get();
                                Intrinsics.checkNotNullExpressionValue(obj2, "optionalState.get()");
                                ConsumerGroupState consumerGroupState = (ConsumerGroupState) obj2;
                                failure = consumerGroupState == ConsumerGroupState.STABLE ? ProcessingResult.Companion.success() : ProcessingResult.Companion.failure("kafka consumer group has state '" + consumerGroupState + '\'');
                            } else {
                                failure = ProcessingResult.Companion.failure("no state is exposed for kafka consumer group '" + consumerGroupListing + '\'');
                            }
                            if (failure != null) {
                                return failure;
                            }
                        }
                        return ProcessingResult.Companion.failure("kafka consumer group '" + str + "' is not found, " + collection.size() + " group(s) are available: " + CollectionsKt.joinToString$default(collection, (CharSequence) null, (CharSequence) null, (CharSequence) null, 0, (CharSequence) null, new Function1<ConsumerGroupListing, CharSequence>() { // from class: tech.harmonysoft.oss.kafka.service.TestKafkaManager$verifyConsumerGroupIsStableAfterConsumerAddition$1$1.3
                            @NotNull
                            public final CharSequence invoke(ConsumerGroupListing consumerGroupListing2) {
                                String groupId = consumerGroupListing2.groupId();
                                Intrinsics.checkNotNullExpressionValue(groupId, "it.groupId()");
                                return groupId;
                            }
                        }, 31, (Object) null));
                    }
                }, 4, (Object) null);
                Unit unit = Unit.INSTANCE;
                AutoCloseableKt.closeFinally(admin, (Throwable) null);
            } finally {
            }
        } catch (Throwable th2) {
            AutoCloseableKt.closeFinally(admin, th);
            throw th2;
        }
    }

    public final void subscribe(@NotNull String str) {
        Intrinsics.checkNotNullParameter(str, "topic");
        ensureTopicExists(str);
        if (this.topic2consumer.containsKey(str)) {
            this.logger.info("kafka topic '{}' is already subscribed", str);
            return;
        }
        this.logger.info("subscribing kafka topic '{}'", str);
        this.topic2consumer.computeIfAbsent(str, (v2) -> {
            return m1subscribe$lambda3(r2, r3, v2);
        });
        this.logger.info("subscribed kafka topic '{}'", str);
    }

    public final void verifyMessageIsReceived(@NotNull String str, @NotNull String str2) {
        Intrinsics.checkNotNullParameter(str, "expected");
        Intrinsics.checkNotNullParameter(str2, "topic");
        final String obj = this.fixtureHelper.prepareTestData(KafkaTestFixture.INSTANCE.getTYPE(), new KafkaFixtureContext(str2), str).toString();
        verifyMessageIsReceived(str, str2, new Function1<ConsumerRecord<String, String>, Boolean>() { // from class: tech.harmonysoft.oss.kafka.service.TestKafkaManager$verifyMessageIsReceived$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(1);
            }

            @NotNull
            public final Boolean invoke(@NotNull ConsumerRecord<String, String> consumerRecord) {
                Intrinsics.checkNotNullParameter(consumerRecord, "it");
                return Boolean.valueOf(Intrinsics.areEqual(consumerRecord.value(), obj));
            }
        });
    }

    public final void verifyMessageWithTargetHeaderValueIsReceived(@NotNull String str, @NotNull final String str2, @NotNull final String str3) {
        Intrinsics.checkNotNullParameter(str, "topic");
        Intrinsics.checkNotNullParameter(str2, "headerKey");
        Intrinsics.checkNotNullParameter(str3, "expectedHeaderValue");
        verifyMessageIsReceived("a message with header " + str2 + '=' + str3, str, new Function1<ConsumerRecord<String, String>, Boolean>() { // from class: tech.harmonysoft.oss.kafka.service.TestKafkaManager$verifyMessageWithTargetHeaderValueIsReceived$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(1);
            }

            /* JADX WARN: Removed duplicated region for block: B:13:0x007f A[SYNTHETIC] */
            /* JADX WARN: Removed duplicated region for block: B:22:? A[LOOP:0: B:4:0x0029->B:22:?, LOOP_END, SYNTHETIC] */
            @org.jetbrains.annotations.NotNull
            /*
                Code decompiled incorrectly, please refer to instructions dump.
                To view partially-correct add '--show-bad-code' argument
            */
            public final java.lang.Boolean invoke(@org.jetbrains.annotations.NotNull org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String> r6) {
                /*
                    r5 = this;
                    r0 = r6
                    java.lang.String r1 = "record"
                    kotlin.jvm.internal.Intrinsics.checkNotNullParameter(r0, r1)
                    r0 = r6
                    org.apache.kafka.common.header.Headers r0 = r0.headers()
                    r1 = r0
                    if (r1 == 0) goto L8b
                    java.lang.Iterable r0 = (java.lang.Iterable) r0
                    r7 = r0
                    r0 = r5
                    java.lang.String r0 = r4
                    r8 = r0
                    r0 = r5
                    java.lang.String r0 = r5
                    r9 = r0
                    r0 = r7
                    r10 = r0
                    r0 = r10
                    java.util.Iterator r0 = r0.iterator()
                    r11 = r0
                L29:
                    r0 = r11
                    boolean r0 = r0.hasNext()
                    if (r0 == 0) goto L84
                    r0 = r11
                    java.lang.Object r0 = r0.next()
                    r12 = r0
                    r0 = r12
                    org.apache.kafka.common.header.Header r0 = (org.apache.kafka.common.header.Header) r0
                    r13 = r0
                    r0 = 0
                    r14 = r0
                    r0 = r13
                    java.lang.String r0 = r0.key()
                    r1 = r8
                    boolean r0 = kotlin.jvm.internal.Intrinsics.areEqual(r0, r1)
                    if (r0 == 0) goto L7b
                    r0 = r13
                    byte[] r0 = r0.value()
                    r1 = r0
                    java.lang.String r2 = "it.value()"
                    kotlin.jvm.internal.Intrinsics.checkNotNullExpressionValue(r1, r2)
                    r15 = r0
                    java.lang.String r0 = new java.lang.String
                    r1 = r0
                    r2 = r15
                    java.nio.charset.Charset r3 = kotlin.text.Charsets.UTF_8
                    r1.<init>(r2, r3)
                    r1 = r9
                    boolean r0 = kotlin.jvm.internal.Intrinsics.areEqual(r0, r1)
                    if (r0 == 0) goto L7b
                    r0 = 1
                    goto L7c
                L7b:
                    r0 = 0
                L7c:
                    if (r0 == 0) goto L29
                    r0 = r12
                    goto L85
                L84:
                    r0 = 0
                L85:
                    org.apache.kafka.common.header.Header r0 = (org.apache.kafka.common.header.Header) r0
                    goto L8d
                L8b:
                    r0 = 0
                L8d:
                    if (r0 == 0) goto L94
                    r0 = 1
                    goto L95
                L94:
                    r0 = 0
                L95:
                    java.lang.Boolean r0 = java.lang.Boolean.valueOf(r0)
                    return r0
                */
                throw new UnsupportedOperationException("Method not decompiled: tech.harmonysoft.oss.kafka.service.TestKafkaManager$verifyMessageWithTargetHeaderValueIsReceived$1.invoke(org.apache.kafka.clients.consumer.ConsumerRecord):java.lang.Boolean");
            }
        });
    }

    public final void verifyMessageIsReceived(@NotNull String str, @NotNull final String str2, @NotNull final Function1<? super ConsumerRecord<String, String>, Boolean> function1) {
        Intrinsics.checkNotNullParameter(str, "expected");
        Intrinsics.checkNotNullParameter(str2, "topic");
        Intrinsics.checkNotNullParameter(function1, "checker");
        final String obj = this.fixtureHelper.prepareTestData(KafkaTestFixture.INSTANCE.getTYPE(), new KafkaFixtureContext(str2), str).toString();
        VerificationUtil.verifyConditionHappens$default(VerificationUtil.INSTANCE, "target message is received from kafka topic '" + str2 + '\'', 0L, 0L, new Function0<ProcessingResult<Unit, String>>() { // from class: tech.harmonysoft.oss.kafka.service.TestKafkaManager$verifyMessageIsReceived$2
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(0);
            }

            @NotNull
            /* renamed from: invoke, reason: merged with bridge method [inline-methods] */
            public final ProcessingResult<Unit, String> m9invoke() {
                boolean z;
                List list = (List) TestKafkaManager.this.topic2messages.get(str2);
                List list2 = list;
                if (list2 == null || list2.isEmpty()) {
                    return ProcessingResult.Companion.failure("no messages are received from kafka topic '" + str2 + '\'');
                }
                List list3 = list;
                Function1<ConsumerRecord<String, String>, Boolean> function12 = function1;
                if (!(list3 instanceof Collection) || !list3.isEmpty()) {
                    Iterator it = list3.iterator();
                    while (true) {
                        if (!it.hasNext()) {
                            z = false;
                            break;
                        }
                        if (((Boolean) function12.invoke(it.next())).booleanValue()) {
                            z = true;
                            break;
                        }
                    }
                } else {
                    z = false;
                }
                if (z) {
                    return ProcessingResult.Companion.success();
                }
                String str3 = str2;
                String str4 = obj;
                TestKafkaManager testKafkaManager = TestKafkaManager.this;
                StringBuilder sb = new StringBuilder();
                sb.append("target message is not received from kafka topic '" + str3 + "'.\n");
                sb.append("expected:\n");
                sb.append(str4);
                sb.append('\n' + list.size() + " message(s) are received:");
                int i = 0;
                for (Object obj2 : list) {
                    int i2 = i;
                    i++;
                    if (i2 < 0) {
                        CollectionsKt.throwIndexOverflow();
                    }
                    sb.append('\n' + i2 + ") ").append(testKafkaManager.recordToString((ConsumerRecord) obj2));
                }
                String sb2 = sb.toString();
                Intrinsics.checkNotNullExpressionValue(sb2, "StringBuilder().apply(builderAction).toString()");
                return ProcessingResult.Companion.failure(sb2);
            }
        }, 6, (Object) null);
    }

    public final void verifyJsonMessageIsReceived(@NotNull String str, @NotNull String str2) {
        Intrinsics.checkNotNullParameter(str, "expectedJson");
        Intrinsics.checkNotNullParameter(str2, "topic");
        String obj = this.fixtureHelper.prepareTestData(KafkaTestFixture.INSTANCE.getTYPE(), new KafkaFixtureContext(str2), CommonJsonUtil.INSTANCE.prepareDynamicMarkers(str)).toString();
        final Object parseJson = this.jsonApi.parseJson(obj);
        verifyMessageIsReceived(obj, str2, new Function1<ConsumerRecord<String, String>, Boolean>() { // from class: tech.harmonysoft.oss.kafka.service.TestKafkaManager$verifyJsonMessageIsReceived$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(1);
            }

            @NotNull
            public final Boolean invoke(@NotNull ConsumerRecord<String, String> consumerRecord) {
                boolean z;
                JsonApi jsonApi;
                boolean z2;
                DynamicBindingContext dynamicBindingContext;
                Intrinsics.checkNotNullParameter(consumerRecord, "it");
                try {
                    jsonApi = TestKafkaManager.this.jsonApi;
                    Object value = consumerRecord.value();
                    Intrinsics.checkNotNullExpressionValue(value, "it.value()");
                    TestMatchResult compareAndBind$default = CommonJsonUtil.compareAndBind$default(CommonJsonUtil.INSTANCE, parseJson, jsonApi.parseJson((String) value), (String) null, false, (Function2) null, 20, (Object) null);
                    if (compareAndBind$default.getErrors().isEmpty()) {
                        dynamicBindingContext = TestKafkaManager.this.dynamicContext;
                        dynamicBindingContext.storeBindings(compareAndBind$default.getBoundDynamicValues());
                        z2 = true;
                    } else {
                        z2 = false;
                    }
                    z = z2;
                } catch (Exception e) {
                    z = false;
                }
                return Boolean.valueOf(z);
            }
        });
    }

    public final void verifyMessageIsNotReceived(@NotNull String str, @NotNull final String str2) {
        Intrinsics.checkNotNullParameter(str, "expected");
        Intrinsics.checkNotNullParameter(str2, "topic");
        final String obj = this.fixtureHelper.prepareTestData(KafkaTestFixture.INSTANCE.getTYPE(), new KafkaFixtureContext(str2), str).toString();
        VerificationUtil.verifyConditionDoesNotHappen$default(VerificationUtil.INSTANCE, (String) null, 0L, 0L, new Function0<ProcessingResult<Unit, String>>() { // from class: tech.harmonysoft.oss.kafka.service.TestKafkaManager$verifyMessageIsNotReceived$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(0);
            }

            @NotNull
            /* renamed from: invoke, reason: merged with bridge method [inline-methods] */
            public final ProcessingResult<Unit, String> m8invoke() {
                Object obj2;
                List list = (List) TestKafkaManager.this.topic2messages.get(str2);
                if (list != null) {
                    List list2 = list;
                    String str3 = obj;
                    Iterator it = list2.iterator();
                    while (true) {
                        if (!it.hasNext()) {
                            obj2 = null;
                            break;
                        }
                        Object next = it.next();
                        if (Intrinsics.areEqual(((ConsumerRecord) next).value(), str3)) {
                            obj2 = next;
                            break;
                        }
                    }
                    ConsumerRecord consumerRecord = (ConsumerRecord) obj2;
                    if (consumerRecord != null) {
                        ProcessingResult<Unit, String> failure = ProcessingResult.Companion.failure("received unexpected message in kafka topic '" + str2 + "': " + ((String) consumerRecord.value()));
                        if (failure != null) {
                            return failure;
                        }
                    }
                }
                return ProcessingResult.Companion.success();
            }
        }, 7, (Object) null);
    }

    public final void verifyJsonMessageIsNotReceived(@NotNull String str, @NotNull final String str2) {
        Intrinsics.checkNotNullParameter(str, "expected");
        Intrinsics.checkNotNullParameter(str2, "topic");
        final Object parseJson = this.jsonApi.parseJson(this.fixtureHelper.prepareTestData(KafkaTestFixture.INSTANCE.getTYPE(), new KafkaFixtureContext(str2), str).toString());
        VerificationUtil.verifyConditionDoesNotHappen$default(VerificationUtil.INSTANCE, (String) null, 0L, 0L, new Function0<ProcessingResult<Unit, String>>() { // from class: tech.harmonysoft.oss.kafka.service.TestKafkaManager$verifyJsonMessageIsNotReceived$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(0);
            }

            @NotNull
            /* renamed from: invoke, reason: merged with bridge method [inline-methods] */
            public final ProcessingResult<Unit, String> m7invoke() {
                Object obj;
                JsonApi jsonApi;
                List list = (List) TestKafkaManager.this.topic2messages.get(str2);
                if (list != null) {
                    List list2 = list;
                    TestKafkaManager testKafkaManager = TestKafkaManager.this;
                    Object obj2 = parseJson;
                    Iterator it = list2.iterator();
                    while (true) {
                        if (!it.hasNext()) {
                            obj = null;
                            break;
                        }
                        Object next = it.next();
                        ConsumerRecord consumerRecord = (ConsumerRecord) next;
                        jsonApi = testKafkaManager.jsonApi;
                        Object value = consumerRecord.value();
                        Intrinsics.checkNotNullExpressionValue(value, "record.value()");
                        if (CommonJsonUtil.compareAndBind$default(CommonJsonUtil.INSTANCE, obj2, jsonApi.parseJson((String) value), (String) null, false, (Function2) null, 20, (Object) null).getErrors().isEmpty()) {
                            obj = next;
                            break;
                        }
                    }
                    ConsumerRecord consumerRecord2 = (ConsumerRecord) obj;
                    if (consumerRecord2 != null) {
                        ProcessingResult<Unit, String> failure = ProcessingResult.Companion.failure("received unexpected message in kafka topic '" + str2 + "': " + ((String) consumerRecord2.value()));
                        if (failure != null) {
                            return failure;
                        }
                    }
                }
                return ProcessingResult.Companion.success();
            }
        }, 7, (Object) null);
    }

    public final void verifyNoMessageIsReceived(@NotNull final String str) {
        Intrinsics.checkNotNullParameter(str, "topic");
        VerificationUtil.verifyConditionDoesNotHappen$default(VerificationUtil.INSTANCE, "no message is received in kafka topic '" + str + '\'', 0L, 0L, new Function0<ProcessingResult<Unit, String>>() { // from class: tech.harmonysoft.oss.kafka.service.TestKafkaManager$verifyNoMessageIsReceived$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(0);
            }

            @NotNull
            /* renamed from: invoke, reason: merged with bridge method [inline-methods] */
            public final ProcessingResult<Unit, String> m10invoke() {
                List list = (List) TestKafkaManager.this.topic2messages.get(str);
                List list2 = list;
                if (list2 == null || list2.isEmpty()) {
                    return ProcessingResult.Companion.success();
                }
                ProcessingResult.Companion companion = ProcessingResult.Companion;
                StringBuilder append = new StringBuilder().append("received ").append(list.size()).append(" in kafka topic '").append(str).append("': ");
                final TestKafkaManager testKafkaManager = TestKafkaManager.this;
                return companion.failure(append.append(CollectionsKt.joinToString$default(list, (CharSequence) null, (CharSequence) null, (CharSequence) null, 0, (CharSequence) null, new Function1<ConsumerRecord<String, String>, CharSequence>() { // from class: tech.harmonysoft.oss.kafka.service.TestKafkaManager$verifyNoMessageIsReceived$1.1
                    {
                        super(1);
                    }

                    @NotNull
                    public final CharSequence invoke(@NotNull ConsumerRecord<String, String> consumerRecord) {
                        Intrinsics.checkNotNullParameter(consumerRecord, "it");
                        return TestKafkaManager.this.recordToString(consumerRecord);
                    }
                }, 31, (Object) null)).toString());
            }
        }, 6, (Object) null);
    }

    public void onTestStart() {
        TestAware.DefaultImpls.onTestStart(this);
    }

    /* renamed from: subscribe$lambda-3, reason: not valid java name */
    private static final KafkaConsumer m1subscribe$lambda3(TestKafkaManager testKafkaManager, String str, String str2) {
        Intrinsics.checkNotNullParameter(testKafkaManager, "this$0");
        Intrinsics.checkNotNullParameter(str, "$topic");
        Intrinsics.checkNotNullParameter(str2, "it");
        return testKafkaManager.createConsumer(str);
    }
}
