package net.lightapi.portal.schedule.query.service;

import com.networknt.config.JsonMapper;
import com.networknt.utility.TimeUtil;
import com.networknt.utility.UuidUtil;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/lightapi/portal/schedule/query/service/AbstractTaskHandler.class */
public abstract class AbstractTaskHandler implements TaskHandler, Punctuator {
    private static final Logger logger = LoggerFactory.getLogger(AbstractTaskHandler.class);
    private final ProcessorContext<String, String> processorContext;
    private KeyValueStore<String, String> taskDefinitionStore;
    private final TimeUnit timeUnit;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: net.lightapi.portal.schedule.query.service.AbstractTaskHandler$1, reason: invalid class name */
    /* loaded from: input_file:net/lightapi/portal/schedule/query/service/AbstractTaskHandler$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$java$util$concurrent$TimeUnit = new int[TimeUnit.values().length];

        static {
            try {
                $SwitchMap$java$util$concurrent$TimeUnit[TimeUnit.MILLISECONDS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$java$util$concurrent$TimeUnit[TimeUnit.SECONDS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$java$util$concurrent$TimeUnit[TimeUnit.MINUTES.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$java$util$concurrent$TimeUnit[TimeUnit.HOURS.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$java$util$concurrent$TimeUnit[TimeUnit.DAYS.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractTaskHandler(ProcessorContext<String, String> processorContext, String str, TimeUnit timeUnit) {
        this.processorContext = processorContext;
        if (logger.isInfoEnabled()) {
            logger.info("Looking for the store name {}", str);
        }
        this.taskDefinitionStore = processorContext.getStateStore(str);
        this.timeUnit = timeUnit;
        this.processorContext.schedule(getDuration(timeUnit), PunctuationType.WALL_CLOCK_TIME, this);
        logger.info("Scheduled punctuation for {} store every {}", str, getDuration(timeUnit));
    }

    protected static Duration getDuration(TimeUnit timeUnit) {
        Duration duration = null;
        switch (AnonymousClass1.$SwitchMap$java$util$concurrent$TimeUnit[timeUnit.ordinal()]) {
            case 1:
                duration = Duration.ofMillis(1L);
                break;
            case 2:
                duration = Duration.ofSeconds(1L);
                break;
            case 3:
                duration = Duration.ofMinutes(1L);
                break;
            case 4:
                duration = Duration.ofHours(1L);
                break;
            case 5:
                duration = Duration.ofDays(1L);
                break;
        }
        return duration;
    }

    @Override // net.lightapi.portal.schedule.query.service.TaskHandler
    public TimeUnit handlingDuration() {
        return this.timeUnit;
    }

    @Override // net.lightapi.portal.schedule.query.service.TaskHandler
    public void add(String str, String str2) {
        this.taskDefinitionStore.put(str, str2);
    }

    @Override // net.lightapi.portal.schedule.query.service.TaskHandler
    public String get(String str) {
        return (String) this.taskDefinitionStore.get(str);
    }

    @Override // net.lightapi.portal.schedule.query.service.TaskHandler
    public String delete(String str) {
        return (String) this.taskDefinitionStore.delete(str);
    }

    public void punctuate(long j) {
        Map string2Map;
        Object obj;
        try {
            KeyValueIterator all = this.taskDefinitionStore.all();
            while (all.hasNext()) {
                try {
                    KeyValue keyValue = (KeyValue) all.next();
                    String str = (String) keyValue.key;
                    String str2 = (String) keyValue.value;
                    if (str2 == null || str2.isEmpty()) {
                        logger.warn("Skipping punctuation for scheduleId {} due to null or empty value in store {}.", str, this.taskDefinitionStore.name());
                    } else {
                        try {
                            string2Map = JsonMapper.string2Map(str2);
                            obj = string2Map.get("data");
                        } catch (Exception e) {
                            logger.error("Error processing scheduleId {} in store {} during punctuation. Value: {}", new Object[]{str, this.taskDefinitionStore.name(), str2, e});
                        }
                        if (obj instanceof Map) {
                            Map map = (Map) obj;
                            String objects = Objects.toString(map.get("frequencyUnit"), null);
                            Object obj2 = map.get("frequencyTime");
                            String objects2 = Objects.toString(map.get("startTs"), null);
                            String objects3 = Objects.toString(map.get("eventTopic"), null);
                            String objects4 = Objects.toString(map.get("eventType"), null);
                            String objects5 = Objects.toString(map.get("eventData"), null);
                            if (objects == null || obj2 == null || !(obj2 instanceof Number) || objects2 == null) {
                                logger.warn("Missing/invalid schedule params (frequencyUnit/Time/startTs) for scheduleId {} in store {}. Skipping.", str, this.taskDefinitionStore.name());
                            } else if (objects3 == null || objects4 == null) {
                                logger.warn("Missing target execution params (eventTopic/eventType) for scheduleId {} in store {}. Skipping.", str, this.taskDefinitionStore.name());
                            } else {
                                try {
                                    TimeUnit valueOf = TimeUnit.valueOf(objects.toUpperCase());
                                    if (valueOf != this.timeUnit) {
                                        logger.trace("Schedule {} frequencyUnit ({}) does not match handler unit ({}). Skipping in this handler.", new Object[]{str, valueOf, this.timeUnit});
                                    } else {
                                        long longValue = ((Number) obj2).longValue();
                                        try {
                                            long nextStartTimestamp = TimeUtil.nextStartTimestamp(valueOf, OffsetDateTime.parse(objects2).toInstant().toEpochMilli());
                                            long nextStartTimestamp2 = TimeUtil.nextStartTimestamp(valueOf, j);
                                            if (nextStartTimestamp2 - nextStartTimestamp > 0) {
                                                if ((nextStartTimestamp2 - nextStartTimestamp) % (TimeUtil.oneTimeUnitMillisecond(this.timeUnit) * longValue) == 0) {
                                                    if (logger.isDebugEnabled()) {
                                                        logger.debug("{} - Triggering task Key: {}, Value: {}", new Object[]{this.timeUnit, keyValue.key, keyValue.value});
                                                    }
                                                    String format = Instant.ofEpochMilli(nextStartTimestamp2).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
                                                    HashMap hashMap = new HashMap(string2Map);
                                                    HashMap hashMap2 = new HashMap(map);
                                                    hashMap2.put("startTs", format);
                                                    hashMap.put("data", hashMap2);
                                                    this.taskDefinitionStore.put(str, JsonMapper.toJson(hashMap));
                                                    if (logger.isTraceEnabled()) {
                                                        logger.trace("Updated scheduleId {} state. Next execution due around: {}", str, format);
                                                    }
                                                    HashMap hashMap3 = new HashMap();
                                                    hashMap3.put("specversion", "1.0");
                                                    hashMap3.put("id", UuidUtil.getUUID());
                                                    hashMap3.put("type", objects4);
                                                    hashMap3.put("source", "https://github.com/lightapi/light-portal");
                                                    hashMap3.put("subject", str);
                                                    hashMap3.put("time", Instant.ofEpochMilli(j).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
                                                    hashMap3.put("host", string2Map.get("host"));
                                                    hashMap3.put("nonce", string2Map.get("nonce"));
                                                    hashMap3.put("user", string2Map.get("user"));
                                                    hashMap3.put("topic", objects3);
                                                    if (objects5 != null && !objects5.isEmpty()) {
                                                        try {
                                                            hashMap3.put("data", JsonMapper.string2Map(objects5));
                                                            hashMap3.put("datacontenttype", "application/json");
                                                        } catch (Exception e2) {
                                                            logger.warn("Target data for scheduleId {} is not valid JSON. Sending as text/plain. Data: '{}'", new Object[]{str, objects5, e2});
                                                            hashMap3.put("data", objects5);
                                                            hashMap3.put("datacontenttype", "text/plain");
                                                        }
                                                    }
                                                    String json = JsonMapper.toJson(hashMap3);
                                                    if (logger.isDebugEnabled()) {
                                                        logger.debug("Forwarding new task event for scheduleId {} to be routed to {}: {}", new Object[]{str, objects3, json});
                                                    }
                                                    this.processorContext.forward(new Record(str, json, j));
                                                }
                                            } else if (logger.isDebugEnabled()) {
                                                logger.debug("Skip execution task ScheduleId {}: current {} - start {} <= 0", new Object[]{str, Long.valueOf(nextStartTimestamp2), Long.valueOf(nextStartTimestamp)});
                                            }
                                        } catch (DateTimeParseException e3) {
                                            logger.warn("Invalid startTs format '{}' for scheduleId {} in store {}. Skipping.", new Object[]{objects2, str, this.taskDefinitionStore.name(), e3});
                                        }
                                    }
                                } catch (IllegalArgumentException e4) {
                                    logger.warn("Invalid frequencyUnit '{}' for scheduleId {} in store {}. Skipping.", new Object[]{objects, str, this.taskDefinitionStore.name()});
                                }
                            }
                        } else {
                            logger.warn("Schedule data payload not found or not a map for scheduleId {} in store {}. Skipping.", str, this.taskDefinitionStore.name());
                        }
                    }
                } finally {
                }
            }
            if (all != null) {
                all.close();
            }
        } catch (Exception e5) {
            logger.error("Error iterating through state store {} during punctuation", this.taskDefinitionStore.name(), e5);
        }
    }
}
