package io.streamnative.pulsar.recipes.task;

import java.beans.ConstructorProperties;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TableView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/recipes/task/TaskWorker.class */
/**
 * Handle on the resources that back a running task worker.
 *
 * <p>{@link #create} wires together a single-threaded executor, a task metadata producer, a task
 * metadata table view, a task consumer and a metadata eviction consumer. The returned instance
 * owns all of them and releases them in {@link #close()}.
 */
public class TaskWorker implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(TaskWorker.class);
    private final ExecutorService executor;
    private final List<AutoCloseable> closeables;
    private final long shutdownTimeoutMillis;

    /**
     * Creates a worker and all the messaging resources it needs.
     *
     * <p>If any step fails, the executor and every resource opened so far are released before the
     * failure propagates, so a failed call does not leave a thread or open resources behind.
     *
     * @param pulsarClient client handed to the {@code MessagingFactory}
     * @param process the processing logic handed to the {@code ProcessExecutor}
     * @param taskWorkerConfiguration supplies the schemas, keep-alive interval, maximum task
     *     attempts, retention and shutdown timeout used during wiring
     * @param <T> task type
     * @param <R> result type
     * @return a worker owning the created executor and messaging resources
     * @throws PulsarClientException declared for failures raised while creating the resources
     */
    public static <T, R> TaskWorker create(PulsarClient pulsarClient, Process<T, R> process, TaskWorkerConfiguration<T, R> taskWorkerConfiguration) throws PulsarClientException {
        ExecutorService processExecutorService = Executors.newSingleThreadExecutor();
        List<AutoCloseable> opened = new ArrayList<>();
        boolean created = false;
        try {
            Clock clock = Clock.systemUTC();
            MessagingFactory messagingFactory = new MessagingFactory(pulsarClient, taskWorkerConfiguration);

            Producer<TaskMetadata> taskMetadataProducer = messagingFactory.taskMetadataProducer();
            opened.add(taskMetadataProducer);
            TaskMetadataUpdater taskMetadataUpdater = new TaskMetadataUpdater(taskMetadataProducer);

            TableView<TaskMetadata> taskMetadataTableView = messagingFactory.taskMetadataTableView();
            opened.add(taskMetadataTableView);

            opened.add(messagingFactory.taskConsumer(new TaskListener<>(
                    new TaskMetadataView(taskMetadataTableView, clock, taskWorkerConfiguration.getTaskSchema()),
                    taskMetadataUpdater,
                    new ProcessExecutor(processExecutorService, process, clock, taskWorkerConfiguration.getKeepAliveInterval()),
                    clock,
                    taskWorkerConfiguration.getResultSchema(),
                    taskWorkerConfiguration.getMaxTaskAttempts(),
                    taskWorkerConfiguration.getKeepAliveInterval().toMillis())));

            opened.add(messagingFactory.metadataEvictionConsumer(new TaskMetadataEvictionListener(
                    taskMetadataUpdater,
                    clock,
                    taskWorkerConfiguration.getMaxTaskAttempts(),
                    taskWorkerConfiguration.getRetention().toMillis())));

            TaskWorker worker = new TaskWorker(
                    processExecutorService,
                    Collections.unmodifiableList(opened),
                    taskWorkerConfiguration.getShutdownTimeout().toMillis());
            created = true;
            return worker;
        } finally {
            // A flag plus finally (rather than catch/rethrow) cleans up on any Throwable without
            // altering which exception reaches the caller.
            if (!created) {
                processExecutorService.shutdownNow();
                opened.forEach(TaskWorker::closeQuietly);
            }
        }
    }

    /**
     * Shuts down the executor, waiting up to the configured shutdown timeout, and then closes
     * every owned resource in the order it was registered. Failures while closing a resource are
     * logged and do not prevent the remaining resources from being closed.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        shutdownExecutor();
        this.closeables.forEach(TaskWorker::closeQuietly);
    }

    /**
     * Requests an orderly executor shutdown and escalates to {@code shutdownNow()} if it does not
     * terminate within {@code shutdownTimeoutMillis} or if the waiting thread is interrupted.
     */
    private void shutdownExecutor() {
        this.executor.shutdown();
        try {
            if (!this.executor.awaitTermination(this.shutdownTimeoutMillis, TimeUnit.MILLISECONDS)) {
                this.executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.executor.shutdownNow();
            // Restore the interrupt flag so the caller of close() can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /** Closes the given resource, logging (rather than propagating) any exception it throws. */
    private static void closeQuietly(AutoCloseable autoCloseable) {
        try {
            autoCloseable.close();
        } catch (Exception e) {
            log.error("Error closing {}", autoCloseable, e);
        }
    }

    /**
     * @param executor executor to shut down on {@link #close()}
     * @param closeables resources to close, in iteration order, on {@link #close()}
     * @param shutdownTimeoutMillis how long {@link #close()} waits for the executor to terminate
     *     before forcing shutdown, in milliseconds
     */
    @ConstructorProperties({"executor", "closeables", "shutdownTimeoutMillis"})
    TaskWorker(ExecutorService executor, List<AutoCloseable> closeables, long shutdownTimeoutMillis) {
        this.executor = executor;
        this.closeables = closeables;
        this.shutdownTimeoutMillis = shutdownTimeoutMillis;
    }
}
