package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardClosureVerificationResponse;
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.Shard;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/amazon-kinesis-client-1.13.3.jar:com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.class */
/**
 * Task that shuts down the record processor of a single shard.
 *
 * <p>When invoked, the task:
 * <ol>
 *   <li>for a {@link ShutdownReason#TERMINATE} request, asks the Kinesis proxy whether the shard is
 *       really closed and, if it is not, drops the lease and continues with
 *       {@link ShutdownReason#ZOMBIE} instead;</li>
 *   <li>calls {@link IRecordProcessor#shutdown(ShutdownInput)} and, for {@code TERMINATE}, verifies
 *       that the application checkpointed at {@link ExtendedSequenceNumber#SHARD_END};</li>
 *   <li>shuts down the {@link GetRecordsCache};</li>
 *   <li>for {@code TERMINATE}, hands the latest shard list to the {@link ShardSyncStrategy}.</li>
 * </ol>
 *
 * <p>Any {@link Exception} raised along the way is logged, followed by a back-off sleep, and is
 * returned to the caller wrapped in a {@link TaskResult}; it is never thrown from {@link #call()}.
 */
class ShutdownTask implements ITask {
    private static final Log LOG = LogFactory.getLog(ShutdownTask.class);
    private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";
    private final ShardInfo shardInfo;
    private final IRecordProcessor recordProcessor;
    private final RecordProcessorCheckpointer recordProcessorCheckpointer;
    private final ShutdownReason reason;
    private final IKinesisProxy kinesisProxy;
    private final KinesisClientLibLeaseCoordinator leaseCoordinator;
    private final InitialPositionInStreamExtended initialPositionInStream;
    private final boolean cleanupLeasesOfCompletedShards;
    private final boolean ignoreUnexpectedChildShards;
    private final TaskType taskType = TaskType.SHUTDOWN;
    private final long backoffTimeMillis;
    private final GetRecordsCache getRecordsCache;
    private final ShardSyncer shardSyncer;
    private final ShardSyncStrategy shardSyncStrategy;

    /**
     * Creates a shutdown task for one shard. All arguments are stored as-is; nothing is invoked
     * until {@link #call()} runs.
     *
     * @param shardInfo shard whose record processor is being shut down
     * @param recordProcessor application record processor to notify
     * @param recordProcessorCheckpointer checkpointer handed to the record processor on shutdown
     * @param reason requested shutdown reason; may be downgraded to {@code ZOMBIE} during {@link #call()}
     * @param kinesisProxy proxy used to verify that the shard is closed
     * @param initialPositionInStream initial stream position (stored, not used by this class directly)
     * @param cleanupLeasesOfCompletedShards lease clean-up flag (stored, not used by this class directly)
     * @param ignoreUnexpectedChildShards child-shard flag (stored, not used by this class directly)
     * @param leaseCoordinator coordinator used to drop the lease when the shard is not closed
     * @param backoffTimeMillis time to sleep after a failure before returning the failed result
     * @param getRecordsCache record cache that is shut down together with the record processor
     * @param shardSyncer shard syncer (stored, not used by this class directly)
     * @param shardSyncStrategy strategy notified after a {@code TERMINATE} shutdown completes
     */
    public ShutdownTask(ShardInfo shardInfo, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownReason reason, IKinesisProxy kinesisProxy, InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, KinesisClientLibLeaseCoordinator leaseCoordinator, long backoffTimeMillis, GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
        this.shardInfo = shardInfo;
        this.recordProcessor = recordProcessor;
        this.recordProcessorCheckpointer = recordProcessorCheckpointer;
        this.reason = reason;
        this.kinesisProxy = kinesisProxy;
        this.initialPositionInStream = initialPositionInStream;
        this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
        this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
        this.leaseCoordinator = leaseCoordinator;
        this.backoffTimeMillis = backoffTimeMillis;
        this.getRecordsCache = getRecordsCache;
        this.shardSyncer = shardSyncer;
        this.shardSyncStrategy = shardSyncStrategy;
    }

