package org.opensearch.migrations.bulkload.common;

import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Generated;
import org.opensearch.migrations.bulkload.worker.WorkItemCursor;
import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts;
import org.opensearch.migrations.transform.IJsonTransformer;
import org.opensearch.migrations.transform.NoopTransformerProvider;
import org.opensearch.migrations.transform.ThreadSafeTransformerWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.spi.LoggingEventBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/opensearch/migrations/bulkload/common/DocumentReindexer.class */
public class DocumentReindexer {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DocumentReindexer.class);
    private static final Supplier<IJsonTransformer> NOOP_TRANSFORMER_SUPPLIER = () -> {
        return new NoopTransformerProvider().createTransformer((Object) null);
    };
    protected final OpenSearchClient client;
    private final int maxDocsPerBulkRequest;
    private final long maxBytesPerBulkRequest;
    private final int maxConcurrentWorkItems;
    private final ThreadSafeTransformerWrapper threadSafeTransformer;
    private final boolean isNoopTransformer;

    public DocumentReindexer(OpenSearchClient openSearchClient, int i, long j, int i2, Supplier<IJsonTransformer> supplier) {
        this.client = openSearchClient;
        this.maxDocsPerBulkRequest = i;
        this.maxBytesPerBulkRequest = j;
        this.maxConcurrentWorkItems = i2;
        this.isNoopTransformer = supplier == null;
        this.threadSafeTransformer = new ThreadSafeTransformerWrapper(this.isNoopTransformer ? NOOP_TRANSFORMER_SUPPLIER : supplier);
    }

    public Flux<WorkItemCursor> reindex(String str, Flux<RfsLuceneDocument> flux, IDocumentMigrationContexts.IDocumentReindexContext iDocumentReindexContext) {
        AtomicInteger atomicInteger = new AtomicInteger();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), runnable -> {
            return new Thread(() -> {
                try {
                    runnable.run();
                } finally {
                    this.threadSafeTransformer.close();
                }
            }, "DocumentBulkAggregator-" + atomicInteger.incrementAndGet());
        });
        Scheduler fromExecutor = Schedulers.fromExecutor(newFixedThreadPool);
        return reindexDocsInParallelBatches(flux.publishOn(fromExecutor, 1).buffer(Math.min(100, this.maxDocsPerBulkRequest)).concatMapIterable(list -> {
            return transformDocumentBatch(this.threadSafeTransformer, list, str);
        }), str, iDocumentReindexContext).doFinally(signalType -> {
            fromExecutor.dispose();
            newFixedThreadPool.shutdown();
        });
    }

    Flux<WorkItemCursor> reindexDocsInParallelBatches(Flux<RfsDocument> flux, String str, IDocumentMigrationContexts.IDocumentReindexContext iDocumentReindexContext) {
        Scheduler newParallel = Schedulers.newParallel("DocumentBatchReindexer");
        return batchDocsBySizeOrCount(flux).limitRate(50, 1).publishOn(newParallel, 1).flatMapSequential(list -> {
            return sendBulkRequest(UUID.randomUUID(), list, str, iDocumentReindexContext, newParallel);
        }, this.maxConcurrentWorkItems).doFinally(signalType -> {
            newParallel.dispose();
        });
    }

    List<RfsDocument> transformDocumentBatch(IJsonTransformer iJsonTransformer, List<RfsLuceneDocument> list, String str) {
        List<RfsDocument> list2 = (List) list.stream().map(rfsLuceneDocument -> {
            return RfsDocument.fromLuceneDocument(rfsLuceneDocument, str);
        }).collect(Collectors.toList());
        return !this.isNoopTransformer ? RfsDocument.transform(iJsonTransformer, list2) : list2;
    }

    Mono<WorkItemCursor> sendBulkRequest(UUID uuid, List<RfsDocument> list, String str, IDocumentMigrationContexts.IDocumentReindexContext iDocumentReindexContext, Scheduler scheduler) {
        RfsDocument rfsDocument = list.get(list.size() - 1);
        log.atInfo().setMessage("Last doc is: Source Index " + str + " Lucene Doc Number " + rfsDocument.progressCheckpointNum).log();
        return this.client.sendBulkRequest(str, (List) list.stream().map(rfsDocument2 -> {
            return rfsDocument2.document;
        }).collect(Collectors.toList()), iDocumentReindexContext.createBulkRequest()).doFirst(() -> {
            LoggingEventBuilder addArgument = log.atInfo().setMessage("Batch Id:{}, {} documents in current bulk request.").addArgument(uuid);
            Objects.requireNonNull(list);
            addArgument.addArgument(list::size).log();
        }).doOnSuccess(bulkResponse -> {
            log.atDebug().setMessage("Batch Id:{}, succeeded").addArgument(uuid).log();
        }).doOnError(th -> {
            LoggingEventBuilder addArgument = log.atError().setMessage("Batch Id:{}, failed {}").addArgument(uuid);
            Objects.requireNonNull(th);
            addArgument.addArgument(th::getMessage).log();
        }).onErrorResume(th2 -> {
            return Mono.empty();
        }).then(Mono.just(new WorkItemCursor(rfsDocument.progressCheckpointNum)).subscribeOn(scheduler));
    }

    Flux<List<RfsDocument>> batchDocsBySizeOrCount(Flux<RfsDocument> flux) {
        return flux.bufferUntil(new Predicate<RfsDocument>() { // from class: org.opensearch.migrations.bulkload.common.DocumentReindexer.1
            private int currentItemCount = 0;
            private long currentSize = 0;

            @Override // java.util.function.Predicate
            public boolean test(RfsDocument rfsDocument) {
                long serializedLength = rfsDocument.document.getSerializedLength() + 1;
                this.currentSize += serializedLength;
                this.currentItemCount++;
                if (this.currentItemCount <= DocumentReindexer.this.maxDocsPerBulkRequest && this.currentSize <= DocumentReindexer.this.maxBytesPerBulkRequest) {
                    return false;
                }
                this.currentItemCount = 1;
                this.currentSize = serializedLength;
                return true;
            }
        }, true);
    }
}
