package io.confluent.connect.cdc;

import com.github.jcustenborder.kafka.connect.utils.data.SourceRecordDeque;
import com.github.jcustenborder.kafka.connect.utils.data.SourceRecordDequeBuilder;
import com.google.common.base.Preconditions;
import io.confluent.connect.cdc.CDCSourceConnectorConfig;
import io.confluent.connect.cdc.Change;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/cdc/CDCSourceTask.class */
/**
 * Base class for CDC source tasks.
 *
 * <p>Each {@link Change} handed to {@link #addChange(Change)} is converted into a
 * {@link SourceRecord} and appended to an internal {@link SourceRecordDeque};
 * {@link #poll()} returns whatever that deque's {@code getBatch()} yields.
 *
 * <p>{@link #start(Map)} must run before {@link #addChange(Change)} or {@link #poll()},
 * because it is the only place where the deque and the schema generator are created.
 *
 * @param <CONF> concrete connector configuration type used by the subclass
 */
public abstract class CDCSourceTask<CONF extends CDCSourceConnectorConfig> extends SourceTask implements ChangeWriter {
    private static final Logger log = LoggerFactory.getLogger(CDCSourceTask.class);

    /** Parsed configuration; assigned in {@link #start(Map)}. */
    protected CONF config;

    /** Clock instance; not referenced by this class itself (presumably for subclasses). */
    protected Time time = new SystemTime();

    /** Supplies the topic name and key/value schemas for a change; created in {@link #start(Map)}. */
    SchemaGenerator schemaGenerator;

    /** Buffer of records produced by {@link #addChange(Change)}; created in {@link #start(Map)}. */
    private SourceRecordDeque changes;

    /**
     * Builds the concrete configuration object from the raw task settings.
     *
     * @param map raw settings passed to {@link #start(Map)}
     * @return the parsed configuration
     */
    protected abstract CONF getConfig(Map<String, String> map);

    /**
     * Puts a single value into a struct, adding the field name and value class to any
     * {@link DataException} raised by {@link Struct#put(String, Object)}.
     *
     * @param struct    struct to modify
     * @param fieldName name of the field to set
     * @param value     value to store; may be {@code null}
     * @throws DataException if the struct rejects the value; the original exception is the cause
     */
    void setStructField(Struct struct, String fieldName, Object value) {
        log.trace("setStructField() - field = '{}' value = '{}'", fieldName, value);
        try {
            struct.put(fieldName, value);
        } catch (DataException e) {
            // Report the runtime class rather than the value itself.
            throw new DataException(
                String.format(
                    "Exception thrown while setting the value for field '%s'. data=%s",
                    fieldName,
                    null == value ? "NULL" : value.getClass()
                ),
                e
            );
        }
    }

    /**
     * Converts a change into a {@link SourceRecord}.
     *
     * <p>The key struct is always populated. For a {@link Change.ChangeType#DELETE} change the
     * value and value schema are {@code null}; otherwise the value struct is populated from the
     * value columns and the metadata map is stored under {@link Constants#METADATA_FIELD}.
     *
     * <p>Key and value columns are matched to schema fields by position.
     *
     * @param schemaPair key and value schemas (with their ordered field names) for the change
     * @param change     change to convert; {@code change.metadata()} must not be {@code null}
     * @return the record, with a {@code null} Kafka partition and the change's timestamp
     * @throws NullPointerException if {@code change.metadata()} returns {@code null}
     * @throws DataException        if a column value cannot be stored in its struct field
     */
    SourceRecord createRecord(SchemaPair schemaPair, Change change) {
        Preconditions.checkNotNull(change.metadata(), "change.metadata() cannot return null.");

        Struct key = new Struct(schemaPair.getKey().schema);
        Schema keySchema = key.schema();
        log.trace("createRecord() - Setting key fields.");
        for (int i = 0; i < schemaPair.getKey().fields.size(); i++) {
            setStructField(key, schemaPair.getKey().fields.get(i), change.keyColumns().get(i).value());
        }

        final Struct value;
        final Schema valueSchema;
        if (Change.ChangeType.DELETE == change.changeType()) {
            log.trace("createRecord() - changeType is delete, setting value to null.");
            value = null;
            valueSchema = null;
        } else {
            log.trace("createRecord() - Setting value fields.");
            value = new Struct(schemaPair.getValue().schema);
            valueSchema = value.schema();
            for (int i = 0; i < schemaPair.getValue().fields.size(); i++) {
                setStructField(value, schemaPair.getValue().fields.get(i), change.valueColumns().get(i).value());
            }
            log.trace("createRecord() - Setting metadata.");
            // Only non-delete records carry metadata, so the map is built on this path only.
            setStructField(value, Constants.METADATA_FIELD, buildMetadata(change));
        }

        return new SourceRecord(
            change.sourcePartition(),
            change.sourceOffset(),
            this.schemaGenerator.topic(change),
            (Integer) null,
            keySchema,
            key,
            valueSchema,
            value,
            Long.valueOf(change.timestamp())
        );
    }

    /**
     * Copies the change's metadata, in iteration order, and appends the database, schema and
     * table names under their {@link Constants} keys. An entry already present under one of
     * those keys is overwritten.
     */
    private Map<String, Object> buildMetadata(Change change) {
        Map<String, Object> metadata = new LinkedHashMap<>(change.metadata().size() + 3);
        metadata.putAll(change.metadata());
        metadata.put(Constants.DATABASE_NAME_VARIABLE, change.databaseName());
        metadata.put(Constants.SCHEMA_NAME_VARIABLE, change.schemaName());
        metadata.put(Constants.TABLE_NAME_VARIABLE, change.tableName());
        return metadata;
    }

    /**
     * Converts the change to a record and appends it to the internal deque.
     *
     * @param change change to enqueue
     */
    @Override // io.confluent.connect.cdc.ChangeWriter
    public void addChange(Change change) {
        log.trace("addChange() - Adding change {}", change);
        SchemaPair schemaPair = this.schemaGenerator.schemas(change);
        this.changes.add(createRecord(schemaPair, change));
    }

    /**
     * Parses the configuration and creates the record deque and the schema generator.
     *
     * @param map raw task settings
     */
    @Override
    public void start(Map<String, String> map) {
        this.config = getConfig(map);
        this.changes = SourceRecordDequeBuilder.of()
            .batchSize(this.config.batchSize)
            .emptyWaitMs(this.config.backoffTimeMs)
            .build();
        this.schemaGenerator = new SchemaGenerator(this.config);
    }

    /**
     * Returns the next batch of buffered records, as produced by the deque's {@code getBatch()}.
     *
     * @return the batch of records
     * @throws InterruptedException declared by the {@link SourceTask} contract
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        return this.changes.getBatch();
    }

    /**
     * Returns the version string reported by {@code VersionUtil.getVersion()}.
     *
     * @return the connector version
     */
    @Override
    public String version() {
        return VersionUtil.getVersion();
    }
}