    /**
     * Runs the shutdown sequence for the shard.
     *
     * @return a {@link TaskResult} carrying {@code null} on success, or the {@link Exception}
     *         that aborted the shutdown
     */
    @Override // com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask, java.util.concurrent.Callable
    public TaskResult call() {
        // Set when the failure came out of the record-processor shutdown step, so the final
        // log line can tell application failures apart from the rest.
        boolean applicationException = false;
        try {
            ShutdownReason effectiveReason = this.reason;
            List<Shard> latestShards = null;

            if (effectiveReason == ShutdownReason.TERMINATE) {
                ShardClosureVerificationResponse closureResponse =
                        this.kinesisProxy.verifyShardClosure(this.shardInfo.getShardId());
                if (closureResponse instanceof ShardListWrappingShardClosureVerificationResponse) {
                    latestShards = ((ShardListWrappingShardClosureVerificationResponse) closureResponse).getLatestShards();
                }
                // The shard is not closed after all: downgrade to ZOMBIE so that SHARD_END is
                // not set below, and give up the lease.
                if (!closureResponse.isShardClosed()) {
                    effectiveReason = ShutdownReason.ZOMBIE;
                    dropLease();
                    LOG.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + this.shardInfo.getShardId());
                }
            }

            if (effectiveReason == ShutdownReason.TERMINATE) {
                // Remember where the shard ended, then permit a checkpoint at SHARD_END.
                this.recordProcessorCheckpointer.setSequenceNumberAtShardEnd(this.recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
                this.recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
            }

            LOG.debug("Invoking shutdown() for shard " + this.shardInfo.getShardId() + ", concurrencyToken " + this.shardInfo.getConcurrencyToken() + ". Shutdown reason: " + effectiveReason);
            ShutdownInput shutdownInput = new ShutdownInput()
                    .withShutdownReason(effectiveReason)
                    .withCheckpointer(this.recordProcessorCheckpointer);
            final long shutdownStartMillis = System.currentTimeMillis();
            try {
                this.recordProcessor.shutdown(shutdownInput);
                ExtendedSequenceNumber lastCheckpointValue = this.recordProcessorCheckpointer.getLastCheckpointValue();
                if (effectiveReason == ShutdownReason.TERMINATE && (lastCheckpointValue == null || !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END))) {
                    throw new IllegalArgumentException("Application didn't checkpoint at end of shard " + this.shardInfo.getShardId() + ". Application must checkpoint upon shutdown. See IRecordProcessor.shutdown javadocs for more information.");
                }
                LOG.debug("Shutting down retrieval strategy.");
                this.getRecordsCache.shutdown();
                LOG.debug("Record processor completed shutdown() for shard " + this.shardInfo.getShardId());
            } catch (Exception e) {
                applicationException = true;
                throw e;
            } finally {
                // Record the latency exactly once, on success and on failure alike. Keeping the
                // shard sync outside this try prevents a second emission when the sync fails.
                MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, shutdownStartMillis, MetricsLevel.SUMMARY);
            }

            if (effectiveReason == ShutdownReason.TERMINATE) {
                LOG.debug("Looking for child shards of shard " + this.shardInfo.getShardId());
                TaskResult syncResult = this.shardSyncStrategy.onShardConsumerShutDown(latestShards);
                if (syncResult.getException() != null) {
                    LOG.debug("Exception while trying to sync shards on the shutdown of shard: " + this.shardInfo.getShardId());
                    throw syncResult.getException();
                }
                LOG.debug("Finished checking for child shards of shard " + this.shardInfo.getShardId());
            }
            return new TaskResult((Exception) null);
        } catch (Exception e) {
            if (applicationException) {
                LOG.error("Application exception. ", e);
            } else {
                LOG.error("Caught exception: ", e);
            }
            // Back off before reporting the failure to the caller.
            try {
                Thread.sleep(this.backoffTimeMillis);
            } catch (InterruptedException ie) {
                LOG.debug("Interrupted sleep", ie);
                // Preserve the interrupt so whoever owns this thread can still observe it.
                Thread.currentThread().interrupt();
            }
            return new TaskResult(e);
        }
    }

    /**
     * @return the type of this task, always {@link TaskType#SHUTDOWN}
     */
    @Override // com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask
    public TaskType getTaskType() {
        return this.taskType;
    }

    /**
     * @return the shutdown reason this task was created with (not the possibly downgraded reason
     *         used inside {@link #call()})
     */
    @VisibleForTesting
    ShutdownReason getReason() {
        return this.reason;
    }

    /**
     * Drops the lease currently held for this task's shard, if there is one.
     */
    private void dropLease() {
        KinesisClientLease currentlyHeldLease = this.leaseCoordinator.getCurrentlyHeldLease(this.shardInfo.getShardId());
        // NOTE(review): the coordinator presumably returns null when the lease is no longer held
        // (e.g. already lost) - confirm. Without this guard the log line below would throw a
        // NullPointerException and fail the whole shutdown task.
        if (currentlyHeldLease == null) {
            LOG.warn("Unable to drop lease for shard " + this.shardInfo.getShardId() + ": no lease is currently held.");
            return;
        }
        this.leaseCoordinator.dropLease(currentlyHeldLease);
        LOG.warn("Dropped lease for shutting down ShardConsumer: " + currentlyHeldLease.getLeaseKey());
    }
}
