package cn.sliew.carp.module.http.sync.framework.model;

import cn.sliew.carp.module.http.sync.framework.model.internal.SimpleJobContext;
import cn.sliew.milky.common.exception.Rethrower;
import cn.sliew.milky.common.util.JacksonUtil;
import java.lang.invoke.SerializedLambda;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.SpawnProtocol;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.FlowShape;
import org.apache.pekko.stream.KillSwitches;
import org.apache.pekko.stream.UniformFanInShape;
import org.apache.pekko.stream.UniformFanOutShape;
import org.apache.pekko.stream.UniqueKillSwitch;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.GraphDSL;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.MergeSequence;
import org.apache.pekko.stream.javadsl.Partition;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/sliew/carp/module/http/sync/framework/model/AbstractJob.class */
public abstract class AbstractJob implements Job {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractJob.class);
    protected ActorSystem<SpawnProtocol.Command> actorSystem;
    protected SyncOffsetManager syncOffsetManager;
    protected SplitManager splitManager;

    public AbstractJob(ActorSystem<SpawnProtocol.Command> actorSystem, SyncOffsetManager syncOffsetManager, SplitManager splitManager) {
        this.actorSystem = actorSystem;
        this.syncOffsetManager = syncOffsetManager;
        this.splitManager = splitManager;
    }

    @Override // cn.sliew.carp.module.http.sync.framework.model.Job
    public void process(String str) {
        doExecute(str);
    }

    protected void doExecute(Object obj) {
        SimpleJobContext buildJobContext = buildJobContext();
        JobProcessor buildJobProcessor = buildJobProcessor(buildJobContext);
        Source viaMat = Source.single(buildRootTask(obj)).mapConcat(rootTask -> {
            return buildJobProcessor.map(rootTask);
        }).viaMat(KillSwitches.single(), Keep.right());
        Flow mapAsync = Flow.create().map(subTask -> {
            return buildJobProcessor.process(subTask);
        }).mapAsync(1, completableFuture -> {
            return completableFuture;
        });
        Pair pair = (Pair) viaMat.via(Flow.fromGraph(GraphDSL.create(builder -> {
            int subTaskParallelism = buildJobContext.getSubTaskParallelism();
            UniformFanOutShape add = builder.add(Partition.create(subTaskParallelism, subTask2 -> {
                return Integer.valueOf(Math.toIntExact(subTask2.getIdentifier().longValue()) % subTaskParallelism);
            }));
            UniformFanInShape add2 = builder.add(MergeSequence.create(subTaskParallelism, processResult -> {
                return processResult.getSubTask().getIdentifier();
            }));
            for (int i = 0; i < subTaskParallelism; i++) {
                builder.from(add.out(i)).via(builder.add(mapAsync.async())).viaFanIn(add2);
            }
            return FlowShape.of(add.in(), add2.out());
        }))).log(getJobName()).toMat(Sink.foreach(processResult -> {
            buildJobProcessor.reduce(processResult);
        }), Keep.both()).run(this.actorSystem);
        UniqueKillSwitch uniqueKillSwitch = (UniqueKillSwitch) pair.first();
        try {
            ((CompletionStage) pair.second()).toCompletableFuture().get(1L, TimeUnit.HOURS);
        } catch (Exception e) {
            log.error("job 执行异常, job: {}, param: {}", getJobName(), JacksonUtil.toJsonString(obj));
            uniqueKillSwitch.abort(e);
            Rethrower.throwAs(e);
        }
    }

    public abstract String getJobName();

    protected SimpleJobContext buildJobContext() {
        SimpleJobContext simpleJobContext = new SimpleJobContext();
        simpleJobContext.setActorSystem(this.actorSystem);
        simpleJobContext.setSyncOffsetManager(this.syncOffsetManager);
        simpleJobContext.setSplitManager(this.splitManager);
        return simpleJobContext;
    }

    protected JobProcessor buildJobProcessor(SimpleJobContext simpleJobContext) {
        return new DefaultJobProcessor(simpleJobContext);
    }

    protected abstract RootTask buildRootTask(Object obj);

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2140411857:
                if (implMethodName.equals("lambda$doExecute$bac1d32a$1")) {
                    z = 3;
                    break;
                }
                break;
            case -2084291332:
                if (implMethodName.equals("lambda$doExecute$df151efd$1")) {
                    z = 5;
                    break;
                }
                break;
            case -2068794430:
                if (implMethodName.equals("lambda$doExecute$8f1068a0$1")) {
                    z = 2;
                    break;
                }
                break;
            case -1402138890:
                if (implMethodName.equals("lambda$doExecute$e7463012$1")) {
                    z = 6;
                    break;
                }
                break;
            case -1177773460:
                if (implMethodName.equals("lambda$doExecute$f32fa3a6$1")) {
                    z = false;
                    break;
                }
                break;
            case -622940119:
                if (implMethodName.equals("lambda$doExecute$13d9674a$1")) {
                    z = 4;
                    break;
                }
                break;
            case -351111021:
                if (implMethodName.equals("lambda$doExecute$22e4da14$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cn/sliew/carp/module/http/sync/framework/model/AbstractJob") && serializedLambda.getImplMethodSignature().equals("(Lcn/sliew/carp/module/http/sync/framework/model/internal/SimpleJobContext;Lorg/apache/pekko/stream/javadsl/Flow;Lorg/apache/pekko/stream/javadsl/GraphDSL$Builder;)Lorg/apache/pekko/stream/FlowShape;")) {
                    SimpleJobContext simpleJobContext = (SimpleJobContext) serializedLambda.getCapturedArg(0);
                    Flow flow = (Flow) serializedLambda.getCapturedArg(1);
                    return builder -> {
                        int subTaskParallelism = simpleJobContext.getSubTaskParallelism();
                        UniformFanOutShape add = builder.add(Partition.create(subTaskParallelism, subTask2 -> {
                            return Integer.valueOf(Math.toIntExact(subTask2.getIdentifier().longValue()) % subTaskParallelism);
                        }));
                        UniformFanInShape add2 = builder.add(MergeSequence.create(subTaskParallelism, processResult -> {
                            return processResult.getSubTask().getIdentifier();
                        }));
                        for (int i = 0; i < subTaskParallelism; i++) {
                            builder.from(add.out(i)).via(builder.add(flow.async())).viaFanIn(add2);
                        }
                        return FlowShape.of(add.in(), add2.out());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cn/sliew/carp/module/http/sync/framework/model/AbstractJob") && serializedLambda.getImplMethodSignature().equals("(Lcn/sliew/carp/module/http/sync/framework/model/internal/ProcessResult;)Ljava/lang/Object;")) {
                    return processResult -> {
                        return processResult.getSubTask().getIdentifier();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cn/sliew/carp/module/http/sync/framework/model/AbstractJob") && serializedLambda.getImplMethodSignature().equals("(Lcn/sliew/carp/module/http/sync/framework/model/JobProcessor;Lcn/sliew/carp/module/http/sync/framework/model/RootTask;)Ljava/lang/Iterable;")) {
                    JobProcessor jobProcessor = (JobProcessor) serializedLambda.getCapturedArg(0);
                    return rootTask -> {
                        return jobProcessor.map(rootTask);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Procedure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cn/sliew/carp/module/http/sync/framework/model/AbstractJob") && serializedLambda.getImplMethodSignature().equals("(Lcn/sliew/carp/module/http/sync/framework/model/JobProcessor;Lcn/sliew/carp/module/http/sync/framework/model/internal/ProcessResult;)V")) {
                    JobProcessor jobProcessor2 = (JobProcessor) serializedLambda.getCapturedArg(0);
                    return processResult2 -> {
                        jobProcessor2.reduce(processResult2);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cn/sliew/carp/module/http/sync/framework/model/AbstractJob") && serializedLambda.getImplMethodSignature().equals("(Lcn/sliew/carp/module/http/sync/framework/model/JobProcessor;Lcn/sliew/carp/module/http/sync/framework/model/SubTask;)Ljava/util/concurrent/CompletableFuture;")) {
                    JobProcessor jobProcessor3 = (JobProcessor) serializedLambda.getCapturedArg(0);
                    return subTask -> {
                        return jobProcessor3.process(subTask);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cn/sliew/carp/module/http/sync/framework/model/AbstractJob") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CompletableFuture;)Ljava/util/concurrent/CompletionStage;")) {
                    return completableFuture -> {
                        return completableFuture;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cn/sliew/carp/module/http/sync/framework/model/AbstractJob") && serializedLambda.getImplMethodSignature().equals("(ILcn/sliew/carp/module/http/sync/framework/model/SubTask;)Ljava/lang/Integer;")) {
                    int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return subTask2 -> {
                        return Integer.valueOf(Math.toIntExact(subTask2.getIdentifier().longValue()) % intValue);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
