package io.infinitic.pulsar;

import com.sksamuel.hoplite.ConfigLoader;
import com.sksamuel.hoplite.PropertySource;
import io.infinitic.common.storage.keySet.CachedLoggedKeySetStorage;
import io.infinitic.common.storage.keySet.KeySetCache;
import io.infinitic.common.storage.keySet.KeySetStorage;
import io.infinitic.common.storage.keyValue.CachedLoggedKeyValueStorage;
import io.infinitic.common.storage.keyValue.KeyValueCache;
import io.infinitic.common.storage.keyValue.KeyValueStorage;
import io.infinitic.config.WorkerConfig;
import io.infinitic.config.cache.GetKeySetCacheKt;
import io.infinitic.config.cache.GetKeyValueCacheKt;
import io.infinitic.config.data.Mode;
import io.infinitic.config.data.Monitoring;
import io.infinitic.config.data.TagEngine;
import io.infinitic.config.data.Task;
import io.infinitic.config.data.TaskEngine;
import io.infinitic.config.data.Workflow;
import io.infinitic.config.data.WorkflowEngine;
import io.infinitic.config.storage.GetKeySetStorageKt;
import io.infinitic.config.storage.GetKeyValueStorageKt;
import io.infinitic.pulsar.transport.PulsarConsumerFactory;
import io.infinitic.pulsar.transport.PulsarOutputs;
import io.infinitic.pulsar.workers.StartPulsarMonitoringGlobalWorkerKt;
import io.infinitic.pulsar.workers.StartPulsarMonitoringPerNameWorkerKt;
import io.infinitic.pulsar.workers.StartPulsarTagEngineWorkerKt;
import io.infinitic.pulsar.workers.StartPulsarTaskEngineWorkerKt;
import io.infinitic.pulsar.workers.StartPulsarTaskExecutorWorkerKt;
import io.infinitic.pulsar.workers.StartPulsarWorkflowEngineWorkerKt;
import io.infinitic.storage.StateStorage;
import io.infinitic.tasks.TaskExecutorRegister;
import io.infinitic.tasks.executor.register.TaskExecutorRegisterImpl;
import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kotlin.Metadata;
import kotlin.collections.ArraysKt;
import kotlin.collections.CollectionsKt;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.JvmField;
import kotlin.jvm.JvmStatic;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.Reflection;
import kotlin.text.StringsKt;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineDispatcher;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.ExecutorsKt;
import org.apache.pulsar.client.api.PulsarClient;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: InfiniticWorker.kt */
@Metadata(mv = {1, 4, 2}, bv = {1, 0, 3}, k = 1, d1 = {"��B\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0006\u0018�� \u00192\u00020\u0001:\u0001\u0019B\u0015\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005¢\u0006\u0002\u0010\u0006J\u0006\u0010\n\u001a\u00020\u000bJ\u0006\u0010\f\u001a\u00020\u000bJ,\u0010\r\u001a\u00020\u000b*\u00020\u000e2\u0006\u0010\u000f\u001a\u00020\u00102\u0006\u0010\u0004\u001a\u00020\u00052\u0006\u0010\u0011\u001a\u00020\u00122\u0006\u0010\u0013\u001a\u00020\u0014H\u0002J,\u0010\u0015\u001a\u00020\u000b*\u00020\u000e2\u0006\u0010\u000f\u001a\u00020\u00102\u0006\u0010\u0004\u001a\u00020\u00052\u0006\u0010\u0011\u001a\u00020\u00122\u0006\u0010\u0013\u001a\u00020\u0014H\u0002J,\u0010\u0016\u001a\u00020\u000b*\u00020\u000e2\u0006\u0010\u000f\u001a\u00020\u00102\u0006\u0010\u0004\u001a\u00020\u00052\u0006\u0010\u0011\u001a\u00020\u00122\u0006\u0010\u0013\u001a\u00020\u0014H\u0002J,\u0010\u0017\u001a\u00020\u000b*\u00020\u000e2\u0006\u0010\u000f\u001a\u00020\u00102\u0006\u0010\u0004\u001a\u00020\u00052\u0006\u0010\u0011\u001a\u00020\u00122\u0006\u0010\u0013\u001a\u00020\u0014H\u0002J,\u0010\u0018\u001a\u00020\u000b*\u00020\u000e2\u0006\u0010\u000f\u001a\u00020\u00102\u0006\u0010\u0004\u001a\u00020\u00052\u0006\u0010\u0011\u001a\u00020\u00122\u0006\u0010\u0013\u001a\u00020\u0014H\u0002R\u0010\u0010\u0004\u001a\u00020\u00058\u0006X\u0087\u0004¢\u0006\u0002\n��R\u0016\u0010\u0007\u001a\n \t*\u0004\u0018\u00010\b0\bX\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010\u0002\u001a\u00020\u00038\u0006X\u0087\u0004¢\u0006\u0002\n��¨\u0006\u001a"}, d2 = {"Lio/infinitic/pulsar/InfiniticWorker;", "", "pulsarClient", "Lorg/apache/pulsar/client/api/PulsarClient;", "config", 
"Lio/infinitic/config/WorkerConfig;", "(Lorg/apache/pulsar/client/api/PulsarClient;Lio/infinitic/config/WorkerConfig;)V", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "close", "", "start", "startMonitoringWorkers", "Lkotlinx/coroutines/CoroutineScope;", "consumerName", "", "pulsarConsumerFactory", "Lio/infinitic/pulsar/transport/PulsarConsumerFactory;", "pulsarOutputs", "Lio/infinitic/pulsar/transport/PulsarOutputs;", "startTagEngineWorkers", "startTaskEngineWorkers", "startTaskExecutorWorkers", "startWorkflowEngineWorkers", "Companion", "infinitic-pulsar"})
/* loaded from: input_file:io/infinitic/pulsar/InfiniticWorker.class */
public final class InfiniticWorker {
    /** SLF4J logger for this worker; assigned in the constructor. */
    private final Logger logger;

