package cn.ponfee.disjob.dispatch;

import cn.ponfee.disjob.common.base.Startable;
import cn.ponfee.disjob.common.collect.Collects;
import cn.ponfee.disjob.common.concurrent.AsyncDelayedExecutor;
import cn.ponfee.disjob.common.concurrent.DelayedData;
import cn.ponfee.disjob.core.base.RetryProperties;
import cn.ponfee.disjob.core.base.Worker;
import cn.ponfee.disjob.dispatch.event.TaskDispatchFailedEvent;
import cn.ponfee.disjob.dispatch.route.ExecutionRouterRegistrar;
import cn.ponfee.disjob.registry.Discovery;
import com.google.common.math.IntMath;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.util.Assert;

/* loaded from: input_file:cn/ponfee/disjob/dispatch/TaskDispatcher.class */
public abstract class TaskDispatcher implements Startable {
    protected final Logger log = LoggerFactory.getLogger(getClass());
    private final ApplicationEventPublisher eventPublisher;
    private final Discovery<Worker> discoverWorker;
    private final TaskReceiver taskReceiver;
    private final int retryMaxCount;
    private final long retryBackoffPeriod;
    private final AsyncDelayedExecutor<DispatchTaskParam> asyncDelayedExecutor;

    protected TaskDispatcher(ApplicationEventPublisher applicationEventPublisher, Discovery<Worker> discovery, RetryProperties retryProperties, @Nullable TaskReceiver taskReceiver) {
        ((RetryProperties) Objects.requireNonNull(retryProperties, "Retry properties cannot be null.")).check();
        this.eventPublisher = (ApplicationEventPublisher) Objects.requireNonNull(applicationEventPublisher);
        this.discoverWorker = (Discovery) Objects.requireNonNull(discovery);
        this.taskReceiver = taskReceiver;
        this.retryMaxCount = retryProperties.getMaxCount();
        this.retryBackoffPeriod = retryProperties.getBackoffPeriod();
        this.asyncDelayedExecutor = new AsyncDelayedExecutor<>(5, dispatchTaskParam -> {
            dispatch0(Collections.singletonList(dispatchTaskParam));
        });
    }

    public final boolean dispatch(List<ExecuteTaskParam> list) {
        if (CollectionUtils.isEmpty(list)) {
            return false;
        }
        for (ExecuteTaskParam executeTaskParam : list) {
            Assert.notNull(executeTaskParam.getOperation(), () -> {
                return "Dispatch task operation cannot be null: " + executeTaskParam;
            });
            Assert.isTrue(executeTaskParam.getOperation().isNotTrigger(), () -> {
                return "Specific dispatch task operation cannot be trigger: " + executeTaskParam;
            });
            Assert.notNull(executeTaskParam.getWorker(), () -> {
                return "Specific dispatch task worker cannot be null: " + executeTaskParam;
            });
        }
        return dispatch0(Collects.convert(list, executeTaskParam2 -> {
            return new DispatchTaskParam(executeTaskParam2, null);
        }));
    }

    public final boolean dispatch(String str, List<ExecuteTaskParam> list) {
        if (CollectionUtils.isEmpty(list)) {
            return false;
        }
        for (ExecuteTaskParam executeTaskParam : list) {
            Assert.notNull(executeTaskParam.getOperation(), () -> {
                return "Dispatch task operation cannot be null: " + executeTaskParam;
            });
            Assert.isTrue(executeTaskParam.getOperation().isTrigger(), () -> {
                return "Assign dispatch task operation must be trigger: " + executeTaskParam;
            });
            if (executeTaskParam.getRouteStrategy().isBroadcast()) {
                Assert.notNull(executeTaskParam.getWorker(), () -> {
                    return "Broadcast dispatch task worker cannot be null: " + executeTaskParam;
                });
            }
        }
        return dispatch0(Collects.convert(list, executeTaskParam2 -> {
            return new DispatchTaskParam(executeTaskParam2, str);
        }));
    }

