package risesoft.data.transfer.core.job;

import risesoft.data.transfer.core.Engine;
import risesoft.data.transfer.core.context.JobContext;
import risesoft.data.transfer.core.context.StreamContext;
import risesoft.data.transfer.core.executor.ExecutorListenerAdapter;
import risesoft.data.transfer.core.executor.in.JobInputExecutorFactory;
import risesoft.data.transfer.core.executor.out.JobOutputExecutorFactory;
import risesoft.data.transfer.core.statistics.Communication;
import risesoft.data.transfer.core.statistics.CommunicationTool;
import risesoft.data.transfer.core.statistics.State;

/* loaded from: input_file:risesoft/data/transfer/core/job/JobEngine.class */
public class JobEngine {
    public static JobContext start(JobContext jobContext) {
        jobContext.getCommunication().reset();
        startJob(jobContext);
        return jobContext;
    }

    public static void startJob(final JobContext jobContext) {
        try {
            try {
                final Communication communication = jobContext.getCommunication();
                if (jobContext.hasJob()) {
                    jobContext.doHandle(JobStartHandle.class, jobStartHandle -> {
                        jobStartHandle.onJobStart(jobContext);
                    });
                    Job nextJob = jobContext.nextJob();
                    final JobRunningController jobRunningController = new JobRunningController(jobContext);
                    StreamContext streamContext = nextJob.getStreamContext();
                    streamContext.getDataInputStreamFactory().init();
                    streamContext.getDataOutputStreamFactory().init();
                    jobContext.getInExecutorTaskQueue().setExecutorFacoty(new JobInputExecutorFactory(jobContext, streamContext.getDataInputStreamFactory()));
                    jobContext.getOutExecutorTaskQueue().setExecutorFacoty(new JobOutputExecutorFactory(streamContext.getDataOutputStreamFactory(), jobContext));
                    jobContext.getInExecutorTaskQueue().setExecutorListener(new ExecutorListenerAdapter() { // from class: risesoft.data.transfer.core.job.JobEngine.1
                        @Override // risesoft.data.transfer.core.executor.ExecutorListenerAdapter, risesoft.data.transfer.core.executor.ExecutorListener
                        public void onError(Throwable th) {
                            JobRunningController.this.onError(th);
                        }

                        @Override // risesoft.data.transfer.core.executor.ExecutorListenerAdapter, risesoft.data.transfer.core.executor.ExecutorListener
                        public void taskEnd(Object obj) {
                            jobContext.getCoreExchange().flush();
                            JobRunningController.this.inEnd();
                            communication.setLongCounter(CommunicationTool.READ_JOB_END, System.currentTimeMillis());
                        }

                        @Override // risesoft.data.transfer.core.executor.ExecutorListenerAdapter, risesoft.data.transfer.core.executor.ExecutorListener
                        public void taskStart(Object obj) {
                            if (communication.getLongCounter(CommunicationTool.READ_JOB_START).longValue() == 0) {
                                communication.setLongCounter(CommunicationTool.READ_JOB_START, System.currentTimeMillis());
                            }
                        }
                    });
                    jobContext.getOutExecutorTaskQueue().setExecutorListener(new ExecutorListenerAdapter() { // from class: risesoft.data.transfer.core.job.JobEngine.2
                        @Override // risesoft.data.transfer.core.executor.ExecutorListenerAdapter, risesoft.data.transfer.core.executor.ExecutorListener
                        public void onError(Throwable th) {
                            JobRunningController.this.onError(th);
                        }

                        @Override // risesoft.data.transfer.core.executor.ExecutorListenerAdapter, risesoft.data.transfer.core.executor.ExecutorListener
                        public void taskEnd(Object obj) {
                            if (JobRunningController.this.isEnd()) {
                                communication.setLongCounter(CommunicationTool.WRITER_JOB_END, System.currentTimeMillis());
                                JobRunningController.this.outEnd();
                            }
                        }

                        @Override // risesoft.data.transfer.core.executor.ExecutorListenerAdapter, risesoft.data.transfer.core.executor.ExecutorListener
                        public void taskStart(Object obj) {
                            if (communication.getLongCounter(CommunicationTool.WRITER_JOB_START).longValue() == 0) {
                                communication.setLongCounter(CommunicationTool.WRITER_JOB_START, System.currentTimeMillis());
                            }
                        }
                    });
                    jobContext.getInExecutorTaskQueue().addBatch(streamContext.getDataInputStreamFactory().splitToData(jobContext.getInExecutorTaskQueue().getExecutorSize()));
                    jobContext.getOutExecutorTaskQueue().start();
                    jobContext.getInExecutorTaskQueue().start();
                } else {
                    jobContext.getCommunication().setState(State.SUCCEEDED);
                }
                Engine.onJobFlush(jobContext);
            } catch (Throwable th) {
                jobContext.getCommunication().setThrowable(th, true);
                jobContext.getCommunication().setState(State.FAILED);
                Engine.onJobFlush(jobContext);
            }
        } catch (Throwable th2) {
            Engine.onJobFlush(jobContext);
            throw th2;
        }
    }
}
