package io.pravega.controller.task.Stream;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.fault.FailoverSweeper;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.util.RetryHelper;
import io.pravega.shared.controller.event.ControllerEvent;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/task/Stream/RequestSweeper.class */
/**
 * {@link FailoverSweeper} that re-posts the pending tasks of failed hosts.
 *
 * <p>For every host being swept, the pending tasks recorded for it in the {@link StreamMetadataStore} are read in
 * batches of at most {@code limit} entries. Each task's {@link ControllerEvent} is handed to
 * {@code StreamMetadataTasks.writeEvent} and, once that completes, the task is removed from the host's index.
 */
public class RequestSweeper implements FailoverSweeper {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(RequestSweeper.class);

    /** Batch size used by the public constructor. */
    public static final int LIMIT = 100;

    private final StreamMetadataStore metadataStore;
    private final ScheduledExecutorService executor;
    private final StreamMetadataTasks streamMetadataTasks;
    private final int limit;

    /**
     * Creates a sweeper that fetches pending tasks in batches of {@link #LIMIT}.
     *
     * @param metadataStore       store queried for hosts with pending tasks and for the tasks themselves.
     * @param executor            executor used for retries and asynchronous continuations.
     * @param streamMetadataTasks used to write the events of the swept tasks.
     */
    public RequestSweeper(StreamMetadataStore metadataStore, ScheduledExecutorService executor,
                          StreamMetadataTasks streamMetadataTasks) {
        this(metadataStore, executor, streamMetadataTasks, LIMIT);
    }

    /**
     * Creates a sweeper with an explicit batch size.
     *
     * @param metadataStore       store queried for hosts with pending tasks and for the tasks themselves.
     * @param executor            executor used for retries and asynchronous continuations.
     * @param streamMetadataTasks used to write the events of the swept tasks.
     * @param limit               value passed to {@code getPendingsTaskForHost} on every fetch.
     */
    @VisibleForTesting
    RequestSweeper(StreamMetadataStore metadataStore, ScheduledExecutorService executor,
                   StreamMetadataTasks streamMetadataTasks, int limit) {
        this.metadataStore = metadataStore;
        this.executor = executor;
        this.streamMetadataTasks = streamMetadataTasks;
        this.limit = limit;
    }

    /**
     * This sweeper needs no initialization.
     *
     * @return always {@code true}.
     */
    @Override
    public boolean isReady() {
        return true;
    }

    /**
     * Sweeps every host that has pending tasks but is not reported by {@code runningProcesses}.
     *
     * @param runningProcesses supplies the hosts to exclude from the sweep; it is invoked through
     *                         {@code RetryHelper.withRetries} inside the asynchronous continuation.
     * @return a future built by {@code Futures.allOf} over the per-host sweeps.
     */
    @Override
    public CompletableFuture<Void> sweepFailedProcesses(Supplier<Set<String>> runningProcesses) {
        return RetryHelper.withRetriesAsync(this.metadataStore::listHostsWithPendingTask,
                        RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor)
                .thenComposeAsync(hostsWithTasks -> {
                    log.info("Hosts {} have ongoing tasks", hostsWithTasks);
                    // What remains after removing the running hosts is the set of hosts to sweep.
                    // NOTE(review): this mutates the set returned by the store call in place.
                    hostsWithTasks.removeAll(RetryHelper.withRetries(runningProcesses,
                            RetryHelper.UNCONDITIONAL_PREDICATE, Integer.MAX_VALUE));
                    log.info("Failed hosts {} have orphaned tasks", hostsWithTasks);
                    List<CompletableFuture<Void>> sweeps = hostsWithTasks.stream()
                            .map(this::handleFailedProcess)
                            .collect(Collectors.toList());
                    return Futures.allOf(sweeps);
                }, this.executor);
    }

    /**
     * Re-posts the pending tasks of a single host, batch by batch.
     *
     * <p>{@link #postRequest(String)} is run through {@code Futures.doWhileLoop} with the condition "the batch just
     * processed was non-empty"; the whole loop is wrapped in {@code RetryHelper.withRetriesAsync}.
     *
     * @param oldHostId identifier of the host whose tasks are swept.
     * @return the future returned by {@code RetryHelper.withRetriesAsync} for the sweep.
     */
    @Override
    public CompletableFuture<Void> handleFailedProcess(String oldHostId) {
        log.info("Sweeping orphaned tasks for host {}", oldHostId);
        return RetryHelper.withRetriesAsync(
                () -> Futures.doWhileLoop(() -> postRequest(oldHostId), batch -> !batch.isEmpty(), this.executor)
                        .whenCompleteAsync((ignored, failure) -> {
                            // whenComplete also runs on failure; report that case distinctly instead of
                            // claiming completion. The failure itself still propagates to the retry wrapper.
                            if (failure == null) {
                                log.info("Sweeping orphaned tasks for host {} complete", oldHostId);
                            } else {
                                log.warn("Sweeping orphaned tasks for host {} failed", oldHostId, failure);
                            }
                        }, this.executor),
                RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor);
    }

    /**
     * Fetches one batch of pending tasks for the host and re-posts each of them.
     *
     * @param oldHostId identifier of the host whose tasks are fetched.
     * @return a future holding the keys of the tasks processed in this batch.
     */
    @VisibleForTesting
    CompletableFuture<List<String>> postRequest(String oldHostId) {
        return this.metadataStore.getPendingsTaskForHost(oldHostId, this.limit)
                .thenComposeAsync(pendingTasks -> {
                    List<CompletableFuture<String>> reposted = pendingTasks.entrySet().stream()
                            .map(task -> repostTask(oldHostId, task.getKey(), task.getValue()))
                            .collect(Collectors.toList());
                    return Futures.allOfWithResults(reposted);
                }, this.executor);
    }

    /**
     * Writes the task's event and then removes the task from the host's index.
     *
     * @param hostId identifier of the host the task is indexed under.
     * @param taskId key of the task in the host's pending-task map.
     * @param event  event to write for the task.
     * @return a future holding {@code taskId} once both steps have completed.
     */
    private CompletableFuture<String> repostTask(String hostId, String taskId, ControllerEvent event) {
        return this.streamMetadataTasks.writeEvent(event)
                .thenCompose(ignored -> this.metadataStore.removeTaskFromIndex(hostId, taskId))
                .thenApply(ignored -> taskId);
    }
}
