package com.mongodb.kafka.connect.sink;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoNamespace;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.kafka.connect.sink.dlq.AnalyzedBatchFailedWithBulkWriteException;
import com.mongodb.kafka.connect.sink.dlq.ErrorReporter;
import com.mongodb.kafka.connect.util.TimeseriesValidation;
import com.mongodb.kafka.connect.util.jmx.SinkTaskStatistics;
import com.mongodb.kafka.connect.util.jmx.Timer;
import com.mongodb.kafka.connect.util.jmx.internal.MBeanServerUtils;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The started state of a MongoDB sink task.
 *
 * <p>An instance holds the {@link MongoClient} used for writing, registers a
 * {@link SinkTaskStatistics} MBean on construction, and writes every batch of
 * {@link SinkRecord}s handed to {@link #put(Collection)} to MongoDB using bulk writes.
 * {@link #stop()} closes the client and unregisters the MBean.
 *
 * <p>NOTE(review): the class keeps unsynchronized mutable state ({@code lastTaskInvocation},
 * {@code checkedTimeseriesNamespaces}); it presumably is only ever used from a single task
 * thread - confirm against the caller.
 */
public final class StartedMongoSinkTask {
    // Deliberately logs under the MongoSinkTask category, exactly as before, so existing
    // logger configuration keeps applying.
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoSinkTask.class);

    private final MongoSinkConfig sinkConfig;
    private final MongoClient mongoClient;
    private final ErrorReporter errorReporter;
    /** Namespaces for which {@link #checkTimeseries} has already completed. */
    private final Set<MongoNamespace> checkedTimeseriesNamespaces = new HashSet<>();
    /**
     * MBean name computed once at construction. The name is derived from the constructing
     * thread, so it is cached to guarantee that {@link #stop()} unregisters exactly the name
     * that was registered, regardless of which thread calls {@code stop()}.
     */
    private final String mBeanName;
    private final SinkTaskStatistics statistics;
    /** Started at the end of each {@link #put(Collection)}; {@code null} until the first call returns. */
    private Timer lastTaskInvocation = null;

    /**
     * Creates the started task and registers its statistics MBean.
     *
     * @param mongoSinkConfig the sink configuration used when processing records
     * @param mongoClient the client used for all writes; closed by {@link #stop()}
     * @param errorReporter receives records that failed to be processed or written
     */
    public StartedMongoSinkTask(MongoSinkConfig mongoSinkConfig, MongoClient mongoClient, ErrorReporter errorReporter) {
        this.sinkConfig = mongoSinkConfig;
        this.mongoClient = mongoClient;
        this.errorReporter = errorReporter;
        this.mBeanName = createMBeanName();
        this.statistics = new SinkTaskStatistics(this.mBeanName);
        this.statistics.register();
    }

    /** Builds the statistics MBean name from the task id of the calling thread. */
    private static String createMBeanName() {
        return "com.mongodb.kafka.connect:type=sink-task-metrics,task=sink-task-" + MBeanServerUtils.taskIdFromCurrentThread();
    }

    /**
     * Closes the MongoDB client and unregisters the statistics MBean.
     *
     * <p>The MBean is unregistered even when closing the client throws, so a failed close
     * does not leave a stale registration behind.
     */
    public void stop() {
        try {
            this.mongoClient.close();
        } finally {
            MBeanServerUtils.unregisterMBean(this.mBeanName);
        }
    }

    /**
     * Processes one poll worth of sink records: groups them into batches and bulk writes
     * each batch, sampling timing and count statistics along the way.
     *
     * @param collection the records delivered for this invocation; may be empty
     * @throws DataException if a write fails and errors are not tolerated for the topic,
     *     or if rate limiting is interrupted
     */
    public void put(Collection<SinkRecord> collection) {
        if (this.lastTaskInvocation != null) {
            // Time elapsed since the previous put() returned, i.e. spent outside this task.
            this.statistics.getInConnectFramework().sample(this.lastTaskInvocation.getElapsedTime(TimeUnit.MILLISECONDS));
        }
        Timer putTimer = Timer.start();
        this.statistics.getRecords().sample(collection.size());
        trackLatestRecordTimestampOffset(collection);
        if (collection.isEmpty()) {
            LOGGER.debug("No sink records to process for current poll operation");
        } else {
            Timer processingTimer = Timer.start();
            List<List<MongoProcessedSinkRecordData>> batches =
                    MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(collection, this.sinkConfig, this.errorReporter);
            this.statistics.getProcessingPhases().sample(processingTimer.getElapsedTime(TimeUnit.MILLISECONDS));
            for (List<MongoProcessedSinkRecordData> batch : batches) {
                bulkWriteBatch(batch);
            }
        }
        this.statistics.getInTaskPut().sample(putTimer.getElapsedTime(TimeUnit.MILLISECONDS));
        this.lastTaskInvocation = Timer.start();
        // Guarded so the JSON rendering is only performed when it will actually be logged.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("{}: {}", this.statistics.getName(), this.statistics.toJSON());
        }
    }

    /**
     * Samples the difference between the current wall-clock time and the largest record
     * timestamp in the given records. Records without a timestamp are ignored; nothing is
     * sampled when no record carries a timestamp.
     */
    private void trackLatestRecordTimestampOffset(Collection<SinkRecord> collection) {
        OptionalLong latestTimestamp = collection.stream()
                .filter(sinkRecord -> sinkRecord.timestamp() != null)
                .mapToLong(SinkRecord::timestamp)
                .max();
        latestTimestamp.ifPresent(timestamp ->
                this.statistics.getLatestKafkaTimeDifferenceMs().sample(System.currentTimeMillis() - timestamp));
    }

    /**
     * Bulk writes a single batch. The namespace and topic configuration are taken from the
     * first element of the batch. Failures are routed through
     * {@link #handleTolerableWriteException}; the rate limit check runs afterwards unless
     * that handler rethrows.
     *
     * @param list the batch to write; an empty batch is a no-op
     */
    private void bulkWriteBatch(List<MongoProcessedSinkRecordData> list) {
        if (list.isEmpty()) {
            return;
        }
        MongoProcessedSinkRecordData first = list.get(0);
        MongoNamespace namespace = first.getNamespace();
        MongoSinkTopicConfig config = first.getConfig();
        checkTimeseries(namespace, config);
        // NOTE(review): kept as a raw List because the element type returned by
        // getWriteModel() is declared outside this file; tighten once confirmed.
        List writeModels = list.stream()
                .map(MongoProcessedSinkRecordData::getWriteModel)
                .collect(Collectors.toList());
        boolean ordered = config.getBoolean(MongoSinkTopicConfig.BULK_WRITE_ORDERED_CONFIG);
        Timer writeTimer = Timer.start();
        try {
            LOGGER.debug(
                    "Bulk writing {} document(s) into collection [{}] via an {} bulk write",
                    writeModels.size(),
                    namespace.getFullName(),
                    ordered ? "ordered" : "unordered");
            BulkWriteResult result = this.mongoClient
                    .getDatabase(namespace.getDatabaseName())
                    .getCollection(namespace.getCollectionName(), BsonDocument.class)
                    .bulkWrite(writeModels, new BulkWriteOptions().ordered(ordered));
            this.statistics.getBatchWritesSuccessful().sample(writeTimer.getElapsedTime(TimeUnit.MILLISECONDS));
            this.statistics.getRecordsSuccessful().sample(list.size());
            LOGGER.debug("Mongodb bulk write result: {}", result);
        } catch (RuntimeException e) {
            this.statistics.getBatchWritesFailed().sample(writeTimer.getElapsedTime(TimeUnit.MILLISECONDS));
            this.statistics.getRecordsFailed().sample(list.size());
            List<SinkRecord> failedRecords = list.stream()
                    .map(MongoProcessedSinkRecordData::getSinkRecord)
                    .collect(Collectors.toList());
            handleTolerableWriteException(failedRecords, ordered, e, config.logErrors(), config.tolerateErrors());
        }
        checkRateLimit(config);
    }

    /**
     * Validates the target collection once per namespace when the topic is configured as a
     * time series. A namespace is only remembered as checked after validation returned
     * normally, so a failed validation is retried on the next batch.
     */
    private void checkTimeseries(MongoNamespace mongoNamespace, MongoSinkTopicConfig mongoSinkTopicConfig) {
        if (this.checkedTimeseriesNamespaces.contains(mongoNamespace)) {
            return;
        }
        if (mongoSinkTopicConfig.isTimeseries()) {
            TimeseriesValidation.validateCollection(this.mongoClient, mongoNamespace, mongoSinkTopicConfig);
        }
        this.checkedTimeseriesNamespaces.add(mongoNamespace);
    }

    /**
     * Sleeps for the configured timeout when the topic's rate limit settings report that
     * they have been triggered.
     *
     * @throws DataException if the sleep is interrupted; the thread's interrupt flag is
     *     restored before throwing
     */
    private static void checkRateLimit(MongoSinkTopicConfig mongoSinkTopicConfig) {
        RateLimitSettings rateLimitSettings = mongoSinkTopicConfig.getRateLimitSettings();
        if (!rateLimitSettings.isTriggered()) {
            return;
        }
        LOGGER.debug(
                "Rate limit settings triggering {}ms defer timeout after processing {} further batches for topic {}",
                rateLimitSettings.getTimeoutMs(),
                rateLimitSettings.getEveryN(),
                mongoSinkTopicConfig.getTopic());
        try {
            Thread.sleep(rateLimitSettings.getTimeoutMs());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DataException("Rate limiting was interrupted", e);
        }
    }

    /**
     * Applies the topic's error policy to a failed bulk write.
     *
     * <p>A {@link MongoBulkWriteException} is analyzed per record; when errors are
     * tolerated the analysis result is reported. Any other runtime exception is treated as
     * a failure of the whole batch.
     *
     * @param records the sink records of the failed batch
     * @param ordered whether the bulk write was ordered
     * @param failure the exception thrown by the bulk write
     * @param logErrors whether the failure should be logged
     * @param tolerateErrors whether the failure may be swallowed instead of rethrown
     * @throws DataException wrapping {@code failure} when errors are not tolerated
     */
    private void handleTolerableWriteException(
            List<SinkRecord> records, boolean ordered, RuntimeException failure, boolean logErrors, boolean tolerateErrors) {
        if (failure instanceof MongoBulkWriteException) {
            AnalyzedBatchFailedWithBulkWriteException analyzedBatch = new AnalyzedBatchFailedWithBulkWriteException(
                    records, ordered, (MongoBulkWriteException) failure, this.errorReporter, StartedMongoSinkTask::log);
            if (logErrors) {
                LOGGER.error("Failed to put into the sink some records, see log entries below for the details", failure);
                analyzedBatch.log();
            }
            if (!tolerateErrors) {
                throw new DataException(failure);
            }
            analyzedBatch.report();
            return;
        }
        if (logErrors) {
            log(records, failure);
        }
        if (!tolerateErrors) {
            throw new DataException(failure);
        }
    }

    /** Logs the given records together with the exception that prevented them from being written. */
    private static void log(Collection<SinkRecord> collection, RuntimeException runtimeException) {
        LOGGER.error("Failed to put into the sink the following records: {}", collection, runtimeException);
    }
}
