package solutions.a2.kafka.sink;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import solutions.a2.cdc.oracle.utils.Version;
import solutions.a2.kafka.ConnectorParams;
import solutions.a2.utils.ExceptionUtils;

/* loaded from: input_file:solutions/a2/kafka/sink/JdbcSinkTask.class */
public class JdbcSinkTask extends SinkTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSinkTask.class);
    private JdbcSinkConnectorConfig config;
    private int schemaType;
    private JdbcSinkConnectionPool sinkPool;
    private TableNameMapper tableNameMapper;
    private final Map<String, JdbcSinkTable> tablesInProcessing = new HashMap();
    private int batchSize = 1000;

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> map) {
        LOGGER.info("Starting oracdc '{}' Sink Task", map.get("name"));
        this.config = new JdbcSinkConnectorConfig(map);
        try {
            LOGGER.debug("BEGIN: Hikari Connection Pool initialization.");
            this.sinkPool = new JdbcSinkConnectionPool(map.get("name"), this.config);
            LOGGER.debug("END: Hikari Connection Pool initialization.");
            this.batchSize = this.config.getInt(ConnectorParams.BATCH_SIZE_PARAM).intValue();
            this.schemaType = this.config.getSchemaType();
            this.tableNameMapper = this.config.getTableNameMapper();
            this.tableNameMapper.configure(this.config);
        } catch (SQLException e) {
            LOGGER.error("Unable to connect to {}", this.config.getString(ConnectorParams.CONNECTION_URL_PARAM));
            LOGGER.error(ExceptionUtils.getExceptionStackTrace(e));
            throw new ConnectException("Unable to start oracdc Sink Connector Task.");
        }
    }

    public void put(Collection<SinkRecord> collection) {
        LOGGER.debug("BEGIN: put()");
        HashSet<String> hashSet = new HashSet();
        try {
            Connection connection = this.sinkPool.getConnection();
            try {
                int i = 0;
                HashMap hashMap = new HashMap();
                for (SinkRecord sinkRecord : collection) {
                    String tableName = this.tableNameMapper.getTableName(sinkRecord);
                    JdbcSinkTable jdbcSinkTable = this.tablesInProcessing.get(tableName);
                    if (jdbcSinkTable == null) {
                        LOGGER.debug("Create new table definition for {} and add it to processing map,", tableName);
                        jdbcSinkTable = new JdbcSinkTable(this.sinkPool, tableName, sinkRecord, this.schemaType, this.config);
                        this.tablesInProcessing.put(tableName, jdbcSinkTable);
                    }
                    if (!hashSet.contains(tableName)) {
                        LOGGER.debug("Adding {} to current batch set.", tableName);
                        hashSet.add(tableName);
                    }
                    if (jdbcSinkTable.duplicatedKeyInBatch(sinkRecord)) {
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("Executing batch due to duplicate key for table {} .", jdbcSinkTable.getTableName());
                        }
                        for (String str : hashSet) {
                            LOGGER.debug("Executing batch for table {}.", str);
                            if (StringUtils.equals(str, tableName)) {
                                jdbcSinkTable.execAndCloseCursors();
                            } else {
                                this.tablesInProcessing.get(str).exec();
                            }
                        }
                        flush(hashMap);
                        connection.commit();
                        hashMap.clear();
                        i = 0;
                    }
                    jdbcSinkTable.putData(connection, sinkRecord);
                    hashMap.put(new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition().intValue()), new OffsetAndMetadata(sinkRecord.kafkaOffset()));
                    i++;
                    if (i == this.batchSize) {
                        for (String str2 : hashSet) {
                            LOGGER.debug("Executing batch for table {}.", str2);
                            this.tablesInProcessing.get(str2).exec();
                        }
                        flush(hashMap);
                        connection.commit();
                        hashMap.clear();
                        i = 0;
                    }
                }
                LOGGER.debug("Execute and close cursors");
                for (String str3 : hashSet) {
                    LOGGER.debug("Last batch execution and statements closing for table {}.", str3);
                    this.tablesInProcessing.get(str3).execAndCloseCursors();
                }
                connection.commit();
                if (connection != null) {
                    connection.close();
                }
                LOGGER.debug("END: put()");
            } finally {
            }
        } catch (SQLException e) {
            LOGGER.error("Error '{}' when put to target system, SQL errorCode = {}, SQL state = '{}'", new Object[]{e.getMessage(), Integer.valueOf(e.getErrorCode()), e.getSQLState()});
            LOGGER.error(ExceptionUtils.getExceptionStackTrace(e));
            throw new ConnectException(e);
        }
    }

    public void stop() {
        this.sinkPool = null;
    }
}
