package io.streamnative.pulsar.recipes.task;

import java.beans.ConstructorProperties;
import java.time.Clock;
import java.time.DateTimeException;
import java.time.Duration;
import java.util.Optional;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/recipes/task/TaskListener.class */
/**
 * Message listener that handles a task message according to the state of the
 * {@link TaskMetadata} looked up for that message.
 *
 * <p>Dispatch performed by {@link #received(Consumer, Message)}:
 * <ul>
 *   <li>{@code NEW} - the task is processed.</li>
 *   <li>{@code PROCESSING} - the task is re-processed (or marked failed) only when its
 *       metadata has not been updated for more than twice the keep-alive interval.</li>
 *   <li>{@code COMPLETED} - the message is acknowledged.</li>
 *   <li>{@code FAILED} - the task is retried while attempts remain, otherwise acknowledged.</li>
 * </ul>
 *
 * @param <T> type of the task payload carried by the message
 * @param <R> type of the result produced by the process executor
 */
class TaskListener<T, R> implements MessageListener<T> {
    private static final Logger log = LoggerFactory.getLogger(TaskListener.class);

    private final TaskMetadataView<T> taskMetadataView;
    private final TaskMetadataUpdater taskMetadataUpdater;
    private final ProcessExecutor<T, R> processExecutor;
    private final Clock clock;
    private final Schema<R> resultSchema;
    private final int maxTaskAttempts;
    private final long keepAliveIntervalMillis;

    /**
     * Creates a listener.
     *
     * @param taskMetadataView source of the current metadata for a received message
     * @param taskMetadataUpdater sink for every metadata transition made by this listener
     * @param processExecutor executes the task payload and yields its result
     * @param clock time source for all timestamps written to the metadata
     * @param resultSchema schema used to encode the task result
     * @param maxTaskAttempts a task is retried only while its attempts are below this value
     * @param keepAliveIntervalMillis keep-alive interval in milliseconds; a task in the
     *     processing state is treated as stale once its last update is older than twice this value
     */
    @ConstructorProperties({"taskMetadataView", "taskMetadataUpdater", "processExecutor", "clock", "resultSchema", "maxTaskAttempts", "keepAliveIntervalMillis"})
    public TaskListener(
            TaskMetadataView<T> taskMetadataView,
            TaskMetadataUpdater taskMetadataUpdater,
            ProcessExecutor<T, R> processExecutor,
            Clock clock,
            Schema<R> resultSchema,
            int maxTaskAttempts,
            long keepAliveIntervalMillis) {
        this.taskMetadataView = taskMetadataView;
        this.taskMetadataUpdater = taskMetadataUpdater;
        this.processExecutor = processExecutor;
        this.clock = clock;
        this.resultSchema = resultSchema;
        this.maxTaskAttempts = maxTaskAttempts;
        this.keepAliveIntervalMillis = keepAliveIntervalMillis;
    }

    /**
     * Looks up the metadata for {@code message} and dispatches on its state. Anything thrown
     * by a handler is logged and the message is negatively acknowledged.
     *
     * @param consumer consumer the message was received on
     * @param message the task message
     */
    @Override
    public void received(Consumer<T> consumer, Message<T> message) {
        log.debug("Received: {}", message.getMessageId());
        TaskMetadata taskMetadata = this.taskMetadataView.get(message);
        try {
            switch (taskMetadata.getState()) {
                case NEW:
                    handleNew(consumer, message, taskMetadata);
                    break;
                case PROCESSING:
                    handleProcessing(consumer, message, taskMetadata);
                    break;
                case COMPLETED:
                    handleCompleted(consumer, message);
                    break;
                case FAILED:
                    handleFailed(consumer, message, taskMetadata);
                    break;
                default:
                    log.error("Unexpected state: {}", taskMetadata);
                    handleError(consumer, message, taskMetadata, "Unexpected state: " + taskMetadata.getState());
                    break;
            }
        } catch (Throwable th) {
            // The trailing throwable is picked up by SLF4J as the exception to log.
            log.error("Unexpected error when consuming task: task={}, metadata={}", message, taskMetadata, th);
            consumer.negativeAcknowledge(message);
        }
    }

    /** Handles a task in the {@code NEW} state by processing it. */
    private void handleNew(Consumer<T> consumer, Message<T> message, TaskMetadata taskMetadata) throws PulsarClientException {
        processTask(consumer, message, taskMetadata);
    }

