package io.debezium.testing.testcontainers;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.testing.testcontainers.Connector;
import io.debezium.testing.testcontainers.util.ContainerImageVersions;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.awaitility.Awaitility;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.images.PullPolicy;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:io/debezium/testing/testcontainers/DebeziumContainer.class */
public class DebeziumContainer extends GenericContainer<DebeziumContainer> {
    /** Image repository (without tag) used by {@link #latestStable()} and {@link #nightly()}. */
    private static final String DEBEZIUM_CONTAINER = "quay.io/debezium/connect";
    /** Image tag used by {@link #nightly()}. */
    private static final String DEBEZIUM_NIGHTLY_TAG = "nightly";
    /** Container port that is exposed, polled by the startup wait strategy and used to build the REST URLs. */
    private static final int KAFKA_CONNECT_PORT = 8083;
    /** Prefix of the test system properties; not referenced by name anywhere else in this class. */
    private static final String TEST_PROPERTY_PREFIX = "debezium.test.";
    /** Cached stable image tag; stays {@code null} until the first lookup in lazilyRetrieveAndCacheLatestStable(). */
    private static String debeziumLatestStable;
    /** JMX port used by {@link #enableJMX()} and by the JMX URL built in isStreamingRunning(). */
    private static final Integer DEFAULT_JMX_PORT = 13333;
    /**
     * Startup timeout handed to the HTTP wait strategy: 30 times {@link #waitTimeForRecords()} seconds.
     * Evaluated once during class initialization, so later changes of the system property do not affect it.
     */
    private static final Duration DEBEZIUM_CONTAINER_STARTUP_TIMEOUT = Duration.ofSeconds(waitTimeForRecords() * 30);
    /** Media type attached to every request body created by this class. */
    public static final MediaType JSON = MediaType.get("application/json; charset=utf-8");
    /** Shared JSON mapper used to parse the response bodies. */
    protected static final ObjectMapper MAPPER = new ObjectMapper();
    /** Shared HTTP client used for all requests issued by this class. */
    protected static final OkHttpClient CLIENT = new OkHttpClient();

    /**
     * Creates a container for the given image and applies the default configuration.
     *
     * @param dockerImageName image to run; handed unchanged to the {@code GenericContainer} constructor
     */
    public DebeziumContainer(DockerImageName dockerImageName) {
        super(dockerImageName);
        defaultConfig();
    }

    /**
     * Creates a container whose image is supplied through a future and applies the default configuration.
     *
     * @param future image supplier; handed unchanged to the {@code GenericContainer} constructor
     */
    public DebeziumContainer(Future<String> future) {
        super(future);
        defaultConfig();
    }

    /**
     * Creates a container from a textual image name and applies the default configuration.
     *
     * @param str image name, parsed with {@code DockerImageName.parse}
     */
    public DebeziumContainer(String str) {
        // Same effect as calling super(DockerImageName.parse(str)) followed by defaultConfig().
        this(DockerImageName.parse(str));
    }

    /**
     * Creates a container for the stable image version reported by {@code ContainerImageVersions}.
     *
     * @return a new container for {@code quay.io/debezium/connect:<stable tag>}
     */
    public static DebeziumContainer latestStable() {
        String stableTag = lazilyRetrieveAndCacheLatestStable();
        String imageName = DEBEZIUM_CONTAINER + ":" + stableTag;
        return new DebeziumContainer(imageName);
    }

    /**
     * Returns the stable image tag, looking it up only while no value has been cached yet.
     *
     * @return the cached tag, or the freshly retrieved one on the first successful lookup
     */
    private static String lazilyRetrieveAndCacheLatestStable() {
        if (debeziumLatestStable != null) {
            return debeziumLatestStable;
        }
        // Nothing cached yet: ask the version helper and remember its answer in the static field.
        debeziumLatestStable = ContainerImageVersions.getStableVersion(DEBEZIUM_CONTAINER);
        return debeziumLatestStable;
    }

