package net.lightapi.portal.covid.query;

import com.networknt.config.Config;
import com.networknt.kafka.common.AvroDeserializer;
import com.networknt.kafka.common.AvroSerializer;
import com.networknt.kafka.common.KafkaStreamsConfig;
import com.networknt.kafka.streams.LightStreams;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.lightapi.portal.PortalConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/lightapi/portal/covid/query/CovidQueryStreams.class */
public class CovidQueryStreams implements LightStreams {
    // Not referenced anywhere in the visible source.
    private static final String APP = "maproot";
    // Name of the city store; startCovidStreams registers it as a global store
    // (with logging disabled) that is fed from the "portal-city" topic.
    private static final String city = "covid-city-store";
    // Names of the four persistent key-value stores that startCovidStreams
    // attaches to the "CovidEventProcessor" node. Keys and values are Strings.
    private static final String map = "covid-map-store";
    private static final String entity = "covid-entity-store";
    private static final String status = "covid-status-store";
    private static final String website = "covid-website-store";
    // The running Kafka Streams instance. It is only assigned inside
    // startCovidStreams, so it is null until start(...) has been called.
    KafkaStreams covidStreams;
    private static final Logger logger = LoggerFactory.getLogger(CovidQueryStreams.class);
    // Configuration objects loaded once, at class initialization, from the
    // "kafka-streams" and "portal" configuration entries respectively.
    static final KafkaStreamsConfig streamsConfig = (KafkaStreamsConfig) Config.getInstance().getJsonObjectConfig("kafka-streams", KafkaStreamsConfig.class);
    static final PortalConfig portalConfig = (PortalConfig) Config.getInstance().getJsonObjectConfig("portal", PortalConfig.class);

    /* loaded from: input_file:net/lightapi/portal/covid/query/CovidQueryStreams$CovidEventProcessor.class */
    /**
     * Processor for records read as raw {@code byte[]} key/value pairs.
     *
     * <p>In the code visible here, {@link #process(byte[], byte[])} only
     * Avro-deserializes the record value and discards the result; none of the
     * state stores resolved in {@link #init(ProcessorContext)} is read or
     * written, and nothing is forwarded downstream.
     */
    public static class CovidEventProcessor extends AbstractProcessor<byte[], byte[]> {
        private ProcessorContext pc;
        private KeyValueStore<String, String> cityStore;
        private KeyValueStore<String, String> mapStore;
        private KeyValueStore<String, String> entityStore;
        private KeyValueStore<String, String> statusStore;
        private KeyValueStore<String, String> websiteStore;

        /**
         * Keeps a reference to the processor context and resolves the five
         * state stores by the names declared on {@code CovidQueryStreams}.
         *
         * @param processorContext context supplied by Kafka Streams
         */
        public void init(ProcessorContext processorContext) {
            this.pc = processorContext;
            this.cityStore = processorContext.getStateStore(CovidQueryStreams.city);
            this.mapStore = processorContext.getStateStore(CovidQueryStreams.map);
            this.entityStore = processorContext.getStateStore(CovidQueryStreams.entity);
            this.statusStore = processorContext.getStateStore(CovidQueryStreams.status);
            this.websiteStore = processorContext.getStateStore(CovidQueryStreams.website);
            if (CovidQueryStreams.logger.isInfoEnabled()) {
                CovidQueryStreams.logger.info("Processor initialized");
            }
        }

        /**
         * Deserializes the record value; any exception is logged and the
         * record is otherwise ignored.
         *
         * @param bArr  record key (unused)
         * @param bArr2 Avro-encoded record value
         */
        public void process(byte[] bArr, byte[] bArr2) {
            AvroDeserializer avroDeserializer = new AvroDeserializer(true);
            // NOTE(review): the serializer instance is never used. It is kept
            // because its constructor may have side effects that cannot be
            // verified from this file - confirm before removing.
            new AvroSerializer();
            try {
                avroDeserializer.deserialize(bArr2);
            } catch (Exception e) {
                CovidQueryStreams.logger.error("Exception:", e);
            }
        }

        /**
         * Builds a GeoJSON-style "Feature" map with a "Point" geometry.
         *
         * <p>The "cluster", "hasStatus" and "hasWebsite" properties are always
         * {@code false} in the returned map.
         *
         * @param str  value stored under the "id" property
         * @param str2 value stored under the "category" property
         * @param str3 value stored under the "subcategory" property
         * @param str4 value stored under the "introduction" property
         * @param d    first coordinate (presumably longitude - confirm with callers)
         * @param d2   second coordinate (presumably latitude - confirm with callers)
         * @return a new mutable map with "type", "properties" and "geometry" entries
         */
        public Map<String, Object> createPoint(String str, String str2, String str3, String str4, double d, double d2) {
            Map<String, Object> properties = new HashMap<>();
            properties.put("cluster", false);
            properties.put("id", str);
            properties.put("hasStatus", false);
            properties.put("hasWebsite", false);
            properties.put("category", str2);
            properties.put("subcategory", str3);
            properties.put("introduction", str4);

            // Mutable list, as before, so callers may still modify it.
            ArrayList<Double> coordinates = new ArrayList<>();
            coordinates.add(d);
            coordinates.add(d2);

            Map<String, Object> geometry = new HashMap<>();
            geometry.put("type", "Point");
            geometry.put("coordinates", coordinates);

            Map<String, Object> feature = new HashMap<>();
            feature.put("type", "Feature");
            feature.put("properties", properties);
            feature.put("geometry", geometry);
            return feature;
        }

