package risesoft.data.transfer.core;

import java.util.ArrayList;
import java.util.List;
import risesoft.data.transfer.core.channel.JoinOutExecutorChannel;
import risesoft.data.transfer.core.channel.OutChannel;
import risesoft.data.transfer.core.context.JobContext;
import risesoft.data.transfer.core.context.StreamContext;
import risesoft.data.transfer.core.exchange.CoreExchange;
import risesoft.data.transfer.core.exchange.Exchange;
import risesoft.data.transfer.core.executor.ExecutorTaskQueue;
import risesoft.data.transfer.core.factory.FactoryManager;
import risesoft.data.transfer.core.handle.HandleManager;
import risesoft.data.transfer.core.handle.InitApplicationConfigHandle;
import risesoft.data.transfer.core.job.Job;
import risesoft.data.transfer.core.job.JobEndHandle;
import risesoft.data.transfer.core.job.JobEngine;
import risesoft.data.transfer.core.listener.JobListener;
import risesoft.data.transfer.core.listener.impl.ResultJobListener;
import risesoft.data.transfer.core.log.HandledLoggerFactory;
import risesoft.data.transfer.core.log.LoggerFactory;
import risesoft.data.transfer.core.plug.PlugManager;
import risesoft.data.transfer.core.statistics.Communication;
import risesoft.data.transfer.core.statistics.State;
import risesoft.data.transfer.core.stream.in.DataInputStreamFactory;
import risesoft.data.transfer.core.stream.out.DataOutputStreamFactory;
import risesoft.data.transfer.core.util.CloseUtils;
import risesoft.data.transfer.core.util.Configuration;
import risesoft.data.transfer.core.util.ConfigurationConst;
import risesoft.data.transfer.core.util.ValueUtils;

/* loaded from: input_file:risesoft/data/transfer/core/Engine.class */
public class Engine {
    private static final String JOB_NAME_KEY = "core.name";

    public static ResultJobListener start(String str, Configuration configuration) {
        ResultJobListener resultJobListener = new ResultJobListener();
        start(str, configuration, resultJobListener, null);
        return resultJobListener;
    }

    public static JobContext start(String str, Configuration configuration, JobListener jobListener) {
        return start(str, configuration, jobListener, null);
    }

    public static JobContext start(String str, Configuration configuration, JobListener jobListener, LoggerFactory loggerFactory) {
        JobContext jobContext = new JobContext(new Communication(), str, new HandleManager(), configuration, jobListener);
        if (loggerFactory == null) {
            try {
                loggerFactory = new HandledLoggerFactory(jobContext.getHandles());
                jobContext.getHandles().add((HandledLoggerFactory) loggerFactory);
            } catch (Throwable th) {
                jobContext.getCommunication().setThrowable(th, true);
                jobContext.getCommunication().setState(State.FAILED);
                onJobFlush(jobContext);
                jobContext.getLogger().error(Engine.class, "初始化任务失败" + th.getMessage());
                return jobContext;
            }
        }
        jobContext.setName(configuration.getString(JOB_NAME_KEY, Thread.currentThread().getName()));
        jobContext.setLoggerFactory(loggerFactory);
        PlugManager.loadPlug(configuration, jobContext);
        jobContext.doHandle(InitApplicationConfigHandle.class, initApplicationConfigHandle -> {
            initApplicationConfigHandle.initApplicationConfig(configuration);
        });
        jobContext.getLogger().info(Engine.class, "正在装配核心组件");
        createJobs(configuration, jobContext);
        jobContext.setInExecutorTaskQueue((ExecutorTaskQueue) FactoryManager.getInstanceOfConfiguration((Configuration) ValueUtils.getRequired(configuration.getConfiguration(ConfigurationConst.EXECUTOR_INPUT), "缺少输入队列执行器"), ExecutorTaskQueue.class, jobContext.getInstanceMap())).setInChannelConfiguration((Configuration) ValueUtils.getRequired(configuration.getConfiguration(ConfigurationConst.IN_CHANNEL), "缺失输入通道")).setOutExecutorTaskQueue((ExecutorTaskQueue) FactoryManager.getInstanceOfConfiguration((Configuration) ValueUtils.getRequired(configuration.getConfiguration(ConfigurationConst.EXECUTOR_OUTPUT), "缺少输出队列执行器"), ExecutorTaskQueue.class, jobContext.getInstanceMap())).setCoreExchange(new CoreExchange((Exchange) FactoryManager.getInstanceOfConfiguration((Configuration) ValueUtils.getRequired(configuration.getConfiguration(ConfigurationConst.CORE_EXCHANGE), "缺少核心交换机"), Exchange.class, jobContext.getInstanceMap()), jobContext.getCommunication())).getCoreExchange().setOutChannel((OutChannel) FactoryManager.getInstanceOfConfiguration((Configuration) ValueUtils.getRequired(configuration.getConfiguration(ConfigurationConst.OUT_CHANNEL), "缺失输出通道"), OutChannel.class, jobContext.getInstanceMap()));
        jobContext.getCoreExchange().getOutChannel().setOutPutStream(new JoinOutExecutorChannel(jobContext.getOutExecutorTaskQueue()));
        jobContext.getLogger().info(Engine.class, "组件装配完成任务开始");
        return JobEngine.start(jobContext);
    }

    private static void createJobs(Configuration configuration, JobContext jobContext) {
        List<Configuration> list = (List) ValueUtils.getRequired(configuration.getListConfiguration(ConfigurationConst.JOBS), "没有任务");
        ArrayList arrayList = new ArrayList();
        for (Configuration configuration2 : list) {
            Job job = new Job();
            job.setStreamContext(new StreamContext());
            job.getStreamContext().setDataInputStreamFactory((DataInputStreamFactory) FactoryManager.getInstanceOfConfiguration((Configuration) ValueUtils.getRequired(configuration2.getConfiguration(ConfigurationConst.INPUT), "未找到输入流"), DataInputStreamFactory.class, jobContext.getInstanceMap()));
            job.getStreamContext().setDataOutputStreamFactory((DataOutputStreamFactory) FactoryManager.getInstanceOfConfiguration((Configuration) ValueUtils.getRequired(configuration2.getConfiguration(ConfigurationConst.OUTPUT), "未找到输出流"), DataOutputStreamFactory.class, jobContext.getInstanceMap()));
            arrayList.add(job);
        }
        jobContext.setJobs(arrayList);
    }

    public static void onJobFlush(JobContext jobContext) {
        if (jobContext.isEnd()) {
            return;
        }
        Communication communication = jobContext.getCommunication();
        if (communication.getState() == State.FAILED || communication.getState() == State.SUCCEEDED) {
            shutdown(jobContext);
            jobContext.setEnd(true);
            try {
                jobContext.doHandle(JobEndHandle.class, jobEndHandle -> {
                    jobEndHandle.onJobEnd(jobContext);
                });
            } catch (Exception e) {
                communication.setState(State.FAILED, true);
                communication.setThrowable(e);
            }
            jobContext.getJobListener().end(communication);
        }
    }

    private static void shutdown(JobContext jobContext) {
        CloseUtils.close(jobContext.getOutExecutorTaskQueue());
        try {
            jobContext.getOutExecutorTaskQueue().shutdown();
        } catch (Exception e) {
        }
        try {
            jobContext.getInExecutorTaskQueue().shutdown();
        } catch (Exception e2) {
        }
        try {
            jobContext.getCoreExchange().shutdown();
        } catch (Exception e3) {
        }
    }
}
