package io.datarouter.conveyor.queue.configuration;

import io.datarouter.conveyor.Conveyor;
import io.datarouter.conveyor.ConveyorConfiguration;
import io.datarouter.conveyor.ConveyorCounters;
import io.datarouter.conveyor.ConveyorGauges;
import io.datarouter.conveyor.ConveyorRunnable;
import io.datarouter.conveyor.queue.BatchedQueueConsumer;
import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.scanner.Scanner;
import io.datarouter.storage.queue.QueueMessage;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/conveyor/queue/configuration/BaseBatchedLossyQueueConsumerConveyorConfiguration.class */
public abstract class BaseBatchedLossyQueueConsumerConveyorConfiguration<PK extends PrimaryKey<PK>, D extends Databean<PK, D>> implements ConveyorConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(BaseBatchedLossyQueueConsumerConveyorConfiguration.class);
    private final Object lock = new Object();
    private final List<QueueMessage<PK, D>> buffer = new LinkedList();

    @Inject
    private ConveyorGauges gaugeRecorder;

    protected abstract void processBuffer(List<D> list);

    protected abstract BatchedQueueConsumer<PK, D> getQueueConsumer();

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v21, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v22, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v30 */
    /* JADX WARN: Type inference failed for: r0v39, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v40, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v43 */
    @Override // io.datarouter.conveyor.ConveyorConfiguration
    public Conveyor.ProcessResult process(ConveyorRunnable conveyorRunnable) {
        List<QueueMessage<PK, D>> emptyList = Collections.emptyList();
        Instant now = Instant.now();
        List<QueueMessage<PK, D>> peekMulti = getQueueConsumer().peekMulti(Integer.valueOf(getMaxQuerySize()), getPeekTimeout(), DEFAULT_VISIBILITY_TIMEOUT);
        Instant now2 = Instant.now();
        this.gaugeRecorder.savePeekDurationMs(conveyorRunnable, Duration.between(now, now2).toMillis());
        if (peekMulti.isEmpty()) {
            logger.info("peeked conveyor={} nullMessage", conveyorRunnable.getName());
            ?? r0 = this.lock;
            synchronized (r0) {
                List<QueueMessage<PK, D>> copyAndClearBuffer = copyAndClearBuffer();
                r0 = r0;
                flushBuffer(copyAndClearBuffer, Optional.of(now2), conveyorRunnable);
                return new Conveyor.ProcessResult(false);
            }
        }
        Instant now3 = Instant.now();
        Scanner.of(peekMulti).map((v0) -> {
            return v0.getKey();
        }).flush(list -> {
            getQueueConsumer().ackMulti(Integer.valueOf(list.size()), list);
        });
        this.gaugeRecorder.saveAckDurationMs(conveyorRunnable, Duration.between(now3, Instant.now()).toMillis());
        logger.info("acked conveyor={} messageCount={}", conveyorRunnable.getName(), Integer.valueOf(peekMulti.size()));
        ConveyorCounters.incAck(conveyorRunnable);
        ?? r02 = this.lock;
        synchronized (r02) {
            logger.info("peeked conveyor={} messageCount={}", conveyorRunnable.getName(), Integer.valueOf(peekMulti.size()));
            this.buffer.addAll(peekMulti);
            if (this.buffer.size() >= getBatchSize()) {
                emptyList = copyAndClearBuffer();
            }
            r02 = r02;
            logger.info("consumed conveyor={} messageCount={}", conveyorRunnable.getName(), Integer.valueOf(peekMulti.size()));
            ConveyorCounters.incConsumedOpAndDatabeans(conveyorRunnable, peekMulti.size());
            flushBuffer(emptyList, Optional.of(now2), conveyorRunnable);
            return new Conveyor.ProcessResult(true);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    @Override // io.datarouter.conveyor.ConveyorConfiguration
    public void interrupted(ConveyorRunnable conveyorRunnable) {
        ?? r0 = this.lock;
        synchronized (r0) {
            List<QueueMessage<PK, D>> copyAndClearBuffer = copyAndClearBuffer();
            r0 = r0;
            flushBuffer(copyAndClearBuffer, Optional.empty(), conveyorRunnable);
        }
    }

    private void flushBuffer(List<QueueMessage<PK, D>> list, Optional<Instant> optional, ConveyorRunnable conveyorRunnable) {
        if (list.isEmpty()) {
            return;
        }
        Instant now = Instant.now();
        optional.ifPresent(instant -> {
            this.gaugeRecorder.savePeekToProcessBufferDurationMs(conveyorRunnable, Duration.between(instant, now).toMillis());
        });
        Scanner.of(list).map((v0) -> {
            return v0.getDatabean();
        }).flush(this::processBuffer);
        this.gaugeRecorder.saveProcessBufferDurationMs(conveyorRunnable, Duration.between(now, Instant.now()).toMillis());
        ConveyorCounters.incFlushBuffer(conveyorRunnable, list.size());
    }

    private List<QueueMessage<PK, D>> copyAndClearBuffer() {
        ArrayList arrayList = new ArrayList(this.buffer);
        this.buffer.clear();
        return arrayList;
    }

    protected int getMaxQuerySize() {
        return 1;
    }

    protected Duration getPeekTimeout() {
        return DEFAULT_PEEK_TIMEOUT;
    }

    protected int getBatchSize() {
        return 100;
    }
}
