package org.apache.pulsar.io.kafka;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.ShortDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.kafka.KafkaAbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name = "kafka", type = IOType.SOURCE, help = "Transfer data from Kafka to Pulsar.", configClass = KafkaSourceConfig.class)
/* loaded from: input_file:org/apache/pulsar/io/kafka/KafkaBytesSource.class */
public class KafkaBytesSource extends KafkaAbstractSource<ByteBuffer> {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(KafkaBytesSource.class);

    /**
     * Cache used to look up a schema by the id carried in a {@link BytesWithKafkaSchema}.
     * It is only created by {@code initSchemaCache} when the key or value schema is the
     * {@link DeferredSchemaPlaceholder}; otherwise it stays {@code null}.
     */
    private AvroSchemaCache schemaCache;

    /** Schema derived from the configured key deserializer; assigned in {@code beforeCreateConsumer}. */
    private Schema<ByteBuffer> keySchema;

    /** Schema derived from the configured value deserializer; assigned in {@code beforeCreateConsumer}. */
    private Schema<ByteBuffer> valueSchema;

    /**
     * Set to {@code true} in {@code beforeCreateConsumer} when the key schema type is not
     * {@code STRING}; {@code buildRecord} then emits key/value records instead of plain ones.
     */
    private boolean produceKeyValue;

    /**
     * Sentinel schema returned by {@code getSchemaFromDeserializerAndAdaptConfiguration} when the
     * configured deserializer is {@link KafkaAvroDeserializer}.
     *
     * <p>{@code beforeCreateConsumer} compares against {@link #INSTANCE} by identity to decide
     * whether the schema cache has to be initialised. The wrapped {@link SchemaInfo} is of type
     * {@code AVRO} with an empty property map and a zero-length schema definition.
     */
    public static final class DeferredSchemaPlaceholder extends ByteBufferSchemaWrapper {
        /** The single shared placeholder; callers test for it with {@code ==}. */
        static final DeferredSchemaPlaceholder INSTANCE = new DeferredSchemaPlaceholder();

        DeferredSchemaPlaceholder() {
            super(emptyAvroSchemaInfo());
        }

        /** Builds the AVRO-typed {@link SchemaInfo} with no properties and an empty schema body. */
        private static SchemaInfo emptyAvroSchemaInfo() {
            return SchemaInfo.builder()
                    .type(SchemaType.AVRO)
                    .properties(Collections.emptyMap())
                    .schema(new byte[0])
                    .build();
        }
    }

    /**
     * Kafka {@link Deserializer} that does not decode the payload. It consumes a 5-byte prefix
     * (one byte, then a 4-byte int) and hands back the remaining buffer together with that int
     * as a {@link BytesWithKafkaSchema}.
     */
    public static class ExtractKafkaAvroSchemaDeserializer implements Deserializer<BytesWithKafkaSchema> {

        /**
         * Splits the prefix off a raw Kafka payload.
         *
         * @param str  the topic name supplied by Kafka; not used here
         * @param bArr the raw payload, may be {@code null}
         * @return {@code null} for a {@code null} payload, otherwise the payload buffer positioned
         *         after the 5-byte prefix, paired with the int read from that prefix
         * @throws SerializationException if anything fails while reading the prefix
         *         (for example a payload shorter than 5 bytes)
         */
        @Override
        public BytesWithKafkaSchema deserialize(String str, byte[] bArr) {
            if (bArr == null) {
                return null;
            }
            try {
                final ByteBuffer payload = ByteBuffer.wrap(bArr);
                // First byte is skipped without inspection
                // (presumably the Confluent wire-format magic byte - confirm).
                payload.get();
                // Next four bytes are later used as the schema id for the cache lookup.
                final int schemaId = payload.getInt();
                return new BytesWithKafkaSchema(payload, schemaId);
            } catch (Exception e) {
                throw new SerializationException("Error deserializing Avro message", e);
            }
        }
    }

