package io.floodplain.kotlindsl;

import io.floodplain.streams.api.TopologyContext;
import io.floodplain.streams.base.BoundedMemoryRocksDBConfig;
import io.floodplain.streams.remotejoin.TopologyConstructor;
import io.floodplain.streams.serializer.ReplicationMessageSerde;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.BuildersKt;
import mu.KLogger;
import mu.KotlinLogging;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* compiled from: LocalRuntime.kt */
@Metadata(mv = {1, 7, 1}, k = 2, xi = 48, d1 = {"��`\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n��\n\u0002\u0010\u000e\n��\n\u0002\u0010\b\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010 \n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\u001a\u009f\u0001\u0010\u0002\u001a\u00020\u00032\u0006\u0010\u0004\u001a\u00020\u00052\b\u0010\u0006\u001a\u0004\u0018\u00010\u00072\u0006\u0010\b\u001a\u00020\t2'\u0010\n\u001a#\b\u0001\u0012\u0004\u0012\u00020\f\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00030\r\u0012\u0006\u0012\u0004\u0018\u00010\u000e0\u000b¢\u0006\u0002\b\u000f2\u0006\u0010\u0010\u001a\u00020\u00112\u0006\u0010\u0012\u001a\u00020\u00132\u0006\u0010\u0014\u001a\u00020\u00152\f\u0010\u0016\u001a\b\u0012\u0004\u0012\u00020\u00180\u00172\f\u0010\u0019\u001a\b\u0012\u0004\u0012\u00020\u001a0\u00172\u0018\u0010\u001b\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0005\u0012\u0004\u0012\u00020\u00050\u001c0\u0017ø\u0001��¢\u0006\u0002\u0010\u001d\"\u000e\u0010��\u001a\u00020\u0001X\u0082\u0004¢\u0006\u0002\n��\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006\u001e"}, d2 = {"logger", "Lmu/KLogger;", "runLocalTopology", "", "applicationId", "", "bufferTime", "", "topology", "Lorg/apache/kafka/streams/Topology;", "localCmds", "Lkotlin/Function2;", "Lio/floodplain/kotlindsl/LocalContext;", "Lkotlin/coroutines/Continuation;", "", "Lkotlin/ExtensionFunctionType;", "rootTopology", "Lio/floodplain/kotlindsl/Stream;", "topologyConstructor", "Lio/floodplain/streams/remotejoin/TopologyConstructor;", "context", "Lio/floodplain/streams/api/TopologyContext;", "sourceConfigs", "", "Lio/floodplain/kotlindsl/SourceConfig;", "sinkConfigs", "Lio/floodplain/kotlindsl/SinkConfig;", "sinks", "Lkotlin/Pair;", 
"(Ljava/lang/String;Ljava/lang/Integer;Lorg/apache/kafka/streams/Topology;Lkotlin/jvm/functions/Function2;Lio/floodplain/kotlindsl/Stream;Lio/floodplain/streams/remotejoin/TopologyConstructor;Lio/floodplain/streams/api/TopologyContext;Ljava/util/List;Ljava/util/List;Ljava/util/List;)V", "floodplain-dsl"})
/* loaded from: input_file:io/floodplain/kotlindsl/LocalRuntimeKt.class */
/**
 * File-level facade for {@code LocalRuntime.kt}: runs a Kafka Streams {@link Topology} inside an
 * in-process {@link TopologyTestDriver} and tears everything down again afterwards.
 */
public final class LocalRuntimeKt {

    /** Relative state directory handed to Kafka Streams; removed again when a local run ends. */
    private static final String STATE_DIR = "teststorage/store";

    // Kept as an anonymous class rather than a lambda.
    // NOTE(review): kotlin-logging appears to derive the logger name from this function object's
    // class name - confirm before converting it to a lambda or moving it to another class.
    @NotNull
    private static final KLogger logger = KotlinLogging.INSTANCE.logger(new Function0<Unit>() {
        @Override
        public Unit invoke() {
            return Unit.INSTANCE;
        }
    });

    private LocalRuntimeKt() {
        // Static facade only; never instantiated.
    }