    /**
     * Creates a container for the {@code nightly} image tag with an age-based pull policy of one day.
     *
     * @return a new container for {@code quay.io/debezium/connect:nightly}
     */
    public static DebeziumContainer nightly() {
        DebeziumContainer nightlyContainer = new DebeziumContainer(DEBEZIUM_CONTAINER + ":" + DEBEZIUM_NIGHTLY_TAG);
        return nightlyContainer.withImagePullPolicy(PullPolicy.ageBased(Duration.ofDays(1L)));
    }

    /**
     * Applies the configuration shared by all constructors: an HTTP wait strategy on
     * {@code /connectors}, the storage topic and converter environment variables, and the exposed port.
     */
    private void defaultConfig() {
        // The container counts as started once GET /connectors answers on the exposed port.
        HttpWaitStrategy connectorsEndpoint = new HttpWaitStrategy()
                .forPath("/connectors")
                .forPort(KAFKA_CONNECT_PORT);
        setWaitStrategy(connectorsEndpoint.withStartupTimeout(DEBEZIUM_CONTAINER_STARTUP_TIMEOUT));

        withEnv("GROUP_ID", "1")
                .withEnv("CONFIG_STORAGE_TOPIC", "debezium_connect_config")
                .withEnv("OFFSET_STORAGE_TOPIC", "debezium_connect_offsets")
                .withEnv("STATUS_STORAGE_TOPIC", "debezium_connect_status")
                .withEnv("CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE", "false")
                .withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "false");

