package io.datarouter.conveyor.queue.configuration;

import io.datarouter.conveyor.Conveyor;
import io.datarouter.conveyor.ConveyorConfiguration;
import io.datarouter.conveyor.ConveyorCounters;
import io.datarouter.conveyor.ConveyorGauges;
import io.datarouter.conveyor.ConveyorRunnable;
import io.datarouter.instrumentation.trace.TracerTool;
import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.scanner.Scanner;
import io.datarouter.storage.queue.QueueMessage;
import io.datarouter.storage.queue.QueueMessageKey;
import io.datarouter.storage.queue.consumer.BatchedAckQueueConsumer;
import jakarta.inject.Inject;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/conveyor/queue/configuration/BaseBatchedQueueConsumerConveyorConfiguration.class */
public abstract class BaseBatchedQueueConsumerConveyorConfiguration<PK extends PrimaryKey<PK>, D extends Databean<PK, D>> implements ConveyorConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(BaseBatchedQueueConsumerConveyorConfiguration.class);
    private static final Duration BATCHED_QUEUE_PEEK_TIMEOUT = Duration.ofSeconds(10);
    private static final int BATCH_SIZE = 100;
    private final Object lock = new Object();
    private final List<MessageAndTime<PK, D>> buffer = new ArrayList(BATCH_SIZE);

    @Inject
    private ConveyorGauges gaugeRecorder;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/datarouter/conveyor/queue/configuration/BaseBatchedQueueConsumerConveyorConfiguration$MessageAndTime.class */
    public static final class MessageAndTime<PK extends PrimaryKey<PK>, D extends Databean<PK, D>> extends Record {
        private final QueueMessageKey queueMessageKey;
        private final D message;
        private final long peekTime;

        private MessageAndTime(QueueMessageKey queueMessageKey, D d, long j) {
            this.queueMessageKey = queueMessageKey;
            this.message = d;
            this.peekTime = j;
        }

        public QueueMessageKey queueMessageKey() {
            return this.queueMessageKey;
        }

        public D message() {
            return this.message;
        }

        public long peekTime() {
            return this.peekTime;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, MessageAndTime.class), MessageAndTime.class, "queueMessageKey;message;peekTime", "FIELD:Lio/datarouter/conveyor/queue/configuration/BaseBatchedQueueConsumerConveyorConfiguration$MessageAndTime;->queueMessageKey:Lio/datarouter/storage/queue/QueueMessageKey;", "FIELD:Lio/datarouter/conveyor/queue/configuration/BaseBatchedQueueConsumerConveyorConfiguration$MessageAndTime;->message:Lio/datarouter/model/databean/Databean;", "FIELD:Lio/datarouter/conveyor/queue/configuration/BaseBatchedQueueConsumerConveyorConfiguration$MessageAndTime;->peekTime:J").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, MessageAndTime.class), MessageAndTime.class, "queueMessageKey;message;peekTime", "FIELD:Lio/datarouter/conveyor/queue/configuration/BaseBatchedQueueConsumerConveyorConfiguration$MessageAndTime;->queueMessageKey:Lio/datarouter/storage/queue/QueueMessageKey;", "FIELD:Lio/datarouter/conveyor/queue/configuration/BaseBatchedQueueConsumerConveyorConfiguration$MessageAndTime;->message:Lio/datarouter/model/databean/Databean;", "FIELD:Lio/datarouter/conveyor/queue/configuration/BaseBatchedQueueConsumerConveyorConfiguration$MessageAndTime;->peekTime:J").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, MessageAndTime.class, Object.class), MessageAndTime.class, "queueMessageKey;message;peekTime", "FIELD:Lio/datarouter/conveyor/queue/configuration/BaseBatchedQueueConsumerConveyorConfiguration$MessageAndTime;->queueMessageKey:Lio/datarouter/storage/queue/QueueMessageKey;", "FIELD:Lio/datarouter/conveyor/queue/configuration/BaseBatchedQueueConsumerConveyorConfiguration$MessageAndTime;->message:Lio/datarouter/model/databean/Databean;", "FIELD:Lio/datarouter/conveyor/queue/configuration/BaseBatchedQueueConsumerConveyorConfiguration$MessageAndTime;->peekTime:J").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    protected abstract void processBuffer(List<D> list);

    protected abstract BatchedAckQueueConsumer<PK, D> getQueueConsumer();

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v10, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v11, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v19 */
    /* JADX WARN: Type inference failed for: r0v26, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v27, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v30 */
    @Override // io.datarouter.conveyor.ConveyorConfiguration
    public Conveyor.ProcessResult process(ConveyorRunnable conveyorRunnable) {
        List<MessageAndTime<PK, D>> of = List.of();
        Instant now = Instant.now();
        QueueMessage peek = getQueueConsumer().peek(BATCHED_QUEUE_PEEK_TIMEOUT, DEFAULT_VISIBILITY_TIMEOUT);
        Instant now2 = Instant.now();
        this.gaugeRecorder.savePeekDurationMs(conveyorRunnable, Duration.between(now, now2).toMillis());
        TracerTool.setAlternativeStartTime();
        if (peek == null) {
            logger.info("peeked conveyor={} nullMessage", conveyorRunnable.getName());
            ?? r0 = this.lock;
            synchronized (r0) {
                List<MessageAndTime<PK, D>> copyAndClearBuffer = copyAndClearBuffer();
                r0 = r0;
                flushBuffer(copyAndClearBuffer, Optional.of(now2), conveyorRunnable);
                return new Conveyor.ProcessResult(false);
            }
        }
        ?? r02 = this.lock;
        synchronized (r02) {
            logger.info("peeked conveyor={} messageCount={}", conveyorRunnable.getName(), 1);
            this.buffer.add(new MessageAndTime<>(peek.getKey(), peek.getDatabean(), System.currentTimeMillis()));
            if (this.buffer.size() >= BATCH_SIZE) {
                of = copyAndClearBuffer();
            }
            r02 = r02;
            flushBuffer(of, Optional.of(now2), conveyorRunnable);
            return new Conveyor.ProcessResult(true);
        }
    }

    private void flushBuffer(List<MessageAndTime<PK, D>> list, Optional<Instant> optional, ConveyorRunnable conveyorRunnable) {
        if (list.isEmpty()) {
            return;
        }
        Instant now = Instant.now();
        optional.ifPresent(instant -> {
            this.gaugeRecorder.savePeekToProcessBufferDurationMs(conveyorRunnable, Duration.between(instant, now).toMillis());
        });
        Scanner.of(list).map(messageAndTime -> {
            return messageAndTime.message;
        }).flush(this::processBuffer);
        this.gaugeRecorder.saveProcessBufferDurationMs(conveyorRunnable, Duration.between(now, Instant.now()).toMillis());
        ConveyorCounters.incFlushBuffer(conveyorRunnable, list.size());
        logger.info("consumed conveyor={} messageCount={}", conveyorRunnable.getName(), Integer.valueOf(list.size()));
        ConveyorCounters.incConsumedOpAndDatabeans(conveyorRunnable, list.size());
        Scanner.of(list).forEach(messageAndTime2 -> {
            long epochMilli = now.toEpochMilli() - messageAndTime2.peekTime;
            if (epochMilli > DEFAULT_VISIBILITY_TIMEOUT.toMillis()) {
                logger.warn("slow conveyor conveyor={} waitDurationMs={} databean={}", new Object[]{conveyorRunnable.getName(), Long.valueOf(epochMilli), messageAndTime2.message});
            }
        });
        Instant now2 = Instant.now();
        Scanner.of(list).map(messageAndTime3 -> {
            return messageAndTime3.queueMessageKey;
        }).flush(list2 -> {
            getQueueConsumer().ackMulti(Integer.valueOf(list2.size()), list2);
        });
        this.gaugeRecorder.saveAckDurationMs(conveyorRunnable, Duration.between(now2, Instant.now()).toMillis());
        logger.info("acked conveyor={} messageCount={}", conveyorRunnable.getName(), Integer.valueOf(list.size()));
        ConveyorCounters.incAck(conveyorRunnable, list.size());
    }

    private List<MessageAndTime<PK, D>> copyAndClearBuffer() {
        ArrayList arrayList = new ArrayList(this.buffer);
        this.buffer.clear();
        return arrayList;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v3, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v6 */
    @Override // io.datarouter.conveyor.ConveyorConfiguration
    public void interrupted(ConveyorRunnable conveyorRunnable) throws Exception {
        try {
            ?? r0 = this.lock;
            synchronized (r0) {
                List<MessageAndTime<PK, D>> copyAndClearBuffer = copyAndClearBuffer();
                r0 = r0;
                flushBuffer(copyAndClearBuffer, Optional.empty(), conveyorRunnable);
            }
        } catch (Exception e) {
            throw new Exception("Exception processing buffer. bufferSize=" + this.buffer.size() + " bufferMessages=" + Arrays.toString(this.buffer.toArray()), e);
        }
    }
}
