package com.mongodb.kafka.connect.source.heartbeat;

import com.mongodb.client.MongoChangeStreamCursor;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mongodb/kafka/connect/source/heartbeat/HeartbeatManager.class */
/**
 * Produces heartbeat {@link SourceRecord}s that carry the current resume token of a change
 * stream cursor.
 *
 * <p>A heartbeat is produced by {@link #heartbeat()} only when a cursor is present, the
 * configured interval is positive, more than one interval has elapsed since the previous
 * attempt, and the cursor reports a resume token that differs from the one sent last.
 *
 * <p>The instance keeps mutable state (time of the last attempt and the last emitted resume
 * token) and performs no synchronization on it.
 */
public class HeartbeatManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatManager.class);

    /** Key placed in the source offset of every heartbeat record. */
    public static final String HEARTBEAT_KEY = "HEARTBEAT";

    private final Time time;
    private final MongoChangeStreamCursor<? extends BsonDocument> cursor;
    private final String heartbeatTopicName;
    private final long heartbeatIntervalMS;
    private final Map<String, Object> partitionMap;

    /** Clock reading (ms) taken at the most recent attempt that passed the interval check. */
    private long lastHeartbeatMS = 0;

    /** JSON form of the resume token carried by the most recently emitted heartbeat. */
    private String lastResumeToken = "";

    /**
     * Creates a heartbeat manager.
     *
     * @param time clock used to read the current time in milliseconds
     * @param cursor change stream cursor whose resume token is published; when {@code null},
     *     {@link #heartbeat()} never produces a record
     * @param heartbeatIntervalMS time that must be exceeded between heartbeat attempts, in
     *     milliseconds; a value {@code <= 0} disables heartbeats
     * @param heartbeatTopicName topic assigned to the generated records
     * @param partitionMap source partition assigned to the generated records (stored by
     *     reference, not copied)
     */
    public HeartbeatManager(
            Time time,
            MongoChangeStreamCursor<? extends BsonDocument> cursor,
            long heartbeatIntervalMS,
            String heartbeatTopicName,
            Map<String, Object> partitionMap) {
        this.time = time;
        this.cursor = cursor;
        this.heartbeatIntervalMS = heartbeatIntervalMS;
        this.heartbeatTopicName = heartbeatTopicName;
        this.partitionMap = partitionMap;
    }

    /**
     * Builds a heartbeat record if one is currently due.
     *
     * <p>The generated record uses the resume token's JSON as its key (string schema), has a
     * {@code null} value (optional bytes schema), and a source offset containing the token under
     * {@code "_id"} plus {@link #HEARTBEAT_KEY} set to {@code "true"}.
     *
     * @return the heartbeat record, or {@link Optional#empty()} when there is no cursor, the
     *     interval is not positive, the interval has not yet been exceeded, the cursor has no
     *     resume token, or the token is unchanged since the last emitted heartbeat
     */
    public Optional<SourceRecord> heartbeat() {
        if (this.cursor == null) {
            LOGGER.debug("Returning no heartbeat: null cursor");
            return Optional.empty();
        }
        if (this.heartbeatIntervalMS <= 0) {
            LOGGER.debug("Returning no heartbeat: heartbeatIntervalMS not positive: {}", this.heartbeatIntervalMS);
            return Optional.empty();
        }

        long currentMS = this.time.milliseconds();
        long timeSinceHeartbeatMS = currentMS - this.lastHeartbeatMS;
        if (timeSinceHeartbeatMS <= this.heartbeatIntervalMS) {
            LOGGER.debug("Returning no heartbeat: timeSinceHeartbeat has not exceeded heartbeatInterval");
            return Optional.empty();
        }

        // The timer is reset as soon as the interval check passes, i.e. before the resume token
        // is inspected. A missing or unchanged token therefore still postpones the next attempt
        // by a full interval.
        this.lastHeartbeatMS = currentMS;

        BsonDocument resumeTokenDocument = this.cursor.getResumeToken();
        if (resumeTokenDocument == null) {
            LOGGER.debug("Returning no heartbeat: cursor resumeToken is null");
            return Optional.empty();
        }

        String resumeToken = resumeTokenDocument.toJson();
        if (resumeToken.equals(this.lastResumeToken)) {
            LOGGER.debug("Returning no heartbeat: same resumeToken");
            return Optional.empty();
        }

        LOGGER.info("Generating heartbeat event. {}", resumeToken);
        Map<String, String> sourceOffset = new HashMap<>();
        sourceOffset.put("_id", resumeToken);
        sourceOffset.put(HEARTBEAT_KEY, "true");
        this.lastResumeToken = resumeToken;

        return Optional.of(
                new SourceRecord(
                        this.partitionMap,
                        sourceOffset,
                        this.heartbeatTopicName,
                        Schema.STRING_SCHEMA,
                        resumeToken,
                        Schema.OPTIONAL_BYTES_SCHEMA,
                        null));
    }
}
