package com.mongodb.kafka.connect.source.heartbeat;

import com.mongodb.client.MongoChangeStreamCursor;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mongodb/kafka/connect/source/heartbeat/HeartbeatManager.class */
/**
 * Produces heartbeat {@link SourceRecord}s that carry the current resume token of a change stream
 * cursor.
 *
 * <p>A heartbeat is emitted only when all of the following hold: a cursor was supplied, the
 * configured interval is positive, more than the interval has elapsed since the previous heartbeat
 * attempt, the cursor currently exposes a resume token, and that token differs from the one sent
 * in the last emitted heartbeat.
 */
public class HeartbeatManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatManager.class);
    public static final String HEARTBEAT_KEY = "HEARTBEAT";
    /** Source-offset key under which the resume token JSON is stored. */
    private static final String ID_FIELD = "_id";
    private final Time time;
    private final String heartbeatTopicName;
    private final MongoChangeStreamCursor<? extends BsonDocument> cursor;
    private final long heartbeatIntervalMS;
    private final Map<String, Object> partitionMap;
    private final boolean canCreateHeartbeat;
    private volatile long lastHeartbeatMS = 0;
    private volatile String lastResumeToken = "";

    /**
     * Creates a heartbeat manager.
     *
     * @param time clock used to measure the time elapsed between heartbeats
     * @param mongoChangeStreamCursor cursor whose resume token is published; when {@code null},
     *     heartbeats are disabled
     * @param j heartbeat interval in milliseconds; heartbeats are disabled unless it is positive
     * @param str name of the topic the heartbeat records are addressed to
     * @param map source partition attached to every heartbeat record
     */
    public HeartbeatManager(Time time, MongoChangeStreamCursor<? extends BsonDocument> mongoChangeStreamCursor, long j, String str, Map<String, Object> map) {
        this.time = time;
        this.heartbeatTopicName = str;
        this.cursor = mongoChangeStreamCursor;
        this.heartbeatIntervalMS = j;
        this.partitionMap = map;
        this.canCreateHeartbeat = mongoChangeStreamCursor != null && j > 0;
    }

    /**
     * Returns a heartbeat record if one is due.
     *
     * <p>Once the interval has elapsed the heartbeat timestamp is advanced regardless of whether a
     * record is actually produced, so an absent or unchanged resume token is not re-checked until
     * the next interval has passed.
     *
     * @return the heartbeat record, or an empty {@code Optional} when heartbeats are disabled, the
     *     interval has not yet elapsed, the cursor has no resume token, or the token is unchanged
     *     since the last emitted heartbeat
     */
    public Optional<SourceRecord> heartbeat() {
        if (cursor == null) {
            return Optional.empty();
        }
        long nowMS = time.milliseconds();
        long elapsedMS = nowMS - lastHeartbeatMS;
        if (!canCreateHeartbeat || elapsedMS <= heartbeatIntervalMS) {
            return Optional.empty();
        }
        lastHeartbeatMS = nowMS;
        return Optional.ofNullable(cursor.getResumeToken())
                .map(BsonDocument::toJson)
                // Skip tokens that were already published by the previous heartbeat.
                .filter(resumeToken -> !resumeToken.equals(lastResumeToken))
                .map(this::createHeartbeatRecord);
    }

    /** Builds the heartbeat record for the given resume token JSON and remembers the token as sent. */
    private SourceRecord createHeartbeatRecord(String resumeToken) {
        LOGGER.info("Generating heartbeat event. {}", resumeToken);
        Map<String, Object> sourceOffset = new HashMap<>();
        sourceOffset.put(ID_FIELD, resumeToken);
        sourceOffset.put(HEARTBEAT_KEY, "true");
        lastResumeToken = resumeToken;
        // The resume token JSON is used as both the record key and the record value.
        return new SourceRecord(partitionMap, sourceOffset, heartbeatTopicName, Schema.STRING_SCHEMA, resumeToken, Schema.STRING_SCHEMA, resumeToken);
    }
}
