package org.apache.pulsar.client.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/client/impl/TableViewImpl.class */
public class TableViewImpl<T> implements TableView<T> {
    private static final Logger log = LoggerFactory.getLogger(TableViewImpl.class);
    private final TableViewConfigurationData conf;
    private final CompletableFuture<Reader<T>> reader;
    private final boolean isPersistentTopic;
    private TopicCompactionStrategy<T> compactionStrategy;
    private final ConcurrentMap<String, T> data = new ConcurrentHashMap();
    private final Map<String, T> immutableData = Collections.unmodifiableMap(this.data);
    private final List<BiConsumer<String, T>> listeners = new ArrayList();
    private final ReentrantLock listenersMutex = new ReentrantLock();

    /* JADX INFO: Access modifiers changed from: package-private */
    public TableViewImpl(PulsarClientImpl pulsarClientImpl, Schema<T> schema, TableViewConfigurationData tableViewConfigurationData) {
        this.conf = tableViewConfigurationData;
        this.isPersistentTopic = tableViewConfigurationData.getTopicName().startsWith(TopicDomain.persistent.toString());
        this.compactionStrategy = TopicCompactionStrategy.load(tableViewConfigurationData.getTopicCompactionStrategyClassName());
        ReaderBuilder subscriptionName = pulsarClientImpl.newReader(schema).topic(tableViewConfigurationData.getTopicName()).startMessageId(MessageId.earliest).autoUpdatePartitions(true).autoUpdatePartitionsInterval((int) tableViewConfigurationData.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS).poolMessages(true).subscriptionName(tableViewConfigurationData.getSubscriptionName());
        if (this.isPersistentTopic) {
            subscriptionName.readCompacted(true);
        }
        CryptoKeyReader cryptoKeyReader = tableViewConfigurationData.getCryptoKeyReader();
        if (cryptoKeyReader != null) {
            subscriptionName.cryptoKeyReader(cryptoKeyReader);
        }
        subscriptionName.cryptoFailureAction(tableViewConfigurationData.getCryptoFailureAction());
        this.reader = subscriptionName.createAsync();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<TableView<T>> start() {
        return this.reader.thenCompose(reader -> {
            if (this.isPersistentTopic) {
                return readAllExistingMessages(reader);
            }
            readTailMessages(reader);
            return CompletableFuture.completedFuture(reader);
        }).thenApply((Function<? super U, ? extends U>) reader2 -> {
            return this;
        });
    }

    public int size() {
        return this.data.size();
    }

    public boolean isEmpty() {
        return this.data.isEmpty();
    }

    public boolean containsKey(String str) {
        return this.data.containsKey(str);
    }

    public T get(String str) {
        return this.data.get(str);
    }

    public Set<Map.Entry<String, T>> entrySet() {
        return this.immutableData.entrySet();
    }

    public Set<String> keySet() {
        return this.immutableData.keySet();
    }

    public Collection<T> values() {
        return this.immutableData.values();
    }

    public void forEach(BiConsumer<String, T> biConsumer) {
        this.data.forEach(biConsumer);
    }

    public void listen(BiConsumer<String, T> biConsumer) {
        try {
            this.listenersMutex.lock();
            this.listeners.add(biConsumer);
        } finally {
            this.listenersMutex.unlock();
        }
    }

    public void forEachAndListen(BiConsumer<String, T> biConsumer) {
        try {
            this.listenersMutex.lock();
            forEach(biConsumer);
            this.listeners.add(biConsumer);
        } finally {
            this.listenersMutex.unlock();
        }
    }

    public CompletableFuture<Void> closeAsync() {
        return this.reader.thenCompose((v0) -> {
            return v0.closeAsync();
        });
    }

    public void close() throws PulsarClientException {
        try {
            closeAsync().get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Multi-variable type inference failed */
    private void handleMessage(Message<T> message) {
        try {
            if (message.hasKey()) {
                String key = message.getKey();
                Object value = message.size() > 0 ? message.getValue() : null;
                if (log.isDebugEnabled()) {
                    log.debug("Applying message from topic {}. key={} value={}", new Object[]{this.conf.getTopicName(), key, value});
                }
                boolean z = true;
                if (this.compactionStrategy != null) {
                    T t = this.data.get(key);
                    z = !this.compactionStrategy.shouldKeepLeft(t, value);
                    if (!z) {
                        log.info("Skipped the message from topic {}. key={} value={} prev={}", new Object[]{this.conf.getTopicName(), key, value, t});
                    }
                }
                if (z) {
                    try {
                        this.listenersMutex.lock();
                        if (null == value) {
                            this.data.remove(key);
                        } else {
                            this.data.put(key, value);
                        }
                        Iterator<BiConsumer<String, T>> it = this.listeners.iterator();
                        while (it.hasNext()) {
                            try {
                                it.next().accept(key, value);
                            } catch (Throwable th) {
                                log.error("Table view listener raised an exception", th);
                            }
                        }
                        this.listenersMutex.unlock();
                    } catch (Throwable th2) {
                        this.listenersMutex.unlock();
                        throw th2;
                    }
                }
            }
        } finally {
            message.release();
        }
    }

    private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
        long nanoTime = System.nanoTime();
        AtomicLong atomicLong = new AtomicLong();
        CompletableFuture<Reader<T>> completableFuture = new CompletableFuture<>();
        readAllExistingMessages(reader, completableFuture, nanoTime, atomicLong);
        return completableFuture;
    }

    private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Reader<T>> completableFuture, long j, AtomicLong atomicLong) {
        reader.hasMessageAvailableAsync().thenAccept((Consumer) bool -> {
            if (bool.booleanValue()) {
                reader.readNextAsync().thenAccept((Consumer) message -> {
                    atomicLong.incrementAndGet();
                    handleMessage(message);
                    readAllExistingMessages(reader, completableFuture, j, atomicLong);
                }).exceptionally(th -> {
                    if (th.getCause() instanceof PulsarClientException.AlreadyClosedException) {
                        log.error("Reader {} was closed while reading existing messages.", reader.getTopic(), th);
                    } else {
                        log.warn("Reader {} was interrupted while reading existing messages. ", reader.getTopic(), th);
                    }
                    completableFuture.completeExceptionally(th);
                    return null;
                });
                return;
            }
            log.info("Started table view for topic {} - Replayed {} messages in {} seconds", new Object[]{reader.getTopic(), atomicLong, Double.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - j) / 1000.0d)});
            completableFuture.complete(reader);
            readTailMessages(reader);
        });
    }

    private void readTailMessages(Reader<T> reader) {
        reader.readNextAsync().thenAccept((Consumer) message -> {
            handleMessage(message);
            readTailMessages(reader);
        }).exceptionally(th -> {
            if (th.getCause() instanceof PulsarClientException.AlreadyClosedException) {
                log.error("Reader {} was closed while reading tail messages.", reader.getTopic(), th);
                return null;
            }
            log.warn("Reader {} was interrupted while reading tail messages. Retrying..", reader.getTopic(), th);
            readTailMessages(reader);
            return null;
        });
    }
}