        withExposedPorts(KAFKA_CONNECT_PORT);
    }

    /**
     * Connects this container to the given Kafka container, using its network and its first
     * network alias with port {@code 9092} as bootstrap server.
     *
     * @param kafkaContainer the Kafka container to connect to
     * @return this container
     */
    public DebeziumContainer withKafka(KafkaContainer kafkaContainer) {
        Network kafkaNetwork = kafkaContainer.getNetwork();
        String bootstrapServers = kafkaContainer.getNetworkAliases().get(0) + ":9092";
        return withKafka(kafkaNetwork, bootstrapServers);
    }

    /**
     * Attaches this container to a network and sets the {@code BOOTSTRAP_SERVERS} environment variable.
     *
     * @param network the network to join
     * @param str the value for {@code BOOTSTRAP_SERVERS}
     * @return this container
     */
    public DebeziumContainer withKafka(Network network, String str) {
        return withNetwork(network).withEnv("BOOTSTRAP_SERVERS", str);
    }

    /**
     * Sets the {@code ENABLE_APICURIO_CONVERTERS} environment variable to {@code true}.
     *
     * @return this container
     */
    public DebeziumContainer enableApicurioConverters() {
        return withEnv("ENABLE_APICURIO_CONVERTERS", "true");
    }

    /**
     * Enables JMX on the default port (13333).
     *
     * @return this container
     */
    public DebeziumContainer enableJMX() {
        return enableJMX(DEFAULT_JMX_PORT);
    }

    /**
     * Sets the JMX related environment variables (host {@code localhost}, no authentication,
     * no SSL) and binds the given port to the same port number on the host.
     *
     * @param num the JMX port, used both inside the container and on the host
     * @return this container
     */
    public DebeziumContainer enableJMX(Integer num) {
        withEnv("JMXHOST", "localhost");
        withEnv("JMXPORT", String.valueOf(num));
        withEnv("JMXAUTH", "false");
        withEnv("JMXSSL", "false");
        // Fixed mapping: host port and container port are the same number.
        addFixedExposedPort(num, num);
        return self();
    }

    /**
     * Reads the base wait time from the {@code debezium.test.records.waittime} system property.
     *
     * @return the configured value, or {@code 2} when the property is not set
     * @throws NumberFormatException if the property value is not a valid integer
     */
    public static int waitTimeForRecords() {
        String configuredWaitTime = System.getProperty("debezium.test.records.waittime", "2");
        return Integer.parseInt(configuredWaitTime);
    }

    /**
     * Builds the base HTTP URL from the container host and the mapped port of container port 8083.
     *
     * @return {@code http://<host>:<mapped port>}
     */
    public String getTarget() {
        String host = getHost();
        Integer mappedPort = getMappedPort(KAFKA_CONNECT_PORT);
        return "http://" + host + ":" + mappedPort;
    }

    /**
     * @return the base URL followed by {@code /connectors/}
     */
    public String getConnectorsUri() {
        String target = getTarget();
        return target + "/connectors/";
    }

    /**
     * @param str the connector name
     * @return the connectors URL followed by the connector name
     */
    public String getConnectorUri(String str) {
        String connectorsUri = getConnectorsUri();
        return connectorsUri + str;
    }

    /**
     * @param str the connector name
     * @return the connector URL followed by {@code /pause}
     */
    public String getPauseConnectorUri(String str) {
        String connectorUri = getConnectorUri(str);
        return connectorUri + "/pause";
    }

    /**
     * @param str the connector name
     * @return the connector URL followed by {@code /resume}
     */
    public String getResumeConnectorUri(String str) {
        String connectorUri = getConnectorUri(str);
        return connectorUri + "/resume";
    }

    /**
     * @param str the connector name
     * @return the connector URL followed by {@code /status}
     */
    public String getConnectorStatusUri(String str) {
        String connectorUri = getConnectorUri(str);
        return connectorUri + "/status";
    }

    /**
     * @param str the connector name
     * @return the connector URL followed by {@code /config}
     */
    public String getConnectorConfigUri(String str) {
        String connectorUri = getConnectorUri(str);
        return connectorUri + "/config";
    }

    /**
     * Registers a connector by POSTing its JSON form to the connectors URL, then waits until
     * {@link #isConnectorConfigured(String)} reports it.
     *
     * @param str the connector name
     * @param connectorConfiguration the connector configuration
     */
    public void registerConnector(String str, ConnectorConfiguration connectorConfiguration) {
        Connector connector = Connector.from(str, connectorConfiguration);
        String payload = connector.toJson();
        executePOSTRequestSuccessfully(payload, getConnectorsUri());

        // Poll for at most five times the base wait time.
        Awaitility.await()
                .atMost(waitTimeForRecords() * 5, TimeUnit.SECONDS)
                .until(() -> isConnectorConfigured(connector.getName()));
    }

    /**
     * PUTs the string form of the configuration to the connector's config URL, then waits until
     * {@link #isConnectorConfigured(String)} reports the connector.
     *
     * @param str the connector name
     * @param connectorConfiguration the configuration to send
     */
    public void updateOrCreateConnector(String str, ConnectorConfiguration connectorConfiguration) {
        String payload = connectorConfiguration.getConfiguration().toString();
        executePUTRequestSuccessfully(payload, getConnectorConfigUri(str));

        // Poll for at most five times the base wait time.
        Awaitility.await()
                .atMost(waitTimeForRecords() * 5, TimeUnit.SECONDS)
                .until(() -> isConnectorConfigured(str));
    }

    /**
     * Turns a response into an exception that carries the response and its body text.
     * This method never returns normally.
     *
     * @param response the response to report
     * @throws IllegalStateException always, unless reading the body fails
     * @throws RuntimeException if reading the response body throws an {@link IOException}
     */
    private static void handleFailedResponse(Response response) {
        String bodyText = "{empty response body}";
        try (ResponseBody responseBody = response.body()) {
            if (responseBody != null) {
                bodyText = responseBody.string();
            }
        } catch (IOException e) {
            throw new RuntimeException("Error connecting to Debezium container", e);
        }
        throw new IllegalStateException("Unexpected response: " + response + " ; Response Body: " + bodyText);
    }

    /**
     * Sends a POST request with a JSON body and fails unless the response is successful.
     * The response is closed on every path, including when the failure handler throws.
     *
     * @param str the request payload
     * @param str2 the target URL
     * @throws IllegalStateException if the response is not successful
     * @throws RuntimeException if the call fails with an {@link IOException}
     */
    private void executePOSTRequestSuccessfully(String str, String str2) {
        Request request = new Request.Builder().url(str2).post(RequestBody.create(str, JSON)).build();
        try (Response response = CLIENT.newCall(request).execute()) {
            if (!response.isSuccessful()) {
                handleFailedResponse(response);
            }
        } catch (IOException e) {
            throw new RuntimeException("Error connecting to Debezium container on URL: " + str2, e);
        }
    }

    /**
     * Sends a PUT request with a JSON body and fails unless the response is successful.
     * The response is closed on every path, including when the failure handler throws.
     *
     * @param str the request payload
     * @param str2 the target URL
     * @throws IllegalStateException if the response is not successful
     * @throws RuntimeException if the call fails with an {@link IOException}
     */
    private void executePUTRequestSuccessfully(String str, String str2) {
        Request request = new Request.Builder().url(str2).put(RequestBody.create(str, JSON)).build();
        try (Response response = CLIENT.newCall(request).execute()) {
            if (!response.isSuccessful()) {
                handleFailedResponse(response);
            }
        } catch (IOException e) {
            // Include the URL, matching the message produced by the POST variant.
            throw new RuntimeException("Error connecting to Debezium container on URL: " + str2, e);
        }
    }

    /**
     * Executes the given request with the shared HTTP client. The caller owns the returned
     * response and is expected to close it.
     *
     * @param request the request to execute
     * @return the response, whatever its status code
     * @throws RuntimeException if the call fails with an {@link IOException}
     */
    protected static Response executeGETRequest(Request request) {
        Response response;
        try {
            response = CLIENT.newCall(request).execute();
        } catch (IOException e) {
            throw new RuntimeException("Error connecting to Debezium container", e);
        }
        return response;
    }

    /**
     * Executes the given request and hands non-successful responses to the failure handler.
     * Despite the name, the request method is whatever the caller built into {@code request}.
     *
     * @param request the request to execute
     * @return the successful response; the caller is expected to close it
     * @throws IllegalStateException if the response is not successful
     */
    protected static Response executeGETRequestSuccessfully(Request request) {
        Response response = executeGETRequest(request);
        if (response.isSuccessful()) {
            return response;
        }
        handleFailedResponse(response);
        return response;
    }

    /**
     * Checks whether a GET on the connector URL answers with status 404.
     *
     * @param str the connector name
     * @return {@code true} if the response code is 404
     */
    public boolean connectorIsNotRegistered(String str) {
        Request lookup = new Request.Builder().url(getConnectorUri(str)).build();
        try (Response response = executeGETRequest(lookup)) {
            return response.code() == 404;
        }
    }

    /**
     * Sends a DELETE request for the connector and closes the response. Does not wait for
     * the connector to disappear.
     *
     * @param str the connector name
     * @throws IllegalStateException if the response is not successful
     */
    protected void deleteDebeziumConnector(String str) {
        Request deleteRequest = new Request.Builder().url(getConnectorUri(str)).delete().build();
        executeGETRequestSuccessfully(deleteRequest).close();
    }

    /**
     * Deletes the connector and waits until {@link #connectorIsNotRegistered(String)} reports it gone.
     *
     * @param str the connector name
     */
    public void deleteConnector(String str) {
        deleteDebeziumConnector(str);

        // Poll for at most five times the base wait time.
        Awaitility.await()
                .atMost(waitTimeForRecords() * 5, TimeUnit.SECONDS)
                .until(() -> connectorIsNotRegistered(str));
    }

    /**
     * Fetches the connectors URL and parses the response body as a JSON list of strings.
     *
     * @return the parsed list, or an empty list when the response has no body
     * @throws IllegalStateException if the response is not successful or the body cannot be read or parsed
     */
    public List<String> getRegisteredConnectors() {
        Request request = new Request.Builder().url(getConnectorsUri()).build();
        // try-with-resources accepts a null resource, so a missing body needs no special close handling.
        try (ResponseBody responseBody = executeGETRequestSuccessfully(request).body()) {
            if (responseBody == null) {
                return Collections.emptyList();
            }
            // TypeReference only has a no-argument constructor; the anonymous subclass captures the generic type.
            return MAPPER.readValue(responseBody.string(), new TypeReference<List<String>>() {
            });
        } catch (IOException e) {
            throw new IllegalStateException("Error fetching list of registered connectors", e);
        }
    }

    /**
     * Checks whether a GET on the connector URL answers with a successful status.
     *
     * @param str the connector name
     * @return {@code true} if the response is successful
     */
    public boolean isConnectorConfigured(String str) {
        Request lookup = new Request.Builder().url(getConnectorUri(str)).build();
        try (Response response = executeGETRequest(lookup)) {
            return response.isSuccessful();
        }
    }

    /**
     * Waits until {@link #isConnectorConfigured(String)} returns {@code true}, for at most
     * five times the base wait time.
     *
     * @param str the connector name
     */
    public void ensureConnectorRegistered(String str) {
        Awaitility.await()
                .atMost(waitTimeForRecords() * 5, TimeUnit.SECONDS)
                .until(() -> isConnectorConfigured(str));
    }

    /**
     * Sends a delete request for every registered connector, then waits until the list of
     * registered connectors is empty.
     */
    public void deleteAllConnectors() {
        for (String connectorName : getRegisteredConnectors()) {
            deleteDebeziumConnector(connectorName);
        }

        // Poll for at most five times the base wait time.
        Awaitility.await()
                .atMost(waitTimeForRecords() * 5, TimeUnit.SECONDS)
                .until(() -> getRegisteredConnectors().isEmpty());
    }

    /**
     * Fetches the connector's status document and converts its {@code connector.state} value.
     *
     * @param str the connector name
     * @return the connector state, or {@code null} when the response has no body
     * @throws IllegalStateException if the response is not successful or the body cannot be read or parsed
     */
    public Connector.State getConnectorState(String str) {
        Request request = new Request.Builder().url(getConnectorStatusUri(str)).build();
        try (ResponseBody responseBody = executeGETRequestSuccessfully(request).body()) {
            if (responseBody == null) {
                return null;
            }
            String stateName = MAPPER.readTree(responseBody.string()).get("connector").get("state").asText();
            return Connector.State.valueOf(stateName);
        } catch (IOException e) {
            throw new IllegalStateException("Error fetching connector state for connector: " + str, e);
        }
    }

    /**
     * Fetches the connector's status document and converts the {@code state} of the task at
     * the given position of its {@code tasks} array.
     *
     * @param str the connector name
     * @param i the position of the task in the {@code tasks} array
     * @return the task state, or {@code null} when the response has no body or the array has no such entry
     * @throws IllegalStateException if the response is not successful or the body cannot be read or parsed
     */
    public Connector.State getConnectorTaskState(String str, int i) {
        Request request = new Request.Builder().url(getConnectorStatusUri(str)).get().build();
        try (ResponseBody responseBody = executeGETRequestSuccessfully(request).body()) {
            if (responseBody == null) {
                return null;
            }
            JsonNode task = MAPPER.readTree(responseBody.string()).get("tasks").get(i);
            return task == null ? null : Connector.State.valueOf(task.get("state").asText());
        } catch (IOException e) {
            throw new IllegalStateException("Error fetching connector task state for connector task: " + str + "#" + i, e);
        }
    }

    /**
     * Fetches the connector's config document and returns the text value of one property.
     *
     * @param str the connector name
     * @param str2 the name of the configuration property
     * @return the property value as text, or {@code null} when the response has no body or the
     *         document does not contain the property
     * @throws IllegalStateException if the response is not successful or the body cannot be read or parsed
     */
    public String getConnectorConfigProperty(String str, String str2) {
        Request request = new Request.Builder().url(getConnectorConfigUri(str)).get().build();
        try (ResponseBody responseBody = executeGETRequestSuccessfully(request).body()) {
            if (responseBody == null) {
                return null;
            }
            // JsonNode.get returns null for an absent field; report that as null instead of failing
            // with a NullPointerException, so that polling callers can keep waiting for the property.
            JsonNode property = MAPPER.readTree(responseBody.string()).get(str2);
            return property == null ? null : property.asText();
        } catch (IOException e) {
            throw new IllegalStateException("Error fetching connector config property for connector: " + str, e);
        }
    }

    /**
     * Sends an empty PUT to the connector's pause URL, then waits until the connector state
     * is {@code PAUSED}.
     *
     * @param str the connector name
     */
    public void pauseConnector(String str) {
        Request pauseRequest = new Request.Builder()
                .url(getPauseConnectorUri(str))
                .put(RequestBody.create("", JSON))
                .build();
        executeGETRequestSuccessfully(pauseRequest).close();

        // Poll for at most five times the base wait time.
        Awaitility.await()
                .atMost(waitTimeForRecords() * 5, TimeUnit.SECONDS)
                .until(() -> getConnectorState(str) == Connector.State.PAUSED);
    }

    /**
     * Sends an empty PUT to the connector's resume URL, then waits until the connector state
     * is {@code RUNNING}.
     *
     * @param str the connector name
     */
    public void resumeConnector(String str) {
        Request resumeRequest = new Request.Builder()
                .url(getResumeConnectorUri(str))
                .put(RequestBody.create("", JSON))
                .build();
        executeGETRequestSuccessfully(resumeRequest).close();

        // Poll for at most five times the base wait time.
        Awaitility.await()
                .atMost(waitTimeForRecords() * 5, TimeUnit.SECONDS)
                .until(() -> getConnectorState(str) == Connector.State.RUNNING);
    }

    /**
     * Waits until the connector reports the given state, for at most five times the base wait time.
     *
     * @param str the connector name
     * @param state the expected connector state
     */
    public void ensureConnectorState(String str, Connector.State state) {
        Awaitility.await()
                .atMost(waitTimeForRecords() * 5, TimeUnit.SECONDS)
                .until(() -> getConnectorState(str) == state);
    }

    /**
     * Waits until the given task reports the given state, for at most five times the base wait time.
     *
     * @param str the connector name
     * @param i the position of the task in the status document's {@code tasks} array
     * @param state the expected task state
     */
    public void ensureConnectorTaskState(String str, int i, Connector.State state) {
        Awaitility.await()
                .atMost(waitTimeForRecords() * 5, TimeUnit.SECONDS)
                .until(() -> getConnectorTaskState(str, i) == state);
    }

    /**
     * Waits until the given configuration property equals the expected value, for at most
     * five times the base wait time.
     *
     * @param str the connector name
     * @param str2 the name of the configuration property
     * @param str3 the expected value, compared with {@link Objects#equals(Object, Object)}
     */
    public void ensureConnectorConfigProperty(String str, String str2, String str3) {
        Awaitility.await()
                .atMost(waitTimeForRecords() * 5, TimeUnit.SECONDS)
                .until(() -> Objects.equals(str3, getConnectorConfigProperty(str, str2)));
    }

    /**
     * Waits for the streaming metrics of the {@code streaming} context to report a connection.
     *
     * @param str the connector type used in the metrics domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the metrics name
     * @throws InterruptedException declared for callers; not thrown by the visible code
     */
    public void waitForStreamingRunning(String str, String str2) throws InterruptedException {
        final String defaultContext = "streaming";
        waitForStreamingRunning(str, str2, defaultContext);
    }

    /**
     * Waits for the streaming metrics of the given context, without a task key.
     *
     * @param str the connector type used in the metrics domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the metrics name
     * @param str3 the value of the {@code context} key of the metrics name
     */
    public void waitForStreamingRunning(String str, String str2, String str3) {
        final String noTask = null;
        waitForStreamingRunning(str, str2, str3, noTask);
    }

    /**
     * Polls {@link #isStreamingRunning(String, String, String, String)} for up to 120 seconds,
     * ignoring {@link InstanceNotFoundException} while waiting.
     *
     * @param str the connector type used in the metrics domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the metrics name
     * @param str3 the value of the {@code context} key of the metrics name
     * @param str4 the value of the {@code task} key, or {@code null} to omit it
     */
    public void waitForStreamingRunning(String str, String str2, String str3, String str4) {
        Awaitility.await()
                .atMost(120L, TimeUnit.SECONDS)
                .ignoreException(InstanceNotFoundException.class)
                .until(() -> isStreamingRunning(str, str2, str3, str4));
    }

    /**
     * Reads the {@code Connected} attribute of the connector metrics MBean over JMX.
     * The JMX connection is closed on every path, including when the attribute lookup fails.
     *
     * @param str the connector type used in the metrics domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key of the metrics name
     * @param str3 the value of the {@code context} key of the metrics name
     * @param str4 the value of the {@code task} key, or {@code null} to omit it
     * @return the value of the {@code Connected} attribute
     * @throws JMException if the object name is malformed or the attribute cannot be read
     * @throws RuntimeException if the JMX connection fails with an {@link IOException}
     */
    public boolean isStreamingRunning(String str, String str2, String str3, String str4) throws JMException {
        // NOTE(review): host and port are fixed to localhost and DEFAULT_JMX_PORT, so a port passed
        // to enableJMX(Integer) is not taken into account here - confirm whether that is intended.
        String serviceUrl = "service:jmx:rmi:///jndi/rmi://localhost:" + DEFAULT_JMX_PORT + "/jmxrmi";
        try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(serviceUrl), null)) {
            ObjectName metricsName = str4 != null
                    ? getStreamingMetricsObjectName(str, str2, str3, str4)
                    : getStreamingMetricsObjectName(str, str2, str3);
            return (Boolean) connector.getMBeanServerConnection().getAttribute(metricsName, "Connected");
        } catch (IOException e) {
            throw new RuntimeException("Unable to connect to JMX service", e);
        }
    }

    /**
     * Builds the metrics object name without a task key.
     *
     * @param str the connector type used in the domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key
     * @param str3 the value of the {@code context} key
     * @return the object name {@code debezium.<str>:type=connector-metrics,context=<str3>,server=<str2>}
     * @throws MalformedObjectNameException if the resulting name is not a valid object name
     */
    private static ObjectName getStreamingMetricsObjectName(String str, String str2, String str3) throws MalformedObjectNameException {
        String name = String.format("debezium.%s:type=connector-metrics,context=%s,server=%s", str, str3, str2);
        return new ObjectName(name);
    }

    /**
     * Builds the metrics object name including a task key.
     *
     * @param str the connector type used in the domain ({@code debezium.<str>})
     * @param str2 the value of the {@code server} key
     * @param str3 the value of the {@code context} key
     * @param str4 the value of the {@code task} key
     * @return the object name {@code debezium.<str>:type=connector-metrics,context=<str3>,server=<str2>,task=<str4>}
     * @throws MalformedObjectNameException if the resulting name is not a valid object name
     */
    private static ObjectName getStreamingMetricsObjectName(String str, String str2, String str3, String str4) throws MalformedObjectNameException {
        String name = String.format("debezium.%s:type=connector-metrics,context=%s,server=%s,task=%s", str, str3, str2, str4);
        return new ObjectName(name);
    }

    /**
     * Builds a connector configuration for a PostgreSQL container with a numbered topic prefix
     * and slot name, plus optional extra settings.
     *
     * @param postgreSQLContainer the database container the configuration is derived from
     * @param i number appended to {@code dbserver} (topic prefix) and {@code debezium_} (slot name)
     * @param strArr extra settings as alternating key, value entries; may be {@code null} or empty
     * @return the resulting configuration
     * @throws IllegalArgumentException if {@code strArr} has an odd number of entries
     */
    public static ConnectorConfiguration getPostgresConnectorConfiguration(PostgreSQLContainer<?> postgreSQLContainer, int i, String... strArr) {
        // Reject a dangling key up front rather than failing with an index error after some
        // of the settings have already been applied.
        if (strArr != null && strArr.length % 2 != 0) {
            throw new IllegalArgumentException(
                    "Extra options must be given as key/value pairs, but " + strArr.length + " value(s) were supplied");
        }

        ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(postgreSQLContainer)
                .with("topic.prefix", "dbserver" + i)
                .with("slot.name", "debezium_" + i);

        if (strArr != null) {
            for (int k = 0; k < strArr.length; k += 2) {
                config.with(strArr[k], strArr[k + 1]);
            }
        }
        return config;
    }
}
