package io.strimzi.kafka.bridge.http;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.strimzi.kafka.bridge.BridgeContentType;
import io.strimzi.kafka.bridge.ConsumerInstanceId;
import io.strimzi.kafka.bridge.EmbeddedFormat;
import io.strimzi.kafka.bridge.Handler;
import io.strimzi.kafka.bridge.KafkaBridgeConsumer;
import io.strimzi.kafka.bridge.SinkTopicSubscription;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.converter.MessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpBinaryMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter;
import io.strimzi.kafka.bridge.http.converter.JsonDecodeException;
import io.strimzi.kafka.bridge.http.converter.JsonUtils;
import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
import io.strimzi.kafka.bridge.tracing.SpanHandle;
import io.strimzi.kafka.bridge.tracing.TracingHandle;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.ext.web.RoutingContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;

/* loaded from: input_file:io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.class */
public class HttpSinkBridgeEndpoint<K, V> extends HttpBridgeEndpoint {
    // Shared empty JSON object; handle() uses it as the request body when the HTTP request has no body.
    private static final ObjectNode EMPTY_JSON = JsonUtils.createObjectNode();
    // Timeout passed to KafkaBridgeConsumer.poll(); initialised to 100 in the constructor and
    // overwritten by the "timeout" request parameter in doPoll().
    // NOTE(review): unit is not visible in this file — presumably milliseconds; confirm.
    private long pollTimeOut;
    // Upper bound on the size (in bytes) of a poll response; initialised to Long.MAX_VALUE in the
    // constructor, overwritten by the "max_bytes" request parameter in doPoll() and checked in pollHandler().
    private long maxBytes;
    // Extracts the "host=" value from a "forwarded" header (used by buildRequestUri()).
    Pattern forwardedHostPattern;
    // Extracts the "proto=" value from a "forwarded" header (used by buildRequestUri()).
    Pattern forwardedProtoPattern;
    // Matches a host string that already ends in ":<digits>" (used by formatRequestUri()).
    Pattern hostPortPattern;
    // Converts polled consumer records into the HTTP response payload; assigned in open().
    private MessageConverter<K, V, byte[], byte[]> messageConverter;
    // Source of the current OpenAPI operation and of the registry of existing sink endpoints.
    private final HttpBridgeContext<K, V> httpBridgeContext;
    // Consumer wrapper to which every operation of this endpoint is delegated.
    private final KafkaBridgeConsumer<K, V> kafkaBridgeConsumer;
    // Group id + instance name of this consumer; assigned in doCreateConsumer().
    private ConsumerInstanceId consumerInstanceId;
    // True after a successful doSubscribe(); reset by doUnsubscribe().
    private boolean subscribed;
    // True after a successful doAssign(); reset by doUnsubscribe().
    private boolean assigned;

    /**
     * Creates a sink endpoint together with the Kafka bridge consumer it delegates to.
     *
     * @param bridgeConfig      bridge configuration; its Kafka section is handed to the consumer wrapper
     * @param httpBridgeContext HTTP bridge context shared with the other endpoints
     * @param embeddedFormat    embedded data format, forwarded to the superclass
     * @param deserializer      key deserializer for the Kafka consumer
     * @param deserializer2     value deserializer for the Kafka consumer
     */
    public HttpSinkBridgeEndpoint(BridgeConfig bridgeConfig, HttpBridgeContext<K, V> httpBridgeContext, EmbeddedFormat embeddedFormat, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        super(bridgeConfig, embeddedFormat);

        // Collaborators.
        this.httpBridgeContext = httpBridgeContext;
        this.kafkaBridgeConsumer = new KafkaBridgeConsumer<>(bridgeConfig.getKafkaConfig(), deserializer, deserializer2);

        // Poll defaults; doPoll() overrides them from request parameters.
        this.pollTimeOut = 100L;
        this.maxBytes = Long.MAX_VALUE;

        // Patterns used to rebuild the externally visible request URI.
        this.forwardedHostPattern = Pattern.compile("host=([^;]+)", Pattern.CASE_INSENSITIVE);
        this.forwardedProtoPattern = Pattern.compile("proto=([^;]+)", Pattern.CASE_INSENSITIVE);
        this.hostPortPattern = Pattern.compile("^.*:[0-9]+$");

        // Nothing is subscribed or assigned until the client asks for it.
        this.subscribed = false;
        this.assigned = false;
    }

    /**
     * Returns the identifier of this consumer instance.
     *
     * @return the identifier assigned by {@code doCreateConsumer}, or {@code null} if no consumer
     *         has been created through this endpoint yet
     */
    public ConsumerInstanceId consumerInstanceId() {
        return consumerInstanceId;
    }