    /** Pulsar client supplied to the constructor; {@code close()} closes it. */
    @JvmField
    @NotNull
    public final PulsarClient pulsarClient;

    /** Worker configuration supplied to the constructor. */
    @JvmField
    @NotNull
    public final WorkerConfig config;

    /** Kotlin companion instance; the static factory methods of this class forward to it. */
    @NotNull
    public static final Companion Companion = new Companion(null);

    /* compiled from: InfiniticWorker.kt */
    @Metadata(mv = {1, 4, 2}, bv = {1, 0, 3}, k = 1, d1 = {"��&\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0011\n\u0002\u0010\u000e\n\u0002\b\u0004\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002J\u0010\u0010\u0003\u001a\u00020\u00042\u0006\u0010\u0005\u001a\u00020\u0006H\u0007J!\u0010\u0007\u001a\u00020\u00042\u0012\u0010\b\u001a\n\u0012\u0006\b\u0001\u0012\u00020\n0\t\"\u00020\nH\u0007¢\u0006\u0002\u0010\u000bJ!\u0010\f\u001a\u00020\u00042\u0012\u0010\r\u001a\n\u0012\u0006\b\u0001\u0012\u00020\n0\t\"\u00020\nH\u0007¢\u0006\u0002\u0010\u000b¨\u0006\u000e"}, d2 = {"Lio/infinitic/pulsar/InfiniticWorker$Companion;", "", "()V", "fromConfig", "Lio/infinitic/pulsar/InfiniticWorker;", "config", "Lio/infinitic/config/WorkerConfig;", "fromConfigFile", "files", "", "", "([Ljava/lang/String;)Lio/infinitic/pulsar/InfiniticWorker;", "fromConfigResource", "resources", "infinitic-pulsar"})
    /* loaded from: input_file:io/infinitic/pulsar/InfiniticWorker$Companion.class */
    public static final class Companion {
        /**
         * Builds a worker from an already-loaded configuration.
         *
         * <p>A new {@link PulsarClient} is built against {@code workerConfig.pulsar.serviceUrl}
         * and passed, together with the configuration, to the {@link InfiniticWorker} constructor.
         *
         * @param workerConfig worker configuration; must not be null
         * @return a new worker wrapping the freshly built Pulsar client
         */
        @JvmStatic
        @NotNull
        public final InfiniticWorker fromConfig(@NotNull WorkerConfig workerConfig) {
            Intrinsics.checkNotNullParameter(workerConfig, "config");
            PulsarClient build = PulsarClient.builder().serviceUrl(workerConfig.pulsar.serviceUrl).build();
            Intrinsics.checkNotNullExpressionValue(build, "PulsarClient\n           …\n                .build()");
            return new InfiniticWorker(build, workerConfig);
        }

