package io.synadia.flink.v0.source.reader;

import io.nats.client.Message;
import io.synadia.flink.utils.ConnectionFactory;
import io.synadia.flink.utils.MiscUtils;
import io.synadia.flink.v0.NatsJetStreamSourceConfiguration;
import io.synadia.flink.v0.emitter.NatsRecordEmitter;
import io.synadia.flink.v0.payload.PayloadDeserializer;
import io.synadia.flink.v0.source.split.NatsSubjectSplit;
import io.synadia.flink.v0.source.split.NatsSubjectSplitState;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:io/synadia/flink/v0/source/reader/NatsJetStreamSourceReader.class */
public class NatsJetStreamSourceReader<OutputT> extends SourceReaderBase<Message, OutputT, NatsSubjectSplit, NatsSubjectSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(NatsJetStreamSourceReader.class);
    private final String id;
    private final ConnectionFactory connectionFactory;
    private final SourceReaderContext readerContext;
    private final AtomicReference<Throwable> cursorCommitThrowable;
    final SortedMap<Long, Map<String, List<Message>>> cursorsToCommit;
    private final ConcurrentMap<String, List<Message>> cursorsOfFinishedSplits;
    private final NatsJetStreamSourceConfiguration sourceConfiguration;

    public NatsJetStreamSourceReader(String str, FutureCompletingBlockingQueue<RecordsWithSplitIds<Message>> futureCompletingBlockingQueue, NatsSourceFetcherManager natsSourceFetcherManager, NatsJetStreamSourceConfiguration natsJetStreamSourceConfiguration, ConnectionFactory connectionFactory, PayloadDeserializer<OutputT> payloadDeserializer, SourceReaderContext sourceReaderContext) {
        super(futureCompletingBlockingQueue, natsSourceFetcherManager, new NatsRecordEmitter(payloadDeserializer), natsJetStreamSourceConfiguration.getConfiguration(), sourceReaderContext);
        this.id = MiscUtils.generatePrefixedId(str);
        this.sourceConfiguration = natsJetStreamSourceConfiguration;
        this.connectionFactory = connectionFactory;
        this.readerContext = (SourceReaderContext) Preconditions.checkNotNull(sourceReaderContext);
        this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap());
        this.cursorsOfFinishedSplits = new ConcurrentHashMap();
        this.cursorCommitThrowable = new AtomicReference<>();
    }

    public void start() {
        LOG.debug("{} | start", this.id);
        super.start();
        if (this.sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::cumulativeAcknowledgmentMessage, this.sourceConfiguration.getFetchTimeout().toMillis(), this.sourceConfiguration.getAutoAckInterval().toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    public InputStatus pollNext(ReaderOutput<OutputT> readerOutput) throws Exception {
        Throwable th = this.cursorCommitThrowable.get();
        if (th != null) {
            throw new FlinkRuntimeException("An error occurred in acknowledge message.", th);
        }
        return super.pollNext(readerOutput);
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        LOG.debug("Committing cursors for checkpoint {}", Long.valueOf(j));
        Map<String, List<Message>> map = this.cursorsToCommit.get(Long.valueOf(j));
        try {
            ((NatsSourceFetcherManager) this.splitFetcherManager).acknowledgeMessages(map);
            LOG.debug("Successfully acknowledge cursors for checkpoint {}", Long.valueOf(j));
            this.cursorsOfFinishedSplits.keySet().removeAll(map.keySet());
            this.cursorsToCommit.headMap(Long.valueOf(j + 1)).clear();
        } catch (Exception e) {
            LOG.error("Failed to acknowledge cursors for checkpoint {}", Long.valueOf(j), e);
            this.cursorCommitThrowable.compareAndSet(null, e);
        }
    }

    public List<NatsSubjectSplit> snapshotState(long j) {
        List<NatsSubjectSplit> snapshotState = super.snapshotState(j);
        Map<String, List<Message>> computeIfAbsent = this.cursorsToCommit.computeIfAbsent(Long.valueOf(j), l -> {
            return new HashMap();
        });
        for (NatsSubjectSplit natsSubjectSplit : snapshotState) {
            computeIfAbsent.put(natsSubjectSplit.getSubject(), natsSubjectSplit.getCurrentMessages());
        }
        computeIfAbsent.putAll(this.cursorsOfFinishedSplits);
        return snapshotState;
    }

    public void close() throws Exception {
        super.close();
    }

    public void addSplits(List<NatsSubjectSplit> list) {
        super.addSplits(list);
    }

    protected void onSplitFinished(Map<String, NatsSubjectSplitState> map) {
        Iterator<String> it = map.keySet().iterator();
        while (it.hasNext()) {
            ((NatsSourceFetcherManager) this.splitFetcherManager).closeFetcher(it.next());
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("onSplitFinished event: {}", map);
        }
        Iterator<Map.Entry<String, NatsSubjectSplitState>> it2 = map.entrySet().iterator();
        while (it2.hasNext()) {
            NatsSubjectSplitState value = it2.next().getValue();
            this.cursorsOfFinishedSplits.put(value.getSplit().splitId(), value.getSplit().getCurrentMessages());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public NatsSubjectSplitState initializedState(NatsSubjectSplit natsSubjectSplit) {
        return new NatsSubjectSplitState(natsSubjectSplit);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public NatsSubjectSplit toSplitType(String str, NatsSubjectSplitState natsSubjectSplitState) {
        return natsSubjectSplitState.toNatsSubjectSplit();
    }

    private void cumulativeAcknowledgmentMessage() {
        HashMap hashMap = new HashMap(this.cursorsOfFinishedSplits);
        for (NatsSubjectSplit natsSubjectSplit : super.snapshotState(1L)) {
            hashMap.put(natsSubjectSplit.getSubject(), natsSubjectSplit.getCurrentMessages());
        }
        try {
            ((NatsSourceFetcherManager) this.splitFetcherManager).acknowledgeMessages(hashMap);
            this.cursorsOfFinishedSplits.keySet().removeAll(hashMap.keySet());
        } catch (Exception e) {
            LOG.error("Fail in auto cursor commit.", e);
            this.cursorCommitThrowable.compareAndSet(null, e);
        }
    }
}
