package io.kroxylicious.proxy.internal.filter;

import io.kroxylicious.proxy.filter.DescribeClusterResponseFilter;
import io.kroxylicious.proxy.filter.FetchResponseFilter;
import io.kroxylicious.proxy.filter.FilterContext;
import io.kroxylicious.proxy.filter.FindCoordinatorResponseFilter;
import io.kroxylicious.proxy.filter.MetadataResponseFilter;
import io.kroxylicious.proxy.filter.ProduceResponseFilter;
import io.kroxylicious.proxy.filter.ResponseFilterResult;
import io.kroxylicious.proxy.filter.ShareAcknowledgeResponseFilter;
import io.kroxylicious.proxy.filter.ShareFetchResponseFilter;
import io.kroxylicious.proxy.internal.net.EndpointReconciler;
import io.kroxylicious.proxy.model.VirtualClusterModel;
import io.kroxylicious.proxy.service.HostPort;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.ObjIntConsumer;
import java.util.function.ToIntFunction;
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.message.ResponseHeaderData;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kroxylicious/proxy/internal/filter/BrokerAddressFilter.class */
/**
 * Response filter that replaces the broker host/port carried in Kafka responses with the
 * advertised broker address supplied by the {@link VirtualClusterModel}.
 *
 * <p>The following response types are handled: Metadata, DescribeCluster, FindCoordinator,
 * Produce, Fetch, ShareFetch and ShareAcknowledge. For Metadata and DescribeCluster responses
 * the filter additionally collects the broker addresses as they were reported in the response
 * (before rewriting) and hands them to the {@link EndpointReconciler} before the response is
 * forwarded.
 */
public class BrokerAddressFilter implements MetadataResponseFilter, FindCoordinatorResponseFilter, DescribeClusterResponseFilter, ProduceResponseFilter, FetchResponseFilter, ShareFetchResponseFilter, ShareAcknowledgeResponseFilter {
    private static final Logger LOGGER = LoggerFactory.getLogger(BrokerAddressFilter.class);

    // NOTE(review): presumably the first Produce/Fetch response versions that carry node
    // endpoints (KIP-951) - confirm against the Kafka protocol specification.
    private static final short MIN_PRODUCE_RESPONSE_VERSION = 10;
    private static final short MIN_FETCH_RESPONSE_VERSION = 16;

    private final VirtualClusterModel virtualClusterModel;
    private final EndpointReconciler reconciler;

    /**
     * Creates the filter.
     *
     * @param virtualClusterModel source of the advertised address for each broker node id
     * @param endpointReconciler reconciler that is informed of the broker addresses seen in
     *        Metadata and DescribeCluster responses
     */
    public BrokerAddressFilter(VirtualClusterModel virtualClusterModel, EndpointReconciler endpointReconciler) {
        this.virtualClusterModel = virtualClusterModel;
        this.reconciler = endpointReconciler;
    }

    /**
     * Rewrites the address of every broker in a Metadata response, then reconciles the
     * originally reported addresses before forwarding the response.
     *
     * @param s not used by this method (presumably the API version of the response)
     * @param responseHeaderData response header, forwarded unchanged
     * @param metadataResponseData response body; its brokers are modified in place
     * @param filterContext context used to build the forwarded result
     * @return a stage that completes with the forward result once reconciliation has completed
     */
    public CompletionStage<ResponseFilterResult> onMetadataResponse(short s, ResponseHeaderData responseHeaderData, MetadataResponseData metadataResponseData, FilterContext filterContext) {
        Map<Integer, HostPort> reportedAddresses = new HashMap<>();
        for (MetadataResponseData.MetadataResponseBroker broker : metadataResponseData.brokers()) {
            // Capture the address exactly as reported in the response *before* it is overwritten below.
            reportedAddresses.put(broker.nodeId(), new HostPort(broker.host(), broker.port()));
            apply(filterContext, broker,
                    MetadataResponseData.MetadataResponseBroker::nodeId,
                    MetadataResponseData.MetadataResponseBroker::host,
                    MetadataResponseData.MetadataResponseBroker::port,
                    MetadataResponseData.MetadataResponseBroker::setHost,
                    MetadataResponseData.MetadataResponseBroker::setPort);
        }
        return doReconcileThenForwardResponse(responseHeaderData, metadataResponseData, filterContext, reportedAddresses);
    }

