package org.opensearch.migrations;

import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.ParametersDelegate;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.opensearch.migrations.bulkload.common.DefaultSourceRepoAccessor;
import org.opensearch.migrations.bulkload.common.DocumentReindexer;
import org.opensearch.migrations.bulkload.common.FileSystemRepo;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.S3Repo;
import org.opensearch.migrations.bulkload.common.S3Uri;
import org.opensearch.migrations.bulkload.common.SnapshotShardUnpacker;
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.lucene.LuceneIndexReader;
import org.opensearch.migrations.bulkload.models.IndexMetadata;
import org.opensearch.migrations.bulkload.models.ShardMetadata;
import org.opensearch.migrations.bulkload.tracing.IWorkCoordinationContexts;
import org.opensearch.migrations.bulkload.tracing.RootWorkCoordinationContext;
import org.opensearch.migrations.bulkload.workcoordination.CoordinateWorkHttpClient;
import org.opensearch.migrations.bulkload.workcoordination.IWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.LeaseExpireTrigger;
import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.ScopedWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.WorkCoordinatorFactory;
import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider;
import org.opensearch.migrations.bulkload.worker.DocumentsRunner;
import org.opensearch.migrations.bulkload.worker.ShardWorkPreparer;
import org.opensearch.migrations.bulkload.worker.WorkItemCursor;
import org.opensearch.migrations.cluster.ClusterProviderRegistry;
import org.opensearch.migrations.cluster.ClusterSnapshotReader;
import org.opensearch.migrations.reindexer.tracing.RootDocumentMigrationContext;
import org.opensearch.migrations.tracing.ActiveContextTracker;
import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
import org.opensearch.migrations.tracing.CompositeContextTracker;
import org.opensearch.migrations.tracing.IContextTracker;
import org.opensearch.migrations.tracing.RootOtelContext;
import org.opensearch.migrations.transform.TransformationLoader;
import org.opensearch.migrations.transform.TransformerConfigUtils;
import org.opensearch.migrations.transform.TransformerParams;
import org.opensearch.migrations.utils.ProcessHelpers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:org/opensearch/migrations/RfsMigrateDocuments.class */
public class RfsMigrateDocuments {
    private static final Logger log = LoggerFactory.getLogger(RfsMigrateDocuments.class);
    public static final int PROCESS_TIMED_OUT_EXIT_CODE = 2;
    public static final int NO_WORK_LEFT_EXIT_CODE = 3;
    public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5;
    public static final String LOGGING_MDC_WORKER_ID = "workerId";
    private static final double DECREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD = 0.025d;
    private static final double INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD = 0.1d;
    public static final String DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG = "[  {    \"JsonTransformerForDocumentTypeRemovalProvider\":\"\"  }]";

    /* loaded from: input_file:org/opensearch/migrations/RfsMigrateDocuments$Args.class */
    public static class Args {

        @Parameter(names = {"--help", "-h"}, help = true, description = "Displays information about how to use this tool")
        private boolean help;

        @Parameter(required = true, names = {"--snapshot-name", "--snapshotName"}, description = "The name of the snapshot to migrate")
        public String snapshotName;

        @Parameter(required = true, names = {"--lucene-dir", "--luceneDir"}, description = "The absolute path to the directory where we'll put the Lucene docs")
        public String luceneDir;

        @Parameter(required = false, names = {"--otel-collector-endpoint", "--otelCollectorEndpoint"}, arity = 1, description = "Endpoint (host:port) for the OpenTelemetry Collector to which metrics logs should beforwarded. If no value is provided, metrics will not be forwarded.")
        String otelCollectorEndpoint;

        @Parameter(required = false, names = {"--snapshot-local-dir", "--snapshotLocalDir"}, description = "The absolute path to the directory on local disk where the snapshot exists.  Use this parameter if there is a reachable copy of the snapshot on disk.  Mutually exclusive with --s3-local-dir, --s3-repo-uri, and --s3-region.")
        public String snapshotLocalDir = null;

        @Parameter(required = false, names = {"--s3-local-dir", "--s3LocalDir"}, description = "The absolute path to the directory on local disk to download S3 files to.  If you supply this, you must also supply --s3-repo-uri and --s3-region.  Mutually exclusive with --snapshot-local-dir.")
        public String s3LocalDir = null;

