package io.apicurio.registry.serde.avro.nats.client.streaming.consumers;

import io.apicurio.registry.serde.avro.AvroDeserializer;
import io.apicurio.registry.serde.avro.AvroSerdeConfig;
import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.PullSubscribeOptions;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/apicurio/registry/serde/avro/nats/client/streaming/consumers/NatsConsumerImpl.class */
public class NatsConsumerImpl<DATA> implements NatsConsumer<DATA> {
    private final AvroDeserializer<DATA> deserializer;
    private final Connection connection;
    private final String subject;
    private final PullSubscribeOptions subscribeOptions;
    private JetStreamSubscription subscription;
    private static final Logger logger = LoggerFactory.getLogger(NatsConsumerImpl.class);

    public NatsConsumerImpl(Connection connection, String str, PullSubscribeOptions pullSubscribeOptions, Map<String, Object> map) {
        this.connection = connection;
        this.subject = str;
        this.subscribeOptions = pullSubscribeOptions;
        AvroSerdeConfig avroSerdeConfig = new AvroSerdeConfig(map);
        this.deserializer = new AvroDeserializer<>();
        this.deserializer.configure(avroSerdeConfig, false);
    }

    private JetStreamSubscription getLazySubscription() throws IOException, JetStreamApiException {
        if (this.subscription == null) {
            this.subscription = this.connection.jetStream().subscribe(this.subject, this.subscribeOptions);
        }
        return this.subscription;
    }

    @Override // io.apicurio.registry.serde.avro.nats.client.streaming.consumers.NatsConsumer
    public String getSubject() {
        return this.subject;
    }

    @Override // io.apicurio.registry.serde.avro.nats.client.streaming.consumers.NatsConsumer
    public NatsConsumerRecord<DATA> receive() throws JetStreamApiException, IOException {
        return receive(Duration.ofSeconds(3L));
    }

    @Override // io.apicurio.registry.serde.avro.nats.client.streaming.consumers.NatsConsumer
    public NatsConsumerRecord<DATA> receive(Duration duration) throws JetStreamApiException, IOException {
        return receive(1, duration).stream().findFirst().orElse(null);
    }

    @Override // io.apicurio.registry.serde.avro.nats.client.streaming.consumers.NatsConsumer
    public List<NatsConsumerRecord<DATA>> receive(int i, Duration duration) throws JetStreamApiException, IOException {
        List<Message> fetch = getLazySubscription().fetch(i, duration);
        if (fetch == null || fetch.isEmpty()) {
            logger.info("Receive timeout ({} ms)", Long.valueOf(duration.toMillis()));
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        for (Message message : fetch) {
            arrayList.add(new NatsConsumerRecordImpl(message, this.deserializer.deserializeData(this.subject, message.getData())));
        }
        return arrayList;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.subscription != null) {
            this.subscription.unsubscribe();
        }
    }
}
