package io.codemonastery.dropwizard.kinesis.consumer;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import com.google.common.base.Preconditions;
import io.codemonastery.dropwizard.kinesis.EventDecoder;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/codemonastery/dropwizard/kinesis/consumer/RecordProcessor.class */
public final class RecordProcessor<E> implements IRecordProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(RecordProcessor.class);
    private final EventDecoder<E> decoder;
    private final EventConsumer<E> processor;
    private final RecordProcessorMetrics metrics;
    private String shardId = null;

    public RecordProcessor(EventDecoder<E> eventDecoder, EventConsumer<E> eventConsumer, RecordProcessorMetrics recordProcessorMetrics) {
        Preconditions.checkNotNull(eventDecoder, "decoder cannot be null");
        Preconditions.checkNotNull(eventConsumer, "eventConsumer cannot be null");
        Preconditions.checkNotNull(recordProcessorMetrics, "metrics cannot be null");
        this.decoder = eventDecoder;
        this.processor = eventConsumer;
        this.metrics = recordProcessorMetrics;
    }

    public void initialize(InitializationInput initializationInput) {
        if (initializationInput != null) {
            this.shardId = initializationInput.getShardId();
        }
        this.metrics.processorStarted();
    }

    public void processRecords(ProcessRecordsInput processRecordsInput) {
        if (processRecordsInput.getMillisBehindLatest() != null) {
            this.metrics.millisBehindLatest(this.shardId, processRecordsInput.getMillisBehindLatest().longValue());
        }
        Record record = null;
        Iterator<E> it = processRecordsInput.getRecords().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Record record2 = (Record) it.next();
            try {
                E decode = this.decoder.decode(record2.getData());
                this.metrics.decoded();
                if (decode != null) {
                    boolean z = false;
                    try {
                        AutoCloseable processTime = this.metrics.processTime();
                        Throwable th = null;
                        try {
                            try {
                                z = this.processor.consume(decode);
                                if (processTime != null) {
                                    if (0 != 0) {
                                        try {
                                            processTime.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        processTime.close();
                                    }
                                }
                            } catch (Throwable th3) {
                                th = th3;
                                throw th3;
                                break;
                            }
                        } catch (Throwable th4) {
                            if (processTime != null) {
                                if (th != null) {
                                    try {
                                        processTime.close();
                                    } catch (Throwable th5) {
                                        th.addSuppressed(th5);
                                    }
                                } else {
                                    processTime.close();
                                }
                            }
                            throw th4;
                            break;
                        }
                    } catch (Exception e) {
                        this.metrics.unhandledException();
                        LOG.error("Unhandled exception processing event" + decode, e);
                    }
                    if (!z) {
                        this.metrics.processFailure();
                        break;
                    } else {
                        this.metrics.processSuccess();
                        record = record2;
                    }
                } else {
                    record = record2;
                }
            } catch (Exception e2) {
                this.metrics.decodeFailure();
                LOG.error("Unexpected exception decoding event", e2);
            }
        }
        if (record != null) {
            try {
                AutoCloseable checkpointTime = this.metrics.checkpointTime();
                Throwable th6 = null;
                try {
                    try {
                        processRecordsInput.getCheckpointer().checkpoint(record);
                        if (checkpointTime != null) {
                            if (0 != 0) {
                                try {
                                    checkpointTime.close();
                                } catch (Throwable th7) {
                                    th6.addSuppressed(th7);
                                }
                            } else {
                                checkpointTime.close();
                            }
                        }
                    } catch (Throwable th8) {
                        th6 = th8;
                        throw th8;
                    }
                } catch (Throwable th9) {
                    if (checkpointTime != null) {
                        if (th6 != null) {
                            try {
                                checkpointTime.close();
                            } catch (Throwable th10) {
                                th6.addSuppressed(th10);
                            }
                        } else {
                            checkpointTime.close();
                        }
                    }
                    throw th9;
                }
            } catch (Exception e3) {
                this.metrics.checkpointFailed();
                LOG.error("Could not checkpoint because of unexpected exception", e3);
            } catch (ShutdownException e4) {
                this.metrics.checkpointFailed();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Abandoning checkpoint because processor was shutdown");
                }
            }
        }
    }

    public void shutdown(ShutdownInput shutdownInput) {
        this.shardId = "UNKNOWN";
        this.metrics.processorShutdown(this.shardId);
    }
}
