package net.lightapi.portal.user.query;

import com.networknt.config.Config;
import com.networknt.kafka.common.KafkaStreamsConfig;
import com.networknt.kafka.streams.LightStreams;
import com.networknt.utility.ByteUtil;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import net.lightapi.portal.PortalConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/lightapi/portal/user/query/NonceQueryStreams.class */
public class NonceQueryStreams implements LightStreams {
    private static final Logger logger = LoggerFactory.getLogger(NonceQueryStreams.class);
    static final KafkaStreamsConfig streamsConfig = (KafkaStreamsConfig) Config.getInstance().getJsonObjectConfig("kafka-streams", KafkaStreamsConfig.class);
    static final PortalConfig portalConfig = (PortalConfig) Config.getInstance().getJsonObjectConfig("portal", PortalConfig.class);
    private static final String nonce = "user-nonce-store";
    KafkaStreams nonceStreams;

    /* loaded from: input_file:net/lightapi/portal/user/query/NonceQueryStreams$NonceEventProcessor.class */
    public static class NonceEventProcessor extends AbstractProcessor<byte[], byte[]> {
        private ProcessorContext pc;
        private KeyValueStore<String, Long> nonceStore;

        public void init(ProcessorContext processorContext) {
            this.pc = processorContext;
            this.nonceStore = processorContext.getStateStore(NonceQueryStreams.nonce);
            if (NonceQueryStreams.logger.isInfoEnabled()) {
                NonceQueryStreams.logger.info("Processor initialized");
            }
        }

        public void process(byte[] bArr, byte[] bArr2) {
            this.nonceStore.put(new String(bArr), Long.valueOf(ByteUtil.bytesToLong(bArr2)));
        }

        public void close() {
            if (NonceQueryStreams.logger.isInfoEnabled()) {
                NonceQueryStreams.logger.info("Closing processor...");
            }
        }
    }

    public NonceQueryStreams() {
        logger.info("NonceQueryStreams is created");
    }

    public ReadOnlyKeyValueStore<String, Long> getUserNonceStore() {
        return (ReadOnlyKeyValueStore) this.nonceStreams.store(StoreQueryParameters.fromNameAndType(nonce, QueryableStoreTypes.keyValueStore()));
    }

    public KeyQueryMetadata getUserNonceStreamsMetadata(String str) {
        return this.nonceStreams.queryMetadataForKey(nonce, str, Serdes.String().serializer());
    }

    private void startNonceStreams(String str, int i) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(nonce), Serdes.String(), Serdes.Long()));
        streamsBuilder.stream(portalConfig.getNonceTopic()).process(new ProcessorSupplier(this) { // from class: net.lightapi.portal.user.query.NonceQueryStreams.1
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public Processor m1get() {
                return new NonceEventProcessor();
            }
        }, new String[]{nonce});
        Topology build = streamsBuilder.build();
        Properties properties = new Properties();
        properties.putAll(streamsConfig.getProperties());
        properties.put("default.key.serde", Serdes.ByteArray().getClass());
        properties.put("default.value.serde", Serdes.ByteArray().getClass());
        properties.put("application.id", portalConfig.getNonceApplicationId());
        properties.put("application.server", str + ":" + i);
        this.nonceStreams = new KafkaStreams(build, properties);
        this.nonceStreams.setUncaughtExceptionHandler(th -> {
            logger.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", th);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
        });
        if (streamsConfig.isCleanUp()) {
            this.nonceStreams.cleanUp();
        }
        this.nonceStreams.start();
    }

    public void start(String str, int i) {
        if (logger.isDebugEnabled()) {
            logger.debug("NonceStreams is starting...");
        }
        startNonceStreams(str, i);
    }

    public void close() {
        if (logger.isDebugEnabled()) {
            logger.debug("NonceStreams is closing...");
        }
        this.nonceStreams.close();
    }
}
