package io.continual.services.processor.engine.runtime;

import io.continual.builder.Builder;
import io.continual.iam.identity.Identity;
import io.continual.metrics.MetricsCatalog;
import io.continual.metrics.impl.StdMetricsCatalog;
import io.continual.metrics.metricTypes.Meter;
import io.continual.metrics.metricTypes.Timer;
import io.continual.services.Service;
import io.continual.services.SimpleService;
import io.continual.services.processor.engine.library.util.SimpleMessageProcessingContext;
import io.continual.services.processor.engine.library.util.SimpleStreamProcessingContext;
import io.continual.services.processor.engine.model.MessageAndRouting;
import io.continual.services.processor.engine.model.Pipeline;
import io.continual.services.processor.engine.model.Program;
import io.continual.services.processor.engine.model.Sink;
import io.continual.services.processor.engine.model.Source;
import io.continual.services.processor.engine.model.StreamProcessingContext;
import io.continual.services.processor.service.ProcessingService;
import io.continual.util.data.exprEval.ExprDataSource;
import io.continual.util.data.exprEval.ExprDataSourceStack;
import io.continual.util.data.exprEval.SpecialFnsDataSource;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/continual/services/processor/engine/runtime/Engine.class */
public class Engine extends SimpleService {
    public static final long kDefault_MetricsReportSeconds = 300;
    private final Program fProgram;
    private final HashMap<String, ExecThread> fThreads;
    private final MetricsDumpThread fMetricsDumper;
    private final SerialNumberGenerator fSnGen;
    private final HashMap<String, String> fUserData;
    private final Identity fIdentity;
    private final ExprDataSourceStack fExprEvalStack;
    private final MetricsCatalog fEngineMetrics;
    private static final Logger log = LoggerFactory.getLogger(Engine.class);
    private static final Logger metricsLog = LoggerFactory.getLogger("continualProcessorEngineMetrics");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/continual/services/processor/engine/runtime/Engine$ExecThread.class */
    public class ExecThread extends Thread {
        private final String fSrcName;
        private final Source fSource;
        private final MetricsCatalog fThreadMetrics;
        private final StreamProcessingContext fStreamContext;

        public ExecThread(String str, String str2, Source source) {
            super("ExecThread " + str);
            this.fSrcName = str2;
            this.fSource = source;
            this.fThreadMetrics = Engine.this.fEngineMetrics.getSubCatalog(str);
            this.fStreamContext = SimpleStreamProcessingContext.builder().withSource(source).operatedBy(Engine.this.fIdentity).evaluatingAgainst(Engine.this.fExprEvalStack).loggingTo(Engine.log).runningProgram(Engine.this.fProgram).reportMetricsTo(this.fThreadMetrics).build();
        }

        public String getSourceName() {
            return this.fSrcName;
        }

        public Source getSource() {
            return this.fSource;
        }

