package io.streamthoughts.jikkou.kafka;

import io.streamthoughts.jikkou.api.config.ConfigProperty;
import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.error.JikkouException;
import io.streamthoughts.jikkou.common.utils.PropertiesUtils;
import io.streamthoughts.jikkou.kafka.control.KafkaFunction;
import io.streamthoughts.jikkou.kafka.internals.KafkaBrokersReady;
import io.streamthoughts.jikkou.kafka.internals.KafkaUtils;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.AdminClient;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/AdminClientContext.class */
public class AdminClientContext implements AutoCloseable {
    private final Supplier<AdminClient> adminClientSupplier;
    private String clusterId;
    private AdminClient client;
    private KafkaBrokersReady.Options options = KafkaBrokersReady.Options.withDefaults().withTimeoutMs(DEFAULT_TIMEOUT_MS).withMinAvailableBrokers(DEFAULT_MIN_AVAILABLE_BROKERS).withRetryBackoffMs(DEFAULT_RETRY_BACKOFF_MS);
    private boolean isWaitForKafkaBrokersEnabled = true;
    private final ReentrantLock stateLock = new ReentrantLock();
    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
    private static final Logger LOG = LoggerFactory.getLogger(AdminClientContext.class);
    public static final String ADMIN_CLIENT_CONFIG_NAME = "client";
    public static final ConfigProperty<Properties> ADMIN_CLIENT_CONFIG = ConfigProperty.ofMap(ADMIN_CLIENT_CONFIG_NAME).orElse(HashMap::new).map(KafkaUtils::getAdminClientConfigs).map(PropertiesUtils::fromMap);
    public static final ConfigProperty<Boolean> KAFKA_BROKERS_WAIT_FOR_ENABLED = ConfigProperty.ofBoolean("kafka.brokers.wait-for-enabled").orElse(true);
    private static final int DEFAULT_MIN_AVAILABLE_BROKERS = 1;
    public static final ConfigProperty<Integer> KAFKA_BROKERS_WAIT_FOR_MIN_AVAILABLE = ConfigProperty.ofInt("kafka.brokers.wait-for-min-available").orElse(Integer.valueOf(DEFAULT_MIN_AVAILABLE_BROKERS));
    private static final long DEFAULT_RETRY_BACKOFF_MS = 1000;
    public static final ConfigProperty<Long> KAFKA_BROKERS_WAIT_FOR_RETRY_BACKOFF_MS = ConfigProperty.ofLong("kafka.brokers.wait-for-retry-backoff-ms").orElse(Long.valueOf(DEFAULT_RETRY_BACKOFF_MS));
    private static final long DEFAULT_TIMEOUT_MS = 60000;
    public static final ConfigProperty<Long> KAFKA_BROKERS_WAIT_FOR_TIMEOUT_MS = ConfigProperty.ofLong("kafka.brokers.wait-for-timeout-ms").orElse(Long.valueOf(DEFAULT_TIMEOUT_MS));

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/streamthoughts/jikkou/kafka/AdminClientContext$State.class */
    public enum State {
        CREATED,
        CLOSED
    }

    public AdminClientContext(@NotNull Supplier<AdminClient> supplier) {
        this.adminClientSupplier = (Supplier) Objects.requireNonNull(supplier, "'adminClientSupplier' should not be null");
    }

    public AdminClientContext(@NotNull Configuration configuration) {
        this.adminClientSupplier = () -> {
            return KafkaUtils.newAdminClient((Properties) ADMIN_CLIENT_CONFIG.evaluate(configuration));
        };
        withWaitForKafkaBrokersEnabled(((Boolean) KAFKA_BROKERS_WAIT_FOR_ENABLED.evaluate(configuration)).booleanValue());
        if (this.isWaitForKafkaBrokersEnabled) {
            withWaitForRetryBackoff(((Long) KAFKA_BROKERS_WAIT_FOR_RETRY_BACKOFF_MS.evaluate(configuration)).longValue());
            withWaitForMinAvailableBrokers(((Integer) KAFKA_BROKERS_WAIT_FOR_MIN_AVAILABLE.evaluate(configuration)).intValue());
            withWaitTimeoutMs(((Long) KAFKA_BROKERS_WAIT_FOR_TIMEOUT_MS.evaluate(configuration)).longValue());
        }
    }

    public AdminClientContext withWaitForKafkaBrokersEnabled(boolean z) {
        this.isWaitForKafkaBrokersEnabled = z;
        return this;
    }

    public AdminClientContext withWaitForMinAvailableBrokers(int i) {
        this.options = this.options.withMinAvailableBrokers(i);
        return this;
    }

    public AdminClientContext withWaitForRetryBackoff(long j) {
        this.options = this.options.withRetryBackoffMs(j);
        return this;
    }

    public AdminClientContext withWaitTimeoutMs(long j) {
        this.options = this.options.withTimeoutMs(j);
        return this;
    }

    public <O> O invoke(@NotNull KafkaFunction<O> kafkaFunction) {
        return kafkaFunction.apply(client());
    }

    public <O> O invokeAndClose(@NotNull KafkaFunction<O> kafkaFunction) {
        try {
            return kafkaFunction.apply(client());
        } finally {
            close();
        }
    }

    public String getClusterId() {
        if (this.clusterId == null) {
            this.clusterId = (String) ((Optional) invoke(adminClient -> {
                try {
                    return Optional.ofNullable((String) adminClient.describeCluster().clusterId().get());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOG.error("Failed to describe Kafka ClusterID, thread was interrupted while waiting response");
                    return Optional.empty();
                } catch (ExecutionException e2) {
                    LOG.error("Failed to describe Kafka ClusterID due to an unexpected error", e2);
                    return Optional.empty();
                }
            })).orElse("unknown");
        }
        return this.clusterId;
    }

    public boolean isInitialized() {
        this.stateLock.lock();
        try {
            return this.state.get() == State.CREATED;
        } finally {
            this.stateLock.unlock();
        }
    }

    public AdminClient client() {
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.CLOSED, State.CREATED)) {
                LOG.debug("AdminClient has already been created");
                return this.client;
            }
            this.client = this.adminClientSupplier.get();
            if (!this.isWaitForKafkaBrokersEnabled || KafkaUtils.waitForKafkaBrokers(this.client, this.options)) {
                return this.client;
            }
            throw new JikkouException("Timeout expired. The timeout period elapsed prior to the requested number of kafka brokers is available.");
        } finally {
            this.stateLock.unlock();
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.CREATED, State.CLOSED)) {
                LOG.info("AdminClient has already been closed");
                return;
            }
            LOG.info("Closing context for Kafka AdminClient");
            if (this.client != null) {
                this.client.close();
                this.client = null;
            }
        } finally {
            this.stateLock.unlock();
        }
    }
}
