package kakafka.client;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import kakafka.util.KUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kakafka/client/KakafkaConsumer.class */
public class KakafkaConsumer {
    private static final Logger log = LoggerFactory.getLogger(KakafkaConsumer.class);
    private static final List<ConsumerRecord<String, byte[]>> POLLED_RECORDS = new ArrayList();
    private final Properties properties;
    private final List<String> topicList;
    private final Consumer<String, byte[]> consumer;
    private final ScheduledFuture<?> scheduler;

    public KakafkaConsumer(Properties properties, String... strArr) {
        this(properties, Duration.ofMillis(200L), strArr);
    }

    public KakafkaConsumer(Properties properties, Duration duration, String... strArr) {
        this.properties = properties;
        this.consumer = new KafkaConsumer(getProperties());
        this.topicList = (List) Arrays.stream(strArr).collect(Collectors.toList());
        initConsumer();
        this.scheduler = Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
            pollRecords(getConsumer(), duration);
        }, 0L, duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    protected synchronized void dropMessagesByFilter(Predicate<ConsumerRecord<String, byte[]>> predicate) {
        POLLED_RECORDS.removeIf(predicate);
    }

    protected void initConsumer() {
        log.info("[KFK] Subscribe for topics: {}", getTopicList());
        getConsumer().subscribe(getTopicList());
        log.info("[KFK] Poll consumer at first time");
        pollRecords(getConsumer(), Duration.ofSeconds(2L));
        log.info("[KFK] Kafka consumer started for topics: {}", this.consumer.subscription());
    }

    protected void pollRecords(Consumer<String, byte[]> consumer, Duration duration) {
        consumer.poll(duration).forEach(consumerRecord -> {
            log.info("[KFK] Polled records:\n{}", KUtils.consumerRecordToString(consumerRecord));
            POLLED_RECORDS.add(consumerRecord);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized List<ConsumerRecord<String, byte[]>> getMessages(Predicate<ConsumerRecord<String, byte[]>> predicate, boolean z) {
        List<ConsumerRecord<String, byte[]>> list = (List) POLLED_RECORDS.stream().filter(predicate).collect(Collectors.toList());
        log.info("[KFK] Found {} records by filter from claimed records {}", Integer.valueOf(list.size()), Integer.valueOf(POLLED_RECORDS.size()));
        if (z) {
            list.forEach(consumerRecord -> {
                log.info("[KFK] Message removed from static list: {}", KUtils.consumerRecordToString(consumerRecord));
            });
            POLLED_RECORDS.removeAll(list);
        }
        return list;
    }

    public List<ConsumerRecord<String, byte[]>> getPolledRecords() {
        return POLLED_RECORDS;
    }

    public Properties getProperties() {
        return this.properties;
    }

    public List<String> getTopicList() {
        return this.topicList;
    }

    public Consumer<String, byte[]> getConsumer() {
        return this.consumer;
    }

    public ScheduledFuture<?> getScheduler() {
        return this.scheduler;
    }
}
