package io.synadia.flink.v0.source.reader;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Message;
import io.synadia.flink.utils.ConnectionFactory;
import io.synadia.flink.utils.MiscUtils;
import io.synadia.flink.v0.payload.PayloadDeserializer;
import io.synadia.flink.v0.source.split.NatsSubjectSplit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/synadia/flink/v0/source/reader/NatsSourceReader.class */
public class NatsSourceReader<OutputT> implements SourceReader<OutputT, NatsSubjectSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(NatsSourceReader.class);
    private final String id;
    private final ConnectionFactory connectionFactory;
    private final PayloadDeserializer<OutputT> payloadDeserializer;
    private final SourceReaderContext readerContext;
    private final List<NatsSubjectSplit> subbedSplits = new ArrayList();
    private final FutureCompletingBlockingQueue<Message> messages = new FutureCompletingBlockingQueue<>();
    private Connection connection;
    private Dispatcher dispatcher;

    public NatsSourceReader(String str, ConnectionFactory connectionFactory, PayloadDeserializer<OutputT> payloadDeserializer, SourceReaderContext sourceReaderContext) {
        this.id = MiscUtils.generatePrefixedId(str);
        this.connectionFactory = connectionFactory;
        this.payloadDeserializer = payloadDeserializer;
        this.readerContext = (SourceReaderContext) Preconditions.checkNotNull(sourceReaderContext);
    }

    public void start() {
        LOG.debug("{} | start", this.id);
        try {
            this.connection = this.connectionFactory.connect();
            this.dispatcher = this.connection.createDispatcher(message -> {
                this.messages.put(1, message);
            });
        } catch (IOException e) {
            throw new FlinkRuntimeException(e);
        }
    }

    public InputStatus pollNext(ReaderOutput<OutputT> readerOutput) throws Exception {
        Message message = (Message) this.messages.poll();
        if (message == null) {
            LOG.debug("{} | pollNext no message NOTHING_AVAILABLE", this.id);
            return InputStatus.NOTHING_AVAILABLE;
        }
        readerOutput.collect(this.payloadDeserializer.getObject(message.getSubject(), message.getData(), message.getHeaders()));
        InputStatus inputStatus = this.messages.isEmpty() ? InputStatus.NOTHING_AVAILABLE : InputStatus.MORE_AVAILABLE;
        LOG.debug("{} | pollNext had message, then {}", this.id, inputStatus);
        return inputStatus;
    }

    public List<NatsSubjectSplit> snapshotState(long j) {
        LOG.debug("{} | snapshotState", this.id);
        return Collections.unmodifiableList(this.subbedSplits);
    }

    public CompletableFuture<Void> isAvailable() {
        return this.messages.getAvailabilityFuture();
    }

    public void addSplits(List<NatsSubjectSplit> list) {
        for (NatsSubjectSplit natsSubjectSplit : list) {
            LOG.debug("{} | addSplits {}", this.id, natsSubjectSplit);
            if (this.subbedSplits.indexOf(natsSubjectSplit) == -1) {
                this.dispatcher.subscribe(natsSubjectSplit.getSubject());
                this.subbedSplits.add(natsSubjectSplit);
            }
        }
    }

    public void notifyNoMoreSplits() {
        LOG.debug("{} | notifyNoMoreSplits", this.id);
    }

    public void close() throws Exception {
        LOG.debug("{} | close", this.id);
        this.connection.close();
    }

    public void handleSourceEvents(SourceEvent sourceEvent) {
        LOG.debug("{} | handleSourceEvents {}", this.id, sourceEvent);
    }

    public String toString() {
        return "NatsSourceReader{id='" + this.id + "', subbedSplits=" + this.subbedSplits + "}";
    }
}
