package com.rfs;

import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.ParametersDelegate;
import com.rfs.cms.CoordinateWorkHttpClient;
import com.rfs.cms.IWorkCoordinator;
import com.rfs.cms.LeaseExpireTrigger;
import com.rfs.cms.OpenSearchWorkCoordinator;
import com.rfs.cms.ScopedWorkCoordinator;
import com.rfs.common.DefaultSourceRepoAccessor;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.FileSystemRepo;
import com.rfs.common.LuceneDocumentsReader;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.S3Repo;
import com.rfs.common.S3Uri;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.common.TryHandlePhaseFailure;
import com.rfs.common.http.ConnectionContext;
import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;
import com.rfs.tracing.RootWorkCoordinationContext;
import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10;
import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
import com.rfs.worker.DocumentsRunner;
import com.rfs.worker.ShardWorkPreparer;
import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;
import org.opensearch.migrations.reindexer.tracing.RootDocumentMigrationContext;
import org.opensearch.migrations.tracing.ActiveContextTracker;
import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
import org.opensearch.migrations.tracing.CompositeContextTracker;
import org.opensearch.migrations.tracing.IContextTracker;
import org.opensearch.migrations.tracing.RootOtelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:com/rfs/RfsMigrateDocuments.class */
public class RfsMigrateDocuments {
    private static final Logger log = LoggerFactory.getLogger(RfsMigrateDocuments.class);
    public static final int PROCESS_TIMED_OUT = 2;
    public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5;
    public static final String LOGGING_MDC_WORKER_ID = "workerId";

    /* loaded from: input_file:com/rfs/RfsMigrateDocuments$Args.class */
    /**
     * Command-line options for {@link RfsMigrateDocuments}, populated by JCommander.
     *
     * <p>The snapshot source is given either as a local directory ({@code --snapshot-local-dir})
     * or as the S3 triple ({@code --s3-local-dir}, {@code --s3-repo-uri}, {@code --s3-region});
     * {@link RfsMigrateDocuments#validateArgs(Args)} enforces that exactly one of the two forms
     * is supplied.
     */
    public static class Args {

        @Parameter(names = {"--help", "-h"}, help = true, description = "Displays information about how to use this tool")
        private boolean help;

        @Parameter(names = {"--snapshot-name"}, required = true, description = "The name of the snapshot to migrate")
        public String snapshotName;

        @Parameter(names = {"--lucene-dir"}, required = true, description = "The absolute path to the directory where we'll put the Lucene docs")
        public String luceneDir;

        @Parameter(required = false, names = {"--otel-collector-endpoint"}, arity = 1, description = "Endpoint (host:port) for the OpenTelemetry Collector to which metrics logs should be forwarded. If no value is provided, metrics will not be forwarded.")
        String otelCollectorEndpoint;

        @Parameter(names = {"--snapshot-local-dir"}, required = false, description = "The absolute path to the directory on local disk where the snapshot exists.  Use this parameter if you have a copy of the snapshot on disk.  Mutually exclusive with --s3-local-dir, --s3-repo-uri, and --s3-region.")
        public String snapshotLocalDir = null;

        @Parameter(names = {"--s3-local-dir"}, required = false, description = "The absolute path to the directory on local disk to download S3 files to.  If you supply this, you must also supply --s3-repo-uri and --s3-region.  Mutually exclusive with --snapshot-local-dir.")
        public String s3LocalDir = null;

