package io.debezium.testing.system.tools.kafka;

import io.debezium.testing.system.tools.HttpUtils;
import io.debezium.testing.system.tools.OpenShiftUtils;
import io.debezium.testing.system.tools.WaitConditions;
import io.debezium.testing.system.tools.kafka.connectors.ConnectorDeployer;
import io.debezium.testing.system.tools.kafka.connectors.ConnectorMetricsReader;
import io.debezium.testing.system.tools.kafka.connectors.CustomResourceConnectorDeployer;
import io.debezium.testing.system.tools.kafka.connectors.JsonConnectorDeployer;
import io.debezium.testing.system.tools.kafka.connectors.RestPrometheusMetricReader;
import io.debezium.testing.system.tools.kafka.docker.KafkaConnectConainer;
import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyPortBuilder;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.fabric8.kubernetes.client.dsl.ServiceResource;
import io.fabric8.openshift.api.model.Route;
import io.fabric8.openshift.client.OpenShiftClient;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.model.connect.KafkaConnect;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/testing/system/tools/kafka/OcpKafkaConnectController.class */
public class OcpKafkaConnectController implements KafkaConnectController {
    private static final Logger LOGGER = LoggerFactory.getLogger(OcpKafkaConnectController.class);
    private static final int METRICS_PORT = 9404;
    private final OpenShiftClient ocp;
    private final OkHttpClient http;
    private final String project;
    private final StrimziOperatorController operatorController;
    private final OpenShiftUtils ocpUtils;
    private final HttpUtils httpUtils;
    private final String name;
    private KafkaConnect kafkaConnect;
    private Route apiRoute;
    private Route metricsRoute;
    private Service metricsService;

    public OcpKafkaConnectController(KafkaConnect kafkaConnect, StrimziOperatorController strimziOperatorController, OpenShiftClient openShiftClient, OkHttpClient okHttpClient) {
        this.kafkaConnect = kafkaConnect;
        this.name = kafkaConnect.getMetadata().getName();
        this.operatorController = strimziOperatorController;
        this.ocp = openShiftClient;
        this.http = okHttpClient;
        this.project = kafkaConnect.getMetadata().getNamespace();
        this.ocpUtils = new OpenShiftUtils(openShiftClient);
        this.httpUtils = new HttpUtils(okHttpClient);
    }

