package org.elasticsoftware.akces.client;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.elasticsoftware.akces.aggregate.DomainEventType;
import org.elasticsoftware.akces.annotations.CommandInfo;
import org.elasticsoftware.akces.annotations.DomainEventInfo;
import org.elasticsoftware.akces.commands.Command;
import org.elasticsoftware.akces.control.AggregateServiceCommandType;
import org.elasticsoftware.akces.control.AggregateServiceDomainEventType;
import org.elasticsoftware.akces.control.AggregateServiceRecord;
import org.elasticsoftware.akces.control.AkcesControlRecord;
import org.elasticsoftware.akces.events.DomainEvent;
import org.elasticsoftware.akces.events.ErrorEvent;
import org.elasticsoftware.akces.gdpr.EncryptingGDPRContext;
import org.elasticsoftware.akces.gdpr.GDPRAnnotationUtils;
import org.elasticsoftware.akces.gdpr.GDPRContext;
import org.elasticsoftware.akces.gdpr.GDPRContextHolder;
import org.elasticsoftware.akces.gdpr.GDPRKeyUtils;
import org.elasticsoftware.akces.protocol.CommandRecord;
import org.elasticsoftware.akces.protocol.CommandResponseRecord;
import org.elasticsoftware.akces.protocol.DomainEventRecord;
import org.elasticsoftware.akces.protocol.PayloadEncoding;
import org.elasticsoftware.akces.protocol.ProtocolRecord;
import org.elasticsoftware.akces.schemas.KafkaSchemaRegistry;
import org.elasticsoftware.akces.schemas.SchemaException;
import org.elasticsoftware.akces.util.HostUtils;
import org.elasticsoftware.akces.util.KafkaSender;
import org.everit.json.schema.ValidationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.availability.AvailabilityChangeEvent;
import org.springframework.boot.availability.LivenessState;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaAdminOperations;
import org.springframework.kafka.core.ProducerFactory;

