package org.apache.kafka.clients.consumer;

import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka consumer facade backed by Pulsar consumers.
 *
 * <p>Messages are pushed by the Pulsar client through {@link #received} into a bounded queue and
 * handed to the application by {@link #poll(long)}.
 *
 * <p>The Kafka interface is referenced by its fully-qualified name because the simple name
 * {@code Consumer} is bound in this file to the imported {@code org.apache.pulsar.client.api.Consumer}.
 */
public class PulsarKafkaConsumer<K, V>
        implements org.apache.kafka.clients.consumer.Consumer<K, V>, MessageListener<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(PulsarKafkaConsumer.class);
    private static final long serialVersionUID = 1;
    /** Pulsar client built from the first {@code bootstrap.servers} entry. */
    private final PulsarClient client;
    /** Decoder applied to message keys. */
    private final Schema<K> keySchema;
    /** Decoder applied to message payloads. */
    private final Schema<V> valueSchema;
    /** Kafka {@code group.id}; used as the Pulsar subscription name. */
    private final String groupId;
    /** Value of {@code enable.auto.commit}. */
    private final boolean isAutoCommit;
    /** One Pulsar consumer per subscribed topic partition. */
    private final ConcurrentMap<TopicPartition, Consumer<byte[]>> consumers;
    /** Offset of the last record handed out by {@code poll}, per partition. */
    private final Map<TopicPartition, Long> lastReceivedOffset;
    /** Offset passed to the most recent commit, per partition. */
    private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset;
    /** Partitions that were subscribed but have not yet yielded a record from {@code poll}. */
    private final Set<TopicPartition> unpolledPartitions;
    /** Where to seek when offsets are reset; derived from {@code auto.offset.reset}. */
    private final SubscriptionInitialPosition strategy;
    /** Set by {@code close()}; read by the listener callback when it is interrupted. */
    private volatile boolean closed;
    /** Upper bound on the number of records returned by one {@code poll} call. */
    private final int maxRecordsInSinglePoll;
    /** Copy of the original consumer configuration, passed to the Pulsar builders. */
    private final Properties properties;
    /** Bounded hand-off queue between the listener callback and {@code poll}. */
    private final BlockingQueue<QueueItem> receivedMessages;

    /** Pairs a received message with the Pulsar consumer that delivered it. */
    private static class QueueItem {
        final Consumer<byte[]> consumer;
        final Message<byte[]> message;

        /**
         * @param source consumer the message arrived on
         * @param payload the received message
         */
        QueueItem(Consumer<byte[]> source, Message<byte[]> payload) {
            consumer = source;
            message = payload;
        }
    }

    /**
     * Creates a consumer from a configuration map. No schemas are supplied, so the key and value
     * deserializers are instantiated from the {@code key.deserializer} and
     * {@code value.deserializer} settings.
     *
     * @param map Kafka consumer configuration
     */
    public PulsarKafkaConsumer(Map<String, Object> map) {
        this(new ConsumerConfig(map), (Schema) null, (Schema) null);
    }

    /**
     * Creates a consumer from a configuration map and explicit Kafka deserializers, each wrapped
     * in a {@code PulsarKafkaSchema}.
     *
     * @param map Kafka consumer configuration
     * @param deserializer key deserializer
     * @param deserializer2 value deserializer
     */
    public PulsarKafkaConsumer(Map<String, Object> map, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this(new ConsumerConfig(map), new PulsarKafkaSchema(deserializer), new PulsarKafkaSchema(deserializer2));
    }

    /**
     * Creates a consumer from a configuration map and explicit Pulsar schemas. A {@code null}
     * schema falls back to the deserializer named in the configuration.
     *
     * @param map Kafka consumer configuration
     * @param schema key schema, or {@code null}
     * @param schema2 value schema, or {@code null}
     */
    public PulsarKafkaConsumer(Map<String, Object> map, Schema<K> schema, Schema<V> schema2) {
        this(new ConsumerConfig(map), schema, schema2);
    }

    /**
     * Creates a consumer from {@link Properties}. No schemas are supplied, so the key and value
     * deserializers are instantiated from the {@code key.deserializer} and
     * {@code value.deserializer} settings.
     *
     * @param properties Kafka consumer configuration
     */
    public PulsarKafkaConsumer(Properties properties) {
        this(new ConsumerConfig(properties), (Schema) null, (Schema) null);
    }

    /**
     * Creates a consumer from {@link Properties} and explicit Kafka deserializers, each wrapped
     * in a {@code PulsarKafkaSchema}.
     *
     * @param properties Kafka consumer configuration
     * @param deserializer key deserializer
     * @param deserializer2 value deserializer
     */
    public PulsarKafkaConsumer(Properties properties, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this(new ConsumerConfig(properties), new PulsarKafkaSchema(deserializer), new PulsarKafkaSchema(deserializer2));
    }

    /**
     * Creates a consumer from {@link Properties} and explicit Pulsar schemas. A {@code null}
     * schema falls back to the deserializer named in the configuration.
     *
     * @param properties Kafka consumer configuration
     * @param schema key schema, or {@code null}
     * @param schema2 value schema, or {@code null}
     */
    public PulsarKafkaConsumer(Properties properties, Schema<K> schema, Schema<V> schema2) {
        this(new ConsumerConfig(properties), schema, schema2);
    }

    /**
     * Shared constructor: resolves the key/value schemas, reads the consumer settings and builds
     * the Pulsar client against the first {@code bootstrap.servers} entry.
     *
     * @param consumerConfig parsed Kafka consumer configuration
     * @param schema key schema, or {@code null} to instantiate {@code key.deserializer}
     * @param schema2 value schema, or {@code null} to instantiate {@code value.deserializer}
     * @throws RuntimeException if the Pulsar client cannot be built
     */
    private PulsarKafkaConsumer(ConsumerConfig consumerConfig, Schema<K> schema, Schema<V> schema2) {
        this.consumers = new ConcurrentHashMap<>();
        this.lastReceivedOffset = new ConcurrentHashMap<>();
        this.lastCommittedOffset = new ConcurrentHashMap<>();
        this.unpolledPartitions = new HashSet<>();
        this.closed = false;
        this.receivedMessages = new ArrayBlockingQueue<>(1000);
        if (schema == null) {
            Deserializer deserializer = (Deserializer) consumerConfig.getConfiguredInstance("key.deserializer", Deserializer.class);
            deserializer.configure(consumerConfig.originals(), true);
            this.keySchema = new PulsarKafkaSchema(deserializer);
        } else {
            this.keySchema = schema;
            consumerConfig.ignore("key.deserializer");
        }
        if (schema2 == null) {
            Deserializer deserializer2 = (Deserializer) consumerConfig.getConfiguredInstance("value.deserializer", Deserializer.class);
            // isKey must be false here: this deserializer handles record values, not keys.
            deserializer2.configure(consumerConfig.originals(), false);
            this.valueSchema = new PulsarKafkaSchema(deserializer2);
        } else {
            this.valueSchema = schema2;
            consumerConfig.ignore("value.deserializer");
        }
        this.groupId = consumerConfig.getString("group.id");
        this.isAutoCommit = consumerConfig.getBoolean("enable.auto.commit");
        this.strategy = getStrategy(consumerConfig.getString("auto.offset.reset"));
        log.info("Offset reset strategy has been assigned value {}", this.strategy);
        String serviceUrl = (String) consumerConfig.getList("bootstrap.servers").get(0);
        this.maxRecordsInSinglePoll = 1000;
        // Keep a copy of the caller's original settings for the Pulsar client/consumer builders.
        this.properties = new Properties();
        this.properties.putAll(consumerConfig.originals());
        ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(this.properties);
        clientBuilder.enableTcpNoDelay(false);
        try {
            this.client = clientBuilder.serviceUrl(serviceUrl).build();
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Maps an {@code auto.offset.reset} value to a Pulsar initial position.
     *
     * @param str configured reset policy; may be {@code null}
     * @return {@code Earliest} only for the exact value {@code "earliest"}, otherwise {@code Latest}
     */
    private SubscriptionInitialPosition getStrategy(String str) {
        if ("earliest".equals(str)) {
            return SubscriptionInitialPosition.Earliest;
        }
        return SubscriptionInitialPosition.Latest;
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    public Set<TopicPartition> assignment() {
        throw new UnsupportedOperationException("Cannot access the partitions assignements");
    }

    /**
     * Returns the distinct topic names of every partition that currently has a Pulsar consumer.
     *
     * @return a new set of topic names; empty when nothing is subscribed
     */
    public Set<String> subscription() {
        Set<String> topics = new HashSet<>();
        for (TopicPartition topicPartition : this.consumers.keySet()) {
            topics.add(topicPartition.topic());
        }
        return topics;
    }

    /**
     * Subscribes to the given topics without a rebalance listener.
     *
     * @param list topic names to subscribe to
     */
    public void subscribe(List<String> list) {
        subscribe(list, (ConsumerRebalanceListener) null);
    }

    /**
     * Subscribes to every topic in {@code list}, creating one Pulsar consumer per partition
     * (or a single consumer for a topic with at most one partition), and blocks until all
     * subscriptions have completed.
     *
     * @param list topic names to subscribe to
     * @param consumerRebalanceListener notified once with all newly subscribed partitions;
     *     may be {@code null}
     * @throws RuntimeException if any lookup or subscription fails; consumers created by this
     *     call are closed on a best-effort basis first
     */
    public void subscribe(List<String> list, ConsumerRebalanceListener consumerRebalanceListener) {
        List<CompletableFuture<Consumer<byte[]>>> subscriptions = new ArrayList<>();
        List<TopicPartition> topicPartitions = new ArrayList<>();
        try {
            for (String topic : list) {
                int numPartitions = ((Integer) this.client.getNumberOfPartitions(topic).get()).intValue();
                ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(this.client, this.properties);
                consumerBuilder.subscriptionType(SubscriptionType.Failover);
                consumerBuilder.messageListener(this);
                consumerBuilder.subscriptionName(this.groupId);
                if (numPartitions > 1) {
                    // All partition consumers of one topic share the same generated consumer name.
                    consumerBuilder.consumerName(ConsumerName.generateRandomName());
                    for (int i = 0; i < numPartitions; i++) {
                        String partitionName = TopicName.get(topic).getPartition(i).toString();
                        CompletableFuture<Consumer<byte[]>> subscription =
                                consumerBuilder.clone().topic(partitionName).subscribeAsync();
                        TopicPartition topicPartition = new TopicPartition(TopicName.get(topic).getPartitionedTopicName(), i);
                        subscriptions.add(trackOnSubscribe(subscription, topicPartition));
                        topicPartitions.add(topicPartition);
                    }
                } else {
                    CompletableFuture<Consumer<byte[]>> subscription = consumerBuilder.topic(topic).subscribeAsync();
                    TopicPartition topicPartition = new TopicPartition(TopicName.get(topic).getPartitionedTopicName(), 0);
                    subscriptions.add(trackOnSubscribe(subscription, topicPartition));
                    topicPartitions.add(topicPartition);
                }
            }
            this.unpolledPartitions.addAll(topicPartitions);
            subscriptions.forEach(CompletableFuture::join);
            if (consumerRebalanceListener != null) {
                consumerRebalanceListener.onPartitionsAssigned(topicPartitions);
            }
        } catch (Exception e) {
            subscriptions.forEach(subscription -> {
                try {
                    subscription.get().close();
                } catch (Exception ignored) {
                    // Best effort: either this subscription never succeeded or its close failed;
                    // the original failure is what gets reported below.
                }
            });
            if (e instanceof InterruptedException) {
                // Restore the flag only after cleanup so the get() calls above are not aborted.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    /** Registers the consumer under {@code topicPartition} once its subscription completes. */
    private CompletableFuture<Consumer<byte[]>> trackOnSubscribe(
            CompletableFuture<Consumer<byte[]>> subscription, TopicPartition topicPartition) {
        return subscription.thenApply(consumer -> {
            log.info("Add consumer {} for partition {}", consumer, topicPartition);
            this.consumers.putIfAbsent(topicPartition, consumer);
            return consumer;
        });
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    public void assign(List<TopicPartition> list) {
        throw new UnsupportedOperationException("Cannot manually assign partitions");
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
        throw new UnsupportedOperationException("Cannot subscribe with topic name pattern");
    }

    /**
     * Unsubscribes every Pulsar consumer held by this instance.
     *
     * @throws RuntimeException wrapping the first {@code PulsarClientException} encountered;
     *     remaining consumers are not processed in that case
     */
    public void unsubscribe() {
        for (Consumer<byte[]> pulsarConsumer : this.consumers.values()) {
            try {
                pulsarConsumer.unsubscribe();
            } catch (PulsarClientException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Waits up to {@code j} milliseconds for a message, then drains whatever else is already
     * queued, up to {@code maxRecordsInSinglePoll} records in total.
     *
     * <p>With auto-commit enabled, a non-empty result triggers an asynchronous commit of the
     * offsets just received.
     *
     * @param j maximum time to wait for the first message, in milliseconds
     * @return the received records grouped by partition; empty if nothing arrived in time
     * @throws RuntimeException if the thread is interrupted while waiting (the interrupt flag is
     *     restored before throwing)
     */
    public ConsumerRecords<K, V> poll(long j) {
        QueueItem item;
        try {
            item = this.receivedMessages.poll(j, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (item == null) {
            @SuppressWarnings("unchecked")
            ConsumerRecords<K, V> empty = (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
            return empty;
        }
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
        int count = 0;
        while (item != null) {
            TopicName topicName = TopicName.get(item.consumer.getTopic());
            String topic = topicName.getPartitionedTopicName();
            int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
            Message<byte[]> message = item.message;
            long offset = MessageIdUtils.getOffset(message.getMessageId());
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            // No recorded offset for a partition that has already been polled: offsets were
            // cleared (e.g. by a seek), so re-apply the configured reset strategy.
            if (this.lastReceivedOffset.get(topicPartition) == null && !this.unpolledPartitions.contains(topicPartition)) {
                log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", topicPartition);
                resetOffsets(topicPartition);
            }
            K key = getKey(topic, message);
            if (this.valueSchema instanceof PulsarKafkaSchema) {
                ((PulsarKafkaSchema) this.valueSchema).setTopic(topic);
            }
            V value = this.valueSchema.decode(message.getData());
            records.computeIfAbsent(topicPartition, ignored -> new ArrayList<>())
                    .add(new ConsumerRecord<>(topic, partition, offset, key, value));
            this.lastReceivedOffset.put(topicPartition, Long.valueOf(offset));
            this.unpolledPartitions.remove(topicPartition);
            count++;
            if (count >= this.maxRecordsInSinglePoll) {
                break;
            }
            // Non-blocking, non-interruptible drain: an interrupt arriving here must not discard
            // the records that were already taken off the queue.
            item = this.receivedMessages.poll();
        }
        if (this.isAutoCommit && !records.isEmpty()) {
            commitAsync();
        }
        return new ConsumerRecords<>(records);
    }

    /**
     * Commits the last received offset of every partition and waits for completion.
     *
     * @throws RuntimeException if the commit fails or the thread is interrupted while waiting
     *     (the interrupt flag is restored before throwing)
     */
    public void commitSync() {
        try {
            doCommitOffsets(getCurrentOffsetsMap()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Commits the given offsets and waits for completion.
     *
     * @param map offsets to commit, keyed by partition
     * @throws RuntimeException if the commit fails or the thread is interrupted while waiting
     *     (the interrupt flag is restored before throwing)
     */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
        try {
            doCommitOffsets(map).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts a commit of the last received offset of every partition. The resulting future is
     * discarded, so the outcome is neither awaited nor reported.
     */
    public void commitAsync() {
        doCommitOffsets(getCurrentOffsetsMap());
    }

    /**
     * Starts a commit of the last received offsets and reports the outcome to the callback.
     *
     * @param offsetCommitCallback invoked with the committed offsets and either {@code null} on
     *     success or an {@code Exception} wrapping the failure cause
     */
    public void commitAsync(OffsetCommitCallback offsetCommitCallback) {
        final Map<TopicPartition, OffsetAndMetadata> offsets = getCurrentOffsetsMap();
        doCommitOffsets(offsets).handle((ignored, failure) -> {
            Exception error = null;
            if (failure != null) {
                error = new Exception(failure);
            }
            offsetCommitCallback.onComplete(offsets, error);
            return null;
        });
    }

    /**
     * Starts a commit of the given offsets and reports the outcome to the callback.
     *
     * @param map offsets to commit, keyed by partition
     * @param offsetCommitCallback invoked with {@code map} and either {@code null} on success or
     *     an {@code Exception} wrapping the failure cause
     */
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) {
        CompletableFuture<Void> commit = doCommitOffsets(map);
        commit.handle((ignored, failure) -> {
            Exception error = (failure == null) ? null : new Exception(failure);
            offsetCommitCallback.onComplete(map, error);
            return null;
        });
    }

    /**
     * Cumulatively acknowledges, on each partition's Pulsar consumer, the message id derived
     * from the given offset, and records the offset as last committed.
     *
     * <p>A partition with no consumer yields a failed future with an
     * {@link IllegalArgumentException} instead of a {@code NullPointerException}; its offset is
     * not recorded as committed.
     *
     * @param map offsets to commit, keyed by partition
     * @return a future over all acknowledgements issued for {@code map}
     */
    private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> map) {
        List<CompletableFuture<Void>> acks = new ArrayList<>(map.size());
        map.forEach((topicPartition, offsetAndMetadata) -> {
            Consumer<byte[]> consumer = this.consumers.get(topicPartition);
            if (consumer == null) {
                acks.add(FutureUtil.failedFuture(new IllegalArgumentException(
                        "Cannot commit offsets on a partition where we are not subscribed: " + topicPartition)));
                return;
            }
            this.lastCommittedOffset.put(topicPartition, offsetAndMetadata);
            acks.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
        });
        return FutureUtil.waitForAll(acks);
    }

    /**
     * Builds a commit map from the last received offset of every partition.
     *
     * @return a new map from partition to an {@code OffsetAndMetadata} holding that offset
     */
    private Map<TopicPartition, OffsetAndMetadata> getCurrentOffsetsMap() {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : this.lastReceivedOffset.entrySet()) {
            long offset = entry.getValue().longValue();
            offsets.put(entry.getKey(), new OffsetAndMetadata(offset));
        }
        return offsets;
    }

    /**
     * Moves the partition's Pulsar consumer to the message id derived from the given offset.
     *
     * @param topicPartition partition to reposition
     * @param j target offset
     * @throws IllegalArgumentException if there is no consumer for {@code topicPartition}
     * @throws RuntimeException wrapping a {@code PulsarClientException} from the seek
     */
    public void seek(TopicPartition topicPartition, long j) {
        final MessageId target = MessageIdUtils.getMessageId(j);
        final Consumer<byte[]> pulsarConsumer = this.consumers.get(topicPartition);
        if (pulsarConsumer == null) {
            throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
        }
        try {
            pulsarConsumer.seek(target);
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Seeks the given partitions (or all subscribed partitions when none are given) to the
     * earliest available message and waits for every seek to finish.
     *
     * @param topicPartitionArr partitions to reposition; empty means all
     */
    public void seekToBeginning(TopicPartition... topicPartitionArr) {
        seekAllTo(MessageId.earliest, topicPartitionArr);
    }

    /**
     * Seeks the given partitions (or all subscribed partitions when none are given) to the
     * latest message and waits for every seek to finish.
     *
     * @param topicPartitionArr partitions to reposition; empty means all
     */
    public void seekToEnd(TopicPartition... topicPartitionArr) {
        seekAllTo(MessageId.latest, topicPartitionArr);
    }

    /**
     * Shared implementation of {@code seekToBeginning} and {@code seekToEnd}.
     *
     * <p>Partitions without a consumer contribute a failed future, so the final {@code join}
     * throws for them.
     */
    private void seekAllTo(MessageId target, TopicPartition... topicPartitionArr) {
        TopicPartition[] partitions = topicPartitionArr;
        if (partitions.length == 0) {
            partitions = this.consumers.keySet().toArray(new TopicPartition[0]);
        }
        // NOTE(review): offset tracking is cleared for ALL partitions, even when only some are
        // being repositioned — confirm this is intended.
        this.lastCommittedOffset.clear();
        this.lastReceivedOffset.clear();
        List<CompletableFuture<Void>> seeks = new ArrayList<>(partitions.length);
        for (TopicPartition topicPartition : partitions) {
            Consumer<byte[]> consumer = this.consumers.get(topicPartition);
            if (consumer == null) {
                seeks.add(FutureUtil.failedFuture(new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
            } else {
                seeks.add(consumer.seekAsync(target));
            }
        }
        FutureUtil.waitForAll(seeks).join();
    }

    /**
     * Reports the position for a partition.
     *
     * <ul>
     *   <li>Partition not yet polled: {@code 0}.</li>
     *   <li>No recorded offset: the offsets are reset via the configured strategy and that
     *       strategy's {@code getValue()} is returned.</li>
     *   <li>Otherwise: the last received offset.</li>
     * </ul>
     *
     * @param topicPartition partition to query
     * @return the position as described above
     */
    public long position(TopicPartition topicPartition) {
        final Long lastOffset = this.lastReceivedOffset.get(topicPartition);
        final boolean neverPolled = this.unpolledPartitions.contains(topicPartition);
        if (neverPolled) {
            return 0L;
        }
        if (lastOffset == null) {
            return resetOffsets(topicPartition).getValue();
        }
        return lastOffset.longValue();
    }

    /**
     * Returns the offset most recently recorded for the partition by a commit issued through
     * this instance.
     *
     * @param topicPartition partition to query
     * @return the recorded offset, or {@code null} if none is recorded
     */
    public OffsetAndMetadata committed(TopicPartition topicPartition) {
        return this.lastCommittedOffset.get(topicPartition);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    public Map<MetricName, ? extends Metric> metrics() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    public List<PartitionInfo> partitionsFor(String str) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    public Map<String, List<PartitionInfo>> listTopics() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    public void pause(TopicPartition... topicPartitionArr) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    public void resume(TopicPartition... topicPartitionArr) {
        throw new UnsupportedOperationException();
    }

    /**
     * Marks this consumer closed, starts a final asynchronous commit when auto-commit is enabled,
     * and blocks until the Pulsar client has closed.
     *
     * @throws RuntimeException if closing the client fails or the thread is interrupted while
     *     waiting (the interrupt flag is restored before throwing)
     */
    public void close() {
        try {
            this.closed = true;
            if (this.isAutoCommit) {
                // NOTE(review): this commit is not awaited before the client is closed —
                // confirm the acknowledgements cannot be dropped.
                commitAsync();
            }
            this.client.closeAsync().get(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    public void wakeup() {
        throw new UnsupportedOperationException();
    }

    /**
     * Listener callback: enqueues the message for {@code poll}, blocking while the queue is full.
     *
     * <p>If interrupted while waiting, the interrupt flag is restored; the interruption is
     * rethrown as a {@code RuntimeException} unless this consumer has already been closed.
     *
     * @param consumer Pulsar consumer the message arrived on
     * @param message the received message
     */
    public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
        final QueueItem item = new QueueItem(consumer, message);
        try {
            this.receivedMessages.put(item);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            if (this.closed) {
                return;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Repositions a partition according to the configured reset strategy: to the beginning for
     * {@code Earliest}, to the end otherwise.
     *
     * @param topicPartition partition to reposition
     * @return the strategy that was applied
     */
    private SubscriptionInitialPosition resetOffsets(TopicPartition topicPartition) {
        log.info("Resetting partition {} and seeking to {} position", topicPartition, this.strategy);
        final boolean fromStart = this.strategy == SubscriptionInitialPosition.Earliest;
        if (!fromStart) {
            seekToEnd(topicPartition);
        } else {
            seekToBeginning(topicPartition);
        }
        return this.strategy;
    }

    /**
     * Extracts and decodes the key of a message.
     *
     * <p>When the key schema wraps a Kafka {@code StringDeserializer}, the key string is returned
     * as-is. Otherwise the key string is Base64-decoded and passed to the key schema.
     *
     * @param str topic name handed to a {@code PulsarKafkaSchema} before decoding
     * @param message message whose key is read
     * @return the decoded key, or {@code null} if the message has no key
     */
    private K getKey(String str, Message<byte[]> message) {
        if (message.hasKey()) {
            if (this.keySchema instanceof PulsarKafkaSchema) {
                PulsarKafkaSchema kafkaSchema = (PulsarKafkaSchema) this.keySchema;
                if (kafkaSchema.getKafkaDeserializer() instanceof StringDeserializer) {
                    return (K) message.getKey();
                }
                kafkaSchema.setTopic(str);
            }
            byte[] rawKey = Base64.getDecoder().decode(message.getKey());
            return (K) this.keySchema.decode(rawKey);
        }
        return null;
    }
}
