package org.axonframework.axonserver.connector.processor;

import io.axoniq.axonserver.connector.AxonServerConnection;
import io.axoniq.axonserver.connector.admin.AdminChannel;
import io.axoniq.axonserver.connector.control.ControlChannel;
import io.axoniq.axonserver.connector.control.ProcessorInstructionHandler;
import io.axoniq.axonserver.grpc.control.EventProcessorInfo;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.configuration.Configuration;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.StreamingEventProcessor;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/axonserver/connector/processor/EventProcessorControlService.class */
public class EventProcessorControlService {
    private static final Logger logger = LoggerFactory.getLogger(EventProcessorControlService.class);
    private static final String SUBSCRIBING_EVENT_PROCESSOR_MODE = "Subscribing";
    private static final String UNKNOWN_EVENT_PROCESSOR_MODE = "Unknown";
    protected final AxonServerConnectionManager axonServerConnectionManager;
    protected final Configuration eventProcessingConfiguration;
    protected final String context;
    protected final Map<String, AxonServerConfiguration.Eventhandling.ProcessorSettings> processorConfig;

    /* loaded from: input_file:org/axonframework/axonserver/connector/processor/EventProcessorControlService$AxonProcessorInstructionHandler.class */
    protected static class AxonProcessorInstructionHandler implements ProcessorInstructionHandler {
        private final EventProcessor processor;
        private final String name;

        public AxonProcessorInstructionHandler(EventProcessor eventProcessor, String str) {
            this.processor = eventProcessor;
            this.name = str;
        }

        public CompletableFuture<Boolean> releaseSegment(int i) {
            try {
                if (this.processor instanceof StreamingEventProcessor) {
                    this.processor.releaseSegment(i);
                    return CompletableFuture.completedFuture(true);
                }
                EventProcessorControlService.logger.info("Release segment requested for processor [{}] which is not a Streaming Event Processor", this.name);
                return CompletableFuture.completedFuture(false);
            } catch (Exception e) {
                return exceptionallyCompletedFuture(e);
            }
        }

        public CompletableFuture<Boolean> splitSegment(int i) {
            try {
                if (this.processor instanceof StreamingEventProcessor) {
                    return this.processor.splitSegment(i).thenApply(bool -> {
                        if (Boolean.TRUE.equals(bool)) {
                            EventProcessorControlService.logger.info("Successfully split segment [{}] of processor [{}]", Integer.valueOf(i), this.name);
                        } else {
                            EventProcessorControlService.logger.warn("Was not able to split segment [{}] for processor [{}]", Integer.valueOf(i), this.name);
                        }
                        return bool;
                    });
                }
                EventProcessorControlService.logger.info("Split segment requested for processor [{}] which is not a Streaming Event Processor", this.name);
                return CompletableFuture.completedFuture(false);
            } catch (Exception e) {
                return exceptionallyCompletedFuture(e);
            }
        }

        public CompletableFuture<Boolean> mergeSegment(int i) {
            try {
                if (this.processor instanceof StreamingEventProcessor) {
                    return this.processor.mergeSegment(i).thenApply(bool -> {
                        if (Boolean.TRUE.equals(bool)) {
                            EventProcessorControlService.logger.info("Successfully merged segment [{}] of processor [{}]", Integer.valueOf(i), this.name);
                        } else {
                            EventProcessorControlService.logger.warn("Was not able to merge segment [{}] for processor [{}]", Integer.valueOf(i), this.name);
                        }
                        return bool;
                    });
                }
                EventProcessorControlService.logger.warn("Merge segment request received for processor [{}] which is not a Streaming Event Processor", this.name);
                return CompletableFuture.completedFuture(false);
            } catch (Exception e) {
                return exceptionallyCompletedFuture(e);
            }
        }

        public CompletableFuture<Void> pauseProcessor() {
            try {
                this.processor.shutDown();
                return FutureUtils.emptyCompletedFuture();
            } catch (Exception e) {
                return exceptionallyCompletedFuture(e);
            }
        }

        public CompletableFuture<Void> startProcessor() {
            try {
                this.processor.start();
                return FutureUtils.emptyCompletedFuture();
            } catch (Exception e) {
                return exceptionallyCompletedFuture(e);
            }
        }

        private <T> CompletableFuture<T> exceptionallyCompletedFuture(Exception exc) {
            CompletableFuture<T> completableFuture = new CompletableFuture<>();
            completableFuture.completeExceptionally(exc);
            return completableFuture;
        }
    }

    public EventProcessorControlService(AxonServerConnectionManager axonServerConnectionManager, Configuration configuration, AxonServerConfiguration axonServerConfiguration) {
        this(axonServerConnectionManager, configuration, axonServerConfiguration.getContext(), axonServerConfiguration.getEventhandling().getProcessors());
    }

