package io.kroxylicious.kubernetes.operator;

import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import io.kroxylicious.kubernetes.api.common.AnyLocalRefBuilder;
import io.kroxylicious.kubernetes.api.common.Condition;
import io.kroxylicious.kubernetes.api.common.LocalRef;
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProtocolFilter;
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxy;
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngress;
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngressSpec;
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterSpec;
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterStatus;
import io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxyingressspec.ClusterIP;
import io.kroxylicious.kubernetes.api.v1alpha1.virtualkafkaclusterstatus.Ingresses;
import io.kroxylicious.kubernetes.api.v1alpha1.virtualkafkaclusterstatus.IngressesBuilder;
import io.kroxylicious.kubernetes.operator.Annotations;
import io.kroxylicious.kubernetes.operator.checksum.Crc32ChecksumGenerator;
import io.kroxylicious.kubernetes.operator.checksum.MetadataChecksumGenerator;
import io.kroxylicious.kubernetes.operator.model.networking.TcpClusterIPClusterIngressNetworkingModel;
import io.kroxylicious.kubernetes.operator.resolver.ClusterResolutionResult;
import io.kroxylicious.kubernetes.operator.resolver.DependencyResolver;
import java.time.Clock;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kroxylicious/kubernetes/operator/VirtualKafkaClusterReconciler.class */
public final class VirtualKafkaClusterReconciler implements Reconciler<VirtualKafkaCluster> {
    // Names of the event sources registered by prepareEventSources.
    static final String PROXY_EVENT_SOURCE_NAME = "proxy";
    static final String PROXY_CONFIG_STATE_SOURCE_NAME = "proxy-config-state";
    static final String SERVICES_EVENT_SOURCE_NAME = "services";
    static final String INGRESSES_EVENT_SOURCE_NAME = "ingresses";
    static final String FILTERS_EVENT_SOURCE_NAME = "filters";
    static final String SECRETS_EVENT_SOURCE_NAME = "secrets";
    static final String CONFIGMAPS_EVENT_SOURCE_NAME = "configmaps";
    static final String KUBERNETES_SERVICES_EVENT_SOURCE_NAME = "kubernetesServices";
    // Builds every status patch this reconciler returns; created from the Clock given to the constructor.
    private final VirtualKafkaClusterStatusFactory statusFactory;
    // Resolves the references made by a VirtualKafkaCluster at the start of reconcile.
    private final DependencyResolver resolver;
    private static final Logger LOGGER = LoggerFactory.getLogger(VirtualKafkaClusterReconciler.class);
    // Kind strings (as reported by HasMetadata.getKind) used to classify referrers and referents.
    private static final String KAFKA_PROXY_INGRESS_KIND = HasMetadata.getKind(KafkaProxyIngress.class);
    private static final String KAFKA_PROXY_KIND = HasMetadata.getKind(KafkaProxy.class);
    private static final String VIRTUAL_KAFKA_CLUSTER_KIND = HasMetadata.getKind(VirtualKafkaCluster.class);
    private static final String KAFKA_SERVICE_KIND = HasMetadata.getKind(KafkaService.class);
    private static final String KAFKA_PROTOCOL_FILTER_KIND = HasMetadata.getKind(KafkaProtocolFilter.class);

    /**
     * Creates a reconciler for {@link VirtualKafkaCluster} resources.
     *
     * @param clock clock handed to the {@link VirtualKafkaClusterStatusFactory} that builds this reconciler's status patches
     * @param dependencyResolver resolver used by {@code reconcile} to resolve the references made by a cluster
     */
    public VirtualKafkaClusterReconciler(Clock clock, DependencyResolver dependencyResolver) {
        this.resolver = dependencyResolver;
        this.statusFactory = new VirtualKafkaClusterStatusFactory(clock);
    }

