package io.floodplain.kotlindsl;

import io.floodplain.reactive.topology.ReactivePipe;
import io.floodplain.reactive.topology.ReactivePipeParser;
import io.floodplain.streams.api.CoreOperators;
import io.floodplain.streams.api.TopologyContext;
import io.floodplain.streams.base.RocksDBConfigurationSetter;
import io.floodplain.streams.base.StreamOperators;
import io.floodplain.streams.remotejoin.ReplicationTopologyParser;
import io.floodplain.streams.remotejoin.TopologyConstructor;
import java.io.IOException;
import java.lang.Thread;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Stack;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.Triple;
import kotlin.TuplesKt;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import mu.KLogger;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* compiled from: Stream.kt */
@Metadata(mv = {1, 1, 15}, bv = {1, 0, 3}, k = 1, d1 = {"��t\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0010!\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010 \n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\u0018��2\u00020\u0001B\r\u0012\u0006\u0010\u0002\u001a\u00020\u0003¢\u0006\u0002\u0010\u0004J\u000e\u0010\r\u001a\u00020\u000e2\u0006\u0010\u000f\u001a\u00020\tJ\u000e\u0010\u0010\u001a\u00020��2\u0006\u0010\u0011\u001a\u00020\fJ\u000e\u0010\u0012\u001a\u00020\u000e2\u0006\u0010\u000f\u001a\u00020\tJ\"\u0010\u0013\u001a\u0004\u0018\u00010\u00142\u0006\u0010\u0015\u001a\u00020\u00162\u0006\u0010\u0017\u001a\u00020\u00162\u0006\u0010\u0018\u001a\u00020\u0016H\u0002JD\u0010\u0019\u001a8\u0012\u0004\u0012\u00020\u001b\u0012\u0016\u0012\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0016\u0012\u0004\u0012\u00020\u00160\u001d0\u001c\u0012\u0016\u0012\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0016\u0012\u0004\u0012\u00020\u00160\u001d0\u001c0\u001a2\u0006\u0010\u001e\u001a\u00020\u001fJ\u001e\u0010 \u001a\u00020\u000e2\u0006\u0010!\u001a\u00020\"2\u0006\u0010#\u001a\u00020\u00162\u0006\u0010$\u001a\u00020\u0016J\u001f\u0010%\u001a\u00020��2\u0017\u0010&\u001a\u0013\u0012\u0004\u0012\u00020(\u0012\u0004\u0012\u00020\u000e0'¢\u0006\u0002\b)J\u0010\u0010*\u001a\u00020\u001b2\u0006\u0010\u001e\u001a\u00020\u001fH\u0002J(\u0010+\u001a\u0004\u0018\u00010,2\u0006\u0010-\u001a\u00020\u001b2\u0006\u0010\u0015\u001a\u00020\u00162\u0006\u0010#\u001a\u00020\u00162\u0006\u0010\u0018\u001a\u00020\u0016J\u000e\u0010\u0007\u001a\b\u0012\u0004\u0012\u00020\t0\u001cH\u0002J\u000e\u0010\n\u001a\b\u0012\u0004\u0012\u00020\t0\u001cH\u0002R\u0011\u0010\u0002\u001a\u00020\u0003¢\u0006\b\n��\u001a\u0004\b\u0005\u0010\u0006R\u0014\u0010\u0007\u001a\b\u0012\u0004\u0012\u00020\t0\bX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\n\u001a\b\u0012\u0004\u0012\u00020\t0\bX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u000b\u001a\b\u0012\u0004\u0012\u00020\f0\bX\u0082\u0004¢\u0006\u0002\n��¨\u0006."}, d2 = {"Lio/floodplain/kotlindsl/Stream;", "", "context", "Lio/floodplain/streams/api/TopologyContext;", "(Lio/floodplain/streams/api/TopologyContext;)V", "getContext", "()Lio/floodplain/streams/api/TopologyContext;", "sinkConfigurations", "", "Lio/floodplain/kotlindsl/Config;", "sourceConfigurations", "sources", "Lio/floodplain/kotlindsl/Source;", "addSinkConfiguration", "", "c", "addSource", "source", "addSourceConfiguration", "createProperties", "Ljava/util/Properties;", "applicationId", "", "brokers", "storagePath", "render", "Lkotlin/Triple;", "Lorg/apache/kafka/streams/Topology;", "", "Lkotlin/Pair;", "topologyConstructor", "Lio/floodplain/streams/remotejoin/TopologyConstructor;", "renderAndStart", "connectorURL", "Ljava/net/URL;", "kafkaHosts", "clientId", "renderAndTest", "testCmds", "Lkotlin/Function1;", "Lio/floodplain/kotlindsl/TestContext;", "Lkotlin/ExtensionFunctionType;", "renderTopology", "runTopology", "Lorg/apache/kafka/streams/KafkaStreams;", "topology", "floodplain-dsl"})
/* loaded from: input_file:io/floodplain/kotlindsl/Stream.class */
public final class Stream {
    private final List<Source> sources;
    private final List<Config> sinkConfigurations;
    private final List<Config> sourceConfigurations;

