package io.strimzi.test.container;

import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.model.ContainerNetwork;
import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import io.strimzi.test.container.KafkaVersionService;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.logging.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.MountableFile;

/* loaded from: input_file:io/strimzi/test/container/StrimziKafkaContainer.class */
public class StrimziKafkaContainer extends GenericContainer<StrimziKafkaContainer> implements KafkaContainer {
    private static final Logger LOGGER = LoggerFactory.getLogger(StrimziKafkaContainer.class);
    public static final String STARTER_SCRIPT = "/testcontainers_start.sh";
    public static final int KAFKA_PORT = 9092;
    protected static final String NETWORK_ALIAS_PREFIX = "broker-";
    protected static final int INTER_BROKER_LISTENER_PORT = 9091;
    private final CompletableFuture<String> imageNameProvider;
    private final boolean enableBrokerContainerSlf4jLogging;
    private int kafkaExposedPort;
    private int internalZookeeperExposedPort;
    private Map<String, String> kafkaConfigurationMap;
    private String externalZookeeperConnect;
    private int brokerId;
    private Integer nodeId;
    private String kafkaVersion;
    private boolean useKraft;
    private Function<StrimziKafkaContainer, String> bootstrapServersProvider;
    private String clusterId;
    private MountableFile serverPropertiesFile;
    private ToxiproxyContainer proxyContainer;
    private ToxiproxyClient toxiproxyClient;
    private Proxy proxy;
    protected Set<String> listenerNames;
    private boolean oauthEnabled;
    private String realm;
    private String clientId;
    private String clientSecret;
    private String oauthUri;
    private String usernameClaim;
    private String saslUsername;
    private String saslPassword;
    private AuthenticationType authenticationType;

    public StrimziKafkaContainer() {
        this((CompletableFuture<String>) new CompletableFuture());
    }

    public StrimziKafkaContainer(String str) {
        this((CompletableFuture<String>) CompletableFuture.completedFuture(str));
    }

    private StrimziKafkaContainer(CompletableFuture<String> completableFuture) {
        super(completableFuture);
        this.enableBrokerContainerSlf4jLogging = Boolean.parseBoolean(System.getenv().getOrDefault("STRIMZI_TEST_CONTAINER_LOGGING_ENABLED", "false"));
        this.bootstrapServersProvider = strimziKafkaContainer -> {
            return String.format("PLAINTEXT://%s:%s", getHost(), Integer.valueOf(this.kafkaExposedPort));
        };
        this.listenerNames = new HashSet();
        this.authenticationType = AuthenticationType.NONE;
        this.imageNameProvider = completableFuture;
        super.setNetwork(Network.SHARED);
        super.setExposedPorts(Collections.singletonList(Integer.valueOf(KAFKA_PORT)));
        super.addEnv("LOG_DIR", "/tmp");
    }

