package io.datarouter.conveyor.queue;

import io.datarouter.bytes.Codec;
import io.datarouter.conveyor.BaseConveyor;
import io.datarouter.conveyor.ConveyorCounters;
import io.datarouter.storage.queue.BlobQueueMessageDto;
import io.datarouter.web.exception.ExceptionRecorder;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/conveyor/queue/BaseBlobQueueConsumerConveyor.class */
/**
 * Conveyor that takes one message at a time from a {@link BlobQueueConsumer}, decodes its payload
 * with the supplied {@link Codec} and hands the decoded value to the subclass for processing.
 *
 * <p>Subclasses must override either {@link #processOne(Object)} or
 * {@link #processOneShouldAck(Object)}; the default {@code processOne} throws
 * {@link UnsupportedOperationException}.
 *
 * @param <T> type of the decoded message payload
 */
public abstract class BaseBlobQueueConsumerConveyor<T> extends BaseConveyor {
    private static final Logger logger = LoggerFactory.getLogger(BaseBlobQueueConsumerConveyor.class);
    /** Timeout passed as the first argument to {@code BlobQueueConsumer.peek}. */
    private static final Duration PEEK_TIMEOUT = Duration.ofSeconds(30);
    /** Default returned by {@link #getVisibilityTimeout()}. */
    private static final Duration VISIBILITY_TIMEOUT = Duration.ofSeconds(30);
    private final BlobQueueConsumer queueConsumer;
    private final Codec<T, byte[]> codec;

    /**
     * Creates the conveyor.
     *
     * @param str forwarded to the {@link BaseConveyor} constructor (presumably the conveyor name
     *     - confirm against {@code BaseConveyor})
     * @param supplier forwarded to the {@link BaseConveyor} constructor
     * @param exceptionRecorder forwarded to the {@link BaseConveyor} constructor
     * @param blobQueueConsumer queue from which messages are peeked and on which they are acked
     * @param codec decodes the raw message bytes into {@code T}
     */
    protected BaseBlobQueueConsumerConveyor(String str, Supplier<Boolean> supplier, ExceptionRecorder exceptionRecorder, BlobQueueConsumer blobQueueConsumer, Codec<T, byte[]> codec) {
        // The third super argument is a supplier that always yields false.
        super(str, supplier, () -> false, exceptionRecorder);
        this.queueConsumer = blobQueueConsumer;
        this.codec = codec;
    }

    /**
     * Peeks a single message, decodes it, processes it and acks it when the subclass asks for it.
     *
     * @return a result built with {@code false} when no message was available, and with
     *     {@code true} when a message was handled (whether or not it was acked)
     * @throws RuntimeException wrapping any exception raised while processing or acking
     */
    @Override // io.datarouter.conveyor.BaseConveyor
    public BaseConveyor.ProcessBatchResult processBatch() {
        Duration visibilityTimeout = getVisibilityTimeout();
        Optional<BlobQueueMessageDto> optionalMessage = this.queueConsumer.peek(PEEK_TIMEOUT, visibilityTimeout);
        if (optionalMessage.isEmpty()) {
            logger.info("peeked conveyor={} nullMessage", this.name);
            return new BaseConveyor.ProcessBatchResult(false);
        }
        BlobQueueMessageDto message = optionalMessage.get();
        logger.info("peeked conveyor={} messageCount={}", this.name, 1);
        // Keep the payload typed as T so it can be passed to processOneShouldAck(T).
        T data = this.codec.decode(message.getData());
        long startMs = System.currentTimeMillis();
        try {
            if (!processOneShouldAck(data)) {
                // The subclass declined the ack: skip the counters and leave the message un-acked.
                return new BaseConveyor.ProcessBatchResult(true);
            }
            long durationMs = System.currentTimeMillis() - startMs;
            // Only a warning: processing took longer than the visibility timeout used for the peek.
            if (durationMs > visibilityTimeout.toMillis()) {
                logger.warn("slow conveyor conveyor={} durationMs={}", this.name, durationMs);
            }
            logger.info("consumed conveyor={} messageCount={}", this.name, 1);
            ConveyorCounters.incConsumedOpAndDatabeans(this, 1L);
            this.queueConsumer.ack(message);
            logger.info("acked conveyor={} messageCount={}", this.name, 1);
            ConveyorCounters.incAck(this);
            return new BaseConveyor.ProcessBatchResult(true);
        } catch (Exception e) {
            throw new RuntimeException("failed to process message", e);
        }
    }

    /**
     * Visibility timeout passed to the queue when peeking; also the threshold above which a
     * processed message is logged as slow.
     *
     * @return 30 seconds unless overridden
     */
    protected Duration getVisibilityTimeout() {
        return VISIBILITY_TIMEOUT;
    }

    /**
     * Processes one decoded message and tells the conveyor whether to ack it.
     *
     * <p>The default implementation delegates to {@link #processOne(Object)} and always requests
     * an ack.
     *
     * @param t the decoded message payload
     * @return {@code true} if the message should be acked
     */
    protected boolean processOneShouldAck(T t) {
        processOne(t);
        return true;
    }

    /**
     * Processes one decoded message.
     *
     * @param t the decoded message payload
     * @throws UnsupportedOperationException always, unless overridden
     */
    protected void processOne(T t) {
        throw new UnsupportedOperationException();
    }
}
