package kakafka.client;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import kakafka.client.api.IKakafkaConsumer;
import kakafka.util.KUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kakafka/client/KakafkaConsumer.class */
/**
 * Kafka consumer that subscribes to a fixed set of topics, polls them periodically on a
 * background scheduler thread and buffers every polled record in memory until it is fetched
 * with {@link #getMessages(Predicate)} or discarded with {@link #dropMessagesByFilter(Predicate)}.
 *
 * <p>The record buffer is a {@link Collections#synchronizedList(List) synchronized list}, so it
 * may be read and modified while the background poller appends to it. Compound operations on
 * the buffer (traversal, filter-and-remove) must hold the list's own monitor.
 */
public class KakafkaConsumer implements IKakafkaConsumer {
    private static final Logger log = LoggerFactory.getLogger(KakafkaConsumer.class);
    private final List<ConsumerRecord<String, byte[]>> polledRecords;
    private final Properties properties;
    private final List<String> topicList;
    private final Consumer<String, byte[]> consumer;

    /**
     * Creates a consumer that polls the given topics every 200 milliseconds.
     *
     * @param properties Kafka consumer configuration passed to the underlying {@link KafkaConsumer}
     * @param topics     names of the topics to subscribe to
     */
    public KakafkaConsumer(Properties properties, String... topics) {
        this(properties, Duration.ofMillis(200L), topics);
    }

    /**
     * Creates a consumer, subscribes it to the given topics, performs an initial poll and then
     * schedules periodic polling on a single-threaded scheduler.
     *
     * @param properties   Kafka consumer configuration passed to the underlying {@link KafkaConsumer}
     * @param pollInterval both the period between scheduled polls and the timeout of each poll
     * @param topics       names of the topics to subscribe to
     */
    public KakafkaConsumer(Properties properties, Duration pollInterval, String... topics) {
        this.polledRecords = Collections.synchronizedList(new ArrayList<>());
        this.properties = properties;
        this.consumer = new KafkaConsumer<>(getProperties());
        this.topicList = Arrays.stream(topics).collect(Collectors.toList());
        initConsumer();
        ScheduledFuture<?> pollingTask = Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
            try {
                pollRecords(getConsumer(), pollInterval);
            } catch (RuntimeException e) {
                // An exception escaping a fixed-rate task suppresses all subsequent executions,
                // which would stop polling without any trace. Log it and keep the schedule alive.
                log.error("[KFK] Failed to poll records", e);
            }
        }, 0L, pollInterval.toMillis(), TimeUnit.MILLISECONDS);
        // Stop the periodic polling when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> pollingTask.cancel(true)));
    }

    /**
     * Subscribes the consumer to the configured topics and performs a first poll with a
     * two-second timeout. Invoked from the constructor before periodic polling is scheduled.
     */
    protected void initConsumer() {
        log.info("[KFK] Subscribe for topics: {}", getTopicList());
        getConsumer().subscribe(getTopicList());
        log.info("[KFK] Poll consumer at first time");
        pollRecords(getConsumer(), Duration.ofSeconds(2L));
        log.info("[KFK] Kafka consumer started for topics: {}", getConsumer().subscription());
    }

    /**
     * Polls the given consumer once, logs every received record and appends it to the buffer
     * of polled records.
     *
     * @param consumer consumer to poll
     * @param duration timeout passed to {@link Consumer#poll(Duration)}
     */
    protected void pollRecords(Consumer<String, byte[]> consumer, Duration duration) {
        consumer.poll(duration).forEach(consumerRecord -> {
            log.info("[KFK] Polled record:\n{}", KUtils.consumerRecordToString(consumerRecord));
            getPolledRecords().add(consumerRecord);
        });
    }

    /**
     * Removes every buffered record that matches the predicate and returns the removed records
     * in the order they were polled.
     *
     * <p>Matching and removal happen in a single pass while holding the buffer's monitor, so a
     * record appended concurrently by the poller can neither break the traversal nor be lost
     * between the filter step and the removal step.
     *
     * @param predicate filter selecting the records to take out of the buffer
     * @return the matching records; empty if none matched
     */
    @Override // kakafka.client.api.IKakafkaConsumer
    public List<ConsumerRecord<String, byte[]>> getMessages(Predicate<ConsumerRecord<String, byte[]>> predicate) {
        List<ConsumerRecord<String, byte[]>> buffer = getPolledRecords();
        List<ConsumerRecord<String, byte[]>> matched = new ArrayList<>();
        // Traversing a synchronized list requires holding its monitor explicitly.
        synchronized (buffer) {
            buffer.removeIf(consumerRecord -> {
                boolean matches = predicate.test(consumerRecord);
                if (matches) {
                    matched.add(consumerRecord);
                }
                return matches;
            });
        }
        log.info("[KFK] Found {} records by filter", matched.size());
        if (log.isTraceEnabled()) {
            matched.forEach(consumerRecord ->
                    log.trace("[KFK] Message removed from polled records list:\n{}", KUtils.consumerRecordToString(consumerRecord)));
        }
        log.debug("[KFK] Count of deleted messages from the list of polled records: {}", matched.size());
        return matched;
    }

    /**
     * Returns the live buffer of polled records (not a copy). Callers that iterate or stream
     * over it must synchronize on the returned list, because the background poller may append
     * to it at any time.
     *
     * @return the synchronized list backing this consumer
     */
    public List<ConsumerRecord<String, byte[]>> getPolledRecords() {
        return this.polledRecords;
    }

    /**
     * Discards every buffered record that matches the predicate.
     *
     * @param predicate filter selecting the records to drop
     */
    protected synchronized void dropMessagesByFilter(Predicate<ConsumerRecord<String, byte[]>> predicate) {
        getPolledRecords().removeIf(predicate);
    }

    /**
     * @return the consumer configuration this instance was created with
     */
    public Properties getProperties() {
        return this.properties;
    }

    /**
     * @return the topics this consumer subscribes to
     */
    public List<String> getTopicList() {
        return this.topicList;
    }

    /**
     * Returns the underlying Kafka consumer. It is polled by the background scheduler thread.
     * NOTE(review): Kafka documents {@code KafkaConsumer} as not safe for multi-threaded
     * access, so callers should avoid invoking it concurrently with the poller.
     *
     * @return the underlying consumer
     */
    public Consumer<String, byte[]> getConsumer() {
        return this.consumer;
    }
}