    /**
     * Opens the endpoint by selecting the message converter that matches the embedded format.
     */
    @Override
    public void open() {
        messageConverter = buildMessageConverter();
    }

    /**
     * Closes the underlying Kafka bridge consumer and then delegates to the superclass.
     * If closing the consumer throws, the superclass {@code close()} is not reached.
     */
    @Override
    public void close() {
        kafkaBridgeConsumer.close();
        super.close();
    }

    /**
     * Handles the "create consumer" operation.
     *
     * <p>Resolves the consumer name (taken from the request body, otherwise generated from the
     * bridge id or a fixed prefix plus a random UUID), rejects the request with 409 when an
     * instance with the same group/name is already registered, creates the Kafka consumer and
     * replies with the instance id and its base URI.
     *
     * @param routingContext routing context of the HTTP request; provides the {@code groupid} path parameter
     * @param jsonNode       request body holding the optional name and consumer configuration
     * @param handler        optional callback invoked with this endpoint once the consumer is created
     */
    private void doCreateConsumer(RoutingContext routingContext, JsonNode jsonNode, Handler<HttpBridgeEndpoint> handler) {
        String groupId = routingContext.pathParam("groupid");

        String bridgeId = this.bridgeConfig.getBridgeID();
        String generatedName = (bridgeId == null ? "kafka-bridge-consumer" : bridgeId) + "-" + UUID.randomUUID();
        this.name = JsonUtils.getString(jsonNode, "name", generatedName);
        this.consumerInstanceId = new ConsumerInstanceId(groupId, this.name);

        if (this.httpBridgeContext.getHttpSinkEndpoints().containsKey(this.consumerInstanceId)) {
            HttpBridgeError conflict = new HttpBridgeError(
                    HttpResponseStatus.CONFLICT.code(),
                    "A consumer instance with the specified name already exists in the Kafka Bridge.");
            HttpUtils.sendResponse(routingContext, HttpResponseStatus.CONFLICT.code(),
                    BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(conflict.toJson()));
            return;
        }

        // The path part of the URI may come from the x-forwarded-path header rather than from the
        // request itself, so the trailing-slash check must look at the URI actually built.
        String requestUri = buildRequestUri(routingContext);
        if (!requestUri.endsWith("/")) {
            requestUri = requestUri + "/";
        }
        String consumerBaseUri = requestUri + "instances/" + this.name;

        Properties consumerConfig = new Properties();
        addConfigParameter("auto.offset.reset", JsonUtils.getString(jsonNode, "auto.offset.reset"), consumerConfig);
        addConfigParameter("enable.auto.commit", JsonUtils.getString(jsonNode, "enable.auto.commit"), consumerConfig);
        addConfigParameter("fetch.min.bytes", JsonUtils.getString(jsonNode, "fetch.min.bytes"), consumerConfig);
        // The body uses a "consumer."-prefixed key for the request timeout.
        addConfigParameter("request.timeout.ms", JsonUtils.getString(jsonNode, "consumer.request.timeout.ms"), consumerConfig);
        addConfigParameter("client.id", this.name, consumerConfig);
        addConfigParameter("isolation.level", JsonUtils.getString(jsonNode, "isolation.level"), consumerConfig);
        this.kafkaBridgeConsumer.create(consumerConfig, groupId);

        if (handler != null) {
            handler.handle(this);
        }

        this.log.info("Created consumer {} in group {}", this.name, groupId);
        ObjectNode responseBody = JsonUtils.createObjectNode()
                .put("instance_id", this.name)
                .put("base_uri", consumerBaseUri);
        HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(),
                BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(responseBody));
    }

    /**
     * Handles the "seek" operation: moves the consumer to the offset given for every
     * topic/partition listed under {@code offsets} in the request body.
     *
     * <p>Parsing and seeking run asynchronously via {@link CompletableFuture#runAsync(Runnable)};
     * the HTTP response (204 on success, otherwise the error mapped by {@code handleError}) is
     * sent from the completion callback.
     *
     * @param routingContext routing context of the HTTP request
     * @param jsonNode       request body containing the {@code offsets} array
     */
    private void doSeek(RoutingContext routingContext, JsonNode jsonNode) {
        CompletableFuture.runAsync(() -> {
            // JsonNode.get() returns JsonNode; the explicit cast keeps the "must be an array" check.
            ArrayNode offsets = (ArrayNode) jsonNode.get("offsets");
            for (JsonNode entry : offsets) {
                TopicPartition topicPartition = new TopicPartition(
                        JsonUtils.getString(entry, "topic"),
                        JsonUtils.getInt(entry, "partition").intValue());
                this.kafkaBridgeConsumer.seek(topicPartition, JsonUtils.getLong(entry, "offset").longValue());
            }
        }).whenComplete((ignored, error) -> {
            this.log.trace("Seek handler thread {}", Thread.currentThread());
            if (error == null) {
                HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null);
            } else {
                HttpBridgeError bridgeError = handleError(error);
                HttpUtils.sendResponse(routingContext, bridgeError.getCode(),
                        BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(bridgeError.toJson()));
            }
        });
    }

    /**
     * Handles the "seek to beginning" and "seek to end" operations for the topic/partitions
     * listed under {@code partitions} in the request body.
     *
     * <p>The partition list is parsed on the calling thread; the seek itself runs asynchronously
     * and the HTTP response (204 on success, otherwise the error mapped by {@code handleError})
     * is sent from the completion callback.
     *
     * @param routingContext        routing context of the HTTP request
     * @param jsonNode              request body containing the {@code partitions} array
     * @param httpOpenApiOperations the requested operation; {@code SEEK_TO_BEGINNING} seeks to the
     *                              beginning, any other value seeks to the end
     */
    private void doSeekTo(RoutingContext routingContext, JsonNode jsonNode, HttpOpenApiOperations httpOpenApiOperations) {
        Set<TopicPartition> topicPartitions = StreamSupport.stream(jsonNode.get("partitions").spliterator(), false)
                .map(partition -> new TopicPartition(
                        JsonUtils.getString(partition, "topic"),
                        JsonUtils.getInt(partition, "partition").intValue()))
                .collect(Collectors.toSet());

        CompletableFuture.runAsync(() -> {
            if (httpOpenApiOperations == HttpOpenApiOperations.SEEK_TO_BEGINNING) {
                this.kafkaBridgeConsumer.seekToBeginning(topicPartitions);
            } else {
                this.kafkaBridgeConsumer.seekToEnd(topicPartitions);
            }
        }).whenComplete((ignored, error) -> {
            this.log.trace("SeekTo handler thread {}", Thread.currentThread());
            if (error == null) {
                HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null);
            } else {
                HttpBridgeError bridgeError = handleError(error);
                HttpUtils.sendResponse(routingContext, bridgeError.getCode(),
                        BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(bridgeError.toJson()));
            }
        });
    }

    /**
     * Handles the "commit" operation.
     *
     * <p>With an empty body the last polled offsets are committed; otherwise the offsets listed
     * under {@code offsets} in the body are committed. The commit runs asynchronously and the
     * HTTP response (204 on success, 500 with the failure message otherwise) is sent from the
     * completion callback.
     *
     * @param routingContext routing context of the HTTP request
     * @param jsonNode       request body, possibly empty
     */
    private void doCommit(RoutingContext routingContext, JsonNode jsonNode) {
        if (jsonNode.isEmpty()) {
            CompletableFuture.runAsync(() -> this.kafkaBridgeConsumer.commitLastPolledOffsets())
                    .whenComplete((ignored, error) -> completeCommit(routingContext, error));
            return;
        }

        // JsonNode.get() returns JsonNode; the explicit cast keeps the "must be an array" check.
        ArrayNode offsetEntries = (ArrayNode) jsonNode.get("offsets");
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (JsonNode entry : offsetEntries) {
            TopicPartition topicPartition = new TopicPartition(
                    JsonUtils.getString(entry, "topic"),
                    JsonUtils.getInt(entry, "partition").intValue());
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
                    JsonUtils.getLong(entry, "offset").longValue(),
                    JsonUtils.getString(entry, "metadata"));
            offsets.put(topicPartition, offsetAndMetadata);
        }

        CompletableFuture.supplyAsync(() -> this.kafkaBridgeConsumer.commit(offsets))
                .whenComplete((committed, error) -> completeCommit(routingContext, error));
    }

    /** Sends the HTTP response for a finished commit: 204 when {@code error} is null, otherwise 500. */
    private void completeCommit(RoutingContext routingContext, Throwable error) {
        this.log.trace("Commit handler thread {}", Thread.currentThread());
        if (error == null) {
            HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null);
        } else {
            HttpBridgeError bridgeError = new HttpBridgeError(
                    HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), error.getMessage());
            HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
                    BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(bridgeError.toJson()));
        }
    }

    /**
     * Handles the "delete consumer" operation: closes this endpoint, logs the deletion and
     * replies with 204 (no content).
     *
     * @param routingContext routing context of the HTTP request; provides the {@code name} and
     *                       {@code groupid} path parameters used in the log message
     */
    private void doDeleteConsumer(RoutingContext routingContext) {
        close();

        String consumerName = routingContext.pathParam("name");
        String groupId = routingContext.pathParam("groupid");
        this.log.info("Deleted consumer {} from group {}", consumerName, groupId);

        int noContent = HttpResponseStatus.NO_CONTENT.code();
        HttpUtils.sendResponse(routingContext, noContent, null, null);
    }

    /**
     * Completion callback of a poll: turns the polled records (or the failure) into the HTTP response
     * and finishes the tracing span with the matching status code.
     *
     * <ul>
     *   <li>poll failure: 500 with the failure message</li>
     *   <li>converted payload larger than {@code maxBytes}: 422</li>
     *   <li>records cannot be decoded as JSON: 406</li>
     *   <li>otherwise: 200 with the converted payload</li>
     * </ul>
     *
     * @param consumerRecords records returned by the poll; only read when {@code th} is null
     * @param th              failure of the poll, or null on success
     * @param routingContext  routing context of the HTTP request
     */
    private void pollHandler(ConsumerRecords<K, V> consumerRecords, Throwable th, RoutingContext routingContext) {
        TracingHandle tracingHandle = TracingUtil.getTracing();
        SpanHandle<K, V> pollSpan = tracingHandle.span(routingContext, HttpOpenApiOperations.POLL.toString());

        if (th != null) {
            int serverError = HttpResponseStatus.INTERNAL_SERVER_ERROR.code();
            HttpBridgeError bridgeError = new HttpBridgeError(serverError, th.getMessage());
            HttpUtils.sendResponse(routingContext, serverError, BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(bridgeError.toJson()));
            pollSpan.finish(serverError, th);
            return;
        }

        for (ConsumerRecord<K, V> consumerRecord : consumerRecords) {
            tracingHandle.handleRecordSpan(pollSpan, consumerRecord);
        }
        pollSpan.inject(routingContext);

        // Status reported on the span when anything unexpected escapes the conversion/response code.
        HttpResponseStatus unexpectedFailureStatus = HttpResponseStatus.INTERNAL_SERVER_ERROR;
        try {
            try {
                byte[] payload = this.messageConverter.toMessages(consumerRecords);
                HttpResponseStatus status;
                if (payload.length <= this.maxBytes) {
                    status = HttpResponseStatus.OK;
                    String contentType = this.format == EmbeddedFormat.BINARY
                            ? BridgeContentType.KAFKA_JSON_BINARY
                            : BridgeContentType.KAFKA_JSON_JSON;
                    HttpUtils.sendResponse(routingContext, status.code(), contentType, payload);
                } else {
                    status = HttpResponseStatus.UNPROCESSABLE_ENTITY;
                    HttpBridgeError tooLarge = new HttpBridgeError(status.code(), "Response exceeds the maximum number of bytes the consumer can receive");
                    HttpUtils.sendResponse(routingContext, status.code(), BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(tooLarge.toJson()));
                }
                pollSpan.finish(status.code());
            } catch (JsonDecodeException decodeFailure) {
                this.log.error("Error decoding records as JSON", decodeFailure);
                HttpResponseStatus notAcceptable = HttpResponseStatus.NOT_ACCEPTABLE;
                HttpBridgeError bridgeError = new HttpBridgeError(notAcceptable.code(), decodeFailure.getMessage());
                HttpUtils.sendResponse(routingContext, notAcceptable.code(), BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(bridgeError.toJson()));
                pollSpan.finish(notAcceptable.code());
            }
        } catch (Throwable unexpected) {
            pollSpan.finish(unexpectedFailureStatus.code());
            throw unexpected;
        }
    }

    /**
     * Handles the "poll" operation.
     *
     * <p>Rejects the request with 500 when the consumer is neither subscribed nor assigned, and
     * with 406 when the {@code Accept} header is missing or does not match the embedded format.
     * Otherwise applies the optional {@code timeout} and {@code max_bytes} request parameters
     * (both are stored on this endpoint) and polls asynchronously; the response is produced by
     * {@code pollHandler}.
     *
     * @param routingContext routing context of the HTTP request
     */
    private void doPoll(RoutingContext routingContext) {
        boolean hasSource = this.subscribed || this.assigned;
        if (!hasSource) {
            HttpBridgeError notSubscribed = new HttpBridgeError(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), "Consumer is not subscribed to any topics or assigned any partitions");
            HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(notSubscribed.toJson()));
            return;
        }

        String accept = routingContext.request().getHeader("Accept");
        boolean acceptable = accept != null && checkAcceptedBody(accept);
        if (!acceptable) {
            HttpBridgeError formatMismatch = new HttpBridgeError(HttpResponseStatus.NOT_ACCEPTABLE.code(), "Consumer format does not match the embedded format requested by the Accept header.");
            HttpUtils.sendResponse(routingContext, HttpResponseStatus.NOT_ACCEPTABLE.code(), BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(formatMismatch.toJson()));
            return;
        }

        String timeoutParam = routingContext.request().getParam("timeout");
        if (timeoutParam != null) {
            this.pollTimeOut = Long.parseLong(timeoutParam);
        }
        String maxBytesParam = routingContext.request().getParam("max_bytes");
        if (maxBytesParam != null) {
            this.maxBytes = Long.parseLong(maxBytesParam);
        }

        CompletableFuture
                .supplyAsync(() -> this.kafkaBridgeConsumer.poll(this.pollTimeOut))
                .whenComplete((polledRecords, failure) -> {
                    this.log.trace("Poll handler thread {}", Thread.currentThread());
                    pollHandler(polledRecords, failure, routingContext);
                });
    }

    /**
     * Handles the "assign" operation: assigns the topic/partitions listed under
     * {@code partitions} in the request body to the consumer.
     *
     * <p>Replies with 409 when the consumer already has a subscription. The partition list is
     * parsed on the calling thread; the assignment runs asynchronously and the HTTP response
     * (204 on success, 500 with the failure message otherwise) is sent from the completion callback.
     *
     * @param routingContext routing context of the HTTP request
     * @param jsonNode       request body containing the {@code partitions} array
     */
    private void doAssign(RoutingContext routingContext, JsonNode jsonNode) {
        if (this.subscribed) {
            HttpBridgeError conflict = new HttpBridgeError(
                    HttpResponseStatus.CONFLICT.code(),
                    "Subscriptions to topics, partitions, and patterns are mutually exclusive.");
            HttpUtils.sendResponse(routingContext, HttpResponseStatus.CONFLICT.code(),
                    BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(conflict.toJson()));
            return;
        }

        // JsonNode.get() returns JsonNode; the explicit cast keeps the "must be an array" check.
        ArrayNode partitions = (ArrayNode) jsonNode.get("partitions");
        List<SinkTopicSubscription> topicSubscriptions = StreamSupport.stream(partitions.spliterator(), false)
                .map(partition -> new SinkTopicSubscription(
                        JsonUtils.getString(partition, "topic"),
                        JsonUtils.getInt(partition, "partition")))
                .collect(Collectors.toCollection(ArrayList::new));

        CompletableFuture.runAsync(() -> this.kafkaBridgeConsumer.assign(topicSubscriptions))
                .whenComplete((ignored, error) -> {
                    this.log.trace("Assign handler thread {}", Thread.currentThread());
                    if (error == null) {
                        this.assigned = true;
                        HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null);
                    } else {
                        HttpBridgeError bridgeError = new HttpBridgeError(
                                HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), error.getMessage());
                        HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
                                BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(bridgeError.toJson()));
                    }
                });
    }

    /**
     * Handles the "subscribe" operation: subscribes the consumer either to the topics listed
     * under {@code topics} or to the regular expression given as {@code topic_pattern}.
     *
     * <p>Replies with 409 when both keys are present or the consumer already has an assignment,
     * and with 422 when neither key is present. The subscription runs asynchronously and the
     * HTTP response (204 on success, 500 with the failure message otherwise) is sent from the
     * completion callback.
     *
     * @param routingContext routing context of the HTTP request
     * @param jsonNode       request body containing {@code topics} or {@code topic_pattern}
     */
    private void doSubscribe(RoutingContext routingContext, JsonNode jsonNode) {
        boolean hasTopics = jsonNode.has("topics");
        boolean hasPattern = jsonNode.has("topic_pattern");

        if ((hasTopics && hasPattern) || this.assigned) {
            HttpBridgeError conflict = new HttpBridgeError(
                    HttpResponseStatus.CONFLICT.code(),
                    "Subscriptions to topics, partitions, and patterns are mutually exclusive.");
            HttpUtils.sendResponse(routingContext, HttpResponseStatus.CONFLICT.code(),
                    BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(conflict.toJson()));
            return;
        }
        if (!hasTopics && !hasPattern) {
            HttpBridgeError missing = new HttpBridgeError(
                    HttpResponseStatus.UNPROCESSABLE_ENTITY.code(),
                    "A list (of Topics type) or a topic_pattern must be specified.");
            HttpUtils.sendResponse(routingContext, HttpResponseStatus.UNPROCESSABLE_ENTITY.code(),
                    BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(missing.toJson()));
            return;
        }

        CompletableFuture.runAsync(() -> {
            if (hasTopics) {
                // The casts keep the original type checks: "topics" must be an array of text nodes.
                ArrayNode topics = (ArrayNode) jsonNode.get("topics");
                List<SinkTopicSubscription> topicSubscriptions = StreamSupport.stream(topics.spliterator(), false)
                        .map(TextNode.class::cast)
                        .map(topic -> new SinkTopicSubscription(topic.asText()))
                        .collect(Collectors.toCollection(ArrayList::new));
                this.kafkaBridgeConsumer.subscribe(topicSubscriptions);
            } else {
                this.kafkaBridgeConsumer.subscribe(Pattern.compile(JsonUtils.getString(jsonNode, "topic_pattern")));
            }
        }).whenComplete((ignored, error) -> {
            this.log.trace("Subscribe handler thread {}", Thread.currentThread());
            if (error == null) {
                this.subscribed = true;
                HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null);
            } else {
                HttpBridgeError bridgeError = new HttpBridgeError(
                        HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), error.getMessage());
                HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
                        BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(bridgeError.toJson()));
            }
        });
    }

    /**
     * Handles the "list subscriptions" operation.
     *
     * <p>Asynchronously asks the consumer for its topic/partitions and replies with a JSON object
     * holding {@code topics} (distinct topic names in first-seen order) and {@code partitions}
     * (one object per topic mapping the topic name to the array of its partition numbers).
     * A failure is reported as 500 with the failure message.
     *
     * @param routingContext routing context of the HTTP request
     */
    private void doListSubscriptions(RoutingContext routingContext) {
        CompletableFuture.supplyAsync(() -> this.kafkaBridgeConsumer.listSubscriptions())
                .whenComplete((subscriptions, error) -> {
                    this.log.trace("ListSubscriptions handler thread {}", Thread.currentThread());
                    if (error != null) {
                        HttpBridgeError bridgeError = new HttpBridgeError(
                                HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), error.getMessage());
                        HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
                                BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(bridgeError.toJson()));
                        return;
                    }

                    List<String> topicNames = new ArrayList<>();
                    Map<String, ArrayNode> partitionsByTopic = new HashMap<>();
                    for (TopicPartition topicPartition : subscriptions) {
                        String topic = topicPartition.topic();
                        ArrayNode topicPartitions = partitionsByTopic.get(topic);
                        if (topicPartitions == null) {
                            // First partition seen for this topic: register the topic exactly once.
                            topicPartitions = JsonUtils.createArrayNode();
                            partitionsByTopic.put(topic, topicPartitions);
                            topicNames.add(topic);
                        }
                        topicPartitions.add(topicPartition.partition());
                    }

                    ArrayNode partitionsJson = JsonUtils.createArrayNode();
                    for (Map.Entry<String, ArrayNode> entry : partitionsByTopic.entrySet()) {
                        ObjectNode topicJson = JsonUtils.createObjectNode();
                        topicJson.set(entry.getKey(), entry.getValue());
                        partitionsJson.add(topicJson);
                    }

                    ObjectNode responseBody = JsonUtils.createObjectNode();
                    responseBody.set("topics", JsonUtils.createArrayNode(topicNames));
                    responseBody.set("partitions", partitionsJson);
                    HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(),
                            BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(responseBody));
                });
    }

    /**
     * Handles the "unsubscribe" operation.
     *
     * <p>Unsubscribes the consumer asynchronously. On success both the subscribed and the
     * assigned flags are cleared and 204 is returned; on failure 500 is returned with the
     * failure message and the flags are left untouched.
     *
     * @param routingContext routing context of the HTTP request
     */
    public void doUnsubscribe(RoutingContext routingContext) {
        CompletableFuture
                .runAsync(() -> this.kafkaBridgeConsumer.unsubscribe())
                .whenComplete((ignored, failure) -> {
                    this.log.trace("Unsubscribe handler thread {}", Thread.currentThread());
                    if (failure == null) {
                        this.subscribed = false;
                        this.assigned = false;
                        HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), null, null);
                        return;
                    }
                    int serverError = HttpResponseStatus.INTERNAL_SERVER_ERROR.code();
                    HttpBridgeError bridgeError = new HttpBridgeError(serverError, failure.getMessage());
                    HttpUtils.sendResponse(routingContext, serverError, BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(bridgeError.toJson()));
                });
    }

    /**
     * Stores a configuration entry in {@code properties}, skipping entries without a value.
     *
     * @param str        configuration key
     * @param str2       configuration value; when null nothing is stored
     * @param properties target properties
     */
    private void addConfigParameter(String str, String str2, Properties properties) {
        if (str2 == null) {
            return;
        }
        properties.put(str, str2);
    }

    /**
     * Entry point for every HTTP request routed to this endpoint.
     *
     * <p>Parses the request body as JSON (an empty body is represented by an empty JSON object)
     * and dispatches to the {@code do...} method matching the OpenAPI operation reported by the
     * bridge context. A {@link JsonDecodeException} raised while parsing or dispatching is turned
     * into the HTTP error produced by {@code handleError}.
     *
     * @param routingContext routing context of the HTTP request
     * @param handler        callback passed on to {@code doCreateConsumer}; unused by the other operations
     * @throws IllegalArgumentException if the operation is not one handled by this endpoint
     */
    @Override
    public void handle(RoutingContext routingContext, Handler<HttpBridgeEndpoint> handler) {
        JsonNode body = EMPTY_JSON;
        try {
            if (!routingContext.body().isEmpty()) {
                // Buffer.getBytes() copies exactly the buffer content. The Netty backing array
                // (getByteBuf().array()) may be larger than the content and is not available
                // for every buffer type.
                body = JsonUtils.bytesToJson(routingContext.body().buffer().getBytes());
            }
            this.log.debug("[{}] Request: body = {}", routingContext.get("request-id"), body);
            this.log.trace("HttpSinkBridgeEndpoint handle thread {}", Thread.currentThread());

            HttpOpenApiOperations operation = this.httpBridgeContext.getOpenApiOperation();
            switch (operation) {
                case CREATE_CONSUMER:
                    doCreateConsumer(routingContext, body, handler);
                    break;
                case SUBSCRIBE:
                    doSubscribe(routingContext, body);
                    break;
                case ASSIGN:
                    doAssign(routingContext, body);
                    break;
                case POLL:
                    doPoll(routingContext);
                    break;
                case DELETE_CONSUMER:
                    doDeleteConsumer(routingContext);
                    break;
                case COMMIT:
                    doCommit(routingContext, body);
                    break;
                case SEEK:
                    doSeek(routingContext, body);
                    break;
                case SEEK_TO_BEGINNING:
                case SEEK_TO_END:
                    doSeekTo(routingContext, body, operation);
                    break;
                case UNSUBSCRIBE:
                    doUnsubscribe(routingContext);
                    break;
                case LIST_SUBSCRIPTIONS:
                    doListSubscriptions(routingContext);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown Operation: " + operation);
            }
        } catch (JsonDecodeException e) {
            HttpBridgeError bridgeError = handleError(e);
            HttpUtils.sendResponse(routingContext, bridgeError.getCode(),
                    BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(bridgeError.toJson()));
        }
    }

    /**
     * Picks the message converter matching the embedded format of this endpoint.
     *
     * @return a JSON converter for {@code JSON}, a binary converter for {@code BINARY},
     *         or {@code null} for any other format
     */
    private MessageConverter<K, V, byte[], byte[]> buildMessageConverter() {
        MessageConverter<K, V, byte[], byte[]> converter;
        switch (this.format) {
            case JSON:
                converter = new HttpJsonMessageConverter();
                break;
            case BINARY:
                converter = new HttpBinaryMessageConverter();
                break;
            default:
                converter = null;
                break;
        }
        return converter;
    }

    /**
     * Checks whether the content type requested through the {@code Accept} header matches the
     * embedded format of this endpoint.
     *
     * @param str value of the {@code Accept} header
     * @return {@code true} when {@code str} is exactly the JSON content type and the format is
     *         {@code JSON}, or exactly the binary content type and the format is {@code BINARY};
     *         {@code false} in every other case
     */
    private boolean checkAcceptedBody(String str) {
        if (BridgeContentType.KAFKA_JSON_JSON.equals(str)) {
            return this.format == EmbeddedFormat.JSON;
        }
        if (BridgeContentType.KAFKA_JSON_BINARY.equals(str)) {
            return this.format == EmbeddedFormat.BINARY;
        }
        return false;
    }

    /**
     * Builds the externally visible URI of the current request.
     *
     * <p>Scheme and host default to those of the request. When a non-empty {@code forwarded}
     * header is present, they are taken from its {@code proto=} and {@code host=} parts, provided
     * both are found; otherwise the {@code x-forwarded-host} / {@code x-forwarded-proto} header
     * pair is used, provided both are non-empty. The path is the {@code x-forwarded-path} header
     * when non-empty, else the request path.
     *
     * @param routingContext routing context of the HTTP request
     * @return the URI produced by {@code formatRequestUri} for the resolved scheme, host and path
     */
    private String buildRequestUri(RoutingContext routingContext) {
        String scheme = routingContext.request().scheme();
        String host = routingContext.request().host();

        String forwardedPath = routingContext.request().getHeader("x-forwarded-path");
        String path;
        if (forwardedPath != null && !forwardedPath.isEmpty()) {
            path = forwardedPath;
        } else {
            path = routingContext.request().path();
        }

        String forwarded = routingContext.request().getHeader("forwarded");
        if (forwarded != null && !forwarded.isEmpty()) {
            // The standard header wins over the X-Forwarded-* pair, even when it turns out unusable.
            Matcher hostMatcher = this.forwardedHostPattern.matcher(forwarded);
            Matcher protoMatcher = this.forwardedProtoPattern.matcher(forwarded);
            if (hostMatcher.find() && protoMatcher.find()) {
                this.log.debug("Getting base URI from HTTP header: Forwarded '{}'", forwarded);
                scheme = protoMatcher.group(1);
                host = hostMatcher.group(1);
            } else {
                this.log.debug("Forwarded HTTP header '{}' lacked 'host' and/or 'proto' pair; ignoring header", forwarded);
            }
        } else {
            String forwardedHost = routingContext.request().getHeader("x-forwarded-host");
            String forwardedProto = routingContext.request().getHeader("x-forwarded-proto");
            boolean hostGiven = forwardedHost != null && !forwardedHost.isEmpty();
            boolean protoGiven = forwardedProto != null && !forwardedProto.isEmpty();
            if (hostGiven && protoGiven) {
                this.log.debug("Getting base URI from HTTP headers: X-Forwarded-Host '{}' and X-Forwarded-Proto '{}'", forwardedHost, forwardedProto);
                scheme = forwardedProto;
                host = forwardedHost;
            }
        }

        this.log.debug("Request URI build upon scheme: {}, host: {}, path: {}", new Object[]{scheme, host, path});
        return formatRequestUri(scheme, host, path);
    }

    /**
     * Formats a URI as {@code scheme://host:port/path}, adding the default port of the scheme
     * when the host does not already end with a port.
     *
     * @param str  scheme; must be {@code http} or {@code https} when a default port has to be added
     * @param str2 host, optionally followed by {@code :port}
     * @param str3 path appended verbatim after the host
     * @return the formatted URI
     * @throws IllegalArgumentException if a default port is needed and the scheme is neither
     *                                  {@code http} nor {@code https}
     */
    private String formatRequestUri(String str, String str2, String str3) {
        // Reuse the precompiled pattern instead of recompiling the regex on every call.
        if (this.hostPortPattern.matcher(str2).matches()) {
            return String.format("%s://%s%s", str, str2, str3);
        }

        int defaultPort;
        if ("http".equals(str)) {
            defaultPort = 80;
        } else if ("https".equals(str)) {
            defaultPort = 443;
        } else {
            throw new IllegalArgumentException(str + " is not a valid schema/proto.");
        }
        return String.format("%s://%s%s", str, str2 + ":" + defaultPort, str3);
    }

    /**
     * Maps a failure to the HTTP error returned to the client.
     *
     * <p>A {@link CompletionException} is unwrapped to its cause when it has one. The status code
     * is 404 for an {@link IllegalStateException} whose message contains
     * "No current assignment for partition", 422 for a {@link JsonDecodeException}, and 500 otherwise.
     *
     * @param th the failure to map
     * @return an error carrying the chosen status code and the message of the (unwrapped) failure
     */
    private HttpBridgeError handleError(Throwable th) {
        Throwable failure = th;
        // A CompletionException created without a cause would otherwise unwrap to null.
        if (th instanceof CompletionException && th.getCause() != null) {
            failure = th.getCause();
        }

        int code = HttpResponseStatus.INTERNAL_SERVER_ERROR.code();
        String message = failure.getMessage();
        if ((failure instanceof IllegalStateException) && message != null && message.contains("No current assignment for partition")) {
            code = HttpResponseStatus.NOT_FOUND.code();
        } else if (failure instanceof JsonDecodeException) {
            code = HttpResponseStatus.UNPROCESSABLE_ENTITY.code();
        }
        return new HttpBridgeError(code, message);
    }
}
