package io.simplesource.saga.action.internal;

import io.simplesource.saga.action.async.AsyncContext;
import io.simplesource.saga.action.async.AsyncSerdes;
import io.simplesource.saga.action.async.AsyncSpec;
import io.simplesource.saga.model.messages.ActionRequest;
import io.simplesource.saga.model.messages.ActionResponse;
import io.simplesource.saga.model.saga.SagaId;
import io.simplesource.saga.model.specs.ActionProcessorSpec;
import io.simplesource.saga.shared.topics.TopicTypes;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/simplesource/saga/action/internal/AsyncConsumerRunner.class */
/**
 * Runnable that consumes action requests from the unprocessed-request action topic and passes
 * every request whose action type equals the one declared by the async spec to
 * {@link AsyncActionProcessor#processRecord}.
 *
 * <p>{@link #run()} is intended to be executed on its own thread; {@link #close()} may be called
 * from a different thread to ask the polling loop to stop.
 *
 * @param <A> payload type carried by the consumed {@link ActionRequest}s
 * @param <D> type parameter forwarded unchanged to {@link AsyncSpec} / {@link AsyncContext}
 * @param <K> key type of the output publisher
 * @param <O> type parameter forwarded unchanged to {@link AsyncSpec} / {@link AsyncContext}
 * @param <R> value type of the output publisher
 */
public class AsyncConsumerRunner<A, D, K, O, R> implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(AsyncConsumerRunner.class);

    /** Upper bound on how long a single poll blocks before the closed flag is re-checked. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100L);

    private final AsyncSpec<A, D, K, O, R> asyncSpec;
    private final ActionProcessorSpec<A> actionSpec;
    private final Consumer<Boolean> onClose;
    private final Properties consumerConfig;
    private final AsyncContext<A, D, K, O, R> asyncContext;
    private final AsyncPublisher<SagaId, ActionResponse> responsePublisher;
    private final Function<AsyncSerdes<K, R>, AsyncPublisher<K, R>> outputPublisher;

    // Written by the thread executing run() and read by the thread calling close(),
    // hence volatile so the closing thread reliably sees the consumer once it exists.
    private volatile Optional<KafkaConsumer<SagaId, ActionRequest<A>>> consumer = Optional.empty();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Creates a runner. No Kafka resources are allocated until {@link #run()} is invoked.
     *
     * @param asyncContext   context supplying the async spec, the action spec and the topic namer
     * @param properties     configuration passed to the {@link KafkaConsumer} constructor
     * @param asyncPublisher publisher handed to the action processor for action responses
     * @param function       factory handed to the action processor for output publishers
     * @param consumer       callback invoked with {@code true} once the Kafka consumer has been shut down
     */
    public AsyncConsumerRunner(AsyncContext<A, D, K, O, R> asyncContext, Properties properties, AsyncPublisher<SagaId, ActionResponse> asyncPublisher, Function<AsyncSerdes<K, R>, AsyncPublisher<K, R>> function, Consumer<Boolean> consumer) {
        this.asyncSpec = asyncContext.asyncSpec;
        this.actionSpec = asyncContext.actionSpec;
        this.responsePublisher = asyncPublisher;
        this.outputPublisher = function;
        this.onClose = consumer;
        this.asyncContext = asyncContext;
        this.consumerConfig = properties;
    }

    /**
     * Subscribes to the unprocessed-request topic and polls it until {@link #close()} is called
     * or an error occurs. On every exit path offsets are committed, the consumer is closed and
     * the {@code onClose} callback is invoked with {@code true}.
     *
     * @throws WakeupException if the consumer is woken up without {@link #close()} having been called
     */
    @Override // java.lang.Runnable
    public void run() {
        KafkaConsumer<SagaId, ActionRequest<A>> kafkaConsumer = new KafkaConsumer<>(
                this.consumerConfig,
                this.actionSpec.serdes.sagaId().deserializer(),
                this.actionSpec.serdes.request().deserializer());
        try {
            // Subscribing inside the try ensures the consumer is released even if subscribe fails.
            kafkaConsumer.subscribe(Collections.singletonList(
                    this.asyncContext.actionTopicNamer.apply(TopicTypes.ActionTopic.requestUnprocessed)));
            this.consumer = Optional.of(kafkaConsumer);
            while (!this.closed.get()) {
                for (ConsumerRecord<SagaId, ActionRequest<A>> requestRecord : kafkaConsumer.poll(POLL_TIMEOUT)) {
                    SagaId sagaId = requestRecord.key();
                    ActionRequest<A> actionRequest = requestRecord.value();
                    // Requests for other action types are ignored by this runner.
                    if (actionRequest.actionType.equals(this.asyncSpec.actionType)) {
                        AsyncActionProcessor.processRecord(this.asyncContext, sagaId, actionRequest, this.responsePublisher, this.outputPublisher);
                    }
                }
            }
        } catch (WakeupException e) {
            // A wakeup is only expected as the result of close(); anything else is a real error.
            if (!this.closed.get()) {
                throw e;
            }
        } finally {
            shutdown(kafkaConsumer);
        }
    }

    /**
     * Requests termination of the polling loop: marks the runner as closed and, if the Kafka
     * consumer has already been created, wakes it up so that a blocking poll returns promptly.
     */
    public void close() {
        this.closed.set(true);
        this.consumer.ifPresent(KafkaConsumer::wakeup);
    }

    /**
     * Commits offsets, closes the consumer and notifies {@code onClose}. The steps are chained
     * with {@code finally} so that a failure in an earlier step cannot skip the later ones.
     */
    private void shutdown(KafkaConsumer<SagaId, ActionRequest<A>> kafkaConsumer) {
        logger.info("Closing consumer and producer");
        try {
            try {
                kafkaConsumer.commitSync();
            } catch (WakeupException e) {
                // close() can call wakeup() while this thread is processing records rather than
                // blocked in poll(); the pending wakeup then surfaces on this commit instead.
                // Raising it consumes the wakeup, so a single retry is expected to go through.
                kafkaConsumer.commitSync();
            }
        } finally {
            try {
                kafkaConsumer.close();
            } finally {
                this.onClose.accept(true);
            }
        }
    }
}