    protected abstract boolean doDispatch(ExecuteTaskParam executeTaskParam) throws Exception;

    public void start() {
    }

    public void stop() {
        this.asyncDelayedExecutor.doStop();
    }

    private boolean dispatch0(List<DispatchTaskParam> list) {
        ((Map) list.stream().filter(dispatchTaskParam -> {
            return dispatchTaskParam.task().getWorker() == null;
        }).collect(Collectors.groupingBy(dispatchTaskParam2 -> {
            return Long.valueOf(dispatchTaskParam2.task().getInstanceId());
        }))).forEach((l, list2) -> {
            assignWorker(list2);
        });
        boolean z = true;
        for (DispatchTaskParam dispatchTaskParam3 : list) {
            ExecuteTaskParam task = dispatchTaskParam3.task();
            this.log.info("Task trace [{}] dispatching: {}, {}, {}", new Object[]{Long.valueOf(task.getTaskId()), task.getOperation(), task.getWorker(), Integer.valueOf(dispatchTaskParam3.retried())});
            try {
                doDispatch0(task);
                this.log.info("Task trace [{}] dispatched: {}, {}", new Object[]{Long.valueOf(task.getTaskId()), task.getOperation(), task.getWorker()});
            } catch (Throwable th) {
                if (th instanceof TaskDispatchException) {
                    this.log.error("Dispatch task failed: {}, {}", th.getMessage(), dispatchTaskParam3);
                } else {
                    this.log.error("Dispatch task error: " + dispatchTaskParam3, th);
                }
                retry(dispatchTaskParam3);
                z = false;
            }
        }
        return z;
    }

    private void assignWorker(List<DispatchTaskParam> list) {
        DispatchTaskParam dispatchTaskParam = list.get(0);
        String group = dispatchTaskParam.group();
        List discoveredServers = this.discoverWorker.getDiscoveredServers(group);
        if (CollectionUtils.isEmpty(discoveredServers)) {
            this.log.error("Not found available [{}] worker for assign task.", group);
        } else {
            ExecutionRouterRegistrar.route(dispatchTaskParam.task().getRouteStrategy(), Collects.convert(list, (v0) -> {
                return v0.task();
            }), discoveredServers);
        }
    }

    private void doDispatch0(ExecuteTaskParam executeTaskParam) throws Exception {
        boolean doDispatch;
        if (executeTaskParam.getWorker() == null) {
            throw new TaskDispatchException("unassigned");
        }
        if (this.taskReceiver == null || !executeTaskParam.getWorker().matches(Worker.local())) {
            doDispatch = doDispatch(executeTaskParam);
        } else {
            this.log.info("Dispatching task to local worker {}, {}, {}", new Object[]{Long.valueOf(executeTaskParam.getTaskId()), executeTaskParam.getOperation(), executeTaskParam.getWorker()});
            doDispatch = this.taskReceiver.receive(executeTaskParam);
        }
        if (!doDispatch) {
            throw new TaskDispatchException("false");
        }
    }

    private void retry(DispatchTaskParam dispatchTaskParam) {
        ExecuteTaskParam task = dispatchTaskParam.task();
        if (dispatchTaskParam.retried() >= this.retryMaxCount) {
            this.log.error("Dispatched task retried max count still failed: {}", task);
            this.eventPublisher.publishEvent(TaskDispatchFailedEvent.of(task));
            return;
        }
        this.log.info("Delay retrying dispatch task [{}]: {}", Integer.valueOf(dispatchTaskParam.retried()), task);
        int retrying = dispatchTaskParam.retrying();
        if (task.getRouteStrategy().isNotBroadcast() && task.getOperation().isTrigger()) {
            task.setWorker(null);
        }
        this.asyncDelayedExecutor.put(DelayedData.of(dispatchTaskParam, this.retryBackoffPeriod * IntMath.pow(retrying, 2)));
    }
}
