package io.pravega.controller.task.Stream;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.fault.FailoverSweeper;
import io.pravega.controller.server.rest.generated.api.ApiResponseMessage;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.TxnStatus;
import io.pravega.controller.store.stream.VersionedTransactionData;
import io.pravega.controller.store.task.TxnResource;
import io.pravega.controller.util.Config;
import io.pravega.controller.util.RetryHelper;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.Collection;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/task/Stream/TxnSweeper.class */
public class TxnSweeper implements FailoverSweeper {

    // Class-wide SLF4J logger.
    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(TxnSweeper.class);
    // Store queried for hosts owning transactions, the per-host transaction index and transaction data.
    private final StreamMetadataStore streamMetadataStore;
    // Used to ping transactions, write commit/abort events, and to report readiness of this sweeper.
    private final StreamTransactionMetadataTasks transactionMetadataTasks;
    // Strictly positive (validated in the constructor); handleFailedProcess waits twice this value before sweeping.
    private final long maxTxnTimeoutMillis;
    // Executor handed to the retries, delays and asynchronous continuations created by this class.
    private final ScheduledExecutorService executor;

    /* renamed from: io.pravega.controller.task.Stream.TxnSweeper$1, reason: invalid class name */
    /* loaded from: input_file:io/pravega/controller/task/Stream/TxnSweeper$1.class */
    /**
     * Holder for a lookup table that maps {@link TxnStatus} ordinals to small case indices:
     * OPEN = 1, ABORTING = 2, COMMITTING = 3, UNKNOWN = 4. Entries for every other constant stay 0.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$pravega$controller$store$stream$TxnStatus = buildSwitchMap();

        /** Allocates one slot per enum constant and fills in the four known case indices. */
        private static int[] buildSwitchMap() {
            final int[] caseIndexByOrdinal = new int[TxnStatus.values().length];
            register(caseIndexByOrdinal, () -> TxnStatus.OPEN, 1);
            register(caseIndexByOrdinal, () -> TxnStatus.ABORTING, 2);
            register(caseIndexByOrdinal, () -> TxnStatus.COMMITTING, 3);
            register(caseIndexByOrdinal, () -> TxnStatus.UNKNOWN, 4);
            return caseIndexByOrdinal;
        }

