package io.strimzi.kafka.bridge.http;

import io.netty.handler.codec.http.HttpResponseStatus;
import io.strimzi.kafka.bridge.BridgeContentType;
import io.strimzi.kafka.bridge.EmbeddedFormat;
import io.strimzi.kafka.bridge.Endpoint;
import io.strimzi.kafka.bridge.SourceBridgeEndpoint;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.converter.MessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpBinaryMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter;
import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
import io.strimzi.kafka.bridge.http.model.HttpBridgeResult;
import io.strimzi.kafka.bridge.tracing.SpanHandle;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;

/* loaded from: input_file:io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.class */
/**
 * Source-side bridge endpoint driven by HTTP requests.
 *
 * <p>For each request it converts the request body into {@link KafkaProducerRecord}s with the
 * {@link MessageConverter} matching the configured {@link EmbeddedFormat}, hands every record to the
 * inherited {@code send(...)} method and answers the HTTP request either immediately
 * ({@code async=true}) or once every send has completed.
 *
 * @param <K> key type of the produced records
 * @param <V> value type of the produced records
 */
public class HttpSourceBridgeEndpoint<K, V> extends SourceBridgeEndpoint<K, V> {

    /** Converter for the request body; assigned in {@link #open()}, {@code null} for an unsupported format. */
    private MessageConverter<K, V, Buffer, Buffer> messageConverter;

    /**
     * Set by {@link #handleError(Throwable)} when a send timed out because the target was
     * "not present in metadata"; makes {@link #maybeClose()} close this endpoint.
     */
    private boolean closing;

    /**
     * Creates the endpoint; all arguments are passed unchanged to the superclass constructor.
     *
     * @param vertx           Vert.x instance
     * @param bridgeConfig    bridge configuration
     * @param embeddedFormat  format of the records embedded in the HTTP body
     * @param keySerializer   serializer for record keys
     * @param valueSerializer serializer for record values
     */
    public HttpSourceBridgeEndpoint(Vertx vertx, BridgeConfig bridgeConfig, EmbeddedFormat embeddedFormat,
                                    Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        super(vertx, bridgeConfig, embeddedFormat, keySerializer, valueSerializer);
    }

    /**
     * Assigns a unique endpoint name, resets the closing flag, builds the message converter and
     * then delegates to the superclass.
     *
     * <p>The name is {@code <bridgeId>-<uuid>}, or {@code kafka-bridge-producer-<uuid>} when no
     * bridge id is configured.
     */
    @Override // io.strimzi.kafka.bridge.SourceBridgeEndpoint, io.strimzi.kafka.bridge.BridgeEndpoint
    public void open() {
        String bridgeId = this.bridgeConfig.getBridgeID();
        String prefix = bridgeId == null ? "kafka-bridge-producer" : bridgeId;
        this.name = prefix + "-" + UUID.randomUUID();
        this.closing = false;
        this.messageConverter = buildMessageConverter();
        super.open();
    }

    /** Closes this endpoint if a previous send error flagged it for closing; otherwise does nothing. */
    public void maybeClose() {
        if (this.closing) {
            close();
        }
    }

