package io.strimzi.kafka.bridge.http;

import com.fasterxml.jackson.databind.node.ObjectNode;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.micrometer.core.instrument.config.MeterFilter;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.strimzi.kafka.bridge.Application;
import io.strimzi.kafka.bridge.BridgeContentType;
import io.strimzi.kafka.bridge.ConsumerInstanceId;
import io.strimzi.kafka.bridge.EmbeddedFormat;
import io.strimzi.kafka.bridge.IllegalEmbeddedFormatException;
import io.strimzi.kafka.bridge.MetricsReporter;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.http.converter.JsonUtils;
import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.CorsHandler;
import io.vertx.ext.web.openapi.RouterBuilder;
import io.vertx.ext.web.validation.BodyProcessorException;
import io.vertx.ext.web.validation.ParameterProcessorException;
import io.vertx.json.schema.ValidationException;
import io.vertx.micrometer.Label;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/strimzi/kafka/bridge/http/HttpBridge.class */
public class HttpBridge extends AbstractVerticle {
    /** Logger shared by every instance of this verticle. */
    private static final Logger LOGGER = LogManager.getLogger(HttpBridge.class);
    /** Bridge configuration handed to the constructor; read for HTTP and Kafka settings. */
    private final BridgeConfig bridgeConfig;
    /** HTTP server created in {@code bindHttpServer}; stays {@code null} until then. */
    private HttpServer httpServer;
    /** Holder of the sink, source and admin endpoints; created in {@code start}. */
    private HttpBridgeContext<byte[], byte[]> httpBridgeContext;
    /** Router built from the OpenAPI definition in {@code start}. */
    private Router router;
    /** Reporter whose meter registry is filtered in {@code start} and scraped by the metrics endpoint. */
    private final MetricsReporter metricsReporter;
    /** Set to {@code true} once the HTTP server is listening and reset to {@code false} in {@code stop}. */
    private boolean isReady = false;
    /** Time (epoch milliseconds) of the last request seen for each consumer instance. */
    private final Map<ConsumerInstanceId, Long> timestampMap = new HashMap<>();
    /** Handler registered for the {@code SEND} operation; delegates to {@code send}. */
    final HttpOpenApiOperation SEND = new HttpOpenApiOperation(HttpOpenApiOperations.SEND) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.send(ctx);
        }
    };
    /** Handler registered for the {@code SEND_TO_PARTITION} operation; delegates to {@code sendToPartition}. */
    final HttpOpenApiOperation SEND_TO_PARTITION = new HttpOpenApiOperation(HttpOpenApiOperations.SEND_TO_PARTITION) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.sendToPartition(ctx);
        }
    };
    /** Handler registered for the {@code CREATE_CONSUMER} operation; delegates to {@code createConsumer}. */
    final HttpOpenApiOperation CREATE_CONSUMER = new HttpOpenApiOperation(HttpOpenApiOperations.CREATE_CONSUMER) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.createConsumer(ctx);
        }
    };
    /** Handler registered for the {@code DELETE_CONSUMER} operation; delegates to {@code deleteConsumer}. */
    final HttpOpenApiOperation DELETE_CONSUMER = new HttpOpenApiOperation(HttpOpenApiOperations.DELETE_CONSUMER) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.deleteConsumer(ctx);
        }
    };
    /** Handler registered for the {@code SUBSCRIBE} operation; delegates to {@code subscribe}. */
    final HttpOpenApiOperation SUBSCRIBE = new HttpOpenApiOperation(HttpOpenApiOperations.SUBSCRIBE) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.subscribe(ctx);
        }
    };
    /** Handler registered for the {@code UNSUBSCRIBE} operation; delegates to {@code unsubscribe}. */
    final HttpOpenApiOperation UNSUBSCRIBE = new HttpOpenApiOperation(HttpOpenApiOperations.UNSUBSCRIBE) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.unsubscribe(ctx);
        }
    };
    /** Handler registered for the {@code LIST_SUBSCRIPTIONS} operation; delegates to {@code listSubscriptions}. */
    final HttpOpenApiOperation LIST_SUBSCRIPTIONS = new HttpOpenApiOperation(HttpOpenApiOperations.LIST_SUBSCRIPTIONS) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.listSubscriptions(ctx);
        }
    };
    /** Handler registered for the {@code LIST_TOPICS} operation; delegates to {@code listTopics}. */
    final HttpOpenApiOperation LIST_TOPICS = new HttpOpenApiOperation(HttpOpenApiOperations.LIST_TOPICS) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.listTopics(ctx);
        }
    };
    /** Handler registered for the {@code GET_TOPIC} operation; delegates to {@code getTopic}. */
    final HttpOpenApiOperation GET_TOPIC = new HttpOpenApiOperation(HttpOpenApiOperations.GET_TOPIC) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.getTopic(ctx);
        }
    };
    /** Handler registered for the {@code LIST_PARTITIONS} operation; delegates to {@code listPartitions}. */
    final HttpOpenApiOperation LIST_PARTITIONS = new HttpOpenApiOperation(HttpOpenApiOperations.LIST_PARTITIONS) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.listPartitions(ctx);
        }
    };
    /** Handler registered for the {@code GET_PARTITION} operation; delegates to {@code getPartition}. */
    final HttpOpenApiOperation GET_PARTITION = new HttpOpenApiOperation(HttpOpenApiOperations.GET_PARTITION) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.getPartition(ctx);
        }
    };
    /** Handler registered for the {@code GET_OFFSETS} operation; delegates to {@code getOffsets}. */
    final HttpOpenApiOperation GET_OFFSETS = new HttpOpenApiOperation(HttpOpenApiOperations.GET_OFFSETS) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.getOffsets(ctx);
        }
    };
    /** Handler registered for the {@code ASSIGN} operation; delegates to {@code assign}. */
    final HttpOpenApiOperation ASSIGN = new HttpOpenApiOperation(HttpOpenApiOperations.ASSIGN) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.assign(ctx);
        }
    };
    /** Handler registered for the {@code POLL} operation; delegates to {@code poll}. */
    final HttpOpenApiOperation POLL = new HttpOpenApiOperation(HttpOpenApiOperations.POLL) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.poll(ctx);
        }
    };
    /** Handler registered for the {@code COMMIT} operation; delegates to {@code commit}. */
    final HttpOpenApiOperation COMMIT = new HttpOpenApiOperation(HttpOpenApiOperations.COMMIT) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.commit(ctx);
        }
    };
    /** Handler registered for the {@code SEEK} operation; delegates to {@code seek}. */
    final HttpOpenApiOperation SEEK = new HttpOpenApiOperation(HttpOpenApiOperations.SEEK) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.seek(ctx);
        }
    };
    /** Handler registered for the {@code SEEK_TO_BEGINNING} operation; delegates to {@code seekToBeginning}. */
    final HttpOpenApiOperation SEEK_TO_BEGINNING = new HttpOpenApiOperation(HttpOpenApiOperations.SEEK_TO_BEGINNING) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.seekToBeginning(ctx);
        }
    };
    /** Handler registered for the {@code SEEK_TO_END} operation; delegates to {@code seekToEnd}. */
    final HttpOpenApiOperation SEEK_TO_END = new HttpOpenApiOperation(HttpOpenApiOperations.SEEK_TO_END) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.seekToEnd(ctx);
        }
    };
    /** Handler registered for the {@code HEALTHY} operation; delegates to {@code healthy}. */
    final HttpOpenApiOperation HEALTHY = new HttpOpenApiOperation(HttpOpenApiOperations.HEALTHY) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.healthy(ctx);
        }
    };
    /** Handler registered for the {@code READY} operation; delegates to {@code ready}. */
    final HttpOpenApiOperation READY = new HttpOpenApiOperation(HttpOpenApiOperations.READY) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.ready(ctx);
        }
    };
    /** Handler registered for the {@code OPENAPI} operation; delegates to {@code openapi}. */
    final HttpOpenApiOperation OPENAPI = new HttpOpenApiOperation(HttpOpenApiOperations.OPENAPI) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.openapi(ctx);
        }
    };
    /** Handler registered for the {@code METRICS} operation; delegates to {@code metrics}. */
    final HttpOpenApiOperation METRICS = new HttpOpenApiOperation(HttpOpenApiOperations.METRICS) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.metrics(ctx);
        }
    };
    /** Handler registered for the {@code INFO} operation; delegates to {@code information}. */
    final HttpOpenApiOperation INFO = new HttpOpenApiOperation(HttpOpenApiOperations.INFO) {
        @Override
        public void process(RoutingContext ctx) {
            HttpBridge.this.information(ctx);
        }
    };

    /**
     * Creates the verticle. Only stores its collaborators; nothing is opened here.
     *
     * @param bridgeConfig    configuration read later for HTTP and Kafka settings
     * @param metricsReporter reporter used for meter filtering and for the metrics endpoint
     */
    public HttpBridge(BridgeConfig bridgeConfig, MetricsReporter metricsReporter) {
        this.metricsReporter = metricsReporter;
        this.bridgeConfig = bridgeConfig;
    }

    /**
     * Creates the HTTP server, wires the connection handler and the router, and starts listening.
     * On success the inactive-consumer timer is started when a consumer timeout greater than -1
     * is configured, the bridge is flagged ready and the promise is completed; on failure the
     * promise is failed with the listen error.
     *
     * @param promise promise completed or failed according to the outcome of the listen call
     */
    private void bindHttpServer(Promise<Void> promise) {
        this.httpServer = this.vertx
                .createHttpServer(httpServerOptions())
                .connectionHandler(this::processConnection)
                .requestHandler(this.router)
                .listen(listenResult -> {
                    if (listenResult.succeeded()) {
                        HttpServer boundServer = listenResult.result();
                        LOGGER.info("HTTP-Kafka Bridge started and listening on port {}", boundServer.actualPort());
                        LOGGER.info("HTTP-Kafka Bridge bootstrap servers {}",
                                this.bridgeConfig.getKafkaConfig().getConfig().get("bootstrap.servers"));
                        if (this.bridgeConfig.getHttpConfig().getConsumerTimeout() > -1) {
                            startInactiveConsumerDeletionTimer(
                                    Long.valueOf(this.bridgeConfig.getHttpConfig().getConsumerTimeout()));
                        }
                        this.isReady = true;
                        promise.complete();
                    } else {
                        LOGGER.error("Error starting HTTP-Kafka Bridge", listenResult.cause());
                        promise.fail(listenResult.cause());
                    }
                });
    }

    /**
     * Starts a periodic timer that closes and forgets consumers that have received no request
     * for longer than the given timeout. The timer fires every half timeout.
     *
     * @param l inactivity timeout in seconds
     */
    private void startInactiveConsumerDeletionTimer(Long l) {
        final long timeoutMs = l.longValue() * 1000;
        this.vertx.setPeriodic(timeoutMs / 2, timerId -> {
            LOGGER.debug("Looking for stale consumers in {} entries", this.timestampMap.size());
            final long now = System.currentTimeMillis();
            // Walk a snapshot: entries are removed from timestampMap inside the loop, and removing
            // from a HashMap while iterating its own entry set throws ConcurrentModificationException.
            Map<ConsumerInstanceId, Long> snapshot = new HashMap<>(this.timestampMap);
            for (Map.Entry<ConsumerInstanceId, Long> entry : snapshot.entrySet()) {
                if (entry.getValue() + timeoutMs >= now) {
                    continue;
                }
                ConsumerInstanceId consumerId = entry.getKey();
                HttpSinkBridgeEndpoint<byte[], byte[]> staleEndpoint =
                        this.httpBridgeContext.getHttpSinkEndpoints().get(consumerId);
                if (staleEndpoint == null) {
                    continue;
                }
                staleEndpoint.close();
                this.httpBridgeContext.getHttpSinkEndpoints().remove(consumerId);
                LOGGER.warn("Consumer {} deleted after inactivity timeout ({}s).", consumerId, l);
                this.timestampMap.remove(consumerId);
            }
        });
    }

    /**
     * Starts the verticle: loads the OpenAPI definition, registers one handler per operation,
     * optionally installs the CORS and body handlers, builds the router with its 400/404 error
     * handlers, excludes the metrics path from the meter registry, opens the admin endpoint and
     * finally binds the HTTP server.
     *
     * @param promise failed when the OpenAPI router cannot be created; otherwise handed to
     *                {@code bindHttpServer}, which completes or fails it
     */
    public void start(Promise<Void> promise) {
        RouterBuilder.create(this.vertx, "openapi.json", asyncResult -> {
            if (!asyncResult.succeeded()) {
                // Log the cause as well, otherwise the reason of the failure never reaches the log.
                LOGGER.error("Failed to create OpenAPI router factory", asyncResult.cause());
                promise.fail(asyncResult.cause());
                return;
            }
            RouterBuilder routerBuilder = asyncResult.result();

            // Registration order is the same as the original one-call-per-operation sequence.
            HttpOpenApiOperation[] operations = {
                this.SEND, this.SEND_TO_PARTITION,
                this.CREATE_CONSUMER, this.DELETE_CONSUMER,
                this.SUBSCRIBE, this.UNSUBSCRIBE, this.LIST_SUBSCRIPTIONS,
                this.ASSIGN, this.POLL, this.COMMIT,
                this.SEEK, this.SEEK_TO_BEGINNING, this.SEEK_TO_END,
                this.LIST_TOPICS, this.GET_TOPIC, this.LIST_PARTITIONS, this.GET_PARTITION, this.GET_OFFSETS,
                this.HEALTHY, this.READY, this.OPENAPI, this.METRICS, this.INFO
            };
            for (HttpOpenApiOperation operation : operations) {
                routerBuilder.operation(operation.getOperationId().toString()).handler(operation);
            }

            if (this.bridgeConfig.getHttpConfig().isCorsEnabled()) {
                routerBuilder.rootHandler(getCorsHandler());
                routerBuilder.rootHandler(BodyHandler.create());
            }

            this.router = routerBuilder.createRouter();
            this.router.errorHandler(HttpResponseStatus.BAD_REQUEST.code(), this::errorHandler);
            this.router.errorHandler(HttpResponseStatus.NOT_FOUND.code(), this::errorHandler);

            if (this.metricsReporter.getMeterRegistry() != null) {
                // Deny meters tagged with the /metrics path.
                this.metricsReporter.getMeterRegistry().config().meterFilter(
                        MeterFilter.deny(id -> "/metrics".equals(id.getTag(Label.HTTP_PATH.toString()))));
            }

            LOGGER.info("Starting HTTP-Kafka bridge verticle...");
            this.httpBridgeContext = new HttpBridgeContext<>();
            HttpAdminBridgeEndpoint httpAdminBridgeEndpoint = new HttpAdminBridgeEndpoint(this.bridgeConfig, this.httpBridgeContext);
            this.httpBridgeContext.setHttpAdminEndpoint(httpAdminBridgeEndpoint);
            httpAdminBridgeEndpoint.open();
            bindHttpServer(promise);
        });
    }

    /**
     * Builds the CORS handler from the HTTP configuration: a fixed set of allowed headers, the
     * configured comma-separated list of allowed methods and the configured allowed origins.
     *
     * @return the configured CORS handler
     */
    private CorsHandler getCorsHandler() {
        HashSet<String> allowedHeaders = new HashSet<>();
        allowedHeaders.add("x-requested-with");
        allowedHeaders.add("x-forwarded-proto");
        allowedHeaders.add("x-forwarded-host");
        allowedHeaders.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN.toString());
        allowedHeaders.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS.toString());
        allowedHeaders.add(HttpHeaderNames.ORIGIN.toString());
        allowedHeaders.add(HttpHeaderNames.CONTENT_TYPE.toString());
        allowedHeaders.add(HttpHeaderNames.CONTENT_LENGTH.toString());
        allowedHeaders.add(HttpHeaderNames.ACCEPT.toString());

        HashSet<HttpMethod> allowedMethods = new HashSet<>();
        for (String token : this.bridgeConfig.getHttpConfig().getCorsAllowedMethods().split(",")) {
            // Tolerate "GET, POST" style values: surrounding blanks would otherwise become part
            // of the method name, and blank tokens carry no method at all.
            String methodName = token.trim();
            if (!methodName.isEmpty()) {
                allowedMethods.add(HttpMethod.valueOf(methodName));
            }
        }

        String corsAllowedOrigins = this.bridgeConfig.getHttpConfig().getCorsAllowedOrigins();
        LOGGER.info("Allowed origins for Cors: {}", corsAllowedOrigins);
        return CorsHandler.create()
                .addRelativeOrigin(corsAllowedOrigins)
                .allowedHeaders(allowedHeaders)
                .allowedMethods(allowedMethods);
    }

    /**
     * Stops the verticle: flags the bridge as not ready, closes all sink, source and admin
     * endpoints and then closes the HTTP server.
     *
     * @param promise completed when shutdown finishes (immediately when no HTTP server was
     *                created), failed with the close error otherwise
     */
    public void stop(Promise<Void> promise) {
        LOGGER.info("Stopping HTTP-Kafka bridge verticle ...");
        this.isReady = false;

        // The context is only created once the OpenAPI router has been built in start().
        if (this.httpBridgeContext != null) {
            this.httpBridgeContext.closeAllHttpSinkBridgeEndpoints();
            this.httpBridgeContext.closeAllHttpSourceBridgeEndpoints();
            this.httpBridgeContext.closeHttpAdminClientEndpoint();
        }

        if (this.httpServer == null) {
            // Nothing to close; complete right away so the caller is not left waiting forever.
            promise.complete();
            return;
        }

        this.httpServer.close(closeResult -> {
            if (closeResult.succeeded()) {
                LOGGER.info("HTTP-Kafka bridge has been shut down successfully");
                promise.complete();
            } else {
                LOGGER.error("Error while shutting down HTTP-Kafka bridge", closeResult.cause());
                promise.fail(closeResult.cause());
            }
        });
    }

    /**
     * Builds the HTTP server options from the configured host and port.
     *
     * @return options carrying the host and port of the HTTP configuration
     */
    private HttpServerOptions httpServerOptions() {
        return new HttpServerOptions()
                .setHost(this.bridgeConfig.getHttpConfig().getHost())
                .setPort(this.bridgeConfig.getHttpConfig().getPort());
    }

    /**
     * Records {@code SEND} as the current operation and hands the request to the producer path.
     *
     * @param ctx routing context of the request
     */
    private void send(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.SEND);
        processProducer(ctx);
    }

    /**
     * Records {@code SEND_TO_PARTITION} as the current operation and hands the request to the producer path.
     *
     * @param ctx routing context of the request
     */
    private void sendToPartition(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.SEND_TO_PARTITION);
        processProducer(ctx);
    }

    /**
     * Creates a consumer endpoint for the request. Answers 503 when consumers are disabled in the
     * configuration. Otherwise a sink endpoint is built using the {@code format} field of the
     * request body (default {@code binary}), opened, and asked to handle the request; once handled
     * it is registered together with its creation timestamp. Any exception closes the endpoint
     * and is reported as 422 (illegal format or configuration error) or 500.
     *
     * @param routingContext routing context of the request
     */
    private void createConsumer(RoutingContext routingContext) {
        if (!this.bridgeConfig.getHttpConfig().isConsumerEnabled()) {
            int unavailable = HttpResponseStatus.SERVICE_UNAVAILABLE.code();
            HttpBridgeError disabled = new HttpBridgeError(
                    HttpResponseStatus.SERVICE_UNAVAILABLE.code(),
                    "Consumer is disabled in config. To enable consumer update http.consumer.enabled to true");
            HttpUtils.sendResponse(routingContext, unavailable, BridgeContentType.KAFKA_JSON,
                    JsonUtils.jsonToBytes(disabled.toJson()));
            return;
        }

        this.httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.CREATE_CONSUMER);
        HttpSinkBridgeEndpoint sink = null;
        try {
            // An empty body means "use the defaults".
            ObjectNode requestJson;
            if (routingContext.body().isEmpty()) {
                requestJson = JsonUtils.createObjectNode();
            } else {
                requestJson = JsonUtils.bytesToJson(routingContext.body().buffer().getBytes());
            }
            EmbeddedFormat format = EmbeddedFormat.from(JsonUtils.getString(requestJson, "format", "binary"));

            sink = new HttpSinkBridgeEndpoint(this.bridgeConfig, this.httpBridgeContext, format,
                    new ByteArrayDeserializer(), new ByteArrayDeserializer());
            sink.closeHandler(closed -> {
                this.httpBridgeContext.getHttpSinkEndpoints().remove(((HttpSinkBridgeEndpoint) closed).consumerInstanceId());
            });
            sink.open();
            sink.handle(routingContext, handled -> {
                HttpSinkBridgeEndpoint<byte[], byte[]> created = (HttpSinkBridgeEndpoint) handled;
                this.httpBridgeContext.getHttpSinkEndpoints().put(created.consumerInstanceId(), created);
                this.timestampMap.put(created.consumerInstanceId(), Long.valueOf(System.currentTimeMillis()));
            });
        } catch (Exception e) {
            if (sink != null) {
                sink.close();
            }
            HttpResponseStatus status;
            if (e instanceof IllegalEmbeddedFormatException || e instanceof ConfigException) {
                status = HttpResponseStatus.UNPROCESSABLE_ENTITY;
            } else {
                status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
            }
            HttpBridgeError error = new HttpBridgeError(status.code(), e.getMessage());
            HttpUtils.sendResponse(routingContext, error.getCode(), BridgeContentType.KAFKA_JSON,
                    JsonUtils.jsonToBytes(error.toJson()));
        }
    }

    /**
     * Deletes the consumer identified by the {@code groupid} and {@code name} path parameters.
     * Answers 404 when no such consumer is registered; otherwise lets the endpoint handle the
     * request and then drops it from the endpoint registry and the timestamp map.
     *
     * @param routingContext routing context of the request
     */
    private void deleteConsumer(RoutingContext routingContext) {
        this.httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.DELETE_CONSUMER);
        ConsumerInstanceId instanceId =
                new ConsumerInstanceId(routingContext.pathParam("groupid"), routingContext.pathParam("name"));
        HttpSinkBridgeEndpoint<byte[], byte[]> sink = this.httpBridgeContext.getHttpSinkEndpoints().get(instanceId);

        if (sink != null) {
            sink.handle(routingContext);
            this.httpBridgeContext.getHttpSinkEndpoints().remove(instanceId);
            this.timestampMap.remove(instanceId);
            return;
        }

        HttpBridgeError notFound = new HttpBridgeError(HttpResponseStatus.NOT_FOUND.code(),
                "The specified consumer instance was not found.");
        HttpUtils.sendResponse(routingContext, HttpResponseStatus.NOT_FOUND.code(), BridgeContentType.KAFKA_JSON,
                JsonUtils.jsonToBytes(notFound.toJson()));
    }

    /**
     * Records {@code SUBSCRIBE} as the current operation and hands the request to the consumer path.
     *
     * @param ctx routing context of the request
     */
    private void subscribe(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.SUBSCRIBE);
        processConsumer(ctx);
    }

    /**
     * Records {@code UNSUBSCRIBE} as the current operation and hands the request to the consumer path.
     *
     * @param ctx routing context of the request
     */
    private void unsubscribe(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.UNSUBSCRIBE);
        processConsumer(ctx);
    }

    /**
     * Records {@code LIST_SUBSCRIPTIONS} as the current operation and hands the request to the consumer path.
     *
     * @param ctx routing context of the request
     */
    private void listSubscriptions(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.LIST_SUBSCRIPTIONS);
        processConsumer(ctx);
    }

    /**
     * Records {@code LIST_TOPICS} as the current operation and hands the request to the admin path.
     *
     * @param ctx routing context of the request
     */
    private void listTopics(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.LIST_TOPICS);
        processAdminClient(ctx);
    }

    /**
     * Records {@code GET_TOPIC} as the current operation and hands the request to the admin path.
     *
     * @param ctx routing context of the request
     */
    private void getTopic(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.GET_TOPIC);
        processAdminClient(ctx);
    }

    /**
     * Records {@code LIST_PARTITIONS} as the current operation and hands the request to the admin path.
     *
     * @param ctx routing context of the request
     */
    private void listPartitions(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.LIST_PARTITIONS);
        processAdminClient(ctx);
    }

    /**
     * Records {@code GET_PARTITION} as the current operation and hands the request to the admin path.
     *
     * @param ctx routing context of the request
     */
    private void getPartition(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.GET_PARTITION);
        processAdminClient(ctx);
    }

    /**
     * Records {@code GET_OFFSETS} as the current operation and hands the request to the admin path.
     *
     * @param ctx routing context of the request
     */
    private void getOffsets(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.GET_OFFSETS);
        processAdminClient(ctx);
    }

    /**
     * Records {@code ASSIGN} as the current operation and hands the request to the consumer path.
     *
     * @param ctx routing context of the request
     */
    private void assign(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.ASSIGN);
        processConsumer(ctx);
    }

    /**
     * Records {@code POLL} as the current operation and hands the request to the consumer path.
     *
     * @param ctx routing context of the request
     */
    private void poll(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.POLL);
        processConsumer(ctx);
    }

    /**
     * Records {@code COMMIT} as the current operation and hands the request to the consumer path.
     *
     * @param ctx routing context of the request
     */
    private void commit(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.COMMIT);
        processConsumer(ctx);
    }

    /**
     * Records {@code SEEK} as the current operation and hands the request to the consumer path.
     *
     * @param ctx routing context of the request
     */
    private void seek(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.SEEK);
        processConsumer(ctx);
    }

    /**
     * Records {@code SEEK_TO_BEGINNING} as the current operation and hands the request to the consumer path.
     *
     * @param ctx routing context of the request
     */
    private void seekToBeginning(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.SEEK_TO_BEGINNING);
        processConsumer(ctx);
    }

    /**
     * Records {@code SEEK_TO_END} as the current operation and hands the request to the consumer path.
     *
     * @param ctx routing context of the request
     */
    private void seekToEnd(RoutingContext ctx) {
        httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.SEEK_TO_END);
        processConsumer(ctx);
    }

    /**
     * Routes a consumer request to the sink endpoint identified by the {@code groupid} and
     * {@code name} path parameters, refreshing its last-activity timestamp first. Answers 404
     * when no such consumer is registered.
     *
     * @param routingContext routing context of the request
     */
    private void processConsumer(RoutingContext routingContext) {
        ConsumerInstanceId instanceId =
                new ConsumerInstanceId(routingContext.pathParam("groupid"), routingContext.pathParam("name"));
        HttpSinkBridgeEndpoint<byte[], byte[]> sink = this.httpBridgeContext.getHttpSinkEndpoints().get(instanceId);

        if (sink == null) {
            HttpBridgeError notFound = new HttpBridgeError(HttpResponseStatus.NOT_FOUND.code(),
                    "The specified consumer instance was not found.");
            HttpUtils.sendResponse(routingContext, HttpResponseStatus.NOT_FOUND.code(), BridgeContentType.KAFKA_JSON,
                    JsonUtils.jsonToBytes(notFound.toJson()));
            return;
        }

        this.timestampMap.replace(instanceId, Long.valueOf(System.currentTimeMillis()));
        sink.handle(routingContext);
    }

    /**
     * Routes a producer request to the source endpoint bound to the request's HTTP connection,
     * creating and registering that endpoint on first use. Answers 503 when producers are
     * disabled in the configuration and 500 when the endpoint cannot be created.
     *
     * @param routingContext routing context of the request
     */
    private void processProducer(RoutingContext routingContext) {
        if (!this.bridgeConfig.getHttpConfig().isProducerEnabled()) {
            HttpBridgeError disabled = new HttpBridgeError(HttpResponseStatus.SERVICE_UNAVAILABLE.code(),
                    "Producer is disabled in config. To enable producer update http.producer.enabled to true");
            HttpUtils.sendResponse(routingContext, HttpResponseStatus.SERVICE_UNAVAILABLE.code(),
                    BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(disabled.toJson()));
            return;
        }

        HttpServerRequest request = routingContext.request();
        // A missing Content-Type header falls back to the binary Kafka JSON content type.
        String contentType = request.getHeader("Content-Type");
        if (contentType == null) {
            contentType = BridgeContentType.KAFKA_JSON_BINARY;
        }

        HttpSourceBridgeEndpoint<byte[], byte[]> source =
                this.httpBridgeContext.getHttpSourceEndpoints().get(request.connection());
        if (source == null) {
            try {
                source = new HttpSourceBridgeEndpoint<>(this.bridgeConfig, contentTypeToFormat(contentType),
                        new ByteArraySerializer(), new ByteArraySerializer());
                source.closeHandler(closed -> {
                    this.httpBridgeContext.getHttpSourceEndpoints().remove(request.connection());
                });
                source.open();
                this.httpBridgeContext.getHttpSourceEndpoints().put(request.connection(), source);
            } catch (Exception e) {
                if (source != null) {
                    source.close();
                }
                HttpBridgeError failure =
                        new HttpBridgeError(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), e.getMessage());
                HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
                        BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(failure.toJson()));
                return;
            }
        }
        source.handle(routingContext);
    }

    /**
     * Routes an admin request to the admin endpoint held by the bridge context, or answers 500
     * when no admin endpoint is available.
     *
     * @param routingContext routing context of the request
     */
    private void processAdminClient(RoutingContext routingContext) {
        HttpAdminBridgeEndpoint admin = this.httpBridgeContext.getHttpAdminEndpoint();
        if (admin == null) {
            HttpBridgeError missing = new HttpBridgeError(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
                    "The AdminClient was not found.");
            HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
                    BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(missing.toJson()));
            return;
        }
        admin.handle(routingContext);
    }

    /**
     * Liveness probe: answers 204 when {@code isAlive()} is true and 500 otherwise, with no body.
     *
     * @param routingContext routing context of the request
     */
    private void healthy(RoutingContext routingContext) {
        HttpResponseStatus status;
        if (isAlive()) {
            status = HttpResponseStatus.NO_CONTENT;
        } else {
            status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
        }
        HttpUtils.sendResponse(routingContext, status.code(), null, null);
    }

    /**
     * Readiness probe: answers 204 when {@code isReady()} is true and 500 otherwise, with no body.
     *
     * @param routingContext routing context of the request
     */
    private void ready(RoutingContext routingContext) {
        HttpResponseStatus status;
        if (isReady()) {
            status = HttpResponseStatus.NO_CONTENT;
        } else {
            status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
        }
        HttpUtils.sendResponse(routingContext, status.code(), null, null);
    }

    /**
     * Serves the OpenAPI v2 definition. Without forwarding headers the file is sent as is. When
     * {@code x-forwarded-path} or {@code x-forwarded-prefix} is present, the definition is sent
     * with its {@code basePath} replaced by that value, the path header taking precedence.
     * Answers 500 when the file cannot be read.
     *
     * @param routingContext routing context of the request
     */
    private void openapi(RoutingContext routingContext) {
        this.vertx.fileSystem().readFile("openapiv2.json", readResult -> {
            if (readResult.succeeded()) {
                String forwardedPath = routingContext.request().getHeader("x-forwarded-path");
                String forwardedPrefix = routingContext.request().getHeader("x-forwarded-prefix");

                if (forwardedPath != null || forwardedPrefix != null) {
                    String basePath;
                    if (forwardedPath != null) {
                        basePath = forwardedPath;
                    } else if (forwardedPrefix != null) {
                        basePath = forwardedPrefix;
                    } else {
                        basePath = "/";
                    }
                    ObjectNode definition = JsonUtils.bytesToJson(((Buffer) readResult.result()).getBytes());
                    definition.put("basePath", basePath);
                    HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(), BridgeContentType.JSON,
                            JsonUtils.jsonToBytes(definition));
                } else {
                    HttpUtils.sendFile(routingContext, HttpResponseStatus.OK.code(), BridgeContentType.JSON,
                            "openapiv2.json");
                }
            } else {
                LOGGER.error("Failed to read OpenAPI JSON file", readResult.cause());
                HttpBridgeError failure = new HttpBridgeError(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
                        readResult.cause().getMessage());
                HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
                        BridgeContentType.JSON, JsonUtils.jsonToBytes(failure.toJson()));
            }
        });
    }

    /**
     * Answers 200 with the output of the metrics reporter's scrape as the response body.
     *
     * @param routingContext routing context of the request
     */
    private void metrics(RoutingContext routingContext) {
        routingContext.response()
                .setStatusCode(HttpResponseStatus.OK.code())
                .end(this.metricsReporter.scrape());
    }

    /**
     * Answers 200 with a JSON object whose {@code bridge_version} field holds the implementation
     * version of the {@code Application} package, or the string {@code "null"} when it is unknown.
     *
     * @param routingContext routing context of the request
     */
    private void information(RoutingContext routingContext) {
        String version = Application.class.getPackage().getImplementationVersion();
        if (version == null) {
            version = "null";
        }
        ObjectNode info = JsonUtils.createObjectNode();
        info.put("bridge_version", version);
        HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(), BridgeContentType.JSON,
                JsonUtils.jsonToBytes(info));
    }

    /**
     * Builds and sends a JSON error response for a failed request, logging both the request and the response.
     *
     * <p>The request is tagged with an id (the identity hash code of the request object) stored in the
     * routing context under {@code request-id}. The error message is chosen as follows:
     * <ul>
     *   <li>400 Bad Request: a detailed message derived from the failure when it is caused by a
     *       {@link ValidationException}, or is a {@link ParameterProcessorException} or
     *       {@link BodyProcessorException}; otherwise the standard reason phrase.</li>
     *   <li>404 Not Found: the standard reason phrase.</li>
     *   <li>any other status: no message ({@code null}).</li>
     * </ul>
     *
     * @param routingContext the routing context of the failed request
     */
    @SuppressFBWarnings({"BC_UNCONFIRMED_CAST_OF_RETURN_VALUE"})
    private void errorHandler(RoutingContext routingContext) {
        int requestId = System.identityHashCode(routingContext.request());
        routingContext.put("request-id", requestId);
        LOGGER.error("[{}] Request: from {}, method = {}, path = {}",
                requestId,
                routingContext.request().remoteAddress(),
                routingContext.request().method(),
                routingContext.request().path());

        String message = null;
        if (routingContext.statusCode() == HttpResponseStatus.BAD_REQUEST.code()) {
            message = HttpResponseStatus.BAD_REQUEST.reasonPhrase();
            Throwable failure = routingContext.failure();
            if (failure != null) {
                StringBuilder sb = new StringBuilder();
                if (failure.getCause() instanceof ValidationException) {
                    // explicit downcast: getCause() is declared to return Throwable
                    ValidationException validationException = (ValidationException) failure.getCause();
                    if (validationException.inputScope() != null) {
                        sb.append("Validation error on: ").append(validationException.inputScope()).append(" - ");
                    }
                    sb.append(validationException.getMessage());
                } else if (failure instanceof ParameterProcessorException) {
                    ParameterProcessorException parameterException = (ParameterProcessorException) failure;
                    if (parameterException.getParameterName() != null) {
                        sb.append("Parameter error on: ").append(parameterException.getParameterName()).append(" - ");
                    }
                    sb.append(parameterException.getMessage());
                } else if (failure instanceof BodyProcessorException) {
                    sb.append(failure.getMessage());
                }
                // Keep the default reason phrase when the failure is not one of the recognised kinds,
                // rather than replacing it with an empty message.
                if (sb.length() > 0) {
                    message = sb.toString();
                }
            }
        } else if (routingContext.statusCode() == HttpResponseStatus.NOT_FOUND.code()) {
            message = HttpResponseStatus.NOT_FOUND.reasonPhrase();
        }

        HttpUtils.sendResponse(routingContext, routingContext.statusCode(), BridgeContentType.KAFKA_JSON,
                JsonUtils.jsonToBytes(new HttpBridgeError(routingContext.statusCode(), message).toJson()));
        LOGGER.error("[{}] Response: statusCode = {}, message = {} ", requestId, routingContext.statusCode(), message);
    }

    /**
     * Registers a close handler on the given connection so that {@link #closeConnectionEndpoint(HttpConnection)}
     * runs for it when the connection is closed.
     *
     * @param httpConnection the HTTP connection to watch
     */
    private void processConnection(HttpConnection httpConnection) {
        httpConnection.closeHandler(unused -> closeConnectionEndpoint(httpConnection));
    }

    /**
     * Closes and unregisters the source endpoint associated with the given connection, if one is registered
     * in the bridge context.
     *
     * @param httpConnection the HTTP connection whose source endpoint should be released
     */
    private void closeConnectionEndpoint(HttpConnection httpConnection) {
        if (!this.httpBridgeContext.getHttpSourceEndpoints().containsKey(httpConnection)) {
            // no endpoint is tracked for this connection
            return;
        }

        HttpSourceBridgeEndpoint<byte[], byte[]> sourceEndpoint =
                this.httpBridgeContext.getHttpSourceEndpoints().get(httpConnection);
        // the key may be mapped to null, in which case only the mapping is removed
        if (sourceEndpoint != null) {
            sourceEndpoint.close();
        }
        this.httpBridgeContext.getHttpSourceEndpoints().remove(httpConnection);
    }

    /**
     * Translates a bridge content type into the embedded data format it designates.
     *
     * @param str the content type to translate
     * @return {@link EmbeddedFormat#BINARY} for {@link BridgeContentType#KAFKA_JSON_BINARY},
     *         {@link EmbeddedFormat#JSON} for {@link BridgeContentType#KAFKA_JSON_JSON},
     *         {@link EmbeddedFormat#TEXT} for {@link BridgeContentType#KAFKA_JSON_TEXT}
     * @throws IllegalArgumentException if the content type is none of the above; the offending value is the message
     * @throws NullPointerException if {@code str} is {@code null}
     */
    private EmbeddedFormat contentTypeToFormat(String str) {
        switch (str) {
            case BridgeContentType.KAFKA_JSON_BINARY:
                return EmbeddedFormat.BINARY;
            case BridgeContentType.KAFKA_JSON_JSON:
                return EmbeddedFormat.JSON;
            case BridgeContentType.KAFKA_JSON_TEXT:
                return EmbeddedFormat.TEXT;
            default:
                throw new IllegalArgumentException(str);
        }
    }

    /**
     * Liveness check; it reports the value of the {@code isReady} field.
     *
     * @return the current value of the {@code isReady} field
     */
    private boolean isAlive() {
        return isReady;
    }

    /**
     * Readiness check.
     *
     * @return the current value of the {@code isReady} field
     */
    private boolean isReady() {
        return isReady;
    }
}
