package com.mongodb.spark.sql.connector.write;

import com.mongodb.client.MongoClient;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.spark.sql.connector.config.WriteConfig;
import com.mongodb.spark.sql.connector.exceptions.DataException;
import com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mongodb/spark/sql/connector/write/MongoDataWriter.class */
final class MongoDataWriter implements DataWriter<InternalRow> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDataWriter.class);
    private final int partitionId;
    private final long taskId;
    private final RowToBsonDocumentConverter rowToBsonDocumentConverter;
    private final WriteConfig writeConfig;
    private final long epochId;
    private final BulkWriteOptions bulkWriteOptions;
    private final List<WriteModel<BsonDocument>> writeModelList = new ArrayList();
    private MongoClient mongoClient;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MongoDataWriter(int i, long j, StructType structType, WriteConfig writeConfig, long j2) {
        this.partitionId = i;
        this.taskId = j;
        this.rowToBsonDocumentConverter = new RowToBsonDocumentConverter(structType, writeConfig.convertJson());
        this.writeConfig = writeConfig;
        this.epochId = j2;
        this.bulkWriteOptions = new BulkWriteOptions().ordered(writeConfig.isOrdered());
    }

    public void write(InternalRow internalRow) {
        this.writeModelList.add(getWriteModel(this.rowToBsonDocumentConverter.fromRow(internalRow)));
        if (this.writeModelList.size() >= this.writeConfig.getMaxBatchSize()) {
            writeModels();
        }
    }

    public WriterCommitMessage commit() {
        writeModels();
        LOGGER.debug("Finished all writes for: PartitionId: {}, TaskId: {}.", Integer.valueOf(this.partitionId), Long.valueOf(this.taskId));
        return new MongoWriterCommitMessage(this.partitionId, this.taskId, this.epochId);
    }

    public void abort() {
        LOGGER.debug("Aborting write for: PartitionId: {}, TaskId: {}.", Integer.valueOf(this.partitionId), Long.valueOf(this.taskId));
        releaseClient();
        throw new DataException(String.format("Write aborted for: PartitionId: %s, TaskId: %s. Manual data clean up may be required.", Integer.valueOf(this.partitionId), Long.valueOf(this.taskId)));
    }

    public void close() {
        LOGGER.debug("Closing PartitionId: {}, TaskId: {}.", Integer.valueOf(this.partitionId), Long.valueOf(this.taskId));
        releaseClient();
    }

    private WriteModel<BsonDocument> getWriteModel(BsonDocument bsonDocument) {
        if (!hasIdFields(bsonDocument)) {
            return new InsertOneModel(bsonDocument);
        }
        switch (this.writeConfig.getOperationType()) {
            case INSERT:
                return new InsertOneModel(bsonDocument);
            case REPLACE:
                return new ReplaceOneModel(getIdFieldDocument(bsonDocument), bsonDocument, new ReplaceOptions().upsert(this.writeConfig.isUpsert()));
            case UPDATE:
                BsonDocument idFieldDocument = getIdFieldDocument(bsonDocument);
                Set keySet = idFieldDocument.keySet();
                Objects.requireNonNull(bsonDocument);
                keySet.forEach((v1) -> {
                    r1.remove(v1);
                });
                return new UpdateOneModel(idFieldDocument, new BsonDocument("$set", bsonDocument), new UpdateOptions().upsert(this.writeConfig.isUpsert()));
            default:
                throw new DataException("Unsupported operation type: " + this.writeConfig.getOperationType());
        }
    }

    private boolean hasIdFields(BsonDocument bsonDocument) {
        return bsonDocument.keySet().containsAll(this.writeConfig.getIdFields());
    }

    private BsonDocument getIdFieldDocument(BsonDocument bsonDocument) {
        BsonDocument bsonDocument2 = new BsonDocument();
        this.writeConfig.getIdFields().forEach(str -> {
            BsonValue bsonValue = bsonDocument.get(str);
            if (bsonValue == null) {
                throw new DataException(String.format("Missing id field: '%s' from: %s", str, bsonDocument.toJson()));
            }
            bsonDocument2.append(str, bsonValue);
        });
        return bsonDocument2;
    }

    private MongoClient getMongoClient() {
        if (this.mongoClient == null) {
            this.mongoClient = this.writeConfig.getMongoClient();
        }
        return this.mongoClient;
    }

    private void releaseClient() {
        if (this.mongoClient != null) {
            this.mongoClient.close();
            this.mongoClient = null;
        }
    }

    private void writeModels() {
        if (this.writeModelList.size() > 0) {
            LOGGER.debug("Writing batch of {} operations to: {}. PartitionId: {}, TaskId: {}.", new Object[]{Integer.valueOf(this.writeModelList.size()), this.writeConfig.getNamespace().getFullName(), Integer.valueOf(this.partitionId), Long.valueOf(this.taskId)});
            getMongoClient().getDatabase(this.writeConfig.getDatabaseName()).getCollection(this.writeConfig.getCollectionName(), BsonDocument.class).withWriteConcern(this.writeConfig.getWriteConcern()).bulkWrite(this.writeModelList, this.bulkWriteOptions);
            this.writeModelList.clear();
        }
    }
}