    /**
     * Rewrites the address of every broker in a DescribeCluster response, then reconciles the
     * originally reported addresses before forwarding the response.
     *
     * @param s not used by this method (presumably the API version of the response)
     * @param responseHeaderData response header, forwarded unchanged
     * @param describeClusterResponseData response body; its brokers are modified in place
     * @param filterContext context used to build the forwarded result
     * @return a stage that completes with the forward result once reconciliation has completed
     */
    public CompletionStage<ResponseFilterResult> onDescribeClusterResponse(short s, ResponseHeaderData responseHeaderData, DescribeClusterResponseData describeClusterResponseData, FilterContext filterContext) {
        Map<Integer, HostPort> reportedAddresses = new HashMap<>();
        for (DescribeClusterResponseData.DescribeClusterBroker broker : describeClusterResponseData.brokers()) {
            // Capture the address exactly as reported in the response *before* it is overwritten below.
            reportedAddresses.put(broker.brokerId(), new HostPort(broker.host(), broker.port()));
            apply(filterContext, broker,
                    DescribeClusterResponseData.DescribeClusterBroker::brokerId,
                    DescribeClusterResponseData.DescribeClusterBroker::host,
                    DescribeClusterResponseData.DescribeClusterBroker::port,
                    DescribeClusterResponseData.DescribeClusterBroker::setHost,
                    DescribeClusterResponseData.DescribeClusterBroker::setPort);
        }
        return doReconcileThenForwardResponse(responseHeaderData, describeClusterResponseData, filterContext, reportedAddresses);
    }

    /**
     * Rewrites coordinator addresses in a FindCoordinator response and forwards it.
     *
     * <p>Entries in the coordinators list are rewritten when their node id is non-negative.
     * The top-level node id/host/port fields are rewritten only when the node id is
     * non-negative, the host is non-empty and the port is positive.
     *
     * @param s not used by this method (presumably the API version of the response)
     * @param responseHeaderData response header, forwarded unchanged
     * @param findCoordinatorResponseData response body, modified in place
     * @param filterContext context used to forward the response
     * @return the stage returned by {@code filterContext.forwardResponse}
     */
    public CompletionStage<ResponseFilterResult> onFindCoordinatorResponse(short s, ResponseHeaderData responseHeaderData, FindCoordinatorResponseData findCoordinatorResponseData, FilterContext filterContext) {
        for (FindCoordinatorResponseData.Coordinator coordinator : findCoordinatorResponseData.coordinators()) {
            // A negative node id has no advertised address to look up, so such entries are left untouched.
            if (coordinator.nodeId() >= 0) {
                apply(filterContext, coordinator,
                        FindCoordinatorResponseData.Coordinator::nodeId,
                        FindCoordinatorResponseData.Coordinator::host,
                        FindCoordinatorResponseData.Coordinator::port,
                        FindCoordinatorResponseData.Coordinator::setHost,
                        FindCoordinatorResponseData.Coordinator::setPort);
            }
        }
        String topLevelHost = findCoordinatorResponseData.host();
        boolean topLevelCoordinatorPresent = findCoordinatorResponseData.nodeId() >= 0
                && topLevelHost != null
                && !topLevelHost.isEmpty()
                && findCoordinatorResponseData.port() > 0;
        if (topLevelCoordinatorPresent) {
            apply(filterContext, findCoordinatorResponseData,
                    FindCoordinatorResponseData::nodeId,
                    FindCoordinatorResponseData::host,
                    FindCoordinatorResponseData::port,
                    FindCoordinatorResponseData::setHost,
                    FindCoordinatorResponseData::setPort);
        }
        return filterContext.forwardResponse(responseHeaderData, findCoordinatorResponseData);
    }

    /**
     * Tells whether Produce responses of the given version should be handled.
     *
     * @param s version to test (presumably the Produce API version)
     * @return {@code true} when {@code s} is at least 10
     */
    public boolean shouldHandleProduceResponse(short s) {
        return s >= MIN_PRODUCE_RESPONSE_VERSION;
    }

    /**
     * Rewrites the node endpoints of a Produce response (when present) and forwards it.
     *
     * @param s not used by this method (presumably the API version of the response)
     * @param responseHeaderData response header, forwarded unchanged
     * @param produceResponseData response body; its node endpoints are modified in place
     * @param filterContext context used to forward the response
     * @return the stage returned by {@code filterContext.forwardResponse}
     */
    public CompletionStage<ResponseFilterResult> onProduceResponse(short s, ResponseHeaderData responseHeaderData, ProduceResponseData produceResponseData, FilterContext filterContext) {
        if (produceResponseData.nodeEndpoints() != null) {
            produceResponseData.nodeEndpoints().forEach(endpoint -> apply(filterContext, endpoint,
                    e -> e.nodeId(),
                    e -> e.host(),
                    e -> e.port(),
                    (e, host) -> e.setHost(host),
                    (e, port) -> e.setPort(port)));
        }
        return filterContext.forwardResponse(responseHeaderData, produceResponseData);
    }

    /**
     * Tells whether Fetch responses of the given version should be handled.
     *
     * @param s version to test (presumably the Fetch API version)
     * @return {@code true} when {@code s} is at least 16
     */
    public boolean shouldHandleFetchResponse(short s) {
        return s >= MIN_FETCH_RESPONSE_VERSION;
    }

