package org.apache.kafka.common.record;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/kafka/common/record/MemoryRecordsBuilderTest.class */
public class MemoryRecordsBuilderTest {
    private final Time time = Time.SYSTEM;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/common/record/MemoryRecordsBuilderTest$Args.class */
    public static class Args {
        final int bufferOffset;
        final CompressionType compressionType;
        final byte magic;

        public Args(int i, CompressionType compressionType, byte b) {
            this.bufferOffset = i;
            this.compressionType = compressionType;
            this.magic = b;
        }

        public String toString() {
            return "magic=" + ((int) this.magic) + ", bufferOffset=" + this.bufferOffset + ", compressionType=" + this.compressionType;
        }
    }

    /* loaded from: input_file:org/apache/kafka/common/record/MemoryRecordsBuilderTest$MemoryRecordsBuilderArgumentsProvider.class */
    private static class MemoryRecordsBuilderArgumentsProvider implements ArgumentsProvider {
        private MemoryRecordsBuilderArgumentsProvider() {
        }

        public Stream<? extends Arguments> provideArguments(ExtensionContext extensionContext) {
            ArrayList arrayList = new ArrayList();
            Iterator it = Arrays.asList(0, 15).iterator();
            while (it.hasNext()) {
                int intValue = ((Integer) it.next()).intValue();
                CompressionType[] values = CompressionType.values();
                int length = values.length;
                for (int i = 0; i < length; i++) {
                    CompressionType compressionType = values[i];
                    Iterator it2 = (compressionType == CompressionType.ZSTD ? Collections.singletonList((byte) 2) : Arrays.asList((byte) 0, (byte) 1, (byte) 2)).iterator();
                    while (it2.hasNext()) {
                        arrayList.add(Arguments.of(new Object[]{new Args(intValue, compressionType, ((Byte) it2.next()).byteValue())}));
                    }
                }
            }
            return arrayList.stream();
        }
    }

    /* loaded from: input_file:org/apache/kafka/common/record/MemoryRecordsBuilderTest$V2MemoryRecordsBuilderArgumentsProvider.class */
    private static class V2MemoryRecordsBuilderArgumentsProvider implements ArgumentsProvider {
        private V2MemoryRecordsBuilderArgumentsProvider() {
        }

        public Stream<? extends Arguments> provideArguments(ExtensionContext extensionContext) {
            ArrayList arrayList = new ArrayList();
            Iterator it = Arrays.asList(0, 15).iterator();
            while (it.hasNext()) {
                int intValue = ((Integer) it.next()).intValue();
                for (CompressionType compressionType : CompressionType.values()) {
                    arrayList.add(Arguments.of(new Object[]{new Args(intValue, compressionType, (byte) 2)}));
                }
            }
            return arrayList.stream();
        }
    }

