package io.pravega.controller.task;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.fault.FailoverSweeper;
import io.pravega.controller.store.task.LockFailedException;
import io.pravega.controller.store.task.TaggedResource;
import io.pravega.controller.store.task.TaskMetadataStore;
import io.pravega.controller.task.TaskBase;
import io.pravega.controller.util.RetryHelper;
import java.beans.ConstructorProperties;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FailoverSweeper} that looks for tasks still registered under other hosts in the
 * {@link TaskMetadataStore} and re-executes them on this host.
 *
 * <p>At construction time every supplied {@link TaskBase} is scanned for methods annotated with
 * {@link Task}; each such method is indexed under a {@code name--version} key so that a stored
 * {@link TaskData} record can later be dispatched to it reflectively.
 */
public class TaskSweeper implements FailoverSweeper {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TaskSweeper.class);

    /** Store queried for hosts, their child resources and the task data attached to them. */
    private final TaskMetadataStore taskMetadataStore;

    /** Task modules whose {@link Task}-annotated methods this sweeper can invoke. */
    private final TaskBase[] taskClassObjects;

    /** {@code name--version} key to the annotated method implementing that task. */
    private final Map<String, Method> methodMap = new HashMap<>();

    /** {@code name--version} key to the task module instance that declares the method. */
    private final Map<String, TaskBase> objectMap = new HashMap<>();

    /** Identifier of the host this sweeper runs on. */
    private final String hostId;

    /** Executor used for every asynchronous continuation and for retry scheduling. */
    private final ScheduledExecutorService executor;

    /**
     * Outcome of one sweep step on a single tagged resource: the resource that was processed,
     * the value produced by the task (if any) and the error encountered (if any).
     */
    public static class Result {
        private final TaggedResource taggedResource;
        private final Object value;
        private final Throwable error;

        /**
         * Creates a result.
         *
         * @param taggedResource resource the step operated on
         * @param obj            value produced by the task; may be {@code null}
         * @param th             error raised while processing; may be {@code null}
         */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @ConstructorProperties({"taggedResource", "value", "error"})
        public Result(TaggedResource taggedResource, Object obj, Throwable th) {
            this.taggedResource = taggedResource;
            this.value = obj;
            this.error = th;
        }

        /** @return the resource this result refers to */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public TaggedResource getTaggedResource() {
            return this.taggedResource;
        }

        /** @return the value produced by the task, or {@code null} */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public Object getValue() {
            return this.value;
        }

        /** @return the error recorded for this step, or {@code null} */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public Throwable getError() {
            return this.error;
        }

        /**
         * Two results are equal when both sides accept each other via {@link #canEqual(Object)}
         * and their tagged resource, value and error are pairwise equal (null-safe).
         */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof Result)) {
                return false;
            }
            Result other = (Result) obj;
            if (!other.canEqual(this)) {
                return false;
            }
            TaggedResource thisResource = getTaggedResource();
            TaggedResource otherResource = other.getTaggedResource();
            boolean sameResource = thisResource == null ? otherResource == null : thisResource.equals(otherResource);
            if (!sameResource) {
                return false;
            }
            Object thisValue = getValue();
            Object otherValue = other.getValue();
            boolean sameValue = thisValue == null ? otherValue == null : thisValue.equals(otherValue);
            if (!sameValue) {
                return false;
            }
            Throwable thisError = getError();
            Throwable otherError = other.getError();
            return thisError == null ? otherError == null : thisError.equals(otherError);
        }

        /** Hook that lets subclasses opt out of equality with plain {@code Result} instances. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        protected boolean canEqual(Object obj) {
            return obj instanceof Result;
        }

        /** Hash over the same three components used by {@link #equals(Object)}; {@code null} hashes as 43. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public int hashCode() {
            TaggedResource resource = getTaggedResource();
            Object producedValue = getValue();
            Throwable failure = getError();
            int hash = 1;
            hash = hash * 59 + (resource == null ? 43 : resource.hashCode());
            hash = hash * 59 + (producedValue == null ? 43 : producedValue.hashCode());
            hash = hash * 59 + (failure == null ? 43 : failure.hashCode());
            return hash;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String toString() {
            return "TaskSweeper.Result(taggedResource=" + getTaggedResource() + ", value=" + getValue() + ", error=" + getError() + ")";
        }
    }

    /**
     * Creates a sweeper and indexes the {@link Task}-annotated methods of the given task modules.
     *
     * @param taskMetadataStore        store holding host, resource and task metadata
     * @param str                      identifier of the host this sweeper runs on
     * @param scheduledExecutorService executor for asynchronous work and retries
     * @param taskBaseArr              task modules; each one's context host id must equal {@code str}
     * @throws IllegalArgumentException         if a task module's context reports a different host id
     * @throws DuplicateTaskAnnotationException if two annotated methods share the same name and version
     */
    public TaskSweeper(TaskMetadataStore taskMetadataStore, String str, ScheduledExecutorService scheduledExecutorService, TaskBase... taskBaseArr) {
        this.taskMetadataStore = taskMetadataStore;
        this.hostId = str;
        this.executor = scheduledExecutorService;
        for (TaskBase taskBase : taskBaseArr) {
            Preconditions.checkArgument(taskBase.getContext().getHostId().equals(str));
        }
        this.taskClassObjects = taskBaseArr;
        initializeMappingTable();
    }

    /** This sweeper has no warm-up phase and always reports itself as ready. */
    @Override // io.pravega.controller.fault.FailoverSweeper
    public boolean isReady() {
        return true;
    }

    /**
     * Sweeps every host that is registered in the task metadata store but is not among the
     * processes returned by {@code supplier}.
     *
     * @param supplier supplies the identifiers of the processes to exclude from the sweep
     * @return future that completes once {@link #handleFailedProcess(String)} has completed for
     *         every remaining host
     */
    @Override // io.pravega.controller.fault.FailoverSweeper
    public CompletableFuture<Void> sweepFailedProcesses(Supplier<Set<String>> supplier) {
        return RetryHelper.withRetriesAsync(this.taskMetadataStore::getHosts, RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor)
                .thenComposeAsync(registeredHosts -> {
                    log.info("Hosts {} have ongoing tasks", registeredHosts);
                    // Whatever is registered but not reported by the supplier is treated as failed.
                    registeredHosts.removeAll(RetryHelper.withRetries(supplier, RetryHelper.UNCONDITIONAL_PREDICATE, Integer.MAX_VALUE));
                    log.info("Failed hosts {} have orphaned tasks", registeredHosts);
                    return Futures.allOf(registeredHosts.stream()
                            .map(this::handleFailedProcess)
                            .collect(Collectors.toList()));
                }, this.executor);
    }

    /**
     * Repeatedly picks a child resource of the given host and processes it until the host has no
     * children left. The whole loop is retried on failures accepted by
     * {@code RetryHelper.RETRYABLE_PREDICATE}, except when the unwrapped cause is a
     * {@link LockFailedException}.
     *
     * @param str identifier of the host whose tasks should be swept
     * @return future that completes when the sweep loop (including retries) has finished
     */
    @Override // io.pravega.controller.fault.FailoverSweeper
    public CompletableFuture<Void> handleFailedProcess(String str) {
        log.info("Sweeping orphaned tasks for host {}", str);
        return RetryHelper.withRetriesAsync(
                () -> Futures.doWhileLoop(() -> executeHostTask(str), stepResult -> stepResult != null, this.executor)
                        .whenCompleteAsync((ignored, failure) ->
                                log.info("Sweeping orphaned tasks for host {} complete", str), this.executor),
                RetryHelper.RETRYABLE_PREDICATE.and(failure -> !(Exceptions.unwrap(failure) instanceof LockFailedException)),
                Integer.MAX_VALUE,
                this.executor);
    }

    /**
     * Runs one sweep step for {@code oldHostId}: fetches a random child resource and processes it.
     *
     * @param oldHostId host whose children are being swept
     * @return future holding the step's {@link Result}, or {@code null} when the host had no child
     *         left (in which case the host node is removed from the store first)
     */
    private CompletableFuture<Result> executeHostTask(String oldHostId) {
        return this.taskMetadataStore.getRandomChild(oldHostId).thenComposeAsync(child -> {
            if (child.isPresent()) {
                TaggedResource taggedResource = (TaggedResource) child.get();
                log.debug("Host={} processing child <{}, {}> of {}",
                        this.hostId, taggedResource.getResource(), taggedResource.getTag(), oldHostId);
                return executeResourceTask(oldHostId, taggedResource);
            }
            log.debug("Host={} fetched no child of {}", this.hostId, oldHostId);
            // A null result is what stops the caller's loop.
            return this.taskMetadataStore.removeNode(oldHostId).thenApplyAsync(ignored -> null, this.executor);
        }, this.executor);
    }

    /**
     * Looks up the task recorded for {@code taggedResource} under {@code oldHostId} and either
     * executes it or, when no task is recorded, removes the child entry.
     *
     * <p>The returned future always completes normally; failures are carried inside the
     * {@link Result}.
     *
     * @param oldHostId      host that the resource is registered under
     * @param taggedResource resource/tag pair to process
     * @return future completed with the outcome of this step
     */
    private CompletableFuture<Result> executeResourceTask(String oldHostId, TaggedResource taggedResource) {
        CompletableFuture<Result> outcome = new CompletableFuture<>();
        this.taskMetadataStore.getTask(taggedResource.getResource(), oldHostId, taggedResource.getTag())
                .whenCompleteAsync((taskData, lookupError) -> {
                    if (taskData == null) {
                        // No lookup value at all: report whatever error the lookup produced.
                        outcome.complete(new Result(taggedResource, null, lookupError));
                        return;
                    }
                    if (taskData.isPresent()) {
                        log.debug("Host={} found task for child <{}, {}> of {}",
                                this.hostId, taggedResource.getResource(), taggedResource.getTag(), oldHostId);
                        // The inner parameter needs its own name; reusing the enclosing lambda's
                        // parameter name is rejected by the compiler.
                        execute(oldHostId, (TaskData) taskData.get(), taggedResource)
                                .whenCompleteAsync((taskValue, taskError) ->
                                        outcome.complete(new Result(taggedResource, taskValue, taskError)), this.executor);
                        return;
                    }
                    log.debug("Host={} found no task for child <{}, {}> of {}. Removing child.",
                            this.hostId, taggedResource.getResource(), taggedResource.getTag(), oldHostId);
                    // The outcome of the removal itself is deliberately not propagated; the
                    // result carries the lookup error instead.
                    this.taskMetadataStore.removeChild(oldHostId, taggedResource, true)
                            .whenCompleteAsync((ignored, removeError) ->
                                    outcome.complete(new Result(taggedResource, null, lookupError)), this.executor);
                }, this.executor);
        return outcome;
    }

    /**
     * Dispatches {@code taskData} to the matching {@link Task}-annotated method via reflection.
     *
     * @param oldHostId      host that the task was registered under
     * @param taskData       stored task descriptor (method name, version and parameters)
     * @param taggedResource resource/tag pair the task belongs to
     * @return the future returned by the task method; a future failed with a
     *         {@link RuntimeException} when no method is registered for the task; a future that
     *         completes with {@code null} after 100 ms when the owning task module is not ready;
     *         or a future failed with any exception thrown while dispatching
     */
    private CompletableFuture<Object> execute(String oldHostId, TaskData taskData, TaggedResource taggedResource) {
        log.debug("Host={} attempting to execute task {}-{} for child <{}, {}> of {}",
                this.hostId, taskData.getMethodName(), taskData.getMethodVersion(),
                taggedResource.getResource(), taggedResource.getTag(), oldHostId);
        try {
            String key = getKey(taskData.getMethodName(), taskData.getMethodVersion());
            Method method = this.methodMap.get(key);
            if (method == null) {
                log.warn("Task {} not found", taskData.getMethodName());
                return failedFuture(new RuntimeException(String.format("Task %s not found", taskData.getMethodName())));
            }
            TaskBase taskModule = this.objectMap.get(key);
            if (!taskModule.isReady()) {
                log.info("Task module for method {} not yet ready, delaying processing it", method.getName());
                // Complete with null after a short pause instead of executing the task.
                return Futures.delayedFuture(Duration.ofMillis(100L), this.executor)
                        .thenApplyAsync(ignored -> null, this.executor);
            }
            TaskBase.Context context = new TaskBase.Context(this.hostId, oldHostId, taggedResource.getTag(), taggedResource.getResource());
            // The stored parameters are handed over as the reflective argument array itself.
            @SuppressWarnings("unchecked")
            CompletableFuture<Object> invocation =
                    (CompletableFuture<Object>) method.invoke(taskModule.copyWithContext(context), (Object[]) taskData.getParameters());
            return invocation;
        } catch (Exception e) {
            return failedFuture(e);
        }
    }

    /** Returns a new future that is already completed exceptionally with {@code cause}. */
    private static <T> CompletableFuture<T> failedFuture(Throwable cause) {
        CompletableFuture<T> failed = new CompletableFuture<>();
        failed.completeExceptionally(cause);
        return failed;
    }

    /**
     * Indexes every {@link Task}-annotated method declared directly on the configured task
     * modules under its {@code name--version} key.
     *
     * @throws DuplicateTaskAnnotationException if a key is encountered more than once
     */
    private void initializeMappingTable() {
        for (TaskBase taskModule : this.taskClassObjects) {
            for (Method method : taskModule.getClass().getDeclaredMethods()) {
                Task task = method.getAnnotation(Task.class);
                if (task == null) {
                    continue;
                }
                String key = getKey(task.name(), task.version());
                if (this.methodMap.putIfAbsent(key, method) != null) {
                    throw new DuplicateTaskAnnotationException(task.name(), task.version());
                }
                this.objectMap.put(key, taskModule);
            }
        }
    }

    /** Builds the lookup key used by both mapping tables: {@code <name>--<version>}. */
    private String getKey(String name, String version) {
        return name + "--" + version;
    }
}
