package org.opensearch.migrations.bulkload.worker;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.Generated;
import org.opensearch.migrations.bulkload.common.DocumentReindexer;
import org.opensearch.migrations.bulkload.common.RfsException;
import org.opensearch.migrations.bulkload.common.SnapshotShardUnpacker;
import org.opensearch.migrations.bulkload.lucene.LuceneIndexReader;
import org.opensearch.migrations.bulkload.models.ShardMetadata;
import org.opensearch.migrations.bulkload.workcoordination.IWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.ScopedWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider;
import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/opensearch/migrations/bulkload/worker/DocumentsRunner.class */
public class DocumentsRunner {

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DocumentsRunner.class);
    private final ScopedWorkCoordinator workCoordinator;
    private final Duration maxInitialLeaseDuration;
    private final DocumentReindexer reindexer;
    private final SnapshotShardUnpacker.Factory unpackerFactory;
    private final BiFunction<String, Integer, ShardMetadata> shardMetadataFactory;
    private final LuceneIndexReader.Factory readerFactory;
    private final Consumer<WorkItemCursor> cursorConsumer;
    private final Consumer<Runnable> cancellationTriggerConsumer;
    private final WorkItemTimeProvider timeProvider;

    /* loaded from: input_file:org/opensearch/migrations/bulkload/worker/DocumentsRunner$CompletionStatus.class */
    public enum CompletionStatus {
        NOTHING_DONE,
        WORK_COMPLETED
    }

    /* loaded from: input_file:org/opensearch/migrations/bulkload/worker/DocumentsRunner$ShardTooLargeException.class */
    public static class ShardTooLargeException extends RfsException {
        public ShardTooLargeException(long j, long j2) {
            super("The shard size of " + j + " bytes exceeds the maximum shard size of " + this + " bytes");
        }
    }

    public CompletionStatus migrateNextShard(Supplier<IDocumentMigrationContexts.IDocumentReindexContext> supplier) throws IOException, InterruptedException {
        final IDocumentMigrationContexts.IDocumentReindexContext iDocumentReindexContext = supplier.get();
        try {
            ScopedWorkCoordinator scopedWorkCoordinator = this.workCoordinator;
            ScopedWorkCoordinator.WorkItemGetter workItemGetter = iWorkCoordinator -> {
                try {
                    Duration duration = this.maxInitialLeaseDuration;
                    Objects.requireNonNull(iDocumentReindexContext);
                    IWorkCoordinator.WorkAcquisitionOutcome acquireNextWorkItem = iWorkCoordinator.acquireNextWorkItem(duration, iDocumentReindexContext::createOpeningContext);
                    this.timeProvider.getLeaseAcquisitionTimeRef().set(Instant.now());
                    return acquireNextWorkItem;
                } catch (IOException e) {
                    throw e;
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    throw e2;
                }
            };
            IWorkCoordinator.WorkAcquisitionOutcomeVisitor<CompletionStatus> workAcquisitionOutcomeVisitor = new IWorkCoordinator.WorkAcquisitionOutcomeVisitor<CompletionStatus>() { // from class: org.opensearch.migrations.bulkload.worker.DocumentsRunner.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.opensearch.migrations.bulkload.workcoordination.IWorkCoordinator.WorkAcquisitionOutcomeVisitor
                public CompletionStatus onAlreadyCompleted() {
                    return CompletionStatus.NOTHING_DONE;
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.opensearch.migrations.bulkload.workcoordination.IWorkCoordinator.WorkAcquisitionOutcomeVisitor
                public CompletionStatus onAcquiredWork(IWorkCoordinator.WorkItemAndDuration workItemAndDuration) {
                    Flux<WorkItemCursor> flux = DocumentsRunner.this.setupDocMigration(workItemAndDuration.getWorkItem(), iDocumentReindexContext);
                    CountDownLatch countDownLatch = new CountDownLatch(1);
                    Scheduler newSingle = Schedulers.newSingle("workFinishScheduler");
                    Disposable subscribe = flux.subscribeOn(newSingle).doFinally(signalType -> {
                        newSingle.dispose();
                    }).takeLast(1).subscribe(workItemCursor -> {
                    }, th -> {
                        DocumentsRunner.log.atError().setCause(th).setMessage("Error prevented some batches from being processed").log();
                    }, () -> {
                        DocumentsRunner.log.atInfo().setMessage("Reindexing completed for Index {}, Shard {}").addArgument(workItemAndDuration.getWorkItem().getIndexName()).addArgument(workItemAndDuration.getWorkItem().getShardNumber()).log();
                        countDownLatch.countDown();
                    });
                    Consumer<Runnable> consumer = DocumentsRunner.this.cancellationTriggerConsumer;
                    Objects.requireNonNull(subscribe);
                    consumer.accept(subscribe::dispose);
                    try {
                        countDownLatch.await();
                        return CompletionStatus.WORK_COMPLETED;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw e;
                    }
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.opensearch.migrations.bulkload.workcoordination.IWorkCoordinator.WorkAcquisitionOutcomeVisitor
                public CompletionStatus onNoAvailableWorkToBeDone() throws IOException {
                    return CompletionStatus.NOTHING_DONE;
                }
            };
            Objects.requireNonNull(iDocumentReindexContext);
            CompletionStatus completionStatus = (CompletionStatus) scopedWorkCoordinator.ensurePhaseCompletion(workItemGetter, workAcquisitionOutcomeVisitor, iDocumentReindexContext::createCloseContet);
            if (iDocumentReindexContext != null) {
                iDocumentReindexContext.close();
            }
            return completionStatus;
        } catch (Throwable th) {
            if (iDocumentReindexContext != null) {
                try {
                    iDocumentReindexContext.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private Flux<WorkItemCursor> setupDocMigration(IWorkCoordinator.WorkItemAndDuration.WorkItem workItem, IDocumentMigrationContexts.IDocumentReindexContext iDocumentReindexContext) {
        log.atInfo().setMessage("Migrating docs for {}").addArgument(workItem).log();
        LuceneIndexReader reader = this.readerFactory.getReader(this.unpackerFactory.create(this.shardMetadataFactory.apply(workItem.getIndexName(), workItem.getShardNumber())).unpack());
        this.timeProvider.getDocumentMigraionStartTimeRef().set(Instant.now());
        return this.reindexer.reindex(workItem.getIndexName(), reader.readDocuments(workItem.getStartingDocId().intValue()), iDocumentReindexContext).doOnNext(this.cursorConsumer);
    }

    @Generated
    public DocumentsRunner(ScopedWorkCoordinator scopedWorkCoordinator, Duration duration, DocumentReindexer documentReindexer, SnapshotShardUnpacker.Factory factory, BiFunction<String, Integer, ShardMetadata> biFunction, LuceneIndexReader.Factory factory2, Consumer<WorkItemCursor> consumer, Consumer<Runnable> consumer2, WorkItemTimeProvider workItemTimeProvider) {
        this.workCoordinator = scopedWorkCoordinator;
        this.maxInitialLeaseDuration = duration;
        this.reindexer = documentReindexer;
        this.unpackerFactory = factory;
        this.shardMetadataFactory = biFunction;
        this.readerFactory = factory2;
        this.cursorConsumer = consumer;
        this.cancellationTriggerConsumer = consumer2;
        this.timeProvider = workItemTimeProvider;
    }
}
