package com.mongodb.kafka.connect.source;

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCommandException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.event.CommandFailedEvent;
import com.mongodb.event.CommandListener;
import com.mongodb.event.CommandSucceededEvent;
import com.mongodb.kafka.connect.Versions;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.mongodb.kafka.connect.source.statistics.JmxStatisticsManager;
import com.mongodb.kafka.connect.util.Assertions;
import com.mongodb.kafka.connect.util.ConfigHelper;
import com.mongodb.kafka.connect.util.ResumeTokenUtils;
import com.mongodb.kafka.connect.util.ServerApiConfig;
import com.mongodb.kafka.connect.util.SslConfigs;
import com.mongodb.kafka.connect.util.jmx.SourceTaskStatistics;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mongodb/kafka/connect/source/MongoSourceTask.class */
public final class MongoSourceTask extends SourceTask {
    static final Logger LOGGER = LoggerFactory.getLogger(MongoSourceTask.class);
    private static final String CONNECTOR_TYPE = "source";
    public static final String ID_FIELD = "_id";
    static final String COPY_KEY = "copy";
    private static final String NS_KEY = "ns";
    private static final int UNKNOWN_FIELD_ERROR = 40415;
    private static final int FAILED_TO_PARSE_ERROR = 9;
    private StartedMongoSourceTask startedTask;

    public String version() {
        return Versions.VERSION;
    }

    public void start(Map<String, String> map) {
        LOGGER.info("Starting MongoDB source task");
        try {
            MongoSourceConfig mongoSourceConfig = new MongoSourceConfig(map);
            boolean shouldCopyData = shouldCopyData(this.context, mongoSourceConfig);
            final JmxStatisticsManager jmxStatisticsManager = new JmxStatisticsManager(shouldCopyData, JmxStatisticsManager.getConnectorName(map));
            try {
                MongoClientSettings.Builder applyToSslSettings = MongoClientSettings.builder().applyConnectionString(mongoSourceConfig.getConnectionString()).addCommandListener(new CommandListener() { // from class: com.mongodb.kafka.connect.source.MongoSourceTask.1
                    public void commandSucceeded(CommandSucceededEvent commandSucceededEvent) {
                        MongoSourceTask.mongoCommandSucceeded(commandSucceededEvent, jmxStatisticsManager.currentStatistics());
                    }

                    public void commandFailed(CommandFailedEvent commandFailedEvent) {
                        MongoSourceTask.mongoCommandFailed(commandFailedEvent, jmxStatisticsManager.currentStatistics());
                    }
                }).applyToSslSettings(builder -> {
                    SslConfigs.setupSsl(builder, mongoSourceConfig);
                });
                ServerApiConfig.setServerApi(applyToSslSettings, mongoSourceConfig);
                MongoClient create = MongoClients.create(applyToSslSettings.build(), ConfigHelper.getMongoDriverInformation(CONNECTOR_TYPE, mongoSourceConfig.getString("provider")));
                this.startedTask = new StartedMongoSourceTask(() -> {
                    return this.context;
                }, mongoSourceConfig, create, shouldCopyData ? new MongoCopyDataManager(mongoSourceConfig, create) : null, jmxStatisticsManager);
                LOGGER.info("Started MongoDB source task");
            } catch (RuntimeException e) {
                jmxStatisticsManager.close();
                throw e;
            }
        } catch (Exception e2) {
            throw new ConnectException("Failed to start new task", e2);
        }
    }

    StartedMongoSourceTask startedTask() {
        return (StartedMongoSourceTask) Assertions.assertNotNull(this.startedTask);
    }

    public List<SourceRecord> poll() {
        return this.startedTask.poll();
    }