    /**
     * Moves the task into processing, executes it, then records the encoded result and
     * acknowledges the message. A {@code ProcessException} is recorded as a failure through
     * {@link #handleError}; any other exception is only logged.
     *
     * @throws PulsarClientException if failure handling triggered by a {@code ProcessException} throws it
     */
    private void processTask(Consumer<T> consumer, Message<T> message, TaskMetadata taskMetadata) throws PulsarClientException {
        // Must stay effectively final: it is captured by the keep-alive callback below.
        final TaskMetadata processing = taskMetadata.process(this.clock.millis());
        this.taskMetadataUpdater.update(processing);
        // Tracks the most recent metadata built so far, for logging and failure recording.
        TaskMetadata latest = processing;
        try {
            log.debug("Task processing for message {}", message.getMessageId());
            R result = this.processExecutor.execute(message.getValue(), getMaxTaskDuration(message), () -> {
                this.taskMetadataUpdater.update(processing.keepAlive(this.clock.millis()));
            });
            log.debug("Task processed for message {}", message.getMessageId());
            latest = processing.complete(this.clock.millis(), this.resultSchema.encode(result));
            this.taskMetadataUpdater.update(latest);
            consumer.acknowledge(message);
        } catch (ProcessException e) {
            log.debug("Error while processing task: {}", latest, e);
            handleError(consumer, message, latest, describeFailure(e));
        } catch (Exception e) {
            // NOTE(review): the message is neither acknowledged nor negatively acknowledged
            // on this path; presumably redelivery is left to the consumer configuration - confirm.
            log.error("Error handling task result: {}", latest, e);
        }
    }

    /**
     * Builds the failure reason for a {@code ProcessException}: its message, followed by the
     * cause's message when a cause is present. A missing cause no longer triggers a
     * {@link NullPointerException}.
     */
    private static String describeFailure(ProcessException e) {
        Throwable cause = e.getCause();
        if (cause == null) {
            return e.getMessage();
        }
        return e.getMessage() + ": " + cause.getMessage();
    }

    /**
     * Handles a task in the {@code PROCESSING} state. Nothing is done unless the metadata's
     * last update is older than twice the keep-alive interval; in that case the task is
     * processed again while attempts remain, otherwise it is marked failed and acknowledged.
     */
    private void handleProcessing(Consumer<T> consumer, Message<T> message, TaskMetadata taskMetadata) throws PulsarClientException {
        long sinceLastUpdate = this.clock.millis() - taskMetadata.getLastUpdated();
        if (sinceLastUpdate <= this.keepAliveIntervalMillis * 2) {
            // Not stale yet: leave the message untouched.
            return;
        }
        if (taskMetadata.getAttempts() < this.maxTaskAttempts) {
            processTask(consumer, message, taskMetadata);
            return;
        }
        this.taskMetadataUpdater.update(taskMetadata.fail(this.clock.millis(), "All attempts to process task failed."));
        consumer.acknowledge(message);
    }

    /** Handles a task in the {@code COMPLETED} state by acknowledging its message. */
    private void handleCompleted(Consumer<T> consumer, Message<T> message) throws PulsarClientException {
        consumer.acknowledge(message);
    }

    /**
     * Handles a task in the {@code FAILED} state: processes it again while its attempts are
     * below {@code maxTaskAttempts}, otherwise acknowledges the message.
     */
    private void handleFailed(Consumer<T> consumer, Message<T> message, TaskMetadata taskMetadata) throws PulsarClientException {
        if (taskMetadata.getAttempts() < this.maxTaskAttempts) {
            processTask(consumer, message, taskMetadata);
        } else {
            consumer.acknowledge(message);
        }
    }

    /**
     * Records a failure with the given reason and then applies the failed-state handling
     * (retry or acknowledge) to the resulting metadata.
     *
     * @param reason failure reason stored in the metadata
     */
    private void handleError(Consumer<T> consumer, Message<T> message, TaskMetadata taskMetadata, String reason) throws PulsarClientException {
        TaskMetadata failed = taskMetadata.fail(this.clock.millis(), reason);
        this.taskMetadataUpdater.update(failed);
        handleFailed(consumer, message, failed);
    }

    /**
     * Reads the {@code MAX_TASK_DURATION} task property from the message and parses it with
     * {@link Duration#parse(CharSequence)}.
     *
     * @return the parsed duration; empty when the property is absent or cannot be parsed
     */
    private Optional<Duration> getMaxTaskDuration(Message<T> message) {
        Optional<String> rawDuration = TaskProperties.MAX_TASK_DURATION.from(message);
        try {
            // Optional.map runs the parser eagerly, so a parse failure is thrown inside this try.
            return rawDuration.map(Duration::parse);
        } catch (DateTimeException e) {
            log.warn("Message {} specified invalid max task duration header: {}", message.getMessageId(), rawDuration);
            return Optional.empty();
        }
    }
}