        /** Logs that the processor is closing; no resources are released here. */
        public void close() {
            if (CovidQueryStreams.logger.isInfoEnabled()) {
                CovidQueryStreams.logger.info("Closing processor...");
            }
        }
    }

    /* loaded from: input_file:net/lightapi/portal/covid/query/CovidQueryStreams$GlobalCityProcessor.class */
    /**
     * Processor that copies every record it receives into the city state
     * store, using the record key and value unchanged.
     */
    public static class GlobalCityProcessor extends AbstractProcessor<String, String> {
        private KeyValueStore<String, String> cityStore;

        /**
         * Resolves the city store from the processor context.
         *
         * @param processorContext context supplied by Kafka Streams
         */
        public void init(ProcessorContext processorContext) {
            KeyValueStore<String, String> resolved = processorContext.getStateStore(CovidQueryStreams.city);
            this.cityStore = resolved;
        }

        /**
         * Writes the record into the city store.
         *
         * @param str  record key
         * @param str2 record value
         */
        public void process(String str, String str2) {
            cityStore.put(str, str2);
        }

        /** Drops the reference to the store; the store itself is not closed here. */
        public void close() {
            cityStore = null;
        }
    }

    /**
     * Creates the instance and logs its creation. No Kafka Streams resources
     * are allocated here; that happens in {@code start(String, int)}.
     */
    public CovidQueryStreams() {
        CovidQueryStreams.logger.info("CovidQueryStreams is created");
    }

    /**
     * Looks up the city key-value store on the running streams instance.
     *
     * @return read-only view of the city store
     */
    public ReadOnlyKeyValueStore<String, String> getCityStore() {
        StoreQueryParameters<ReadOnlyKeyValueStore<String, String>> query =
                StoreQueryParameters.fromNameAndType(city, QueryableStoreTypes.<String, String>keyValueStore());
        return this.covidStreams.store(query);
    }

    /**
     * Looks up the map key-value store on the running streams instance.
     *
     * @return read-only view of the map store
     */
    public ReadOnlyKeyValueStore<String, String> getMapStore() {
        StoreQueryParameters<ReadOnlyKeyValueStore<String, String>> query =
                StoreQueryParameters.fromNameAndType(map, QueryableStoreTypes.<String, String>keyValueStore());
        return this.covidStreams.store(query);
    }

    /**
     * Queries the streams instance for the metadata of the given key in the
     * map store, serializing the key as a String.
     *
     * @param str key to look up
     * @return metadata returned by {@code queryMetadataForKey}
     */
    public KeyQueryMetadata getMapStreamsMetadata(String str) {
        KeyQueryMetadata keyMetadata =
                this.covidStreams.queryMetadataForKey(map, str, Serdes.String().serializer());
        return keyMetadata;
    }

    /**
     * Looks up the entity key-value store on the running streams instance.
     *
     * @return read-only view of the entity store
     */
    public ReadOnlyKeyValueStore<String, String> getEntityStore() {
        StoreQueryParameters<ReadOnlyKeyValueStore<String, String>> query =
                StoreQueryParameters.fromNameAndType(entity, QueryableStoreTypes.<String, String>keyValueStore());
        return this.covidStreams.store(query);
    }

    /**
     * Queries the streams instance for the metadata of the given key in the
     * entity store, serializing the key as a String.
     *
     * @param str key to look up
     * @return metadata returned by {@code queryMetadataForKey}
     */
    public KeyQueryMetadata getEntityStreamsMetadata(String str) {
        KeyQueryMetadata keyMetadata =
                this.covidStreams.queryMetadataForKey(entity, str, Serdes.String().serializer());
        return keyMetadata;
    }

    /**
     * Looks up the status key-value store on the running streams instance.
     *
     * @return read-only view of the status store
     */
    public ReadOnlyKeyValueStore<String, String> getStatusStore() {
        StoreQueryParameters<ReadOnlyKeyValueStore<String, String>> query =
                StoreQueryParameters.fromNameAndType(status, QueryableStoreTypes.<String, String>keyValueStore());
        return this.covidStreams.store(query);
    }

    /**
     * Queries the streams instance for the metadata of the given key in the
     * status store, serializing the key as a String.
     *
     * @param str key to look up
     * @return metadata returned by {@code queryMetadataForKey}
     */
    public KeyQueryMetadata getStatusStreamsMetadata(String str) {
        KeyQueryMetadata keyMetadata =
                this.covidStreams.queryMetadataForKey(status, str, Serdes.String().serializer());
        return keyMetadata;
    }

