package io.kroxylicious.proxy.filter.simpletransform;

import io.kroxylicious.proxy.filter.FilterContext;
import io.kroxylicious.proxy.filter.ProduceRequestFilter;
import io.kroxylicious.proxy.filter.RequestFilterResult;
import java.util.Iterator;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

/* loaded from: input_file:io/kroxylicious/proxy/filter/simpletransform/ProduceRequestTransformationFilter.class */
class ProduceRequestTransformationFilter implements ProduceRequestFilter {
    private final ByteBufferTransformation valueTransformation;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ProduceRequestTransformationFilter(ByteBufferTransformation byteBufferTransformation) {
        this.valueTransformation = byteBufferTransformation;
    }

    public CompletionStage<RequestFilterResult> onProduceRequest(short s, RequestHeaderData requestHeaderData, ProduceRequestData produceRequestData, FilterContext filterContext) {
        applyTransformation(filterContext, produceRequestData);
        return filterContext.forwardRequest(requestHeaderData, produceRequestData);
    }

    private void applyTransformation(FilterContext filterContext, ProduceRequestData produceRequestData) {
        produceRequestData.topicData().forEach(topicProduceData -> {
            for (ProduceRequestData.PartitionProduceData partitionProduceData : topicProduceData.partitionData()) {
                MemoryRecords records = partitionProduceData.records();
                ByteBufferOutputStream createByteBufferOutputStream = filterContext.createByteBufferOutputStream(records.sizeInBytes());
                MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(createByteBufferOutputStream, (byte) 2, Compression.NONE, TimestampType.CREATE_TIME, 0L, System.currentTimeMillis(), -1L, (short) -1, -1, false, false, -1, createByteBufferOutputStream.remaining());
                try {
                    Iterator it = records.batches().iterator();
                    while (it.hasNext()) {
                        for (Record record : (MutableRecordBatch) it.next()) {
                            memoryRecordsBuilder.appendWithOffset(record.offset(), record.timestamp(), record.key(), this.valueTransformation.transform(topicProduceData.name(), record.value()));
                        }
                    }
                    partitionProduceData.setRecords(memoryRecordsBuilder.build());
                    memoryRecordsBuilder.close();
                } catch (Throwable th) {
                    try {
                        memoryRecordsBuilder.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            }
        });
    }
}
