package io.debezium.connector.mongodb;

import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import io.debezium.util.Threads;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/debezium-connector-mongodb-1.9.7.Final.jar:io/debezium/connector/mongodb/MongoDbConnector.class */
public class MongoDbConnector extends SourceConnector {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private Configuration config;
    private ReplicaSetMonitorThread monitorThread;
    private MongoDbTaskContext taskContext;
    private ConnectionContext connectionContext;
    private ExecutorService replicaSetMonitorExecutor;

    @Override // org.apache.kafka.connect.components.Versioned
    public String version() {
        return Module.version();
    }

    @Override // org.apache.kafka.connect.connector.Connector
    public Class<? extends Task> taskClass() {
        return MongoDbConnectorTask.class;
    }

    @Override // org.apache.kafka.connect.connector.Connector
    public void start(Map<String, String> map) {
        Configuration from = Configuration.from(map);
        Field.Set set = MongoDbConnectorConfig.ALL_FIELDS;
        Logger logger = this.logger;
        Objects.requireNonNull(logger);
        if (!from.validateAndRecord(set, logger::error)) {
            throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
        }
        this.config = from;
        this.taskContext = new MongoDbTaskContext(from);
        this.connectionContext = this.taskContext.getConnectionContext();
        LoggingContext.PreviousContext configureLoggingContext = this.taskContext.configureLoggingContext("conn");
        try {
            this.logger.info("Starting MongoDB connector and discovering replica set(s) at {}", this.connectionContext.hosts());
            this.replicaSetMonitorExecutor = Threads.newSingleThreadExecutor(MongoDbConnector.class, this.taskContext.serverName(), "replica-set-monitor");
            ReplicaSetDiscovery replicaSetDiscovery = new ReplicaSetDiscovery(this.taskContext);
            Objects.requireNonNull(replicaSetDiscovery);
            this.monitorThread = new ReplicaSetMonitorThread(replicaSetDiscovery::getReplicaSets, this.connectionContext.pollInterval(), Clock.SYSTEM, () -> {
                this.taskContext.configureLoggingContext("disc");
            }, this::replicaSetsChanged);
            this.replicaSetMonitorExecutor.execute(this.monitorThread);
            this.logger.info("Successfully started MongoDB connector, and continuing to discover changes in replica set(s) at {}", this.connectionContext.hosts());
            configureLoggingContext.restore();
        } catch (Throwable th) {
            configureLoggingContext.restore();
            throw th;
        }
    }