    @Override // io.debezium.testing.system.tools.kafka.KafkaConnectController
    public void disable() {
        LOGGER.info("Disabling KafkaConnect deployment (scaling to ZERO).");
        ((RollableScalableResource) ((NonNamespaceOperation) this.ocp.apps().deployments().inNamespace(this.project)).withName(this.name + "-connect")).scale(0);
        Awaitility.await().atMost(WaitConditions.scaled(30L), TimeUnit.SECONDS).pollDelay(5L, TimeUnit.SECONDS).pollInterval(3L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(((PodList) ((FilterWatchListDeletable) ((NonNamespaceOperation) this.ocp.pods().inNamespace(this.project)).withLabel("strimzi.io/kind", "KafkaConnect")).list()).getItems().isEmpty());
        });
    }

    @Override // io.debezium.testing.system.tools.kafka.KafkaConnectController
    public void destroy() {
        LOGGER.info("Force deleting all KafkaConnect pods.");
        KafkaConnect kafkaConnect = (KafkaConnect) ((Resource) ((NonNamespaceOperation) Crds.kafkaConnectOperation(this.ocp).inNamespace(this.project)).withName(this.name)).get();
        kafkaConnect.getSpec().setReplicas(0);
        ((NonNamespaceOperation) Crds.kafkaConnectOperation(this.ocp).inNamespace(this.project)).createOrReplace(kafkaConnect);
        Awaitility.await().atMost(WaitConditions.scaled(30L), TimeUnit.SECONDS).pollDelay(5L, TimeUnit.SECONDS).pollInterval(3L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(((PodList) ((FilterWatchListDeletable) ((NonNamespaceOperation) this.ocp.pods().inNamespace(this.project)).withLabel("strimzi.io/kind", "KafkaConnect")).list()).getItems().isEmpty());
        });
    }

    @Override // io.debezium.testing.system.tools.kafka.KafkaConnectController
    public void restore() throws InterruptedException {
        KafkaConnect kafkaConnect = (KafkaConnect) ((Resource) ((NonNamespaceOperation) Crds.kafkaConnectOperation(this.ocp).inNamespace(this.project)).withName(this.name)).get();
        kafkaConnect.getSpec().setReplicas(1);
        ((NonNamespaceOperation) Crds.kafkaConnectOperation(this.ocp).inNamespace(this.project)).createOrReplace(kafkaConnect);
        waitForCluster();
    }

    @Override // io.debezium.testing.system.tools.kafka.KafkaConnectController
    public void waitForCluster() throws InterruptedException {
        LOGGER.info("Waiting for Kafka Connect cluster '" + this.name + "'");
        this.kafkaConnect = (KafkaConnect) ((Resource) ((NonNamespaceOperation) Crds.kafkaConnectOperation(this.ocp).inNamespace(this.project)).withName(this.name)).waitUntilCondition((v0) -> {
            return WaitConditions.kafkaReadyCondition(v0);
        }, WaitConditions.scaled(5L), TimeUnit.MINUTES);
    }

    public NetworkPolicy allowServiceAccess() {
        LOGGER.info("Creating NetworkPolicy allowing public access to " + this.kafkaConnect.getMetadata().getName() + "'s services");
        HashMap hashMap = new HashMap();
        hashMap.put("strimzi.io/cluster", this.kafkaConnect.getMetadata().getName());
        hashMap.put("strimzi.io/kind", "KafkaConnect");
        hashMap.put("strimzi.io/name", this.kafkaConnect.getMetadata().getName() + "-connect");
        return this.ocpUtils.createNetworkPolicy(this.project, this.kafkaConnect.getMetadata().getName() + "-allowed", hashMap, (List) Stream.of((Object[]) new Integer[]{Integer.valueOf(KafkaConnectConainer.KAFKA_CONNECT_API_PORT), 8404, 9404}).map((v1) -> {
            return new IntOrString(v1);
        }).map(intOrString -> {
            return new NetworkPolicyPortBuilder().withProtocol("TCP").withPort(intOrString).build();
        }).collect(Collectors.toList()));
    }

    public Route exposeApi() {
        LOGGER.info("Exposing KafkaConnect API");
        String str = this.kafkaConnect.getMetadata().getName() + "-connect-api";
        this.apiRoute = this.ocpUtils.createRoute(this.project, str, str, "rest-api", ((Service) ((ServiceResource) ((NonNamespaceOperation) this.ocp.services().inNamespace(this.project)).withName(str)).get()).getMetadata().getLabels());
        this.httpUtils.awaitApi(getApiURL());
        return this.apiRoute;
    }

    public Route exposeMetrics() {
        LOGGER.info("Exposing KafkaConnect metrics");
        String str = this.kafkaConnect.getMetadata().getName() + "-connect-metrics";
        Service service = (Service) ((ServiceResource) ((NonNamespaceOperation) this.ocp.services().inNamespace(this.project)).withName(this.kafkaConnect.getMetadata().getName() + "-connect-api")).get();
        Map<String, String> selector = service.getSpec().getSelector();
        Map<String, String> labels = service.getMetadata().getLabels();
        this.metricsService = this.ocpUtils.createService(this.project, str, "tcp-prometheus", 9404, selector, labels);
        this.metricsRoute = this.ocpUtils.createRoute(this.project, str, str, "tcp-prometheus", labels);
        this.httpUtils.awaitApi(getMetricsURL());
        return this.metricsRoute;
    }

    @Override // io.debezium.testing.system.tools.kafka.KafkaConnectController
    public void deployConnector(ConnectorConfigBuilder connectorConfigBuilder) throws IOException, InterruptedException {
        LOGGER.info("Deploying connector " + connectorConfigBuilder.getConnectorName());
        getConnectorDeployer().deploy(connectorConfigBuilder);
    }

    private boolean hasConnectorResourcesEnabled() {
        return "true".equals(this.kafkaConnect.getMetadata().getAnnotations().get("strimzi.io/use-connector-resources"));
    }

    private ConnectorDeployer getConnectorDeployer() {
        return hasConnectorResourcesEnabled() ? new CustomResourceConnectorDeployer(this.kafkaConnect, this.ocp) : new JsonConnectorDeployer(getApiURL(), this.http);
    }

    @Override // io.debezium.testing.system.tools.kafka.KafkaConnectController
    public void undeployConnector(String str) throws IOException {
        LOGGER.info("Undeploying kafka connector " + str);
        getConnectorDeployer().undeploy(str);
    }

    @Override // io.debezium.testing.system.tools.kafka.KafkaConnectController
    public HttpUrl getApiURL() {
        if (this.apiRoute == null) {
            throw new IllegalStateException("KafkaConnect API was not exposed");
        }
        return new HttpUrl.Builder().scheme("http").host(this.apiRoute.getSpec().getHost()).build();
    }

    public HttpUrl getMetricsURL() {
        return new HttpUrl.Builder().scheme("http").host(this.metricsRoute.getSpec().getHost()).build();
    }

    @Override // io.debezium.testing.system.tools.kafka.KafkaConnectController
    public boolean undeploy() {
        try {
            ((Resource) Crds.kafkaConnectOperation(this.ocp).resource(this.kafkaConnect)).delete();
            ((Resource) Crds.kafkaConnectOperation(this.ocp).resource(this.kafkaConnect)).waitUntilCondition((v0) -> {
                return WaitConditions.resourceDeleted(v0);
            }, WaitConditions.scaled(1L), TimeUnit.MINUTES);
            return true;
        } catch (Exception e) {
            LOGGER.error("Kafka connect cluster was not deleted");
            return false;
        }
    }

    @Override // io.debezium.testing.system.tools.kafka.KafkaConnectController
    public ConnectorMetricsReader getMetricsReader() {
        return new RestPrometheusMetricReader(getMetricsURL());
    }
}
