package io.streamnative.pulsar.recipes.task;

import java.beans.ConstructorProperties;
import java.time.Clock;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/recipes/task/TaskMetadataEvictionListener.class */
class TaskMetadataEvictionListener implements MessageListener<TaskMetadata> {
    private static final Logger log = LoggerFactory.getLogger(TaskMetadataEvictionListener.class);
    private final TaskMetadataUpdater metadataUpdater;
    private final Clock clock;
    private final int maxTaskAttempts;
    private final long terminalStateRetentionMillis;

    public void received(Consumer<TaskMetadata> consumer, Message<TaskMetadata> message) {
        try {
            TaskMetadata taskMetadata = (TaskMetadata) message.getValue();
            log.debug("Received: {}", taskMetadata);
            if (taskMetadata != null) {
                switch (taskMetadata.getState()) {
                    case COMPLETED:
                        checkTerminalTaskMetadata(consumer, message);
                        break;
                    case FAILED:
                        if (taskMetadata.getAttempts() != this.maxTaskAttempts) {
                            consumer.acknowledge(message);
                            break;
                        } else {
                            checkTerminalTaskMetadata(consumer, message);
                            break;
                        }
                    case NEW:
                    case PROCESSING:
                    default:
                        consumer.acknowledge(message);
                        break;
                }
            } else {
                consumer.acknowledge(message);
            }
        } catch (PulsarClientException e) {
            log.warn("Error while processing metadata for eviction", e);
        }
    }

    private void checkTerminalTaskMetadata(Consumer<TaskMetadata> consumer, Message<TaskMetadata> message) throws PulsarClientException {
        TaskMetadata taskMetadata = (TaskMetadata) message.getValue();
        long intervalUntilMetadataEviction = intervalUntilMetadataEviction(taskMetadata);
        log.trace("Task metadata should be evicted in {} milliseconds", Long.valueOf(intervalUntilMetadataEviction));
        if (intervalUntilMetadataEviction > 0) {
            log.debug("Task metadata not yet eligible for eviction - delaying redelivery until eviction horizon: {}", taskMetadata);
            consumer.reconsumeLater(message, intervalUntilMetadataEviction(taskMetadata), TimeUnit.MILLISECONDS);
        } else {
            log.debug("Evicting task metadata: {}", taskMetadata);
            this.metadataUpdater.delete(taskMetadata);
            consumer.acknowledge(message);
        }
    }

    private long intervalUntilMetadataEviction(TaskMetadata taskMetadata) {
        return Math.max(0L, (taskMetadata.getLastUpdated() + this.terminalStateRetentionMillis) - this.clock.millis());
    }

    @ConstructorProperties({"metadataUpdater", "clock", "maxTaskAttempts", "terminalStateRetentionMillis"})
    public TaskMetadataEvictionListener(TaskMetadataUpdater taskMetadataUpdater, Clock clock, int i, long j) {
        this.metadataUpdater = taskMetadataUpdater;
        this.clock = clock;
        this.maxTaskAttempts = i;
        this.terminalStateRetentionMillis = j;
    }
}
