package io.debezium.testing.openshift.tools.kafka;

import io.debezium.testing.openshift.tools.HttpUtils;
import io.debezium.testing.openshift.tools.OpenShiftUtils;
import io.debezium.testing.openshift.tools.WaitConditions;
import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.DoneableDeployment;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicy;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPortBuilder;
import io.fabric8.kubernetes.client.dsl.Deletable;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.fabric8.kubernetes.client.dsl.ServiceResource;
import io.fabric8.openshift.api.model.Route;
import io.fabric8.openshift.client.OpenShiftClient;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.KafkaConnectorList;
import io.strimzi.api.kafka.model.DoneableKafkaConnector;
import io.strimzi.api.kafka.model.KafkaConnect;
import io.strimzi.api.kafka.model.KafkaConnector;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/testing/openshift/tools/kafka/KafkaConnectController.class */
/**
 * Controller for a single Strimzi {@link KafkaConnect} cluster running on OpenShift.
 *
 * <p>It can scale the deployment down, force-delete its pods, expose the REST API and
 * the metrics endpoint through routes, and deploy or undeploy connectors. Connectors are
 * managed either through {@link KafkaConnector} custom resources or through the Kafka
 * Connect REST API, depending on the flag passed to the constructor.
 */
public class KafkaConnectController {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConnectController.class);
    private static final int METRICS_PORT = 9404;
    private static final String LABEL_CLUSTER = "strimzi.io/cluster";
    private static final String LABEL_KIND = "strimzi.io/kind";
    private static final String LABEL_NAME = "strimzi.io/name";
    private static final String KIND_KAFKA_CONNECT = "KafkaConnect";

    private final OpenShiftClient ocp;
    private final OkHttpClient http;
    private final String project;
    private final OpenShiftUtils ocpUtils;
    private final HttpUtils httpUtils;
    private final boolean useConnectorResources;
    private final String name;
    private KafkaConnect kafkaConnect;
    private Route apiRoute;
    private Route metricsRoute;
    private Service metricsService;

    /**
     * @param kafkaConnect    the KafkaConnect resource to control; its metadata supplies the
     *                        cluster name and the namespace used by every operation
     * @param openShiftClient client used for all cluster operations
     * @param okHttpClient    HTTP client used for REST API and metrics requests
     * @param z               {@code true} to manage connectors via {@link KafkaConnector}
     *                        custom resources, {@code false} to use the REST API
     */
    public KafkaConnectController(KafkaConnect kafkaConnect, OpenShiftClient openShiftClient, OkHttpClient okHttpClient, boolean z) {
        this.kafkaConnect = kafkaConnect;
        this.name = kafkaConnect.getMetadata().getName();
        this.ocp = openShiftClient;
        this.http = okHttpClient;
        this.useConnectorResources = z;
        this.project = kafkaConnect.getMetadata().getNamespace();
        this.ocpUtils = new OpenShiftUtils(openShiftClient);
        this.httpUtils = new HttpUtils(okHttpClient);
    }

    /**
     * Scales the {@code <name>-connect} deployment to zero replicas and waits (up to 30 seconds)
     * until no pod labelled {@code strimzi.io/kind=KafkaConnect} remains in the namespace.
     */
    public void disable() {
        LOGGER.info("Disabling KafkaConnect deployment (scaling to ZERO).");
        ocp.apps().deployments()
                .inNamespace(project)
                .withName(name + "-connect")
                .edit()
                .editSpec()
                .withReplicas(0)
                .endSpec()
                .done();

        Awaitility.await()
                .atMost(30, TimeUnit.SECONDS)
                .pollDelay(5, TimeUnit.SECONDS)
                .pollInterval(3, TimeUnit.SECONDS)
                .until(() -> ocp.pods()
                        .inNamespace(project)
                        .withLabel(LABEL_KIND, KIND_KAFKA_CONNECT)
                        .list()
                        .getItems()
                        .isEmpty());
    }

    /**
     * Deletes every pod labelled {@code strimzi.io/kind=KafkaConnect} in the namespace with a
     * zero grace period and then calls {@link #disable()}.
     */
    public void destroy() {
        LOGGER.info("Force deleting all KafkaConnect pods.");
        ocp.pods()
                .inNamespace(project)
                .withLabel(LABEL_KIND, KIND_KAFKA_CONNECT)
                .withGracePeriod(0)
                .delete();
        disable();
    }

    /**
     * Waits up to five minutes for the KafkaConnect resource to satisfy
     * {@link WaitConditions#kafkaReadyCondition} and stores the refreshed resource.
     *
     * @return the KafkaConnect resource returned by the wait
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public KafkaConnect waitForConnectCluster() throws InterruptedException {
        kafkaConnect = Crds.kafkaConnectOperation(ocp)
                .inNamespace(project)
                .withName(name)
                .waitUntilCondition(WaitConditions::kafkaReadyCondition, 5, TimeUnit.MINUTES);
        return kafkaConnect;
    }

    /**
     * Creates a network policy named {@code <name>-allowed} that selects this cluster's connect
     * pods and admits TCP traffic on the listed ports.
     *
     * @return the created network policy
     */
    public NetworkPolicy allowServiceAccess() {
        LOGGER.info("Creating NetworkPolicy allowing public access to {}'s services", name);

        Map<String, String> labels = new HashMap<>();
        labels.put(LABEL_CLUSTER, name);
        labels.put(LABEL_KIND, KIND_KAFKA_CONNECT);
        labels.put(LABEL_NAME, name + "-connect");

        // METRICS_PORT is the port the metrics service created in exposeMetrics() listens on,
        // so it has to be admitted here as well; 8083 and 8404 are kept as before.
        return ocpUtils.createNetworkPolicy(
                project,
                name + "-allowed",
                labels,
                Stream.of(8083, 8404, METRICS_PORT)
                        .map(IntOrString::new)
                        .map(port -> new NetworkPolicyPortBuilder().withProtocol("TCP").withPort(port).build())
                        .collect(Collectors.toList()));
    }

    /**
     * Creates a route to the {@code <name>-connect-api} service (port {@code rest-api}) and
     * blocks until {@link HttpUtils#awaitApi} returns for the resulting URL.
     *
     * @return the created route
     */
    public Route exposeApi() {
        LOGGER.info("Exposing KafkaConnect API");
        String apiName = name + "-connect-api";
        Service apiService = ocp.services().inNamespace(project).withName(apiName).get();

        apiRoute = ocpUtils.createRoute(project, apiName, apiName, "rest-api", apiService.getMetadata().getLabels());
        httpUtils.awaitApi(getApiURL());
        return apiRoute;
    }

    /**
     * Creates a {@code <name>-connect-metrics} service on {@link #METRICS_PORT} (reusing the
     * selector and labels of the API service), exposes it through a route and blocks until
     * {@link HttpUtils#awaitApi} returns for the resulting URL.
     *
     * @return the created metrics route
     */
    public Route exposeMetrics() {
        LOGGER.info("Exposing KafkaConnect metrics");
        String metricsName = name + "-connect-metrics";
        Service apiService = ocp.services().inNamespace(project).withName(name + "-connect-api").get();
        Map<String, String> selector = apiService.getSpec().getSelector();
        Map<String, String> labels = apiService.getMetadata().getLabels();

        metricsService = ocpUtils.createService(project, metricsName, "tcp-prometheus", METRICS_PORT, selector, labels);
        metricsRoute = ocpUtils.createRoute(project, metricsName, metricsName, "tcp-prometheus", labels);
        httpUtils.awaitApi(getMetricsURL());
        return metricsRoute;
    }

    /**
     * Deploys a connector, either as a custom resource or through the REST API.
     *
     * @param str                    connector name
     * @param connectorConfigBuilder source of the connector configuration
     * @throws IOException           if the REST request fails
     * @throws InterruptedException  if interrupted while waiting for the custom resource
     * @throws IllegalStateException if the REST API is used but was not exposed
     */
    public void deployConnector(String str, ConnectorConfigBuilder connectorConfigBuilder) throws IOException, InterruptedException {
        LOGGER.info("Deploying connector {}", str);
        if (useConnectorResources) {
            deployConnectorCr(str, connectorConfigBuilder);
        } else {
            deployConnectorJson(str, connectorConfigBuilder);
        }
    }

    /** Registers the connector by PUT-ing its JSON configuration to the REST API. */
    private void deployConnectorJson(String connectorName, ConnectorConfigBuilder config) throws IOException {
        requireApiExposed();
        Request request = new Request.Builder()
                .url(getApiURL().resolve("/connectors/" + connectorName + "/config"))
                .put(RequestBody.create(config.getJsonString(), MediaType.parse("application/json")))
                .build();
        executeOrFail(request, "registration");
        LOGGER.info("Registered kafka connector '{}'", connectorName);
    }

    /** Creates or replaces the connector custom resource and waits for it to become ready. */
    private void deployConnectorCr(String connectorName, ConnectorConfigBuilder config) throws InterruptedException {
        LOGGER.info("Deploying connector CR");
        KafkaConnector connector = config.getCustomResource();
        connector.getMetadata().setName(connectorName);
        connector.getMetadata().getLabels().put(LABEL_CLUSTER, name);

        kafkaConnectorOperation().createOrReplace(connector);
        waitForKafkaConnector(connector.getMetadata().getName());
    }

    /**
     * Waits up to five minutes for the named connector custom resource to satisfy
     * {@link WaitConditions#kafkaReadyCondition}.
     *
     * @param str connector name
     * @return the connector resource returned by the wait
     * @throws InterruptedException  if the waiting thread is interrupted
     * @throws IllegalStateException if this controller does not use connector custom resources
     */
    public KafkaConnector waitForKafkaConnector(String str) throws InterruptedException {
        if (!useConnectorResources) {
            throw new IllegalStateException("Unable to wait for connector, deployment doesn't use custom resources.");
        }
        return kafkaConnectorOperation()
                .withName(str)
                .waitUntilCondition(WaitConditions::kafkaReadyCondition, 5, TimeUnit.MINUTES);
    }

    /** Returns the KafkaConnector operation scoped to this controller's namespace. */
    private NonNamespaceOperation<KafkaConnector, KafkaConnectorList, DoneableKafkaConnector, Resource<KafkaConnector, DoneableKafkaConnector>> kafkaConnectorOperation() {
        return Crds.kafkaConnectorOperation(ocp).inNamespace(project);
    }

    /**
     * Removes a connector, either by deleting its custom resource or through the REST API.
     *
     * @param str connector name
     * @throws IOException           if the REST request fails
     * @throws IllegalStateException if the REST API is used but was not exposed
     */
    public void undeployConnector(String str) throws IOException {
        LOGGER.info("Undeploying kafka connector {}", str);
        if (useConnectorResources) {
            undeployConnectorCr(str);
        } else {
            undeployConnectorJson(str);
        }
    }

    /** Deletes the connector by issuing DELETE against the REST API. */
    private void undeployConnectorJson(String connectorName) throws IOException {
        requireApiExposed();
        Request request = new Request.Builder()
                .url(getApiURL().resolve("/connectors/" + connectorName))
                .delete()
                .build();
        executeOrFail(request, "deletion");
        LOGGER.info("Deleted kafka connector '{}'", connectorName);
    }

    /** Deletes the connector custom resource and waits (up to one minute) until it is gone. */
    private void undeployConnectorCr(String connectorName) {
        kafkaConnectorOperation().withName(connectorName).delete();
        Awaitility.await()
                .atMost(1, TimeUnit.MINUTES)
                .pollInterval(5, TimeUnit.SECONDS)
                .until(() -> kafkaConnectorOperation().withName(connectorName).get() == null);
    }

    /** Fails fast when a REST operation is attempted before {@link #exposeApi()} was called. */
    private void requireApiExposed() {
        if (apiRoute == null) {
            throw new IllegalStateException("KafkaConnect API was not exposed");
        }
    }

    /**
     * Executes the request with the shared HTTP client, always closing the response, and throws
     * if the status is not successful.
     *
     * @param operation word inserted into the failure message (e.g. "registration")
     */
    private void executeOrFail(Request request, String operation) throws IOException {
        try (Response response = http.newCall(request).execute()) {
            if (!response.isSuccessful()) {
                LOGGER.error(response.request().url().toString());
                throw new RuntimeException("Connector " + operation + " request returned status code '" + response.code() + "'");
            }
        }
    }

    /**
     * Fetches the metrics endpoint and splits the response body into lines.
     *
     * @return one entry per line of the metrics response
     * @throws IOException           if the HTTP request fails
     * @throws IllegalStateException if metrics were not exposed via {@link #exposeMetrics()}
     */
    public List<String> getConnectMetrics() throws IOException {
        LOGGER.info("Retrieving connector metrics");
        if (metricsRoute == null) {
            throw new IllegalStateException("KafkaConnect metrics were not exposed");
        }
        Request request = new Request.Builder().url(getMetricsURL()).get().build();
        // Reuse the shared client: this method is polled repeatedly by waitForSnapshot and a
        // fresh OkHttpClient per call would allocate a new connection pool each time.
        try (Response response = http.newCall(request).execute()) {
            String body = response.body().string();
            return Stream.of(body.split("\\r?\\n")).collect(Collectors.toList());
        }
    }

    /**
     * Polls the metrics endpoint every 10 seconds until some line contains both the metric
     * name and the connector name, for at most {@code WaitConditions.scaled(5)} minutes.
     *
     * @param str  connector name expected in the metric line
     * @param str2 metric name expected in the metric line
     * @throws IOException declared for API compatibility
     */
    public void waitForSnapshot(String str, String str2) throws IOException {
        LOGGER.info("Waiting for connector '{}' to finish snapshot", str);
        Awaitility.await()
                .atMost(WaitConditions.scaled(5), TimeUnit.MINUTES)
                .pollInterval(10, TimeUnit.SECONDS)
                .until(() -> getConnectMetrics().stream()
                        .anyMatch(line -> line.contains(str2) && line.contains(str)));
    }

    /** Waits for the MySQL snapshot-completed metric of the given connector. */
    public void waitForMySqlSnapshot(String str) throws IOException {
        waitForSnapshot(str, "debezium_mysql_connector_metrics_snapshotcompleted");
    }

    /** Waits for the PostgreSQL snapshot-completed metric of the given connector. */
    public void waitForPostgreSqlSnapshot(String str) throws IOException {
        waitForSnapshot(str, "debezium_postgres_connector_metrics_snapshotcompleted");
    }

    /** Waits for the SQL Server snapshot-completed metric of the given connector. */
    public void waitForSqlServerSnapshot(String str) throws IOException {
        waitForSnapshot(str, "debezium_sql_server_connector_metrics_snapshotcompleted");
    }

    /** Waits for the MongoDB snapshot-completed metric of the given connector. */
    public void waitForMongoSnapshot(String str) throws IOException {
        waitForSnapshot(str, "debezium_mongodb_connector_metrics_snapshotcompleted");
    }

    /** Waits for the DB2 snapshot-completed metric of the given connector. */
    public void waitForDB2Snapshot(String str) throws IOException {
        waitForSnapshot(str, "debezium_db2_server_connector_metrics_snapshotcompleted");
    }

    /**
     * @return plain-HTTP URL built from the API route's host; requires {@link #exposeApi()}
     *         to have been called
     */
    public HttpUrl getApiURL() {
        return new HttpUrl.Builder()
                .scheme("http")
                .host(apiRoute.getSpec().getHost())
                .build();
    }

    /**
     * @return plain-HTTP URL built from the metrics route's host; requires
     *         {@link #exposeMetrics()} to have been called
     */
    public HttpUrl getMetricsURL() {
        return new HttpUrl.Builder()
                .scheme("http")
                .host(metricsRoute.getSpec().getHost())
                .build();
    }

    /**
     * Deletes the controlled KafkaConnect resource.
     *
     * @return the result reported by the client's delete operation
     */
    public boolean undeployCluster() {
        return Crds.kafkaConnectOperation(ocp).delete(kafkaConnect);
    }
}
