package io.debezium.connector.mongodb.sink;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoNamespace;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.BulkWriteOptions;
import io.debezium.DebeziumException;
import io.debezium.dlq.ErrorReporter;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonDocument;
import org.slf4j.Logger;

/* loaded from: input_file:io/debezium/connector/mongodb/sink/StartedMongoDbSinkTask.class */
final class StartedMongoDbSinkTask implements AutoCloseable {
    private final MongoDbSinkConnectorConfig sinkConfig;
    private final MongoClient mongoClient;
    private final ErrorReporter errorReporter;

    /* JADX INFO: Access modifiers changed from: package-private */
    public StartedMongoDbSinkTask(MongoDbSinkConnectorConfig mongoDbSinkConnectorConfig, MongoClient mongoClient, ErrorReporter errorReporter) {
        this.sinkConfig = mongoDbSinkConnectorConfig;
        this.mongoClient = mongoClient;
        this.errorReporter = errorReporter;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        MongoClient mongoClient = this.mongoClient;
        if (mongoClient != null) {
            mongoClient.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void put(Collection<SinkRecord> collection) {
        try {
            trackLatestRecordTimestampOffset(collection);
            if (collection.isEmpty()) {
                MongoDbSinkConnectorTask.LOGGER.debug("No sink records to process for current poll operation");
            } else {
                Iterator<List<MongoProcessedSinkRecordData>> it = MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(collection, this.sinkConfig, this.errorReporter).iterator();
                while (it.hasNext()) {
                    bulkWriteBatch(it.next());
                }
            }
        } catch (Exception e) {
            throw new DebeziumException(e);
        }
    }

    private void trackLatestRecordTimestampOffset(Collection<SinkRecord> collection) {
        collection.stream().filter(sinkRecord -> {
            return sinkRecord.timestamp() != null;
        }).mapToLong((v0) -> {
            return v0.timestamp();
        }).max();
    }

    private void bulkWriteBatch(List<MongoProcessedSinkRecordData> list) {
        if (list.isEmpty()) {
            return;
        }
        MongoNamespace namespace = list.get(0).getNamespace();
        List list2 = (List) list.stream().map((v0) -> {
            return v0.getWriteModel();
        }).collect(Collectors.toList());
        try {
            Logger logger = MongoDbSinkConnectorTask.LOGGER;
            Object[] objArr = new Object[3];
            objArr[0] = Integer.valueOf(list2.size());
            objArr[1] = namespace.getFullName();
            objArr[2] = 1 != 0 ? "ordered" : "unordered";
            logger.debug("Bulk writing {} document(s) into collection [{}] via an {} bulk write", objArr);
            MongoDbSinkConnectorTask.LOGGER.debug("Mongodb bulk write result: {}", this.mongoClient.getDatabase(namespace.getDatabaseName()).getCollection(namespace.getCollectionName(), BsonDocument.class).bulkWrite(list2, new BulkWriteOptions().ordered(true)));
        } catch (RuntimeException e) {
            handleTolerableWriteException((List) list.stream().map((v0) -> {
                return v0.getSinkRecord();
            }).collect(Collectors.toList()), true, e, true, true);
        }
    }

    private void handleTolerableWriteException(List<SinkRecord> list, boolean z, RuntimeException runtimeException, boolean z2, boolean z3) {
        if (runtimeException instanceof MongoBulkWriteException) {
            throw new DataException(runtimeException);
        }
        if (z2) {
            log(list, runtimeException);
        }
        if (!z3) {
            throw new DataException(runtimeException);
        }
        list.forEach(sinkRecord -> {
            this.errorReporter.report(sinkRecord, runtimeException);
        });
    }

    private static void log(Collection<SinkRecord> collection, RuntimeException runtimeException) {
        MongoDbSinkConnectorTask.LOGGER.error("Failed to put into the sink the following records: {}", collection, runtimeException);
    }
}