    /**
     * Reconciles a single {@link VirtualKafkaCluster}.
     * <p>
     * The outcome is one of:
     * <ul>
     * <li>a status-only patch describing the resolution problems, when not all referents are fully resolved;</li>
     * <li>a status-only patch describing a TLS configuration problem on one of the cluster's ingresses;</li>
     * <li>otherwise a resource-and-status patch that carries the combined status and is annotated with a
     * checksum over the resolved referents and the Secrets/ConfigMaps referenced by ingress TLS settings.</li>
     * </ul>
     *
     * @param virtualKafkaCluster the cluster being reconciled
     * @param context the reconciliation context
     * @return the update to apply
     */
    public UpdateControl<VirtualKafkaCluster> reconcile(VirtualKafkaCluster virtualKafkaCluster, Context<VirtualKafkaCluster> context) {
        final UpdateControl<VirtualKafkaCluster> outcome;
        ClusterResolutionResult resolution = this.resolver.resolveClusterRefs(virtualKafkaCluster, context);
        if (!resolution.allReferentsFullyResolved()) {
            outcome = UpdateControl.patchStatus(handleResolutionProblems(virtualKafkaCluster, resolution));
        } else {
            VirtualKafkaCluster tlsProblemPatch = checkClusterIngressTlsSettings(virtualKafkaCluster, context);
            if (tlsProblemPatch != null) {
                outcome = UpdateControl.patchStatus(tlsProblemPatch);
            } else {
                VirtualKafkaCluster patch = maybeCombineStatusWithClusterConfigMap(virtualKafkaCluster, context);
                // Use the checksum generator shared through the workflow context when there is one.
                MetadataChecksumGenerator checksum = context.managedWorkflowAndDependentResourceContext()
                        .get(MetadataChecksumGenerator.CHECKSUM_CONTEXT_KEY, MetadataChecksumGenerator.class)
                        .orElse(new Crc32ChecksumGenerator());
                resolution.allResolvedReferents().forEach(checksum::appendMetadata);
                appendSecretsFromCertificateRefs(context, virtualKafkaCluster, checksum);
                Annotations.annotateWithReferentChecksum(patch, checksum.encode());
                outcome = UpdateControl.patchResourceAndStatus(patch);
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Completed reconciliation of {}/{}", ResourcesUtil.namespace(virtualKafkaCluster), ResourcesUtil.name(virtualKafkaCluster));
        }
        return outcome;
    }

    /**
     * Feeds the metadata of the Secrets and ConfigMaps referenced by the cluster's ingress TLS settings into
     * the checksum generator.
     * <p>
     * For each ingress with a {@code tls} object, every secondary {@link Secret} whose name equals the
     * certificate ref's name is appended; then, for each ingress with a trust anchor ref, every secondary
     * {@link ConfigMap} whose name equals the trust anchor's ref name is appended.
     *
     * @param context reconciliation context supplying the secondary Secrets and ConfigMaps
     * @param virtualKafkaCluster cluster whose ingress TLS settings are inspected
     * @param metadataChecksumGenerator generator receiving the metadata; must not be null
     */
    private static void appendSecretsFromCertificateRefs(Context<VirtualKafkaCluster> context, VirtualKafkaCluster virtualKafkaCluster, @NonNull MetadataChecksumGenerator metadataChecksumGenerator) {
        LOGGER.debug("Including secrets from ingress TLS in checksum");
        virtualKafkaCluster.getSpec().getIngresses().stream()
                .map(ingress -> ingress.getTls())
                .filter(Objects::nonNull)
                .map(tls -> tls.getCertificateRef())
                .flatMap(certificateRef -> context.getSecondaryResourcesAsStream(Secret.class)
                        .filter(secret -> KubernetesResourceUtil.getName(secret).equals(certificateRef.getName())))
                .forEach(metadataChecksumGenerator::appendMetadata);

        LOGGER.debug("Including TrustAnchors from ingress TLS in checksum");
        virtualKafkaCluster.getSpec().getIngresses().stream()
                .map(ingress -> ingress.getTls())
                .filter(Objects::nonNull)
                .map(tls -> tls.getTrustAnchorRef())
                .filter(Objects::nonNull)
                .flatMap(trustAnchorRef -> context.getSecondaryResourcesAsStream(ConfigMap.class)
                        .filter(configMap -> KubernetesResourceUtil.getName(configMap).equals(trustAnchorRef.getRef().getName())))
                .forEach(metadataChecksumGenerator::appendMetadata);
    }

    /**
     * Validates the TLS settings of the cluster's ingresses.
     * <p>
     * The checks run in this order and the first problem found wins: per-ingress TLS consistency,
     * then the certificate refs, then the trust anchor refs.
     *
     * @param virtualKafkaCluster the cluster to validate
     * @param context the reconciliation context
     * @return a status patch describing the first problem found, or {@code null} when no check reports one
     */
    @Nullable
    private VirtualKafkaCluster checkClusterIngressTlsSettings(VirtualKafkaCluster virtualKafkaCluster, Context<VirtualKafkaCluster> context) {
        for (var ingress : virtualKafkaCluster.getSpec().getIngresses()) {
            VirtualKafkaCluster inconsistency = checkTlsConfigConsistency(context, virtualKafkaCluster, ingress);
            if (inconsistency != null) {
                return inconsistency;
            }
        }
        VirtualKafkaCluster certificateProblem = checkIngressCertificateRefs(virtualKafkaCluster, context);
        if (certificateProblem != null) {
            return certificateProblem;
        }
        return checkIngressTrustAnchorRefs(virtualKafkaCluster, context);
    }

    /**
     * Checks the certificate ref of every ingress that defines a {@code tls} object.
     * <p>
     * Each ref is passed to {@code ResourcesUtil.checkCertRef}; the first non-null resource carried by a
     * check result is returned.
     *
     * @param virtualKafkaCluster the cluster whose ingresses are checked
     * @param context the reconciliation context
     * @return the first non-null resource reported by a check, or {@code null} when there is none
     *         (including when no ingress defines TLS)
     */
    @Nullable
    private VirtualKafkaCluster checkIngressCertificateRefs(VirtualKafkaCluster virtualKafkaCluster, Context<VirtualKafkaCluster> context) {
        // A fully typed pipeline: a raw intermediate List would degrade the lambda parameter to Object.
        return virtualKafkaCluster.getSpec().getIngresses().stream()
                .map(ingress -> ingress.getTls())
                .filter(Objects::nonNull)
                .map(tls -> tls.getCertificateRef())
                .map(certificateRef -> ResourcesUtil.checkCertRef(virtualKafkaCluster, certificateRef, "spec.ingresses[].tls.certificateRef",
                        this.statusFactory, context, SECRETS_EVENT_SOURCE_NAME).resource())
                .filter(Objects::nonNull)
                .findFirst()
                .orElse(null);
    }

    /**
     * Checks the trust anchor ref of every ingress whose {@code tls} object defines one.
     * <p>
     * Each ref is passed to {@code ResourcesUtil.checkTrustAnchorRef}; the first non-null resource carried
     * by a check result is returned.
     *
     * @param virtualKafkaCluster the cluster whose ingresses are checked
     * @param context the reconciliation context
     * @return the first non-null resource reported by a check, or {@code null} when there is none
     *         (including when no ingress defines a trust anchor)
     */
    @Nullable
    private VirtualKafkaCluster checkIngressTrustAnchorRefs(VirtualKafkaCluster virtualKafkaCluster, Context<VirtualKafkaCluster> context) {
        // A fully typed pipeline: a raw intermediate List would degrade the lambda parameter to Object.
        return virtualKafkaCluster.getSpec().getIngresses().stream()
                .map(ingress -> ingress.getTls())
                .filter(Objects::nonNull)
                .map(tls -> tls.getTrustAnchorRef())
                .filter(Objects::nonNull)
                .map(trustAnchorRef -> ResourcesUtil.checkTrustAnchorRef(virtualKafkaCluster, context, CONFIGMAPS_EVENT_SOURCE_NAME, trustAnchorRef,
                        "spec.ingresses[].tls.trustAnchor", this.statusFactory).resource())
                .filter(Objects::nonNull)
                .findFirst()
                .orElse(null);
    }

    /**
     * Builds the status patch for a cluster whose references all resolved.
     * <p>
     * The patch always starts from a true {@code ResolvedRefs} condition and the ingress status. When the
     * {@code proxy-config-state} ConfigMap is available, has data, and holds a status patch for this
     * cluster, the conditions of that stored patch are combined in via {@code ResourceState.replacementFor}.
     *
     * @param virtualKafkaCluster the cluster being reconciled
     * @param context the reconciliation context
     * @return the status patch to apply
     */
    private VirtualKafkaCluster maybeCombineStatusWithClusterConfigMap(VirtualKafkaCluster virtualKafkaCluster, Context<VirtualKafkaCluster> context) {
        List<Ingresses> ingressStatuses = buildIngressStatus(virtualKafkaCluster, context);
        ResourceState resolvedRefsState = ResourceState.of(this.statusFactory.newTrueCondition(virtualKafkaCluster, Condition.Type.ResolvedRefs));
        Optional<VirtualKafkaCluster> combinedPatch = context.getSecondaryResource(ConfigMap.class, PROXY_CONFIG_STATE_SOURCE_NAME)
                // Optional.map turns a null data map into an empty Optional.
                .map(ConfigMap::getData)
                .map(ProxyConfigStateData::new)
                .flatMap(stateData -> stateData.getStatusPatchForCluster(ResourcesUtil.name(virtualKafkaCluster)))
                .map(storedPatch -> this.statusFactory.clusterStatusPatch(
                        virtualKafkaCluster,
                        resolvedRefsState.replacementFor(ResourceState.fromList(storedPatch.getStatus().getConditions())),
                        ingressStatuses));
        return combinedPatch.orElse(this.statusFactory.clusterStatusPatch(virtualKafkaCluster, resolvedRefsState, ingressStatuses));
    }

    /**
     * Builds the per-ingress status entries for a cluster.
     * <p>
     * Bootstrap server information is read from the secondary {@link Service} resources via
     * {@code Annotations.readBootstrapServersFrom}. An ingress of the cluster gets a status entry only when
     * an entry with the same ingress name and this cluster's name is found; the first match is used.
     * The protocol is reported as TLS when the ingress has a {@code tls} object with a certificate ref,
     * and as TCP otherwise.
     *
     * @param virtualKafkaCluster the cluster whose ingresses are reported
     * @param context the reconciliation context supplying the secondary Services
     * @return the status entries, in the order of the cluster's ingresses
     */
    private List<Ingresses> buildIngressStatus(VirtualKafkaCluster virtualKafkaCluster, Context<VirtualKafkaCluster> context) {
        List<Annotations.ClusterIngressBootstrapServers> advertised = context.getSecondaryResources(Service.class).stream()
                .flatMap(service -> Annotations.readBootstrapServersFrom(service).stream())
                .toList();
        // Loop-invariant: resolve the cluster name once instead of once per comparison.
        String clusterName = ResourcesUtil.name(virtualKafkaCluster);
        return virtualKafkaCluster.getSpec().getIngresses().stream()
                .flatMap(ingress -> advertised.stream()
                        .filter(candidate -> candidate.ingressName().equals(ingress.getIngressRef().getName())
                                && candidate.clusterName().equals(clusterName))
                        .findFirst()
                        .map(match -> new IngressesBuilder()
                                .withName(ingress.getIngressRef().getName())
                                .withBootstrapServer(match.bootstrapServers())
                                .withProtocol(Optional.ofNullable(ingress.getTls()).map(tls -> tls.getCertificateRef()).isEmpty()
                                        ? Ingresses.Protocol.TCP
                                        : Ingresses.Protocol.TLS)
                                .build())
                        .stream())
                .toList();
    }

    /**
     * Builds the false {@code ResolvedRefs} status patch for a cluster whose references did not all resolve.
     * <p>
     * Three cases are distinguished:
     * <ol>
     * <li>the cluster itself is the referrer of a dangling reference: reason
     * {@code ReferencedResourcesNotFound}, listing the absent refs per spec field;</li>
     * <li>a resolved referent matches {@code ResourcesUtil.hasFreshResolvedRefsFalseCondition}, or an ingress
     * has a dangling reference to a proxy: reason {@code TransitivelyReferencedResourcesNotFound};</li>
     * <li>neither of the above: reason {@code ProxyConfigDependentResource.REASON_INVALID} with a generic message.</li>
     * </ol>
     *
     * @param virtualKafkaCluster the cluster being reconciled
     * @param clusterResolutionResult the resolution result that was not fully resolved
     * @return the status patch describing the problem
     */
    private VirtualKafkaCluster handleResolutionProblems(VirtualKafkaCluster virtualKafkaCluster, ClusterResolutionResult clusterResolutionResult) {
        var clusterRef = ResourcesUtil.toLocalRef(virtualKafkaCluster);
        var ingressToProxyDangling = clusterResolutionResult.allDanglingReferences()
                .filter(ClusterResolutionResult.DanglingReference.hasReferrerKind(KAFKA_PROXY_INGRESS_KIND)
                        .and(ClusterResolutionResult.DanglingReference.hasReferentKind(KAFKA_PROXY_KIND)))
                .collect(Collectors.toSet());

        if (clusterResolutionResult.allDanglingReferences().anyMatch(ClusterResolutionResult.DanglingReference.hasReferrer(clusterRef))) {
            // Case 1: the cluster directly references resources that do not exist.
            Stream<String> proxyMsg = refsMessage("spec.proxyRef references ", virtualKafkaCluster,
                    clusterResolutionResult.allDanglingReferences()
                            .filter(ClusterResolutionResult.DanglingReference.hasReferrer(clusterRef)
                                    .and(ClusterResolutionResult.DanglingReference.hasReferentKind(KAFKA_PROXY_KIND)))
                            .map(ClusterResolutionResult.DanglingReference::absentRef));
            Stream<String> serviceMsg = refsMessage("spec.targetKafkaServiceRef references ", virtualKafkaCluster,
                    clusterResolutionResult.allDanglingReferences()
                            .filter(ClusterResolutionResult.DanglingReference.hasReferrer(clusterRef)
                                    .and(ClusterResolutionResult.DanglingReference.hasReferentKind(KAFKA_SERVICE_KIND)))
                            .map(ClusterResolutionResult.DanglingReference::absentRef));
            Stream<String> ingressMsg = refsMessage("spec.ingresses[].ingressRef references ", virtualKafkaCluster,
                    clusterResolutionResult.allDanglingReferences()
                            .filter(ClusterResolutionResult.DanglingReference.hasReferrer(clusterRef)
                                    .and(ClusterResolutionResult.DanglingReference.hasReferentKind(KAFKA_PROXY_INGRESS_KIND)))
                            .map(ClusterResolutionResult.DanglingReference::absentRef));
            Stream<String> filterMsg = refsMessage("spec.filterRefs references ", virtualKafkaCluster,
                    clusterResolutionResult.allDanglingReferences()
                            .filter(ClusterResolutionResult.DanglingReference.hasReferrer(clusterRef)
                                    .and(ClusterResolutionResult.DanglingReference.hasReferentKind(KAFKA_PROTOCOL_FILTER_KIND)))
                            .map(ClusterResolutionResult.DanglingReference::absentRef));
            return this.statusFactory.newFalseConditionStatusPatch(virtualKafkaCluster, Condition.Type.ResolvedRefs, "ReferencedResourcesNotFound",
                    joiningMessages(proxyMsg, serviceMsg, ingressMsg, filterMsg));
        }

        if (clusterResolutionResult.allResolvedReferents().anyMatch(ResourcesUtil::hasFreshResolvedRefsFalseCondition)
                || !ingressToProxyDangling.isEmpty()) {
            // Case 2: the problem lies one hop away, in something the cluster references.
            Stream<String> serviceMsg = refsMessage("spec.targetKafkaServiceRef references ", virtualKafkaCluster,
                    clusterResolutionResult.allResolvedReferents()
                            .filter(ResourcesUtil.hasFreshResolvedRefsFalseCondition().and(ResourcesUtil.hasKind(KAFKA_SERVICE_KIND)))
                            .map(ResourcesUtil::toLocalRef));
            Stream<String> ingressMsg = refsMessage("spec.ingresses[].ingressRef references ", virtualKafkaCluster,
                    clusterResolutionResult.allResolvedReferents()
                            .filter(ResourcesUtil.hasFreshResolvedRefsFalseCondition().and(ResourcesUtil.hasKind(KAFKA_PROXY_INGRESS_KIND)))
                            .map(ResourcesUtil::toLocalRef));
            Stream<String> filterMsg = refsMessage("spec.filterRefs references ", virtualKafkaCluster,
                    clusterResolutionResult.allResolvedReferents()
                            .filter(ResourcesUtil.hasFreshResolvedRefsFalseCondition().and(ResourcesUtil.hasKind(KAFKA_PROTOCOL_FILTER_KIND)))
                            .map(ResourcesUtil::toLocalRef));
            Stream<String> ingressProxyMsg = refsMessage("a spec.ingresses[].ingressRef had an inconsistent or missing proxyRef ", virtualKafkaCluster,
                    clusterResolutionResult.allDanglingReferences()
                            .filter(ClusterResolutionResult.DanglingReference.hasReferrerKind(KAFKA_PROXY_INGRESS_KIND)
                                    .and(ClusterResolutionResult.DanglingReference.hasReferentKind(KAFKA_PROXY_KIND)))
                            .map(ClusterResolutionResult.DanglingReference::absentRef));
            return this.statusFactory.newFalseConditionStatusPatch(virtualKafkaCluster, Condition.Type.ResolvedRefs, "TransitivelyReferencedResourcesNotFound",
                    joiningMessages(serviceMsg, ingressMsg, filterMsg, ingressProxyMsg));
        }

        // Case 3: resolution failed for a reason this method cannot attribute.
        return this.statusFactory.newFalseConditionStatusPatch(virtualKafkaCluster, Condition.Type.ResolvedRefs, ProxyConfigDependentResource.REASON_INVALID,
                joiningMessages(Stream.of("unknown dependency resolution issue")));
    }

    /**
     * Concatenates the messages of the given streams, in argument order, into one string separated by {@code "; "}.
     *
     * @param streamArr the message streams to join; each stream is consumed
     * @return the joined messages, or the empty string when the streams yield no message
     */
    @SafeVarargs
    private static String joiningMessages(Stream<String>... streamArr) {
        // Keep the array typed as Stream<String>[]: viewing it as Object[] loses the element type
        // and flatMap(Function.identity()) no longer type-checks.
        return Stream.of(streamArr)
                .flatMap(Function.identity())
                .collect(Collectors.joining("; "));
    }

    /**
     * Builds at most one message listing the given refs.
     * <p>
     * The refs are sorted, rendered with {@code ResourcesUtil.namespacedSlug} relative to the cluster, joined
     * with {@code ", "} and appended to the prefix.
     *
     * @param str prefix of the message
     * @param virtualKafkaCluster cluster passed to {@code namespacedSlug} for each ref
     * @param stream the refs to list; consumed by this call
     * @return a one-element stream holding the message, or an empty stream when there are no refs
     */
    private static Stream<String> refsMessage(String str, VirtualKafkaCluster virtualKafkaCluster, Stream<LocalRef<?>> stream) {
        List<LocalRef<?>> sortedRefs = stream.sorted().toList();
        if (sortedRefs.isEmpty()) {
            return Stream.empty();
        }
        String slugs = sortedRefs.stream()
                .map(localRef -> ResourcesUtil.namespacedSlug(localRef, virtualKafkaCluster))
                .collect(Collectors.joining(", "));
        return Stream.of(str + slugs);
    }

    /**
     * Registers the informer event sources that trigger reconciliation of a {@link VirtualKafkaCluster}.
     * <p>
     * One source is created per kind of secondary resource: the referenced KafkaProxy, the proxy's
     * {@code -config-state} ConfigMap, the KafkaProxyIngresses, the KafkaService, the KafkaProtocolFilters,
     * Kubernetes Services, and the Secrets and ConfigMaps referenced by ingress TLS settings. Each source
     * maps in both directions between the cluster and its secondary resources.
     *
     * @param eventSourceContext context used to build the event sources and to look up referrers
     * @return the event sources, in registration order
     */
    public List<EventSource<?, VirtualKafkaCluster>> prepareEventSources(EventSourceContext<VirtualKafkaCluster> eventSourceContext) {
        // Primary-to-secondary lambdas declare their parameter type explicitly so the compiler does not
        // have to infer the primary type from the builder call.
        InformerEventSourceConfiguration<KafkaProxy> proxyConfig = InformerEventSourceConfiguration.from(KafkaProxy.class, VirtualKafkaCluster.class)
                .withName(PROXY_EVENT_SOURCE_NAME)
                .withPrimaryToSecondaryMapper((VirtualKafkaCluster cluster) -> ResourcesUtil.localRefAsResourceId(cluster, cluster.getSpec().getProxyRef()))
                .withSecondaryToPrimaryMapper(proxy -> ResourcesUtil.findReferrers(eventSourceContext, proxy, VirtualKafkaCluster.class,
                        cluster -> Optional.of(cluster.getSpec().getProxyRef())))
                .build();

        // NOTE(review): the group passed below is MetadataChecksumGenerator.NO_CHECKSUM_SPECIFIED, which looks
        // like an unrelated constant standing in for the ConfigMap API group value - confirm.
        InformerEventSourceConfiguration<ConfigMap> proxyConfigStateConfig = InformerEventSourceConfiguration.from(ConfigMap.class, VirtualKafkaCluster.class)
                .withName(PROXY_CONFIG_STATE_SOURCE_NAME)
                .withPrimaryToSecondaryMapper(VirtualKafkaClusterReconciler::toConfigStateResourceName)
                .withSecondaryToPrimaryMapper(configMap -> ResourcesUtil.findReferrers(eventSourceContext, configMap, VirtualKafkaCluster.class,
                        cluster -> Optional.of(new AnyLocalRefBuilder()
                                .withGroup(MetadataChecksumGenerator.NO_CHECKSUM_SPECIFIED)
                                .withKind("ConfigMap")
                                .withName(cluster.getSpec().getProxyRef().getName() + "-config-state")
                                .build())))
                .build();

        InformerEventSourceConfiguration<KafkaService> kafkaServiceConfig = InformerEventSourceConfiguration.from(KafkaService.class, VirtualKafkaCluster.class)
                .withName(SERVICES_EVENT_SOURCE_NAME)
                .withPrimaryToSecondaryMapper((VirtualKafkaCluster cluster) -> ResourcesUtil.localRefAsResourceId(cluster, cluster.getSpec().getTargetKafkaServiceRef()))
                .withSecondaryToPrimaryMapper(kafkaServiceSecondaryToPrimaryMapper(eventSourceContext))
                .build();

        InformerEventSourceConfiguration<KafkaProxyIngress> ingressConfig = InformerEventSourceConfiguration.from(KafkaProxyIngress.class, VirtualKafkaCluster.class)
                .withName(INGRESSES_EVENT_SOURCE_NAME)
                .withPrimaryToSecondaryMapper((VirtualKafkaCluster cluster) -> ResourcesUtil.localRefsAsResourceIds(cluster,
                        Optional.ofNullable(cluster.getSpec())
                                .map(spec -> spec.getIngresses())
                                .stream()
                                .flatMap(List::stream)
                                .map(ingress -> ingress.getIngressRef())
                                .toList()))
                .withSecondaryToPrimaryMapper(ingressSecondaryToPrimaryMapper(eventSourceContext))
                .build();

        InformerEventSourceConfiguration<KafkaProtocolFilter> filterConfig = InformerEventSourceConfiguration.from(KafkaProtocolFilter.class, VirtualKafkaCluster.class)
                .withName(FILTERS_EVENT_SOURCE_NAME)
                .withPrimaryToSecondaryMapper((VirtualKafkaCluster cluster) -> ResourcesUtil.localRefsAsResourceIds(cluster,
                        Optional.ofNullable(cluster.getSpec())
                                .map(spec -> spec.getFilterRefs())
                                .orElse(List.of())))
                .withSecondaryToPrimaryMapper(filterSecondaryToPrimaryMapper(eventSourceContext))
                .build();

        InformerEventSourceConfiguration<Service> kubernetesServiceConfig = InformerEventSourceConfiguration.from(Service.class, VirtualKafkaCluster.class)
                .withName(KUBERNETES_SERVICES_EVENT_SOURCE_NAME)
                .withPrimaryToSecondaryMapper(kubernetesServicesPrimaryToSecondaryMapper())
                .withSecondaryToPrimaryMapper(kubernetesServicesSecondaryToPrimaryMapper(eventSourceContext))
                .build();

        InformerEventSourceConfiguration<Secret> secretConfig = InformerEventSourceConfiguration.from(Secret.class, VirtualKafkaCluster.class)
                .withName(SECRETS_EVENT_SOURCE_NAME)
                .withPrimaryToSecondaryMapper(virtualKafkaClusterToSecret())
                .withSecondaryToPrimaryMapper(secretToVirtualKafkaCluster(eventSourceContext))
                .build();

        InformerEventSourceConfiguration<ConfigMap> configMapConfig = InformerEventSourceConfiguration.from(ConfigMap.class, VirtualKafkaCluster.class)
                .withName(CONFIGMAPS_EVENT_SOURCE_NAME)
                .withPrimaryToSecondaryMapper(virtualKafkaClusterToConfigMap())
                .withSecondaryToPrimaryMapper(configMapToVirtualKafkaCluster(eventSourceContext))
                .build();

        return List.of(
                new InformerEventSource<>(proxyConfig, eventSourceContext),
                new InformerEventSource<>(proxyConfigStateConfig, eventSourceContext),
                new InformerEventSource<>(ingressConfig, eventSourceContext),
                new InformerEventSource<>(kafkaServiceConfig, eventSourceContext),
                new InformerEventSource<>(filterConfig, eventSourceContext),
                new InformerEventSource<>(kubernetesServiceConfig, eventSourceContext),
                new InformerEventSource<>(secretConfig, eventSourceContext),
                new InformerEventSource<>(configMapConfig, eventSourceContext));
    }

    /**
     * Maps a Kubernetes {@link Service} to the clusters that should be reconciled when it changes.
     * <p>
     * A Service with an owner reference of the VirtualKafkaCluster kind maps to that one cluster, in the
     * Service's namespace. Otherwise, a Service with an owner reference of the KafkaProxy kind maps to the
     * clusters found by {@code ResourcesUtil.findReferrers} through their {@code proxyRef}. A Service with
     * neither kind of owner maps to nothing.
     *
     * @param eventSourceContext context used to find the referring clusters
     * @return the mapper
     */
    @NonNull
    static SecondaryToPrimaryMapper<Service> kubernetesServicesSecondaryToPrimaryMapper(EventSourceContext<VirtualKafkaCluster> eventSourceContext) {
        return service -> {
            Optional<OwnerReference> clusterOwner = extractOwnerRefFromKubernetesService(service, VIRTUAL_KAFKA_CLUSTER_KIND);
            if (clusterOwner.isPresent()) {
                return Set.of(new ResourceID(clusterOwner.get().getName(), service.getMetadata().getNamespace()));
            }
            Optional<OwnerReference> proxyOwner = extractOwnerRefFromKubernetesService(service, KAFKA_PROXY_KIND);
            if (proxyOwner.isEmpty()) {
                return Set.of();
            }
            return ResourcesUtil.findReferrers(eventSourceContext, proxyOwner.get(), service, VirtualKafkaCluster.class,
                    cluster -> Optional.of(cluster.getSpec().getProxyRef()));
        };
    }

    /**
     * Finds the first owner reference of the given kind on a Service.
     *
     * @param service the Service whose owner references are searched
     * @param str the kind to look for
     * @return the first owner reference whose kind equals {@code str}, or empty when there is none
     */
    private static Optional<OwnerReference> extractOwnerRefFromKubernetesService(Service service, String str) {
        for (OwnerReference candidate : service.getMetadata().getOwnerReferences()) {
            if (str.equals(candidate.getKind())) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /**
     * Maps a cluster to the ids of the Kubernetes Services associated with it.
     * <p>
     * The result combines, for each ingress of the cluster, the Service named by
     * {@code TcpClusterIPClusterIngressNetworkingModel.bootstrapServiceName}, plus the Service named after
     * the cluster's proxy with the suffix {@code -sni}.
     *
     * @return the mapper
     */
    @NonNull
    static PrimaryToSecondaryMapper<VirtualKafkaCluster> kubernetesServicesPrimaryToSecondaryMapper() {
        return virtualKafkaCluster -> {
            Stream<ResourceID> bootstrapServiceIds = virtualKafkaCluster.getSpec().getIngresses().stream()
                    .map(ingress -> ingress.getIngressRef())
                    .flatMap(ingressRef -> {
                        String bootstrapServiceName = TcpClusterIPClusterIngressNetworkingModel.bootstrapServiceName(virtualKafkaCluster, ingressRef.getName());
                        return ResourcesUtil.localRefAsResourceId(virtualKafkaCluster, new AnyLocalRefBuilder().withName(bootstrapServiceName).build()).stream();
                    });
            String sniServiceName = virtualKafkaCluster.getSpec().getProxyRef().getName() + "-sni";
            Stream<ResourceID> sniServiceIds = ResourcesUtil.localRefAsResourceId(virtualKafkaCluster, new AnyLocalRefBuilder().withName(sniServiceName).build()).stream();
            return Stream.concat(bootstrapServiceIds, sniServiceIds).collect(Collectors.toSet());
        };
    }

    /**
     * Maps a cluster to the ids of the Secrets named by the certificate refs of its TLS-enabled ingresses.
     *
     * @return the mapper
     */
    static PrimaryToSecondaryMapper<VirtualKafkaCluster> virtualKafkaClusterToSecret() {
        return virtualKafkaCluster -> {
            var certificateRefs = virtualKafkaCluster.getSpec().getIngresses().stream()
                    .map(ingress -> ingress.getTls())
                    // Ingresses without a tls object contribute nothing.
                    .filter(Objects::nonNull)
                    .map(tls -> tls.getCertificateRef())
                    .toList();
            return ResourcesUtil.localRefsAsResourceIds(virtualKafkaCluster, certificateRefs);
        };
    }

    /**
     * Maps a {@link Secret} to the clusters that refer to it through an ingress TLS certificate ref.
     *
     * @param eventSourceContext context used to find the referring clusters
     * @return the mapper
     */
    static SecondaryToPrimaryMapper<Secret> secretToVirtualKafkaCluster(EventSourceContext<VirtualKafkaCluster> eventSourceContext) {
        return secret -> ResourcesUtil.findReferrersMulti(eventSourceContext, secret, VirtualKafkaCluster.class,
                cluster -> cluster.getSpec().getIngresses().stream()
                        .map(ingress -> ingress.getTls())
                        // Ingresses without a tls object contribute nothing.
                        .filter(Objects::nonNull)
                        .map(tls -> tls.getCertificateRef())
                        .toList());
    }

    /**
     * Maps a cluster to the ids of the ConfigMaps named by the trust anchor refs of its TLS-enabled ingresses.
     *
     * @return the mapper
     */
    static PrimaryToSecondaryMapper<VirtualKafkaCluster> virtualKafkaClusterToConfigMap() {
        return virtualKafkaCluster -> {
            var trustAnchorTargets = virtualKafkaCluster.getSpec().getIngresses().stream()
                    .map(ingress -> ingress.getTls())
                    // Skip ingresses without a tls object, then tls objects without a trust anchor.
                    .filter(Objects::nonNull)
                    .map(tls -> tls.getTrustAnchorRef())
                    .filter(Objects::nonNull)
                    .map(trustAnchorRef -> trustAnchorRef.getRef())
                    .toList();
            return ResourcesUtil.localRefsAsResourceIds(virtualKafkaCluster, trustAnchorTargets);
        };
    }

    /**
     * Maps a {@link ConfigMap} to the clusters that refer to it through an ingress TLS trust anchor ref.
     *
     * @param eventSourceContext context used to find the referring clusters
     * @return the mapper
     */
    static SecondaryToPrimaryMapper<ConfigMap> configMapToVirtualKafkaCluster(EventSourceContext<VirtualKafkaCluster> eventSourceContext) {
        return configMap -> ResourcesUtil.findReferrersMulti(eventSourceContext, configMap, VirtualKafkaCluster.class,
                cluster -> cluster.getSpec().getIngresses().stream()
                        .map(ingress -> ingress.getTls())
                        // Skip ingresses without a tls object, then tls objects without a trust anchor.
                        .filter(Objects::nonNull)
                        .map(tls -> tls.getTrustAnchorRef())
                        .filter(Objects::nonNull)
                        .map(trustAnchorRef -> trustAnchorRef.getRef())
                        .toList());
    }

    /**
     * Maps a {@link KafkaProtocolFilter} to the clusters listing it in {@code spec.filterRefs}.
     * <p>
     * Events from a filter for which {@code ResourcesUtil.isStatusFresh} is false are logged and mapped to nothing.
     *
     * @param eventSourceContext context used to find the referring clusters
     * @return the mapper
     */
    static SecondaryToPrimaryMapper<KafkaProtocolFilter> filterSecondaryToPrimaryMapper(EventSourceContext<VirtualKafkaCluster> eventSourceContext) {
        return kafkaProtocolFilter -> {
            if (!ResourcesUtil.isStatusFresh(kafkaProtocolFilter)) {
                logIgnoredEvent(kafkaProtocolFilter);
                return Set.of();
            }
            return ResourcesUtil.findReferrersMulti(eventSourceContext, kafkaProtocolFilter, VirtualKafkaCluster.class,
                    cluster -> cluster.getSpec().getFilterRefs());
        };
    }

    /**
     * Maps a {@link KafkaProxyIngress} to the clusters referring to it from {@code spec.ingresses[].ingressRef}.
     * <p>
     * Events from an ingress for which {@code ResourcesUtil.isStatusFresh} is false are logged and mapped to nothing.
     *
     * @param eventSourceContext context used to find the referring clusters
     * @return the mapper
     */
    static SecondaryToPrimaryMapper<KafkaProxyIngress> ingressSecondaryToPrimaryMapper(EventSourceContext<VirtualKafkaCluster> eventSourceContext) {
        return kafkaProxyIngress -> {
            if (!ResourcesUtil.isStatusFresh(kafkaProxyIngress)) {
                logIgnoredEvent(kafkaProxyIngress);
                return Set.of();
            }
            return ResourcesUtil.findReferrersMulti(eventSourceContext, kafkaProxyIngress, VirtualKafkaCluster.class,
                    cluster -> cluster.getSpec().getIngresses().stream()
                            .map(ingress -> ingress.getIngressRef())
                            .toList());
        };
    }

    /**
     * Maps a {@link KafkaService} to the clusters referring to it from {@code spec.targetKafkaServiceRef}.
     * <p>
     * Events from a service for which {@code ResourcesUtil.isStatusFresh} is false are logged and mapped to nothing.
     *
     * @param eventSourceContext context used to find the referring clusters
     * @return the mapper
     */
    static SecondaryToPrimaryMapper<KafkaService> kafkaServiceSecondaryToPrimaryMapper(EventSourceContext<VirtualKafkaCluster> eventSourceContext) {
        return kafkaService -> {
            if (!ResourcesUtil.isStatusFresh(kafkaService)) {
                logIgnoredEvent(kafkaService);
                return Set.of();
            }
            return ResourcesUtil.findReferrers(eventSourceContext, kafkaService, VirtualKafkaCluster.class,
                    cluster -> Optional.of(cluster.getSpec().getTargetKafkaServiceRef()));
        };
    }

    /**
     * Logs, at debug level, that an event from the given resource was ignored because its status is stale.
     *
     * @param hasMetadata the resource whose event was ignored
     */
    private static void logIgnoredEvent(HasMetadata hasMetadata) {
        if (!LOGGER.isDebugEnabled()) {
            return;
        }
        String kind = HasMetadata.getKind(hasMetadata.getClass());
        LOGGER.debug("Ignoring event from {} with stale status: {}", kind, ResourcesUtil.toLocalRef(hasMetadata));
    }

    /**
     * Produces the status update to apply when reconciliation failed with an exception.
     * <p>
     * The status is patched with an unknown {@code ResolvedRefs} condition built from the exception.
     *
     * @param virtualKafkaCluster the cluster whose reconciliation failed
     * @param context the reconciliation context
     * @param exc the exception raised during reconciliation
     * @return the error status update
     */
    public ErrorStatusUpdateControl<VirtualKafkaCluster> updateErrorStatus(VirtualKafkaCluster virtualKafkaCluster, Context<VirtualKafkaCluster> context, Exception exc) {
        VirtualKafkaCluster unknownStatusPatch = this.statusFactory.newUnknownConditionStatusPatch(virtualKafkaCluster, Condition.Type.ResolvedRefs, exc);
        ErrorStatusUpdateControl<VirtualKafkaCluster> errorUpdate = ErrorStatusUpdateControl.patchStatus(unknownStatusPatch);
        if (LOGGER.isInfoEnabled()) {
            // The exception is logged as text (toString), so no stack trace is attached to this entry.
            LOGGER.info("Completed reconciliation of {}/{} with error {}",
                    ResourcesUtil.namespace(virtualKafkaCluster),
                    ResourcesUtil.name(virtualKafkaCluster),
                    exc.toString());
        }
        return errorUpdate;
    }

    /**
     * Maps a cluster to the id of its proxy's config-state resource: the proxy's resource id with
     * {@code -config-state} appended to the name and the same namespace (or none).
     *
     * @param virtualKafkaCluster the cluster whose {@code proxyRef} is used
     * @return the derived resource ids
     */
    private static Set<ResourceID> toConfigStateResourceName(VirtualKafkaCluster virtualKafkaCluster) {
        Set<ResourceID> proxyIds = ResourcesUtil.localRefAsResourceId(virtualKafkaCluster, virtualKafkaCluster.getSpec().getProxyRef());
        return proxyIds.stream()
                .map(proxyId -> new ResourceID(proxyId.getName() + "-config-state", proxyId.getNamespace().orElse(null)))
                .collect(Collectors.toSet());
    }

    /**
     * Checks that a cluster ingress defines a {@code tls} object exactly when the referenced
     * {@link KafkaProxyIngress} uses the TLS protocol.
     *
     * @param context reconciliation context supplying the secondary KafkaProxyIngress resources
     * @param virtualKafkaCluster the cluster owning the ingress
     * @param ingresses the cluster ingress entry to check
     * @return a false {@code ResolvedRefs} status patch with reason {@code InvalidReferencedResource} when the
     *         settings disagree; {@code null} when they agree or the referenced ingress was not found
     * @throws IllegalStateException if no protocol can be determined for the referenced ingress
     */
    @Nullable
    private VirtualKafkaCluster checkTlsConfigConsistency(Context<VirtualKafkaCluster> context, VirtualKafkaCluster virtualKafkaCluster, io.kroxylicious.kubernetes.api.v1alpha1.virtualkafkaclusterspec.Ingresses ingresses) {
        String ingressName = Objects.requireNonNull(ingresses.getIngressRef().getName());
        Optional<KafkaProxyIngress> resolvedIngress = ResourcesUtil.findOnlyResourceNamed(ingressName, context.getSecondaryResources(KafkaProxyIngress.class));
        if (resolvedIngress.isEmpty()) {
            return null;
        }
        KafkaProxyIngress proxyIngress = resolvedIngress.get();
        boolean ingressRequiresTls = getIngressProtocol(proxyIngress) == ClusterIP.Protocol.TLS;
        boolean clusterDefinesTls = ingresses.getTls() != null;
        if (clusterDefinesTls == ingressRequiresTls) {
            return null;
        }
        String ingressSlug = ResourcesUtil.namespacedSlug(ResourcesUtil.toLocalRef(proxyIngress), proxyIngress);
        String message;
        if (ingressRequiresTls) {
            message = "spec.ingresses[].tls: Inconsistent TLS configuration. %s requires the use of TLS but the cluster ingress (%s) does not define a tls object."
                    .formatted(ingressSlug, ingressName);
        } else {
            message = "spec.ingresses[].tls: Inconsistent TLS configuration. %s requires the use of TCP but the cluster ingress (%s) defines a tls object."
                    .formatted(ingressSlug, ingressName);
        }
        return this.statusFactory.newFalseConditionStatusPatch(virtualKafkaCluster, Condition.Type.ResolvedRefs, "InvalidReferencedResource", message);
    }

    /**
     * Determines the protocol used by a {@link KafkaProxyIngress}.
     * <p>
     * An ingress with a {@code clusterIP} section reports that section's protocol; an ingress with only a
     * {@code loadBalancer} section is treated as TLS.
     *
     * @param kafkaProxyIngress the ingress to inspect
     * @return the ingress protocol
     * @throws IllegalStateException if the ingress has neither a {@code clusterIP} nor a {@code loadBalancer} section
     */
    private static ClusterIP.Protocol getIngressProtocol(KafkaProxyIngress kafkaProxyIngress) {
        KafkaProxyIngressSpec ingressSpec = kafkaProxyIngress.getSpec();
        ClusterIP clusterIP = ingressSpec.getClusterIP();
        if (clusterIP != null) {
            return clusterIP.getProtocol();
        }
        if (ingressSpec.getLoadBalancer() == null) {
            throw new IllegalStateException("No protocol could be determined for " + kafkaProxyIngress);
        }
        return ClusterIP.Protocol.TLS;
    }

    // Bridge overload taking the erased HasMetadata/raw Context types: casts its arguments and delegates
    // to the typed updateErrorStatus(VirtualKafkaCluster, Context<VirtualKafkaCluster>, Exception).
    // NOTE(review): marked bridge/synthetic, i.e. presumably compiler-generated and surfaced by
    // decompilation - confirm whether it needs to exist in hand-maintained source.
    public /* bridge */ /* synthetic */ ErrorStatusUpdateControl updateErrorStatus(HasMetadata hasMetadata, Context context, Exception exc) {
        return updateErrorStatus((VirtualKafkaCluster) hasMetadata, (Context<VirtualKafkaCluster>) context, exc);
    }

    // Bridge overload taking the erased HasMetadata/raw Context types: casts its arguments and delegates
    // to the typed reconcile(VirtualKafkaCluster, Context<VirtualKafkaCluster>).
    // NOTE(review): marked bridge/synthetic, i.e. presumably compiler-generated and surfaced by
    // decompilation - confirm whether it needs to exist in hand-maintained source.
    public /* bridge */ /* synthetic */ UpdateControl reconcile(HasMetadata hasMetadata, Context context) throws Exception {
        return reconcile((VirtualKafkaCluster) hasMetadata, (Context<VirtualKafkaCluster>) context);
    }
}