/* loaded from: input_file:org/elasticsoftware/akces/client/AkcesClientController.class */
public class AkcesClientController extends Thread implements AutoCloseable, AkcesClient, ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger(AkcesClientController.class);
    // Partition 0 of the "Akces-Control" topic; run() assigns the control consumer to it and seeks to the beginning.
    private static final TopicPartition AKCES_CONTROL_PARTITION = new TopicPartition("Akces-Control", 0);
    // Factory for the producer that run() creates and processCommands() uses to send command records.
    private final ProducerFactory<String, ProtocolRecord> producerFactory;
    // Factory for the consumer that reads AKCES_CONTROL_PARTITION.
    private final ConsumerFactory<String, AkcesControlRecord> controlRecordConsumerFactory;
    // Factory for the consumer that reads this client's command-response partition.
    private final ConsumerFactory<String, ProtocolRecord> commandResponseConsumerFactory;
    // Used in run() to look up the partition counts of the control and command-response topics.
    private final KafkaAdminOperations kafkaAdmin;
    // Hash used to map aggregate ids and the host name onto partition numbers.
    private final HashFunction hashFunction;
    // Discovered aggregate services, keyed by the control record key; filled by processControlRecords().
    private final Map<String, AggregateServiceRecord> aggregateServices;
    // Commands handed in via send()/sendAndForget(), consumed by processCommands() on the controller thread.
    private final BlockingQueue<CommandRequest> commandQueue;
    // Commands that were sent and still await a response, keyed by CommandRecord.id().
    private final Map<String, PendingCommandResponse> pendingCommandResponseMap;
    private final KafkaSchemaRegistry schemaRegistry;
    private final ObjectMapper objectMapper;
    // Command classes that registerCommand() has already handled (it returns early when the class is present).
    private final Map<Class<? extends Command>, AggregateServiceCommandType> commandTypes;
    // Domain event types found by loadSupportedDomainEvents(): type name -> (version -> event type).
    private final Map<String, TreeMap<Integer, DomainEventType<? extends DomainEvent>>> domainEventClasses;
    // Command schemas by type name and version; registerCommand() creates the per-type map on demand.
    private final Map<String, Map<Integer, ParsedSchema>> commandSchemas;
    // NOTE(review): usage of the next two maps is outside the visible part of the file - confirm their roles.
    private final Map<String, Map<Integer, ParsedSchema>> domainEventSchemas;
    private final Map<Class<? extends Command>, ParsedSchema> commandSchemasLookup;
    // Classpath scanner used by loadSupportedDomainEvents() to find domain event classes.
    private final ClassPathScanningCandidateComponentProvider domainEventScanner;
    // Counted down at the end of run(); close() waits on it for up to 10 seconds.
    private final CountDownLatch shutdownLatch;
    // Partition count of the "Akces-Control" topic; null until run() has looked it up. Read by resolvePartition().
    private Integer partitions;
    // Lifecycle state; volatile because close() writes it from a different thread than the one executing run().
    private volatile AkcesClientControllerState processState;
    // Partition of "Akces-CommandResponses" this client listens on; null until run() has resolved it.
    private TopicPartition commandResponsePartition;
    // Used by run() to publish the LivenessState.BROKEN event; the setter is not in the visible part of the file.
    private ApplicationContext applicationContext;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/akces/client/AkcesClientController$CommandRequest.class */
    /**
     * A command waiting in the controller's command queue.
     *
     * <p>Declared as a real record: a class cannot explicitly extend {@link Record}, and the
     * compiler generates {@code equals}, {@code hashCode}, {@code toString} and the accessors
     * that the decompiled form spelled out by hand. Annotations on the components carry over
     * to the generated fields, constructor parameters and accessors.
     *
     * @param tenantId                tenant the command is issued for
     * @param correlationId           caller supplied correlation id, may be {@code null}
     * @param commandType             type name taken from the command's {@code @CommandInfo}
     * @param commandVersion          version taken from the command's {@code @CommandInfo}
     * @param command                 the command instance itself
     * @param completableFuture       future handed back to (or awaited by) the caller
     * @param completeAfterValidation when {@code true} the future is completed with an empty list
     *                                as soon as the command has been prepared for sending
     */
    public record CommandRequest(
            @Nonnull String tenantId,
            @Nullable String correlationId,
            @Nonnull String commandType,
            int commandVersion,
            @Nonnull Command command,
            @Nonnull CompletableFuture<List<DomainEvent>> completableFuture,
            boolean completeAfterValidation) {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/akces/client/AkcesClientController$PendingCommandResponse.class */
    /**
     * A command that has been sent and whose response has not been received yet.
     *
     * <p>Declared as a real record: a class cannot explicitly extend {@link Record}, and the
     * compiler generates {@code equals}, {@code hashCode}, {@code toString} and the accessors
     * that the decompiled form spelled out by hand.
     *
     * @param commandRecord     the record that was sent
     * @param completableFuture future to complete once the matching response arrives
     */
    public record PendingCommandResponse(
            @Nonnull CommandRecord commandRecord,
            @Nonnull CompletableFuture<List<DomainEvent>> completableFuture) {
    }

    /**
     * Creates the controller thread (named {@code "AkcesClientController"}) in the
     * {@code INITIALIZING} state and scans for the supported domain events.
     *
     * @param producerFactory      factory for the command producer
     * @param consumerFactory      factory for the control record consumer
     * @param consumerFactory2     factory for the command response consumer
     * @param kafkaAdminOperations admin operations used to describe topics
     * @param kafkaSchemaRegistry  schema registry used to validate commands
     * @param objectMapper         JSON mapper
     * @param classPathScanningCandidateComponentProvider scanner that locates domain event classes
     * @param str                  base package scanned for domain events; the package
     *                             {@code org.elasticsoftware.akces.errors} is always scanned as well
     */
    public AkcesClientController(ProducerFactory<String, ProtocolRecord> producerFactory, ConsumerFactory<String, AkcesControlRecord> consumerFactory, ConsumerFactory<String, ProtocolRecord> consumerFactory2, KafkaAdminOperations kafkaAdminOperations, KafkaSchemaRegistry kafkaSchemaRegistry, ObjectMapper objectMapper, ClassPathScanningCandidateComponentProvider classPathScanningCandidateComponentProvider, String str) {
        super("AkcesClientController");

        // Injected collaborators.
        this.producerFactory = producerFactory;
        this.controlRecordConsumerFactory = consumerFactory;
        this.commandResponseConsumerFactory = consumerFactory2;
        this.kafkaAdmin = kafkaAdminOperations;
        this.schemaRegistry = kafkaSchemaRegistry;
        this.objectMapper = objectMapper;
        this.domainEventScanner = classPathScanningCandidateComponentProvider;

        // Internal state.
        this.hashFunction = Hashing.murmur3_32_fixed();
        this.aggregateServices = new ConcurrentHashMap<>();
        this.commandQueue = new LinkedBlockingQueue<>();
        this.pendingCommandResponseMap = new HashMap<>();
        this.commandTypes = new ConcurrentHashMap<>();
        this.domainEventClasses = new ConcurrentHashMap<>();
        this.commandSchemas = new ConcurrentHashMap<>();
        this.domainEventSchemas = new ConcurrentHashMap<>();
        this.commandSchemasLookup = new ConcurrentHashMap<>();
        this.shutdownLatch = new CountDownLatch(1);
        this.partitions = null;
        this.processState = AkcesClientControllerState.INITIALIZING;

        // Needs domainEventScanner and domainEventClasses, so it runs last.
        loadSupportedDomainEvents(str);
        loadSupportedDomainEvents("org.elasticsoftware.akces.errors");
    }

    /**
     * Returns the command response partition this client listens on.
     *
     * @return the partition resolved by {@code run()}, or {@code null} if {@code run()} has not
     *         resolved it yet
     */
    @VisibleForTesting
    public TopicPartition getCommandResponsePartition() {
        return commandResponsePartition;
    }

    /**
     * Controller loop: creates the Kafka clients, resolves the partitions to use and then calls
     * {@code process(...)} until the state becomes {@code SHUTTING_DOWN}.
     *
     * <p>The shutdown steps (refusing queued commands, publishing {@code LivenessState.BROKEN}
     * and releasing the shutdown latch) run in a {@code finally} block, so they also happen when
     * an exception escapes the loop. Otherwise {@code close()} would wait for its full timeout
     * and queued futures would never complete. The clients are closed by try-with-resources in
     * reverse creation order (producer, response consumer, control consumer) after those steps.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        final String hostName = HostUtils.getHostName();
        final String controlClientId = hostName + "-AkcesClientController-Control";
        final String responsesClientId = hostName + "-AkcesClientController-CommandResponses";
        try (Consumer<String, AkcesControlRecord> controlConsumer =
                     this.controlRecordConsumerFactory.createConsumer(controlClientId, controlClientId, null);
             Consumer<String, ProtocolRecord> responseConsumer =
                     this.commandResponseConsumerFactory.createConsumer(responsesClientId, responsesClientId, null);
             Producer<String, ProtocolRecord> producer =
                     this.producerFactory.createProducer(hostName + "-AkcesClientController")) {
            try {
                this.partitions = this.kafkaAdmin.describeTopics("Akces-Control").get("Akces-Control").partitions().size();
                // The control topic is replayed from the start to discover all aggregate services.
                controlConsumer.assign(Collections.singletonList(AKCES_CONTROL_PARTITION));
                controlConsumer.seekToBeginning(Collections.singletonList(AKCES_CONTROL_PARTITION));

                final int responsePartitionCount =
                        this.kafkaAdmin.describeTopics("Akces-CommandResponses").get("Akces-CommandResponses").partitions().size();
                this.commandResponsePartition =
                        new TopicPartition("Akces-CommandResponses", resolveCommandResponsePartition(hostName, responsePartitionCount));
                // Only responses produced from now on are of interest.
                responseConsumer.assign(Collections.singletonList(this.commandResponsePartition));
                responseConsumer.seekToEnd(Collections.singletonList(this.commandResponsePartition));

                while (this.processState != AkcesClientControllerState.SHUTTING_DOWN) {
                    process(controlConsumer, responseConsumer, producer);
                }
            } finally {
                // This thread is about to terminate, whatever the reason; make the state reflect that.
                this.processState = AkcesClientControllerState.SHUTTING_DOWN;
                try {
                    List<CommandRequest> refused = new ArrayList<>();
                    this.commandQueue.drainTo(refused);
                    for (CommandRequest request : refused) {
                        request.completableFuture().completeExceptionally(
                                new CommandRefusedException(request.command().getClass(), AkcesClientControllerState.SHUTTING_DOWN));
                    }
                    this.applicationContext.publishEvent(new AvailabilityChangeEvent<>(this, LivenessState.BROKEN));
                } finally {
                    // Always release close(), even if one of the steps above fails.
                    this.shutdownLatch.countDown();
                }
            }
        }
    }

    /**
     * Scans the given base package for domain event classes and registers each one in
     * {@code domainEventClasses} under its {@code @DomainEventInfo} type name and version.
     *
     * <p>Candidates that cannot be loaded, or that are not {@link DomainEvent} implementations,
     * are skipped with a warning instead of being ignored silently. The type check is done with
     * {@link Class#asSubclass(Class)}, which is what makes the {@code ClassCastException}
     * handler reachable.
     *
     * @param str base package to scan
     */
    private void loadSupportedDomainEvents(String str) {
        for (BeanDefinition candidate : this.domainEventScanner.findCandidateComponents(str)) {
            final String className = candidate.getBeanClassName();
            try {
                final Class<? extends DomainEvent> eventClass = Class.forName(className).asSubclass(DomainEvent.class);
                final DomainEventInfo info = eventClass.getAnnotation(DomainEventInfo.class);
                this.domainEventClasses
                        .computeIfAbsent(info.type(), typeName -> new TreeMap<>())
                        .put(info.version(), new DomainEventType<>(
                                info.type(),
                                info.version(),
                                eventClass,
                                false,
                                true,
                                ErrorEvent.class.isAssignableFrom(eventClass),
                                GDPRAnnotationUtils.hasPIIDataAnnotation(eventClass)));
            } catch (ClassNotFoundException | ClassCastException e) {
                logger.warn("Skipping domain event candidate {} found in package {}", className, str, e);
            }
        }
    }

    /**
     * Performs one iteration of the controller loop.
     *
     * <p>While {@code RUNNING}: handles control records, sends queued commands and handles
     * command responses. While {@code INITIALIZING}: reads control records and switches to
     * {@code RUNNING} once an empty poll is seen and the consumer position has reached the end
     * offset captured before the poll. In any other state nothing happens.
     *
     * @param consumer  consumer of the control partition
     * @param consumer2 consumer of the command response partition
     * @param producer  producer used to send commands
     */
    private void process(Consumer<String, AkcesControlRecord> consumer, Consumer<String, ProtocolRecord> consumer2, Producer<String, ProtocolRecord> producer) {
        final Duration pollTimeout = Duration.ofMillis(10L);
        try {
            if (this.processState == AkcesClientControllerState.RUNNING) {
                processControlRecords(consumer.poll(pollTimeout));
                processCommands(producer);
                processCommandResponses(consumer2.poll(pollTimeout));
            } else if (this.processState == AkcesClientControllerState.INITIALIZING) {
                // The end offset is captured before polling so "caught up" is judged against a fixed target.
                final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singletonList(AKCES_CONTROL_PARTITION));
                final ConsumerRecords<String, AkcesControlRecord> batch = consumer.poll(pollTimeout);
                processControlRecords(batch);
                final boolean caughtUp = batch.isEmpty()
                        && endOffsets.getOrDefault(AKCES_CONTROL_PARTITION, 0L) <= consumer.position(AKCES_CONTROL_PARTITION);
                if (caughtUp) {
                    this.processState = AkcesClientControllerState.RUNNING;
                }
            }
        } catch (WakeupException | InterruptException ignored) {
            // Ignored on purpose: the state is left untouched and the caller's loop re-checks it.
        } catch (KafkaException fatal) {
            logger.error("Unrecoverable exception in AkcesController", fatal);
            this.processState = AkcesClientControllerState.SHUTTING_DOWN;
        } catch (IOException | RestClientException schemaFailure) {
            logger.error("Exception while loading Command (JSON)Schemas from SchemaRegistry", schemaFailure);
            this.processState = AkcesClientControllerState.SHUTTING_DOWN;
        }
    }

    /**
     * Stores every {@link AggregateServiceRecord} of the batch in {@code aggregateServices},
     * keyed by the record key, and logs a service the first time its key is seen. Records of any
     * other type are only logged.
     *
     * @param consumerRecords batch polled from the control partition, possibly empty
     * @throws RestClientException declared for callers; not thrown by the code in this method
     * @throws IOException         declared for callers; not thrown by the code in this method
     */
    private void processControlRecords(ConsumerRecords<String, AkcesControlRecord> consumerRecords) throws RestClientException, IOException {
        for (ConsumerRecord<String, AkcesControlRecord> entry : consumerRecords) {
            final AkcesControlRecord payload = entry.value();
            if (!(payload instanceof AggregateServiceRecord serviceRecord)) {
                logger.warn("Received unknown AkcesControlRecord type: {}", payload.getClass().getSimpleName());
                continue;
            }
            final String serviceKey = entry.key();
            if (!this.aggregateServices.containsKey(serviceKey)) {
                logger.info("Discovered service: {}", serviceRecord.aggregateName());
            }
            this.aggregateServices.put(serviceKey, serviceRecord);
        }
    }

    /**
     * Takes the queued commands, turns each into a {@link CommandRecord} and sends the whole
     * batch in one Kafka transaction.
     *
     * <p>Two fixes over the previous version:
     * <ul>
     *   <li>The batch is an ordered list, so commands are sent in the order they were queued. A
     *       {@code HashMap} keyed by the producer record iterated in hash order.</li>
     *   <li>When the command at the head of the queue fails schema registration it stays queued
     *       and the loop stops, but commands already dequeued in this call are still sent.
     *       Returning at that point dropped them without completing their futures.</li>
     * </ul>
     *
     * @param producer transactional producer used for the batch
     */
    private void processCommands(Producer<String, ProtocolRecord> producer) {
        final List<Map.Entry<ProducerRecord<String, ProtocolRecord>, CommandRequest>> batch = new ArrayList<>();
        while (!this.commandQueue.isEmpty()) {
            final CommandRequest head = this.commandQueue.peek();
            if (head != null) {
                try {
                    registerCommand(head.commandType(), head.commandVersion(), head.command().getClass());
                } catch (SchemaException e) {
                    // Leave the command queued for a later attempt, but still send what was prepared so far.
                    logger.debug("Schema registration failed for queued command {}, retrying later", head.commandType(), e);
                    break;
                } catch (UnroutableCommandException e2) {
                    // Not handled here: the same call below fails again and is reported on the request's future.
                }
            }
            final CommandRequest request = this.commandQueue.poll();
            if (request == null) {
                break;
            }
            try {
                registerCommand(request.commandType(), request.commandVersion(), request.command().getClass());
                final String topic = resolveTopic(request.commandType(), request.commandVersion(), request.command());
                final CommandRecord commandRecord = new CommandRecord(
                        request.tenantId(),
                        request.commandType(),
                        request.commandVersion(),
                        serialize(request.command()),
                        PayloadEncoding.JSON,
                        request.command().getAggregateId(),
                        request.correlationId() != null ? request.correlationId() : UUID.randomUUID().toString(),
                        this.commandResponsePartition.toString());
                final ProducerRecord<String, ProtocolRecord> producerRecord = new ProducerRecord<>(
                        topic, resolvePartition(commandRecord.aggregateId()), commandRecord.aggregateId(), commandRecord);
                batch.add(Map.entry(producerRecord, request));
                if (request.completeAfterValidation()) {
                    // Fire-and-forget callers are released as soon as the command is ready to be sent.
                    request.completableFuture().complete(Collections.emptyList());
                }
            } catch (AkcesClientCommandException | SchemaException e3) {
                request.completableFuture().completeExceptionally(e3);
            }
        }
        if (batch.isEmpty()) {
            return;
        }
        producer.beginTransaction();
        for (Map.Entry<ProducerRecord<String, ProtocolRecord>, CommandRequest> entry : batch) {
            final ProducerRecord<String, ProtocolRecord> producerRecord = entry.getKey();
            final CommandRequest request = entry.getValue();
            final CompletableFuture<List<DomainEvent>> completableFuture = request.completableFuture();
            final CommandRecord sentRecord = (CommandRecord) producerRecord.value();
            final Class<? extends Command> commandClass = request.command().getClass();
            if (completableFuture.isDone()) {
                // Nobody is waiting for a response, so no callback is needed.
                KafkaSender.send(producer, producerRecord);
            } else {
                KafkaSender.send(producer, producerRecord, (recordMetadata, exc) -> {
                    if (exc != null) {
                        completableFuture.completeExceptionally(new CommandSendingFailedException(commandClass, exc));
                    } else {
                        this.pendingCommandResponseMap.put(sentRecord.id(), new PendingCommandResponse(sentRecord, completableFuture));
                    }
                });
            }
        }
        producer.commitTransaction();
    }

    /**
     * Completes the pending futures for the command responses in the batch.
     *
     * <p>For each {@link CommandResponseRecord} the matching pending entry is removed (looked up
     * by command id), its events are deserialized and the future is completed with them. An
     * {@link IOException} during deserialization completes the future exceptionally. Responses
     * without a pending entry are logged at trace level; other record types at warn level.
     *
     * @param consumerRecords batch polled from the command response partition, possibly empty
     */
    private void processCommandResponses(ConsumerRecords<String, ProtocolRecord> consumerRecords) {
        for (ConsumerRecord<String, ProtocolRecord> entry : consumerRecords) {
            final ProtocolRecord payload = entry.value();
            if (!(payload instanceof CommandResponseRecord response)) {
                logger.warn("Received unknown ProtocolRecord type: {}", payload.getClass().getSimpleName());
                continue;
            }
            final PendingCommandResponse pending = this.pendingCommandResponseMap.remove(response.commandId());
            if (pending == null) {
                logger.trace("Received CommandResponseRecord for unknown commandId: {}", response.commandId());
                continue;
            }
            try {
                final List<DomainEvent> events = new ArrayList<>();
                for (DomainEventRecord eventRecord : response.events()) {
                    events.add(deserialize(eventRecord, response.encryptionKey()));
                }
                pending.completableFuture().complete(events);
            } catch (IOException e) {
                pending.completableFuture().completeExceptionally(e);
            }
        }
    }

    /**
     * Queues a command for sending and returns a stage for the resulting domain events.
     *
     * @param str     tenant id
     * @param str2    correlation id, may be {@code null}
     * @param command command to send; its class must carry {@code @CommandInfo}
     * @return stage backed by the future stored in the queued request
     * @throws IllegalArgumentException if the command class is not annotated with {@code @CommandInfo}
     */
    @Override // org.elasticsoftware.akces.client.AkcesClient
    public CompletionStage<List<DomainEvent>> send(@Nonnull String str, @Nullable String str2, @Nonnull Command command) {
        checkRunning(command);
        final Class<? extends Command> commandClass = command.getClass();
        final CommandInfo info = commandClass.getAnnotation(CommandInfo.class);
        if (info == null) {
            throw new IllegalArgumentException("Command class " + commandClass.getName() + " is not annotated with @CommandInfo");
        }
        final CompletableFuture<List<DomainEvent>> result = new CompletableFuture<>();
        final CommandRequest request = new CommandRequest(str, str2, info.type(), info.version(), command, result, false);
        this.commandQueue.add(request);
        return result;
    }

    /**
     * Queues a command and blocks until the controller thread has either prepared it for
     * sending or rejected it; the command's response is not awaited.
     *
     * <p>If the calling thread is interrupted while waiting, the method returns and the
     * interrupt flag is restored so the caller can still observe the interruption.
     *
     * @param str     tenant id
     * @param str2    correlation id, may be {@code null}
     * @param command command to send; its class must carry {@code @CommandInfo}
     * @throws IllegalArgumentException if the command class is not annotated with {@code @CommandInfo}
     * @throws RuntimeException         the failure the request was completed with, rethrown as is
     *                                  when it is a {@code RuntimeException} and wrapped otherwise
     */
    @Override // org.elasticsoftware.akces.client.AkcesClient
    public void sendAndForget(@Nonnull String str, @Nullable String str2, @Nonnull Command command) {
        checkRunning(command);
        CommandInfo annotation = command.getClass().getAnnotation(CommandInfo.class);
        if (annotation == null) {
            throw new IllegalArgumentException("Command class " + command.getClass().getName() + " is not annotated with @CommandInfo");
        }
        CompletableFuture<List<DomainEvent>> completableFuture = new CompletableFuture<>();
        this.commandQueue.add(new CommandRequest(str, str2, annotation.type(), annotation.version(), command, completableFuture, true));
        try {
            completableFuture.get();
        } catch (InterruptedException e) {
            // Stop waiting, but keep the interruption visible to the caller.
            Thread.currentThread().interrupt();
        } catch (CancellationException ignored) {
            // A cancelled request has nothing to report.
        } catch (ExecutionException e2) {
            Throwable cause = e2.getCause();
            if (cause instanceof RuntimeException runtimeCause) {
                throw runtimeCause;
            }
            throw new RuntimeException(cause);
        }
    }

    /**
     * Requests shutdown of the controller loop and waits up to 10 seconds for it to finish.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
     * interrupt flag is restored.
     *
     * @throws Exception declared by {@link AutoCloseable}; not thrown by the code in this method
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.processState = AkcesClientControllerState.SHUTTING_DOWN;
        try {
            if (this.shutdownLatch.await(10L, TimeUnit.SECONDS)) {
                logger.info("AkcesClientController has been shutdown");
            } else {
                logger.warn("AkcesClientController did not shutdown within 10 seconds");
            }
        } catch (InterruptedException e) {
            // Give up waiting, but keep the interruption visible to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Finds the command type with the given name and version among the known aggregate services.
     *
     * @param str command type name
     * @param i   command version
     * @return the first matching command type of the first service offering one, or {@code null}
     */
    private AggregateServiceCommandType resolveCommandType(String str, int i) {
        for (AggregateServiceRecord service : this.aggregateServices.values()) {
            for (AggregateServiceCommandType candidate : service.supportedCommands()) {
                if (candidate.typeName().equals(str) && candidate.version() == i) {
                    return candidate;
                }
            }
        }
        return null;
    }

    /**
     * Finds the first known aggregate service that supports the given command type.
     *
     * @param aggregateServiceCommandType command type to look for
     * @return the matching service, or {@code null} when no known service supports it
     */
    private AggregateServiceRecord resolveAggregateService(AggregateServiceCommandType aggregateServiceCommandType) {
        for (AggregateServiceRecord service : this.aggregateServices.values()) {
            if (supportsCommand(service.supportedCommands(), aggregateServiceCommandType.typeName(), aggregateServiceCommandType.version())) {
                return service;
            }
        }
        return null;
    }

    /**
     * Resolves the command topic of the single aggregate service that supports the given
     * command type and version.
     *
     * @param str the command type name
     * @param i the command version
     * @param command the command being routed; its class is reported on failure
     * @return the command topic of the one supporting service
     * @throws UnroutableCommandException when no service, or more than one service, supports
     *     the command
     */
    private String resolveTopic(String str, int i, Command command) {
        List<AggregateServiceRecord> candidates = this.aggregateServices.values().stream()
                .filter(service -> supportsCommand(service.supportedCommands(), str, i))
                .toList();
        // Zero matches and ambiguous matches are both reported as unroutable.
        if (candidates.size() != 1) {
            throw new UnroutableCommandException(command.getClass());
        }
        return candidates.getFirst().commandTopic();
    }

    /**
     * Tells whether the given list contains a command type with the given name and version.
     *
     * @param list the supported command types to search
     * @param str the command type name to match
     * @param i the command version to match
     * @return {@code true} when at least one entry matches both name and version
     */
    private boolean supportsCommand(List<AggregateServiceCommandType> list, String str, int i) {
        return list.stream()
                .anyMatch(candidate -> candidate.typeName().equals(str) && candidate.version() == i);
    }

    /**
     * Maps a string key onto a partition index using the configured hash function.
     *
     * <p>The remainder is taken before the absolute value. {@code Math.abs(Integer.MIN_VALUE)}
     * is still negative, so applying {@code abs} first could yield a negative partition for
     * that one hash value. For every other hash value the result is identical to
     * {@code Math.abs(hash) % partitions}.
     *
     * @param str the key to hash (UTF-8 encoded)
     * @return a partition index in {@code [0, partitions)}
     */
    @VisibleForTesting
    public Integer resolvePartition(@Nonnull String str) {
        int hash = this.hashFunction.hashString(str, StandardCharsets.UTF_8).asInt();
        return Integer.valueOf(Math.abs(hash % this.partitions.intValue()));
    }

    /**
     * Maps a string key onto one of {@code i} partitions using the configured hash function.
     *
     * <p>The remainder is taken before the absolute value so that a hash of
     * {@code Integer.MIN_VALUE} (whose {@code Math.abs} is still negative) cannot produce a
     * negative partition. All other hash values map exactly as
     * {@code Math.abs(hash) % i} would.
     *
     * @param str the key to hash (UTF-8 encoded)
     * @param i the number of partitions
     * @return a partition index in {@code [0, i)}
     */
    private Integer resolveCommandResponsePartition(String str, int i) {
        int hash = this.hashFunction.hashString(str, StandardCharsets.UTF_8).asInt();
        return Integer.valueOf(Math.abs(hash % i));
    }

    /**
     * Registers a command class once: resolves its service-side command type, validates and
     * stores its schema, and processes every domain event produced by the owning service.
     *
     * <p>Does nothing when the class is already present in {@code commandTypes}.
     *
     * @param str the command type name
     * @param i the command version
     * @param cls the local command class
     * @throws UnroutableCommandException when no known service supports the command
     */
    private void registerCommand(String str, int i, Class<? extends Command> cls) {
        if (this.commandTypes.containsKey(cls)) {
            return;
        }
        AggregateServiceCommandType commandType = resolveCommandType(str, i);
        if (commandType == null) {
            throw new UnroutableCommandException(cls);
        }

        // Validate the local class against the registry and cache the resulting schema.
        ParsedSchema schema = this.schemaRegistry.validate(commandType.toLocalCommandType(cls));
        this.commandSchemas
                .computeIfAbsent(commandType.typeName(), typeName -> new ConcurrentHashMap<>())
                .put(commandType.version(), schema);
        logger.trace("Stored schema: {} v{}", commandType.schemaName(), commandType.version());
        this.commandSchemasLookup.put(cls, schema);
        this.commandTypes.put(cls, commandType);

        // Process each event type the owning service can produce.
        AggregateServiceRecord owner = resolveAggregateService(commandType);
        for (AggregateServiceDomainEventType producedEvent : owner.producedEvents()) {
            processDomainEvent(cls, producedEvent);
        }
    }

    /**
     * Resolves the local domain event class for a service-declared event type and stores its
     * validated schema.
     *
     * <p>The local class is the registered one with the highest version that is less than or
     * equal to the requested version. Each lookup step is null-checked so that an unknown
     * event type, or one with no suitable version, is reported as a
     * {@link MissingDomainEventException} rather than a {@code NullPointerException}.
     *
     * @param cls the command class on whose behalf the event is processed (used for error
     *     reporting)
     * @param aggregateServiceDomainEventType the event type (name and version) declared by the
     *     service
     * @throws MissingDomainEventException when no matching local domain event class is known
     * @throws SchemaException declared by the original signature; presumably raised by schema
     *     validation
     */
    private void processDomainEvent(Class<? extends Command> cls, AggregateServiceDomainEventType aggregateServiceDomainEventType) throws SchemaException {
        var versions = this.domainEventClasses.get(aggregateServiceDomainEventType.typeName());
        var entry = versions == null
                ? null
                : versions.floorEntry(Integer.valueOf(aggregateServiceDomainEventType.version()));
        DomainEventType<? extends DomainEvent> value = entry == null ? null : entry.getValue();
        if (value == null) {
            throw new MissingDomainEventException(cls, aggregateServiceDomainEventType.typeName(), aggregateServiceDomainEventType.version());
        }
        this.domainEventSchemas
                .computeIfAbsent(value.typeName(), typeName -> new ConcurrentHashMap<>())
                .put(Integer.valueOf(value.version()), this.schemaRegistry.validate(value));
        logger.trace("Stored schema for: {} v{}", value.getSchemaName(), Integer.valueOf(value.version()));
    }

    /**
     * Serializes a command to JSON bytes, validating it against its registered schema when
     * that schema is a {@link JsonSchema}.
     *
     * <p>When the schema looked up for the command's class is not a {@code JsonSchema}
     * (including when there is none), the command is written without validation.
     *
     * @param command the command to serialize
     * @return the JSON-encoded command
     * @throws CommandValidationException when schema validation fails
     * @throws CommandSerializationException when JSON processing fails
     */
    private byte[] serialize(Command command) {
        try {
            ParsedSchema schema = this.commandSchemasLookup.get(command.getClass());
            if (schema instanceof JsonSchema jsonSchema) {
                // Validate the tree form, then write that same tree.
                JsonNode tree = this.objectMapper.valueToTree(command);
                jsonSchema.validate(tree);
                return this.objectMapper.writeValueAsBytes(tree);
            }
            return this.objectMapper.writeValueAsBytes(command);
        } catch (ValidationException e) {
            throw new CommandValidationException(command.getClass(), e);
        } catch (IOException e2) {
            throw new CommandSerializationException(command.getClass(), e2);
        }
    }

    /**
     * Deserializes the payload of a domain event record into its local domain event class.
     *
     * <p>When a key is supplied, an {@code EncryptingGDPRContext} for the record's aggregate id
     * is installed as the current GDPR context for the duration of the read; otherwise the
     * current context is set to {@code null}. The context is always reset afterwards, also
     * when the class lookup or the JSON read throws, so a failed read cannot leave a context
     * installed.
     *
     * @param domainEventRecord the record whose payload is read; its name and version select
     *     the target class (highest registered version not above the record's version)
     * @param bArr the key material for the GDPR context, or {@code null} for none
     * @return the deserialized domain event
     * @throws IOException when the payload cannot be read
     */
    private DomainEvent deserialize(DomainEventRecord domainEventRecord, @Nullable byte[] bArr) throws IOException {
        try {
            GDPRContext gdprContext = bArr != null
                    ? new EncryptingGDPRContext(domainEventRecord.aggregateId(), bArr, GDPRKeyUtils.isUUID(domainEventRecord.aggregateId()))
                    : null;
            GDPRContextHolder.setCurrentGDPRContext(gdprContext);
            return (DomainEvent) this.objectMapper.readValue(domainEventRecord.payload(), this.domainEventClasses.get(domainEventRecord.name()).floorEntry(Integer.valueOf(domainEventRecord.version())).getValue().typeClass());
        } finally {
            GDPRContextHolder.resetCurrentGDPRContext();
        }
    }

    /**
     * Refuses the given command when this controller is shutting down.
     *
     * @param command the command about to be processed; its class is reported on refusal
     * @throws CommandRefusedException when the process state is {@code SHUTTING_DOWN}
     */
    private void checkRunning(Command command) {
        boolean shuttingDown = AkcesClientControllerState.SHUTTING_DOWN == this.processState;
        if (!shuttingDown) {
            return;
        }
        throw new CommandRefusedException(command.getClass(), this.processState);
    }

    /**
     * Reports whether this controller is currently in the {@code RUNNING} state.
     *
     * @return {@code true} only when the process state is {@code RUNNING}
     */
    public boolean isRunning() {
        return AkcesClientControllerState.RUNNING == this.processState;
    }

    /**
     * Stores the given application context on this controller.
     *
     * @param applicationContext the context to keep a reference to
     * @throws BeansException declared by the signature; this implementation only assigns the
     *     field
     */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