    /**
     * Rewrites the node endpoints of a Fetch response (when present) and forwards it.
     *
     * @param s not used by this method (presumably the API version of the response)
     * @param responseHeaderData response header, forwarded unchanged
     * @param fetchResponseData response body; its node endpoints are modified in place
     * @param filterContext context used to forward the response
     * @return the stage returned by {@code filterContext.forwardResponse}
     */
    public CompletionStage<ResponseFilterResult> onFetchResponse(short s, ResponseHeaderData responseHeaderData, FetchResponseData fetchResponseData, FilterContext filterContext) {
        if (fetchResponseData.nodeEndpoints() != null) {
            fetchResponseData.nodeEndpoints().forEach(endpoint -> apply(filterContext, endpoint,
                    e -> e.nodeId(),
                    e -> e.host(),
                    e -> e.port(),
                    (e, host) -> e.setHost(host),
                    (e, port) -> e.setPort(port)));
        }
        return filterContext.forwardResponse(responseHeaderData, fetchResponseData);
    }

    /**
     * Rewrites the node endpoints of a ShareAcknowledge response (when present) and forwards it.
     *
     * @param s not used by this method (presumably the API version of the response)
     * @param responseHeaderData response header, forwarded unchanged
     * @param shareAcknowledgeResponseData response body; its node endpoints are modified in place
     * @param filterContext context used to forward the response
     * @return the stage returned by {@code filterContext.forwardResponse}
     */
    public CompletionStage<ResponseFilterResult> onShareAcknowledgeResponse(short s, ResponseHeaderData responseHeaderData, ShareAcknowledgeResponseData shareAcknowledgeResponseData, FilterContext filterContext) {
        if (shareAcknowledgeResponseData.nodeEndpoints() != null) {
            shareAcknowledgeResponseData.nodeEndpoints().forEach(endpoint -> apply(filterContext, endpoint,
                    e -> e.nodeId(),
                    e -> e.host(),
                    e -> e.port(),
                    (e, host) -> e.setHost(host),
                    (e, port) -> e.setPort(port)));
        }
        return filterContext.forwardResponse(responseHeaderData, shareAcknowledgeResponseData);
    }

    /**
     * Rewrites the node endpoints of a ShareFetch response (when present) and forwards it.
     *
     * @param s not used by this method (presumably the API version of the response)
     * @param responseHeaderData response header, forwarded unchanged
     * @param shareFetchResponseData response body; its node endpoints are modified in place
     * @param filterContext context used to forward the response
     * @return the stage returned by {@code filterContext.forwardResponse}
     */
    public CompletionStage<ResponseFilterResult> onShareFetchResponse(short s, ResponseHeaderData responseHeaderData, ShareFetchResponseData shareFetchResponseData, FilterContext filterContext) {
        if (shareFetchResponseData.nodeEndpoints() != null) {
            shareFetchResponseData.nodeEndpoints().forEach(endpoint -> apply(filterContext, endpoint,
                    e -> e.nodeId(),
                    e -> e.host(),
                    e -> e.port(),
                    (e, host) -> e.setHost(host),
                    (e, port) -> e.setPort(port)));
        }
        return filterContext.forwardResponse(responseHeaderData, shareFetchResponseData);
    }

    /**
     * Overwrites the host and port of {@code node} with the advertised broker address that the
     * virtual cluster model returns for the node's id.
     *
     * @param filterContext context, used only in the trace log message
     * @param node object carrying a node id, host and port; modified in place
     * @param nodeIdOf reads the node id from {@code node}
     * @param hostOf reads the current host from {@code node} (for logging)
     * @param portOf reads the current port from {@code node} (for logging)
     * @param hostSetter writes the advertised host to {@code node}
     * @param portSetter writes the advertised port to {@code node}
     * @param <T> type of the object being rewritten
     */
    private <T> void apply(FilterContext filterContext, T node, ToIntFunction<T> nodeIdOf, Function<T, String> hostOf, ToIntFunction<T> portOf, BiConsumer<T, String> hostSetter, ObjIntConsumer<T> portSetter) {
        String reportedHost = hostOf.apply(node);
        int reportedPort = portOf.applyAsInt(node);
        HostPort advertised = this.virtualClusterModel.getAdvertisedBrokerAddress(nodeIdOf.applyAsInt(node));
        LOGGER.trace("{}: Rewriting broker address in response {}:{} -> {}", filterContext, reportedHost, reportedPort, advertised);
        hostSetter.accept(node, advertised.host());
        portSetter.accept(node, advertised.port());
    }

    /**
     * Asks the reconciler to reconcile the given broker addresses for this virtual cluster and,
     * once that stage completes, forwards the response.
     *
     * @param responseHeaderData response header to forward
     * @param apiMessage response body to forward
     * @param filterContext context used to build the forward result
     * @param map broker addresses keyed by node id, as reported in the response
     * @return a stage that completes with the forward result after reconciliation
     */
    private CompletionStage<ResponseFilterResult> doReconcileThenForwardResponse(ResponseHeaderData responseHeaderData, ApiMessage apiMessage, FilterContext filterContext, Map<Integer, HostPort> map) {
        return this.reconciler.reconcile(this.virtualClusterModel, map)
                .toCompletableFuture()
                .thenCompose(ignored -> {
                    LOGGER.debug("Endpoint reconciliation complete for virtual cluster {}", this.virtualClusterModel);
                    return filterContext.responseFilterResultBuilder().forward(responseHeaderData, apiMessage).completed();
                });
    }
}
