package io.streamthoughts.jikkou.kafka.change.handlers.record;

import io.streamthoughts.jikkou.api.change.ChangeDescription;
import io.streamthoughts.jikkou.api.change.ChangeHandler;
import io.streamthoughts.jikkou.api.change.ChangeMetadata;
import io.streamthoughts.jikkou.api.change.ChangeResponse;
import io.streamthoughts.jikkou.api.change.ChangeType;
import io.streamthoughts.jikkou.api.change.ValueChange;
import io.streamthoughts.jikkou.api.model.HasMetadataChange;
import io.streamthoughts.jikkou.kafka.change.KafkaTableRecordChange;
import io.streamthoughts.jikkou.kafka.internals.KafkaRecord;
import io.streamthoughts.jikkou.kafka.internals.producer.KafkaRecordSender;
import io.streamthoughts.jikkou.kafka.model.DataValue;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTableRecordSpec;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.VisibleForTesting;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/change/handlers/record/KafkaTableRecordChangeHandler.class */
public final class KafkaTableRecordChangeHandler implements ChangeHandler<KafkaTableRecordChange> {
    private final Producer<byte[], byte[]> producer;

    public KafkaTableRecordChangeHandler(@NotNull Producer<byte[], byte[]> producer) {
        this.producer = (Producer) Objects.requireNonNull(producer, "producerFactory must not be null");
    }

    public Set<ChangeType> supportedChangeTypes() {
        return Set.of(ChangeType.ADD, ChangeType.UPDATE);
    }

    public ChangeDescription getDescriptionFor(@NotNull HasMetadataChange<KafkaTableRecordChange> hasMetadataChange) {
        return new KafkaTableRecordChangeDescription(hasMetadataChange);
    }

    public List<ChangeResponse<KafkaTableRecordChange>> apply(@NotNull List<HasMetadataChange<KafkaTableRecordChange>> list) {
        KafkaRecordSender kafkaRecordSender = new KafkaRecordSender(this.producer);
        return (List) list.stream().map(hasMetadataChange -> {
            return send(hasMetadataChange, kafkaRecordSender);
        }).collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: private */
    @NotNull
    public static ChangeResponse<KafkaTableRecordChange> send(HasMetadataChange<KafkaTableRecordChange> hasMetadataChange, KafkaRecordSender<byte[], byte[]> kafkaRecordSender) {
        return new ChangeResponse<>(hasMetadataChange, kafkaRecordSender.send(toKafkaRecord(hasMetadataChange).mapKey(byteBuffer -> {
            return (byte[]) Optional.ofNullable(byteBuffer).map((v0) -> {
                return v0.array();
            }).orElse(null);
        }).mapValue(byteBuffer2 -> {
            return (byte[]) Optional.ofNullable(byteBuffer2).map((v0) -> {
                return v0.array();
            }).orElse(null);
        })).thenApply(producerRequestResult -> {
            return producerRequestResult.error() != null ? ChangeMetadata.of(producerRequestResult.error()) : ChangeMetadata.empty();
        }));
    }

    @VisibleForTesting
    static KafkaRecord<ByteBuffer, ByteBuffer> toKafkaRecord(HasMetadataChange<KafkaTableRecordChange> hasMetadataChange) {
        KafkaTableRecordChange kafkaTableRecordChange = (KafkaTableRecordChange) hasMetadataChange.getChange();
        ChangeType changeType = kafkaTableRecordChange.getChangeType();
        ValueChange<V1KafkaTableRecordSpec> record = kafkaTableRecordChange.getRecord();
        DataValue key = changeType == ChangeType.ADD ? ((V1KafkaTableRecordSpec) record.getAfter()).getKey() : ((V1KafkaTableRecordSpec) record.getBefore()).getKey();
        String topic = kafkaTableRecordChange.getTopic();
        Optional<ByteBuffer> serialize = key.type().getDataSerde().serialize(topic, key.data(), Collections.emptyMap(), true);
        Optional<ByteBuffer> empty = Optional.empty();
        if (changeType != ChangeType.DELETE) {
            DataValue value = ((V1KafkaTableRecordSpec) record.getAfter()).getValue();
            empty = value.type().getDataSerde().serialize(topic, value.data(), Collections.emptyMap(), false);
        }
        return KafkaRecord.builder().topic(topic).headers(new RecordHeaders((List) ((V1KafkaTableRecordSpec) record.getAfter()).getHeaders().stream().map(kafkaRecordHeader -> {
            return new RecordHeader(kafkaRecordHeader.name(), kafkaRecordHeader.value().getBytes(StandardCharsets.UTF_8));
        }).collect(Collectors.toList()))).key(serialize.orElse(null)).value(empty.orElse(null)).build();
    }
}