        @Parameter(required = false, names = {"--s3-repo-uri", "--s3RepoUri"}, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2.  If you supply this, you must also supply --s3-local-dir and --s3-region.  Mutually exclusive with --snapshot-local-dir.")
        public String s3RepoUri = null;

        @Parameter(required = false, names = {"--s3-region", "--s3Region"}, description = "The AWS Region the S3 bucket is in, like: us-east-2.  If you supply this, you must also supply --s3-local-dir and --s3-repo-uri.  Mutually exclusive with --snapshot-local-dir.")
        public String s3Region = null;

        @ParametersDelegate
        public ConnectionContext.TargetArgs targetArgs = new ConnectionContext.TargetArgs();

        @Parameter(required = false, names = {"--index-allowlist", "--indexAllowlist"}, description = "Optional.  List of index names to migrate (e.g. 'logs_2024_01, logs_2024_02').  Default: all non-system indices (e.g. those not starting with '.')")
        public List<String> indexAllowlist = List.of();

        @Parameter(required = false, names = {"--max-shard-size-bytes", "--maxShardSizeBytes"}, description = "Optional. The maximum shard size, in bytes, to allow when performing the document migration.  Useful for preventing disk overflow.  Default: 80 * 1024 * 1024 * 1024 (80 GB)")
        public long maxShardSizeBytes = 85899345920L;

        @Parameter(required = false, names = {"--initial-lease-duration", "--initialLeaseDuration"}, converter = DurationConverter.class, description = "Optional. The time that the first attempt to migrate a shard's documents should take.  If a process takes longer than this the process will terminate, allowing another process to attempt the migration, but with double the amount of time than the last time.  Default: PT10M")
        public Duration initialLeaseDuration = Duration.ofMinutes(10);

        @Parameter(required = false, names = {"--documents-per-bulk-request", "--documentsPerBulkRequest"}, description = "Optional.  The number of documents to be included within each bulk request sent. Default no max (controlled by documents size)")
        int numDocsPerBulkRequest = Integer.MAX_VALUE;

        @Parameter(required = false, names = {"--documents-size-per-bulk-request", "--documentsSizePerBulkRequest"}, description = "Optional. The maximum aggregate document size to be used in bulk requests in bytes. Note does not apply to single document requests. Default 10 MiB")
        long numBytesPerBulkRequest = 10485760;

        @Parameter(required = false, names = {"--max-connections", "--maxConnections"}, description = "Optional.  The maximum number of connections to simultaneously used to communicate to the target, default 10")
        int maxConnections = 10;

        @Parameter(required = true, names = {"--source-version", "--sourceVersion"}, converter = VersionConverter.class, description = "Version of the source cluster.")
        public Version sourceVersion = Version.fromString("ES 7.10");

        @ParametersDelegate
        private DocParams docTransformationParams = new DocParams();
    }

    /* loaded from: input_file:org/opensearch/migrations/RfsMigrateDocuments$DocParams.class */
    public static class DocParams implements TransformerParams {
        private static final String DOC_CONFIG_PARAMETER_ARG_PREFIX = "doc-";

        @Parameter(required = false, names = {"--doc-transformer-config-base64"}, arity = 1, description = "Configuration of doc transformers.  The same contents as --doc-transformer-config but Base64 encoded so that the configuration is easier to pass as a command line parameter.")
        private String transformerConfigEncoded;

        @Parameter(required = false, names = {"--doc-transformer-config"}, arity = 1, description = "Configuration of doc transformers.  Either as a string that identifies the transformer that should be run (with default settings) or as json to specify options as well as multiple transformers to run in sequence.  For json, keys are the (simple) names of the loaded transformers and values are the configuration passed to each of the transformers.")
        private String transformerConfig;

        @Parameter(required = false, names = {"--doc-transformer-config-file"}, arity = 1, description = "Path to the JSON configuration file of doc transformers.")
        private String transformerConfigFile;

        public String getTransformerConfigParameterArgPrefix() {
            return DOC_CONFIG_PARAMETER_ARG_PREFIX;
        }

        public String getTransformerConfigEncoded() {
            return this.transformerConfigEncoded;
        }

        public String getTransformerConfig() {
            return this.transformerConfig;
        }

        public String getTransformerConfigFile() {
            return this.transformerConfigFile;
        }
    }

    /* loaded from: input_file:org/opensearch/migrations/RfsMigrateDocuments$DurationConverter.class */
    public static class DurationConverter implements IStringConverter<Duration> {
        /* renamed from: convert, reason: merged with bridge method [inline-methods] */
        public Duration m1convert(String str) {
            return Duration.parse(str);
        }
    }

