package io.debezium.connector.mongodb.transforms.outbox;

import io.debezium.common.annotation.Incubating;
import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.Module;
import io.debezium.connector.mongodb.transforms.ExtractNewDocumentState;
import io.debezium.connector.mongodb.transforms.MongoDataConverter;
import io.debezium.time.Timestamp;
import io.debezium.transforms.ConnectRecordUtil;
import io.debezium.transforms.outbox.EventRouterConfigDefinition;
import io.debezium.transforms.outbox.EventRouterDelegate;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.Transformation;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Outbox event router for MongoDB change records.
 *
 * <p>Routing itself is performed by an {@link EventRouterDelegate}. Before the delegate routes a
 * record, this transformation parses the record's {@code after} field (a JSON string) into a
 * {@link BsonDocument} and replaces it with a typed {@link Struct}, so that the delegate can read
 * the individual outbox fields.
 *
 * <p>MongoDB-specific configuration keys are translated to the delegate's keys in
 * {@link #configure(Map)}.
 *
 * @param <R> the concrete record type being transformed
 */
@Incubating
public class MongoEventRouter<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoEventRouter.class);

    /** Name of the envelope field whose JSON content is expanded into a struct. */
    private static final String AFTER_FIELD = "after";

    /** Configured name of the event timestamp field inside the outbox document. */
    private String fieldTimestamp;
    /** Configured name of the payload field inside the outbox document. */
    private String fieldPayload;
    /** When {@code false}, a document-valued payload is kept as a JSON string instead of a struct. */
    private boolean expandPayload;
    /** Extracts the {@code after} field from a record; created in {@link #configure(Map)}. */
    private ExtractField<R> afterExtractor;
    /** JSON settings used when a document-valued payload is serialized back to a string. */
    private final JsonWriterSettings jsonWriterSettings = JsonWriterSettings.builder().outputMode(JsonMode.EXTENDED).indent(true).newLineCharacters("\n").build();
    private final MongoDataConverter converter = new MongoDataConverter(ExtractNewDocumentState.ArrayEncoding.ARRAY);
    private final EventRouterDelegate<R> eventRouterDelegate = new EventRouterDelegate<>();

    /**
     * Routes the record through the delegate, expanding the {@code after} field first.
     *
     * <p>Expansion is best-effort: if it fails for any reason the failure is logged at WARN level
     * and the unmodified record is handed to the delegate instead.
     *
     * @param r the incoming record
     * @return whatever the delegate returns for the (possibly expanded) record
     */
    @Override
    public R apply(R r) {
        // The converter callback's own argument is not used; the captured record is expanded.
        return this.eventRouterDelegate.apply(r, unused -> {
            try {
                return expandAfterField(r);
            } catch (Exception e) {
                // A trailing Throwable argument is logged with its stack trace by SLF4J.
                LOGGER.warn("Failed to expand after field: {}", e.getMessage(), e);
                return r;
            }
        });
    }

    /**
     * @return the configuration definition of the MongoDB event router
     */
    @Override
    public ConfigDef config() {
        return MongoEventRouterConfigDefinition.configDef();
    }

    /** Closes the underlying delegate. */
    @Override
    public void close() {
        this.eventRouterDelegate.close();
    }

    /**
     * Reads the MongoDB-specific settings needed for expansion and configures the delegate with
     * the translated configuration map.
     *
     * @param map the raw transformation configuration
     */
    @Override
    public void configure(Map<String, ?> map) {
        Configuration config = Configuration.from(map);
        this.fieldTimestamp = config.getString(MongoEventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP);
        this.expandPayload = config.getBoolean(MongoEventRouterConfigDefinition.EXPAND_JSON_PAYLOAD);
        this.fieldPayload = config.getString(MongoEventRouterConfigDefinition.FIELD_PAYLOAD);
        this.afterExtractor = ConnectRecordUtil.extractAfterDelegate();
        this.eventRouterDelegate.configure(convertConfigMap(map));
    }

    /**
     * @return the version reported by the MongoDB connector module
     */
    @Override
    public String version() {
        return Module.version();
    }

    /**
     * Builds a copy of the record in which the {@code after} JSON string is replaced by a struct
     * derived from the parsed BSON document.
     *
     * @param r the record whose {@code after} field should be expanded
     * @return a new record with the same topic, partition, key, timestamp and headers, and a
     *         rebuilt value schema and value
     * @throws IllegalStateException if the {@code after} value is {@code null} or not a String
     */
    private R expandAfterField(R r) throws IllegalStateException {
        R afterRecord = this.afterExtractor.apply(r);
        Object afterValue = afterRecord.value();
        if (!(afterValue instanceof String)) {
            // A null value must be reported explicitly; calling getClass() on it would raise a
            // NullPointerException and hide the real cause.
            String actualType = afterValue == null ? "null" : String.valueOf(afterValue.getClass());
            throw new IllegalStateException("Unable to expand non-String after field: " + actualType);
        }

        Schema originalValueSchema = r.valueSchema();
        String afterSchemaName = afterRecord.valueSchema().name();
        BsonDocument afterDocument = BsonDocument.parse((String) afterValue);

        Schema newAfterSchema = buildNewAfterSchema(afterSchemaName, afterDocument);
        Struct newAfterStruct = buildNewAfterStruct(newAfterSchema, afterDocument);
        Schema newValueSchema = buildNewValueSchema(originalValueSchema.name(), originalValueSchema, newAfterSchema);
        Struct newValue = buildNewValueStruct((Struct) r.value(), newValueSchema, newAfterStruct);

        return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(), newValueSchema, newValue, r.timestamp(), r.headers());
    }

    /**
     * Tells whether the entry is the payload field and must stay a JSON string, i.e. payload
     * expansion is disabled and the value is a BSON document. Used by both the schema and the
     * struct builder so that they always take the same branch.
     */
    private boolean isUnexpandedPayload(Map.Entry<String, BsonValue> entry) {
        return entry.getKey().equals(this.fieldPayload) && !this.expandPayload && (entry.getValue() instanceof BsonDocument);
    }

    /**
     * Derives the schema of the expanded {@code after} struct from the parsed document.
     *
     * @param str          name given to the resulting struct schema
     * @param bsonDocument the parsed {@code after} document
     * @return a struct schema with one field per top-level document entry
     */
    private Schema buildNewAfterSchema(String str, BsonDocument bsonDocument) {
        SchemaBuilder builder = SchemaBuilder.struct().name(str);
        for (Map.Entry<String, BsonValue> entry : bsonDocument.entrySet()) {
            if (entry.getKey().equals(this.fieldTimestamp)) {
                builder.field(this.fieldTimestamp, Timestamp.schema());
            } else if (isUnexpandedPayload(entry)) {
                builder.field(this.fieldPayload, Schema.OPTIONAL_STRING_SCHEMA);
            } else {
                this.converter.addFieldSchema(entry, builder);
            }
        }
        return builder.build();
    }

    /**
     * Populates the expanded {@code after} struct from the parsed document, mirroring the
     * branches taken by {@link #buildNewAfterSchema(String, BsonDocument)}.
     *
     * @param schema       the schema produced for the same document
     * @param bsonDocument the parsed {@code after} document
     * @return the populated struct
     */
    private Struct buildNewAfterStruct(Schema schema, BsonDocument bsonDocument) {
        Struct afterStruct = new Struct(schema);
        for (Map.Entry<String, BsonValue> entry : bsonDocument.entrySet()) {
            if (entry.getKey().equals(this.fieldTimestamp)) {
                // NOTE(review): asInt64() only accepts a 64-bit integer value; any other BSON type
                // fails here and the caller falls back to the unexpanded record.
                afterStruct.put(this.fieldTimestamp, entry.getValue().asInt64().getValue());
            } else if (isUnexpandedPayload(entry)) {
                afterStruct.put(this.fieldPayload, entry.getValue().asDocument().toJson(this.jsonWriterSettings));
            } else {
                this.converter.convertRecord(entry, schema, afterStruct);
            }
        }
        return afterStruct;
    }

    /**
     * Copies every field of the original value schema except {@code after}, then appends
     * {@code after} with the new schema.
     *
     * @param str     name given to the resulting value schema
     * @param schema  the original value schema
     * @param schema2 the schema of the expanded {@code after} struct
     * @return the rebuilt value schema
     */
    private Schema buildNewValueSchema(String str, Schema schema, Schema schema2) {
        SchemaBuilder builder = SchemaBuilder.struct().name(str);
        for (Field field : schema.fields()) {
            if (!field.name().equals(AFTER_FIELD)) {
                builder.field(field.name(), field.schema());
            }
        }
        builder.field(AFTER_FIELD, schema2);
        return builder.build();
    }

    /**
     * Copies every field value of the original struct except {@code after} into a struct of the
     * new schema, then stores the expanded {@code after} struct.
     *
     * @param struct  the original record value
     * @param schema  the rebuilt value schema
     * @param struct2 the expanded {@code after} struct
     * @return the rebuilt record value
     */
    private Struct buildNewValueStruct(Struct struct, Schema schema, Struct struct2) {
        Struct newValue = new Struct(schema);
        for (Field field : struct.schema().fields()) {
            if (!field.name().equals(AFTER_FIELD)) {
                newValue.put(field.name(), struct.get(field));
            }
        }
        newValue.put(AFTER_FIELD, struct2);
        return newValue;
    }

    /**
     * Translates MongoDB router configuration keys to the delegate's keys. Entries whose key has
     * no translation are dropped. If no event id field was configured, the MongoDB-specific
     * default is added under the delegate's event id key.
     *
     * @param map the raw transformation configuration
     * @return a new map keyed by the delegate's configuration names
     */
    @SuppressWarnings("unchecked") // the default value is stored under the map's own value type
    private <T> Map<String, T> convertConfigMap(Map<String, T> map) {
        Map<String, String> nameMapping = createFieldNameConverter();
        Map<String, T> converted = new HashMap<>();
        for (Map.Entry<String, T> entry : map.entrySet()) {
            String mongoName = entry.getKey();
            if (nameMapping.containsKey(mongoName)) {
                converted.put(nameMapping.get(mongoName), entry.getValue());
            }
        }
        if (!hasConfigFieldEventId(converted)) {
            converted.put(EventRouterConfigDefinition.FIELD_EVENT_ID.name(), (T) MongoEventRouterConfigDefinition.FIELD_EVENT_ID.defaultValue());
        }
        return converted;
    }

    /**
     * @return {@code true} if the map contains the delegate's event id configuration key
     */
    private <T> boolean hasConfigFieldEventId(Map<String, T> map) {
        return map.containsKey(EventRouterConfigDefinition.FIELD_EVENT_ID.name());
    }

    /**
     * @return a mapping from each MongoDB router configuration name to the corresponding
     *         delegate configuration name
     */
    private Map<String, String> createFieldNameConverter() {
        Map<String, String> mapping = new HashMap<>();
        mapping.put(MongoEventRouterConfigDefinition.FIELD_EVENT_ID.name(), EventRouterConfigDefinition.FIELD_EVENT_ID.name());
        mapping.put(MongoEventRouterConfigDefinition.FIELD_EVENT_KEY.name(), EventRouterConfigDefinition.FIELD_EVENT_KEY.name());
        mapping.put(MongoEventRouterConfigDefinition.FIELD_EVENT_TYPE.name(), EventRouterConfigDefinition.FIELD_EVENT_TYPE.name());
        mapping.put(MongoEventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name());
        mapping.put(MongoEventRouterConfigDefinition.FIELD_PAYLOAD.name(), EventRouterConfigDefinition.FIELD_PAYLOAD.name());
        mapping.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name());
        mapping.put(MongoEventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), EventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name());
        mapping.put(MongoEventRouterConfigDefinition.ROUTE_BY_FIELD.name(), EventRouterConfigDefinition.ROUTE_BY_FIELD.name());
        mapping.put(MongoEventRouterConfigDefinition.ROUTE_TOPIC_REGEX.name(), EventRouterConfigDefinition.ROUTE_TOPIC_REGEX.name());
        mapping.put(MongoEventRouterConfigDefinition.ROUTE_TOPIC_REPLACEMENT.name(), EventRouterConfigDefinition.ROUTE_TOPIC_REPLACEMENT.name());
        mapping.put(MongoEventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD.name(), EventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD.name());
        mapping.put(MongoEventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR.name(), EventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR.name());
        return mapping;
    }
}