        @Parameter(names = {"--s3-repo-uri"}, required = false, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2.  If you supply this, you must also supply --s3-local-dir and --s3-region.  Mutually exclusive with --snapshot-local-dir.")
        public String s3RepoUri = null;

        @Parameter(names = {"--s3-region"}, required = false, description = "The AWS Region the S3 bucket is in, like: us-east-2.  If you supply this, you must also supply --s3-local-dir and --s3-repo-uri.  Mutually exclusive with --snapshot-local-dir.")
        public String s3Region = null;

        // Connection settings for the target cluster; its options are merged into this command line.
        @ParametersDelegate
        public ConnectionContext.TargetArgs targetArgs = new ConnectionContext.TargetArgs();

        @Parameter(names = {"--index-allowlist"}, description = "Optional.  List of index names to migrate (e.g. 'logs_2024_01, logs_2024_02').  Default: all non-system indices (e.g. those not starting with '.')", required = false)
        public List<String> indexAllowlist = List.of();

        // 80 GiB, spelled out so the value matches the help text at a glance.
        @Parameter(names = {"--max-shard-size-bytes"}, description = "Optional. The maximum shard size, in bytes, to allow when performing the document migration.  Useful for preventing disk overflow.  Default: 80 * 1024 * 1024 * 1024 (80 GB)", required = false)
        public long maxShardSizeBytes = 80L * 1024 * 1024 * 1024;

        @Parameter(names = {"--initial-lease-duration"}, description = "Optional. The time that the first attempt to migrate a shard's documents should take.  If a process takes longer than this the process will terminate, allowing another process to attempt the migration, but with double the amount of time than the last time.  Default: PT10M", required = false, converter = DurationConverter.class)
        public Duration initialLeaseDuration = Duration.ofMinutes(10);

        @Parameter(required = false, names = {"--documents-per-bulk-request"}, description = "Optional.  The number of documents to be included within each bulk request sent. Default no max (controlled by documents size)")
        int numDocsPerBulkRequest = Integer.MAX_VALUE;

        // 10 MiB.
        @Parameter(required = false, names = {"--documents-size-per-bulk-request"}, description = "Optional. The maximum aggregate document size to be used in bulk requests in bytes. Note does not apply to single document requests. Default 10 MiB")
        long numBytesPerBulkRequest = 10L * 1024 * 1024;

        @Parameter(required = false, names = {"--max-connections"}, description = "Optional.  The maximum number of connections to simultaneously use to communicate with the target, default 10")
        int maxConnections = 10;
    }

    /* loaded from: input_file:com/rfs/RfsMigrateDocuments$DurationConverter.class */
    public static class DurationConverter implements IStringConverter<Duration> {
        /* renamed from: convert, reason: merged with bridge method [inline-methods] */
        public Duration m1convert(String str) {
            return Duration.parse(str);
        }
    }

    /* loaded from: input_file:com/rfs/RfsMigrateDocuments$NoWorkLeftException.class */
    /**
     * Thrown by {@link RfsMigrateDocuments#run} when the work coordinator reports
     * that no work items are pending.
     */
    public static class NoWorkLeftException extends Exception {
        /**
         * @param message human-readable explanation, exposed through {@link #getMessage()}
         */
        public NoWorkLeftException(String message) {
            super(message);
        }
    }

    /**
     * Checks that the snapshot source is specified in exactly one of the two supported ways:
     * a local snapshot directory, or the complete set of S3 options.
     *
     * @param args the parsed command-line arguments
     * @throws ParameterException if both forms are mixed, if only some of the S3 options are
     *                            given, or if neither form is given
     */
    public static void validateArgs(Args args) {
        final boolean hasLocalSnapshot = args.snapshotLocalDir != null;
        final boolean hasAnyS3Arg = args.s3LocalDir != null || args.s3RepoUri != null || args.s3Region != null;
        final boolean hasAllS3Args = args.s3LocalDir != null && args.s3RepoUri != null && args.s3Region != null;

        // Local and S3 sources are mutually exclusive, even if the S3 set is only partially given.
        if (hasLocalSnapshot && hasAnyS3Arg) {
            throw new ParameterException("You must provide either --snapshot-local-dir or --s3-local-dir, --s3-repo-uri, and --s3-region, but not both.");
        }

        // The S3 options are all-or-nothing.
        if (hasAnyS3Arg && !hasAllS3Args) {
            throw new ParameterException("If provide the S3 Snapshot args, you must provide all of them (--s3-local-dir, --s3-repo-uri and --s3-region).");
        }

        // At least one source has to be present.
        if (!hasLocalSnapshot && !hasAllS3Args) {
            throw new ParameterException("You must provide either --snapshot-local-dir or --s3-local-dir, --s3-repo-uri, and --s3-region.");
        }
    }

