package kakafka.client;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.function.Predicate;
import kakafka.KakafkaException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kakafka/client/KakafkaClientBase.class */
/**
 * Base class for Kafka clients that pair one {@link KakafkaProducer} with one
 * {@link KakafkaConsumer}.
 *
 * <p>Each concrete subclass may be instantiated only once per JVM: the constructor
 * records the runtime class in a static registry and rejects a second instance of the
 * same class with a {@link KakafkaException}.
 *
 * <p>NOTE(review): the instance registry is a plain {@link ArrayList} with no
 * synchronization, so concurrent construction of clients is not guarded here.
 */
public abstract class KakafkaClientBase {
    private static final Logger log = LoggerFactory.getLogger(KakafkaClientBase.class);

    /** Concrete client classes that were successfully instantiated so far. */
    private static final List<Class<?>> INSTANCES = new ArrayList<>();

    /** Pause between two consecutive consumer polls, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 200L;

    /** Property key under which the Kafka client id is stored. */
    private static final String CLIENT_ID_KEY = "client.id";

    private final Properties properties;
    private final String topic4Produce;
    private final String[] topics4Consume;
    private final KakafkaProducer producer;
    private final KakafkaConsumer consumer;

    /**
     * Creates the client together with its producer and consumer.
     *
     * <p>If {@code properties} has no non-empty {@code client.id}, one is generated as
     * {@code "KaKafka-" + <simple class name>} and written into the passed
     * {@code properties} object (the caller's instance is modified, not a copy).
     *
     * @param properties configuration handed to the producer and the consumer; when
     *                   {@code null} an empty {@link Properties} is used instead
     * @param str        name of the topic this client produces to
     * @param strArr     names of the topics this client consumes from
     * @throws KakafkaException if a client of the same concrete class already exists
     */
    protected KakafkaClientBase(Properties properties, String str, String... strArr) {
        String simpleName = getClass().getSimpleName();
        log.info("[KFK] Init kafka client: {}", simpleName);
        if (INSTANCES.contains(getClass())) {
            throw new KakafkaException("Client must be singleton: " + getClass());
        }
        this.topic4Produce = str;
        this.topics4Consume = strArr;
        // The convenience constructors pass null; dereferencing it used to throw a
        // NullPointerException before the producer/consumer were even created.
        this.properties = properties != null ? properties : new Properties();
        Object clientId = this.properties.get(CLIENT_ID_KEY);
        if (clientId == null || String.valueOf(clientId).isEmpty()) {
            this.properties.setProperty(CLIENT_ID_KEY, "KaKafka-" + simpleName);
        }
        this.producer = new KakafkaProducer(this.properties);
        this.consumer = new KakafkaConsumer(this.properties, strArr);
        // Register only after everything above succeeded, so a failed construction
        // does not block later attempts with a misleading "must be singleton" error.
        INSTANCES.add(getClass());
    }

    /**
     * Creates the client without caller-supplied properties.
     *
     * @param str    name of the topic this client produces to
     * @param strArr names of the topics this client consumes from
     * @throws KakafkaException if a client of the same concrete class already exists
     */
    protected KakafkaClientBase(String str, String... strArr) {
        this((Properties) null, str, strArr);
    }

    /**
     * Creates the client from {@link KakafkaTopic} descriptors; only their names are used.
     *
     * @param properties      configuration handed to the producer and the consumer; may be
     *                        {@code null}
     * @param kakafkaTopic    topic this client produces to
     * @param kakafkaTopicArr topics this client consumes from
     * @throws KakafkaException if a client of the same concrete class already exists
     */
    protected KakafkaClientBase(Properties properties, KakafkaTopic kakafkaTopic, KakafkaTopic... kakafkaTopicArr) {
        this(properties,
                kakafkaTopic.getName(),
                Arrays.stream(kakafkaTopicArr).map(KakafkaTopic::getName).toArray(String[]::new));
    }