    /**
     * Handles one HTTP produce request.
     *
     * <ul>
     *   <li>A non-numeric {@code partitionid} path parameter is answered with 422.</li>
     *   <li>A missing message converter is answered with 500.</li>
     *   <li>With query parameter {@code async=true} the records are sent without waiting and the
     *       request is answered with 204.</li>
     *   <li>Otherwise the request is answered with 200 and one entry per record (offset or error)
     *       after all sends have completed.</li>
     *   <li>Any exception thrown while converting or dispatching is answered with 422 and the
     *       exception message.</li>
     * </ul>
     *
     * @param endpoint wrapper whose {@code get()} yields the {@link RoutingContext} of the request
     */
    @Override // io.strimzi.kafka.bridge.BridgeEndpoint
    public void handle(Endpoint<?> endpoint) {
        RoutingContext routingContext = (RoutingContext) endpoint.get();
        String topic = routingContext.pathParam("topicname");

        Integer partition = null;
        String partitionParam = routingContext.pathParam("partitionid");
        if (partitionParam != null) {
            try {
                partition = Integer.parseInt(partitionParam);
            } catch (NumberFormatException e) {
                sendError(routingContext, HttpResponseStatus.UNPROCESSABLE_ENTITY, "Specified partition is not a valid number");
                return;
            }
        }

        boolean async = Boolean.parseBoolean(routingContext.queryParams().get("async"));
        String operation = partition == null
                ? HttpOpenApiOperations.SEND.toString()
                : HttpOpenApiOperations.SEND_TO_PARTITION.toString();
        SpanHandle<K, V> span = TracingUtil.getTracing().span(routingContext, operation);

        try {
            if (this.messageConverter == null) {
                span.finish(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
                sendError(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR, HttpResponseStatus.INTERNAL_SERVER_ERROR.reasonPhrase());
                return;
            }

            List<KafkaProducerRecord<K, V>> records =
                    this.messageConverter.toKafkaRecords(topic, partition, routingContext.body().buffer());
            for (KafkaProducerRecord<K, V> kafkaRecord : records) {
                span.inject(kafkaRecord);
            }

            if (async) {
                // Fire and forget: no completion handler, the response does not wait for the sends.
                for (KafkaProducerRecord<K, V> kafkaRecord : records) {
                    send(kafkaRecord, null);
                }
                span.finish(HttpResponseStatus.NO_CONTENT.code());
                HttpUtils.sendResponse(routingContext, HttpResponseStatus.NO_CONTENT.code(), BridgeContentType.KAFKA_JSON, null);
                maybeClose();
                return;
            }

            // Kept as a raw List<Future> because that is what the original code hands to CompositeFuture.join.
            @SuppressWarnings("rawtypes")
            List<Future> sendFutures = new ArrayList<>(records.size());
            for (KafkaProducerRecord<K, V> kafkaRecord : records) {
                Promise<RecordMetadata> promise = Promise.promise();
                sendFutures.add(promise.future());
                send(kafkaRecord, promise);
            }

            CompositeFuture.join(sendFutures).onComplete(done -> {
                List<HttpBridgeResult<?>> results = collectResults(records, sendFutures);
                span.finish(HttpResponseStatus.OK.code());
                HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(), BridgeContentType.KAFKA_JSON, buildOffsets(results).toBuffer());
                maybeClose();
            });
        } catch (Exception e) {
            span.finish(HttpResponseStatus.UNPROCESSABLE_ENTITY.code());
            sendError(routingContext, HttpResponseStatus.UNPROCESSABLE_ENTITY, e.getMessage());
        }
    }

    /** No-op: this endpoint ignores the two-argument variant and its handler. */
    @Override // io.strimzi.kafka.bridge.BridgeEndpoint
    public void handle(Endpoint<?> endpoint, Handler<?> handler) {
    }

