package io.datarouter.conveyor.trace.conveyor;

import io.datarouter.conveyor.Conveyor;
import io.datarouter.conveyor.ConveyorConfiguration;
import io.datarouter.conveyor.ConveyorCounters;
import io.datarouter.conveyor.ConveyorGauges;
import io.datarouter.conveyor.ConveyorRunnable;
import io.datarouter.conveyor.trace.ConveyorTraceBuffer;
import io.datarouter.instrumentation.exception.DatarouterExceptionPublisher;
import io.datarouter.instrumentation.trace.Trace2BatchedBundleDto;
import io.datarouter.instrumentation.trace.TracePublisher;
import io.datarouter.instrumentation.trace.TracerTool;
import io.datarouter.scanner.OptionalScanner;
import io.datarouter.scanner.Scanner;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.time.Instant;
import java.util.List;

@Singleton
/* loaded from: input_file:io/datarouter/conveyor/trace/conveyor/ConveyorTraceMemoryToPublisherConveyorConfiguration.class */
public class ConveyorTraceMemoryToPublisherConveyorConfiguration implements ConveyorConfiguration {
    private static final int BATCH_SIZE = 500;

    @Inject
    private ConveyorTraceBuffer traceBuffer;

    @Inject
    private TracePublisher tracePublisher;

    @Inject
    private DatarouterExceptionPublisher exceptionPublisher;

    @Inject
    private ConveyorGauges gaugeRecorder;

    @Override // io.datarouter.conveyor.ConveyorConfiguration
    public Conveyor.ProcessResult process(ConveyorRunnable conveyorRunnable) {
        Instant now = Instant.now();
        List pollMultiWithLimit = this.traceBuffer.buffer.pollMultiWithLimit(BATCH_SIZE);
        this.gaugeRecorder.savePeekDurationMs(conveyorRunnable, Duration.between(now, Instant.now()).toMillis());
        TracerTool.setAlternativeStartTime();
        if (pollMultiWithLimit.isEmpty()) {
            return new Conveyor.ProcessResult(false);
        }
        Instant now2 = Instant.now();
        Scanner.of(pollMultiWithLimit).map((v0) -> {
            return v0.traceBundleDto();
        }).flush(list -> {
            this.tracePublisher.addBatch(new Trace2BatchedBundleDto(list));
        });
        Scanner concat = Scanner.of(pollMultiWithLimit).map((v0) -> {
            return v0.taskExecutorRecord();
        }).concat(OptionalScanner::of);
        DatarouterExceptionPublisher datarouterExceptionPublisher = this.exceptionPublisher;
        datarouterExceptionPublisher.getClass();
        concat.flush(datarouterExceptionPublisher::addTaskExecutorRecord);
        ConveyorCounters.incConsumedOpAndDatabeans(conveyorRunnable, Duration.between(now2, Instant.now()).toMillis());
        this.gaugeRecorder.saveProcessBufferDurationMs(conveyorRunnable, 500L);
        return new Conveyor.ProcessResult(true);
    }

    @Override // io.datarouter.conveyor.ConveyorConfiguration
    public boolean shouldRunOnShutdown() {
        return true;
    }
}