        /**
         * Stores {@code caseIndex} at the ordinal of the supplied constant. The constant is resolved
         * lazily so that a constant missing at runtime only skips its own entry.
         */
        private static void register(int[] table, Supplier<TxnStatus> constant, int caseIndex) {
            try {
                table[constant.get().ordinal()] = caseIndex;
            } catch (NoSuchFieldError ignored) {
                // Constant not present in the loaded enum: leave the slot at 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/controller/task/Stream/TxnSweeper$Result.class */
    /**
     * Immutable outcome of failing over a single transaction: the transaction it refers to,
     * the value produced (may be {@code null}) and the error raised (may be {@code null}).
     */
    public static class Result {
        private final TxnResource txnResource;
        private final Object value;
        private final Throwable error;

        /**
         * @param txnResource transaction this result belongs to
         * @param obj         value produced by the fail-over step, possibly {@code null}
         * @param th          error raised by the fail-over step, possibly {@code null}
         */
        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"txnResource", "value", "error"})
        public Result(TxnResource txnResource, Object obj, Throwable th) {
            this.txnResource = txnResource;
            this.value = obj;
            this.error = th;
        }

        /** @return the transaction this result belongs to */
        @SuppressFBWarnings(justification = "generated code")
        public TxnResource getTxnResource() {
            return txnResource;
        }

        /** @return the value produced, or {@code null} */
        @SuppressFBWarnings(justification = "generated code")
        public Object getValue() {
            return value;
        }

        /** @return the error raised, or {@code null} */
        @SuppressFBWarnings(justification = "generated code")
        public Throwable getError() {
            return error;
        }

        /** Two results are equal when all three components are equal (null-safe). */
        @Override
        @SuppressFBWarnings(justification = "generated code")
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof Result)) {
                return false;
            }
            final Result that = (Result) obj;
            return that.canEqual(this)
                    && sameOrBothNull(getTxnResource(), that.getTxnResource())
                    && sameOrBothNull(getValue(), that.getValue())
                    && sameOrBothNull(getError(), that.getError());
        }

        /** Hook letting subclasses restrict which instances they may be equal to. */
        @SuppressFBWarnings(justification = "generated code")
        protected boolean canEqual(Object obj) {
            return obj instanceof Result;
        }

        /** Combines the three component hashes with multiplier 59; {@code null} contributes 43. */
        @Override
        @SuppressFBWarnings(justification = "generated code")
        public int hashCode() {
            int acc = 1;
            acc = acc * 59 + hashOrDefault(getTxnResource());
            acc = acc * 59 + hashOrDefault(getValue());
            acc = acc * 59 + hashOrDefault(getError());
            return acc;
        }

        @Override
        @SuppressFBWarnings(justification = "generated code")
        public String toString() {
            return new StringBuilder("TxnSweeper.Result(txnResource=")
                    .append(getTxnResource())
                    .append(", value=")
                    .append(getValue())
                    .append(", error=")
                    .append(getError())
                    .append(")")
                    .toString();
        }

        /** Null-safe equality: both null, or {@code mine.equals(theirs)}. */
        private static boolean sameOrBothNull(Object mine, Object theirs) {
            return mine == null ? theirs == null : mine.equals(theirs);
        }

        /** Hash of {@code component}, or 43 when it is {@code null}. */
        private static int hashOrDefault(Object component) {
            return component == null ? 43 : component.hashCode();
        }
    }

    /**
     * Creates a sweeper.
     *
     * @param streamMetadataStore            metadata store, must not be {@code null}
     * @param streamTransactionMetadataTasks transaction tasks helper, must not be {@code null}
     * @param j                              maximum transaction timeout in milliseconds, must be &gt; 0
     * @param scheduledExecutorService       executor for asynchronous work, must not be {@code null}
     * @throws NullPointerException     if any reference argument is {@code null}
     * @throws IllegalArgumentException if {@code j} is not positive
     */
    public TxnSweeper(StreamMetadataStore streamMetadataStore, StreamTransactionMetadataTasks streamTransactionMetadataTasks, long j, ScheduledExecutorService scheduledExecutorService) {
        // Validation order is kept: store, tasks, executor, then the timeout.
        this.streamMetadataStore = Preconditions.checkNotNull(streamMetadataStore, "streamMetadataStore");
        this.transactionMetadataTasks = Preconditions.checkNotNull(streamTransactionMetadataTasks, "transactionMetadataTasks");
        this.executor = Preconditions.checkNotNull(scheduledExecutorService, "executor");
        Preconditions.checkArgument(j > 0, "maxTxnTimeoutMillis should be a positive number");
        this.maxTxnTimeoutMillis = j;
    }

    /**
     * Delegates to {@code transactionMetadataTasks.awaitInitialization()}.
     *
     * @throws InterruptedException propagated from the delegate
     */
    public void awaitInitialization() throws InterruptedException {
        transactionMetadataTasks.awaitInitialization();
    }

    /**
     * @return whatever {@code transactionMetadataTasks.isReady()} reports; the sweep methods of this
     *         class fail with {@link IllegalStateException} while this is {@code false}
     */
    @Override // io.pravega.controller.fault.FailoverSweeper
    public boolean isReady() {
        final boolean ready = transactionMetadataTasks.isReady();
        return ready;
    }

    /**
     * Lists every host that owns transactions, removes the hosts reported by {@code supplier}
     * from that set, and runs {@link #handleFailedProcess(String)} for each remaining host.
     *
     * @param supplier supplies the set of hosts to exclude from the sweep (evaluated after the
     *                 host listing has been fetched)
     * @return a future completing when all per-host sweeps have completed; fails immediately with
     *         {@link IllegalStateException} if the transaction tasks are not ready
     */
    @Override // io.pravega.controller.fault.FailoverSweeper
    public CompletableFuture<Void> sweepFailedProcesses(Supplier<Set<String>> supplier) {
        if (!this.transactionMetadataTasks.isReady()) {
            return Futures.failedFuture(new IllegalStateException(getClass().getName() + " not yet ready"));
        }
        // The listing is retried on retryable errors, effectively without an attempt limit.
        final CompletableFuture<Set<String>> hostsOwningTxns = RetryHelper.withRetriesAsync(
                this.streamMetadataStore::listHostsOwningTxn, RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor);
        return hostsOwningTxns.thenComposeAsync(hosts -> {
            // Whatever is left after removing the supplied hosts is treated as failed.
            hosts.removeAll(supplier.get());
            log.info("Failed hosts {} have orphaned tasks", hosts);
            return Futures.allOf(hosts.stream().map(this::handleFailedProcess).collect(Collectors.toList()));
        }, this.executor);
    }

    /**
     * Sweeps the transactions indexed under the given host after a delay of
     * {@code 2 * maxTxnTimeoutMillis} milliseconds. The sweep itself is retried on retryable errors.
     *
     * @param str identifier of the failed host
     * @return a future completing when the sweep finishes; fails immediately with
     *         {@link IllegalStateException} if the transaction tasks are not ready
     */
    @Override // io.pravega.controller.fault.FailoverSweeper
    public CompletableFuture<Void> handleFailedProcess(String str) {
        if (!this.transactionMetadataTasks.isReady()) {
            return Futures.failedFuture(new IllegalStateException(getClass().getName() + " not yet ready"));
        }
        log.info("Host={}, sweeping orphaned transactions", str);
        // NOTE(review): the 2x delay presumably lets the failed host's open transactions time out first - confirm.
        final Duration delay = Duration.ofMillis(2 * this.maxTxnTimeoutMillis);
        // Run the continuation on this class's executor, like every other async stage here,
        // instead of the default ForkJoinPool.commonPool().
        return Futures.delayedFuture(delay, this.executor).thenComposeAsync(
                ignored -> RetryHelper.withRetriesAsync(
                        () -> sweepOrphanedTxnsWithoutDelay(str),
                        RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor),
                this.executor);
    }

    /**
     * Repeatedly calls {@link #failOverTxns(String)} for the host for as long as it yields a
     * non-null result, then logs the outcome of the whole loop.
     *
     * @param str identifier of the failed host
     * @return a future completing when the loop ends, carrying the loop's failure if any
     */
    private CompletableFuture<Void> sweepOrphanedTxnsWithoutDelay(String str) {
        return Futures.doWhileLoop(() -> failOverTxns(str), outcome -> outcome != null, this.executor)
                .whenCompleteAsync((ignored, failure) -> {
                    if (failure == null) {
                        log.debug("Host={}, sweeping orphaned transactions complete", str);
                        return;
                    }
                    log.warn("Host={}, Caught exception sweeping orphaned transactions", str, failure);
                }, this.executor);
    }

    /**
     * Handles one step of the sweep: picks a transaction from the host's index and fails it over,
     * or, when the index yields no transaction, removes the host from the index.
     *
     * @param str identifier of the failed host
     * @return the result of failing over one transaction, or a future holding {@code null} once the
     *         host has been removed from the index
     */
    private CompletableFuture<Result> failOverTxns(String str) {
        return this.streamMetadataStore.getRandomTxnFromIndex(str).thenComposeAsync(candidate -> {
            if (!candidate.isPresent()) {
                // Nothing left for this host: drop it from the index and signal completion with null.
                return this.streamMetadataStore.removeHostFromIndex(str)
                        .thenApplyAsync(ignored -> (Result) null, this.executor);
            }
            final TxnResource txn = (TxnResource) candidate.get();
            return failOverTxn(str, txn);
        }, this.executor);
    }

    /**
     * Fails over a single transaction of a failed host according to its current status.
     * <ul>
     *   <li>OPEN, ABORTING, COMMITTING: delegates to the matching {@code failOver*Txn} method and
     *       wraps its value or error in a {@link Result} (so these branches never fail the future).</li>
     *   <li>Any other status: only removes the transaction from the host's index.</li>
     * </ul>
     * If the transaction data cannot be found, {@code VersionedTransactionData.EMPTY} is used in its
     * place; any other lookup failure fails the returned future.
     *
     * @param str         identifier of the failed host
     * @param txnResource transaction to fail over
     * @return a future with the outcome for this transaction
     */
    private CompletableFuture<Result> failOverTxn(String str, TxnResource txnResource) {
        String scope = txnResource.getScope();
        String stream = txnResource.getStream();
        UUID txnId = txnResource.getTxnId();
        log.debug("Host = {}, processing transaction {}/{}/{}", str, scope, stream, txnId);
        return this.streamMetadataStore.getTransactionData(scope, stream, txnId, null, this.executor)
                .handle((txnData, failure) -> {
                    if (failure == null) {
                        return txnData;
                    }
                    if (Exceptions.unwrap(failure) instanceof StoreException.DataNotFoundException) {
                        // Missing data is tolerated; the status of EMPTY decides the branch below.
                        return VersionedTransactionData.EMPTY;
                    }
                    throw new CompletionException(failure);
                })
                .thenComposeAsync(txnData -> {
                    int epoch = txnData.getEpoch();
                    // Switch on the enum itself rather than on an ordinal lookup table.
                    switch (txnData.getStatus()) {
                        case OPEN:
                            return failOverOpenTxn(str, txnResource)
                                    .handleAsync((v, e) -> new Result(txnResource, v, e), this.executor);
                        case ABORTING:
                            return failOverAbortingTxn(str, epoch, txnResource)
                                    .handleAsync((v, e) -> new Result(txnResource, v, e), this.executor);
                        case COMMITTING:
                            return failOverCommittingTxn(str, epoch, txnResource)
                                    .handleAsync((v, e) -> new Result(txnResource, v, e), this.executor);
                        case UNKNOWN:
                        default:
                            return this.streamMetadataStore.removeTxnFromIndex(str, txnResource, true)
                                    .thenApply(ignored -> new Result(txnResource, null, null));
                    }
                }, this.executor)
                .whenComplete((result, failure) ->
                        log.debug("Host = {}, processing transaction {}/{}/{} complete", str, scope, stream, txnId));
    }

    /**
     * Writes a commit event for the transaction (status {@code COMMITTING}) and, once that
     * succeeds, removes the transaction from the failed host's index.
     *
     * @param str         identifier of the failed host
     * @param i           epoch passed to {@code writeCommitEvent}
     * @param txnResource transaction being failed over
     * @return a future completing after the index entry has been removed
     */
    private CompletableFuture<Void> failOverCommittingTxn(String str, int i, TxnResource txnResource) {
        final String scopeName = txnResource.getScope();
        final String streamName = txnResource.getStream();
        final UUID id = txnResource.getTxnId();
        log.debug("Host = {}, failing over committing transaction {}/{}/{}", str, scopeName, streamName, id);
        return this.transactionMetadataTasks
                .writeCommitEvent(scopeName, streamName, i, id, TxnStatus.COMMITTING)
                .thenComposeAsync(
                        ignored -> this.streamMetadataStore.removeTxnFromIndex(str, txnResource, true),
                        this.executor);
    }

    /**
     * Writes an abort event for the transaction (status {@code ABORTING}) and, once that
     * succeeds, removes the transaction from the failed host's index.
     *
     * @param str         identifier of the failed host
     * @param i           epoch passed to {@code writeAbortEvent}
     * @param txnResource transaction being failed over
     * @return a future completing after the index entry has been removed
     */
    private CompletableFuture<Void> failOverAbortingTxn(String str, int i, TxnResource txnResource) {
        final String scopeName = txnResource.getScope();
        final String streamName = txnResource.getStream();
        final UUID id = txnResource.getTxnId();
        log.debug("Host = {}, failing over aborting transaction {}/{}/{}", str, scopeName, streamName, id);
        return this.transactionMetadataTasks
                .writeAbortEvent(scopeName, streamName, i, id, TxnStatus.ABORTING)
                .thenComposeAsync(
                        ignored -> this.streamMetadataStore.removeTxnFromIndex(str, txnResource, true),
                        this.executor);
    }

    /**
     * Fails over an open transaction: looks up its data, pings it with
     * {@code Config.MAX_LEASE_VALUE}, and then removes it from the failed host's index.
     * The looked-up data itself is not used; the lookup only gates the ping on its success.
     *
     * @param str         identifier of the failed host
     * @param txnResource transaction being failed over
     * @return a future completing after the index entry has been removed
     */
    private CompletableFuture<Void> failOverOpenTxn(String str, TxnResource txnResource) {
        String scope = txnResource.getScope();
        String stream = txnResource.getStream();
        UUID txnId = txnResource.getTxnId();
        log.debug("Host = {}, failing over open transaction {}/{}/{}", str, scope, stream, txnId);
        return this.streamMetadataStore.getTransactionData(scope, stream, txnId, null, this.executor)
                .thenCompose(ignoredData ->
                        this.transactionMetadataTasks.pingTxn(scope, stream, txnId, Config.MAX_LEASE_VALUE, null))
                .thenComposeAsync(
                        ignoredPingStatus -> this.streamMetadataStore.removeTxnFromIndex(str, txnResource, true),
                        this.executor);
    }
}