    /**
     * Turns the completed send futures into per-record results, in record order.
     *
     * <p>A future counts as delivered only when it succeeded with non-null metadata. Every other
     * outcome becomes an {@link HttpBridgeError}; a future that "succeeded" without metadata has no
     * cause, so it is reported with status 500 and that status' reason phrase instead of failing
     * with a {@link NullPointerException} inside the completion callback.
     *
     * @param records     records that were sent
     * @param sendFutures completed futures, index-aligned with {@code records}
     * @return one result per record
     */
    @SuppressWarnings("rawtypes")
    private List<HttpBridgeResult<?>> collectResults(List<KafkaProducerRecord<K, V>> records, List<Future> sendFutures) {
        List<HttpBridgeResult<?>> results = new ArrayList<>(records.size());
        for (int i = 0; i < sendFutures.size(); i++) {
            Future<?> sendFuture = sendFutures.get(i);
            if (sendFuture.succeeded() && sendFuture.result() != null) {
                RecordMetadata metadata = (RecordMetadata) sendFuture.result();
                this.log.debug("Delivered record {} to Kafka on topic {} at partition {} [{}]",
                        records.get(i), metadata.getTopic(), metadata.getPartition(), metadata.getOffset());
                results.add(new HttpBridgeResult<>(metadata));
            } else {
                Throwable cause = sendFuture.cause();
                String message = cause != null
                        ? cause.getMessage()
                        : HttpResponseStatus.INTERNAL_SERVER_ERROR.reasonPhrase();
                int code = handleError(cause);
                // Log this record's own failure, not the aggregate cause of the composite future.
                this.log.error("Failed to deliver record {}", records.get(i), cause);
                results.add(new HttpBridgeResult<>(new HttpBridgeError(code, message)));
            }
        }
        return results;
    }

    /** Sends a JSON error response whose HTTP status and error code are both {@code status.code()}. */
    private static void sendError(RoutingContext routingContext, HttpResponseStatus status, String message) {
        HttpUtils.sendResponse(routingContext, status.code(), BridgeContentType.KAFKA_JSON,
                new HttpBridgeError(status.code(), message).toJson().toBuffer());
    }

    /**
     * Builds the response body {@code {"offsets": [...]}}.
     *
     * <p>A {@link RecordMetadata} result becomes {@code {"partition": ..., "offset": ...}}, an
     * {@link HttpBridgeError} result becomes its own JSON form, and any other result type is added
     * as {@code null}.
     *
     * @param results per-record results in record order
     * @return the JSON object to return to the client
     */
    private JsonObject buildOffsets(List<HttpBridgeResult<?>> results) {
        JsonArray offsets = new JsonArray();
        for (HttpBridgeResult<?> result : results) {
            Object payload = result.getResult();
            JsonObject entry = null;
            if (payload instanceof RecordMetadata) {
                RecordMetadata metadata = (RecordMetadata) payload;
                entry = new JsonObject()
                        .put("partition", metadata.getPartition())
                        .put("offset", metadata.getOffset());
            } else if (payload instanceof HttpBridgeError) {
                entry = ((HttpBridgeError) payload).toJson();
            }
            offsets.add(entry);
        }
        return new JsonObject().put("offsets", offsets);
    }

    /**
     * Maps a send failure to an HTTP status code.
     *
     * <p>A Kafka {@link TimeoutException} whose message contains {@code "not present in metadata"}
     * yields 404 and flags the endpoint for closing; everything else, including {@code null},
     * yields 500.
     *
     * @param ex failure cause, may be {@code null}
     * @return HTTP status code to report for the record
     */
    private int handleError(Throwable ex) {
        boolean missingFromMetadata = ex instanceof TimeoutException
                && ex.getMessage() != null
                && ex.getMessage().contains("not present in metadata");
        if (missingFromMetadata) {
            this.closing = true;
            return HttpResponseStatus.NOT_FOUND.code();
        }
        return HttpResponseStatus.INTERNAL_SERVER_ERROR.code();
    }

    /**
     * Picks the message converter for the configured embedded format.
     *
     * <p>The unchecked casts adapt the converter classes (declared elsewhere) to this endpoint's
     * {@code K}/{@code V} parameters.
     *
     * @return a JSON or binary converter, or {@code null} for any other format
     */
    @SuppressWarnings("unchecked")
    private MessageConverter<K, V, Buffer, Buffer> buildMessageConverter() {
        switch (this.format) {
            case JSON:
                return (MessageConverter<K, V, Buffer, Buffer>) new HttpJsonMessageConverter();
            case BINARY:
                return (MessageConverter<K, V, Buffer, Buffer>) new HttpBinaryMessageConverter();
            default:
                return null;
        }
    }
}