    @NotNull
    private final TopologyContext context;

    @NotNull
    public final Stream addSource(@NotNull Source source) {
        Intrinsics.checkParameterIsNotNull(source, "source");
        this.sources.add(source);
        return this;
    }

    public final void addSinkConfiguration(@NotNull Config config) {
        Intrinsics.checkParameterIsNotNull(config, "c");
        this.sinkConfigurations.add(config);
    }

    public final void addSourceConfiguration(@NotNull Config config) {
        Intrinsics.checkParameterIsNotNull(config, "c");
        this.sourceConfigurations.add(config);
    }

    private final List<Config> sinkConfigurations() {
        return CollectionsKt.toList(this.sinkConfigurations);
    }

    private final List<Config> sourceConfigurations() {
        return CollectionsKt.toList(this.sourceConfigurations);
    }

    private final Topology renderTopology(TopologyConstructor topologyConstructor) {
        Topology topology = new Topology();
        List<Source> list = this.sources;
        ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(list, 10));
        Iterator<T> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(((Source) it.next()).toReactivePipe());
        }
        ArrayList arrayList2 = arrayList;
        Stack stack = new Stack();
        Iterator it2 = arrayList2.iterator();
        while (it2.hasNext()) {
            ReactivePipeParser.processPipe(this.context, topologyConstructor, topology, topologyConstructor.generateNewStreamId(), stack, (ReactivePipe) it2.next(), false);
        }
        ReplicationTopologyParser.materializeStateStores(topologyConstructor, topology);
        return topology;
    }

    @NotNull
    public final Stream renderAndTest(@NotNull Function1<? super TestContext, Unit> function1) {
        KLogger kLogger;
        Intrinsics.checkParameterIsNotNull(function1, "testCmds");
        Topology renderTopology = renderTopology(new TopologyConstructor());
        kLogger = StreamKt.logger;
        kLogger.info("Testing topology:\n" + renderTopology.describe());
        PipeTestKt.testTopology(renderTopology, function1, this.context);
        return this;
    }

    public final void renderAndStart(@NotNull URL url, @NotNull String str, @NotNull String str2) {
        KLogger kLogger;
        Intrinsics.checkParameterIsNotNull(url, "connectorURL");
        Intrinsics.checkParameterIsNotNull(str, "kafkaHosts");
        Intrinsics.checkParameterIsNotNull(str2, "clientId");
        TopologyConstructor topologyConstructor = new TopologyConstructor();
        Triple<Topology, List<Pair<String, String>>, List<Pair<String, String>>> render = render(topologyConstructor);
        Topology topology = (Topology) render.component1();
        List<Pair> list = (List) render.component2();
        List<Pair> list2 = (List) render.component3();
        topologyConstructor.createTopicsAsNeeded(str, str2);
        for (Pair pair : list) {
            FloodplainConnectorKt.startConstructor((String) pair.component1(), this.context, url, (String) pair.component2(), true);
        }
        for (Pair pair2 : list2) {
            FloodplainConnectorKt.startConstructor((String) pair2.component1(), this.context, url, (String) pair2.component2(), true);
        }
        String generationalGroup = CoreOperators.generationalGroup("appId", this.context);
        Intrinsics.checkExpressionValueIsNotNull(generationalGroup, "appId");
        runTopology(topology, generationalGroup, str, "storagePath");
        kLogger = StreamKt.logger;
        kLogger.info(new Function0<String>() { // from class: io.floodplain.kotlindsl.Stream$renderAndStart$3
            @NotNull
            public final String invoke() {
                return "Topology running!";
            }
        });
    }

    @NotNull
    public final Triple<Topology, List<Pair<String, String>>, List<Pair<String, String>>> render(@NotNull TopologyConstructor topologyConstructor) {
        Intrinsics.checkParameterIsNotNull(topologyConstructor, "topologyConstructor");
        Topology renderTopology = renderTopology(topologyConstructor);
        List<Config> sourceConfigurations = sourceConfigurations();
        ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(sourceConfigurations, 10));
        Iterator<T> it = sourceConfigurations.iterator();
        while (it.hasNext()) {
            Pair<String, Map<String, String>> materializeConnectorConfig = ((Config) it.next()).materializeConnectorConfig(this.context);
            String str = (String) materializeConnectorConfig.component1();
            arrayList.add(TuplesKt.to(str, FloodplainConnectorKt.constructConnectorJson(this.context, str, (Map) materializeConnectorConfig.component2())));
        }
        ArrayList arrayList2 = arrayList;
        List<Config> sinkConfigurations = sinkConfigurations();
        ArrayList arrayList3 = new ArrayList(CollectionsKt.collectionSizeOrDefault(sinkConfigurations, 10));
        Iterator<T> it2 = sinkConfigurations.iterator();
        while (it2.hasNext()) {
            Pair<String, Map<String, String>> materializeConnectorConfig2 = ((Config) it2.next()).materializeConnectorConfig(this.context);
            String str2 = (String) materializeConnectorConfig2.component1();
            arrayList3.add(TuplesKt.to(str2, FloodplainConnectorKt.constructConnectorJson(this.context, str2, (Map) materializeConnectorConfig2.component2())));
        }
        return new Triple<>(renderTopology, arrayList2, arrayList3);
    }

    @Nullable
    public final KafkaStreams runTopology(@NotNull Topology topology, @NotNull String str, @NotNull String str2, @NotNull String str3) throws InterruptedException, IOException {
        Intrinsics.checkParameterIsNotNull(topology, "topology");
        Intrinsics.checkParameterIsNotNull(str, "applicationId");
        Intrinsics.checkParameterIsNotNull(str2, "kafkaHosts");
        Intrinsics.checkParameterIsNotNull(str3, "storagePath");
        final KafkaStreams kafkaStreams = new KafkaStreams(topology, createProperties(str, str2, str3));
        System.out.println((Object) ("CurrentTopology:\n " + topology.describe()));
        kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: io.floodplain.kotlindsl.Stream$runTopology$1
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public final void uncaughtException(@NotNull Thread thread, @Nullable Throwable th) {
                KLogger kLogger;
                Intrinsics.checkParameterIsNotNull(thread, "thread");
                kLogger = StreamKt.logger;
                kLogger.error("Error in streams. thread: " + thread.getName() + " exception: ", th);
                kafkaStreams.close();
            }
        });
        kafkaStreams.setStateListener(new KafkaStreams.StateListener() { // from class: io.floodplain.kotlindsl.Stream$runTopology$2
            public final void onChange(@Nullable KafkaStreams.State state, @Nullable KafkaStreams.State state2) {
                KLogger kLogger;
                kLogger = StreamKt.logger;
                kLogger.info("State moving from {} to {}", new Object[]{state2, state, kafkaStreams.state()});
            }
        });
        kafkaStreams.start();
        return kafkaStreams;
    }

    private final Properties createProperties(String str, String str2, String str3) {
        KLogger kLogger;
        KLogger kLogger2;
        KLogger kLogger3;
        Properties properties = new Properties();
        kLogger = StreamKt.logger;
        kLogger.info("Creating application with name: {}", str);
        kLogger2 = StreamKt.logger;
        kLogger2.info("Creating application id: {}", str);
        kLogger3 = StreamKt.logger;
        kLogger3.info("Starting instance in storagePath: {}", str3);
        properties.put("application.id", str);
        properties.put("bootstrap.servers", str2);
        properties.put("default.key.serde", Serdes.String().getClass());
        properties.put("default.value.serde", StreamOperators.replicationSerde.getClass());
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", 30000);
        properties.put("request.timeout.ms", 40000);
        properties.put("heartbeat.interval.ms", 5000);
        properties.put("max.poll.interval.ms", 7200000);
        properties.put("max.poll.records", 100);
        properties.put("compression.type", "lz4");
        properties.put("state.dir", str3);
        properties.put("num.stream.threads", 1);
        properties.put("num.standby.replicas", 0);
        properties.put("retries", 50);
        properties.put("replication.factor", Integer.valueOf(CoreOperators.topicReplicationCount()));
        properties.put("default.timestamp.extractor", WallclockTimestampExtractor.class);
        properties.put("retention.ms", 86400000);
        properties.put("message.timestamp.difference.max.ms", 6048000000L);
        properties.put("log.message.timestamp.difference.max.ms", 6652800000L);
        properties.put("cache.max.bytes.buffering", 10485760L);
        properties.put("commit.interval.ms", 1000);
        properties.put("max.request.size", 7900000);
        properties.put("max.partition.fetch.bytes", 7900000);
        properties.put("rocksdb.config.setter", RocksDBConfigurationSetter.class);
        return properties;
    }

    @NotNull
    public final TopologyContext getContext() {
        return this.context;
    }

    public Stream(@NotNull TopologyContext topologyContext) {
        Intrinsics.checkParameterIsNotNull(topologyContext, "context");
        this.context = topologyContext;
        this.sources = new ArrayList();
        this.sinkConfigurations = new ArrayList();
        this.sourceConfigurations = new ArrayList();
    }
}