    public EventProcessorControlService(AxonServerConnectionManager axonServerConnectionManager, Configuration configuration, String str, Map<String, AxonServerConfiguration.Eventhandling.ProcessorSettings> map) {
        this.axonServerConnectionManager = axonServerConnectionManager;
        this.eventProcessingConfiguration = configuration;
        this.context = str;
        this.processorConfig = map;
    }

    public void start() {
        if (this.axonServerConnectionManager == null || this.eventProcessingConfiguration == null) {
            return;
        }
        Map<String, EventProcessor> map = (Map) this.eventProcessingConfiguration.getComponent(Map.class);
        AxonServerConnection connection = this.axonServerConnectionManager.getConnection(this.context);
        registerInstructionHandlers(connection, map);
        setLoadBalancingStrategies(connection, map.keySet());
    }

    private void setLoadBalancingStrategies(AxonServerConnection axonServerConnection, Set<String> set) {
        AdminChannel adminChannel = axonServerConnection.adminChannel();
        ((Map) this.processorConfig.entrySet().stream().filter(entry -> {
            if (set.contains(entry.getKey())) {
                return true;
            }
            logger.info("Event Processor [{}] is not a registered. Please check the name or register the Event Processor", entry.getKey());
            return false;
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry2 -> {
            return ((AxonServerConfiguration.Eventhandling.ProcessorSettings) entry2.getValue()).getLoadBalancingStrategy();
        }))).forEach((str, str2) -> {
            Optional<String> optional = tokenStoreIdentifierFor(str);
            if (!optional.isPresent()) {
                logger.warn("Cannot find token store identifier for processor [{}]. Load balancing cannot be configured without this identifier.", str);
                return;
            }
            String str = optional.get();
            adminChannel.loadBalanceEventProcessor(str, str, str2).whenComplete((r9, th) -> {
                if (th == null) {
                    logger.debug("Successfully requested to load balance processor [{}] with strategy [{}].", str, str2);
                } else {
                    logger.warn("Requesting to load balance processor [{}] with strategy [{}] failed.", new Object[]{str, str2, th});
                }
            });
            if (this.processorConfig.get(str).isAutomaticBalancing()) {
                adminChannel.setAutoLoadBalanceStrategy(str, str, str2).whenComplete((r92, th2) -> {
                    if (th2 == null) {
                        logger.debug("Successfully requested to automatically balance processor [{}] with strategy [{}].", str, str2);
                    } else {
                        logger.warn("Requesting to automatically balance processor [{}] with strategy [{}] failed.", new Object[]{str, str2, th2});
                    }
                });
            }
        });
    }

    private Optional<String> tokenStoreIdentifierFor(String str) {
        TokenStore tokenStore = (TokenStore) this.eventProcessingConfiguration.getComponent(TokenStore.class);
        TransactionManager transactionManager = (TransactionManager) this.eventProcessingConfiguration.getComponent(TransactionManager.class);
        Objects.requireNonNull(tokenStore);
        return (Optional) transactionManager.fetchInTransaction(tokenStore::retrieveStorageIdentifier);
    }

    private void registerInstructionHandlers(AxonServerConnection axonServerConnection, Map<String, EventProcessor> map) {
        ControlChannel controlChannel = axonServerConnection.controlChannel();
        map.forEach((str, eventProcessor) -> {
            controlChannel.registerEventProcessor(str, infoSupplier(eventProcessor), new AxonProcessorInstructionHandler(eventProcessor, str));
        });
    }

    protected Supplier<EventProcessorInfo> infoSupplier(EventProcessor eventProcessor) {
        return eventProcessor instanceof StreamingEventProcessor ? () -> {
            return StreamingEventProcessorInfoMessage.describe((StreamingEventProcessor) eventProcessor);
        } : eventProcessor instanceof SubscribingEventProcessor ? () -> {
            return subscribingProcessorInfo(eventProcessor);
        } : () -> {
            return unknownProcessorTypeInfo(eventProcessor);
        };
    }

    private EventProcessorInfo subscribingProcessorInfo(EventProcessor eventProcessor) {
        return EventProcessorInfo.newBuilder().setProcessorName(eventProcessor.getName()).setMode(SUBSCRIBING_EVENT_PROCESSOR_MODE).setIsStreamingProcessor(false).build();
    }

    private EventProcessorInfo unknownProcessorTypeInfo(EventProcessor eventProcessor) {
        return EventProcessorInfo.newBuilder().setProcessorName(eventProcessor.getName()).setMode(UNKNOWN_EVENT_PROCESSOR_MODE).setIsStreamingProcessor(false).build();
    }
}
