package io.streamthoughts.jikkou.kafka.internals;

import io.streamthoughts.jikkou.common.utils.Time;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/internals/KafkaBrokersReady.class */
/**
 * Waits until a Kafka cluster reports at least a minimum number of broker nodes.
 *
 * <p>The cluster is polled through {@code AdminClient#describeCluster} until enough
 * nodes are returned, the configured timeout elapses, or the calling thread is
 * interrupted.
 */
public final class KafkaBrokersReady {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBrokersReady.class);
    private final Options options;

    /**
     * Immutable settings for {@link KafkaBrokersReady}. Each {@code withXxx} method
     * returns a new instance and leaves the receiver unchanged.
     */
    public static class Options {
        private final int minAvailableBrokers;
        private final long timeoutMs;
        private final long retryBackoffMs;

        /**
         * @return options built with the no-arg constructor.
         */
        public static Options withDefaults() {
            return new Options();
        }

        /**
         * Creates options expecting 1 broker, with a 60000 ms timeout and a 1000 ms retry back-off.
         */
        public Options() {
            this(1, 60000L, 1000L);
        }

        /**
         * @param minAvailableBrokers minimum number of brokers required to consider the cluster ready.
         * @param timeoutMs           maximum time to wait, in milliseconds.
         * @param retryBackoffMs      pause between two attempts, in milliseconds.
         */
        public Options(int minAvailableBrokers, long timeoutMs, long retryBackoffMs) {
            this.minAvailableBrokers = minAvailableBrokers;
            this.timeoutMs = timeoutMs;
            this.retryBackoffMs = retryBackoffMs;
        }

        /**
         * @param minAvailableBrokers minimum number of brokers required.
         * @return a copy of these options with the given broker count.
         */
        public Options withMinAvailableBrokers(int minAvailableBrokers) {
            return new Options(minAvailableBrokers, this.timeoutMs, this.retryBackoffMs);
        }

        /**
         * @param timeoutMs maximum time to wait, in milliseconds.
         * @return a copy of these options with the given timeout.
         */
        public Options withTimeoutMs(long timeoutMs) {
            return new Options(this.minAvailableBrokers, timeoutMs, this.retryBackoffMs);
        }

        /**
         * @param retryBackoffMs pause between two attempts, in milliseconds.
         * @return a copy of these options with the given retry back-off.
         */
        public Options withRetryBackoffMs(long retryBackoffMs) {
            return new Options(this.minAvailableBrokers, this.timeoutMs, retryBackoffMs);
        }
    }

    /**
     * Creates a new instance.
     *
     * @param options the wait settings; must not be {@code null}.
     */
    public KafkaBrokersReady(@Nonnull Options options) {
        this.options = options;
    }

    /**
     * Polls the cluster until the expected number of brokers is available.
     *
     * <p>An empty node list is never considered ready, whatever the expected count.
     * If the calling thread is interrupted, its interrupt flag is restored and the
     * method stops waiting instead of retrying until the timeout.
     *
     * @param adminClient the client used to describe the cluster.
     * @return {@code true} if enough brokers were found; {@code false} if the timeout
     *         elapsed or the thread was interrupted first.
     */
    public boolean waitForBrokers(AdminClient adminClient) {
        final int expectedBrokers = this.options.minAvailableBrokers;
        final long timeoutMs = this.options.timeoutMs;
        boolean ready = false;
        boolean interrupted = false;
        int availableBrokers = 0;
        try {
            LOG.info("Checking for Kafka to be ready. Expected broker(s): {}", expectedBrokers);
            final long startMs = Time.SYSTEM.milliseconds();
            long remainingMs = timeoutMs;
            while (remainingMs > 0) {
                try {
                    Collection<Node> clusterNodes = getClusterNodes(adminClient, remainingMs);
                    availableBrokers = clusterNodes.size();
                    ready = !clusterNodes.isEmpty() && availableBrokers >= expectedBrokers;
                } catch (InterruptedException e) {
                    // Restore the flag and give up: with the flag set, every further
                    // blocking call would fail immediately and the loop would spin.
                    Thread.currentThread().interrupt();
                    interrupted = true;
                    LOG.error("Error while listing Kafka nodes: {}", e.getMessage());
                    break;
                } catch (ExecutionException e) {
                    LOG.error("Error while listing Kafka nodes: {}", e.getMessage());
                }
                if (ready) {
                    break;
                }
                LOG.info("Waiting for Kafka cluster to be ready. Expected {} brokers, but only {} was found.", expectedBrokers, availableBrokers);
                // Never sleep past the deadline; clamp to zero so that a negative
                // back-off cannot make Thread.sleep throw IllegalArgumentException.
                long backoffMs = Math.max(0L, Math.min(this.options.retryBackoffMs, remaining(startMs, timeoutMs)));
                sleep(Duration.ofMillis(backoffMs));
                if (Thread.currentThread().isInterrupted()) {
                    // sleep() swallows the InterruptedException and only re-sets the flag.
                    interrupted = true;
                    break;
                }
                remainingMs = remaining(startMs, timeoutMs);
            }
            return ready;
        } finally {
            if (!ready) {
                if (interrupted) {
                    LOG.warn("Interrupted while waiting for Kafka cluster to be ready. Expected {} brokers, but only {} were available.", expectedBrokers, availableBrokers);
                } else {
                    LOG.warn("Timeout expired. Kafka cluster is not ready yet. Expected {} brokers, but only {} were available.", expectedBrokers, availableBrokers);
                }
            }
        }
    }

    /**
     * Computes the time left before the deadline.
     *
     * @param startMs   the time at which waiting started, in milliseconds.
     * @param timeoutMs the total time budget, in milliseconds.
     * @return the remaining time in milliseconds, never negative.
     */
    private long remaining(long startMs, long timeoutMs) {
        long elapsedMs = Time.SYSTEM.milliseconds() - startMs;
        return Math.max(0L, timeoutMs - elapsedMs);
    }

    /**
     * Describes the cluster and blocks until the node list is available.
     *
     * @param adminClient the client used to describe the cluster.
     * @param timeoutMs   the timeout passed to the describe-cluster request, in milliseconds.
     * @return the nodes returned by the describe-cluster request.
     * @throws InterruptedException if the thread is interrupted while waiting for the result.
     * @throws ExecutionException   if the describe-cluster request fails.
     */
    private Collection<Node> getClusterNodes(AdminClient adminClient, long timeoutMs) throws InterruptedException, ExecutionException {
        return adminClient.describeCluster(timeoutMsOptions(timeoutMs)).nodes().get();
    }

    /**
     * Builds describe-cluster options with the given timeout, capped at {@link Integer#MAX_VALUE}.
     *
     * @param timeoutMs the desired timeout, in milliseconds.
     * @return the options carrying the (possibly capped) timeout.
     */
    private static DescribeClusterOptions timeoutMsOptions(long timeoutMs) {
        return new DescribeClusterOptions().timeoutMs((int) Math.min(Integer.MAX_VALUE, timeoutMs));
    }

    /**
     * Pauses the current thread for the given duration. If the thread is interrupted,
     * the method returns early and restores the interrupt flag instead of throwing.
     *
     * @param duration how long to sleep.
     */
    public void sleep(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