    protected void replicaSetsChanged(ReplicaSets replicaSets) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Requesting task reconfiguration due to new/removed replica set(s) for MongoDB with seeds {}", this.connectionContext.hosts());
            this.logger.info("New replica sets include:");
            replicaSets.onEachReplicaSet(replicaSet -> {
                this.logger.info("  {}", replicaSet);
            });
        }
        this.context.requestTaskReconfiguration();
    }

    @Override // org.apache.kafka.connect.connector.Connector
    public List<Map<String, String>> taskConfigs(int i) {
        LoggingContext.PreviousContext configureLoggingContext = this.taskContext.configureLoggingContext("conn");
        try {
            if (this.config == null) {
                this.logger.error("Configuring a maximum of {} tasks with no connector configuration available", Integer.valueOf(i));
                List<Map<String, String>> emptyList = Collections.emptyList();
                configureLoggingContext.restore();
                return emptyList;
            }
            ArrayList arrayList = new ArrayList(i);
            ReplicaSets replicaSets = this.monitorThread.getReplicaSets(10L, TimeUnit.SECONDS);
            if (replicaSets != null) {
                this.logger.info("Subdividing {} MongoDB replica set(s) into at most {} task(s)", Integer.valueOf(replicaSets.replicaSetCount()), Integer.valueOf(i));
                replicaSets.subdivide(i, replicaSets2 -> {
                    int size = arrayList.size();
                    this.logger.info("Configuring MongoDB connector task {} to capture events for replica set(s) at {}", Integer.valueOf(size), replicaSets2.hosts());
                    arrayList.add(this.config.edit().with(MongoDbConnectorConfig.HOSTS, replicaSets2.hosts()).with(MongoDbConnectorConfig.TASK_ID, size).build().asMap());
                });
            }
            this.logger.debug("Configuring {} MongoDB connector task(s)", Integer.valueOf(arrayList.size()));
            configureLoggingContext.restore();
            return arrayList;
        } catch (Throwable th) {
            configureLoggingContext.restore();
            throw th;
        }
    }

    @Override // org.apache.kafka.connect.connector.Connector
    public void stop() {
        LoggingContext.PreviousContext configureLoggingContext = this.taskContext != null ? this.taskContext.configureLoggingContext("conn") : null;
        try {
            this.logger.info("Stopping MongoDB connector");
            this.config = null;
            Thread.interrupted();
            if (this.replicaSetMonitorExecutor != null) {
                this.replicaSetMonitorExecutor.shutdownNow();
            }
            try {
                if (this.connectionContext != null) {
                    this.connectionContext.shutdown();
                }
                this.logger.info("Stopped MongoDB connector");
            } catch (Throwable th) {
                this.logger.info("Stopped MongoDB connector");
                throw th;
            }
        } finally {
            if (configureLoggingContext != null) {
                configureLoggingContext.restore();
            }
        }
    }

    @Override // org.apache.kafka.connect.connector.Connector
    public ConfigDef config() {
        return MongoDbConnectorConfig.configDef();
    }

    @Override // org.apache.kafka.connect.connector.Connector
    public Config validate(Map<String, String> map) {
        String string;
        Configuration from = Configuration.from(map);
        Map<String, ConfigValue> validate = from.validate(MongoDbConnectorConfig.EXPOSED_FIELDS);
        ConfigValue configValue = validate.get(MongoDbConnectorConfig.HOSTS.name());
        ConfigValue configValue2 = validate.get(MongoDbConnectorConfig.USER.name());
        ConfigValue configValue3 = validate.get(MongoDbConnectorConfig.PASSWORD.name());
        if (configValue.errorMessages().isEmpty() && configValue2.errorMessages().isEmpty() && configValue3.errorMessages().isEmpty()) {
            try {
                ConnectionContext connectionContext = new ConnectionContext(from);
                try {
                    MongoClient clientFor = connectionContext.clientFor(connectionContext.hosts());
                    try {
                        Document runCommand = clientFor.getDatabase(MongoUtil.contains(clientFor.listDatabaseNames(), "config") ? "config" : "local").runCommand(new Document("buildInfo", 1));
                        if (runCommand != null && (string = runCommand.getString("version")) != null) {
                            String[] split = string.split("\\.");
                            if (split.length > 0) {
                                try {
                                    int parseInt = Integer.parseInt(split[0]);
                                    ConfigValue configValue4 = validate.get(MongoDbConnectorConfig.CAPTURE_MODE.name());
                                    MongoDbConnectorConfig.CaptureMode captureMode = new MongoDbConnectorConfig(from).getCaptureMode();
                                    if (parseInt >= 5 && captureMode == MongoDbConnectorConfig.CaptureMode.OPLOG) {
                                        configValue4.addErrorMessage("The 'oplog' capture mode is not supported for MongoDB 5 and newer; Please use 'change_streams'  or 'change_streams_update_full' instead");
                                    }
                                } catch (NumberFormatException e) {
                                }
                            }
                        }
                        if (clientFor != null) {
                            clientFor.close();
                        }
                        connectionContext.close();
                    } catch (Throwable th) {
                        if (clientFor != null) {
                            try {
                                clientFor.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } finally {
                }
            } catch (MongoException e2) {
                configValue.addErrorMessage("Unable to connect: " + e2.getMessage());
            }
        }
        return new Config(new ArrayList(validate.values()));
    }
}
