package software.amazon.kinesis.lifecycle;

import java.util.List;
import java.util.ListIterator;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.ThrottlingReporter;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

@KinesisClientInternalApi
/* loaded from: input_file:META-INF/bundled-dependencies/amazon-kinesis-client-2.2.8.jar:software/amazon/kinesis/lifecycle/ProcessTask.class */
public class ProcessTask implements ConsumerTask {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ProcessTask.class);
    private static final String PROCESS_TASK_OPERATION = "ProcessTask";
    private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed";
    private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
    private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords";
    private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest";
    private final ShardInfo shardInfo;
    private final ShardRecordProcessor shardRecordProcessor;
    private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer;
    private final TaskType taskType = TaskType.PROCESS;
    private final long backoffTimeMillis;
    private final Shard shard;
    private final ThrottlingReporter throttlingReporter;
    private final boolean shouldCallProcessRecordsEvenForEmptyRecordList;
    private final long idleTimeInMilliseconds;
    private final ProcessRecordsInput processRecordsInput;
    private final MetricsFactory metricsFactory;
    private final AggregatorUtil aggregatorUtil;

    public ProcessTask(@NonNull ShardInfo shardInfo, @NonNull ShardRecordProcessor shardRecordProcessor, @NonNull ShardRecordProcessorCheckpointer shardRecordProcessorCheckpointer, long j, boolean z, ShardDetector shardDetector, @NonNull ThrottlingReporter throttlingReporter, ProcessRecordsInput processRecordsInput, boolean z2, long j2, @NonNull AggregatorUtil aggregatorUtil, @NonNull MetricsFactory metricsFactory) {
        if (shardInfo == null) {
            throw new NullPointerException("shardInfo");
        }
        if (shardRecordProcessor == null) {
            throw new NullPointerException("shardRecordProcessor");
        }
        if (shardRecordProcessorCheckpointer == null) {
            throw new NullPointerException("recordProcessorCheckpointer");
        }
        if (throttlingReporter == null) {
            throw new NullPointerException("throttlingReporter");
        }
        if (aggregatorUtil == null) {
            throw new NullPointerException("aggregatorUtil");
        }
        if (metricsFactory == null) {
            throw new NullPointerException("metricsFactory");
        }
        this.shardInfo = shardInfo;
        this.shardRecordProcessor = shardRecordProcessor;
        this.recordProcessorCheckpointer = shardRecordProcessorCheckpointer;
        this.backoffTimeMillis = j;
        this.throttlingReporter = throttlingReporter;
        this.processRecordsInput = processRecordsInput;
        this.shouldCallProcessRecordsEvenForEmptyRecordList = z2;
        this.idleTimeInMilliseconds = j2;
        this.metricsFactory = metricsFactory;
        if (z) {
            this.shard = null;
        } else {
            this.shard = shardDetector.shard(shardInfo.shardId());
        }
        if (this.shard == null && !z) {
            log.warn("Cannot get the shard for this ProcessTask, so duplicate KPL user records in the event of resharding will not be dropped during deaggregation of Amazon Kinesis records.");
        }
        this.aggregatorUtil = aggregatorUtil;
        this.recordProcessorCheckpointer.checkpointer().operation(PROCESS_TASK_OPERATION);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // software.amazon.kinesis.lifecycle.ConsumerTask, java.util.concurrent.Callable
    public TaskResult call() {
        MetricsScope createMetricsWithOperation = MetricsUtil.createMetricsWithOperation(this.metricsFactory, PROCESS_TASK_OPERATION);
        MetricsUtil.addShardId(createMetricsWithOperation, this.shardInfo.shardId());
        long currentTimeMillis = System.currentTimeMillis();
        boolean z = false;
        try {
            createMetricsWithOperation.addData(RECORDS_PROCESSED_METRIC, 0.0d, StandardUnit.COUNT, MetricsLevel.SUMMARY);
            createMetricsWithOperation.addData(DATA_BYTES_PROCESSED_METRIC, 0.0d, StandardUnit.BYTES, MetricsLevel.SUMMARY);
            RuntimeException runtimeException = null;
            try {
                if (this.processRecordsInput.millisBehindLatest() != null) {
                    createMetricsWithOperation.addData(MILLIS_BEHIND_LATEST_METRIC, this.processRecordsInput.millisBehindLatest().longValue(), StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
                }
            } catch (RuntimeException e) {
                log.error("ShardId {}: Caught exception: ", this.shardInfo.shardId(), e);
                runtimeException = e;
                backoff();
            }
            if (this.processRecordsInput.isAtShardEnd() && this.processRecordsInput.records().isEmpty()) {
                log.info("Reached end of shard {} and have no records to process", this.shardInfo.shardId());
                TaskResult taskResult = new TaskResult(null, true);
                MetricsUtil.addSuccessAndLatency(createMetricsWithOperation, false, currentTimeMillis, MetricsLevel.SUMMARY);
                MetricsUtil.endScope(createMetricsWithOperation);
                return taskResult;
            }
            this.throttlingReporter.success();
            List<KinesisClientRecord> deaggregateAnyKplRecords = deaggregateAnyKplRecords(this.processRecordsInput.records());
            if (!deaggregateAnyKplRecords.isEmpty()) {
                createMetricsWithOperation.addData(RECORDS_PROCESSED_METRIC, deaggregateAnyKplRecords.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
            }
            this.recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber(createMetricsWithOperation, deaggregateAnyKplRecords, this.recordProcessorCheckpointer.lastCheckpointValue(), this.recordProcessorCheckpointer.largestPermittedCheckpointValue()));
            if (shouldCallProcessRecords(deaggregateAnyKplRecords)) {
                callProcessRecords(this.processRecordsInput, deaggregateAnyKplRecords);
            }
            z = true;
            if (!this.processRecordsInput.isAtShardEnd()) {
                TaskResult taskResult2 = new TaskResult(runtimeException);
                MetricsUtil.addSuccessAndLatency(createMetricsWithOperation, z, currentTimeMillis, MetricsLevel.SUMMARY);
                MetricsUtil.endScope(createMetricsWithOperation);
                return taskResult2;
            }
            log.info("Reached end of shard {}, and processed {} records", this.shardInfo.shardId(), Integer.valueOf(this.processRecordsInput.records().size()));
            TaskResult taskResult3 = new TaskResult(null, true);
            MetricsUtil.addSuccessAndLatency(createMetricsWithOperation, z, currentTimeMillis, MetricsLevel.SUMMARY);
            MetricsUtil.endScope(createMetricsWithOperation);
            return taskResult3;
        } catch (Throwable th) {
            MetricsUtil.addSuccessAndLatency(createMetricsWithOperation, false, currentTimeMillis, MetricsLevel.SUMMARY);
            MetricsUtil.endScope(createMetricsWithOperation);
            throw th;
        }
    }

    private List<KinesisClientRecord> deaggregateAnyKplRecords(List<KinesisClientRecord> list) {
        return this.shard == null ? this.aggregatorUtil.deaggregate(list) : this.aggregatorUtil.deaggregate(list, this.shard.hashKeyRange().startingHashKey(), this.shard.hashKeyRange().endingHashKey());
    }

    private void backoff() {
        try {
            Thread.sleep(this.backoffTimeMillis);
        } catch (InterruptedException e) {
            log.debug("{}: Sleep was interrupted", this.shardInfo.shardId(), e);
        }
    }

    private void callProcessRecords(ProcessRecordsInput processRecordsInput, List<KinesisClientRecord> list) {
        log.debug("Calling application processRecords() with {} records from {}", Integer.valueOf(list.size()), this.shardInfo.shardId());
        ProcessRecordsInput build = ProcessRecordsInput.builder().records(list).cacheExitTime(processRecordsInput.cacheExitTime()).cacheEntryTime(processRecordsInput.cacheEntryTime()).checkpointer(this.recordProcessorCheckpointer).millisBehindLatest(processRecordsInput.millisBehindLatest()).build();
        MetricsScope createMetricsWithOperation = MetricsUtil.createMetricsWithOperation(this.metricsFactory, PROCESS_TASK_OPERATION);
        MetricsUtil.addShardId(createMetricsWithOperation, this.shardInfo.shardId());
        long currentTimeMillis = System.currentTimeMillis();
        try {
            try {
                this.shardRecordProcessor.processRecords(build);
                MetricsUtil.addLatency(createMetricsWithOperation, RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, currentTimeMillis, MetricsLevel.SUMMARY);
                MetricsUtil.endScope(createMetricsWithOperation);
            } catch (Exception e) {
                log.error("ShardId {}: Application processRecords() threw an exception when processing shard ", this.shardInfo.shardId(), e);
                log.error("ShardId {}: Skipping over the following data records: {}", this.shardInfo.shardId(), list);
                MetricsUtil.addLatency(createMetricsWithOperation, RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, currentTimeMillis, MetricsLevel.SUMMARY);
                MetricsUtil.endScope(createMetricsWithOperation);
            }
        } catch (Throwable th) {
            MetricsUtil.addLatency(createMetricsWithOperation, RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, currentTimeMillis, MetricsLevel.SUMMARY);
            MetricsUtil.endScope(createMetricsWithOperation);
            throw th;
        }
    }

    private boolean shouldCallProcessRecords(List<KinesisClientRecord> list) {
        return !list.isEmpty() || this.shouldCallProcessRecordsEvenForEmptyRecordList;
    }

    private void handleNoRecords(long j) {
        log.debug("Kinesis didn't return any records for shard {}", this.shardInfo.shardId());
        long currentTimeMillis = this.idleTimeInMilliseconds - (System.currentTimeMillis() - j);
        if (currentTimeMillis > 0) {
            long max = Math.max(currentTimeMillis, this.idleTimeInMilliseconds);
            try {
                log.debug("Sleeping for {} ms since there were no new records in shard {}", Long.valueOf(max), this.shardInfo.shardId());
                Thread.sleep(max);
            } catch (InterruptedException e) {
                log.debug("ShardId {}: Sleep was interrupted", this.shardInfo.shardId());
            }
        }
    }

    @Override // software.amazon.kinesis.lifecycle.ConsumerTask
    public TaskType taskType() {
        return this.taskType;
    }

    private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(MetricsScope metricsScope, List<KinesisClientRecord> list, ExtendedSequenceNumber extendedSequenceNumber, ExtendedSequenceNumber extendedSequenceNumber2) {
        ExtendedSequenceNumber extendedSequenceNumber3 = extendedSequenceNumber2;
        ListIterator<KinesisClientRecord> listIterator = list.listIterator();
        while (listIterator.hasNext()) {
            KinesisClientRecord next = listIterator.next();
            ExtendedSequenceNumber extendedSequenceNumber4 = new ExtendedSequenceNumber(next.sequenceNumber(), Long.valueOf(next.subSequenceNumber()));
            if (extendedSequenceNumber4.compareTo(extendedSequenceNumber) <= 0) {
                listIterator.remove();
                log.debug("removing record with ESN {} because the ESN is <= checkpoint ({})", extendedSequenceNumber4, extendedSequenceNumber);
            } else {
                if (extendedSequenceNumber3 == null || extendedSequenceNumber3.compareTo(extendedSequenceNumber4) < 0) {
                    extendedSequenceNumber3 = extendedSequenceNumber4;
                }
                metricsScope.addData(DATA_BYTES_PROCESSED_METRIC, next.data().limit(), StandardUnit.BYTES, MetricsLevel.SUMMARY);
            }
        }
        return extendedSequenceNumber3;
    }
}