    /**
     * Prepares the consumer configuration before the Kafka consumer is created.
     *
     * <p>Fills in default key/value deserializers when none are configured, derives the key and
     * value schemas from the configured deserializers (which may rewrite the deserializer
     * entries in {@code properties}), creates the schema cache when either schema is the
     * {@link DeferredSchemaPlaceholder}, and switches to key/value output when the key schema
     * is not of type {@code STRING}.
     *
     * @param properties the consumer configuration; modified in place
     * @return the same {@code properties} instance
     */
    @Override
    protected Properties beforeCreateConsumer(Properties properties) {
        final String keyDeserializerKey = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
        final String valueDeserializerKey = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;

        properties.putIfAbsent(keyDeserializerKey, StringDeserializer.class.getName());
        properties.putIfAbsent(valueDeserializerKey, ByteArrayDeserializer.class.getName());
        // NOTE(review): the whole configuration is logged at INFO, before the deserializer
        // entries are adapted below. If it can carry credentials, consider redacting - confirm.
        log.info("Created kafka consumer config : {}", properties);

        this.keySchema = getSchemaFromDeserializerAndAdaptConfiguration(keyDeserializerKey, properties, true);
        this.valueSchema = getSchemaFromDeserializerAndAdaptConfiguration(valueDeserializerKey, properties, false);

        final boolean keyIsDeferred = this.keySchema == DeferredSchemaPlaceholder.INSTANCE;
        final boolean valueIsDeferred = this.valueSchema == DeferredSchemaPlaceholder.INSTANCE;
        if (keyIsDeferred || valueIsDeferred) {
            initSchemaCache(properties);
        }

        final SchemaType keyType = this.keySchema.getSchemaInfo().getType();
        // Only ever switched on here; an already-true flag is left untouched.
        if (keyType != SchemaType.STRING) {
            this.produceKeyValue = true;
        }
        return properties;
    }

    /**
     * Creates the {@link AvroSchemaCache} backed by a {@link CachedSchemaRegistryClient}.
     *
     * <p>The registry URLs and the per-subject schema limit are read from {@code properties}
     * through a {@link KafkaAvroDeserializerConfig}.
     *
     * @param properties the consumer configuration to read the Schema Registry settings from
     */
    private void initSchemaCache(Properties properties) {
        final KafkaAvroDeserializerConfig avroConfig = new KafkaAvroDeserializerConfig(properties);
        final List<String> registryUrls = avroConfig.getSchemaRegistryUrls();
        final int schemasPerSubject = avroConfig.getMaxSchemasPerSubject();

        final CachedSchemaRegistryClient registryClient =
                new CachedSchemaRegistryClient(registryUrls, schemasPerSubject);
        log.info("initializing SchemaRegistry Client, urls:{}, maxSchemasPerSubject: {}",
                registryUrls, schemasPerSubject);
        this.schemaCache = new AvroSchemaCache(registryClient);
    }

    /**
     * Converts a consumed Kafka record into the record type emitted by this source.
     *
     * <p>When {@code produceKeyValue} is set, both key and value are extracted as byte buffers
     * and wrapped in a {@link KeyValue} with their respective schemas; otherwise only the value
     * and its schema are used.
     *
     * @param consumerRecord the record received from Kafka
     * @return a {@code KeyValueKafkaRecord} in key/value mode, a plain {@code KafkaRecord} otherwise
     * @throws IllegalArgumentException if the key or value has a type that
     *         {@code extractSimpleValue} does not accept
     */
    @Override
    public KafkaAbstractSource.KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
        if (this.produceKeyValue) {
            final ByteBuffer keyBytes = extractSimpleValue(consumerRecord.key());
            final ByteBuffer valueBytes = extractSimpleValue(consumerRecord.value());
            final KeyValue keyAndValue = new KeyValue(keyBytes, valueBytes);
            final Schema<ByteBuffer> recordKeySchema =
                    getSchemaFromObject(consumerRecord.key(), this.keySchema);
            final Schema<ByteBuffer> recordValueSchema =
                    getSchemaFromObject(consumerRecord.value(), this.valueSchema);
            return new KafkaAbstractSource.KeyValueKafkaRecord(
                    consumerRecord, keyAndValue, recordKeySchema, recordValueSchema);
        }