        public StreamProcessingContext getStreamContext() {
            return this.fStreamContext;
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                try {
                    try {
                        try {
                            for (Map.Entry<String, ProcessingService> entry : Engine.this.fProgram.getServicesFor(this.fSrcName).entrySet()) {
                                this.fStreamContext.addNamedObject(entry.getKey(), entry.getValue());
                            }
                            Iterator<Map.Entry<String, ProcessingService>> it = Engine.this.fProgram.getServicesFor(this.fSrcName).entrySet().iterator();
                            while (it.hasNext()) {
                                it.next().getValue().startBackgroundProcessing();
                            }
                            SimpleMessageProcessingContext.Builder usingContext = SimpleMessageProcessingContext.builder().evaluatingAgainst(Engine.this.fExprEvalStack).serialNumbersFrom(Engine.this.fSnGen).usingContext(this.fStreamContext);
                            MetricsCatalog subCatalog = this.fThreadMetrics.getSubCatalog("engine");
                            Meter meter = subCatalog.meter("cycles");
                            Meter meter2 = subCatalog.meter("msgsIn");
                            Timer timer = subCatalog.timer("msgLoad");
                            Timer timer2 = subCatalog.timer("procTime");
                            Engine.log.info("Source " + this.fSrcName + ": START");
                            this.fSource.open();
                            while (!this.fSource.isEof() && !this.fStreamContext.failed()) {
                                try {
                                    meter.mark();
                                    MetricsCatalog.PathPopper push = this.fThreadMetrics.push(this.fSrcName);
                                    try {
                                        Timer.Context time = timer.time();
                                        try {
                                            MessageAndRouting nextMessage = this.fSource.getNextMessage(this.fStreamContext, 500L, TimeUnit.MILLISECONDS);
                                            if (time != null) {
                                                time.close();
                                            }
                                            if (push != null) {
                                                push.close();
                                            }
                                            if (nextMessage != null) {
                                                meter2.mark();
                                                Pipeline pipeline = Engine.this.fProgram.getPipeline(nextMessage.getPipelineName());
                                                if (pipeline == null) {
                                                    Engine.log.info("No pipeline {} for source \"{}\", ignored.", nextMessage.getPipelineName(), this.fSrcName);
                                                } else {
                                                    time = timer2.time();
                                                    try {
                                                        pipeline.process(usingContext.build(nextMessage.getMessage()));
                                                        if (time != null) {
                                                            time.close();
                                                        }
                                                    } finally {
                                                        if (time != null) {
                                                            try {
                                                                time.close();
                                                            } catch (Throwable th) {
                                                                th.addSuppressed(th);
                                                            }
                                                        }
                                                    }
                                                }
                                                this.fSource.markComplete(this.fStreamContext, nextMessage);
                                            }
                                        } catch (Throwable th2) {
                                            throw th2;
                                        }
                                    } catch (Throwable th3) {
                                        if (push != null) {
                                            try {
                                                push.close();
                                            } catch (Throwable th4) {
                                                th3.addSuppressed(th4);
                                            }
                                        }
                                        throw th3;
                                    }
                                } catch (Throwable th5) {
                                    this.fSource.close();
                                    throw th5;
                                }
                            }
                            if (this.fSource.isEof()) {
                                Engine.log.info("Source " + this.fSrcName + ": EOF");
                                Iterator<Map.Entry<String, ProcessingService>> it2 = Engine.this.fProgram.getServicesFor(this.fSrcName).entrySet().iterator();
                                while (it2.hasNext()) {
                                    it2.next().getValue().onSourceEof();
                                }
                            } else {
                                Engine.log.warn("Processing stopped.");
                            }
                            this.fSource.close();
                            Iterator<Map.Entry<String, ProcessingService>> it3 = Engine.this.fProgram.getServicesFor(this.fSrcName).entrySet().iterator();
                            while (it3.hasNext()) {
                                it3.next().getValue().stopBackgroundProcessing();
                            }
                        } catch (IOException | Builder.BuildFailure e) {
                            Engine.log.warn("Error on source {}: {}", this.fSrcName, e.getMessage());
                            Iterator<Map.Entry<String, ProcessingService>> it4 = Engine.this.fProgram.getServicesFor(this.fSrcName).entrySet().iterator();
                            while (it4.hasNext()) {
                                it4.next().getValue().stopBackgroundProcessing();
                            }
                        }
                    } catch (Throwable th6) {
                        Engine.log.warn("Unexpected error stopping processing engine thread {}: {}", new Object[]{super.getName(), th6.getMessage(), th6});
                        throw th6;
                    }
                } catch (InterruptedException e2) {
                    Engine.log.info("Source {} interrupted.", this.fSrcName);
                    Iterator<Map.Entry<String, ProcessingService>> it5 = Engine.this.fProgram.getServicesFor(this.fSrcName).entrySet().iterator();
                    while (it5.hasNext()) {
                        it5.next().getValue().stopBackgroundProcessing();
                    }
                }
            } catch (Throwable th7) {
                Iterator<Map.Entry<String, ProcessingService>> it6 = Engine.this.fProgram.getServicesFor(this.fSrcName).entrySet().iterator();
                while (it6.hasNext()) {
                    it6.next().getValue().stopBackgroundProcessing();
                }
                throw th7;
            }
        }
    }

    /* loaded from: input_file:io/continual/services/processor/engine/runtime/Engine$MetricsDumpThread.class */
    /**
     * Daemon thread that sleeps for a fixed period, writes the engine metrics catalog
     * to the metrics logger, and repeats for as long as at least one
     * {@link ExecThread} is still alive.
     */
    private class MetricsDumpThread extends Thread {
        private final long fPeriodMs;

        /**
         * @param j the pause between metrics dumps, in milliseconds
         */
        public MetricsDumpThread(long j) {
            super("processor metrics dumper");
            this.fPeriodMs = j;
            setDaemon(true);
        }

        @Override
        public void run() {
            try {
                boolean anyWorkerAlive;
                do {
                    Thread.sleep(this.fPeriodMs);
                    Engine.metricsLog.info(Engine.this.fEngineMetrics.toString());

                    // Keep reporting only while some worker thread is still running.
                    anyWorkerAlive = false;
                    for (ExecThread worker : Engine.this.fThreads.values()) {
                        if (worker.isAlive()) {
                            anyWorkerAlive = true;
                            break;
                        }
                    }
                } while (anyWorkerAlive);
            } catch (InterruptedException interrupted) {
                Engine.log.warn("Metrics dumper interrupted: ", interrupted);
                return;
            } catch (Throwable failure) {
                Engine.log.warn("Metrics dumper terminated: ", failure);
                return;
            }
            Engine.log.info("Metrics dump thread exiting.");
        }
    }

    /**
     * Creates an engine for the given program with no operating identity.
     *
     * @param program the program to run
     */
    public Engine(Program program) {
        this((Identity) null, program);
    }

    /**
     * Creates an engine that reports metrics at the default interval of
     * {@link #kDefault_MetricsReportSeconds} seconds.
     *
     * @param identity the identity passed to each stream context as its operator
     * @param program  the program to run
     */
    public Engine(Identity identity, Program program) {
        // The three-argument constructor takes milliseconds.
        this(identity, program, kDefault_MetricsReportSeconds * 1000L);
    }

    /**
     * Creates an engine for the given program. One {@link ExecThread} is created (but
     * not started) per source in the program, registered under the name returned by
     * {@code getName(...)} for that source's key.
     *
     * @param identity        the identity passed to each stream context as its operator
     * @param program         the program to run
     * @param metricsPeriodMs the pause between metrics dumps, in milliseconds
     */
    public Engine(Identity identity, Program program, long metricsPeriodMs) {
        this.fIdentity = identity;
        this.fProgram = program;
        this.fThreads = new HashMap<>();
        this.fSnGen = new SerialNumberGenerator();
        this.fUserData = new HashMap<>();
        this.fEngineMetrics = new StdMetricsCatalog.Builder().build();

        // Expression data sources, in the order they are handed to the stack:
        // engine user data, process environment variables, then the special functions source.
        this.fExprEvalStack = new ExprDataSourceStack(new ExprDataSource[]{new ExprDataSource() {
            @Override
            public Object eval(String str) {
                return Engine.this.getUserData(str);
            }
        }, new ExprDataSource() {
            @Override
            public Object eval(String str) {
                return System.getenv().get(str);
            }
        }, new SpecialFnsDataSource()});

        this.fMetricsDumper = new MetricsDumpThread(metricsPeriodMs);

        // Typed set (previously a raw TreeSet) of the thread names handed out so far.
        final TreeSet<String> usedNames = new TreeSet<>();
        for (Map.Entry<String, Source> entry : this.fProgram.getSources().entrySet()) {
            final String threadName = getName(usedNames, entry.getKey());
            this.fThreads.put(threadName, new ExecThread(threadName, entry.getKey(), entry.getValue()));
        }
    }

    /**
     * Reports whether any of the engine's source threads is still alive.
     *
     * @return {@code true} if at least one {@link ExecThread} is alive, otherwise {@code false}
     */
    public synchronized boolean isRunning() {
        boolean anyAlive = false;
        for (ExecThread worker : this.fThreads.values()) {
            if (worker.isAlive()) {
                anyAlive = true;
                break;
            }
        }
        return anyAlive;
    }

    /**
     * Starts the engine and blocks, polling every 100 ms, until no source thread is
     * alive; then calls {@link #waitForCompletion()}.
     *
     * <p>If the calling thread is interrupted while waiting, polling stops,
     * {@link #waitForCompletion()} still runs, and the thread's interrupt status is
     * restored before returning so that callers can observe the interruption.
     *
     * @throws Service.FailedToStart if starting the service fails
     */
    public void startAndWait() throws Service.FailedToStart {
        boolean interrupted = false;
        try {
            start();
            while (isRunning()) {
                Thread.sleep(100L);
            }
        } catch (InterruptedException e) {
            interrupted = true;
            System.out.println("exiting...");
        }

        try {
            waitForCompletion();
        } finally {
            // Re-assert the interrupt only after cleanup, so closing sources and sinks
            // is not itself disturbed by a pending interrupt.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Closes every source and then every sink of the program. A failure to close one
     * of them is logged as a warning and does not prevent the remaining ones from
     * being closed.
     */
    public void waitForCompletion() {
        for (Source source : this.fProgram.getSources().values()) {
            try {
                source.close();
            } catch (IOException closeFailure) {
                log.warn("Problem closing source: " + closeFailure.getMessage());
            }
        }

        for (Sink sink : this.fProgram.getSinks().values()) {
            try {
                sink.close();
            } catch (IOException closeFailure) {
                log.warn("Problem closing sink: " + closeFailure.getMessage());
            }
        }
    }

    /**
     * Looks up the stream processing context of the thread registered under the given name.
     *
     * @param str the name the thread was registered under
     * @return that thread's stream context, or {@code null} if no thread has that name
     */
    public StreamProcessingContext getStreamContextFor(String str) {
        final ExecThread worker = this.fThreads.get(str);
        return worker == null ? null : worker.getStreamContext();
    }

    /**
     * Stores a user data value under the given key, replacing any previous value.
     *
     * @param str  the key
     * @param str2 the value
     * @return this engine, to allow call chaining
     */
    public Engine setUserData(String str, String str2) {
        fUserData.put(str, str2);
        return this;
    }

    /**
     * Retrieves the user data value stored under the given key.
     *
     * @param str the key
     * @return the stored value, or {@code null} if the key has no value
     */
    public String getUserData(String str) {
        final String value = fUserData.get(str);
        return value;
    }

    /**
     * Removes the user data value stored under the given key, if there is one.
     *
     * @param str the key
     */
    public void removeUserData(String str) {
        fUserData.remove(str);
    }

    /**
     * Start hook: starts the metrics dumper thread, initialises every sink of the
     * program, and then starts each source thread.
     *
     * @throws Service.FailedToStart declared by this hook's signature
     */
    protected void onStartRequested() throws Service.FailedToStart {
        this.fMetricsDumper.start();

        // Sinks are initialised before any source thread is started.
        for (Sink sink : this.fProgram.getSinks().values()) {
            sink.init();
        }

        for (ExecThread worker : this.fThreads.values()) {
            worker.start();
        }
    }

    /**
     * Stop hook: closes the source of every source thread. A failure to close one
     * source is logged as a warning and the remaining sources are still closed.
     */
    protected void onStopRequested() {
        log.info("Stopping processing engine...");
        for (ExecThread worker : this.fThreads.values()) {
            final Source source = worker.getSource();
            try {
                source.close();
            } catch (IOException closeFailure) {
                log.warn("Problem closing source {}: {}", worker.getSourceName(), closeFailure.getMessage());
            }
        }
    }

    /**
     * Reserves a name that is not yet in {@code used}, derived from {@code base}.
     *
     * <p>If {@code base} itself is free it is reserved and returned. Otherwise the
     * first free candidate of the form {@code base-2}, {@code base-3}, ... is
     * reserved and returned. The chosen name is added to {@code used}.
     *
     * @param used the names already taken; updated with the returned name
     * @param base the preferred name
     * @return the reserved name
     */
    private static String getName(java.util.Set<String> used, String base) {
        if (!used.contains(base)) {
            used.add(base);
            return base;
        }

        // Numbering starts at 2: the unsuffixed name counts as the first.
        int suffix = 2;
        String candidate = base + "-" + suffix;
        while (used.contains(candidate)) {
            suffix++;
            candidate = base + "-" + suffix;
        }
        used.add(candidate);
        return candidate;
    }
}
