package com.mongodb.spark.sql.connector.read;

import com.mongodb.spark.sql.connector.assertions.Assertions;
import com.mongodb.spark.sql.connector.config.ReadConfig;
import com.mongodb.spark.sql.connector.schema.BsonDocumentToRowConverter;
import com.mongodb.spark.sql.connector.schema.InferSchema;
import java.time.Instant;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.types.StructType;
import org.bson.BsonTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mongodb/spark/sql/connector/read/MongoMicroBatchStream.class */
final class MongoMicroBatchStream implements MicroBatchStream {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoMicroBatchStream.class);
    private final StructType schema;
    private final MongoOffsetStore mongoOffsetStore;
    private final ReadConfig readConfig;
    private final BsonDocumentToRowConverter bsonDocumentToRowConverter;
    private volatile Long lastTime = Long.valueOf(Instant.now().getEpochSecond());

    /* JADX INFO: Access modifiers changed from: package-private */
    public MongoMicroBatchStream(StructType structType, String str, ReadConfig readConfig) {
        Assertions.validateConfig(structType, structType2 -> {
            return !structType2.isEmpty() && (!InferSchema.isInferred(structType2) || readConfig.streamPublishFullDocumentOnly());
        }, () -> {
            return "Mongo micro batch streams require a schema to be explicitly defined, unless using publish full document only.";
        });
        this.schema = structType;
        this.mongoOffsetStore = new MongoOffsetStore(SparkContext.getOrCreate().hadoopConfiguration(), str, MongoOffset.getInitialOffset(readConfig));
        this.readConfig = readConfig;
        this.bsonDocumentToRowConverter = new BsonDocumentToRowConverter(structType, readConfig);
    }

    public Offset latestOffset() {
        long epochSecond = Instant.now().getEpochSecond();
        if (this.lastTime.longValue() < epochSecond) {
            this.lastTime = Long.valueOf(epochSecond);
        }
        return new BsonTimestampOffset(new BsonTimestamp(this.lastTime.intValue(), 0));
    }

    public InputPartition[] planInputPartitions(Offset offset, Offset offset2) {
        return MongoInputPartitionHelper.generateMicroBatchPartitions(this.schema, this.readConfig, (BsonTimestampOffset) offset, (BsonTimestampOffset) offset2);
    }

    public PartitionReaderFactory createReaderFactory() {
        return new MongoMicroBatchPartitionReaderFactory(this.bsonDocumentToRowConverter, this.readConfig);
    }

    public Offset initialOffset() {
        return this.mongoOffsetStore.initialOffset();
    }

    public Offset deserializeOffset(String str) {
        return MongoOffset.fromJson(str);
    }

    public void commit(Offset offset) {
        LOGGER.info("MicroBatchStream commit: {}", offset);
        this.mongoOffsetStore.updateOffset((MongoOffset) offset);
    }

    public void stop() {
        LOGGER.info("MicroBatchStream stopped.");
    }
}
