package io.streamnative.beam.pulsar;

import io.streamnative.beam.pulsar.PulsarIO;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Splittable {@link DoFn} that, for each {@link PulsarSourceDescriptor}, reads messages from the
 * descriptor's topic through a Pulsar {@link Reader} and emits them as {@link PulsarMessage}s.
 *
 * <p>The restriction is a {@code PulsarMessageIdRange}; positions are claimed by {@link MessageId}.
 * Readers and ack-consumers are cached per topic and released in the teardown method.
 *
 * @param <T> type of the decoded message value
 */
@DoFn.UnboundedPerElement
/* loaded from: input_file:io/streamnative/beam/pulsar/ReadFromPulsarDoFn.class */
public class ReadFromPulsarDoFn<T> extends DoFn<PulsarSourceDescriptor, PulsarMessage<T>> {

    // Not referenced anywhere else in this class; kept package-visible for tests.
    @VisibleForTesting
    static final String METRIC_NAMESPACE = "PulsarIOReader";

    // Not referenced anywhere else in this class; kept package-visible for tests.
    @VisibleForTesting
    static final String RAW_SIZE_METRIC_PREFIX = "rawSize/";
    // Pulsar client/admin handles: built in initPulsarClients() when still null (they can be
    // injected beforehand through setPulsarClient/setPulsarAdmin) and closed in teardown().
    private PulsarClient client;
    private PulsarAdmin admin;
    // Non-final because initPulsarClients() substitutes the PulsarIOUtils defaults when they are null.
    private String clientUrl;
    private String adminUrl;
    // Authentication plugin class name and its parameters, passed to both client and admin builders.
    private final String authPlugin;
    private final String authParameters;
    // Inputs to PulsarIOUtils.getSchema(...), which produces the reader schema.
    private final SchemaType schemaType;
    private final Class<T> pojoClass;
    // Reader/ack-consumer receiver queue size; newReader() falls back to 1000 when this is <= 0.
    private final int receiverQueueSize;
    // Optional reader name; newReader() appends "-<start message id>" to it.
    private final String readerName;
    private final String subscriptionRolePrefix;
    // Optional explicit subscription name; see getSubscriptionName() for the fallback.
    private final String subscriptionName;
    private final boolean readCompacted;
    // Optional key-hash ranges applied to the reader when non-null.
    private final List<Range> keyHashRanges;
    // Chunked-message settings; newReader() falls back to 10 pending chunks when the value is <= 0.
    private final int maxPendingChunkedMessage;
    private final boolean autoAckOldestChunkedMessageOnQueueFull;
    private final Duration expireTimeOfIncompleteChunkedMessage;
    // Timeout of one Reader.readNext(...) poll; defaults to DEFAULT_PULSAR_POLL_TIMEOUT seconds.
    private final Duration readTimeout;
    // Idle period after which processElement() advances the watermark to "now" when no message
    // arrives; processElement() uses 60 seconds when this is null.
    private final Duration idleTimeout;
    // Computes the output timestamp of each message; defaults to the publish-time extractor.
    private final SerializableFunction<Message<T>, Instant> extractOutputTimestampFn;
    // When non-empty, every emitted message is acknowledged on this subscription.
    private final String ackSubscriptionName;
    // Transient runtime state, (re)created in initPulsarClients().
    private transient Schema<T> schema;
    // Per-topic cache of the consumers used only for acknowledging messages.
    private transient Map<String, Consumer<byte[]>> ackConsumers;
    // Per-topic cache of open readers.
    private transient Map<String, Reader<T>> readers;
    // Default read timeout, in seconds (see the constructor).
    private static final long DEFAULT_PULSAR_POLL_TIMEOUT = 2;
    private static final Logger LOG = LoggerFactory.getLogger(ReadFromPulsarDoFn.class);
    // Process-wide counter that hands out fnId values.
    private static final AtomicLong FN_ID = new AtomicLong();
    // Wall-clock time at which the last message was fully processed (or the instance was created).
    private Instant lastProcessedTime = Instant.now();
    // NOTE(review): processElement() bounds its loop with the literal 1000 — presumably this
    // constant inlined by the compiler; confirm against the original source.
    private final int maxBatchSize = 1000;
    // Used by getSubscriptionName() to build the fallback name "beam-pulsar-<uuid>".
    protected final UUID uuid = UUID.randomUUID();
    // Sequence number of this instance; only used in the constructor's log line.
    private final long fnId = FN_ID.getAndIncrement();

