package org.apache.pulsar.io.mongodb;

import com.google.common.collect.Lists;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.client.result.InsertManyResult;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.bson.BSONException;
import org.bson.Document;
import org.bson.json.JsonParseException;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pulsar sink that buffers incoming records and writes them to a MongoDB collection in batches.
 *
 * <p>Each record value is decoded as UTF-8 text and parsed into a {@link Document}. Records are
 * buffered in memory and flushed either when the buffer reaches the configured batch size or when
 * the periodic flush timer fires. A record is acknowledged when its insertion completes without
 * error, and failed when it cannot be parsed or when the insertion reports it as not written.
 *
 * <p>The buffer ({@code incomingList}) is guarded by the monitor of this instance; all flushes run
 * on the single thread of {@code flushExecutor}.
 */
@Connector(name = "mongo", type = IOType.SINK, help = "A sink connector that sends pulsar messages to mongodb", configClass = MongoConfig.class)
public class MongoSink implements Sink<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(MongoSink.class);

    private MongoConfig mongoConfig;
    private MongoClient mongoClient;
    private MongoCollection<Document> collection;
    /** Records waiting for the next flush; guarded by {@code synchronized (this)}. */
    private List<Record<byte[]>> incomingList;
    private ScheduledExecutorService flushExecutor;
    /** Optional factory for the client; when {@code null} the client is built from the configured URI. */
    private final Supplier<MongoClient> clientProvider;

    /**
     * Subscriber attached to one {@code insertMany} call. It tracks, by position in the batch,
     * which records must be acknowledged and which must be failed once the insertion terminates.
     */
    public class DocsToInsertSubscriber implements Subscriber<InsertManyResult> {
        final List<Document> docsToInsert;
        final List<Record<byte[]>> recordsToInsert;
        /** Batch positions to acknowledge; starts as every position and shrinks on error. */
        final List<Integer> idxToAck;
        /** Batch positions to fail; stays empty unless {@link #onError(Throwable)} is called. */
        final List<Integer> idxToFail = Lists.newArrayList();

        /**
         * @param docsToInsert    documents handed to the driver, in insertion order
         * @param recordsToInsert records the documents were parsed from; index {@code i} must be
         *                        the source of {@code docsToInsert.get(i)}
         */
        public DocsToInsertSubscriber(List<Document> docsToInsert, List<Record<byte[]>> recordsToInsert) {
            this.docsToInsert = docsToInsert;
            this.recordsToInsert = recordsToInsert;
            // Explicit ArrayList: the list is mutated in onError, and Collectors.toList()
            // gives no mutability guarantee.
            this.idxToAck = IntStream.range(0, this.docsToInsert.size())
                    .boxed()
                    .collect(Collectors.toCollection(ArrayList::new));
        }

        /** Requests an effectively unbounded number of results from the publisher. */
        @Override // org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            subscription.request(Integer.MAX_VALUE);
        }

        /** Ignores the result; records are settled in {@link #onComplete()}. */
        @Override // org.reactivestreams.Subscriber
        public void onNext(InsertManyResult insertManyResult) {
        }

        /**
         * Works out which records were not written and then settles the whole batch.
         *
         * <p>For a {@link MongoBulkWriteException} every position from the lowest reported write
         * error onward is failed: the sink issues {@code insertMany} without options, i.e. as an
         * ordered insert, and an ordered insert stops processing at the first error, so the
         * documents after it were never written. If the exception carries no write errors the
         * batch is acknowledged in full. Any other error fails the whole batch.
         *
         * @param th the error reported by the publisher
         */
        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            if (th != null) {
                log.error("MongoDB insertion error", th);
                if (th instanceof MongoBulkWriteException) {
                    final int batchLength = this.docsToInsert.size();
                    final int firstFailed = ((MongoBulkWriteException) th).getWriteErrors().stream()
                            .mapToInt(writeError -> writeError.getIndex())
                            .min()
                            .orElse(batchLength);
                    // Bounded by the batch length so a bogus index can never cause an
                    // out-of-range lookup when the records are settled.
                    for (int failed = Math.max(firstFailed, 0); failed < batchLength; failed++) {
                        this.idxToFail.add(failed);
                    }
                    this.idxToAck.removeIf(position -> position >= firstFailed);
                } else {
                    this.idxToFail.addAll(this.idxToAck);
                    this.idxToAck.clear();
                }
            }
            onComplete();
        }

        /** Acknowledges every position left in {@code idxToAck} and fails every one in {@code idxToFail}. */
        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            log.debug("Nb ack={}, nb fail={}", this.idxToAck.size(), this.idxToFail.size());
            this.idxToAck.forEach(position -> this.recordsToInsert.get(position).ack());
            this.idxToFail.forEach(position -> this.recordsToInsert.get(position).fail());
        }
    }

    /** Creates a sink that builds its client from the configured MongoDB URI. */
    public MongoSink() {
        this(null);
    }

    /**
     * Creates a sink that obtains its client from the given supplier.
     *
     * @param supplier factory for the client, or {@code null} to build it from the configured URI
     */
    public MongoSink(Supplier<MongoClient> supplier) {
        this.clientProvider = supplier;
    }

    /**
     * Loads and validates the configuration, connects to the target collection and starts the
     * periodic flush, which runs every {@code batchTimeMs} milliseconds.
     *
     * @param config      raw connector configuration
     * @param sinkContext sink context (not used)
     * @throws Exception if the configuration cannot be loaded or is invalid
     */
    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        log.info("Open MongoDB Sink");
        this.mongoConfig = MongoConfig.load(config);
        this.mongoConfig.validate(true, true);
        if (this.clientProvider != null) {
            this.mongoClient = this.clientProvider.get();
        } else {
            this.mongoClient = MongoClients.create(this.mongoConfig.getMongoUri());
        }
        this.collection = this.mongoClient
                .getDatabase(this.mongoConfig.getDatabase())
                .getCollection(this.mongoConfig.getCollection());
        this.incomingList = Lists.newArrayList();
        this.flushExecutor = Executors.newScheduledThreadPool(1);
        this.flushExecutor.scheduleAtFixedRate(
                this::flushQuietly,
                this.mongoConfig.getBatchTimeMs(),
                this.mongoConfig.getBatchTimeMs(),
                TimeUnit.MILLISECONDS);
    }

    /**
     * Buffers the record and triggers an immediate flush when the buffer reaches the configured
     * batch size. The record is settled (acked or failed) later, by the flush.
     *
     * @param record record to write; a record without a value is failed at flush time
     */
    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<byte[]> record) {
        // Decode the payload only when it is actually going to be logged.
        if (log.isDebugEnabled()) {
            final byte[] value = record.getValue();
            log.debug("Received record: {}", value == null ? null : new String(value, StandardCharsets.UTF_8));
        }
        final int buffered;
        synchronized (this) {
            this.incomingList.add(record);
            buffered = this.incomingList.size();
        }
        // Equality (not >=) so that a single flush is queued per batch even if the buffer keeps
        // growing before the flush gets to run.
        if (buffered == this.mongoConfig.getBatchSize()) {
            this.flushExecutor.execute(this::flushQuietly);
        }
    }

    /**
     * Runs {@link #flush()} and logs any unexpected failure instead of letting it propagate.
     * An exception escaping a {@code scheduleAtFixedRate} task would cancel every later run.
     */
    private void flushQuietly() {
        try {
            flush();
        } catch (RuntimeException e) {
            log.error("Unexpected error while flushing records to MongoDB", e);
        }
    }

    /**
     * Takes the buffered records, parses them and submits the resulting documents as one
     * {@code insertMany}. Records that cannot be parsed are failed and left out of the batch.
     */
    private void flush() {
        final List<Record<byte[]>> pending;
        // Hold the lock only for the buffer swap so write() is not blocked while parsing
        // and while calling into the driver.
        synchronized (this) {
            if (this.incomingList.isEmpty()) {
                return;
            }
            pending = this.incomingList;
            this.incomingList = Lists.newArrayList();
        }

        // The two lists are filled together so that position i of one matches position i of the other.
        final List<Document> docsToInsert = new ArrayList<>(pending.size());
        final List<Record<byte[]>> recordsToInsert = new ArrayList<>(pending.size());
        for (Record<byte[]> record : pending) {
            final Document doc = parseOrFail(record);
            if (doc != null) {
                docsToInsert.add(doc);
                recordsToInsert.add(record);
            }
        }

        if (!docsToInsert.isEmpty()) {
            this.collection.insertMany(docsToInsert)
                    .subscribe(new DocsToInsertSubscriber(docsToInsert, recordsToInsert));
        }
    }

    /**
     * Parses the record value into a document, failing the record when that is not possible.
     *
     * @return the parsed document, or {@code null} if the record was failed
     */
    private Document parseOrFail(Record<byte[]> record) {
        try {
            final byte[] value = record.getValue();
            if (value == null) {
                log.error("Bad message: record has no value");
                record.fail();
                return null;
            }
            return Document.parse(new String(value, StandardCharsets.UTF_8));
        } catch (RuntimeException e) {
            // Deliberately broader than BSONException/JsonParseException: whatever the reason a
            // single record cannot be turned into a document, only that record must be failed;
            // letting the exception escape would drop the rest of the batch unsettled.
            log.error("Bad message", e);
            record.fail();
            return null;
        }
    }

    /**
     * Stops the flush executor and closes the MongoDB client. Records still sitting in the buffer
     * are not flushed here and are left unsettled.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.flushExecutor != null) {
            this.flushExecutor.shutdown();
        }
        if (this.mongoClient != null) {
            this.mongoClient.close();
        }
    }
}