        /**
         * Builds a worker from one or more configuration resources.
         *
         * <p>Each name is registered, in the given order, as a Hoplite resource property source;
         * the resulting {@link WorkerConfig} is then passed to {@link #fromConfig(WorkerConfig)}.
         *
         * @param strArr resource names; the array itself must not be null
         * @return a new worker configured from the given resources
         */
        @JvmStatic
        @NotNull
        public final InfiniticWorker fromConfigResource(@NotNull String... strArr) {
            Intrinsics.checkNotNullParameter(strArr, "resources");
            ConfigLoader.Builder builder = new ConfigLoader.Builder();
            for (String resource : strArr) {
                // The `false` flag is forwarded unchanged (presumably `optional = false` -- confirm against the Hoplite version in use).
                builder.addSource(PropertySource.Companion.resource(resource, false));
            }
            return fromConfig(loadWorkerConfig(builder));
        }

        /**
         * Builds a worker from one or more configuration files.
         *
         * <p>Each path is wrapped in a {@link File} and registered, in the given order, as a
         * Hoplite file property source; the resulting {@link WorkerConfig} is then passed to
         * {@link #fromConfig(WorkerConfig)}.
         *
         * @param strArr file paths; the array itself must not be null
         * @return a new worker configured from the given files
         */
        @JvmStatic
        @NotNull
        public final InfiniticWorker fromConfigFile(@NotNull String... strArr) {
            Intrinsics.checkNotNullParameter(strArr, "files");
            ConfigLoader.Builder builder = new ConfigLoader.Builder();
            for (String path : strArr) {
                // The `false` flag is forwarded unchanged (presumably `optional = false` -- confirm against the Hoplite version in use).
                builder.addSource(PropertySource.Companion.file(new File(path), false));
            }
            return fromConfig(loadWorkerConfig(builder));
        }

        /** Builds the loader from {@code builder} and decodes a {@link WorkerConfig}, throwing if loading fails. */
        private WorkerConfig loadWorkerConfig(ConfigLoader.Builder builder) {
            ConfigLoader loader = builder.build();
            return (WorkerConfig) loader.returnOrThrow(loader.loadConfig(Reflection.getOrCreateKotlinClass(WorkerConfig.class), CollectionsKt.emptyList()));
        }

        private Companion() {
        }

        /** Synthetic constructor emitted by the Kotlin compiler; the marker argument is ignored. */
        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /**
     * Closes the {@link PulsarClient} held by this worker.
     *
     * <p>Nothing else is released here. NOTE(review): the thread pool created in
     * {@code startTaskExecutorWorkers} is not referenced by this method -- confirm whether it
     * is shut down elsewhere.
     */
    public final void close() {
        this.pulsarClient.close();
    }