    /**
     * Copies the read configuration out of the given transform.
     *
     * <p>Defaults applied here: the output timestamp function falls back to
     * {@code PulsarIO.ExtractOutputTimestampFn.usePublishTime()} and the read timeout falls back
     * to {@code DEFAULT_PULSAR_POLL_TIMEOUT} seconds.
     *
     * @param readFromPulsar transform whose {@code read} configuration is copied into this DoFn
     */
    public ReadFromPulsarDoFn(PulsarIO.ReadFromPulsar<T> readFromPulsar) {
        if (readFromPulsar.read.getExtractOutputTimestampFn() != null) {
            this.extractOutputTimestampFn = readFromPulsar.read.getExtractOutputTimestampFn();
        } else {
            this.extractOutputTimestampFn = PulsarIO.ExtractOutputTimestampFn.usePublishTime();
        }
        this.clientUrl = readFromPulsar.read.getClientUrl();
        this.adminUrl = readFromPulsar.read.getAdminUrl();
        this.authPlugin = readFromPulsar.read.getAuthPlugin();
        this.authParameters = readFromPulsar.read.getAuthParameters();
        this.schemaType = readFromPulsar.read.getSchemaType();
        this.pojoClass = readFromPulsar.read.getPojo();
        this.receiverQueueSize = readFromPulsar.read.getReceiverQueueSize();
        this.readerName = readFromPulsar.read.getReaderName();
        this.subscriptionRolePrefix = readFromPulsar.read.getSubscriptionRolePrefix();
        this.subscriptionName = readFromPulsar.read.getSubscriptionName();
        this.readCompacted = readFromPulsar.read.isReadCompacted();
        this.keyHashRanges = readFromPulsar.read.getKeyHashRanges();
        this.maxPendingChunkedMessage = readFromPulsar.read.getMaxPendingChunkedMessage();
        this.autoAckOldestChunkedMessageOnQueueFull = readFromPulsar.read.isAutoAckOldestChunkedMessageOnQueueFull();
        this.expireTimeOfIncompleteChunkedMessage = readFromPulsar.read.getExpireTimeOfIncompleteChunkedMessage();
        Duration configuredReadTimeout = readFromPulsar.read.getReadTimeout();
        this.readTimeout = configuredReadTimeout == null
                ? Duration.standardSeconds(DEFAULT_PULSAR_POLL_TIMEOUT)
                : configuredReadTimeout;
        this.idleTimeout = readFromPulsar.read.getIdleTimeout();
        this.ackSubscriptionName = readFromPulsar.read.getAckSubscriptionName();

        // The authentication parameters normally carry credentials (tokens, keys), so only their
        // presence is logged, never the value itself.
        String maskedAuthParameters = this.authParameters == null ? null : "***";
        LOG.info("ReadFromPulsarDoFn created {} fnId: {} with clientUrl: {}, adminUrl: {}, authPlugin: {}, authParameters: {}, schemaType: {}, pojoClass: {}, receiverQueueSize: {}, readerName: {}, subscriptionRolePrefix: {}, subscriptionName: {}, readCompacted: {}, keyHashRanges: {}, maxPendingChunkedMessage: {}, autoAckOldestChunkedMessageOnQueueFull: {}, expireTimeOfIncompleteChunkedMessage: {}, readTimeout: {}, idleTimeout: {}, ackSubscriptionName: {}",
                this,
                this.fnId,
                this.clientUrl,
                this.adminUrl,
                this.authPlugin,
                maskedAuthParameters,
                this.schemaType,
                this.pojoClass,
                this.receiverQueueSize,
                this.readerName,
                this.subscriptionRolePrefix,
                this.subscriptionName,
                this.readCompacted,
                this.keyHashRanges,
                this.maxPendingChunkedMessage,
                this.autoAckOldestChunkedMessageOnQueueFull,
                this.expireTimeOfIncompleteChunkedMessage,
                this.readTimeout,
                this.idleTimeout,
                this.ackSubscriptionName);
    }