    /* loaded from: input_file:org/opensearch/migrations/RfsMigrateDocuments$NoWorkLeftException.class */
    public static class NoWorkLeftException extends Exception {
        public NoWorkLeftException(String str) {
            super(str);
        }
    }

    public static void validateArgs(Args args) {
        boolean z = args.snapshotLocalDir != null;
        boolean z2 = (args.s3LocalDir == null || args.s3RepoUri == null || args.s3Region == null) ? false : true;
        boolean z3 = (args.s3LocalDir == null && args.s3RepoUri == null && args.s3Region == null) ? false : true;
        if (z && z3) {
            throw new ParameterException("You must provide either --snapshot-local-dir or --s3-local-dir, --s3-repo-uri, and --s3-region, but not both.");
        }
        if (z3 && !z2) {
            throw new ParameterException("If provide the S3 Snapshot args, you must provide all of them (--s3-local-dir, --s3-repo-uri and --s3-region).");
        }
        if (!z && !z2) {
            throw new ParameterException("You must provide either --snapshot-local-dir or --s3-local-dir, --s3-repo-uri, and --s3-region.");
        }
    }

    public static void main(String[] strArr) throws Exception {
        String nodeInstanceName = ProcessHelpers.getNodeInstanceName();
        System.err.println("Starting program with: " + String.join(" ", strArr));
        System.setProperty("log4j2.shutdownHookEnabled", "false");
        log.info("Starting RfsMigrateDocuments with workerId=" + nodeInstanceName);
        Args args = new Args();
        JCommander build = JCommander.newBuilder().addObject(args).build();
        build.parse(strArr);
        if (args.help) {
            build.usage();
            return;
        }
        validateArgs(args);
        RootDocumentMigrationContext makeRootContext = makeRootContext(args, nodeInstanceName);
        Path path = Paths.get(args.luceneDir, new String[0]);
        Path path2 = args.snapshotLocalDir != null ? Paths.get(args.snapshotLocalDir, new String[0]) : null;
        ConnectionContext connectionContext = args.targetArgs.toConnectionContext();
        OpenSearchClient determineVersionAndCreate = new OpenSearchClientFactory(connectionContext).determineVersionAndCreate();
        Version clusterVersion = determineVersionAndCreate.getClusterVersion();
        String str = (String) Optional.ofNullable(TransformerConfigUtils.getTransformerConfig(args.docTransformationParams)).orElse(DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG);
        log.atInfo().setMessage("Doc Transformations config string: {}").addArgument(str).log();
        TransformationLoader transformationLoader = new TransformationLoader();
        Supplier supplier = () -> {
            return transformationLoader.getTransformerFactoryLoader(str);
        };
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        AtomicReference atomicReference3 = new AtomicReference();
        WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();
        WorkCoordinatorFactory workCoordinatorFactory = new WorkCoordinatorFactory(clusterVersion);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        try {
            CoordinateWorkHttpClient coordinateWorkHttpClient = new CoordinateWorkHttpClient(connectionContext);
            Clock systemUTC = Clock.systemUTC();
            Objects.requireNonNull(atomicReference);
            OpenSearchWorkCoordinator openSearchWorkCoordinator = workCoordinatorFactory.get(coordinateWorkHttpClient, 5L, nodeInstanceName, systemUTC, (v1) -> {
                r5.set(v1);
            });
            try {
                LeaseExpireTrigger leaseExpireTrigger = new LeaseExpireTrigger(str2 -> {
                    Duration duration = args.initialLeaseDuration;
                    Runnable runnable = () -> {
                        Optional.ofNullable((Runnable) atomicReference3.get()).ifPresent((v0) -> {
                            v0.run();
                        });
                    };
                    RootWorkCoordinationContext workCoordinationContext = makeRootContext.getWorkCoordinationContext();
                    Objects.requireNonNull(workCoordinationContext);
                    exitOnLeaseTimeout(atomicReference, openSearchWorkCoordinator, str2, atomicReference2, workItemTimeProvider, duration, runnable, atomicBoolean, workCoordinationContext::createSuccessorWorkItemsContext);
                }, Clock.systemUTC());
                try {
                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                        Thread.currentThread().setName("Cleanup-Hook-Thread");
                        log.atWarn().setMessage("Received shutdown signal. Trying to mark progress and shutdown cleanly.").log();
                        try {
                            try {
                                RootWorkCoordinationContext workCoordinationContext = makeRootContext.getWorkCoordinationContext();
                                Objects.requireNonNull(workCoordinationContext);
                                executeCleanShutdownProcess(atomicReference, atomicReference2, openSearchWorkCoordinator, atomicBoolean, workCoordinationContext::createSuccessorWorkItemsContext);
                                log.atInfo().setMessage("Clean shutdown completed.").log();
                                LogManager.shutdown();
                            } catch (InterruptedException e) {
                                log.atError().setMessage("Clean exit process was interrupted: {}").addArgument(e).log();
                                Thread.currentThread().interrupt();
                                LogManager.shutdown();
                            } catch (Exception e2) {
                                log.atError().setMessage("Could not complete clean exit process: {}").addArgument(e2).log();
                                LogManager.shutdown();
                            }
                        } catch (Throwable th) {
                            LogManager.shutdown();
                            throw th;
                        }
                    }));
                    MDC.put(LOGGING_MDC_WORKER_ID, nodeInstanceName);
                    DocumentReindexer documentReindexer = new DocumentReindexer(determineVersionAndCreate, args.numDocsPerBulkRequest, args.numBytesPerBulkRequest, args.maxConnections, supplier);
                    S3Repo create = path2 == null ? S3Repo.create(Paths.get(args.s3LocalDir, new String[0]), new S3Uri(args.s3RepoUri), args.s3Region) : new FileSystemRepo(path2);
                    DefaultSourceRepoAccessor defaultSourceRepoAccessor = new DefaultSourceRepoAccessor(create);
                    ClusterSnapshotReader snapshotReader = ClusterProviderRegistry.getSnapshotReader(args.sourceVersion, create);
                    run(new LuceneIndexReader.Factory(snapshotReader), documentReindexer, atomicReference2, openSearchWorkCoordinator, args.initialLeaseDuration, leaseExpireTrigger, snapshotReader.getIndexMetadata(), args.snapshotName, args.indexAllowlist, snapshotReader.getShardMetadata(), new SnapshotShardUnpacker.Factory(defaultSourceRepoAccessor, path, snapshotReader.getBufferSizeInBytes()), args.maxShardSizeBytes, makeRootContext, atomicReference3, workItemTimeProvider);
                    atomicBoolean.set(true);
                    leaseExpireTrigger.close();
                    if (openSearchWorkCoordinator != null) {
                        openSearchWorkCoordinator.close();
                    }
                } catch (Throwable th) {
                    try {
                        leaseExpireTrigger.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } finally {
            }
        } catch (NoWorkLeftException e) {
            log.atWarn().setMessage("No work left to acquire.  Exiting with error code to signal that.").log();
            atomicBoolean.set(true);
            System.exit(3);
        } catch (Exception e2) {
            log.atError().setCause(e2).setMessage("Unexpected error running RfsWorker").log();
            throw e2;
        }
    }

