package io.floodplain.kotlindsl;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.floodplain.streams.api.Topic;
import io.floodplain.streams.api.TopologyContext;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import kotlin.Metadata;
import kotlin.TuplesKt;
import kotlin.Unit;
import kotlin.collections.MapsKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import kotlin.text.StringsKt;
import mu.KLogger;
import mu.KotlinLogging;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.jetbrains.annotations.NotNull;

/* compiled from: FloodplainConnector.kt */
@Metadata(mv = {1, 7, 1}, k = 2, xi = 48, d1 = {"��\u0082\u0001\n��\n\u0002\u0010\t\n��\n\u0002\u0010\b\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010$\n\u0002\u0010��\n��\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010 \n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010%\n\u0002\u0018\u0002\n\u0002\u0010!\n\u0002\b\u0007\n\u0002\u0010\u000b\n\u0002\b\u0002\u001a*\u0010\f\u001a\u00020\r2\u0006\u0010\u000e\u001a\u00020\u000f2\u0006\u0010\u0010\u001a\u00020\r2\u0012\u0010\u0011\u001a\u000e\u0012\u0004\u0012\u00020\r\u0012\u0004\u0012\u00020\u00130\u0012\u001a\u0018\u0010\u0014\u001a\u00020\u00152\u0006\u0010\u0016\u001a\u00020\r2\u0006\u0010\u0017\u001a\u00020\u0018H\u0002\u001a\u0016\u0010\u0019\u001a\b\u0012\u0004\u0012\u00020\r0\u001a2\u0006\u0010\u001b\u001a\u00020\u0018H\u0002\u001a&\u0010\u001c\u001a\u00020\u001d2\u0006\u0010\u001e\u001a\u00020\u001f2\u0006\u0010 \u001a\u00020!2\u0006\u0010\"\u001a\u00020#2\u0006\u0010$\u001a\u00020#\u001a \u0010%\u001a\u0014\u0012\u0004\u0012\u00020'\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u001d0(0&2\u0006\u0010 \u001a\u00020!\u001a\u0018\u0010)\u001a\u00020\u00152\u0006\u0010\u001b\u001a\u00020\u00182\u0006\u0010*\u001a\u00020\rH\u0002\u001a0\u0010+\u001a\u000e\u0012\u0004\u0012\u00020\r\u0012\u0004\u0012\u00020\u00130\u00122\u0012\u0010,\u001a\u000e\u0012\u0004\u0012\u00020\r\u0012\u0004\u0012\u00020\u00130\u00122\u0006\u0010-\u001a\u00020\rH\u0002\u001a.\u0010.\u001a\u00020\u00152\u0006\u0010\u0010\u001a\u00020\r2\u0006\u0010\u000e\u001a\u00020\u000f2\u0006\u0010\u0017\u001a\u00020\u00182\u0006\u0010*\u001a\u00020\r2\u0006\u0010/\u001a\u000200\u001a\f\u00101\u001a\u000200*\u00020\u0003H\u0002\"\u000e\u0010��\u001a\u00020\u0001X\u0086T¢\u0006\u0002\n��\"\u000e\u0010\u0002\u001a\u00020\u0003X\u0082T¢\u0006\u0002\n��\"\u0011\u0010\u0004\u001a\u00020\u0005¢\u0006\b\n��\u001a\u0004\b\u0006\u0010\u0007\"\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��\"\u000e\u0010\n\u001a\u00020\u000bX\u0082\u0004¢\u0006\u0002\n��¨\u00062"}, d2 = {"DEFAULT_HTTP_TIMEOUT", "", "ERRORCODES", "", "httpClient", "Ljava/net/http/HttpClient;", "getHttpClient", "()Ljava/net/http/HttpClient;", "logger", "Lmu/KLogger;", "objectMapper", "Lcom/fasterxml/jackson/databind/ObjectMapper;", "constructConnectorJson", "", "topologyContext", "Lio/floodplain/streams/api/TopologyContext;", "connectorName", "parameters", "", "", "deleteConnector", "", "name", "connectURL", "Ljava/net/URL;", "existingConnectors", "", "url", "floodplainSinkFromTask", "Lio/floodplain/kotlindsl/FloodplainSink;", "task", "Lorg/apache/kafka/connect/sink/SinkTask;", "config", "Lio/floodplain/kotlindsl/SinkConfig;", "keyConverter", "Lorg/apache/kafka/connect/storage/Converter;", "valueConverter", "instantiateSinkConfig", "", "Lio/floodplain/streams/api/Topic;", "", "postToHttpJava11", "jsonString", "settingsWithPrefix", "settings", "prefix", "startConstructor", "force", "", "isErrorCode", "floodplain-dsl"})
/* loaded from: input_file:io/floodplain/kotlindsl/FloodplainConnectorKt.class */
public final class FloodplainConnectorKt {