    /**
     * Prepares the runtime state of this instance: service URLs, Pulsar client, Pulsar admin,
     * schema and the per-topic reader / ack-consumer caches.
     *
     * <p>Every piece is created only when it is still {@code null}, so values injected through
     * the package-private setters are left untouched.
     *
     * @throws Exception if the client or the admin cannot be built
     */
    @DoFn.Setup
    public void initPulsarClients() throws Exception {
        // Fall back to the PulsarIOUtils defaults when no URL was configured.
        if (this.clientUrl == null) {
            this.clientUrl = PulsarIOUtils.SERVICE_URL;
        }
        if (this.adminUrl == null) {
            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
        }

        if (this.client == null) {
            this.client = PulsarClient.builder()
                    .connectionMaxIdleSeconds(180)
                    .serviceUrl(this.clientUrl)
                    .authentication(this.authPlugin, this.authParameters)
                    .ioThreads(Runtime.getRuntime().availableProcessors())
                    .connectionsPerBroker(2)
                    .keepAliveInterval(30, TimeUnit.SECONDS)
                    .operationTimeout(30, TimeUnit.SECONDS)
                    .build();
        }

        if (this.admin == null) {
            this.admin = PulsarAdmin.builder()
                    .authentication(this.authPlugin, this.authParameters)
                    .serviceHttpUrl(this.adminUrl)
                    .tlsTrustCertsFilePath((String) null)
                    .allowTlsInsecureConnection(false)
                    .build();
        }

        if (this.schema == null) {
            this.schema = PulsarIOUtils.getSchema(this.schemaType, this.pojoClass);
        }

        if (this.ackConsumers == null) {
            this.ackConsumers = new HashMap<>();
        }
        if (this.readers == null) {
            this.readers = new HashMap<>();
        }
    }

    /**
     * Releases every Pulsar resource held by this instance: cached readers, cached ack-consumers,
     * the admin client and the client, in that order.
     *
     * <p>Close failures are logged at WARN and do not stop the remaining resources from being
     * closed. The admin and client references are reset to {@code null} in all cases.
     *
     * @throws Exception declared for the Beam lifecycle contract
     */
    @DoFn.Teardown
    public void teardown() throws Exception {
        if (this.readers != null) {
            for (Reader<T> reader : this.readers.values()) {
                try {
                    reader.close();
                } catch (Exception e) {
                    LOG.warn("Error closing reader", e);
                }
            }
            this.readers.clear();
        }

        if (this.ackConsumers != null) {
            for (Consumer<byte[]> ackConsumer : this.ackConsumers.values()) {
                try {
                    ackConsumer.close();
                } catch (Exception e) {
                    LOG.warn("Error closing consumer", e);
                }
            }
            this.ackConsumers.clear();
        }

        if (this.admin != null) {
            try {
                this.admin.close();
            } catch (Exception e) {
                LOG.warn("Error closing admin client", e);
            } finally {
                // Drop the reference whatever happened, so a later setup builds a fresh admin.
                this.admin = null;
            }
        }

        if (this.client != null) {
            try {
                this.client.close();
            } catch (Exception e) {
                LOG.warn("Error closing client", e);
            } finally {
                // Same as above: never keep a possibly half-closed client around.
                this.client = null;
            }
        }
    }

