package net.lightapi.portal.schedule.query;

import com.networknt.config.Config;
import com.networknt.kafka.common.KafkaStreamsConfig;
import com.networknt.kafka.streams.LightStreams;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/lightapi/portal/schedule/query/ScheduleQueryStreams.class */
/**
 * {@link LightStreams} implementation that builds the topology supplied by
 * {@link TaskSchedulingStreamTopology} and runs it in a single {@link KafkaStreams} instance.
 *
 * <p>Stream properties are read from the {@code kafka-streams} configuration, with
 * {@code application.server} overridden by the host and port passed to {@link #start(String, int)}.
 */
public class ScheduleQueryStreams implements LightStreams {
    private static final Logger logger = LoggerFactory.getLogger(ScheduleQueryStreams.class);

    /** Kafka Streams settings loaded once from the {@code kafka-streams} configuration. */
    static final KafkaStreamsConfig config = (KafkaStreamsConfig) Config.getInstance().getJsonObjectConfig("kafka-streams", KafkaStreamsConfig.class);

    /** The running streams instance; {@code null} until {@link #start(String, int)} has created it. */
    private KafkaStreams schedulerStreams;

    /**
     * Creates, configures and starts the scheduler streams instance.
     *
     * @param host host part of the {@code application.server} property
     * @param port port part of the {@code application.server} property
     */
    private void startSchedulerStreams(String host, int port) {
        // Copy the shared configuration so the per-instance override below does not leak into it.
        Properties streamProperties = new Properties();
        streamProperties.putAll(config.getProperties());
        streamProperties.put("application.server", host + ":" + port);

        TaskSchedulingStreamTopology topologyBuilder = new TaskSchedulingStreamTopology();
        KafkaStreams streams = new KafkaStreams(topologyBuilder.buildTaskStreamingTopology(), streamProperties);
        // Publish the instance immediately so close() can release it even if a later step throws.
        this.schedulerStreams = streams;

        streams.setUncaughtExceptionHandler(throwable -> {
            logger.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", throwable);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
        });
        // cleanUp() must be invoked before start(); it is only run when the configuration asks for it.
        if (config.isCleanUp()) {
            streams.cleanUp();
        }
        streams.start();
    }

    /**
     * Starts the scheduler streams and then calls {@code registerModule()}.
     *
     * @param str host advertised through the {@code application.server} property
     * @param i   port advertised through the {@code application.server} property
     */
    public void start(String str, int i) {
        // Emitted at DEBUG so the level matches the condition under which it was previously logged.
        logger.debug("ServiceStreams is starting...");
        startSchedulerStreams(str, i);
        // NOTE(review): registerModule() is not defined in this class; presumably inherited from LightStreams.
        registerModule();
    }

    /**
     * Closes the scheduler streams instance. Does nothing if the instance was never created,
     * so it is safe to call before {@link #start(String, int)} or after a failed start.
     */
    public void close() {
        logger.debug("ServiceStreams is closing...");
        KafkaStreams streams = this.schedulerStreams;
        if (streams != null) {
            streams.close();
        }
    }

    /**
     * Returns the underlying streams instance.
     *
     * @return the {@link KafkaStreams} instance, or {@code null} if it has not been created yet
     */
    public KafkaStreams getKafkaStreams() {
        return this.schedulerStreams;
    }
}