    /**
     * Creates the client from {@link KakafkaTopic} descriptors without caller-supplied
     * properties.
     *
     * @param kakafkaTopic    topic this client produces to
     * @param kakafkaTopicArr topics this client consumes from
     * @throws KakafkaException if a client of the same concrete class already exists
     */
    protected KakafkaClientBase(KakafkaTopic kakafkaTopic, KakafkaTopic... kakafkaTopicArr) {
        this((Properties) null, kakafkaTopic, kakafkaTopicArr);
    }

    /**
     * Sends a message to the produce topic, passing {@code false} as the producer flag.
     *
     * @param m   message to send
     * @param <M> message type
     */
    public <M> void send(M m) {
        send(m, false);
    }

    /**
     * Sends a message to the produce topic.
     *
     * <p>NOTE(review): the message is cast to {@link String} before it is handed to the
     * producer, so a non-String argument fails with {@link ClassCastException}. This may
     * be a decompilation artifact - confirm against {@code KakafkaProducer#sendMessage}.
     *
     * @param m   message to send
     * @param z   flag forwarded unchanged to {@code KakafkaProducer#sendMessage}
     * @param <M> message type
     */
    public <M> void send(M m, boolean z) {
        getProducer().sendMessage(getTopic4Produce(), (String) m, z);
    }