    @DoNotMutate
    protected void doStart() {
        if (this.proxyContainer != null && !this.proxyContainer.isRunning()) {
            this.proxyContainer.start();
            if (this.toxiproxyClient == null) {
                this.toxiproxyClient = new ToxiproxyClient(this.proxyContainer.getHost(), this.proxyContainer.getControlPort());
            }
        }
        if (!this.imageNameProvider.isDone()) {
            this.imageNameProvider.complete(KafkaVersionService.strimziTestContainerImageName(this.kafkaVersion));
        }
        try {
            if (this.useKraft && ((this.kafkaVersion != null && this.kafkaVersion.startsWith("2.")) || this.imageNameProvider.get().contains("2.8.2"))) {
                throw new UnsupportedKraftKafkaVersionException("Specified Kafka version " + this.kafkaVersion + " is not supported in KRaft mode.");
            }
            if (!hasKraftOrExternalZooKeeperConfigured()) {
                super.addExposedPort(Integer.valueOf(StrimziZookeeperContainer.ZOOKEEPER_PORT));
            }
            super.withNetworkAliases(new String[]{"broker-" + this.brokerId});
            if (isOAuthEnabled()) {
                addEnv("OAUTH_JWKS_ENDPOINT_URI", this.oauthUri + "/realms/" + this.realm + "/protocol/openid-connect/certs");
                addEnv("OAUTH_VALID_ISSUER_URI", this.oauthUri + "/realms/" + this.realm);
                addEnv("OAUTH_CLIENT_ID", this.clientId);
                addEnv("OAUTH_CLIENT_SECRET", this.clientSecret);
                addEnv("OAUTH_TOKEN_ENDPOINT_URI", this.oauthUri + "/realms/" + this.realm + "/protocol/openid-connect/token");
                addEnv("OAUTH_USERNAME_CLAIM", this.usernameClaim);
            }
            if (this.enableBrokerContainerSlf4jLogging) {
                withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("StrimziKafkaContainer-" + this.brokerId)));
            }
            super.setCommand(new String[]{"sh", "-c", runStarterScript()});
            super.doStart();
        } catch (InterruptedException | ExecutionException e) {
            LOGGER.error("Error occurred during retrieving of image name provider", e);
            throw new RuntimeException(e);
        }
    }

    @DoNotMutate
    public void stop() {
        if (this.proxyContainer != null && this.proxyContainer.isRunning()) {
            this.proxyContainer.stop();
        }
        super.stop();
    }

    protected String runStarterScript() {
        return "while [ ! -x /testcontainers_start.sh ]; do sleep 0.1; done; /testcontainers_start.sh";
    }

    @DoNotMutate
    public StrimziKafkaContainer waitForRunning() {
        if (this.useKraft) {
            super.waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
        } else {
            super.waitingFor(Wait.forLogMessage(".*Recorded new.*controller, from now on will use [node|broker].*", 1));
        }
        return this;
    }

    @DoNotMutate
    protected void containerIsStarting(InspectContainerResponse inspectContainerResponse, boolean z) {
        String str;
        super.containerIsStarting(inspectContainerResponse, z);
        this.kafkaExposedPort = getMappedPort(KAFKA_PORT).intValue();
        if (!hasKraftOrExternalZooKeeperConfigured()) {
            this.internalZookeeperExposedPort = getMappedPort(StrimziZookeeperContainer.ZOOKEEPER_PORT).intValue();
        }
        LOGGER.info("Mapped port: {}", Integer.valueOf(this.kafkaExposedPort));
        if (this.nodeId == null) {
            LOGGER.warn("Node ID is not set. Using broker ID {} as the default node ID.", Integer.valueOf(this.brokerId));
            this.nodeId = Integer.valueOf(this.brokerId);
        }
        String[] buildListenersConfig = buildListenersConfig(inspectContainerResponse);
        String overrideProperties = overrideProperties(buildDefaultServerProperties(buildListenersConfig[0], buildListenersConfig[1]), this.kafkaConfigurationMap);
        if (this.useKraft) {
            copyFileToContainer(Transferable.of(overrideProperties.getBytes(StandardCharsets.UTF_8)), "/opt/kafka/config/kraft/server.properties");
        } else {
            copyFileToContainer(Transferable.of(overrideProperties.getBytes(StandardCharsets.UTF_8)), "/opt/kafka/config/server.properties");
        }
        String str2 = "#!/bin/bash \n";
        if (this.useKraft) {
            if (this.clusterId == null) {
                this.clusterId = randomUuid();
                LOGGER.info("New `cluster.id` has been generated: {}", this.clusterId);
            }
            str = (str2 + "bin/kafka-storage.sh format -t=\"" + this.clusterId + "\" -c /opt/kafka/config/kraft/server.properties \n") + "bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties \n";
        } else {
            if (this.externalZookeeperConnect != null) {
                withEnv("KAFKA_ZOOKEEPER_CONNECT", this.externalZookeeperConnect);
            } else {
                str2 = str2 + "bin/zookeeper-server-start.sh config/zookeeper.properties &\n";
            }
            str = str2 + "bin/kafka-server-start.sh config/server.properties";
        }
        Utils.asTransferableBytes(this.serverPropertiesFile).ifPresent(transferable -> {
            copyFileToContainer(transferable, this.useKraft ? "/opt/kafka/config/kraft/server.properties" : "/opt/kafka/config/server.properties");
        });
        LOGGER.info("Copying command to 'STARTER_SCRIPT' script.");
        copyFileToContainer(Transferable.of(str.getBytes(StandardCharsets.UTF_8), 700), STARTER_SCRIPT);
    }

    @Override // io.strimzi.test.container.KafkaContainer
    @DoNotMutate
    public boolean hasKraftOrExternalZooKeeperConfigured() {
        return this.useKraft || this.externalZookeeperConnect != null;
    }

    protected String extractListenerName(String str) {
        String[] split = str.split(":");
        if (split.length < 3) {
            throw new IllegalArgumentException("The configured boostrap servers '" + str + "' must be prefixed with a listener name.");
        }
        return split[0];
    }

    protected String[] buildListenersConfig(InspectContainerResponse inspectContainerResponse) {
        String bootstrapServers = getBootstrapServers();
        String extractListenerName = extractListenerName(bootstrapServers);
        Collection<ContainerNetwork> values = inspectContainerResponse.getNetworkSettings().getNetworks().values();
        ArrayList<String> arrayList = new ArrayList();
        StringBuilder sb = new StringBuilder();
        StringBuilder sb2 = new StringBuilder();
        sb2.append(bootstrapServers);
        sb.append(extractListenerName).append(":").append("//").append("0.0.0.0").append(":").append(KAFKA_PORT).append(",");
        this.listenerNames.add(extractListenerName);
        int i = 1;
        int i2 = INTER_BROKER_LISTENER_PORT;
        for (ContainerNetwork containerNetwork : values) {
            String str = "BROKER" + i;
            sb2.append(",").append(str).append("://").append(containerNetwork.getIpAddress()).append(":").append(i2);
            arrayList.add(str);
            i++;
            i2--;
        }
        int i3 = INTER_BROKER_LISTENER_PORT;
        for (String str2 : arrayList) {
            sb.append(str2).append("://0.0.0.0:").append(i3).append(",");
            this.listenerNames.add(str2);
            i3--;
        }
        if (this.useKraft) {
            sb.append("CONTROLLER").append("://0.0.0.0:").append(9094);
            try {
                if ((this.kafkaVersion != null && KafkaVersionService.KafkaVersion.compareVersions(this.kafkaVersion, "3.9.0") >= 0) || KafkaVersionService.KafkaVersion.compareVersions(KafkaVersionService.KafkaVersion.extractVersionFromImageName(this.imageNameProvider.get()), "3.9.0") >= 0) {
                    sb2.append(",").append("CONTROLLER").append("://").append("broker-" + this.brokerId).append(":").append(9094);
                }
                this.listenerNames.add("CONTROLLER");
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
        LOGGER.info("This is all advertised listeners for Kafka {}", sb2);
        return new String[]{sb.toString(), sb2.toString()};
    }

    @DoNotMutate
    private String randomUuid() {
        UUID uuid = new UUID(0L, 1L);
        UUID uuid2 = new UUID(0L, 0L);
        UUID randomUUID = UUID.randomUUID();
        while (true) {
            UUID uuid3 = randomUUID;
            if (!uuid3.equals(uuid) && !uuid3.equals(uuid2)) {
                ByteBuffer wrap = ByteBuffer.wrap(new byte[16]);
                wrap.putLong(uuid3.getMostSignificantBits());
                wrap.putLong(uuid3.getLeastSignificantBits());
                return Base64.getUrlEncoder().withoutPadding().encodeToString(wrap.array());
            }
            randomUUID = UUID.randomUUID();
        }
    }

    protected Properties buildDefaultServerProperties(String str, String str2) {
        Properties properties = new Properties();
        properties.setProperty("listeners", str);
        properties.setProperty("inter.broker.listener.name", "BROKER1");
        properties.setProperty("broker.id", String.valueOf(this.brokerId));
        properties.setProperty("advertised.listeners", str2);
        properties.setProperty("listener.security.protocol.map", configureListenerSecurityProtocolMap("PLAINTEXT"));
        properties.setProperty("num.network.threads", "3");
        properties.setProperty("num.io.threads", "8");
        properties.setProperty("socket.send.buffer.bytes", "102400");
        properties.setProperty("socket.receive.buffer.bytes", "102400");
        properties.setProperty("socket.request.max.bytes", "104857600");
        properties.setProperty("log.dirs", "/tmp/default-log-dir");
        properties.setProperty("num.partitions", "1");
        properties.setProperty("num.recovery.threads.per.data.dir", "1");
        properties.setProperty("offsets.topic.replication.factor", "1");
        properties.setProperty("transaction.state.log.replication.factor", "1");
        properties.setProperty("transaction.state.log.min.isr", "1");
        properties.setProperty("log.retention.hours", "168");
        properties.setProperty("log.retention.check.interval.ms", "300000");
        if (this.useKraft) {
            properties.setProperty("process.roles", "broker,controller");
            properties.setProperty("node.id", String.valueOf(this.nodeId));
            properties.setProperty("controller.quorum.voters", String.format("%d@broker-" + this.nodeId + ":9094", this.nodeId));
            properties.setProperty("controller.listener.names", "CONTROLLER");
            if (this.authenticationType != AuthenticationType.NONE) {
                switch (this.authenticationType) {
                    case OAUTH_OVER_PLAIN:
                        if (!isOAuthEnabled()) {
                            throw new IllegalStateException("OAuth2 is not enabled: " + this.oauthEnabled);
                        }
                        configureOAuthOverPlain(properties);
                        break;
                    case OAUTH_BEARER:
                        if (!isOAuthEnabled()) {
                            throw new IllegalStateException("OAuth2 is not enabled: " + this.oauthEnabled);
                        }
                        configureOAuthBearer(properties);
                        break;
                    case SCRAM_SHA_256:
                    case SCRAM_SHA_512:
                    case GSSAPI:
                    default:
                        throw new IllegalStateException("Unsupported authentication type: " + String.valueOf(this.authenticationType));
                }
            }
        } else if (this.externalZookeeperConnect != null) {
            LOGGER.info("Using external ZooKeeper 'zookeeper.connect={}'.", this.externalZookeeperConnect);
            properties.put("zookeeper.connect", this.externalZookeeperConnect);
        } else {
            LOGGER.info("Using internal ZooKeeper 'zookeeper.connect={}.'", "localhost:2181");
            properties.put("zookeeper.connect", "localhost:2181");
        }
        return properties;
    }

    protected void configureOAuthOverPlain(Properties properties) {
        properties.setProperty("sasl.enabled.mechanisms", "PLAIN");
        properties.setProperty("sasl.mechanism.inter.broker.protocol", "PLAIN");
        properties.setProperty("listener.security.protocol.map", configureListenerSecurityProtocolMap("SASL_PLAINTEXT"));
        properties.setProperty("sasl.mechanism.controller.protocol", "PLAIN");
        properties.setProperty("principal.builder.class", "io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder");
        String format = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", this.saslUsername, this.saslPassword);
        for (String str : this.listenerNames) {
            properties.setProperty("listener.name." + str.toLowerCase(Locale.ROOT) + ".plain.sasl.jaas.config", format);
            properties.setProperty("listener.name." + str.toLowerCase(Locale.ROOT) + ".plain.sasl.server.callback.handler.class", "io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler");
        }
    }

    protected void configureOAuthBearer(Properties properties) {
        properties.setProperty("sasl.enabled.mechanisms", "OAUTHBEARER");
        properties.setProperty("sasl.mechanism.inter.broker.protocol", "OAUTHBEARER");
        properties.setProperty("listener.security.protocol.map", configureListenerSecurityProtocolMap("SASL_PLAINTEXT"));
        properties.setProperty("sasl.mechanism.controller.protocol", "OAUTHBEARER");
        properties.setProperty("principal.builder.class", "io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder");
        for (String str : this.listenerNames) {
            properties.setProperty("listener.name." + str.toLowerCase(Locale.ROOT) + ".oauthbearer.sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;");
            properties.setProperty("listener.name." + str.toLowerCase(Locale.ROOT) + ".oauthbearer.sasl.server.callback.handler.class", "io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler");
            properties.setProperty("listener.name." + str.toLowerCase(Locale.ROOT) + ".oauthbearer.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");
        }
    }

    protected String configureListenerSecurityProtocolMap(String str) {
        return (String) this.listenerNames.stream().map(str2 -> {
            return str2 + ":" + str;
        }).collect(Collectors.joining(","));
    }

    protected String overrideProperties(Properties properties, Map<String, String> map) {
        if (map != null && !map.isEmpty()) {
            Objects.requireNonNull(properties);
            map.forEach(properties::setProperty);
        }
        StringWriter stringWriter = new StringWriter();
        try {
            properties.store(stringWriter, (String) null);
            return stringWriter.toString();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to store Kafka server properties", e);
        }
    }

    @Override // io.strimzi.test.container.KafkaContainer
    public String getInternalZooKeeperConnect() {
        if (hasKraftOrExternalZooKeeperConfigured()) {
            throw new IllegalStateException("Connect string is not available when using KRaft or external ZooKeeper");
        }
        return getHost() + ":" + this.internalZookeeperExposedPort;
    }

    @Override // io.strimzi.test.container.KafkaContainer
    @DoNotMutate
    public String getBootstrapServers() {
        return this.proxyContainer != null ? String.format("PLAINTEXT://%s", getProxy().getListen()) : this.bootstrapServersProvider.apply(this);
    }

    public String getClusterId() {
        return this.clusterId;
    }

    public StrimziKafkaContainer withKafkaConfigurationMap(Map<String, String> map) {
        this.kafkaConfigurationMap = map;
        return this;
    }

    public StrimziKafkaContainer withExternalZookeeperConnect(String str) {
        if (this.useKraft) {
            throw new IllegalStateException("Cannot configure an external Zookeeper and use Kraft at the same time");
        }
        this.externalZookeeperConnect = str;
        return self();
    }

    public StrimziKafkaContainer withBrokerId(int i) {
        if (this.useKraft && this.brokerId != this.nodeId.intValue()) {
            throw new IllegalStateException("`broker.id` and `node.id` must have same value!");
        }
        this.brokerId = i;
        return self();
    }

    public StrimziKafkaContainer withNodeId(int i) {
        this.nodeId = Integer.valueOf(i);
        return self();
    }

    public StrimziKafkaContainer withKafkaVersion(String str) {
        this.kafkaVersion = str;
        return self();
    }

    public StrimziKafkaContainer withKraft() {
        this.useKraft = true;
        return self();
    }

    public StrimziKafkaContainer withPort(int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("The fixed Kafka port must be greater than 0");
        }
        addFixedExposedPort(i, KAFKA_PORT);
        return self();
    }

    public StrimziKafkaContainer withServerProperties(MountableFile mountableFile) {
        this.serverPropertiesFile = mountableFile;
        return self();
    }

    public StrimziKafkaContainer withBootstrapServers(Function<StrimziKafkaContainer, String> function) {
        this.bootstrapServersProvider = function;
        return self();
    }

    public StrimziKafkaContainer withProxyContainer(ToxiproxyContainer toxiproxyContainer) {
        if (toxiproxyContainer != null) {
            this.proxyContainer = toxiproxyContainer;
            toxiproxyContainer.setNetwork(Network.SHARED);
            toxiproxyContainer.setNetworkAliases(Collections.singletonList("toxiproxy"));
        }
        return self();
    }

    public StrimziKafkaContainer withOAuthConfig(String str, String str2, String str3, String str4, String str5) {
        this.oauthEnabled = true;
        this.realm = str;
        this.clientId = str2;
        this.clientSecret = str3;
        this.oauthUri = str4;
        this.usernameClaim = str5;
        return self();
    }

    public StrimziKafkaContainer withAuthenticationType(AuthenticationType authenticationType) {
        if (authenticationType != null) {
            this.authenticationType = authenticationType;
        }
        return self();
    }

    public StrimziKafkaContainer withSaslUsername(String str) {
        if (str == null || str.trim().isEmpty()) {
            throw new IllegalArgumentException("SASL username cannot be null or empty.");
        }
        this.saslUsername = str;
        return self();
    }

    public StrimziKafkaContainer withSaslPassword(String str) {
        if (str == null || str.trim().isEmpty()) {
            throw new IllegalArgumentException("SASL password cannot be null or empty.");
        }
        this.saslPassword = str;
        return self();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StrimziKafkaContainer withClusterId(String str) {
        this.clusterId = str;
        return self();
    }

    public StrimziKafkaContainer withKafkaLog(Level level) {
        withCopyToContainer(Transferable.of(("log4j.rootLogger=" + level.name() + ", stdout\nlog4j.appender.stdout=org.apache.log4j.ConsoleAppender\nlog4j.appender.stdout.layout=org.apache.log4j.PatternLayout\nlog4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n\n").getBytes(StandardCharsets.UTF_8)), "/opt/kafka/config/log4j.properties");
        return self();
    }

    public synchronized Proxy getProxy() {
        if (this.proxyContainer == null) {
            throw new IllegalStateException("The proxy container has not been configured");
        }
        if (this.proxy == null) {
            if (this.toxiproxyClient == null) {
                this.toxiproxyClient = new ToxiproxyClient(this.proxyContainer.getHost(), this.proxyContainer.getControlPort());
            }
            try {
                this.proxy = this.toxiproxyClient.createProxy("kafka" + this.brokerId, "0.0.0.0:" + (8666 + this.brokerId), "toxiproxy:" + Utils.getFreePort());
            } catch (IOException e) {
                LOGGER.error("Error happened during creation of the Proxy: {}", e.getMessage());
                throw new RuntimeException(e);
            }
        }
        return this.proxy;
    }

    String getKafkaVersion() {
        return this.kafkaVersion;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getBrokerId() {
        return this.brokerId;
    }

    public boolean isOAuthEnabled() {
        return this.oauthEnabled;
    }

    public String getSaslUsername() {
        return this.saslUsername;
    }

    public String getSaslPassword() {
        return this.saslPassword;
    }

    public String getRealm() {
        return this.realm;
    }

    public String getClientId() {
        return this.clientId;
    }

    public String getClientSecret() {
        return this.clientSecret;
    }

    public String getOauthUri() {
        return this.oauthUri;
    }

    public String getUsernameClaim() {
        return this.usernameClaim;
    }

    public AuthenticationType getAuthenticationType() {
        return this.authenticationType;
    }
}