    /**
     * Looks up the website key-value store on the running streams instance.
     *
     * @return read-only view of the website store
     */
    public ReadOnlyKeyValueStore<String, String> getWebsiteStore() {
        StoreQueryParameters<ReadOnlyKeyValueStore<String, String>> query =
                StoreQueryParameters.fromNameAndType(website, QueryableStoreTypes.<String, String>keyValueStore());
        return this.covidStreams.store(query);
    }

    /**
     * Queries the streams instance for the metadata of the given key in the
     * website store, serializing the key as a String.
     *
     * @param str key to look up
     * @return metadata returned by {@code queryMetadataForKey}
     */
    public KeyQueryMetadata getWebsiteStreamsMetadata(String str) {
        // Query the website store; this previously passed the status store
        // name, which looks like a copy-paste slip from the status accessor.
        return this.covidStreams.queryMetadataForKey(website, str, Serdes.String().serializer());
    }

    /**
     * Builds the topology, creates the {@link KafkaStreams} instance and
     * starts it.
     *
     * <p>Topology: a global city store fed from "portal-city"; a source on
     * "portal-event" feeding the "CovidEventProcessor" node, which has the
     * map, entity, status and website stores attached and four sinks
     * ("portal-nonce", "portal-notification", "portal-city", "portal-event")
     * as children.
     *
     * @param str first half of the "application.server" property
     * @param i   second half of the "application.server" property, joined to
     *            {@code str} with a colon
     */
    private void startCovidStreams(String str, int i) {
        // Store builders. Only the city store has logging disabled.
        StoreBuilder<KeyValueStore<String, String>> cityStoreBuilder = Stores
                .keyValueStoreBuilder(Stores.persistentKeyValueStore(city), Serdes.String(), Serdes.String())
                .withLoggingDisabled();
        StoreBuilder<KeyValueStore<String, String>> mapStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(map), Serdes.String(), Serdes.String());
        StoreBuilder<KeyValueStore<String, String>> entityStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(entity), Serdes.String(), Serdes.String());
        StoreBuilder<KeyValueStore<String, String>> statusStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(status), Serdes.String(), Serdes.String());
        StoreBuilder<KeyValueStore<String, String>> websiteStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(website), Serdes.String(), Serdes.String());

        // Topology wiring; nodes are added in the same order as before.
        Topology covidTopology = new Topology();
        covidTopology.addGlobalStore(
                cityStoreBuilder,
                "from-portal-city",
                Serdes.String().deserializer(),
                Serdes.String().deserializer(),
                "portal-city",
                "global-city-processor",
                GlobalCityProcessor::new);
        covidTopology.addSource("SourceTopicProcessor", "portal-event");
        covidTopology.addProcessor("CovidEventProcessor", CovidEventProcessor::new, "SourceTopicProcessor");
        covidTopology.addStateStore(mapStoreBuilder, "CovidEventProcessor");
        covidTopology.addStateStore(entityStoreBuilder, "CovidEventProcessor");
        covidTopology.addStateStore(statusStoreBuilder, "CovidEventProcessor");
        covidTopology.addStateStore(websiteStoreBuilder, "CovidEventProcessor");
        covidTopology.addSink("NonceProcessor", "portal-nonce", "CovidEventProcessor");
        covidTopology.addSink("NotificationProcessor", "portal-notification", "CovidEventProcessor");
        covidTopology.addSink("CityProcessor", "portal-city", "CovidEventProcessor");
        covidTopology.addSink("EventProcessor", "portal-event", "CovidEventProcessor");

        // Start from the shared streams properties, then set the per-application entries.
        Properties streamsProps = new Properties();
        streamsProps.putAll(streamsConfig.getProperties());
        streamsProps.put("application.id", portalConfig.getMaprootApplicationId());
        streamsProps.put("application.server", str + ":" + i);

        this.covidStreams = new KafkaStreams(covidTopology, streamsProps);
        this.covidStreams.setUncaughtExceptionHandler(failure -> {
            logger.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", failure);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
        });
        // Optional local-state cleanup, done before start().
        if (streamsConfig.isCleanUp()) {
            this.covidStreams.cleanUp();
        }
        this.covidStreams.start();
    }

    /**
     * Logs at debug level and starts the streams instance by delegating to
     * {@code startCovidStreams}.
     *
     * @param str first half of the "application.server" property
     * @param i   second half of the "application.server" property
     */
    public void start(String str, int i) {
        // The message is a constant, so no explicit level check is needed.
        logger.debug("CovidStreams is starting...");
        startCovidStreams(str, i);
    }

    /**
     * Logs at debug level and closes the streams instance if one was created.
     *
     * <p>Safe to call when {@code start(String, int)} was never invoked or
     * failed before the {@link KafkaStreams} instance was assigned.
     */
    public void close() {
        if (logger.isDebugEnabled()) {
            logger.debug("CovidStreams is closing...");
        }
        // covidStreams is only assigned during start; guard against a
        // NullPointerException when closing an instance that never started.
        if (this.covidStreams != null) {
            this.covidStreams.close();
        }
    }
}
