package com.mongodb.kafka.connect.sink;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.Versions;
import com.mongodb.kafka.connect.sink.converter.SinkConverter;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.processor.PostProcessors;
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategy;
import com.mongodb.kafka.connect.util.ConfigHelper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mongodb/kafka/connect/sink/MongoSinkTask.class */
public class MongoSinkTask extends SinkTask {
    private static final String CONNECTOR_TYPE = "sink";
    private MongoSinkConfig sinkConfig;
    private MongoClient mongoClient;
    private Map<String, AtomicInteger> remainingRetriesTopicMap;
    private SinkConverter sinkConverter = new SinkConverter();
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoSinkTask.class);
    private static final BulkWriteOptions BULK_WRITE_OPTIONS = new BulkWriteOptions();

    public String version() {
        return Versions.VERSION;
    }

    public void start(Map<String, String> map) {
        LOGGER.info("Starting MongoDB sink task");
        try {
            this.sinkConfig = new MongoSinkConfig(map);
            this.remainingRetriesTopicMap = new ConcurrentHashMap((Map) this.sinkConfig.getTopics().orElse(Collections.emptyList()).stream().collect(Collectors.toMap(str -> {
                return str;
            }, str2 -> {
                return new AtomicInteger(this.sinkConfig.getMongoSinkTopicConfig(str2).getInt(MongoSinkTopicConfig.MAX_NUM_RETRIES_CONFIG).intValue());
            })));
            LOGGER.debug("Started MongoDB sink task");
        } catch (Exception e) {
            throw new ConnectException("Failed to start new task", e);
        }
    }

    public void put(Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            LOGGER.debug("No sink records to process for current poll operation");
        } else {
            createSinkRecordBatchesPerTopic(collection).forEach((str, recordBatches) -> {
                MongoSinkTopicConfig mongoSinkTopicConfig = this.sinkConfig.getMongoSinkTopicConfig(str);
                recordBatches.getBufferedBatches().forEach(list -> {
                    processSinkRecords(mongoSinkTopicConfig, list);
                    RateLimitSettings rateLimitSettings = mongoSinkTopicConfig.getRateLimitSettings();
                    if (rateLimitSettings.isTriggered()) {
                        LOGGER.debug("Rate limit settings triggering {}ms defer timeout after processing {} further batches for topic {}", new Object[]{Integer.valueOf(rateLimitSettings.getTimeoutMs()), Integer.valueOf(rateLimitSettings.getEveryN()), str});
                        try {
                            Thread.sleep(rateLimitSettings.getTimeoutMs());
                        } catch (InterruptedException e) {
                            LOGGER.error(e.getMessage());
                        }
                    }
                });
            });
        }
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        LOGGER.debug("Flush called - noop");
    }

    public void stop() {
        LOGGER.info("Stopping MongoDB sink task");
        if (this.mongoClient != null) {
            this.mongoClient.close();
        }
    }

    private MongoClient getMongoClient() {
        if (this.mongoClient == null) {
            this.mongoClient = MongoClients.create(this.sinkConfig.getConnectionString(), ConfigHelper.getMongoDriverInformation(CONNECTOR_TYPE));
        }
        return this.mongoClient;
    }

    private void processSinkRecords(MongoSinkTopicConfig mongoSinkTopicConfig, List<SinkRecord> list) {
        List<? extends WriteModel<BsonDocument>> buildWriteModelCDC = mongoSinkTopicConfig.getCdcHandler().isPresent() ? buildWriteModelCDC(mongoSinkTopicConfig, list) : buildWriteModel(mongoSinkTopicConfig, list);
        try {
            if (!buildWriteModelCDC.isEmpty()) {
                LOGGER.debug("Bulk writing {} document(s) into collection [{}]", Integer.valueOf(buildWriteModelCDC.size()), mongoSinkTopicConfig.getNamespace().getFullName());
                LOGGER.debug("Mongodb bulk write result: {}", getMongoClient().getDatabase(mongoSinkTopicConfig.getNamespace().getDatabaseName()).getCollection(mongoSinkTopicConfig.getNamespace().getCollectionName(), BsonDocument.class).bulkWrite(buildWriteModelCDC, BULK_WRITE_OPTIONS));
            }
        } catch (MongoBulkWriteException e) {
            LOGGER.error("Mongodb bulk write (partially) failed", e);
            LOGGER.error("WriteResult: {}", e.getWriteResult());
            LOGGER.error("WriteErrors: {}", e.getWriteErrors());
            LOGGER.error("WriteConcernError: {}", e.getWriteConcernError());
            checkRetriableException(mongoSinkTopicConfig, e);
        } catch (MongoException e2) {
            LOGGER.error("Error on mongodb operation", e2);
            LOGGER.error("Writing {} document(s) into collection [{}] failed -> remaining retries ({})", new Object[]{Integer.valueOf(buildWriteModelCDC.size()), mongoSinkTopicConfig.getNamespace().getFullName(), Integer.valueOf(getRemainingRetriesForTopic(mongoSinkTopicConfig.getTopic()).get())});
            checkRetriableException(mongoSinkTopicConfig, e2);
        }
    }

    private AtomicInteger getRemainingRetriesForTopic(String str) {
        if (!this.remainingRetriesTopicMap.containsKey(str)) {
            this.remainingRetriesTopicMap.put(str, new AtomicInteger(this.sinkConfig.getMongoSinkTopicConfig(str).getInt(MongoSinkTopicConfig.MAX_NUM_RETRIES_CONFIG).intValue()));
        }
        return this.remainingRetriesTopicMap.get(str);
    }

    private void checkRetriableException(MongoSinkTopicConfig mongoSinkTopicConfig, MongoException mongoException) {
        if (getRemainingRetriesForTopic(mongoSinkTopicConfig.getTopic()).decrementAndGet() <= 0) {
            throw new DataException("Failed to write mongodb documents despite retrying", mongoException);
        }
        LOGGER.debug("Deferring retry operation for {}ms", mongoSinkTopicConfig.getInt(MongoSinkTopicConfig.RETRIES_DEFER_TIMEOUT_CONFIG));
        this.context.timeout(r0.intValue());
        throw new RetriableException(mongoException.getMessage(), mongoException);
    }

    Map<String, RecordBatches> createSinkRecordBatchesPerTopic(Collection<SinkRecord> collection) {
        LOGGER.debug("Number of sink records to process: {}", Integer.valueOf(collection.size()));
        HashMap hashMap = new HashMap();
        LOGGER.debug("Buffering sink records into grouped topic batches");
        collection.forEach(sinkRecord -> {
            RecordBatches recordBatches = (RecordBatches) hashMap.get(sinkRecord.topic());
            if (recordBatches == null) {
                int intValue = this.sinkConfig.getMongoSinkTopicConfig(sinkRecord.topic()).getInt(MongoSinkTopicConfig.MAX_BATCH_SIZE_CONFIG).intValue();
                LOGGER.debug("Batch size for collection {} is at most {} record(s)", this.sinkConfig.getMongoSinkTopicConfig(sinkRecord.topic()).getNamespace().getCollectionName(), Integer.valueOf(intValue));
                recordBatches = new RecordBatches(intValue, collection.size());
                hashMap.put(sinkRecord.topic(), recordBatches);
            }
            recordBatches.buffer(sinkRecord);
        });
        return hashMap;
    }

    List<? extends WriteModel<BsonDocument>> buildWriteModel(MongoSinkTopicConfig mongoSinkTopicConfig, Collection<SinkRecord> collection) {
        ArrayList arrayList = new ArrayList(collection.size());
        LOGGER.debug("building write model for {} record(s)", Integer.valueOf(collection.size()));
        PostProcessors postProcessors = mongoSinkTopicConfig.getPostProcessors();
        collection.forEach(sinkRecord -> {
            SinkDocument convert = this.sinkConverter.convert(sinkRecord);
            postProcessors.getPostProcessorList().forEach(postProcessor -> {
                postProcessor.process(convert, sinkRecord);
            });
            if (convert.getValueDoc().isPresent()) {
                arrayList.add(mongoSinkTopicConfig.getWriteModelStrategy().createWriteModel(convert));
                return;
            }
            Optional<WriteModelStrategy> deleteOneWriteModelStrategy = mongoSinkTopicConfig.getDeleteOneWriteModelStrategy();
            if (convert.getKeyDoc().isPresent() && deleteOneWriteModelStrategy.isPresent()) {
                arrayList.add(deleteOneWriteModelStrategy.get().createWriteModel(convert));
            } else {
                LOGGER.error("skipping sink record {} for which neither key doc nor value doc were present", sinkRecord);
            }
        });
        return arrayList;
    }

    List<? extends WriteModel<BsonDocument>> buildWriteModelCDC(MongoSinkTopicConfig mongoSinkTopicConfig, Collection<SinkRecord> collection) {
        LOGGER.debug("Building CDC write model for {} record(s) for topic {}", Integer.valueOf(collection.size()), mongoSinkTopicConfig.getTopic());
        Stream<SinkRecord> stream = collection.stream();
        SinkConverter sinkConverter = this.sinkConverter;
        Objects.requireNonNull(sinkConverter);
        return (List) stream.map(sinkConverter::convert).map(sinkDocument -> {
            return mongoSinkTopicConfig.getCdcHandler().flatMap(cdcHandler -> {
                return cdcHandler.handle(sinkDocument);
            });
        }).flatMap(optional -> {
            return (Stream) optional.map((v0) -> {
                return Stream.of(v0);
            }).orElseGet(Stream::empty);
        }).collect(Collectors.toList());
    }
}