    /**
     * Command-line entry point: parses and validates the arguments, then attempts to migrate
     * the documents of one shard from the snapshot to the target cluster via {@link #run}.
     *
     * <p>The callback handed to {@link LeaseExpireTrigger} logs the expired item and terminates
     * the JVM with {@link #PROCESS_TIMED_OUT}. The trigger is closed when this method leaves
     * the try block, whether normally or exceptionally.
     *
     * @param strArr raw command-line arguments
     * @throws Exception anything propagated from setup or from
     *                   {@code TryHandlePhaseFailure.executeWithTryCatch}
     */
    public static void main(String[] strArr) throws Exception {
        Args args = new Args();
        JCommander jCommander = JCommander.newBuilder().addObject(args).build();
        jCommander.parse(strArr);
        if (args.help) {
            jCommander.usage();
            return;
        }
        validateArgs(args);

        RootDocumentMigrationContext rootContext = makeRootContext(args);
        Path luceneDirPath = Paths.get(args.luceneDir);
        // null means "no local snapshot": the S3 options are used instead (guaranteed by validateArgs).
        Path snapshotLocalDirPath = args.snapshotLocalDir != null ? Paths.get(args.snapshotLocalDir) : null;

        try (LeaseExpireTrigger leaseExpireTrigger = new LeaseExpireTrigger(workItemId -> {
            log.error("Terminating RfsMigrateDocuments because the lease has expired for {}", workItemId);
            System.exit(PROCESS_TIMED_OUT);
        }, Clock.systemUTC())) {
            ConnectionContext connectionContext = args.targetArgs.toConnectionContext();
            String workerId = UUID.randomUUID().toString();
            OpenSearchWorkCoordinator workCoordinator = new OpenSearchWorkCoordinator(
                new CoordinateWorkHttpClient(connectionContext),
                TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
                workerId
            );
            // Tag every subsequent log line from this thread with the worker's id.
            MDC.put(LOGGING_MDC_WORKER_ID, workerId);

            TryHandlePhaseFailure.executeWithTryCatch(() -> {
                log.info("Running RfsMigrateDocuments with workerId = {}", workerId);

                DocumentReindexer reindexer = new DocumentReindexer(
                    new OpenSearchClient(connectionContext),
                    args.numDocsPerBulkRequest,
                    args.numBytesPerBulkRequest,
                    args.maxConnections
                );

                // Declared with 'var' so that either repository implementation is accepted;
                // the two branches produce different concrete types (S3Repo vs. FileSystemRepo).
                var sourceRepo = snapshotLocalDirPath == null
                    ? S3Repo.create(Paths.get(args.s3LocalDir), new S3Uri(args.s3RepoUri), args.s3Region)
                    : new FileSystemRepo(snapshotLocalDirPath);
                SnapshotRepoProvider_ES_7_10 repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);

                run(
                    LuceneDocumentsReader.getFactory(
                        ElasticsearchConstants_ES_7_10.SOFT_DELETES_POSSIBLE,
                        ElasticsearchConstants_ES_7_10.SOFT_DELETES_FIELD
                    ),
                    reindexer,
                    workCoordinator,
                    args.initialLeaseDuration,
                    leaseExpireTrigger,
                    new IndexMetadataFactory_ES_7_10(repoDataProvider),
                    args.snapshotName,
                    args.indexAllowlist,
                    new ShardMetadataFactory_ES_7_10(repoDataProvider),
                    new SnapshotShardUnpacker.Factory(
                        new DefaultSourceRepoAccessor(sourceRepo),
                        luceneDirPath,
                        ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES
                    ),
                    args.maxShardSizeBytes,
                    rootContext
                );
            });
        }
    }

    /**
     * Builds the root tracing context for this process.
     *
     * <p>The same OpenTelemetry instance and context tracker are shared between the document
     * migration context and its nested work-coordination context.
     *
     * @param args parsed command-line arguments; only {@code otelCollectorEndpoint} is read
     * @return a new root context for the document migration
     */
    private static RootDocumentMigrationContext makeRootContext(Args args) {
        IContextTracker[] trackers = {new ActiveContextTracker(), new ActiveContextTrackerByActivityType()};
        CompositeContextTracker contextTracker = new CompositeContextTracker(trackers);

        OpenTelemetry openTelemetry =
            RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(args.otelCollectorEndpoint, "docMigration");

        RootWorkCoordinationContext workCoordinationContext =
            new RootWorkCoordinationContext(openTelemetry, contextTracker);
        return new RootDocumentMigrationContext(openTelemetry, contextTracker, workCoordinationContext);
    }

