package io.fluxcapacitor.axonclient.eventhandling;

import io.fluxcapacitor.axonclient.common.serialization.AxonMessageSerializer;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.Message;
import io.fluxcapacitor.javaclient.tracking.ConsumerService;
import io.fluxcapacitor.javaclient.tracking.Tracking;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.axonframework.eventhandling.AbstractEventProcessor;
import org.axonframework.eventhandling.ErrorHandler;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessingException;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/axonclient/eventhandling/FluxCapacitorEventProcessor.class */
public class FluxCapacitorEventProcessor extends AbstractEventProcessor {
    private static final Logger log = LoggerFactory.getLogger(FluxCapacitorEventProcessor.class);
    private final ConsumerService consumerService;
    private final AxonMessageSerializer serializer;
    private final int threads;
    private volatile Registration registration;

    public FluxCapacitorEventProcessor(String str, List<?> list, ConsumerService consumerService, AxonMessageSerializer axonMessageSerializer) {
        this(str, new SimpleEventHandlerInvoker(new Object[]{list}), RollbackConfigurationType.ANY_THROWABLE, PropagatingErrorHandler.INSTANCE, NoOpMessageMonitor.INSTANCE, consumerService, axonMessageSerializer, 1);
    }

    public FluxCapacitorEventProcessor(String str, EventHandlerInvoker eventHandlerInvoker, RollbackConfiguration rollbackConfiguration, ErrorHandler errorHandler, MessageMonitor<? super EventMessage<?>> messageMonitor, ConsumerService consumerService, AxonMessageSerializer axonMessageSerializer, int i) {
        super(str, eventHandlerInvoker, rollbackConfiguration, errorHandler, messageMonitor);
        this.consumerService = consumerService;
        this.serializer = axonMessageSerializer;
        this.threads = i;
    }

    protected void handle(List<Message> list) {
        List list2 = (List) this.serializer.deserializeEvents(list.stream()).collect(Collectors.toList());
        try {
            log.info("{} received events {}", getName(), list2.stream().map((v0) -> {
                return v0.getPayloadType();
            }).map((v0) -> {
                return v0.getSimpleName();
            }).collect(Collectors.toList()));
            super.process(list2);
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e2) {
            throw new EventProcessingException("Exception occurred while processing events", e2);
        }
    }

    public void start() {
        if (this.registration == null) {
            this.registration = Tracking.start(getName(), this.threads, this.consumerService, this::handle);
        }
    }

    public void shutDown() {
        Optional.ofNullable(this.registration).ifPresent((v0) -> {
            v0.cancel();
        });
        this.registration = null;
    }
}