    /**
     * Starts the worker by running the compiled coroutine block {@code InfiniticWorker$start$1}
     * inside {@code runBlocking}, so the call does not return until that block has completed.
     *
     * <p>The block's body is not part of this file; presumably it invokes the
     * {@code start*Workers} methods below -- confirm against the Kotlin source.
     */
    public final void start() {
        // `runBlocking$default` is the compiler-generated bridge for Kotlin default arguments;
        // the null context with bit mask 1 presumably selects runBlocking's default context.
        BuildersKt.runBlocking$default((CoroutineContext) null, new InfiniticWorker$start$1(this, null), 1, (Object) null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts the tag engine consumers described by {@code workerConfig.tagEngine}.
     *
     * <p>Returns immediately when no tag engine is configured or when its effective mode is
     * not {@link Mode#worker}. Otherwise one cached key-value storage and one cached key-set
     * storage are built and shared by every consumer, a status line is printed to standard
     * output, and {@code getConsumersOrDefault()} workers are started.
     *
     * @param coroutineScope scope handed to each started worker
     * @param str consumer name handed to the consumer factory
     * @param workerConfig configuration providing the tag engine settings
     * @param pulsarConsumerFactory factory creating one Pulsar consumer per worker index
     * @param pulsarOutputs provider of the output functions passed to each worker
     * @throws NullPointerException if the tag engine has no state storage configured
     */
    public final void startTagEngineWorkers(CoroutineScope coroutineScope, String str, WorkerConfig workerConfig, PulsarConsumerFactory pulsarConsumerFactory, PulsarOutputs pulsarOutputs) {
        TagEngine tagEngine = workerConfig.tagEngine;
        if (tagEngine == null || tagEngine.getModeOrDefault() != Mode.worker) {
            return;
        }
        KeyValueCache keyValueCache = GetKeyValueCacheKt.getKeyValueCache(tagEngine.getStateCacheOrDefault(), workerConfig);
        // Read and null-check the storage once; both storages below are backed by it.
        StateStorage stateStorage = tagEngine.stateStorage;
        Intrinsics.checkNotNull(stateStorage);
        KeyValueStorage cachedLoggedKeyValueStorage = new CachedLoggedKeyValueStorage(keyValueCache, GetKeyValueStorageKt.getKeyValueStorage(stateStorage, workerConfig));
        KeySetCache keySetCache = GetKeySetCacheKt.getKeySetCache(tagEngine.getStateCacheOrDefault(), workerConfig);
        KeySetStorage cachedLoggedKeySetStorage = new CachedLoggedKeySetStorage(keySetCache, GetKeySetStorageKt.getKeySetStorage(stateStorage, workerConfig));
        // Report the number of instances the loop below actually starts, not the raw field.
        int consumerCount = tagEngine.getConsumersOrDefault();
        System.out.print(StringsKt.padEnd$default("Tag engine", 25, (char) 0, 2, (Object) null) + ": starting " + consumerCount + " instances... (storage: " + stateStorage + ", cache:" + tagEngine.getStateCacheOrDefault() + ')');
        for (int index = 0; index < consumerCount; index++) {
            this.logger.info("InfiniticWorker - starting tag engine {}", Integer.valueOf(index));
            StartPulsarTagEngineWorkerKt.startPulsarTagEngineWorker(coroutineScope, index, pulsarConsumerFactory.newTagEngineConsumer(str, index), cachedLoggedKeySetStorage, pulsarOutputs.getSendEventsToClient(), pulsarOutputs.getSendCommandsToTaskEngine(), pulsarOutputs.getSendCommandsToWorkflowEngine(), cachedLoggedKeyValueStorage);
        }
        System.out.println(" done");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts the task engine consumers described by {@code workerConfig.taskEngine}.
     *
     * <p>Returns immediately when no task engine is configured or when its effective mode is
     * not {@link Mode#worker}. Otherwise one cached key-value storage is built and shared by
     * every consumer, a status line is printed to standard output, and
     * {@code getConsumersOrDefault()} workers are started.
     *
     * @param coroutineScope scope handed to each started worker
     * @param str consumer name handed to the consumer factory
     * @param workerConfig configuration providing the task engine settings
     * @param pulsarConsumerFactory factory creating one Pulsar consumer per worker index
     * @param pulsarOutputs provider of the output functions passed to each worker
     * @throws NullPointerException if the task engine has no state storage configured
     */
    public final void startTaskEngineWorkers(CoroutineScope coroutineScope, String str, WorkerConfig workerConfig, PulsarConsumerFactory pulsarConsumerFactory, PulsarOutputs pulsarOutputs) {
        TaskEngine taskEngine = workerConfig.taskEngine;
        if (taskEngine == null || taskEngine.getModeOrDefault() != Mode.worker) {
            return;
        }
        KeyValueCache keyValueCache = GetKeyValueCacheKt.getKeyValueCache(taskEngine.getStateCacheOrDefault(), workerConfig);
        StateStorage stateStorage = taskEngine.stateStorage;
        Intrinsics.checkNotNull(stateStorage);
        KeyValueStorage cachedLoggedKeyValueStorage = new CachedLoggedKeyValueStorage(keyValueCache, GetKeyValueStorageKt.getKeyValueStorage(stateStorage, workerConfig));
        // Report the number of instances the loop below actually starts, not the raw field.
        int consumerCount = taskEngine.getConsumersOrDefault();
        System.out.print(StringsKt.padEnd$default("Task engine", 25, (char) 0, 2, (Object) null) + ": starting " + consumerCount + " instances... (storage: " + stateStorage + ", cache:" + taskEngine.getStateCacheOrDefault() + ')');
        for (int index = 0; index < consumerCount; index++) {
            this.logger.info("InfiniticWorker - starting task engine {}", Integer.valueOf(index));
            StartPulsarTaskEngineWorkerKt.startPulsarTaskEngineWorker(coroutineScope, index, pulsarConsumerFactory.newTaskEngineConsumer(str, index), cachedLoggedKeyValueStorage, pulsarOutputs.getSendEventsToClient(), pulsarOutputs.getSendEventsToTagEngine(), pulsarOutputs.getSendEventsToTaskEngine(), pulsarOutputs.getSendEventsToWorkflowEngine(), pulsarOutputs.getSendToTaskExecutors(), pulsarOutputs.getSendToMetricsPerName());
        }
        System.out.println(" done");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts the workflow engine consumers described by {@code workerConfig.workflowEngine}.
     *
     * <p>Returns immediately when no workflow engine is configured or when its effective mode
     * is not {@link Mode#worker}. Otherwise one cached key-value storage is built and shared
     * by every consumer, a status line is printed to standard output, and
     * {@code getConsumersOrDefault()} workers are started.
     *
     * @param coroutineScope scope handed to each started worker
     * @param str consumer name handed to the consumer factory
     * @param workerConfig configuration providing the workflow engine settings
     * @param pulsarConsumerFactory factory creating one Pulsar consumer per worker index
     * @param pulsarOutputs provider of the output functions passed to each worker
     * @throws NullPointerException if the workflow engine has no state storage configured
     */
    public final void startWorkflowEngineWorkers(CoroutineScope coroutineScope, String str, WorkerConfig workerConfig, PulsarConsumerFactory pulsarConsumerFactory, PulsarOutputs pulsarOutputs) {
        WorkflowEngine workflowEngine = workerConfig.workflowEngine;
        if (workflowEngine == null || workflowEngine.getModeOrDefault() != Mode.worker) {
            return;
        }
        KeyValueCache keyValueCache = GetKeyValueCacheKt.getKeyValueCache(workflowEngine.getStateCacheOrDefault(), workerConfig);
        StateStorage stateStorage = workflowEngine.stateStorage;
        Intrinsics.checkNotNull(stateStorage);
        KeyValueStorage cachedLoggedKeyValueStorage = new CachedLoggedKeyValueStorage(keyValueCache, GetKeyValueStorageKt.getKeyValueStorage(stateStorage, workerConfig));
        // Report the number of instances the loop below actually starts, not the raw field.
        int consumerCount = workflowEngine.getConsumersOrDefault();
        System.out.print(StringsKt.padEnd$default("Workflow engine", 25, (char) 0, 2, (Object) null) + ": starting " + consumerCount + " instances... (storage: " + stateStorage + ", cache:" + workflowEngine.getStateCacheOrDefault() + ')');
        for (int index = 0; index < consumerCount; index++) {
            this.logger.info("InfiniticWorker - starting workflow engine {}", Integer.valueOf(index));
            StartPulsarWorkflowEngineWorkerKt.startPulsarWorkflowEngineWorker(coroutineScope, index, pulsarConsumerFactory.newWorkflowEngineConsumer(str, index), cachedLoggedKeyValueStorage, pulsarOutputs.getSendEventsToClient(), pulsarOutputs.getSendEventsToTagEngine(), pulsarOutputs.getSendCommandsToTaskEngine(), pulsarOutputs.getSendEventsToWorkflowEngine());
        }
        System.out.println(" done");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts the monitoring workers described by {@code workerConfig.monitoring}.
     *
     * <p>Nothing happens unless monitoring is configured with {@code mode == Mode.worker}.
     * When it is, a single cached key-value storage is created, then
     * {@code getConsumersOrDefault()} per-name monitoring workers are started, followed by
     * exactly one global monitoring worker using the same storage.
     *
     * @param coroutineScope scope handed to each started worker
     * @param str consumer name handed to the consumer factory
     * @param workerConfig configuration providing the monitoring settings
     * @param pulsarConsumerFactory factory creating the Pulsar consumers
     * @param pulsarOutputs provider of the global-metrics output passed to per-name workers
     */
    public final void startMonitoringWorkers(CoroutineScope coroutineScope, String str, WorkerConfig workerConfig, PulsarConsumerFactory pulsarConsumerFactory, PulsarOutputs pulsarOutputs) {
        Monitoring settings = workerConfig.monitoring;
        if (settings == null) {
            return;
        }
        if (settings.mode != Mode.worker) {
            return;
        }

        // State shared by the per-name workers and the global worker.
        KeyValueCache stateCache = GetKeyValueCacheKt.getKeyValueCache(settings.getStateCacheOrDefault(), workerConfig);
        StateStorage configuredStorage = settings.stateStorage;
        Intrinsics.checkNotNull(configuredStorage);
        KeyValueStorage sharedStorage = new CachedLoggedKeyValueStorage(stateCache, GetKeyValueStorageKt.getKeyValueStorage(configuredStorage, workerConfig));

        int perNameCount = settings.getConsumersOrDefault();
        int index = 0;
        while (index < perNameCount) {
            logger.info("InfiniticWorker - starting monitoring per name {}", Integer.valueOf(index));
            StartPulsarMonitoringPerNameWorkerKt.startPulsarMonitoringPerNameWorker(coroutineScope, index, pulsarConsumerFactory.newMonitoringPerNameEngineConsumer(str, index), sharedStorage, pulsarOutputs.getSendToMetricsGlobal());
            index++;
        }

        logger.info("InfiniticWorker - starting monitoring global");
        StartPulsarMonitoringGlobalWorkerKt.startPulsarMonitoringGlobalWorker(coroutineScope, pulsarConsumerFactory.newMonitoringGlobalEngineConsumer(str), sharedStorage);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Registers and starts executors for every workflow and task configured in worker mode.
     *
     * <p>A cached thread pool is created and wrapped as a coroutine dispatcher shared by all
     * executors. For each workflow, then each task, whose effective mode is
     * {@link Mode#worker}, an instance factory is registered under the entry's name and one
     * executor worker is started per configured consumer. A status line is printed to
     * standard output around each start.
     *
     * <p>NOTE(review): the thread pool is only held in a local variable here -- confirm
     * whether it is shut down elsewhere.
     *
     * @param coroutineScope scope handed to each started worker
     * @param str consumer name handed to the consumer factory
     * @param workerConfig configuration providing the {@code workflows} and {@code tasks} lists
     * @param pulsarConsumerFactory factory creating one Pulsar consumer per executor index
     * @param pulsarOutputs provider of the task-engine event output passed to each executor
     */
    public final void startTaskExecutorWorkers(CoroutineScope coroutineScope, String str, WorkerConfig workerConfig, PulsarConsumerFactory pulsarConsumerFactory, PulsarOutputs pulsarOutputs) {
        ExecutorService threadPool = Executors.newCachedThreadPool();
        Intrinsics.checkNotNullExpressionValue(threadPool, "Executors.newCachedThreadPool()");
        CoroutineDispatcher dispatcher = ExecutorsKt.from(threadPool);
        TaskExecutorRegister register = new TaskExecutorRegisterImpl();

        for (final Workflow workflow : workerConfig.workflows) {
            if (workflow.getModeOrDefault() != Mode.worker) {
                continue;
            }
            // A lambda replaces the decompiled anonymous class, whose `super(0)` initializer
            // is not valid Java for an interface implementation.
            Function0<Object> workflowFactory = () -> workflow.getInstance();
            register.register(workflow.name, workflowFactory);
            int workflowConsumers = workflow.consumers;
            for (int index = 0; index < workflowConsumers; index++) {
                System.out.print(StringsKt.padEnd$default("Workflow executor", 25, (char) 0, 2, (Object) null) + ": starting " + workflow.concurrency + " instances for " + workflow.name + "...");
                this.logger.info("InfiniticWorker - starting workflow executor for {}", workflow.name);
                StartPulsarTaskExecutorWorkerKt.startPulsarTaskExecutorWorker(coroutineScope, dispatcher, workflow.name, index, pulsarConsumerFactory.newWorkflowExecutorConsumer(str, index, workflow.name), pulsarOutputs.getSendEventsToTaskEngine(), register, workflow.concurrency);
                System.out.println(" done");
            }
        }

        for (final Task task : workerConfig.tasks) {
            if (task.getModeOrDefault() != Mode.worker) {
                continue;
            }
            Function0<Object> taskFactory = () -> task.getInstance();
            register.register(task.name, taskFactory);
            int taskConsumers = task.consumers;
            for (int index = 0; index < taskConsumers; index++) {
                System.out.print(StringsKt.padEnd$default("Task executor", 25, (char) 0, 2, (Object) null) + ": starting " + task.concurrency + " instances for " + task.name + "...");
                this.logger.info("InfiniticWorker - starting task executor for {}", task.name);
                StartPulsarTaskExecutorWorkerKt.startPulsarTaskExecutorWorker(coroutineScope, dispatcher, task.name, index, pulsarConsumerFactory.newTaskExecutorConsumer(str, index, task.name), pulsarOutputs.getSendEventsToTaskEngine(), register, task.concurrency);
                System.out.println(" done");
            }
        }
    }

    public InfiniticWorker(@NotNull PulsarClient pulsarClient, @NotNull WorkerConfig workerConfig) {
        Intrinsics.checkNotNullParameter(pulsarClient, "pulsarClient");
        Intrinsics.checkNotNullParameter(workerConfig, "config");
        this.pulsarClient = pulsarClient;
        this.config = workerConfig;
        this.logger = LoggerFactory.getLogger(getClass());
    }

    /**
     * Static bridge that forwards to {@code Companion.fromConfig}.
     *
     * @param workerConfig worker configuration, passed through unchanged
     * @return the worker returned by the companion
     */
    @JvmStatic
    @NotNull
    public static final InfiniticWorker fromConfig(@NotNull WorkerConfig workerConfig) {
        return Companion.fromConfig(workerConfig);
    }

    /**
     * Static bridge that forwards to {@code Companion.fromConfigResource}.
     *
     * @param strArr configuration resource names, passed through unchanged
     * @return the worker returned by the companion
     */
    @JvmStatic
    @NotNull
    public static final InfiniticWorker fromConfigResource(@NotNull String... strArr) {
        return Companion.fromConfigResource(strArr);
    }

    /**
     * Static bridge that forwards to {@code Companion.fromConfigFile}.
     *
     * @param strArr configuration file paths, passed through unchanged
     * @return the worker returned by the companion
     */
    @JvmStatic
    @NotNull
    public static final InfiniticWorker fromConfigFile(@NotNull String... strArr) {
        return Companion.fromConfigFile(strArr);
    }
}
