package com.mongodb.kafka.connect.sink;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoException;
import com.mongodb.MongoNamespace;
import com.mongodb.bulk.BulkWriteError;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.Versions;
import com.mongodb.kafka.connect.util.ConfigHelper;
import com.mongodb.kafka.connect.util.ServerApiConfig;
import com.mongodb.kafka.connect.util.TimeseriesValidation;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mongodb/kafka/connect/sink/MongoSinkTask.class */
/**
 * Kafka Connect sink task that writes sink records to MongoDB with bulk writes.
 *
 * <p>Records handed to {@link #put(Collection)} are grouped by
 * {@code MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace}; every resulting batch is
 * written with one {@code bulkWrite} call. A retry counter is kept per topic: while it is not
 * exhausted, a failing write is reported to the framework as a {@link RetriableException}.
 */
public class MongoSinkTask extends SinkTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoSinkTask.class);
    private static final String CONNECTOR_TYPE = "sink";
    private static final BulkWriteOptions BULK_WRITE_OPTIONS = new BulkWriteOptions();

    private MongoSinkConfig sinkConfig;
    /** Created lazily by {@link #getMongoClient()}; closed and cleared by {@link #stop()}. */
    private MongoClient mongoClient;
    /** Remaining retry attempts, keyed by topic name. */
    private Map<String, AtomicInteger> remainingRetriesTopicMap;
    /** Namespaces that already went through {@link #checkTimeseries}. */
    private Set<MongoNamespace> checkedTimeseriesNamespaces;
    private Consumer<MongoProcessedSinkRecordData> errorReporter;

    /** Returns the connector version constant {@code Versions.VERSION}. */
    public String version() {
        return Versions.VERSION;
    }

    /**
     * Builds the sink configuration, seeds one retry counter per configured topic and sets up the
     * error reporter.
     *
     * @param map the raw task configuration
     * @throws ConnectException wrapping any exception raised during initialisation
     */
    public void start(Map<String, String> map) {
        LOGGER.info("Starting MongoDB sink task");
        try {
            this.sinkConfig = new MongoSinkConfig(map);
            this.checkedTimeseriesNamespaces = new HashSet<>();
            this.remainingRetriesTopicMap = new ConcurrentHashMap<>(
                    this.sinkConfig.getTopics().orElse(Collections.emptyList()).stream()
                            .collect(Collectors.toMap(topic -> topic, this::newRetryCounter)));
            this.errorReporter = createErrorReporter();
            LOGGER.debug("Started MongoDB sink task");
        } catch (Exception e) {
            throw new ConnectException("Failed to start new task", e);
        }
    }

    /**
     * Groups the given records into batches and bulk-writes each batch.
     *
     * @param collection the records delivered by the framework for this poll; may be empty
     */
    public void put(Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            LOGGER.debug("No sink records to process for current poll operation");
            return;
        }
        MongoSinkRecordProcessor
                .orderedGroupByTopicAndNamespace(collection, this.sinkConfig, this.errorReporter)
                .forEach(this::bulkWriteBatch);
    }

    /** Does nothing besides logging; writes already happen inside {@link #put(Collection)}. */
    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        LOGGER.debug("Flush called - noop");
    }

    /** Closes the MongoDB client if one was created. */
    public void stop() {
        LOGGER.info("Stopping MongoDB sink task");
        if (this.mongoClient != null) {
            this.mongoClient.close();
            // Drop the reference so getMongoClient() never hands out a closed client.
            this.mongoClient = null;
        }
    }

    /**
     * Creates the callback used to report records that could not be processed.
     *
     * @return a consumer forwarding to the task context's {@link ErrantRecordReporter}, or a no-op
     *     consumer when there is no context, no reporter, or the reporter API is missing at runtime
     */
    private Consumer<MongoProcessedSinkRecordData> createErrorReporter() {
        Consumer<MongoProcessedSinkRecordData> noOp = processedRecord -> { };
        if (this.context == null) {
            return noOp;
        }
        try {
            ErrantRecordReporter reporter = this.context.errantRecordReporter();
            if (reporter == null) {
                LOGGER.info("Errant record reporter not configured.");
                return noOp;
            }
            return processedRecord ->
                    reporter.report(processedRecord.getSinkRecord(), processedRecord.getException());
        } catch (NoClassDefFoundError | NoSuchMethodError unsupported) {
            // The reporter API is absent from the Kafka runtime in use; fall back to the no-op.
            LOGGER.info("Kafka versions prior to 2.6 do not support the errant record reporter.");
            return noOp;
        }
    }

    /** Returns the MongoDB client, creating it from the sink configuration on first use. */
    private MongoClient getMongoClient() {
        if (this.mongoClient == null) {
            MongoClientSettings.Builder settingsBuilder =
                    MongoClientSettings.builder().applyConnectionString(this.sinkConfig.getConnectionString());
            ServerApiConfig.setServerApi(settingsBuilder, this.sinkConfig);
            this.mongoClient = MongoClients.create(
                    settingsBuilder.build(),
                    ConfigHelper.getMongoDriverInformation(CONNECTOR_TYPE, this.sinkConfig.getString("provider")));
        }
        return this.mongoClient;
    }

    /**
     * Runs the timeseries validation for a namespace once; later calls for the same namespace
     * return immediately.
     */
    private void checkTimeseries(MongoNamespace namespace, MongoSinkTopicConfig topicConfig) {
        if (this.checkedTimeseriesNamespaces.contains(namespace)) {
            return;
        }
        if (topicConfig.isTimeseries()) {
            TimeseriesValidation.validateCollection(getMongoClient(), namespace, topicConfig);
        }
        // Only reached when validation did not throw.
        this.checkedTimeseriesNamespaces.add(namespace);
    }

    /**
     * Writes one batch to the collection named by the batch's first element.
     *
     * <p>Namespace and topic configuration are taken from the first element only.
     *
     * @param batch the processed records to write; an empty batch is ignored
     * @throws RetriableException if the write failed and retries remain for the topic
     * @throws DataException if the write failed and errors are not tolerated, or if the rate-limit
     *     pause was interrupted
     */
    private void bulkWriteBatch(List<MongoProcessedSinkRecordData> batch) {
        if (batch.isEmpty()) {
            return;
        }
        MongoNamespace namespace = batch.get(0).getNamespace();
        MongoSinkTopicConfig config = batch.get(0).getConfig();
        checkTimeseries(namespace, config);

        List<WriteModel<BsonDocument>> writeModels = batch.stream()
                .map(MongoProcessedSinkRecordData::getWriteModel)
                .collect(Collectors.toList());
        try {
            LOGGER.debug("Bulk writing {} document(s) into collection [{}]", writeModels.size(), namespace.getFullName());
            LOGGER.debug(
                    "Mongodb bulk write result: {}",
                    getMongoClient()
                            .getDatabase(namespace.getDatabaseName())
                            .getCollection(namespace.getCollectionName(), BsonDocument.class)
                            .bulkWrite(writeModels, BULK_WRITE_OPTIONS));
            resetRemainingRetriesForTopic(config);
            checkRateLimit(config);
        } catch (MongoException e) {
            LOGGER.warn("Writing {} document(s) into collection [{}] failed.", writeModels.size(), namespace.getFullName());
            handleMongoException(config, writeModels, e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before surfacing the failure.
            Thread.currentThread().interrupt();
            throw new DataException("Rate limiting was interrupted", e);
        } catch (Exception e) {
            if (config.logErrors()) {
                LOGGER.error("Failed to write mongodb documents", e);
            }
            if (!config.tolerateErrors()) {
                throw new DataException("Failed to write mongodb documents", e);
            }
        }
    }

    /** Restores the topic's retry counter to its configured maximum. */
    private void resetRemainingRetriesForTopic(MongoSinkTopicConfig topicConfig) {
        getRemainingRetriesForTopic(topicConfig.getTopic()).set(maxRetries(topicConfig));
    }

    /**
     * Sleeps for the configured timeout when the topic's rate limit settings report that they are
     * triggered.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void checkRateLimit(MongoSinkTopicConfig topicConfig) throws InterruptedException {
        RateLimitSettings rateLimit = topicConfig.getRateLimitSettings();
        if (rateLimit.isTriggered()) {
            LOGGER.debug(
                    "Rate limit settings triggering {}ms defer timeout after processing {} further batches for topic {}",
                    rateLimit.getTimeoutMs(),
                    rateLimit.getEveryN(),
                    topicConfig.getTopic());
            Thread.sleep(rateLimit.getTimeoutMs());
        }
    }

    /** Returns the retry counter for a topic, creating it on first access. */
    private AtomicInteger getRemainingRetriesForTopic(String topic) {
        return this.remainingRetriesTopicMap.computeIfAbsent(topic, this::newRetryCounter);
    }

    /** Creates a retry counter initialised with the maximum configured for the given topic. */
    private AtomicInteger newRetryCounter(String topic) {
        return new AtomicInteger(maxRetries(this.sinkConfig.getMongoSinkTopicConfig(topic)));
    }

    /** Reads the {@code MAX_NUM_RETRIES_CONFIG} setting of a topic configuration. */
    private static int maxRetries(MongoSinkTopicConfig topicConfig) {
        return topicConfig.getInt(MongoSinkTopicConfig.MAX_NUM_RETRIES_CONFIG);
    }

    /**
     * Decides what to do with a failed MongoDB operation.
     *
     * <p>Decrements the topic's retry counter. While the result is still positive, the task
     * context is given the configured defer timeout and a {@link RetriableException} is thrown.
     * Otherwise the failure is logged (if enabled) and rethrown unless errors are tolerated.
     *
     * @param topicConfig configuration of the topic the batch belongs to
     * @param writeModels the write models of the failed batch, used to enrich the error log
     * @param failure the exception raised by the driver
     * @throws RetriableException if retries remain
     * @throws DataException if no retries remain and errors are not tolerated
     */
    private void handleMongoException(MongoSinkTopicConfig topicConfig, List<WriteModel<BsonDocument>> writeModels, MongoException failure) {
        if (getRemainingRetriesForTopic(topicConfig.getTopic()).decrementAndGet() > 0) {
            int deferRetryMs = topicConfig.getInt(MongoSinkTopicConfig.RETRIES_DEFER_TIMEOUT_CONFIG);
            LOGGER.info("Deferring retry operation for {}ms", deferRetryMs);
            this.context.timeout(deferRetryMs);
            throw new RetriableException(failure.getMessage(), failure);
        }
        if (topicConfig.logErrors()) {
            LOGGER.error("Error on mongodb operation", failure);
            if (failure instanceof MongoBulkWriteException) {
                MongoBulkWriteException bulkFailure = (MongoBulkWriteException) failure;
                LOGGER.error("Mongodb bulk write (partially) failed", bulkFailure);
                LOGGER.error("WriteResult: {}", bulkFailure.getWriteResult());
                LOGGER.error("WriteErrors: {}", generateWriteErrors(bulkFailure.getWriteErrors(), writeModels));
                LOGGER.error("WriteConcernError: {}", bulkFailure.getWriteConcernError());
            }
        }
        if (!topicConfig.tolerateErrors()) {
            throw new DataException("Failed to write mongodb documents despite retrying", failure);
        }
    }

    /**
     * Renders bulk write errors as a bracketed, comma-separated string.
     *
     * <p>An error whose index points into {@code writeModels} is rendered together with that write
     * model; any other error falls back to its own {@code toString()}.
     */
    private String generateWriteErrors(List<BulkWriteError> writeErrors, List<WriteModel<BsonDocument>> writeModels) {
        List<String> rendered = new ArrayList<>(writeErrors.size());
        for (BulkWriteError writeError : writeErrors) {
            int index = writeError.getIndex();
            if (index >= 0 && index < writeModels.size()) {
                rendered.add("BulkWriteError{writeModel=" + writeModels.get(index)
                        + ", code=" + writeError.getCode()
                        + ", message='" + writeError.getMessage()
                        + "', details=" + writeError.getDetails() + '}');
            } else {
                rendered.add(writeError.toString());
            }
        }
        return "[" + String.join(", ", rendered) + "]";
    }
}