    /**
     * Reads up to {@code maxBatchSize} messages of the descriptor's topic, starting from the
     * current restriction, and emits each one with the timestamp computed by
     * {@code extractOutputTimestampFn}.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>{@code stop()} when the topic has reached its end, a position cannot be claimed, a
     *       message is published after the descriptor's stop-read time, or an error occurs. The
     *       cached reader of the topic is closed on every one of these paths.</li>
     *   <li>{@code resume()} with a one second delay when no message is currently available; the
     *       reader stays cached for the next invocation.</li>
     *   <li>{@code resume()} once the batch limit is reached.</li>
     * </ul>
     *
     * @param pulsarSourceDescriptor topic (and optional stop-read time) to read
     * @param restrictionTracker tracker used to claim each message id before it is emitted
     * @param outputReceiver receiver of the emitted messages
     * @param watermarkEstimator estimator that is fed the observed timestamps when it is a
     *     {@link TimestampObservingWatermarkEstimator}
     * @return the continuation telling the runner whether to resume or stop
     * @throws Exception if the reader cannot be created
     */
    @DoFn.ProcessElement
    public DoFn.ProcessContinuation processElement(@DoFn.Element PulsarSourceDescriptor pulsarSourceDescriptor, RestrictionTracker<PulsarMessageIdRange, MessageId> restrictionTracker, DoFn.OutputReceiver<PulsarMessage<T>> outputReceiver, WatermarkEstimator watermarkEstimator) throws Exception {
        PulsarMessageIdRange restriction = restrictionTracker.currentRestriction();
        MessageId from = restriction.getFrom();
        MessageId to = restriction.getTo();
        LOG.info("Processing element for pulsarSourceDescriptor {} with offset range [{}, {}]", pulsarSourceDescriptor, from, to);
        String topic = pulsarSourceDescriptor.getTopic();
        LOG.info("Processing topic {} with offset range [{}, {}]", topic, from, to);
        LOG.info("Creating/getting reader for topic {} starting from message ID {}: readers: {}", topic, from, this.readers.size());
        Reader<T> reader = newReader(this.client, topic, from);

        try {
            int processed = 0;
            while (processed < this.maxBatchSize) {
                // Per-message tracing is kept at DEBUG: this is the hot path of the source.
                LOG.debug("Reading message at offset {} for topic {}", from, topic);
                if (reader.hasReachedEndOfTopic()) {
                    LOG.info("Reached end of topic {} at offset {}", topic, from);
                    closeReader(topic);
                    return DoFn.ProcessContinuation.stop();
                }

                Message<T> message = reader.readNext((int) this.readTimeout.getMillis(), TimeUnit.MILLISECONDS);
                if (message == null) {
                    // One extra poll with a zero timeout before declaring the topic idle.
                    message = reader.readNext(0, TimeUnit.MILLISECONDS);
                }
                if (message == null) {
                    Duration idleLimit = this.idleTimeout != null ? this.idleTimeout : Duration.standardSeconds(60L);
                    Instant now = Instant.now();
                    if (now.isAfter(this.lastProcessedTime.plus(idleLimit))) {
                        // Nothing processed for longer than the idle limit: move the watermark to
                        // the wall clock so downstream windows are not held back by a quiet topic.
                        observeTimestampIfSupported(watermarkEstimator, now);
                    }
                    LOG.debug("No message available for topic {} at offset {}, resuming", topic, from);
                    return DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1L));
                }

                LOG.debug("Read message {} id: {} at offset {} for topic {}", message, message.getMessageId(), from, topic);
                from = message.getMessageId();
                if (!restrictionTracker.tryClaim(from)) {
                    LOG.info("Failed to claim offset {} for topic {}, stopping", from, topic);
                    closeReader(topic);
                    return DoFn.ProcessContinuation.stop();
                }
                LOG.debug("Claimed offset {} for topic {}", from, topic);

                Long stopReadTime = pulsarSourceDescriptor.getStopReadTime();
                if (stopReadTime != null && message.getPublishTime() > stopReadTime.longValue()) {
                    LOG.info("Message publish time {} exceeds stop read time {} for topic {}, stopping", message.getPublishTime(), stopReadTime, topic);
                    // Close the cached reader like every other stop path does; otherwise it
                    // would stay open (and cached) until teardown.
                    closeReader(topic);
                    return DoFn.ProcessContinuation.stop();
                }

                PulsarMessage<T> pulsarMessage = new PulsarMessage<>(message.getTopicName(), message.getPublishTime(), message.getValue());
                Instant outputTimestamp = this.extractOutputTimestampFn.apply(message);
                observeTimestampIfSupported(watermarkEstimator, outputTimestamp);
                outputReceiver.outputWithTimestamp(pulsarMessage, outputTimestamp);
                LOG.debug("Successfully processed message at offset {} for topic {}", from, topic);

                if (StringUtils.isNotEmpty(this.ackSubscriptionName)) {
                    acknowledgeMessage(message, topic);
                }
                this.lastProcessedTime = Instant.now();
                processed++;
            }
        } catch (Exception e) {
            LOG.error("Error while reading message from topic {}", topic, e);
            // The reader may be in an undefined state after a failure; do not leave it cached.
            closeReader(topic);
            // NOTE(review): the failure is logged and reading of this element stops instead of
            // propagating the exception. Kept as in the original — confirm this is intended.
            return DoFn.ProcessContinuation.stop();
        }
        // Batch limit reached: hand control back to the runner and continue later.
        return DoFn.ProcessContinuation.resume();
    }

    /** Feeds {@code timestamp} to {@code estimator} when it is a timestamp-observing estimator. */
    private static void observeTimestampIfSupported(WatermarkEstimator<?> estimator, Instant timestamp) {
        if (estimator instanceof TimestampObservingWatermarkEstimator) {
            ((TimestampObservingWatermarkEstimator<?>) estimator).observeTimestamp(timestamp);
        }
    }

    /**
     * Acknowledges {@code message} asynchronously on the {@code ackSubscriptionName} subscription
     * of the given topic.
     *
     * <p>The consumer used for acknowledging is created on first use (Shared subscription,
     * earliest initial position) and cached per topic. An acknowledgement failure is only logged.
     *
     * @param message message to acknowledge
     * @param str topic the message was read from
     * @throws PulsarClientException if the ack-consumer cannot subscribe
     */
    private void acknowledgeMessage(Message<T> message, String str) throws PulsarClientException {
        Consumer<byte[]> ackConsumer = this.ackConsumers.get(str);
        if (ackConsumer == null) {
            ackConsumer = this.client.newConsumer()
                    .topic(new String[]{str})
                    .subscriptionName(this.ackSubscriptionName)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .receiverQueueSize(this.receiverQueueSize)
                    .subscribe();
            this.ackConsumers.put(str, ackConsumer);
        }
        // NOTE(review): the log text says "cumulative" although acknowledgeAsync(message) is
        // called here — confirm which one is intended.
        ackConsumer.acknowledgeAsync(message).exceptionally(failure -> {
            LOG.error("[{}][{}] acknowledge message {} cumulative fail.", str, this.ackSubscriptionName, message.getMessageId(), failure);
            return null;
        });
    }

    /**
     * Reports the size of a restriction as the work remaining according to a freshly created
     * tracker for it.
     *
     * @param pulsarSourceDescriptor element the restriction belongs to
     * @param pulsarMessageIdRange restriction to size
     * @return the remaining work reported by the tracker's progress
     * @throws ExecutionException propagated from {@code restrictionTracker(...)}
     */
    @DoFn.GetSize
    public double getSize(@DoFn.Element PulsarSourceDescriptor pulsarSourceDescriptor, @DoFn.Restriction PulsarMessageIdRange pulsarMessageIdRange) throws ExecutionException {
        PulsarMessageIdRangeTracker tracker = restrictionTracker(pulsarSourceDescriptor, pulsarMessageIdRange);
        return tracker.getProgress().getWorkRemaining();
    }

    /**
     * Removes the cached reader of a topic and closes it.
     *
     * @param str topic whose reader should be closed
     * @return {@code true} when a reader was cached and closed cleanly; {@code false} when no
     *     reader was cached or closing it failed (the failure is logged)
     */
    private boolean closeReader(String str) {
        Reader<T> cached = this.readers.remove(str);
        boolean closed = false;
        if (cached != null) {
            try {
                cached.close();
                closed = true;
            } catch (Exception e) {
                LOG.error("Failed to close reader for topic {}", str, e);
            }
        }
        return closed;
    }

    /**
     * Returns the cached reader of a topic, creating (and caching) one when none exists yet.
     *
     * <p>A new reader starts at {@code messageId} (inclusive) and is additionally sought to that
     * id unless it is {@code MessageId.earliest}. A cached reader is returned as is, regardless
     * of {@code messageId}.
     *
     * @param pulsarClient client used to build the reader
     * @param str topic to read
     * @param messageId start position of a newly created reader
     * @return the reader for the topic
     * @throws PulsarClientException if the reader cannot be created or sought
     */
    private Reader<T> newReader(PulsarClient pulsarClient, String str, MessageId messageId) throws PulsarClientException {
        Reader<T> cached = this.readers.get(str);
        if (cached != null) {
            return cached;
        }

        // Non-positive configuration values fall back to the built-in defaults.
        int queueSize = this.receiverQueueSize > 0 ? this.receiverQueueSize : 1000;
        int pendingChunks = this.maxPendingChunkedMessage > 0 ? this.maxPendingChunkedMessage : 10;

        ReaderBuilder<T> builder = pulsarClient.newReader(this.schema)
                .topic(str)
                .receiverQueueSize(queueSize)
                .readCompacted(this.readCompacted)
                .autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull)
                .maxPendingChunkedMessage(pendingChunks)
                .startMessageId(messageId)
                .startMessageIdInclusive();
        if (StringUtils.isNotEmpty(this.readerName)) {
            builder.readerName(this.readerName + "-" + messageId.toString());
        }
        if (StringUtils.isNotEmpty(this.subscriptionRolePrefix)) {
            builder.subscriptionRolePrefix(this.subscriptionRolePrefix);
        }
        // NOTE(review): this call overwrites any prefix set just above, so the configured
        // subscriptionRolePrefix never takes effect and the subscription *name* ends up being
        // used as a role prefix. Confirm whether builder.subscriptionName(...) was intended —
        // and whether a fixed name would clash between readers of the same topic — before
        // changing it.
        builder.subscriptionRolePrefix(getSubscriptionName());
        if (this.keyHashRanges != null) {
            builder.keyHashRange(this.keyHashRanges.toArray(new Range[0]));
        }
        if (this.expireTimeOfIncompleteChunkedMessage != null) {
            builder.expireTimeOfIncompleteChunkedMessage(this.expireTimeOfIncompleteChunkedMessage.getStandardSeconds(), TimeUnit.SECONDS);
        }

        Reader<T> created = builder.create();
        boolean startsAtEarliest = messageId == null || messageId.equals(MessageId.earliest);
        if (!startsAtEarliest) {
            created.seek(messageId);
        }
        this.readers.put(str, created);
        return created;
    }

    /**
     * Supplies the initial watermark estimator state: the timestamp of the input element,
     * returned unchanged.
     *
     * @param instant timestamp of the input element
     * @return {@code instant}
     */
    @DoFn.GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp Instant instant) {
        return instant;
    }

    /**
     * Creates a monotonically increasing watermark estimator seeded with the given state, after
     * clamping it into the timestamp range supported by {@link BoundedWindow}.
     *
     * @param instant watermark estimator state to start from
     * @return the new estimator
     */
    @DoFn.NewWatermarkEstimator
    public WatermarkEstimator<Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
        Instant clamped = ensureTimestampWithinBounds(instant);
        return new WatermarkEstimators.MonotonicallyIncreasing(clamped);
    }

    /**
     * Creates the tracker for a restriction of the descriptor's topic, backed by the admin client
     * of this instance.
     *
     * @param pulsarSourceDescriptor element whose topic is tracked
     * @param pulsarMessageIdRange restriction to track
     * @return a new tracker
     * @throws ExecutionException declared by the original signature
     */
    @DoFn.NewTracker
    public PulsarMessageIdRangeTracker restrictionTracker(@DoFn.Element PulsarSourceDescriptor pulsarSourceDescriptor, @DoFn.Restriction PulsarMessageIdRange pulsarMessageIdRange) throws ExecutionException {
        String topic = pulsarSourceDescriptor.getTopic();
        return new PulsarMessageIdRangeTracker(pulsarMessageIdRange, this.admin, topic);
    }

    /**
     * Computes the initial message-id range to read for a descriptor.
     *
     * <p>The range defaults to {@code [earliest, latest]}. A start-read time, and a stop-read
     * time that is not in the future, are translated to message ids through the admin client.
     * Explicit start / end message ids on the descriptor take precedence over those lookups.
     *
     * @param pulsarSourceDescriptor descriptor of the topic to read
     * @return the initial restriction
     * @throws RuntimeException wrapping the {@link PulsarAdminException} of a failed lookup
     */
    @DoFn.GetInitialRestriction
    public PulsarMessageIdRange getInitialRestriction(@DoFn.Element PulsarSourceDescriptor pulsarSourceDescriptor) {
        LOG.info("Starting Pulsar Reader for initial restriction for {}", pulsarSourceDescriptor);
        MessageId start = MessageId.earliest;
        MessageId end = MessageId.latest;

        Long startReadTime = pulsarSourceDescriptor.getStartReadTime();
        if (startReadTime != null) {
            start = lookUpMessageId(pulsarSourceDescriptor.getTopic(), startReadTime.longValue(), "Failed to get MessageId by the start timestamp");
        }

        Long stopReadTime = pulsarSourceDescriptor.getStopReadTime();
        // A stop time in the future cannot be resolved to a message id yet, so the end stays open.
        if (stopReadTime != null && stopReadTime.longValue() <= Instant.now().getMillis()) {
            end = lookUpMessageId(pulsarSourceDescriptor.getTopic(), stopReadTime.longValue(), "Failed to get MessageId by the end timestamp");
        }

        // Explicit message ids win over the timestamp-based lookups above.
        if (pulsarSourceDescriptor.getStartMessageId() != null) {
            start = pulsarSourceDescriptor.getStartMessageId();
        }
        if (pulsarSourceDescriptor.getEndMessageId() != null) {
            end = pulsarSourceDescriptor.getEndMessageId();
        }

        LOG.info("Initial restriction for topic {} - startOffset: {}, endOffset: {}", pulsarSourceDescriptor.getTopic(), start, end);
        return new PulsarMessageIdRange(start, end);
    }

    /** Resolves a timestamp to a message id via the admin client; logs and rethrows unchecked on failure. */
    private MessageId lookUpMessageId(String topic, long timestamp, String failureMessage) {
        try {
            return this.admin.topics().getMessageIdByTimestamp(topic, timestamp);
        } catch (PulsarAdminException e) {
            LOG.error(failureMessage, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Clamps a timestamp into {@code [BoundedWindow.TIMESTAMP_MIN_VALUE,
     * BoundedWindow.TIMESTAMP_MAX_VALUE]}.
     *
     * @param instant timestamp to clamp
     * @return the nearest bound when {@code instant} lies outside the range, otherwise
     *     {@code instant} itself
     */
    private static Instant ensureTimestampWithinBounds(Instant instant) {
        if (instant.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            return BoundedWindow.TIMESTAMP_MIN_VALUE;
        }
        if (instant.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            return BoundedWindow.TIMESTAMP_MAX_VALUE;
        }
        return instant;
    }

    /**
     * Replaces the admin client used by this instance. When set before
     * {@code initPulsarClients()} runs, that method keeps this admin instead of building one.
     *
     * @param pulsarAdmin admin client to use
     */
    void setPulsarAdmin(PulsarAdmin pulsarAdmin) {
        this.admin = pulsarAdmin;
    }

    /**
     * Replaces the Pulsar client used by this instance. When set before
     * {@code initPulsarClients()} runs, that method keeps this client instead of building one.
     *
     * @param pulsarClient client to use
     */
    void setPulsarClient(PulsarClient pulsarClient) {
        this.client = pulsarClient;
    }

    /**
     * Returns the configured subscription name, or {@code "beam-pulsar-<uuid>"} built from this
     * instance's UUID when none was configured. The fallback is logged on every call.
     *
     * @return the subscription name to use
     */
    protected String getSubscriptionName() {
        boolean configured = StringUtils.isNotEmpty(this.subscriptionName);
        if (!configured) {
            String generated = "beam-pulsar-" + this.uuid;
            LOG.info("Subscription name is not set, using a random name {}", generated);
            return generated;
        }
        return this.subscriptionName;
    }
}