    @Test
    public void testUnsupportedCompress() {
        BiFunction biFunction = (b, compressionType) -> {
            return new MemoryRecordsBuilder(ByteBuffer.allocate(128), b.byteValue(), compressionType, TimestampType.CREATE_TIME, 0L, 0L, -1L, (short) -1, -1, false, false, -1, 128);
        };
        Arrays.asList((byte) 0, (byte) 1).forEach(b2 -> {
            Assertions.assertEquals(((Exception) Assertions.assertThrows(IllegalArgumentException.class, () -> {
                biFunction.apply(b2, CompressionType.ZSTD);
            })).getMessage(), "ZStandard compression is not supported for magic " + b2);
        });
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testWriteEmptyRecordSet(Args args) {
        byte b = args.magic;
        ByteBuffer allocateBuffer = allocateBuffer(128, args);
        Assertions.assertEquals(0, new MemoryRecordsBuilder(allocateBuffer, b, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, -1L, (short) -1, -1, false, false, -1, allocateBuffer.capacity()).build().sizeInBytes());
        Assertions.assertEquals(args.bufferOffset, allocateBuffer.position());
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testWriteTransactionalRecordSet(Args args) {
        ByteBuffer allocateBuffer = allocateBuffer(128, args);
        long j = 9809;
        short s = 15;
        int i = 2342;
        Supplier supplier = () -> {
            return new MemoryRecordsBuilder(allocateBuffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, j, s, i, true, false, -1, allocateBuffer.capacity());
        };
        if (args.magic < 2) {
            Objects.requireNonNull(supplier);
            Assertions.assertThrows(IllegalArgumentException.class, supplier::get);
            return;
        }
        MemoryRecordsBuilder memoryRecordsBuilder = (MemoryRecordsBuilder) supplier.get();
        memoryRecordsBuilder.append(System.currentTimeMillis(), "foo".getBytes(), "bar".getBytes());
        List list = Utils.toList(memoryRecordsBuilder.build().batches().iterator());
        Assertions.assertEquals(1, list.size());
        Assertions.assertTrue(((MutableRecordBatch) list.get(0)).isTransactional());
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testWriteTransactionalWithInvalidPID(Args args) {
        ByteBuffer allocateBuffer = allocateBuffer(128, args);
        long j = -1;
        short s = 15;
        int i = 2342;
        Supplier supplier = () -> {
            return new MemoryRecordsBuilder(allocateBuffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, j, s, i, true, false, -1, allocateBuffer.capacity());
        };
        if (args.magic < 2) {
            Objects.requireNonNull(supplier);
            Assertions.assertThrows(IllegalArgumentException.class, supplier::get);
        } else {
            MemoryRecordsBuilder memoryRecordsBuilder = (MemoryRecordsBuilder) supplier.get();
            Objects.requireNonNull(memoryRecordsBuilder);
            Assertions.assertThrows(IllegalArgumentException.class, memoryRecordsBuilder::close);
        }
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testWriteIdempotentWithInvalidEpoch(Args args) {
        ByteBuffer allocateBuffer = allocateBuffer(128, args);
        long j = 9809;
        short s = -1;
        int i = 2342;
        Supplier supplier = () -> {
            return new MemoryRecordsBuilder(allocateBuffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, j, s, i, true, false, -1, allocateBuffer.capacity());
        };
        if (args.magic < 2) {
            Objects.requireNonNull(supplier);
            Assertions.assertThrows(IllegalArgumentException.class, supplier::get);
        } else {
            MemoryRecordsBuilder memoryRecordsBuilder = (MemoryRecordsBuilder) supplier.get();
            Objects.requireNonNull(memoryRecordsBuilder);
            Assertions.assertThrows(IllegalArgumentException.class, memoryRecordsBuilder::close);
        }
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testWriteIdempotentWithInvalidBaseSequence(Args args) {
        ByteBuffer allocateBuffer = allocateBuffer(128, args);
        long j = 9809;
        short s = 15;
        int i = -1;
        Supplier supplier = () -> {
            return new MemoryRecordsBuilder(allocateBuffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, j, s, i, true, false, -1, allocateBuffer.capacity());
        };
        if (args.magic < 2) {
            Objects.requireNonNull(supplier);
            Assertions.assertThrows(IllegalArgumentException.class, supplier::get);
        } else {
            MemoryRecordsBuilder memoryRecordsBuilder = (MemoryRecordsBuilder) supplier.get();
            Objects.requireNonNull(memoryRecordsBuilder);
            Assertions.assertThrows(IllegalArgumentException.class, memoryRecordsBuilder::close);
        }
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testWriteEndTxnMarkerNonTransactionalBatch(Args args) {
        ByteBuffer allocateBuffer = allocateBuffer(128, args);
        long j = 9809;
        short s = 15;
        int i = -1;
        Supplier supplier = () -> {
            return new MemoryRecordsBuilder(allocateBuffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, j, s, i, false, true, -1, allocateBuffer.capacity());
        };
        if (args.magic < 2) {
            Objects.requireNonNull(supplier);
            Assertions.assertThrows(IllegalArgumentException.class, supplier::get);
        } else {
            MemoryRecordsBuilder memoryRecordsBuilder = (MemoryRecordsBuilder) supplier.get();
            Assertions.assertThrows(IllegalArgumentException.class, () -> {
                memoryRecordsBuilder.appendEndTxnMarker(-1L, new EndTransactionMarker(ControlRecordType.ABORT, 0));
            });
        }
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testWriteEndTxnMarkerNonControlBatch(Args args) {
        ByteBuffer allocateBuffer = allocateBuffer(128, args);
        long j = 9809;
        short s = 15;
        int i = -1;
        Supplier supplier = () -> {
            return new MemoryRecordsBuilder(allocateBuffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, j, s, i, true, false, -1, allocateBuffer.capacity());
        };
        if (args.magic < 2) {
            Objects.requireNonNull(supplier);
            Assertions.assertThrows(IllegalArgumentException.class, supplier::get);
        } else {
            MemoryRecordsBuilder memoryRecordsBuilder = (MemoryRecordsBuilder) supplier.get();
            Assertions.assertThrows(IllegalArgumentException.class, () -> {
                memoryRecordsBuilder.appendEndTxnMarker(-1L, new EndTransactionMarker(ControlRecordType.ABORT, 0));
            });
        }
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testWriteLeaderChangeControlBatchWithoutLeaderEpoch(Args args) {
        ByteBuffer allocateBuffer = allocateBuffer(128, args);
        Supplier supplier = () -> {
            return new MemoryRecordsBuilder(allocateBuffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, -1L, (short) -1, -1, false, true, -1, allocateBuffer.capacity());
        };
        if (args.magic < 2) {
            Objects.requireNonNull(supplier);
            Assertions.assertThrows(IllegalArgumentException.class, supplier::get);
        } else {
            MemoryRecordsBuilder memoryRecordsBuilder = (MemoryRecordsBuilder) supplier.get();
            Assertions.assertThrows(IllegalArgumentException.class, () -> {
                memoryRecordsBuilder.appendLeaderChangeMessage(-1L, new LeaderChangeMessage().setLeaderId(1).setVoters(Collections.emptyList()));
            });
        }
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testWriteLeaderChangeControlBatch(Args args) {
        ByteBuffer allocateBuffer = allocateBuffer(128, args);
        List asList = Arrays.asList(2, 3);
        Supplier supplier = () -> {
            return new MemoryRecordsBuilder(allocateBuffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, -1L, (short) -1, -1, false, true, 5, allocateBuffer.capacity());
        };
        if (args.magic < 2) {
            Objects.requireNonNull(supplier);
            Assertions.assertThrows(IllegalArgumentException.class, supplier::get);
            return;
        }
        MemoryRecordsBuilder memoryRecordsBuilder = (MemoryRecordsBuilder) supplier.get();
        memoryRecordsBuilder.appendLeaderChangeMessage(-1L, new LeaderChangeMessage().setLeaderId(1).setVoters((List) asList.stream().map(num -> {
            return new LeaderChangeMessage.Voter().setVoterId(num.intValue());
        }).collect(Collectors.toList())));
        List list = TestUtils.toList(memoryRecordsBuilder.build().records());
        Assertions.assertEquals(1, list.size());
        LeaderChangeMessage deserializeLeaderChangeMessage = ControlRecordUtils.deserializeLeaderChangeMessage((Record) list.get(0));
        Assertions.assertEquals(1, deserializeLeaderChangeMessage.leaderId());
        Assertions.assertEquals(asList, deserializeLeaderChangeMessage.voters().stream().map((v0) -> {
            return v0.voterId();
        }).collect(Collectors.toList()));
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testLegacyCompressionRate(Args args) {
        byte b = args.magic;
        ByteBuffer allocateBuffer = allocateBuffer(1024, args);
        Supplier supplier = () -> {
            return new LegacyRecord[]{LegacyRecord.create(b, 0L, "a".getBytes(), "1".getBytes()), LegacyRecord.create(b, 1L, "b".getBytes(), "2".getBytes()), LegacyRecord.create(b, 2L, "c".getBytes(), "3".getBytes())};
        };
        if (b >= 2) {
            Objects.requireNonNull(supplier);
            Assertions.assertThrows(IllegalArgumentException.class, supplier::get);
            return;
        }
        LegacyRecord[] legacyRecordArr = (LegacyRecord[]) supplier.get();
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(allocateBuffer, b, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, -1L, (short) -1, -1, false, false, -1, allocateBuffer.capacity());
        int i = 0;
        for (LegacyRecord legacyRecord : legacyRecordArr) {
            i += legacyRecord.sizeInBytes() + 12;
            memoryRecordsBuilder.append(legacyRecord);
        }
        MemoryRecords build = memoryRecordsBuilder.build();
        if (args.compressionType == CompressionType.NONE) {
            Assertions.assertEquals(1.0d, memoryRecordsBuilder.compressionRatio(), 1.0E-5d);
        } else {
            Assertions.assertEquals(((build.sizeInBytes() - 12) - (b == 0 ? 14 : 22)) / i, memoryRecordsBuilder.compressionRatio(), 1.0E-5d);
        }
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testEstimatedSizeInBytes(Args args) {
        ByteBuffer allocateBuffer = allocateBuffer(1024, args);
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(allocateBuffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, -1L, (short) -1, -1, false, false, -1, allocateBuffer.capacity());
        int i = 0;
        for (int i2 = 0; i2 < 10; i2++) {
            memoryRecordsBuilder.append(new SimpleRecord(i2, ("" + i2).getBytes()));
            int estimatedSizeInBytes = memoryRecordsBuilder.estimatedSizeInBytes();
            Assertions.assertTrue(estimatedSizeInBytes > i);
            i = estimatedSizeInBytes;
        }
        int estimatedSizeInBytes2 = memoryRecordsBuilder.estimatedSizeInBytes();
        MemoryRecords build = memoryRecordsBuilder.build();
        Assertions.assertEquals(build.sizeInBytes(), memoryRecordsBuilder.estimatedSizeInBytes());
        if (args.compressionType == CompressionType.NONE) {
            Assertions.assertEquals(build.sizeInBytes(), estimatedSizeInBytes2);
        }
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void buildUsingLogAppendTime(Args args) {
        byte b = args.magic;
        ByteBuffer allocateBuffer = allocateBuffer(1024, args);
        long currentTimeMillis = System.currentTimeMillis();
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(allocateBuffer, b, args.compressionType, TimestampType.LOG_APPEND_TIME, 0L, currentTimeMillis, -1L, (short) -1, -1, false, false, -1, allocateBuffer.capacity());
        memoryRecordsBuilder.append(0L, "a".getBytes(), "1".getBytes());
        memoryRecordsBuilder.append(0L, "b".getBytes(), "2".getBytes());
        memoryRecordsBuilder.append(0L, "c".getBytes(), "3".getBytes());
        MemoryRecords build = memoryRecordsBuilder.build();
        MemoryRecordsBuilder.RecordsInfo info = memoryRecordsBuilder.info();
        Assertions.assertEquals(currentTimeMillis, info.maxTimestamp);
        if (args.compressionType != CompressionType.NONE || b > 1) {
            Assertions.assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
        } else {
            Assertions.assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
        }
        for (RecordBatch recordBatch : build.batches()) {
            if (b == 0) {
                Assertions.assertEquals(TimestampType.NO_TIMESTAMP_TYPE, recordBatch.timestampType());
            } else {
                Assertions.assertEquals(TimestampType.LOG_APPEND_TIME, recordBatch.timestampType());
                Iterator it = recordBatch.iterator();
                while (it.hasNext()) {
                    Assertions.assertEquals(currentTimeMillis, ((Record) it.next()).timestamp());
                }
            }
        }
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void buildUsingCreateTime(Args args) {
        byte b = args.magic;
        ByteBuffer allocateBuffer = allocateBuffer(1024, args);
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(allocateBuffer, b, args.compressionType, TimestampType.CREATE_TIME, 0L, System.currentTimeMillis(), -1L, (short) -1, -1, false, false, -1, allocateBuffer.capacity());
        memoryRecordsBuilder.append(0L, "a".getBytes(), "1".getBytes());
        memoryRecordsBuilder.append(2L, "b".getBytes(), "2".getBytes());
        memoryRecordsBuilder.append(1L, "c".getBytes(), "3".getBytes());
        MemoryRecords build = memoryRecordsBuilder.build();
        MemoryRecordsBuilder.RecordsInfo info = memoryRecordsBuilder.info();
        if (b == 0) {
            Assertions.assertEquals(-1L, info.maxTimestamp);
        } else {
            Assertions.assertEquals(2L, info.maxTimestamp);
        }
        if (args.compressionType == CompressionType.NONE && b == 1) {
            Assertions.assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
        } else {
            Assertions.assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
        }
        int i = 0;
        long[] jArr = {0, 2, 1};
        for (RecordBatch recordBatch : build.batches()) {
            if (b == 0) {
                Assertions.assertEquals(TimestampType.NO_TIMESTAMP_TYPE, recordBatch.timestampType());
            } else {
                Assertions.assertEquals(TimestampType.CREATE_TIME, recordBatch.timestampType());
                Iterator it = recordBatch.iterator();
                while (it.hasNext()) {
                    int i2 = i;
                    i++;
                    Assertions.assertEquals(jArr[i2], ((Record) it.next()).timestamp());
                }
            }
        }
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testAppendedChecksumConsistency(Args args) {
        ByteBuffer allocate = ByteBuffer.allocate(512);
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(allocate, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, -1L, -1L, (short) -1, -1, false, false, -1, allocate.capacity());
        memoryRecordsBuilder.append(1L, "key".getBytes(), "value".getBytes());
        Assertions.assertEquals(1, TestUtils.toList(memoryRecordsBuilder.build().records()).size());
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testSmallWriteLimit(Args args) {
        byte[] bytes = "foo".getBytes();
        byte[] bytes2 = "bar".getBytes();
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(ByteBuffer.allocate(512), args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, -1L, -1L, (short) -1, -1, false, false, -1, 0);
        Assertions.assertFalse(memoryRecordsBuilder.isFull());
        Assertions.assertTrue(memoryRecordsBuilder.hasRoomFor(0L, bytes, bytes2, Record.EMPTY_HEADERS));
        memoryRecordsBuilder.append(0L, bytes, bytes2);
        Assertions.assertTrue(memoryRecordsBuilder.isFull());
        Assertions.assertFalse(memoryRecordsBuilder.hasRoomFor(0L, bytes, bytes2, Record.EMPTY_HEADERS));
        List list = TestUtils.toList(memoryRecordsBuilder.build().records());
        Assertions.assertEquals(1, list.size());
        Record record = (Record) list.get(0);
        Assertions.assertEquals(ByteBuffer.wrap(bytes), record.key());
        Assertions.assertEquals(ByteBuffer.wrap(bytes2), record.value());
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void writePastLimit(Args args) {
        byte b = args.magic;
        ByteBuffer allocateBuffer = allocateBuffer(64, args);
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(allocateBuffer, b, args.compressionType, TimestampType.CREATE_TIME, 0L, System.currentTimeMillis(), -1L, (short) -1, -1, false, false, -1, allocateBuffer.capacity());
        memoryRecordsBuilder.setEstimatedCompressionRatio(0.5f);
        memoryRecordsBuilder.append(0L, "a".getBytes(), "1".getBytes());
        memoryRecordsBuilder.append(1L, "b".getBytes(), "2".getBytes());
        Assertions.assertFalse(memoryRecordsBuilder.hasRoomFor(2L, "c".getBytes(), "3".getBytes(), Record.EMPTY_HEADERS));
        memoryRecordsBuilder.append(2L, "c".getBytes(), "3".getBytes());
        MemoryRecords build = memoryRecordsBuilder.build();
        MemoryRecordsBuilder.RecordsInfo info = memoryRecordsBuilder.info();
        if (b == 0) {
            Assertions.assertEquals(-1L, info.maxTimestamp);
        } else {
            Assertions.assertEquals(2L, info.maxTimestamp);
        }
        Assertions.assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
        long j = 0;
        for (RecordBatch<Record> recordBatch : build.batches()) {
            if (b == 0) {
                Assertions.assertEquals(TimestampType.NO_TIMESTAMP_TYPE, recordBatch.timestampType());
            } else {
                Assertions.assertEquals(TimestampType.CREATE_TIME, recordBatch.timestampType());
                for (Record record : recordBatch) {
                    long j2 = j;
                    j = j2 + 1;
                    Assertions.assertEquals(j2, record.timestamp());
                }
            }
        }
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testAppendAtInvalidOffset(Args args) {
        ByteBuffer allocateBuffer = allocateBuffer(1024, args);
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(allocateBuffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, System.currentTimeMillis(), -1L, (short) -1, -1, false, false, -1, allocateBuffer.capacity());
        memoryRecordsBuilder.appendWithOffset(0L, System.currentTimeMillis(), "a".getBytes(), (byte[]) null);
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            memoryRecordsBuilder.appendWithOffset(0L, System.currentTimeMillis(), "b".getBytes(), (byte[]) null);
        });
    }

    @EnumSource(CompressionType.class)
    @ParameterizedTest
    public void convertV2ToV1UsingMixedCreateAndLogAppendTime(CompressionType compressionType) {
        ByteBuffer allocate = ByteBuffer.allocate(512);
        MemoryRecordsBuilder builder = MemoryRecords.builder(allocate, (byte) 2, compressionType, TimestampType.LOG_APPEND_TIME, 0L);
        builder.append(10L, "1".getBytes(), "a".getBytes());
        builder.close();
        int position = allocate.position();
        MemoryRecords.writeEndTransactionalMarker(allocate, 1L, System.currentTimeMillis(), 0, 15L, (short) 0, new EndTransactionMarker(ControlRecordType.ABORT, 0));
        int position2 = allocate.position();
        MemoryRecordsBuilder builder2 = MemoryRecords.builder(allocate, (byte) 2, compressionType, TimestampType.CREATE_TIME, 1L);
        builder2.append(12L, "2".getBytes(), "b".getBytes());
        builder2.append(13L, "3".getBytes(), "c".getBytes());
        builder2.close();
        int position3 = position + (allocate.position() - position2);
        MemoryRecords.writeEndTransactionalMarker(allocate, 14L, System.currentTimeMillis(), 0, 1L, (short) 0, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
        allocate.flip();
        Supplier supplier = () -> {
            return MemoryRecords.readableRecords(allocate).downConvert((byte) 1, 0L, this.time);
        };
        if (compressionType == CompressionType.ZSTD) {
            Objects.requireNonNull(supplier);
            Assertions.assertEquals("Down-conversion of zstandard-compressed batches is not supported", ((Exception) Assertions.assertThrows(UnsupportedCompressionTypeException.class, supplier::get)).getMessage());
            return;
        }
        ConvertedRecords convertedRecords = (ConvertedRecords) supplier.get();
        MemoryRecords records = convertedRecords.records();
        verifyRecordsProcessingStats(compressionType, convertedRecords.recordConversionStats(), 3, 3, records.sizeInBytes(), position3);
        List list = Utils.toList(records.batches().iterator());
        if (compressionType != CompressionType.NONE) {
            Assertions.assertEquals(2, list.size());
            Assertions.assertEquals(TimestampType.LOG_APPEND_TIME, ((RecordBatch) list.get(0)).timestampType());
            Assertions.assertEquals(TimestampType.CREATE_TIME, ((RecordBatch) list.get(1)).timestampType());
        } else {
            Assertions.assertEquals(3, list.size());
            Assertions.assertEquals(TimestampType.LOG_APPEND_TIME, ((RecordBatch) list.get(0)).timestampType());
            Assertions.assertEquals(TimestampType.CREATE_TIME, ((RecordBatch) list.get(1)).timestampType());
            Assertions.assertEquals(TimestampType.CREATE_TIME, ((RecordBatch) list.get(2)).timestampType());
        }
        List list2 = Utils.toList(records.records().iterator());
        Assertions.assertEquals(3, list2.size());
        Assertions.assertEquals(ByteBuffer.wrap("1".getBytes()), ((Record) list2.get(0)).key());
        Assertions.assertEquals(ByteBuffer.wrap("2".getBytes()), ((Record) list2.get(1)).key());
        Assertions.assertEquals(ByteBuffer.wrap("3".getBytes()), ((Record) list2.get(2)).key());
    }

    @EnumSource(CompressionType.class)
    @ParameterizedTest
    public void convertToV1WithMixedV0AndV2Data(CompressionType compressionType) {
        ByteBuffer allocate = ByteBuffer.allocate(512);
        Supplier supplier = () -> {
            return MemoryRecords.builder(allocate, (byte) 0, compressionType, TimestampType.NO_TIMESTAMP_TYPE, 0L);
        };
        if (compressionType == CompressionType.ZSTD) {
            Objects.requireNonNull(supplier);
            Assertions.assertThrows(IllegalArgumentException.class, supplier::get);
            return;
        }
        MemoryRecordsBuilder memoryRecordsBuilder = (MemoryRecordsBuilder) supplier.get();
        memoryRecordsBuilder.append(-1L, "1".getBytes(), "a".getBytes());
        memoryRecordsBuilder.close();
        MemoryRecordsBuilder builder = MemoryRecords.builder(allocate, (byte) 2, compressionType, TimestampType.CREATE_TIME, 1L);
        builder.append(11L, "2".getBytes(), "b".getBytes());
        builder.append(12L, "3".getBytes(), "c".getBytes());
        builder.close();
        allocate.flip();
        ConvertedRecords downConvert = MemoryRecords.readableRecords(allocate).downConvert((byte) 1, 0L, this.time);
        MemoryRecords records = downConvert.records();
        verifyRecordsProcessingStats(compressionType, downConvert.recordConversionStats(), 3, 2, records.sizeInBytes(), allocate.limit());
        List list = Utils.toList(records.batches().iterator());
        if (compressionType != CompressionType.NONE) {
            Assertions.assertEquals(2, list.size());
            Assertions.assertEquals((byte) 0, ((RecordBatch) list.get(0)).magic());
            Assertions.assertEquals(0L, ((RecordBatch) list.get(0)).baseOffset());
            Assertions.assertEquals((byte) 1, ((RecordBatch) list.get(1)).magic());
            Assertions.assertEquals(1L, ((RecordBatch) list.get(1)).baseOffset());
        } else {
            Assertions.assertEquals(3, list.size());
            Assertions.assertEquals((byte) 0, ((RecordBatch) list.get(0)).magic());
            Assertions.assertEquals(0L, ((RecordBatch) list.get(0)).baseOffset());
            Assertions.assertEquals((byte) 1, ((RecordBatch) list.get(1)).magic());
            Assertions.assertEquals(1L, ((RecordBatch) list.get(1)).baseOffset());
            Assertions.assertEquals((byte) 1, ((RecordBatch) list.get(2)).magic());
            Assertions.assertEquals(2L, ((RecordBatch) list.get(2)).baseOffset());
        }
        List list2 = Utils.toList(records.records().iterator());
        Assertions.assertEquals("1", Utils.utf8(((Record) list2.get(0)).key()));
        Assertions.assertEquals("2", Utils.utf8(((Record) list2.get(1)).key()));
        Assertions.assertEquals("3", Utils.utf8(((Record) list2.get(2)).key()));
        ConvertedRecords downConvert2 = MemoryRecords.readableRecords(allocate).downConvert((byte) 1, 2L, this.time);
        MemoryRecords records2 = downConvert2.records();
        List list3 = Utils.toList(records2.batches().iterator());
        List list4 = Utils.toList(records2.records().iterator());
        if (compressionType == CompressionType.NONE) {
            Assertions.assertEquals(2, list3.size());
            Assertions.assertEquals((byte) 0, ((RecordBatch) list3.get(0)).magic());
            Assertions.assertEquals(0L, ((RecordBatch) list3.get(0)).baseOffset());
            Assertions.assertEquals((byte) 1, ((RecordBatch) list3.get(1)).magic());
            Assertions.assertEquals(2L, ((RecordBatch) list3.get(1)).baseOffset());
            Assertions.assertEquals("1", Utils.utf8(((Record) list4.get(0)).key()));
            Assertions.assertEquals("3", Utils.utf8(((Record) list4.get(1)).key()));
            verifyRecordsProcessingStats(compressionType, downConvert2.recordConversionStats(), 3, 1, records2.sizeInBytes(), allocate.limit());
            return;
        }
        Assertions.assertEquals(2, list3.size());
        Assertions.assertEquals((byte) 0, ((RecordBatch) list3.get(0)).magic());
        Assertions.assertEquals(0L, ((RecordBatch) list3.get(0)).baseOffset());
        Assertions.assertEquals((byte) 1, ((RecordBatch) list3.get(1)).magic());
        Assertions.assertEquals(1L, ((RecordBatch) list3.get(1)).baseOffset());
        Assertions.assertEquals("1", Utils.utf8(((Record) list4.get(0)).key()));
        Assertions.assertEquals("2", Utils.utf8(((Record) list4.get(1)).key()));
        Assertions.assertEquals("3", Utils.utf8(((Record) list4.get(2)).key()));
        verifyRecordsProcessingStats(compressionType, downConvert2.recordConversionStats(), 3, 2, records2.sizeInBytes(), allocate.limit());
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void shouldThrowIllegalStateExceptionOnBuildWhenAborted(Args args) {
        ByteBuffer allocateBuffer = allocateBuffer(128, args);
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(allocateBuffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, -1L, (short) -1, -1, false, false, -1, allocateBuffer.capacity());
        memoryRecordsBuilder.abort();
        Objects.requireNonNull(memoryRecordsBuilder);
        Assertions.assertThrows(IllegalStateException.class, memoryRecordsBuilder::build);
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void shouldResetBufferToInitialPositionOnAbort(Args args) {
        ByteBuffer allocateBuffer = allocateBuffer(128, args);
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(allocateBuffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, -1L, (short) -1, -1, false, false, -1, allocateBuffer.capacity());
        memoryRecordsBuilder.append(0L, "a".getBytes(), "1".getBytes());
        memoryRecordsBuilder.abort();
        Assertions.assertEquals(args.bufferOffset, memoryRecordsBuilder.buffer().position());
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void shouldThrowIllegalStateExceptionOnCloseWhenAborted(Args args) {
        ByteBuffer allocateBuffer = allocateBuffer(128, args);
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(allocateBuffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, -1L, (short) -1, -1, false, false, -1, allocateBuffer.capacity());
        memoryRecordsBuilder.abort();
        Objects.requireNonNull(memoryRecordsBuilder);
        Assertions.assertThrows(IllegalStateException.class, memoryRecordsBuilder::close, "Should have thrown IllegalStateException");
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void shouldThrowIllegalStateExceptionOnAppendWhenAborted(Args args) {
        ByteBuffer allocateBuffer = allocateBuffer(128, args);
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(allocateBuffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, -1L, (short) -1, -1, false, false, -1, allocateBuffer.capacity());
        memoryRecordsBuilder.abort();
        Assertions.assertThrows(IllegalStateException.class, () -> {
            memoryRecordsBuilder.append(0L, "a".getBytes(), "1".getBytes());
        }, "Should have thrown IllegalStateException");
    }

    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testBuffersDereferencedOnClose(Args args) {
        Runtime runtime = Runtime.getRuntime();
        ByteBuffer allocate = ByteBuffer.allocate(1048576 * 2);
        byte[] bArr = new byte[0];
        byte[] bArr2 = new byte[1048576];
        new Random().nextBytes(bArr2);
        ArrayList arrayList = new ArrayList(100);
        long j = 0;
        long j2 = 0;
        int i = 0;
        while (true) {
            int i2 = i;
            i++;
            if (i2 >= 100) {
                break;
            }
            allocate.rewind();
            MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(allocate, args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, -1L, (short) -1, -1, false, false, -1, 0);
            memoryRecordsBuilder.append(1L, bArr, bArr2);
            memoryRecordsBuilder.build();
            arrayList.add(memoryRecordsBuilder);
            System.gc();
            j2 = (runtime.totalMemory() - runtime.freeMemory()) - j;
            if (i != 2) {
                if (i > 2 && j2 < (i - 2) * 1024) {
                    break;
                }
            } else {
                j = j2;
            }
        }
        Assertions.assertTrue(i < 100, "Memory usage too high: " + j2);
    }

    @ArgumentsSource(V2MemoryRecordsBuilderArgumentsProvider.class)
    @ParameterizedTest
    public void testRecordTimestampsWithDeleteHorizon(Args args) {
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(new ByteBufferOutputStream(ByteBuffer.allocate(1048576 * 2)), args.magic, args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, -1L, (short) -1, -1, false, false, -1, 0, 100L);
        memoryRecordsBuilder.append(50L, "0".getBytes(), "0".getBytes());
        memoryRecordsBuilder.append(100L, "1".getBytes(), (byte[]) null);
        memoryRecordsBuilder.append(150L, "2".getBytes(), "2".getBytes());
        List list = TestUtils.toList(memoryRecordsBuilder.build().batches());
        Assertions.assertEquals(OptionalLong.of(100L), ((MutableRecordBatch) list.get(0)).deleteHorizonMs());
        CloseableIterator streamingIterator = ((MutableRecordBatch) list.get(0)).streamingIterator(BufferSupplier.create());
        Assertions.assertEquals(50L, ((Record) streamingIterator.next()).timestamp());
        Assertions.assertEquals(100L, ((Record) streamingIterator.next()).timestamp());
        Assertions.assertEquals(150L, ((Record) streamingIterator.next()).timestamp());
        streamingIterator.close();
    }

    private void verifyRecordsProcessingStats(CompressionType compressionType, RecordConversionStats recordConversionStats, int i, int i2, long j, long j2) {
        Assertions.assertNotNull(recordConversionStats, "Records processing info is null");
        Assertions.assertEquals(i2, recordConversionStats.numRecordsConverted());
        Assertions.assertTrue(recordConversionStats.conversionTimeNanos() >= 0, "Processing time not recorded: " + recordConversionStats);
        long temporaryMemoryBytes = recordConversionStats.temporaryMemoryBytes();
        if (compressionType != CompressionType.NONE) {
            long j3 = (j - 12) - 14;
            Assertions.assertTrue(temporaryMemoryBytes > j3, String.format("Uncompressed size expected temp=%d, compressed=%d", Long.valueOf(temporaryMemoryBytes), Long.valueOf(j3)));
        } else if (i2 == 0) {
            Assertions.assertEquals(j, temporaryMemoryBytes);
        } else if (i2 == i) {
            Assertions.assertEquals(j2 + j, temporaryMemoryBytes);
        } else {
            Assertions.assertTrue(temporaryMemoryBytes > j && temporaryMemoryBytes < j + j2, String.format("Unexpected temp bytes %d final %d pre %d", Long.valueOf(temporaryMemoryBytes), Long.valueOf(j), Long.valueOf(j2)));
        }
    }

    private ByteBuffer allocateBuffer(int i, Args args) {
        ByteBuffer allocate = ByteBuffer.allocate(i);
        allocate.position(args.bufferOffset);
        return allocate;
    }
}