    @NotNull
    private static final KLogger logger = KotlinLogging.INSTANCE.logger(new Function0<Unit>() { // from class: io.floodplain.kotlindsl.FloodplainConnectorKt$logger$1
        public final void invoke() {
        }

        /* renamed from: invoke, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m3invoke() {
            invoke();
            return Unit.INSTANCE;
        }
    });

    @NotNull
    private static final ObjectMapper objectMapper = new ObjectMapper();
    public static final long DEFAULT_HTTP_TIMEOUT = 10;

    @NotNull
    private static final HttpClient httpClient;
    private static final int ERRORCODES = 400;

    @NotNull
    public static final HttpClient getHttpClient() {
        return httpClient;
    }

    @NotNull
    public static final String constructConnectorJson(@NotNull TopologyContext topologyContext, @NotNull String str, @NotNull Map<String, ? extends Object> map) {
        Intrinsics.checkNotNullParameter(topologyContext, "topologyContext");
        Intrinsics.checkNotNullParameter(str, "connectorName");
        Intrinsics.checkNotNullParameter(map, "parameters");
        String str2 = topologyContext.topicName(str);
        ObjectNode createObjectNode = objectMapper.createObjectNode();
        createObjectNode.put("name", str2);
        JsonNode createObjectNode2 = objectMapper.createObjectNode();
        createObjectNode.set("config", createObjectNode2);
        for (Map.Entry<String, ? extends Object> entry : map.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            if (value instanceof String) {
                createObjectNode2.put(key, (String) value);
            } else if (value instanceof Integer) {
                createObjectNode2.put(key, (Integer) value);
            } else if (value instanceof Long) {
                createObjectNode2.put(key, (Long) value);
            } else if (value instanceof Float) {
                createObjectNode2.put(key, (Float) value);
            } else if (value instanceof Double) {
                createObjectNode2.put(key, (Double) value);
            } else if (value instanceof Boolean) {
                createObjectNode2.put(key, (Boolean) value);
            }
        }
        createObjectNode2.put("name", str2);
        createObjectNode2.put("database.server.name", str2);
        String writeValueAsString = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(createObjectNode);
        Intrinsics.checkNotNullExpressionValue(writeValueAsString, "objectMapper.writerWithD….writeValueAsString(node)");
        return writeValueAsString;
    }

    public static final void startConstructor(@NotNull String str, @NotNull TopologyContext topologyContext, @NotNull URL url, @NotNull String str2, boolean z) {
        Intrinsics.checkNotNullParameter(str, "connectorName");
        Intrinsics.checkNotNullParameter(topologyContext, "topologyContext");
        Intrinsics.checkNotNullParameter(url, "connectURL");
        Intrinsics.checkNotNullParameter(str2, "jsonString");
        String str3 = topologyContext.topicName(str);
        if (existingConnectors(url).contains(str3)) {
            if (z) {
                logger.warn("Force enabled, deleting old");
                Intrinsics.checkNotNullExpressionValue(str3, "generatedName");
                deleteConnector(str3, url);
            } else {
                logger.warn("Connector: {} already present, ignoring", str3);
            }
        }
        postToHttpJava11(url, str2);
    }

    private static final List<String> existingConnectors(URL url) {
        logger.info("Connecting to URL: " + url);
        ArrayNode readTree = objectMapper.readTree((InputStream) httpClient.send(HttpRequest.newBuilder().uri(url.toURI()).build(), HttpResponse.BodyHandlers.ofInputStream()).body());
        Intrinsics.checkNotNull(readTree, "null cannot be cast to non-null type com.fasterxml.jackson.databind.node.ArrayNode");
        ArrayNode arrayNode = readTree;
        ArrayList arrayList = new ArrayList();
        arrayNode.forEach((v1) -> {
            existingConnectors$lambda$1(r1, v1);
        });
        List<String> unmodifiableList = Collections.unmodifiableList(arrayList);
        Intrinsics.checkNotNullExpressionValue(unmodifiableList, "unmodifiableList(result)");
        return unmodifiableList;
    }

    private static final boolean isErrorCode(int i) {
        return i > ERRORCODES;
    }

    private static final void deleteConnector(String str, URL url) throws IOException {
        HttpRequest build = HttpRequest.newBuilder().uri(new URL(url + '/' + str).toURI()).DELETE().build();
        Intrinsics.checkNotNullExpressionValue(build, "newBuilder()\n        .ur…DELETE()\n        .build()");
        HttpResponse send = httpClient.send(build, HttpResponse.BodyHandlers.ofString());
        if (isErrorCode(send.statusCode())) {
            throw new IOException("Error deleting connector: " + send.uri());
        }
    }

    private static final void postToHttpJava11(URL url, String str) {
        HttpRequest build = HttpRequest.newBuilder().uri(url.toURI()).header("Content-Type", "application/json").header("Accept", "application/json").POST(HttpRequest.BodyPublishers.ofString(str)).build();
        Intrinsics.checkNotNullExpressionValue(build, "newBuilder()\n        .ur…String))\n        .build()");
        HttpResponse send = httpClient.send(build, HttpResponse.BodyHandlers.ofString());
        Intrinsics.checkNotNullExpressionValue(send, "httpClient.send(request, BodyHandlers.ofString())");
        if (isErrorCode(send.statusCode())) {
            logger.error("Scheduling connector failed. Request: " + str);
            throw new IOException("Error calling connector: " + send.uri() + " code: " + send.statusCode() + " body: " + ((String) send.body()));
        }
    }

    @NotNull
    public static final FloodplainSink floodplainSinkFromTask(@NotNull SinkTask sinkTask, @NotNull SinkConfig sinkConfig, @NotNull Converter converter, @NotNull Converter converter2) {
        Intrinsics.checkNotNullParameter(sinkTask, "task");
        Intrinsics.checkNotNullParameter(sinkConfig, "config");
        Intrinsics.checkNotNullParameter(converter, "keyConverter");
        Intrinsics.checkNotNullParameter(converter2, "valueConverter");
        return new LocalConnectorSink(sinkTask, sinkConfig, converter, converter2);
    }

    @NotNull
    public static final Map<Topic, List<FloodplainSink>> instantiateSinkConfig(@NotNull SinkConfig sinkConfig) {
        Intrinsics.checkNotNullParameter(sinkConfig, "config");
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (MaterializedConfig materializedConfig : sinkConfig.materializeConnectorConfig()) {
            Object newInstance = Class.forName(materializedConfig.getSettings().get("connector.class")).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            Intrinsics.checkNotNull(newInstance, "null cannot be cast to non-null type org.apache.kafka.connect.sink.SinkConnector");
            SinkConnector sinkConnector = (SinkConnector) newInstance;
            sinkConnector.start(materializedConfig.getSettings());
            Object newInstance2 = sinkConnector.taskClass().getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            Intrinsics.checkNotNull(newInstance2, "null cannot be cast to non-null type org.apache.kafka.connect.sink.SinkTask");
            SinkTask sinkTask = (SinkTask) newInstance2;
            sinkTask.start(materializedConfig.getSettings());
            Object newInstance3 = Class.forName(materializedConfig.getSettings().get("key.converter")).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            Intrinsics.checkNotNull(newInstance3, "null cannot be cast to non-null type org.apache.kafka.connect.storage.Converter");
            Converter converter = (Converter) newInstance3;
            Object newInstance4 = Class.forName(materializedConfig.getSettings().get("value.converter")).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            Intrinsics.checkNotNull(newInstance4, "null cannot be cast to non-null type org.apache.kafka.connect.storage.Converter");
            Converter converter2 = (Converter) newInstance4;
            converter.configure(settingsWithPrefix(materializedConfig.getSettings(), "key.converter."), true);
            converter2.configure(settingsWithPrefix(materializedConfig.getSettings(), "value.converter."), false);
            FloodplainSink floodplainSinkFromTask = floodplainSinkFromTask(sinkTask, sinkConfig, converter, converter2);
            for (Topic topic : materializedConfig.getTopics()) {
                FloodplainConnectorKt$instantiateSinkConfig$1$1$list$1 floodplainConnectorKt$instantiateSinkConfig$1$1$list$1 = new Function1<Topic, List<FloodplainSink>>() { // from class: io.floodplain.kotlindsl.FloodplainConnectorKt$instantiateSinkConfig$1$1$list$1
                    @NotNull
                    public final List<FloodplainSink> invoke(@NotNull Topic topic2) {
                        Intrinsics.checkNotNullParameter(topic2, "it");
                        return new ArrayList();
                    }
                };
                Object computeIfAbsent = linkedHashMap.computeIfAbsent(topic, (v1) -> {
                    return instantiateSinkConfig$lambda$4$lambda$3$lambda$2(r2, v1);
                });
                Intrinsics.checkNotNullExpressionValue(computeIfAbsent, "result.computeIfAbsent(topic) { mutableListOf() }");
                ((List) computeIfAbsent).add(floodplainSinkFromTask);
            }
        }
        return linkedHashMap;
    }

    private static final Map<String, Object> settingsWithPrefix(Map<String, ? extends Object> map, String str) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (Map.Entry<String, ? extends Object> entry : map.entrySet()) {
            if (StringsKt.startsWith$default(entry.getKey(), str, false, 2, (Object) null)) {
                linkedHashMap.put(entry.getKey(), entry.getValue());
            }
        }
        ArrayList arrayList = new ArrayList(linkedHashMap.size());
        for (Map.Entry entry2 : linkedHashMap.entrySet()) {
            String str2 = (String) entry2.getKey();
            Object value = entry2.getValue();
            String substring = str2.substring(str.length());
            Intrinsics.checkNotNullExpressionValue(substring, "this as java.lang.String).substring(startIndex)");
            arrayList.add(TuplesKt.to(substring, value));
        }
        return MapsKt.toMap(arrayList);
    }

    private static final void existingConnectors$lambda$1(List list, JsonNode jsonNode) {
        Intrinsics.checkNotNullParameter(list, "$result");
        Intrinsics.checkNotNullParameter(jsonNode, "j");
        String asText = jsonNode.asText();
        Intrinsics.checkNotNullExpressionValue(asText, "j.asText()");
        list.add(asText);
    }

    private static final List instantiateSinkConfig$lambda$4$lambda$3$lambda$2(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (List) function1.invoke(obj);
    }

    static {
        HttpClient build = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).followRedirects(HttpClient.Redirect.NORMAL).connectTimeout(Duration.ofSeconds(10L)).build();
        Intrinsics.checkNotNullExpressionValue(build, "newBuilder()\n    .versio…TP_TIMEOUT))\n    .build()");
        httpClient = build;
    }
}
