package io.debezium.connector.mongodb.sink;

import com.mongodb.client.MongoClient;
import io.debezium.bindings.kafka.KafkaDebeziumSinkRecord;
import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.connection.MongoDbConnectionContext;
import io.debezium.dlq.ErrorReporter;
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/mongodb/sink/MongoDbSinkConnectorTask.class */
public class MongoDbSinkConnectorTask extends SinkTask {
    static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSinkConnectorTask.class);
    private static final String CONNECTOR_TYPE = "sink";
    private MongoDbChangeEventSink mongoSink;

    public String version() {
        return Module.version();
    }

    public void start(Map<String, String> map) {
        LOGGER.info("Starting MongoDB sink task");
        Configuration from = Configuration.from(map);
        MongoDbSinkConnectorConfig mongoDbSinkConnectorConfig = new MongoDbSinkConnectorConfig(from);
        MongoClient mongoClient = null;
        try {
            mongoClient = new MongoDbConnectionContext(from).getMongoClient();
            this.mongoSink = new MongoDbChangeEventSink(mongoDbSinkConnectorConfig, mongoClient, createErrorReporter());
            LOGGER.debug("Started MongoDB sink task");
        } catch (RuntimeException e) {
            MongoClient mongoClient2 = mongoClient;
            if (mongoClient2 != null) {
                try {
                    mongoClient2.close();
                } catch (RuntimeException e2) {
                    e.addSuppressed(e2);
                    throw new ConnectException("Failed to start MongoDB sink task", e);
                }
            }
            throw new ConnectException("Failed to start MongoDB sink task", e);
        }
    }

    public void put(Collection<SinkRecord> collection) {
        this.mongoSink.execute(collection);
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        LOGGER.debug("Flush called - noop");
    }

    public void stop() {
        LOGGER.info("Stopping MongoDB sink task");
        if (this.mongoSink != null) {
            this.mongoSink.close();
        }
    }

    private ErrorReporter createErrorReporter() {
        ErrorReporter nopErrorReporter = nopErrorReporter();
        if (this.context != null) {
            try {
                ErrantRecordReporter errantRecordReporter = this.context.errantRecordReporter();
                if (errantRecordReporter != null) {
                    nopErrorReporter = (debeziumSinkRecord, exc) -> {
                        if (debeziumSinkRecord instanceof KafkaDebeziumSinkRecord) {
                            errantRecordReporter.report(((KafkaDebeziumSinkRecord) debeziumSinkRecord).getOriginalKafkaRecord(), exc);
                        }
                    };
                } else {
                    LOGGER.info("Errant record reporter not configured.");
                }
            } catch (NoClassDefFoundError | NoSuchMethodError e) {
                LOGGER.info("Kafka versions prior to 2.6 do not support the errant record reporter.");
            }
        }
        return nopErrorReporter;
    }

    static ErrorReporter nopErrorReporter() {
        return (debeziumSinkRecord, exc) -> {
        };
    }
}