    /**
     * Polls the consumer until enough matching records were collected or the time budget
     * is used up.
     *
     * <p>A {@code duration} shorter than the poll interval (200 ms) results in exactly one
     * consumer call whose result is returned directly. Otherwise the consumer is queried
     * every 200 ms and the results of all queries are accumulated. The budget is counted
     * in poll intervals; time spent inside the consumer call itself is not measured.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is
     * restored and the records collected so far are returned immediately.
     *
     * @param predicate filter forwarded to the consumer
     * @param z         flag forwarded unchanged to {@code KakafkaConsumer#getMessages}
     * @param i         number of collected records at which polling stops early
     * @param duration  maximum time to keep polling
     * @return the records gathered from the consumer; may contain fewer than {@code i}
     */
    public List<ConsumerRecord<String, byte[]>> getMessages(Predicate<ConsumerRecord<String, byte[]>> predicate, boolean z, int i, Duration duration) {
        long remainingMillis = duration.toMillis();
        if (remainingMillis < POLL_INTERVAL_MILLIS) {
            return getConsumer().getMessages(predicate, z);
        }
        List<ConsumerRecord<String, byte[]>> collected = new ArrayList<>();
        while (remainingMillis > 0) {
            remainingMillis -= POLL_INTERVAL_MILLIS;
            collected.addAll(getConsumer().getMessages(predicate, z));
            // Stop when satisfied, or when no further poll would follow: sleeping after
            // the last poll only delays the caller.
            if (collected.size() >= i || remainingMillis <= 0) {
                break;
            }
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and give up waiting; continuing would make every
                // following sleep throw immediately and turn the loop into a busy poll.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return collected;
    }

    /**
     * Same as {@link #getMessages(Predicate, boolean, int, Duration)} with a target of one
     * record.
     *
     * @param predicate filter forwarded to the consumer
     * @param z         flag forwarded unchanged to {@code KakafkaConsumer#getMessages}
     * @param duration  maximum time to keep polling
     * @return the records gathered from the consumer
     */
    public List<ConsumerRecord<String, byte[]>> getMessages(Predicate<ConsumerRecord<String, byte[]>> predicate, boolean z, Duration duration) {
        return getMessages(predicate, z, 1, duration);
    }

    /**
     * Same as {@link #getMessages(Predicate, boolean, int, Duration)} with the flag set to
     * {@code false} and a target of one record.
     *
     * @param predicate filter forwarded to the consumer
     * @param duration  maximum time to keep polling
     * @return the records gathered from the consumer
     */
    public List<ConsumerRecord<String, byte[]>> getMessages(Predicate<ConsumerRecord<String, byte[]>> predicate, Duration duration) {
        return getMessages(predicate, false, 1, duration);
    }

    /**
     * Same as {@link #getMessages(Predicate, boolean, int, Duration)} with the flag set to
     * {@code false}, a target of one record and {@link #defaultWaitDuration()} as timeout.
     *
     * @param predicate filter forwarded to the consumer
     * @return the records gathered from the consumer
     */
    public List<ConsumerRecord<String, byte[]>> getMessages(Predicate<ConsumerRecord<String, byte[]>> predicate) {
        return getMessages(predicate, false, 1, defaultWaitDuration());
    }

    /**
     * Collects records whose key or UTF-8 decoded value contains the given text.
     *
     * <p>Records with a {@code null} key or {@code null} value are handled: the missing
     * part simply does not match.
     *
     * @param str      text to look for
     * @param z        flag forwarded unchanged to {@code KakafkaConsumer#getMessages}
     * @param i        number of collected records at which polling stops early
     * @param duration maximum time to keep polling
     * @return the records gathered from the consumer
     */
    public List<ConsumerRecord<String, byte[]>> getMessages(String str, boolean z, int i, Duration duration) {
        return getMessages(consumerRecord -> containsText(consumerRecord, str), z, i, duration);
    }

    /**
     * Same as {@link #getMessages(String, boolean, int, Duration)} with a target of one
     * record.
     *
     * @param str      text to look for in the record key or value
     * @param z        flag forwarded unchanged to {@code KakafkaConsumer#getMessages}
     * @param duration maximum time to keep polling
     * @return the records gathered from the consumer
     */
    public List<ConsumerRecord<String, byte[]>> getMessages(String str, boolean z, Duration duration) {
        return getMessages(str, z, 1, duration);
    }

    /**
     * Same as {@link #getMessages(String, boolean, int, Duration)} with the flag set to
     * {@code false} and a target of one record.
     *
     * @param str      text to look for in the record key or value
     * @param duration maximum time to keep polling
     * @return the records gathered from the consumer
     */
    public List<ConsumerRecord<String, byte[]>> getMessages(String str, Duration duration) {
        return getMessages(str, false, 1, duration);
    }

    /**
     * Same as {@link #getMessages(String, boolean, int, Duration)} with the flag set to
     * {@code false}, a target of one record and {@link #defaultWaitDuration()} as timeout.
     *
     * @param str text to look for in the record key or value
     * @return the records gathered from the consumer
     */
    public List<ConsumerRecord<String, byte[]>> getMessages(String str) {
        return getMessages(str, false, 1, defaultWaitDuration());
    }

    /**
     * Timeout used by the {@code getMessages} overloads that take no {@link Duration}.
     * Subclasses may override it.
     *
     * @return 60 seconds
     */
    protected Duration defaultWaitDuration() {
        return Duration.ofSeconds(60L);
    }

    /** Returns a two-line description listing the produce topic and the consume topics. */
    @Override
    public String toString() {
        return "Produce topic: " + this.topic4Produce + "\nConsume topics: " + Arrays.toString(this.topics4Consume);
    }

    /** Returns the live properties object shared with the producer and consumer (not a copy). */
    public Properties getProperties() {
        return this.properties;
    }

    /** Returns the name of the topic this client produces to. */
    public String getTopic4Produce() {
        return this.topic4Produce;
    }

    /** Returns the internal array of consume topic names (not a copy). */
    public String[] getTopics4Consume() {
        return this.topics4Consume;
    }

    /** Returns the producer created for this client. */
    public KakafkaProducer getProducer() {
        return this.producer;
    }

    /** Returns the consumer created for this client. */
    public KakafkaConsumer getConsumer() {
        return this.consumer;
    }

    /** Tells whether the record's key or its UTF-8 decoded value contains {@code text}; null parts never match. */
    private static boolean containsText(ConsumerRecord<String, byte[]> message, String text) {
        String key = message.key();
        if (key != null && key.contains(text)) {
            return true;
        }
        byte[] value = message.value();
        return value != null && new String(value, StandardCharsets.UTF_8).contains(text);
    }
}