    private static void executeCleanShutdownProcess(AtomicReference<IWorkCoordinator.WorkItemAndDuration> atomicReference, AtomicReference<WorkItemCursor> atomicReference2, IWorkCoordinator iWorkCoordinator, AtomicBoolean atomicBoolean, Supplier<IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext> supplier) throws IOException, InterruptedException {
        if (atomicBoolean.get()) {
            log.atInfo().setMessage("Clean shutdown already completed").log();
            return;
        }
        if (atomicReference.get() == null || atomicReference2.get() == null) {
            log.atInfo().setMessage("No work item or progress cursor found. This may indicate that the task is exiting too early to have progress to mark.").log();
            return;
        }
        IWorkCoordinator.WorkItemAndDuration workItemAndDuration = atomicReference.get();
        log.atInfo().setMessage("Marking progress: " + workItemAndDuration.getWorkItem().toString() + ", at doc " + atomicReference2.get().getDocId()).log();
        iWorkCoordinator.createSuccessorWorkItemsAndMarkComplete(workItemAndDuration.getWorkItem().toString(), getSuccessorWorkItemIds(workItemAndDuration, atomicReference2.get()), 1, supplier);
        atomicBoolean.set(true);
    }

    private static void exitOnLeaseTimeout(AtomicReference<IWorkCoordinator.WorkItemAndDuration> atomicReference, IWorkCoordinator iWorkCoordinator, String str, AtomicReference<WorkItemCursor> atomicReference2, WorkItemTimeProvider workItemTimeProvider, Duration duration, Runnable runnable, AtomicBoolean atomicBoolean, Supplier<IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext> supplier) {
        log.atWarn().setMessage("Terminating RfsMigrateDocuments because the lease has expired for {}").addArgument(str).log();
        if (atomicReference2.get() != null) {
            log.atWarn().setMessage("Progress cursor set, cancelling active doc migration").log();
            runnable.run();
            WorkItemCursor workItemCursor = atomicReference2.get();
            log.atWarn().setMessage("Progress cursor: {}").addArgument(workItemCursor).log();
            IWorkCoordinator.WorkItemAndDuration workItemAndDuration = atomicReference.get();
            if (workItemAndDuration == null) {
                throw new IllegalStateException("Unexpected state with progressCursor set without awork item");
            }
            log.atWarn().setMessage("Work Item and Duration: {}").addArgument(workItemAndDuration).log();
            log.atWarn().setMessage("Work Item: {}").addArgument(workItemAndDuration.getWorkItem()).log();
            List<String> successorWorkItemIds = getSuccessorWorkItemIds(workItemAndDuration, workItemCursor);
            log.atWarn().setMessage("Successor Work Ids: {}").addArgument(String.join(", ", successorWorkItemIds)).log();
            iWorkCoordinator.createSuccessorWorkItemsAndMarkComplete(str, successorWorkItemIds, getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, duration, workItemAndDuration.getLeaseExpirationTime()), supplier);
        } else {
            log.atWarn().setMessage("No progress cursor to create successor work items from. This can happen whendownloading and unpacking shard takes longer than the lease").log();
            log.atWarn().setMessage("Skipping creation of successor work item to retry the existing one with more time").log();
        }
        atomicBoolean.set(true);
        System.exit(2);
    }

    public static int getSuccessorNextAcquisitionLeaseExponent(WorkItemTimeProvider workItemTimeProvider, Duration duration, Instant instant) {
        if (workItemTimeProvider.getLeaseAcquisitionTimeRef().get() == null || workItemTimeProvider.getDocumentMigraionStartTimeRef().get() == null) {
            throw new IllegalStateException("Unexpected state with either leaseAquisitionTime ordocumentMigrationStartTime as null while creating successor work item");
        }
        Instant instant2 = (Instant) workItemTimeProvider.getLeaseAcquisitionTimeRef().get();
        Instant instant3 = (Instant) workItemTimeProvider.getDocumentMigraionStartTimeRef().get();
        Duration between = Duration.between(instant2, instant);
        double millis = between.toMillis() / duration.toMillis();
        long max = Math.max(Math.round(Math.log(millis) / Math.log(2.0d)), 0L);
        Duration between2 = Duration.between(instant2, instant3);
        double millis2 = between2.toMillis() / between.toMillis();
        int i = (int) max;
        if (millis2 < DECREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD && i > 0) {
            log.atInfo().setMessage("Shard setup took {}% of lease time which is less than target lower threshold of {}%.Decreasing successor lease duration exponent.").addArgument(String.format("%.2f", Double.valueOf(millis2 * 100.0d))).addArgument(String.format("%.2f", Double.valueOf(2.5d))).log();
            i--;
        } else if (millis2 > INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD) {
            log.atInfo().setMessage("Shard setup took {}% of lease time which is more than target upper threshold of {}%.Increasing successor lease duration exponent.").addArgument(String.format("%.2f", Double.valueOf(millis2 * 100.0d))).addArgument(String.format("%.2f", Double.valueOf(10.0d))).log();
            i++;
        }
        log.atDebug().setMessage("SuccessorNextAcquisitionLeaseExponent calculated values:\nleaseAcquisitionTime:{}\ndocumentMigrationStartTime:{}\nleaseDuration:{}\nleaseDurationFactor:{}\nexistingNextAcquisitionLeaseExponent:{}\nshardSetupDuration:{}\nshardSetupDurationFactor:{}\nsuccessorShardNextAcquisitionLeaseExponent:{}").addArgument(instant2).addArgument(instant3).addArgument(between).addArgument(Double.valueOf(millis)).addArgument(Long.valueOf(max)).addArgument(between2).addArgument(Double.valueOf(millis2)).addArgument(Integer.valueOf(i)).log();
        return i;
    }

    private static List<String> getSuccessorWorkItemIds(IWorkCoordinator.WorkItemAndDuration workItemAndDuration, WorkItemCursor workItemCursor) {
        if (workItemAndDuration == null) {
            throw new IllegalStateException("Unexpected worker coordination state. Expected workItem set when progressCursor not null.");
        }
        IWorkCoordinator.WorkItemAndDuration.WorkItem workItem = workItemAndDuration.getWorkItem();
        IWorkCoordinator.WorkItemAndDuration.WorkItem workItem2 = new IWorkCoordinator.WorkItemAndDuration.WorkItem(workItem.getIndexName(), workItem.getShardNumber(), Integer.valueOf(workItemCursor.getDocId()));
        ArrayList arrayList = new ArrayList();
        arrayList.add(workItem2.toString());
        return arrayList;
    }

    private static RootDocumentMigrationContext makeRootContext(Args args, String str) {
        return new RootDocumentMigrationContext(RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(args.otelCollectorEndpoint, "documentMigration", str), new CompositeContextTracker(new IContextTracker[]{new ActiveContextTracker(), new ActiveContextTrackerByActivityType()}));
    }

    public static DocumentsRunner.CompletionStatus run(LuceneIndexReader.Factory factory, DocumentReindexer documentReindexer, AtomicReference<WorkItemCursor> atomicReference, IWorkCoordinator iWorkCoordinator, Duration duration, LeaseExpireTrigger leaseExpireTrigger, IndexMetadata.Factory factory2, String str, List<String> list, ShardMetadata.Factory factory3, SnapshotShardUnpacker.Factory factory4, long j, RootDocumentMigrationContext rootDocumentMigrationContext, AtomicReference<Runnable> atomicReference2, WorkItemTimeProvider workItemTimeProvider) throws IOException, InterruptedException, NoWorkLeftException {
        ScopedWorkCoordinator scopedWorkCoordinator = new ScopedWorkCoordinator(iWorkCoordinator, leaseExpireTrigger);
        confirmShardPrepIsComplete(factory2, str, list, scopedWorkCoordinator, rootDocumentMigrationContext);
        RootWorkCoordinationContext workCoordinationContext = rootDocumentMigrationContext.getWorkCoordinationContext();
        Objects.requireNonNull(workCoordinationContext);
        if (!iWorkCoordinator.workItemsNotYetComplete(workCoordinationContext::createItemsPendingContext)) {
            throw new NoWorkLeftException("No work items are pending/all work items have been processed.  Returning.");
        }
        BiFunction biFunction = (str2, num) -> {
            ShardMetadata fromRepo = factory3.fromRepo(str, str2, num.intValue());
            log.info("Shard size: " + fromRepo.getTotalSizeBytes());
            if (fromRepo.getTotalSizeBytes() > j) {
                throw new DocumentsRunner.ShardTooLargeException(fromRepo.getTotalSizeBytes(), j);
            }
            return fromRepo;
        };
        Objects.requireNonNull(atomicReference);
        Consumer consumer = (v1) -> {
            r8.set(v1);
        };
        Objects.requireNonNull(atomicReference2);
        DocumentsRunner documentsRunner = new DocumentsRunner(scopedWorkCoordinator, duration, documentReindexer, factory4, biFunction, factory, consumer, (v1) -> {
            r9.set(v1);
        }, workItemTimeProvider);
        Objects.requireNonNull(rootDocumentMigrationContext);
        return documentsRunner.migrateNextShard(rootDocumentMigrationContext::createReindexContext);
    }

    private static void confirmShardPrepIsComplete(IndexMetadata.Factory factory, String str, List<String> list, ScopedWorkCoordinator scopedWorkCoordinator, RootDocumentMigrationContext rootDocumentMigrationContext) throws IOException, InterruptedException {
        long j = 1000;
        int i = 0;
        while (true) {
            try {
                new ShardWorkPreparer().run(scopedWorkCoordinator, factory, str, list, rootDocumentMigrationContext);
                return;
            } catch (IWorkCoordinator.LeaseLockHeldElsewhereException e) {
                log.atInfo().setMessage("After {} another process holds the lock for setting up the shard work items.  Waiting {} ms before trying again.").addArgument(Integer.valueOf(i)).addArgument(Long.valueOf(j)).log();
                Thread.sleep(j);
                j *= 2;
                i++;
            }
        }
    }
}