    public void stop() {
        LOGGER.info("Stopping MongoDB source task");
        if (this.startedTask != null) {
            this.startedTask.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean doesNotSupportsStartAfter(MongoCommandException mongoCommandException) {
        return (mongoCommandException.getErrorCode() == FAILED_TO_PARSE_ERROR || mongoCommandException.getErrorCode() == UNKNOWN_FIELD_ERROR) && mongoCommandException.getErrorMessage().contains("startAfter");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Map<String, Object> createPartitionMap(MongoSourceConfig mongoSourceConfig) {
        String string = mongoSourceConfig.getString(MongoSourceConfig.OFFSET_PARTITION_NAME_CONFIG);
        if (string.isEmpty()) {
            string = createDefaultPartitionName(mongoSourceConfig);
        }
        return Collections.singletonMap(NS_KEY, string);
    }

    static Map<String, Object> createLegacyPartitionMap(MongoSourceConfig mongoSourceConfig) {
        return Collections.singletonMap(NS_KEY, createLegacyPartitionName(mongoSourceConfig));
    }

    static String createLegacyPartitionName(MongoSourceConfig mongoSourceConfig) {
        return String.format("%s/%s.%s", mongoSourceConfig.getString("connection.uri"), mongoSourceConfig.getString("database"), mongoSourceConfig.getString("collection"));
    }

    static String createDefaultPartitionName(MongoSourceConfig mongoSourceConfig) {
        ConnectionString connectionString = mongoSourceConfig.getConnectionString();
        StringBuilder sb = new StringBuilder();
        sb.append(connectionString.isSrvProtocol() ? "mongodb+srv://" : "mongodb://");
        sb.append(String.join(",", connectionString.getHosts()));
        sb.append("/");
        sb.append(mongoSourceConfig.getString("database"));
        if (!mongoSourceConfig.getString("collection").isEmpty()) {
            sb.append(MongoSourceConfig.TOPIC_SEPARATOR_DEFAULT);
            sb.append(mongoSourceConfig.getString("collection"));
        }
        return sb.toString();
    }

    private static boolean shouldCopyData(SourceTaskContext sourceTaskContext, MongoSourceConfig mongoSourceConfig) {
        Map<String, Object> offset = getOffset(sourceTaskContext, mongoSourceConfig);
        return mongoSourceConfig.getStartupConfig().startupMode() == MongoSourceConfig.StartupConfig.StartupMode.COPY_EXISTING && (offset == null || offset.containsKey(COPY_KEY));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Map<String, Object> getOffset(SourceTaskContext sourceTaskContext, MongoSourceConfig mongoSourceConfig) {
        if (sourceTaskContext == null) {
            return null;
        }
        Map<String, Object> offset = sourceTaskContext.offsetStorageReader().offset(createPartitionMap(mongoSourceConfig));
        if (offset == null && mongoSourceConfig.getString(MongoSourceConfig.OFFSET_PARTITION_NAME_CONFIG).isEmpty()) {
            offset = sourceTaskContext.offsetStorageReader().offset(createLegacyPartitionMap(mongoSourceConfig));
        }
        return offset;
    }

    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
        this.startedTask.commitRecord(sourceRecord, recordMetadata);
    }

    static void mongoCommandSucceeded(CommandSucceededEvent commandSucceededEvent, SourceTaskStatistics sourceTaskStatistics) {
        String commandName = commandSucceededEvent.getCommandName();
        long elapsedTime = commandSucceededEvent.getElapsedTime(TimeUnit.MILLISECONDS);
        if ("getMore".equals(commandName)) {
            sourceTaskStatistics.getGetmoreCommandsSuccessful().sample(elapsedTime);
        } else if ("aggregate".equals(commandName) || "find".equals(commandName)) {
            sourceTaskStatistics.getInitialCommandsSuccessful().sample(elapsedTime);
        }
        ResumeTokenUtils.getResponseOffsetSecs(commandSucceededEvent.getResponse()).ifPresent(j -> {
            sourceTaskStatistics.getLatestMongodbTimeDifferenceSecs().sample(j);
        });
    }

    static void mongoCommandFailed(CommandFailedEvent commandFailedEvent, SourceTaskStatistics sourceTaskStatistics) {
        MongoCommandException throwable = commandFailedEvent.getThrowable();
        if ((throwable instanceof MongoCommandException) && doesNotSupportsStartAfter(throwable)) {
            return;
        }
        String commandName = commandFailedEvent.getCommandName();
        long elapsedTime = commandFailedEvent.getElapsedTime(TimeUnit.MILLISECONDS);
        if ("getMore".equals(commandName)) {
            sourceTaskStatistics.getGetmoreCommandsFailed().sample(elapsedTime);
        } else if ("aggregate".equals(commandName) || "find".equals(commandName)) {
            sourceTaskStatistics.getInitialCommandsFailed().sample(elapsedTime);
        }
    }
}