    /**
     * Ensures the shard work items have been set up, then migrates the next available shard.
     *
     * @param function                     factory producing a {@link LuceneDocumentsReader} for a path
     * @param documentReindexer            reindexer handed to the {@link DocumentsRunner}
     * @param iWorkCoordinator             coordinator used to set up and query work items
     * @param duration                     initial lease duration handed to the {@link DocumentsRunner}
     * @param leaseExpireTrigger           trigger wrapped together with the coordinator in a
     *                                     {@link ScopedWorkCoordinator}
     * @param factory                      index metadata factory used while preparing shard work
     * @param str                          snapshot name
     * @param list                         index allowlist used while preparing shard work
     * @param factory2                     shard metadata factory used to look up each shard
     * @param factory3                     factory for the snapshot shard unpacker
     * @param j                            maximum allowed shard size in bytes
     * @param rootDocumentMigrationContext root tracing context
     * @return the completion status reported by {@link DocumentsRunner#migrateNextShard}
     * @throws NoWorkLeftException  if the coordinator reports that no work items are pending
     * @throws IOException          propagated from shard preparation or migration
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static DocumentsRunner.CompletionStatus run(Function<Path, LuceneDocumentsReader> function, DocumentReindexer documentReindexer, IWorkCoordinator iWorkCoordinator, Duration duration, LeaseExpireTrigger leaseExpireTrigger, IndexMetadata.Factory factory, String str, List<String> list, ShardMetadata.Factory factory2, SnapshotShardUnpacker.Factory factory3, long j, RootDocumentMigrationContext rootDocumentMigrationContext) throws IOException, InterruptedException, NoWorkLeftException {
        ScopedWorkCoordinator scopedWorkCoordinator = new ScopedWorkCoordinator(iWorkCoordinator, leaseExpireTrigger);
        confirmShardPrepIsComplete(factory, str, list, scopedWorkCoordinator, rootDocumentMigrationContext);

        RootWorkCoordinationContext workCoordinationContext = rootDocumentMigrationContext.getWorkCoordinationContext();
        if (!iWorkCoordinator.workItemsArePending(workCoordinationContext::createItemsPendingContext)) {
            throw new NoWorkLeftException("No work items are pending/all work items have been processed.  Returning.");
        }

        DocumentsRunner documentsRunner = new DocumentsRunner(scopedWorkCoordinator, duration, (indexName, shardNumber) -> {
            ShardMetadata shardMetadata = factory2.fromRepo(str, indexName, shardNumber);
            long shardSizeBytes = shardMetadata.getTotalSizeBytes();
            log.info("Shard size: {}", shardSizeBytes);
            // Refuse shards above the configured limit before any unpacking work is attempted.
            if (shardSizeBytes > j) {
                throw new DocumentsRunner.ShardTooLargeException(shardSizeBytes, j);
            }
            return shardMetadata;
        }, factory3, function, documentReindexer);

        return documentsRunner.migrateNextShard(rootDocumentMigrationContext::createReindexContext);
    }

    /**
     * Runs {@link ShardWorkPreparer} until it completes without the setup lock being held by
     * another process.
     *
     * <p>Each time {@link IWorkCoordinator.LeaseLockHeldElsewhereException} is thrown, the method
     * logs the situation, sleeps, and retries. The wait starts at one second and doubles after
     * every failed attempt; there is no upper bound on the number of retries.
     *
     * @param factory                      index metadata factory passed to the preparer
     * @param str                          snapshot name passed to the preparer
     * @param list                         index allowlist passed to the preparer
     * @param scopedWorkCoordinator        coordinator passed to the preparer
     * @param rootDocumentMigrationContext root tracing context passed to the preparer
     * @throws IOException          propagated from the preparer
     * @throws InterruptedException if the thread is interrupted while sleeping between attempts
     */
    private static void confirmShardPrepIsComplete(IndexMetadata.Factory factory, String str, List<String> list, ScopedWorkCoordinator scopedWorkCoordinator, RootDocumentMigrationContext rootDocumentMigrationContext) throws IOException, InterruptedException {
        long backoffMillis = 1000;
        int retries = 0;
        while (true) {
            try {
                new ShardWorkPreparer().run(scopedWorkCoordinator, factory, str, list, rootDocumentMigrationContext);
                return;
            } catch (IWorkCoordinator.LeaseLockHeldElsewhereException e) {
                // Effectively-final copies for the lazily evaluated log message.
                final long waitMillis = backoffMillis;
                final int retriesSoFar = retries;
                log.atInfo()
                    .setMessage(() -> "After " + retriesSoFar + " retries, another process holds the lock for setting up the shard work items.  Waiting " + waitMillis + "ms before trying again.")
                    .log();
                Thread.sleep(waitMillis);
                backoffMillis *= 2;
                retries++;
            }
        }
    }
}
