package io.streamnative.pulsar.recipes.task;

import java.beans.ConstructorProperties;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/recipes/task/ProcessExecutor.class */
class ProcessExecutor<T, R> {
    private static final Logger log = LoggerFactory.getLogger(ProcessExecutor.class);
    private final ExecutorService executor;
    private final Process<T, R> process;
    private final Clock clock;
    private final Duration keepAliveInterval;

    /* loaded from: input_file:io/streamnative/pulsar/recipes/task/ProcessExecutor$KeepAlive.class */
    interface KeepAlive {
        void update() throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public R execute(T t, Optional<Duration> optional, KeepAlive keepAlive) throws ProcessException {
        Instant instant = this.clock.instant();
        Future<T> submit = this.executor.submit(() -> {
            return this.process.apply(t);
        });
        while (!submit.isDone()) {
            try {
                try {
                    submit.get(this.keepAliveInterval.toMillis(), TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    if (optional.isPresent() && this.clock.instant().isAfter(instant.plus((TemporalAmount) optional.get()))) {
                        submit.cancel(true);
                    } else {
                        try {
                            keepAlive.update();
                        } catch (Exception e2) {
                            log.warn("Failed to update keep alive", e2);
                        }
                    }
                }
            } catch (InterruptedException e3) {
                Thread.currentThread().interrupt();
                throw new ProcessException("Process was interrupted", e3);
            } catch (CancellationException e4) {
                throw new ProcessException("Process was cancelled", e4);
            } catch (ExecutionException e5) {
                throw new ProcessException("Processing error", e5.getCause());
            } catch (Exception e6) {
                throw new ProcessException("Unexpected error in executor", e6);
            }
        }
        return submit.get();
    }

    @ConstructorProperties({"executor", "process", "clock", "keepAliveInterval"})
    public ProcessExecutor(ExecutorService executorService, Process<T, R> process, Clock clock, Duration duration) {
        this.executor = executorService;
        this.process = process;
        this.clock = clock;
        this.keepAliveInterval = duration;
    }
}