        final Object rawValue = consumerRecord.value();
        final ByteBuffer valueBytes = extractSimpleValue(rawValue);
        final Schema<ByteBuffer> recordValueSchema = getSchemaFromObject(rawValue, this.valueSchema);
        return new KafkaAbstractSource.KafkaRecord(consumerRecord, valueBytes, recordValueSchema);
    }

    /**
     * Unwraps the byte payload of a deserialized Kafka key or value.
     *
     * @param obj the deserialized key or value; may be {@code null}
     * @return the buffer held by a {@link BytesWithKafkaSchema}, the object itself when it already
     *         is a {@link ByteBuffer}, or {@code null} when {@code obj} is {@code null}
     * @throws IllegalArgumentException for any other runtime type
     */
    private static ByteBuffer extractSimpleValue(Object obj) {
        // instanceof is false for null, so the null case can safely be handled after the type checks.
        if (obj instanceof BytesWithKafkaSchema) {
            final BytesWithKafkaSchema tagged = (BytesWithKafkaSchema) obj;
            return tagged.getValue();
        }
        if (obj instanceof ByteBuffer) {
            return (ByteBuffer) obj;
        }
        if (obj == null) {
            return null;
        }
        throw new IllegalArgumentException("Unexpected type from Kafka: " + obj.getClass());
    }

    /**
     * Picks the schema to attach to a deserialized key or value.
     *
     * @param obj    the deserialized key or value; may be {@code null}
     * @param schema the statically configured schema to fall back to
     * @return the schema looked up in the schema cache by the object's schema id when
     *         {@code obj} is a {@link BytesWithKafkaSchema}; otherwise {@code schema}
     */
    private Schema<ByteBuffer> getSchemaFromObject(Object obj, Schema schema) {
        if (!(obj instanceof BytesWithKafkaSchema)) {
            return schema;
        }
        final BytesWithKafkaSchema tagged = (BytesWithKafkaSchema) obj;
        return this.schemaCache.get(tagged.getSchemaId());
    }

    /**
     * Maps the deserializer configured under {@code str} to a Pulsar schema and rewrites the
     * configuration entry so that the consumer delivers raw bytes.
     *
     * <p>The entry is replaced with {@link ByteBufferDeserializer}, with two exceptions:
     * <ul>
     *   <li>{@link StringDeserializer} is kept as configured when {@code z} is {@code true};</li>
     *   <li>{@link KafkaAvroDeserializer} is replaced with
     *       {@link ExtractKafkaAvroSchemaDeserializer} and yields the
     *       {@link DeferredSchemaPlaceholder}.</li>
     * </ul>
     *
     * @param str        the consumer property holding the deserializer class name
     * @param properties the consumer configuration; modified in place
     * @param z          {@code true} when {@code str} is the key deserializer property
     * @return the schema wrapped in a {@link ByteBufferSchemaWrapper}, or
     *         {@link DeferredSchemaPlaceholder#INSTANCE} for the Avro deserializer
     * @throws NullPointerException     if no string value is configured under {@code str}
     * @throws IllegalArgumentException if the configured deserializer is not supported
     */
    private static Schema<ByteBuffer> getSchemaFromDeserializerAndAdaptConfiguration(String str, Properties properties, boolean z) {
        // getProperty also returns null when the stored value is not a String.
        final String property = Objects.requireNonNull(properties.getProperty(str),
                "No deserializer class name configured for consumer property '" + str + "'");

        // Default rewrite; the String-key and Avro branches below override it.
        properties.put(str, ByteBufferDeserializer.class.getName());

        final Schema schema;
        if (ByteArrayDeserializer.class.getName().equals(property)
                || ByteBufferDeserializer.class.getName().equals(property)
                || BytesDeserializer.class.getName().equals(property)) {
            schema = Schema.BYTEBUFFER;
        } else if (StringDeserializer.class.getName().equals(property)) {
            if (z) {
                // String keys keep their original deserializer.
                properties.put(str, property);
            }
            schema = Schema.STRING;
        } else if (DoubleDeserializer.class.getName().equals(property)) {
            schema = Schema.DOUBLE;
        } else if (FloatDeserializer.class.getName().equals(property)) {
            schema = Schema.FLOAT;
        } else if (IntegerDeserializer.class.getName().equals(property)) {
            schema = Schema.INT32;
        } else if (LongDeserializer.class.getName().equals(property)) {
            schema = Schema.INT64;
        } else if (ShortDeserializer.class.getName().equals(property)) {
            schema = Schema.INT16;
        } else if (KafkaAvroDeserializer.class.getName().equals(property)) {
            // getName() (binary name with '$') is required here because the class is nested.
            properties.put(str, ExtractKafkaAvroSchemaDeserializer.class.getName());
            return DeferredSchemaPlaceholder.INSTANCE;
        } else {
            throw new IllegalArgumentException("Unsupported deserializer " + property);
        }
        return new ByteBufferSchemaWrapper(schema);
    }

    /**
     * Package-private accessor for the key schema.
     *
     * @return the key schema assigned in {@code beforeCreateConsumer}, or {@code null} before that
     */
    Schema getKeySchema() {
        return keySchema;
    }

    /**
     * Package-private accessor for the value schema.
     *
     * @return the value schema assigned in {@code beforeCreateConsumer}, or {@code null} before that
     */
    Schema getValueSchema() {
        return valueSchema;
    }

    /**
     * Package-private accessor for the key/value output flag.
     *
     * @return {@code true} when {@code buildRecord} emits key/value records
     */
    boolean isProduceKeyValue() {
        return produceKeyValue;
    }
}
