package org.imixs.workflow.kafka;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.annotation.Resource;
import jakarta.ejb.Singleton;
import jakarta.ejb.Startup;
import jakarta.ejb.Timeout;
import jakarta.ejb.TimerConfig;
import jakarta.ejb.TimerService;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

@Singleton
@Startup
/* loaded from: input_file:org/imixs/workflow/kafka/ConsumerService.class */
public class ConsumerService implements Serializable {
    private static final long TEN_SECONDS = 10000;

    @Resource
    private TimerService timerService;
    private static final long serialVersionUID = 1;
    Consumer<Long, String> consumer;
    Properties props = null;
    public static String TOPIC_NAME = "IN-1.0.1";
    public static String GROUP_ID_CONFIG = "consumerGroup1";
    public static Integer MAX_NO_MESSAGE_FOUND_COUNT = 100;
    public static String OFFSET_RESET_LATEST = "latest";
    public static String OFFSET_RESET_EARLIER = "earliest";
    public static Integer MAX_POLL_RECORDS = 1;
    private static Logger logger = Logger.getLogger(ConsumerService.class.getName());

    @PostConstruct
    void init() {
        TimerConfig timerConfig = new TimerConfig();
        timerConfig.setPersistent(false);
        this.timerService.createIntervalTimer(TEN_SECONDS, TEN_SECONDS, timerConfig);
    }

    @PreDestroy
    void close() {
        if (this.consumer != null) {
            this.consumer.close();
        }
    }

    void initalizeConsumer() {
        if (this.consumer == null) {
            logger.info("......initalize kafka consumer...");
            this.props = new Properties();
            this.props.put("bootstrap.servers", ConfigService.getEnv(ConfigService.ENV_KAFKA_BROKERS, "kafka:9092"));
            this.props.put("group.id", GROUP_ID_CONFIG);
            this.props.put("client.id", ConfigService.getEnv(ConfigService.ENV_KAFKA_CLIENTID, "Imixs-Workflow-1"));
            this.props.put("key.deserializer", LongDeserializer.class.getName());
            this.props.put("value.deserializer", StringDeserializer.class.getName());
            this.props.put("max.poll.records", MAX_POLL_RECORDS);
            this.props.put("enable.auto.commit", "false");
            this.props.put("auto.offset.reset", OFFSET_RESET_EARLIER);
            this.consumer = new KafkaConsumer(this.props);
            logger.info("...register topic: " + TOPIC_NAME);
            this.consumer.subscribe(Collections.singletonList(TOPIC_NAME));
        }
    }

    @Timeout
    private synchronized void onTimer() {
        initalizeConsumer();
        ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(1000L));
        if (poll.count() > 0) {
            poll.forEach(consumerRecord -> {
                System.out.println("Record Key " + String.valueOf(consumerRecord.key()));
                System.out.println("Record value " + ((String) consumerRecord.value()));
                System.out.println("Record partition " + consumerRecord.partition());
                System.out.println("Record offset " + consumerRecord.offset());
            });
            this.consumer.commitAsync();
        }
    }
}
