package io.streamthoughts.jikkou.kafka.internals.admin;

import io.streamthoughts.jikkou.core.exceptions.JikkouRuntimeException;
import io.streamthoughts.jikkou.kafka.internals.KafkaBrokersReady;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TopicExistsException;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/internals/admin/AdminClientContext.class */
/**
 * Holds a lazily created Kafka {@link AdminClient} together with a few convenience operations
 * built on top of it (topic creation, cleanup-policy lookup, cluster-id lookup).
 *
 * <p>The client is obtained from an {@link AdminClientFactory} on the first call to
 * {@link #getAdminClient()} and released by {@link #close()}. Creation and closing are both
 * guarded by the same lock. The cached cluster id and the configuration setters are not
 * guarded by that lock.
 */
public class AdminClientContext implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(AdminClientContext.class);
    private static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
    private static final String CLEANUP_POLICY_COMPACT = "compact";
    private static final String UNKNOWN_CLUSTER_ID = "unknown";

    private final AdminClientFactory adminClientFactory;
    private String clusterId;
    private AdminClient adminClient;
    private KafkaBrokersReady.Options options = KafkaBrokersReady.Options.withDefaults();
    private boolean isWaitForKafkaBrokersEnabled = false;
    private final ReentrantLock stateLock = new ReentrantLock();
    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);

    /** Lifecycle of the {@link AdminClient} held by this context. */
    public enum State {
        /** A client has been created and is currently held by the context. */
        CREATED,
        /** No client is currently held (initial state, and state after {@link #close()}). */
        CLOSED
    }

    /**
     * Creates a context that always hands out the given client.
     *
     * @param adminClient the client to expose; must not be {@code null}.
     * @throws NullPointerException if {@code adminClient} is {@code null}.
     */
    public AdminClientContext(@NotNull AdminClient adminClient) {
        // Fail fast here, consistently with the factory-based constructor, instead of
        // handing out a null client later from getAdminClient().
        Objects.requireNonNull(adminClient, "adminClient must not be null");
        this.adminClientFactory = () -> adminClient;
    }

    /**
     * Creates a context that obtains its client from the given factory on first use.
     *
     * @param adminClientFactory the factory used to create the client; must not be {@code null}.
     * @throws NullPointerException if {@code adminClientFactory} is {@code null}.
     */
    public AdminClientContext(@NotNull AdminClientFactory adminClientFactory) {
        this.adminClientFactory = Objects.requireNonNull(adminClientFactory, "adminClientFactory must not be null");
    }

    /**
     * Creates a topic and blocks until the request completes.
     *
     * <p>This is best-effort: failures are logged and never propagated. A
     * {@link TopicExistsException} is reported at INFO level because it can be ignored.
     *
     * @param str the name of the topic to create.
     * @param i   the number of partitions.
     * @param s   the replication factor.
     */
    public void createTopic(@NotNull String str, int i, short s) {
        LOG.info("Creating reporting topic: {}", str);
        try {
            getAdminClient().createTopics(List.of(new NewTopic(str, i, s))).all().get();
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while creating topic '{}'", str);
            // Restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                LOG.info("Cannot auto create topic {} - topics already exists. Error can be ignored.", str);
            } else {
                LOG.error("Cannot auto create topic {}", str, e);
            }
        }
    }

    /**
     * Checks whether the {@code cleanup.policy} of a topic contains {@code compact}.
     *
     * @param str the name of the topic to inspect.
     * @param z   the value to return when the policy cannot be determined (lookup failed, was
     *            interrupted, or the topic exposes no {@code cleanup.policy} value).
     * @return {@code true} if the configured policy contains {@code compact}, {@code false} if
     *         it does not, or {@code z} when the policy is unavailable.
     */
    public boolean isTopicCleanupPolicyCompact(@NotNull String str, boolean z) {
        final ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, str);
        try {
            final Map<ConfigResource, Config> configs =
                    getAdminClient().describeConfigs(Collections.singleton(resource)).all().get();
            final Config topicConfig = configs.get(resource);
            final ConfigEntry policy = topicConfig == null ? null : topicConfig.get(CLEANUP_POLICY_CONFIG);
            // ConfigEntry#value() is nullable (unset or sensitive config); treat it as "unknown".
            final String policyValue = policy == null ? null : policy.value();
            if (policyValue != null) {
                return policyValue.contains(CLEANUP_POLICY_COMPACT);
            }
        } catch (InterruptedException e) {
            LOG.debug("Interrupted while checking if topic '{}' is configured with {}={}",
                    str, CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // The trailing exception is logged as the cause, not consumed as a placeholder.
            LOG.debug("Failed to check if topic '{}' is configured with {}={}",
                    str, CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT, e);
        }
        return z;
    }

    /**
     * Returns the id of the Kafka cluster, resolving it on first use.
     *
     * <p>The resolved value is cached, including the {@code "unknown"} fallback that is used
     * when the lookup fails, is interrupted, or yields no id.
     *
     * @return the cluster id, or {@code "unknown"} if it could not be determined.
     */
    @NotNull
    public String getClusterId() {
        if (this.clusterId == null) {
            String described = null;
            try {
                described = getAdminClient().describeCluster().clusterId().get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("Failed to describe Kafka ClusterID, thread was interrupted while waiting response");
            } catch (ExecutionException e) {
                LOG.error("Failed to describe Kafka ClusterID due to an unexpected error", e);
            }
            this.clusterId = Optional.ofNullable(described).orElse(UNKNOWN_CLUSTER_ID);
        }
        return this.clusterId;
    }

    /**
     * Returns the client held by this context, creating it if none is currently held.
     *
     * <p>When waiting for brokers is enabled, a newly created client is only retained once
     * the readiness check has succeeded. If creation or the readiness check fails, the
     * context stays in the {@link State#CLOSED} state (so a later call starts over) and the
     * client that failed the check is closed.
     *
     * @return the client held by this context.
     * @throws JikkouRuntimeException if waiting for brokers is enabled and the readiness check
     *                                reports that the brokers are not available.
     */
    @NotNull
    public AdminClient getAdminClient() {
        this.stateLock.lock();
        try {
            if (this.state.get() == State.CREATED) {
                LOG.debug("AdminClient has already been created");
                return this.adminClient;
            }
            LOG.info("Retrieving Kafka AdminClient instance.");
            final AdminClient client = this.adminClientFactory.createAdminClient();
            if (this.isWaitForKafkaBrokersEnabled) {
                awaitBrokersOrClose(client);
            }
            // Publish the client and flip the state only after initialization fully succeeded,
            // so a failed attempt never leaves the context "created" with an unusable client.
            this.adminClient = client;
            this.state.set(State.CREATED);
            return client;
        } finally {
            this.stateLock.unlock();
        }
    }

    /** Runs the broker readiness check on {@code client}; closes it and throws if the check fails. */
    private void awaitBrokersOrClose(AdminClient client) {
        final boolean ready;
        try {
            ready = new KafkaBrokersReady(this.options).waitForBrokers(client);
        } catch (RuntimeException e) {
            closeAfterFailedInitialization(client, e);
            throw e;
        }
        if (!ready) {
            final JikkouRuntimeException timeout = new JikkouRuntimeException(
                    "Timeout expired. The timeout period elapsed prior to the requested number of kafka brokers is available.");
            closeAfterFailedInitialization(client, timeout);
            throw timeout;
        }
    }

    /** Closes {@code client}, recording any close failure as suppressed on {@code failure}. */
    private static void closeAfterFailedInitialization(AdminClient client, Throwable failure) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (RuntimeException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }

    /**
     * Enables or disables the broker readiness check performed when the client is created.
     *
     * @param z {@code true} to wait for brokers when the client is created.
     */
    public void enabledWaitForKafkaBrokers(boolean z) {
        this.isWaitForKafkaBrokersEnabled = z;
    }

    /**
     * Sets the options passed to the broker readiness check.
     *
     * @param options the options to use; must not be {@code null}.
     * @throws NullPointerException if {@code options} is {@code null}.
     */
    public void setOptions(@NotNull KafkaBrokersReady.Options options) {
        this.options = Objects.requireNonNull(options, "options must not be null");
    }

    /**
     * @return the options passed to the broker readiness check.
     */
    public KafkaBrokersReady.Options getOptions() {
        return this.options;
    }

    /**
     * @return {@code true} if the broker readiness check is enabled.
     */
    public boolean isWaitForKafkaBrokersEnabled() {
        return this.isWaitForKafkaBrokersEnabled;
    }

    /**
     * Closes the client held by this context, if any, and returns to the
     * {@link State#CLOSED} state. Calling this method when no client is held only logs.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.CREATED, State.CLOSED)) {
                LOG.info("Kafka AdminClient has already been closed");
                return;
            }
            LOG.info("Closing context for Kafka AdminClient");
            // Drop the reference first so it is cleared even if close() throws.
            final AdminClient client = this.adminClient;
            this.adminClient = null;
            if (client != null) {
                client.close();
            }
        } finally {
            this.stateLock.unlock();
        }
    }
}