    /**
     * Runs {@code function2} against a {@link TopologyTestDriver} built from {@code topology} and
     * always shuts the driver down afterwards.
     *
     * <p>Shutdown closes every state store, closes the driver, logs {@code "Local runtime closed"}
     * and deletes the {@code teststorage/store} directory. It runs in a {@code finally} block, so it
     * happens whether the commands complete or throw. If shutdown itself throws while another
     * exception is already propagating, the shutdown exception is the one the caller sees.
     *
     * @param str                 application id, stored under {@code application.id}
     * @param num                 optional value forwarded unchanged to {@link LocalDriverContext}
     *                            (named {@code bufferTime} in the Kotlin source)
     * @param topology            topology to execute in the test driver
     * @param function2           suspending commands ({@code localCmds}); handed, together with the
     *                            driver context, to the generated coroutine wrapper and executed via
     *                            {@code runBlocking}
     * @param stream              root topology forwarded to {@link LocalDriverContext}
     * @param topologyConstructor constructor state forwarded to {@link LocalDriverContext}
     * @param topologyContext     topology context forwarded to {@link LocalDriverContext}
     * @param list                source configurations forwarded to {@link LocalDriverContext}
     * @param list2               sink configurations forwarded to {@link LocalDriverContext}
     * @param list3               sink pairs forwarded to {@link LocalDriverContext}
     * @throws UncheckedIOException if the state directory cannot be walked or one of its entries
     *                              cannot be deleted
     */
    public static final void runLocalTopology(@NotNull String str, @Nullable Integer num, @NotNull Topology topology, @NotNull Function2<? super LocalContext, ? super Continuation<? super Unit>, ? extends Object> function2, @NotNull Stream stream, @NotNull TopologyConstructor topologyConstructor, @NotNull TopologyContext topologyContext, @NotNull List<? extends SourceConfig> list, @NotNull List<? extends SinkConfig> list2, @NotNull List<Pair<String, String>> list3) {
        Intrinsics.checkNotNullParameter(str, "applicationId");
        Intrinsics.checkNotNullParameter(topology, "topology");
        Intrinsics.checkNotNullParameter(function2, "localCmds");
        Intrinsics.checkNotNullParameter(stream, "rootTopology");
        Intrinsics.checkNotNullParameter(topologyConstructor, "topologyConstructor");
        Intrinsics.checkNotNullParameter(topologyContext, "context");
        Intrinsics.checkNotNullParameter(list, "sourceConfigs");
        Intrinsics.checkNotNullParameter(list2, "sinkConfigs");
        Intrinsics.checkNotNullParameter(list3, "sinks");

        Properties properties = new Properties();
        // The test driver never opens a broker connection; the property only has to be present.
        properties.put("bootstrap.servers", "doesntmatter:9092");
        properties.put("default.timestamp.extractor", WallclockTimestampExtractor.class);
        properties.put("default.key.serde", Serdes.String().getClass());
        properties.put("default.value.serde", ReplicationMessageSerde.class);
        properties.put("application.id", str);
        properties.put("state.dir", STATE_DIR);
        properties.put("built.in.metrics.version", "latest");
        properties.put("rocksdb.config.setter", BoundedMemoryRocksDBConfig.class);

        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties);
        try {
            Map<String, String> propsToStringMap = Utils.propsToStringMap(properties);
            Intrinsics.checkNotNullExpressionValue(propsToStringMap, "propsToStringMap(\n            props\n        )");
            // Context creation and connector wiring sit inside the try so that a failure here
            // still closes the driver and removes the state directory.
            LocalDriverContext localDriverContext = new LocalDriverContext(topologyTestDriver, stream, topologyContext, topologyConstructor, propsToStringMap, list, list2, list3, num);
            localDriverContext.getConnectJobs().addAll(localDriverContext.connectSourceAndSink());
            BuildersKt.runBlocking$default((CoroutineContext) null, new LocalRuntimeKt$runLocalTopology$1(function2, localDriverContext, null), 1, (Object) null);
        } finally {
            shutdown(topologyTestDriver);
        }
    }

    /** Closes all state stores and the driver, logs the shutdown, then removes the state directory. */
    private static void shutdown(TopologyTestDriver driver) {
        for (StateStore store : driver.getAllStateStores().values()) {
            store.close();
        }
        driver.close();
        logger.info("Local runtime closed");
        deleteStateDirectory();
    }

    /** Recursively deletes {@link #STATE_DIR} (resolved against the working directory) if it exists. */
    private static void deleteStateDirectory() {
        Path stateDir = Path.of(STATE_DIR).toAbsolutePath();
        if (!Files.exists(stateDir)) {
            return;
        }
        // Files.walk keeps directory handles open until the stream is closed.
        try (java.util.stream.Stream<Path> entries = Files.walk(stateDir)) {
            // Reverse order visits children before their parent, so directories are empty when deleted.
            entries.sorted(Comparator.reverseOrder()).forEach(LocalRuntimeKt::deletePath);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not walk state directory " + stateDir, e);
        }
    }

    /** Deletes a single path, rethrowing {@link IOException} unchecked so it can be used in a stream. */
    private static void deletePath(Path path) {
        try {
            Files.deleteIfExists(path);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not delete " + path, e);
        }
    }
}
