package io.synadia.flink.v0.source.reader;

import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.PullSubscribeOptions;
import io.synadia.flink.utils.ConnectionFactory;
import io.synadia.flink.utils.MiscUtils;
import io.synadia.flink.v0.source.NatsJetStreamSourceConfiguration;
import io.synadia.flink.v0.source.split.NatsSubjectSplit;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/synadia/flink/v0/source/reader/NatsSubjectSplitReader.class */
public class NatsSubjectSplitReader implements SplitReader<Message, NatsSubjectSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(NatsSubjectSplitReader.class);
    private final String id;
    private final ConnectionFactory connectionFactory;
    private final NatsJetStreamSourceConfiguration sourceConfiguration;
    private JetStreamSubscription jetStreamSubscription;
    private NatsSubjectSplit registeredSplit;
    private Connection _connection;

    public NatsSubjectSplitReader(String str, ConnectionFactory connectionFactory, NatsJetStreamSourceConfiguration natsJetStreamSourceConfiguration) {
        this.id = MiscUtils.generatePrefixedId(str);
        this.connectionFactory = connectionFactory;
        this.sourceConfiguration = natsJetStreamSourceConfiguration;
    }

    public RecordsWithSplitIds<Message> fetch() throws IOException {
        RecordsBySplits.Builder builder = new RecordsBySplits.Builder();
        if (getConnection() == null || this.registeredSplit == null) {
            return builder.build();
        }
        String splitId = this.registeredSplit.splitId();
        try {
            List fetch = this.jetStreamSubscription.fetch(this.sourceConfiguration.getMaxFetchRecords(), this.sourceConfiguration.getFetchTimeout());
            fetch.forEach(message -> {
                builder.add(splitId, message);
            });
            if (this.sourceConfiguration.getBoundedness() == Boundedness.BOUNDED && fetch.size() <= this.sourceConfiguration.getMaxFetchRecords()) {
                builder.addFinishedSplit(splitId);
            }
            LOG.debug("{} | {} | Finished polling message {}", new Object[]{this.id, splitId, 1});
            return builder.build();
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    public void handleSplitsChanges(SplitsChange<NatsSubjectSplit> splitsChange) {
        LOG.debug("{} | handleSplitsChanges {}", this.id, splitsChange);
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        if (this.registeredSplit != null) {
            throw new IllegalStateException("This split reader have assigned split.");
        }
        this.registeredSplit = (NatsSubjectSplit) splitsChange.splits().get(0);
        try {
            this.jetStreamSubscription = createSubscription(this.registeredSplit.getSubject());
            LOG.info("Register split {} consumer for current reader.", this.registeredSplit);
        } catch (Exception e) {
            throw new FlinkRuntimeException(e);
        }
    }

    public void pauseOrResumeSplits(Collection<NatsSubjectSplit> collection, Collection<NatsSubjectSplit> collection2) {
        LOG.debug("{} | pauseOrResumeSplits {} | {}", collection, collection2);
    }

    public void wakeUp() {
    }

    public void close() throws Exception {
        unsubscribe();
        closeConnection();
    }

    public void notifyCheckpointComplete(String str, List<Message> list) throws Exception {
        list.forEach(message -> {
            getConnection().publish(message.getReplyTo(), "+ACK".getBytes());
        });
    }

    private Connection getConnection() {
        if (this._connection == null) {
            try {
                this._connection = this.connectionFactory.connect();
            } catch (IOException e) {
                throw new FlinkRuntimeException(e);
            }
        }
        return this._connection;
    }

    private void closeConnection() {
        try {
            if (this._connection != null) {
                try {
                    this._connection.close();
                    this._connection = null;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new FlinkRuntimeException(e);
                }
            }
        } catch (Throwable th) {
            this._connection = null;
            throw th;
        }
    }

    private void unsubscribe() {
        try {
            if (this.jetStreamSubscription != null) {
                try {
                    this.jetStreamSubscription.unsubscribe();
                    this.jetStreamSubscription = null;
                } catch (RuntimeException e) {
                    throw new FlinkRuntimeException(e);
                }
            }
        } catch (Throwable th) {
            this.jetStreamSubscription = null;
            throw th;
        }
    }

    private JetStreamSubscription createSubscription(String str) throws IOException, JetStreamApiException {
        return getConnection().jetStream().subscribe(str, ((PullSubscribeOptions.Builder) PullSubscribeOptions.builder().durable(this.sourceConfiguration.getConsumerName())).build());
    }
}
