package io.continual.flowcontrol.impl.transfer;

import io.continual.builder.Builder;
import io.continual.flowcontrol.controlapi.ConfigTransferService;
import io.continual.flowcontrol.jobapi.FlowControlJob;
import io.continual.flowcontrol.jobapi.FlowControlJobConfig;
import io.continual.services.Service;
import io.continual.services.ServiceContainer;
import io.continual.services.SimpleService;
import io.continual.util.data.Sha256HmacSigner;
import io.continual.util.data.StreamTools;
import io.continual.util.data.StringUtils;
import io.continual.util.data.TypeConvertor;
import io.continual.util.data.exprEval.EnvDataSource;
import io.continual.util.data.exprEval.ExprDataSource;
import io.continual.util.data.exprEval.ExpressionEvaluator;
import io.continual.util.data.exprEval.JsonDataSource;
import io.continual.util.time.Clock;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/continual/flowcontrol/impl/transfer/ConfigFetchService.class */
public class ConfigFetchService extends SimpleService implements ConfigTransferService {
    private final String fSigningKey;
    private final String fBaseUrl;
    private final long fKeyTimeLimitSec;
    private final HashMap<String, CachedConfig> fConfigMap;
    private final ScheduledExecutorService fBackgroundWork;
    private static final Logger log = LoggerFactory.getLogger(ConfigFetchService.class);
    private static final long kCullerPeriodSec = 60;
    private static final long kShutdownTimeoutSec = 60;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/continual/flowcontrol/impl/transfer/ConfigFetchService$CachedConfig.class */
    public class CachedConfig implements FlowControlJobConfig {
        private final String fDataType;
        private final long fCachedTimeMs = Clock.now();
        private final byte[] fConfigData;

        public CachedConfig(String str, byte[] bArr) {
            this.fDataType = str;
            this.fConfigData = bArr;
        }

        @Override // io.continual.flowcontrol.jobapi.FlowControlJobConfig
        public String getDataType() {
            return this.fDataType;
        }

        @Override // io.continual.flowcontrol.jobapi.FlowControlJobConfig
        public InputStream readConfiguration() {
            return new ByteArrayInputStream(this.fConfigData);
        }

        public long cachedTime() {
            return this.fCachedTimeMs;
        }
    }

    public ConfigFetchService(ServiceContainer serviceContainer, JSONObject jSONObject) throws Builder.BuildFailure {
        ExpressionEvaluator expressionEvaluator = new ExpressionEvaluator(new ExprDataSource[]{new JsonDataSource(jSONObject), new EnvDataSource()});
        this.fSigningKey = expressionEvaluator.evaluateText(jSONObject.getString("signingKey"));
        if (this.fSigningKey.length() == 0) {
            throw new Builder.BuildFailure("Config signing key is an empty string.");
        }
        this.fBaseUrl = expressionEvaluator.evaluateText(jSONObject.getString("baseUrl"));
        this.fKeyTimeLimitSec = expressionEvaluator.evaluateTextToLong(jSONObject.opt("timeLimitSec"), -1L);
        this.fConfigMap = new HashMap<>();
        this.fBackgroundWork = Executors.newScheduledThreadPool(1);
    }

    @Override // io.continual.flowcontrol.controlapi.ConfigTransferService
    public Map<String, String> deployConfiguration(FlowControlJob flowControlJob) throws ConfigTransferService.ServiceException {
        try {
            HashMap hashMap = new HashMap();
            String id = flowControlJob.getId();
            long now = Clock.now();
            StringBuilder sb = new StringBuilder();
            sb.append(id).append(".").append(now);
            String sb2 = sb.toString();
            String str = TypeConvertor.base64UrlEncode(sb2) + "-" + TypeConvertor.base64UrlEncode(Sha256HmacSigner.sign(sb2, this.fSigningKey));
            log.info("job [" + id + "] => [" + str + "]");
            FlowControlJobConfig configuration = flowControlJob.getConfiguration();
            putConfig(str, new CachedConfig(configuration.getDataType(), StreamTools.readBytes(configuration.readConfiguration())));
            hashMap.put("CONFIG_KEY", str);
            hashMap.put("CONFIG_URL", this.fBaseUrl + str);
            return hashMap;
        } catch (IOException e) {
            throw new ConfigTransferService.ServiceException(e);
        }
    }

    @Override // io.continual.flowcontrol.controlapi.ConfigTransferService
    public InputStream fetch(String str) {
        String[] splitList = StringUtils.splitList(str, new char[]{'-'}, new char[0]);
        if (splitList.length != 2) {
            log.info("bad key format");
            return null;
        }
        String str2 = new String(TypeConvertor.base64UrlDecode(splitList[0]), StandardCharsets.UTF_8);
        if (!Sha256HmacSigner.sign(str2, this.fSigningKey).equals(new String(TypeConvertor.base64UrlDecode(splitList[1]), StandardCharsets.UTF_8))) {
            log.info("signature doesn't match");
            return null;
        }
        String[] splitList2 = StringUtils.splitList(str2, new char[]{'.'}, new char[0]);
        if (splitList2.length != 2) {
            log.info("tag is malformed");
            return null;
        }
        try {
            long parseLong = Long.parseLong(splitList2[1]);
            if (this.fKeyTimeLimitSec > 0 && parseLong + this.fKeyTimeLimitSec < Clock.now()) {
                log.info("tag is expired");
                return null;
            }
            FlowControlJobConfig readConfig = readConfig(splitList2[0]);
            if (readConfig != null) {
                return readConfig.readConfiguration();
            }
            log.info("No such job.");
            return null;
        } catch (NumberFormatException e) {
            log.info("tag timestamp is not a number");
            return null;
        }
    }

    protected void onStartRequested() throws Service.FailedToStart {
        this.fBackgroundWork.scheduleAtFixedRate(() -> {
            cullConfigs();
        }, 60L, 60L, TimeUnit.SECONDS);
    }

    protected void onStopRequested() {
        this.fBackgroundWork.shutdown();
        try {
            this.fBackgroundWork.awaitTermination(60L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.warn("Interrupted while awaiting termination of ConfigFetchService background executor.");
        }
    }

    private synchronized FlowControlJobConfig readConfig(String str) {
        return this.fConfigMap.get(str);
    }

    private synchronized void putConfig(String str, CachedConfig cachedConfig) {
        this.fConfigMap.put(str, cachedConfig);
    }

    private synchronized void cullConfigs() {
        long now = Clock.now() - (this.fKeyTimeLimitSec * 1000);
        LinkedList linkedList = new LinkedList();
        for (Map.Entry<String, CachedConfig> entry : this.fConfigMap.entrySet()) {
            if (entry.getValue().cachedTime() < now) {
                linkedList.add(entry.getKey());
            }
        }
        Iterator it = linkedList.iterator();
        while (it.hasNext()) {
            this.fConfigMap.remove((String) it.next());
        }
    }
}
