package com.apple.foundationdb.record.provider.foundationdb.runners.throttled;

import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.async.MoreAsyncUtil;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordCursorResult;
import com.apple.foundationdb.record.RecordCursorVisitor;
import com.apple.foundationdb.record.ScanProperties;
import com.apple.foundationdb.record.TestRecords1Proto;
import com.apple.foundationdb.record.provider.foundationdb.FDBDatabaseRunner;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreTestBase;
import com.apple.foundationdb.record.provider.foundationdb.runners.throttled.ThrottledRetryingIterator;
import com.apple.foundationdb.tuple.Tuple;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.antlr.runtime.debug.Profiler;
import org.apache.logging.log4j.message.StructuredDataId;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

/* loaded from: input_file:com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest.class */
class ThrottledIteratorTest extends FDBRecordStoreTestBase {

    /* loaded from: input_file:com/apple/foundationdb/record/provider/foundationdb/runners/throttled/ThrottledIteratorTest$SingleItemCursor.class */
    /**
     * Minimal {@link RecordCursor} used by the tests: the first call to {@link #onNext()} hands back the
     * future supplied at construction time, and every later call reports an exhausted cursor.
     *
     * <p>A single flag tracks both "the item has been handed out" and "the cursor was closed", so
     * {@link #isClosed()} also returns {@code true} once {@link #onNext()} has been called.
     *
     * @param <T> the type of the item carried by the supplied result future
     */
    private class SingleItemCursor<T> implements RecordCursor<T> {
        private final Executor executor;
        private final CompletableFuture<RecordCursorResult<T>> future;
        private boolean done = false;

        /**
         * Creates a cursor around a caller-controlled future.
         *
         * @param executor the executor reported by {@link #getExecutor()}
         * @param completableFuture the future returned by the first {@link #onNext()} call
         */
        public SingleItemCursor(Executor executor, CompletableFuture<RecordCursorResult<T>> completableFuture) {
            this.executor = executor;
            this.future = completableFuture;
        }

        /**
         * Returns the supplied future on the first call and an already-completed "exhausted" result afterwards
         * (or after {@link #close()}).
         */
        @Override // com.apple.foundationdb.record.RecordCursor
        @Nonnull
        public CompletableFuture<RecordCursorResult<T>> onNext() {
            if (!done) {
                // First request: hand out the caller's future exactly once.
                done = true;
                return future;
            }
            return CompletableFuture.completedFuture(RecordCursorResult.exhausted());
        }

        /** Marks the cursor as finished; the supplied future itself is left untouched. */
        @Override // com.apple.foundationdb.record.RecordCursor, java.lang.AutoCloseable
        public void close() {
            done = true;
        }

        /** Reports the same flag that {@link #onNext()} and {@link #close()} set. */
        @Override // com.apple.foundationdb.record.RecordCursor
        public boolean isClosed() {
            return done;
        }

        @Override // com.apple.foundationdb.record.RecordCursor
        @Nonnull
        public Executor getExecutor() {
            return executor;
        }

        /**
         * Visits this cursor as a leaf: enter, then leave. The result of {@code visitEnter} is ignored and the
         * result of {@code visitLeave} is returned.
         */
        @Override // com.apple.foundationdb.record.RecordCursor
        public boolean accept(@Nonnull RecordCursorVisitor recordCursorVisitor) {
            recordCursorVisitor.visitEnter(this);
            final boolean leaveResult = recordCursorVisitor.visitLeave(this);
            return leaveResult;
        }
    }

    /** Package-private no-argument constructor; it does nothing beyond invoking the superclass constructor. */
    ThrottledIteratorTest() {
        super();
    }

    /**
     * Checks {@code ThrottledRetryingIterator.throttlePerSecGetDelayMillis} against a table of cases.
     *
     * <p>Each CSV row supplies, in order, the three arguments passed to the method followed by the
     * expected return value.
     *
     * @param j first argument passed to the method under test
     * @param i second argument passed to the method under test
     * @param i2 third argument passed to the method under test
     * @param j2 the expected result
     */
    @ParameterizedTest
    @CsvSource({"1000,100,0,0", "2000,100,180,0", "100,10,1,0", "105,10,1,0", "1000,100,100,0", "100,10,1,0", "1000,100,200,1000", "2000,100,210,100", "250,100,100,750", "250,50,100,1750", "1,50,100,1999", "1999,50,100,1", "10,10,1,90", "500,100,49,0", "500,100,50,0", "500,100,51,10"})
    void testThrottledIteratorGetDelay(long j, int i, int i2, long j2) {
        final long actualDelayMillis = ThrottledRetryingIterator.throttlePerSecGetDelayMillis(j, i, i2);
        Assertions.assertThat(actualDelayMillis).isEqualTo(j2);
    }

    /**
     * Checks {@code ThrottledRetryingIterator.increaseLimit} against a fixed table of
     * {@code {input, expected}} pairs, evaluated in order.
     */
    @Test
    void testIncreaseLimit() {
        final int[][] inputAndExpected = {
                {0, 0},
                {100, 125},
                {1, 5},
                {3, 7},
                {10, 14},
        };
        for (int[] pair : inputAndExpected) {
            Assertions.assertThat(ThrottledRetryingIterator.increaseLimit(pair[0])).isEqualTo(pair[1]);
        }
    }

    /**
     * Checks {@code ThrottledRetryingIterator.decreaseLimit} against a fixed table of
     * {@code {input, expected}} pairs, evaluated in order.
     */
    @Test
    void testDecreaseLimit() {
        final int[][] inputAndExpected = {
                {0, 1},
                {1, 1},
                {2, 1},
                {3, 2},
                {100, 90},
        };
        for (int[] pair : inputAndExpected) {
            Assertions.assertThat(ThrottledRetryingIterator.decreaseLimit(pair[0])).isEqualTo(pair[1]);
        }
    }

    /**
     * Verifies that an iterator which has already been closed refuses to run again.
     *
     * <p>The iterator is run to completion once inside a try-with-resources block. A second
     * {@code iterateAll} call, made after that block has closed the iterator, is expected to fail with a
     * cause of {@link FDBDatabaseRunner.RunnerClosed}.
     *
     * <p>Resources are managed with try-with-resources so that each is closed exactly once, including when
     * an assertion fails.
     *
     * @throws Exception if test setup or resource cleanup fails
     */
    @Test
    void cannotRunWhenClosed() throws Exception {
        final ItemHandler<Integer> itemHandler = (store, item, quotaManager) -> AsyncUtil.DONE;

        // Open the store once to obtain a builder, then release the context before iterating.
        final FDBRecordStore.Builder storeBuilder;
        try (FDBRecordContext context = openContext()) {
            openSimpleRecordStore(context);
            storeBuilder = this.recordStore.asBuilder();
            commit(context);
        }

        // Keep a reference outside the try block so the closed iterator can be exercised afterwards.
        final ThrottledRetryingIterator<Integer> throttledIterator =
                iteratorBuilder(10, itemHandler, null, null, -1, -1, -1, -1, null).build();
        try (ThrottledRetryingIterator<Integer> iterator = throttledIterator) {
            iterator.iterateAll(storeBuilder).join();
        }

        Assertions.assertThatThrownBy(() -> throttledIterator.iterateAll(this.recordStore.asBuilder()).join())
                .hasCauseInstanceOf(FDBDatabaseRunner.RunnerClosed.class);
    }

    /**
     * Runs the iterator over 42 items with a handler that records one delete per item and never fails,
     * then checks the totals reported to the callback and how often the callback ran.
     *
     * <p>Expectations asserted below: 42 items scanned, 42 deletes counted, the shared limit reference
     * ends at zero, and the callback runs once when {@code i <= 0} or {@code 42 / i + 1} times otherwise.
     *
     * <p>NOTE(review): the roles of the positional {@code iteratorBuilder} arguments are defined outside
     * this method; judging by the test name and assertions, {@code i} is the per-transaction delete limit
     * and the callback is the success notification. Confirm against {@code iteratorBuilder}.
     *
     * @param i the parameterized limit value; non-positive values are expected to yield a single callback
     * @throws Exception if test setup or resource cleanup fails
     */
    @CsvSource({"-1", "0", "1", "3", "100"})
    @ParameterizedTest
    void testThrottleIteratorSuccessDeleteLimit(int i) throws Exception {
        final int numRecords = 42;
        final AtomicInteger totalScanned = new AtomicInteger(0);
        final AtomicInteger totalDeleted = new AtomicInteger(0);
        final AtomicInteger callbackCount = new AtomicInteger(0);
        final AtomicInteger limitRef = new AtomicInteger(-1);

        final ItemHandler<Integer> itemHandler = (store, item, quotaManager) -> {
            quotaManager.deleteCountAdd(1);
            return AsyncUtil.DONE;
        };
        final Consumer<ThrottledRetryingIterator.QuotaManager> callback = quotaManager -> {
            callbackCount.incrementAndGet();
            totalScanned.addAndGet(quotaManager.getScannedCount());
            totalDeleted.addAndGet(quotaManager.getDeletesCount());
        };

        final FDBRecordStore.Builder storeBuilder;
        try (FDBRecordContext context = openContext()) {
            openSimpleRecordStore(context);
            storeBuilder = this.recordStore.asBuilder();
            commit(context);
        }

        try (ThrottledRetryingIterator<Integer> iterator =
                     iteratorBuilder(numRecords, itemHandler, null, callback, -1, i, -1, -1, limitRef).build()) {
            iterator.iterateAll(storeBuilder).join();
        }

        Assertions.assertThat(totalScanned.get()).isEqualTo(numRecords);
        Assertions.assertThat(totalDeleted.get()).isEqualTo(numRecords);
        Assertions.assertThat(limitRef.get()).isZero();
        if (i <= 0) {
            Assertions.assertThat(callbackCount.get()).isOne();
        } else {
            Assertions.assertThat(callbackCount.get()).isEqualTo((numRecords / i) + 1);
        }
    }

    /**
     * Runs the iterator over 50 items with a handler that throws on its very first invocation and succeeds
     * afterwards, then checks the totals reported to the callback and, for positive {@code i}, that the
     * run took longer than {@code 50 / i} seconds.
     *
     * <p>NOTE(review): the roles of the positional {@code iteratorBuilder} arguments are defined outside
     * this method; judging by the test name and the timing assertion, {@code i} is a per-second rate
     * limit. Confirm against {@code iteratorBuilder}.
     *
     * <p>Both record contexts and the iterator are managed with try-with-resources so they are released
     * even when the run or an intermediate step fails.
     *
     * @param i the parameterized limit value; the timing assertion is applied only when it is positive
     * @throws Exception if test setup or resource cleanup fails
     */
    @CsvSource({"-1", "0", "50", "100"})
    @ParameterizedTest
    void testThrottleIteratorSuccessSecondsLimit(int i) throws Exception {
        final int numRecords = 50;
        final AtomicInteger totalScanned = new AtomicInteger(0);
        final AtomicInteger handledCount = new AtomicInteger(0);
        final AtomicInteger totalDeleted = new AtomicInteger(0);
        final AtomicInteger callbackCount = new AtomicInteger(0);
        final AtomicInteger limitRef = new AtomicInteger(-1);

        final ItemHandler<Integer> itemHandler = (store, item, quotaManager) -> {
            quotaManager.deleteCountAdd(1);
            handledCount.addAndGet(1);
            if (handledCount.get() == 1) {
                // Fail once, synchronously, on the first handled item.
                throw new RuntimeException("Blah");
            }
            return AsyncUtil.DONE;
        };
        final Consumer<ThrottledRetryingIterator.QuotaManager> callback = quotaManager -> {
            callbackCount.incrementAndGet();
            totalScanned.addAndGet(quotaManager.getScannedCount());
            totalDeleted.addAndGet(quotaManager.getDeletesCount());
        };

        try (FDBRecordContext context = openContext()) {
            openSimpleRecordStore(context);
            commit(context);
        }

        final long startMillis = System.currentTimeMillis();
        try (FDBRecordContext context = openContext()) {
            openSimpleRecordStore(context);
            try (ThrottledRetryingIterator<Integer> iterator =
                         iteratorBuilder(numRecords, itemHandler, null, callback, i, -1, -1, -1, limitRef).build()) {
                iterator.iterateAll(this.recordStore.asBuilder()).join();
            }
            commit(context);
        }
        final long elapsedMillis = System.currentTimeMillis() - startMillis;

        Assertions.assertThat(totalScanned.get()).isEqualTo(numRecords);
        Assertions.assertThat(totalDeleted.get()).isEqualTo(numRecords);
        if (i > 0) {
            Assertions.assertThat(elapsedMillis).isGreaterThan(TimeUnit.SECONDS.toMillis(numRecords / i));
        }
    }

    /**
     * Runs the iterator over 55 items with a handler that takes 100 ms per item, then asserts that the
     * whole run (measured from before the store is opened) took more than 5500 ms and that the callback
     * was invoked at least 10 times.
     *
     * <p>NOTE(review): the roles of the positional {@code iteratorBuilder} arguments are defined outside
     * this method; judging by the test name, the value {@code 500} is a per-transaction time limit in
     * milliseconds and the callback fires once per transaction. Confirm against {@code iteratorBuilder}.
     *
     * @throws Exception if test setup or resource cleanup fails
     */
    @Test
    void testThrottleIteratorTransactionTimeLimit() throws Exception {
        final int numRecords = 55;
        final AtomicInteger callbackCount = new AtomicInteger(0);

        final ItemHandler<Integer> itemHandler = (store, item, quotaManager) ->
                MoreAsyncUtil.delayedFuture(100L, TimeUnit.MILLISECONDS);
        final Consumer<ThrottledRetryingIterator.QuotaManager> callback =
                quotaManager -> callbackCount.incrementAndGet();

        final long startMillis = System.currentTimeMillis();
        final FDBRecordStore.Builder storeBuilder;
        try (FDBRecordContext context = openContext()) {
            openSimpleRecordStore(context);
            storeBuilder = this.recordStore.asBuilder();
            commit(context);
        }

        try (ThrottledRetryingIterator<Integer> iterator =
                     iteratorBuilder(numRecords, itemHandler, callback, null, -1, -1, -1, 500, null).build()) {
            iterator.iterateAll(storeBuilder).join();
        }

        Assertions.assertThat(System.currentTimeMillis() - startMillis).isGreaterThan(5500L);
        Assertions.assertThat(callbackCount.get()).isGreaterThanOrEqualTo(10);
    }

    /**
     * Runs the iterator over 43 items with a handler that fails asynchronously up to five times, then
     * checks the totals reported to the callbacks.
     *
     * <p>The handler fails only while fewer than five failures have happened, and only for an item whose
     * value is greater than 2 and at least two more than the value that triggered the previous failure.
     * Every other invocation records one delete and succeeds.
     *
     * <p>Expectations asserted below: 43 items scanned and 43 deletes counted across the second callback,
     * exactly five failures, the first callback ran once per second-callback run plus once per failure,
     * and the shared limit reference ends at 3 or less.
     *
     * <p>NOTE(review): the roles of the positional {@code iteratorBuilder} arguments are defined outside
     * this method; judging by the assertions, the first callback fires at the start of every attempt and
     * the second only on success, with {@code i} being the delete limit. Confirm against
     * {@code iteratorBuilder}.
     *
     * @param i the parameterized limit value
     * @throws Exception if test setup or resource cleanup fails
     */
    @CsvSource({"-1", "0", "1", "3", "100"})
    @ParameterizedTest
    void testThrottleIteratorFailuresDeleteLimit(int i) throws Exception {
        final int numRecords = 43;
        final AtomicInteger totalScanned = new AtomicInteger(0);
        final AtomicInteger totalDeleted = new AtomicInteger(0);
        final AtomicInteger failureCount = new AtomicInteger(0);
        final AtomicInteger attemptCount = new AtomicInteger(0);
        final AtomicInteger successCount = new AtomicInteger(0);
        final AtomicInteger lastFailedItem = new AtomicInteger(0);
        final AtomicInteger limitRef = new AtomicInteger(-1);

        final ItemHandler<Integer> itemHandler = (store, item, quotaManager) -> CompletableFuture.supplyAsync(() -> {
            if (failureCount.get() < 5) {
                final int value = item.get();
                if (value > 2 && value >= lastFailedItem.get() + 2) {
                    failureCount.incrementAndGet();
                    lastFailedItem.set(value);
                    throw new RuntimeException("intentionally failed while testing item " + value);
                }
            }
            quotaManager.deleteCountAdd(1);
            return null;
        });
        final Consumer<ThrottledRetryingIterator.QuotaManager> attemptCallback =
                quotaManager -> attemptCount.incrementAndGet();
        final Consumer<ThrottledRetryingIterator.QuotaManager> successCallback = quotaManager -> {
            successCount.incrementAndGet();
            totalScanned.addAndGet(quotaManager.getScannedCount());
            totalDeleted.addAndGet(quotaManager.getDeletesCount());
        };

        final FDBRecordStore.Builder storeBuilder;
        try (FDBRecordContext context = openContext()) {
            openSimpleRecordStore(context);
            storeBuilder = this.recordStore.asBuilder();
            commit(context);
        }

        try (ThrottledRetryingIterator<Integer> iterator = iteratorBuilder(
                numRecords, itemHandler, attemptCallback, successCallback, -1, i, -1, -1, limitRef).build()) {
            iterator.iterateAll(storeBuilder).join();
        }

        Assertions.assertThat(totalScanned.get()).isEqualTo(numRecords);
        Assertions.assertThat(totalDeleted.get()).isEqualTo(numRecords);
        Assertions.assertThat(failureCount.get()).isEqualTo(5);
        Assertions.assertThat(attemptCount.get()).isEqualTo(successCount.get() + failureCount.get());
        Assertions.assertThat(limitRef.get()).isLessThanOrEqualTo(3);
    }

    /**
     * Runs the iterator over 43 items with a handler that fails asynchronously up to five times, then
     * checks the scanned total, the failure count, the callback counts and, for positive {@code i}, that
     * the run (measured from before the store is opened) took longer than {@code 43 / i} seconds.
     *
     * <p>The handler fails only while fewer than five failures have happened, and only for an item whose
     * value is greater than 2 and at least two more than the value that triggered the previous failure.
     *
     * <p>NOTE(review): the roles of the positional {@code iteratorBuilder} arguments are defined outside
     * this method; judging by the assertions, the first callback fires at the start of every attempt and
     * the second only on success, with {@code i} being a per-second rate limit. Confirm against
     * {@code iteratorBuilder}.
     *
     * @param i the parameterized limit value; the timing assertion is applied only when it is positive
     * @throws Exception if test setup or resource cleanup fails
     */
    @CsvSource({"-1", "0", "25", "50", "100"})
    @ParameterizedTest
    void testThrottleIteratorWithFailuresSecondsLimit(int i) throws Exception {
        final int numRecords = 43;
        final AtomicInteger totalScanned = new AtomicInteger(0);
        final AtomicInteger failureCount = new AtomicInteger(0);
        final AtomicInteger attemptCount = new AtomicInteger(0);
        final AtomicInteger successCount = new AtomicInteger(0);
        final AtomicInteger lastFailedItem = new AtomicInteger(0);

        final ItemHandler<Integer> itemHandler = (store, item, quotaManager) -> CompletableFuture.supplyAsync(() -> {
            if (failureCount.get() < 5) {
                final int value = item.get();
                if (value > 2 && value >= lastFailedItem.get() + 2) {
                    failureCount.incrementAndGet();
                    lastFailedItem.set(value);
                    throw new RuntimeException("intentionally failed while testing item " + value);
                }
            }
            return null;
        });
        final Consumer<ThrottledRetryingIterator.QuotaManager> attemptCallback =
                quotaManager -> attemptCount.incrementAndGet();
        final Consumer<ThrottledRetryingIterator.QuotaManager> successCallback = quotaManager -> {
            successCount.incrementAndGet();
            totalScanned.addAndGet(quotaManager.getScannedCount());
        };

        final long startMillis = System.currentTimeMillis();
        final FDBRecordStore.Builder storeBuilder;
        try (FDBRecordContext context = openContext()) {
            openSimpleRecordStore(context);
            storeBuilder = this.recordStore.asBuilder();
            commit(context);
        }

        try (ThrottledRetryingIterator<Integer> iterator = iteratorBuilder(
                numRecords, itemHandler, attemptCallback, successCallback, i, -1, -1, -1, null).build()) {
            iterator.iterateAll(storeBuilder).join();
        }
        final long elapsedMillis = System.currentTimeMillis() - startMillis;

        if (i > 0) {
            Assertions.assertThat(elapsedMillis).isGreaterThan(TimeUnit.SECONDS.toMillis(numRecords / i));
        }
        Assertions.assertThat(totalScanned.get()).isEqualTo(numRecords);
        Assertions.assertThat(failureCount.get()).isEqualTo(5);
        Assertions.assertThat(attemptCount.get()).isEqualTo(successCount.get() + failureCount.get());
    }

    /**
     * Runs the iterator with a handler that always fails and checks that the run ends with a
     * {@link RuntimeException} whose message contains {@code "intentionally failed while testing"}.
     *
     * <p>Also asserted: the first callback ran 101 times when {@code i == -1} and {@code i + 1} times
     * otherwise, and the second callback never ran.
     *
     * <p>NOTE(review): the roles of the positional {@code iteratorBuilder} arguments are defined outside
     * this method; judging by the assertions, {@code i} is the retry count (with {@code -1} selecting a
     * default), the first callback fires at the start of every attempt and the second only on success.
     * Confirm against {@code iteratorBuilder}.
     *
     * @param i the parameterized limit value
     * @throws Exception if test setup or resource cleanup fails
     */
    @CsvSource({"-1", "0", "1", "10"})
    @ParameterizedTest
    void testConstantFailures(int i) throws Exception {
        final AtomicInteger attemptCount = new AtomicInteger(0);
        final AtomicBoolean successCalled = new AtomicBoolean(false);

        final ItemHandler<Integer> itemHandler = (store, item, quotaManager) -> futureFailure();
        final Consumer<ThrottledRetryingIterator.QuotaManager> attemptCallback =
                quotaManager -> attemptCount.incrementAndGet();
        final Consumer<ThrottledRetryingIterator.QuotaManager> successCallback =
                quotaManager -> successCalled.set(true);

        final FDBRecordStore.Builder storeBuilder;
        try (FDBRecordContext context = openContext()) {
            openSimpleRecordStore(context);
            storeBuilder = this.recordStore.asBuilder();
            commit(context);
        }

        try (ThrottledRetryingIterator<Integer> iterator = iteratorBuilder(
                500, itemHandler, attemptCallback, successCallback, -1, -1, i, -1, null).build()) {
            final RuntimeException failure = Assertions.catchThrowableOfType(
                    RuntimeException.class, () -> iterator.iterateAll(storeBuilder).join());
            Assertions.assertThat(failure.getMessage()).contains("intentionally failed while testing");
        }

        if (i == -1) {
            Assertions.assertThat(attemptCount.get()).isEqualTo(101);
        } else {
            Assertions.assertThat(attemptCount.get()).isEqualTo(i + 1);
        }
        Assertions.assertThat(successCalled.get()).isFalse();
    }

    /**
     * Checks how the shared limit reference changes across successive failures.
     *
     * <p>The handler keeps a failure counter and, based on it, asserts the value currently held by the
     * limit reference before deciding whether to fail:
     * <ul>
     *   <li>before any failure: limit is 0; succeed until the scanned count reaches 100, then fail;</li>
     *   <li>after one failure: limit is 90; succeed until the scanned count reaches 50, then fail;</li>
     *   <li>after two failures: limit is 45; fail immediately;</li>
     *   <li>afterwards: the failure counter must not exceed 100; fail immediately.</li>
     * </ul>
     * The run is expected to end with a {@link RuntimeException} whose message contains
     * {@code "intentionally failed while testing"}, leaving the limit reference at 1.
     *
     * @throws Exception if test setup or resource cleanup fails
     */
    @Test
    void testLimitHandlingOnFailure() throws Exception {
        final AtomicInteger limitRef = new AtomicInteger(0);
        final AtomicInteger failCount = new AtomicInteger(0);

        final ItemHandler<Integer> itemHandler = (store, item, quotaManager) -> {
            final int limit = limitRef.get();
            final int scannedCount = quotaManager.getScannedCount();
            switch (failCount.get()) {
                case 0:
                    Assertions.assertThat(limit).isEqualTo(0);
                    if (scannedCount != 100) {
                        return AsyncUtil.DONE;
                    }
                    break;
                case 1:
                    Assertions.assertThat(limit).isEqualTo(90);
                    if (scannedCount != 50) {
                        return AsyncUtil.DONE;
                    }
                    break;
                case 2:
                    Assertions.assertThat(limit).isEqualTo(45);
                    break;
                default:
                    Assertions.assertThat(failCount.get()).isLessThanOrEqualTo(100);
                    break;
            }
            // Every path that reaches this point records a failure and fails the item.
            failCount.incrementAndGet();
            return futureFailure();
        };

        final FDBRecordStore.Builder storeBuilder;
        try (FDBRecordContext context = openContext()) {
            openSimpleRecordStore(context);
            storeBuilder = this.recordStore.asBuilder();
            commit(context);
        }

        try (ThrottledRetryingIterator<Integer> iterator =
                     iteratorBuilder(999, itemHandler, null, null, -1, -1, -1, -1, limitRef).build()) {
            final RuntimeException failure = Assertions.catchThrowableOfType(
                    RuntimeException.class, () -> iterator.iterateAll(storeBuilder).join());
            Assertions.assertThat(failure.getMessage()).contains("intentionally failed while testing");
        }

        Assertions.assertThat(limitRef.get()).isOne();
    }

    /**
     * Checks how the shared limit reference changes while items keep succeeding after one initial failure.
     *
     * <p>The handler counts its invocations. The first invocation throws. After that it asserts the value
     * held by the limit reference: 1 for invocations 2 to 41, 5 for invocations 42 to 241, and 9 for
     * invocations 242 to 601. From invocation 602 on, it marks the quota manager as exhausted.
     *
     * @throws Exception if test setup or resource cleanup fails
     */
    @Test
    void testLimitHandlingOnSuccess() throws Exception {
        final AtomicInteger limitRef = new AtomicInteger(0);
        final AtomicInteger invocationCount = new AtomicInteger(0);

        final ItemHandler<Integer> itemHandler = (store, item, quotaManager) -> {
            // Read the limit before bumping the counter, matching the order the assertions rely on.
            final int limit = limitRef.get();
            final int invocation = invocationCount.incrementAndGet();
            if (invocation == 1) {
                throw new RuntimeException("Blah");
            }
            if (invocation <= 41) {
                Assertions.assertThat(limit).isEqualTo(1);
            } else if (invocation <= 241) {
                Assertions.assertThat(limit).isEqualTo(5);
            } else if (invocation <= 601) {
                Assertions.assertThat(limit).isEqualTo(9);
            } else {
                quotaManager.markExhausted();
            }
            return AsyncUtil.DONE;
        };

        final FDBRecordStore.Builder storeBuilder;
        try (FDBRecordContext context = openContext()) {
            openSimpleRecordStore(context);
            storeBuilder = this.recordStore.asBuilder();
            commit(context);
        }

        try (ThrottledRetryingIterator<Integer> iterator =
                     iteratorBuilder(2000, itemHandler, null, null, -1, -1, -1, -1, limitRef).build()) {
            iterator.iterateAll(storeBuilder).join();
        }
    }

    /**
     * Checks that marking the quota manager as exhausted stops the iteration early.
     *
     * <p>The handler marks the quota manager exhausted when it sees the item whose value equals {@code i}.
     * The callback accumulates the scanned counts, and the total is expected to be
     * {@code min(50, i + 1)}.
     *
     * @param i the item value at which the handler marks the quota manager exhausted
     * @throws Exception if test setup or resource cleanup fails
     */
    @CsvSource({"0", "1", "20", "50"})
    @ParameterizedTest
    void testEarlyReturn(int i) throws Exception {
        final int numRecords = 50;
        final AtomicInteger totalScanned = new AtomicInteger(0);

        final ItemHandler<Integer> itemHandler = (store, item, quotaManager) -> {
            if (item.get() == i) {
                quotaManager.markExhausted();
            }
            return AsyncUtil.DONE;
        };
        final Consumer<ThrottledRetryingIterator.QuotaManager> callback =
                quotaManager -> totalScanned.addAndGet(quotaManager.getScannedCount());

        final FDBRecordStore.Builder storeBuilder;
        try (FDBRecordContext context = openContext()) {
            openSimpleRecordStore(context);
            storeBuilder = this.recordStore.asBuilder();
            commit(context);
        }

        try (ThrottledRetryingIterator<Integer> iterator =
                     iteratorBuilder(numRecords, itemHandler, null, callback, -1, -1, -1, -1, null).build()) {
            iterator.iterateAll(storeBuilder).join();
        }

        Assertions.assertThat(totalScanned.get()).isEqualTo(Math.min(numRecords, i + 1));
    }

    /**
     * Exercises the iterator against records actually saved in the store.
     *
     * <p>Fifty {@code MySimpleRecord}s with record numbers 0 to 49 are saved and committed. The iterator
     * scans record keys (resuming from the previous result's continuation when there is one), loads each
     * record and collects its record number. The collected numbers must equal 0 to 49 in order.
     *
     * <p>The scenario is run twice: once with the record context held open for the whole run, and once
     * where the context is closed after {@code iterateAll} has been called but before its future is joined.
     *
     * <p>All contexts and iterators are managed with try-with-resources so they are released even when a
     * step fails.
     *
     * @throws Exception if test setup or resource cleanup fails
     */
    @Test
    void testWithRealRecords() throws Exception {
        final int numRecords = 50;
        final List<Integer> itemsScanned = new ArrayList<>(numRecords);

        final CursorFactory<Tuple> cursorFactory = (store, lastResult, rowLimit) -> {
            // No previous result means the scan starts from the beginning.
            final byte[] continuation = lastResult == null ? null : lastResult.getContinuation().toBytes();
            return store.scanRecordKeys(continuation,
                    ScanProperties.FORWARD_SCAN.with(props -> props.setReturnedRowLimit(rowLimit)));
        };
        final ItemHandler<Tuple> itemHandler = (store, item, quotaManager) ->
                store.loadRecordAsync(item.get()).thenApply(storedRecord -> {
                    final TestRecords1Proto.MySimpleRecord.Builder builder = TestRecords1Proto.MySimpleRecord.newBuilder();
                    builder.mergeFrom(storedRecord.getRecord());
                    itemsScanned.add((int) builder.getRecNo());
                    return null;
                });

        try (FDBRecordContext context = openContext()) {
            openSimpleRecordStore(context);
            for (int recNo = 0; recNo < numRecords; recNo++) {
                this.recordStore.saveRecord(TestRecords1Proto.MySimpleRecord.newBuilder()
                        .setRecNo(recNo)
                        .setStrValueIndexed("Some text")
                        .setNumValue3Indexed(1415 + (recNo * 7))
                        .build());
            }
            commit(context);
        }

        // First run: the context stays open while the iterator runs to completion.
        try (FDBRecordContext context = openContext()) {
            openSimpleRecordStore(context);
            try (ThrottledRetryingIterator<Tuple> iterator =
                         ThrottledRetryingIterator.builder(this.fdb, cursorFactory, itemHandler).withNumOfRetries(2).build()) {
                iterator.iterateAll(this.recordStore.asBuilder()).join();
            }
        }
        Assertions.assertThat(itemsScanned).isEqualTo(IntStream.range(0, numRecords).boxed().collect(Collectors.toList()));

        // Second run: the context is closed before the iteration future is joined.
        itemsScanned.clear();
        try (ThrottledRetryingIterator<Tuple> iterator =
                     ThrottledRetryingIterator.builder(this.fdb, cursorFactory, itemHandler).withNumOfRetries(2).build()) {
            final CompletableFuture<Void> iterateAll;
            try (FDBRecordContext context = openContext()) {
                openSimpleRecordStore(context);
                iterateAll = iterator.iterateAll(this.recordStore.asBuilder());
            }
            iterateAll.join();
        }
        Assertions.assertThat(itemsScanned).isEqualTo(IntStream.range(0, numRecords).boxed().collect(Collectors.toList()));
    }

    /**
     * Verifies that iteration handles an item-handler future that is completed late.
     *
     * <p>The handler returns a not-yet-completed future for item {@code 0} and an already
     * completed future for every other item. The test waits for the first handler call, checks
     * that exactly one handler future has been recorded at that point, completes it, and expects
     * all 50 handler futures to have been recorded once {@code iterateAll} has finished and the
     * iterator has been closed.
     *
     * @throws Exception if waiting on the semaphore is interrupted or closing a resource fails
     */
    @Test
    void testLateCompleteFutures() throws Exception {
        final int numRecords = 50;
        final List<CompletableFuture<Void>> handlerFutures = new ArrayList<>(numRecords);
        final Semaphore handlerCalled = new Semaphore(0);
        final ItemHandler<Integer> itemHandler = (store, item, quotaManager) -> {
            // Only the first item gets a pending future; the test completes it by hand below.
            final CompletableFuture<Void> future = item.get() == 0
                    ? new CompletableFuture<>()
                    : CompletableFuture.completedFuture(null);
            handlerFutures.add(future);
            handlerCalled.release();
            return future;
        };
        try (ThrottledRetryingIterator<Integer> iterator =
                     iteratorBuilder(numRecords, itemHandler, null, null, -1, -1, -1, -1, null).build()) {
            final CompletableFuture<Void> iteration;
            // try-with-resources closes the context even when setup or iterateAll throws.
            try (FDBRecordContext context = openContext()) {
                openSimpleRecordStore(context);
                iteration = iterator.iterateAll(this.recordStore.asBuilder());
                commit(context);
            }
            // Block until the handler has been invoked at least once.
            handlerCalled.acquire();
            Assertions.assertThat(handlerFutures).hasSize(1);
            handlerFutures.get(0).complete(null);
            iteration.join();
        }
        // Checked after the iterator has been closed, as before.
        Assertions.assertThat(handlerFutures).hasSize(numRecords);
    }

    /**
     * Verifies what happens when the iterator is closed while an item-handler future is pending.
     *
     * <p>The handler returns a never-completed future for item {@code 0}. After the first handler
     * call the iterator is closed, and the test then expects:
     * <ul>
     *   <li>exactly one handler future to have been recorded, completed exceptionally with a
     *       {@link FDBDatabaseRunner.RunnerClosed} cause;</li>
     *   <li>the future returned by {@code iterateAll} to fail with the same cause type;</li>
     *   <li>the transaction-init notification to have fired exactly once;</li>
     *   <li>the most recently created cursor to be closed.</li>
     * </ul>
     *
     * @throws Exception if waiting on the semaphore is interrupted or closing a resource fails
     */
    @Test
    void testIteratorClosesIncompleteFutures() throws Exception {
        final int numRecords = 50;
        final AtomicInteger transactionInitCount = new AtomicInteger(0);
        final List<CompletableFuture<Void>> handlerFutures = new ArrayList<>(numRecords);
        final AtomicReference<RecordCursor<Integer>> lastCursor = new AtomicReference<>();
        final Semaphore handlerCalled = new Semaphore(0);

        final CursorFactory<Integer> cursorFactory = (store, lastResult, rowLimit) -> {
            final List<Integer> items = IntStream.range(0, numRecords).boxed().collect(Collectors.toList());
            // Always starts from the beginning: no continuation is passed.
            final RecordCursor<Integer> cursor = RecordCursor.fromList(items, (byte[]) null);
            lastCursor.set(cursor);
            return cursor;
        };
        final ItemHandler<Integer> itemHandler = (store, item, quotaManager) -> {
            // The future for the first item is left pending so that close() has something to cancel.
            final CompletableFuture<Void> future = item.get() == 0
                    ? new CompletableFuture<>()
                    : CompletableFuture.completedFuture(null);
            handlerFutures.add(future);
            handlerCalled.release();
            return future;
        };
        final Consumer<ThrottledRetryingIterator.QuotaManager> onTransactionInit =
                quotaManager -> transactionInitCount.incrementAndGet();

        final FDBRecordStore.Builder storeBuilder;
        try (FDBRecordContext context = openContext()) {
            openSimpleRecordStore(context);
            storeBuilder = this.recordStore.asBuilder();
            commit(context);
        }

        final CompletableFuture<Void> iteration;
        try (ThrottledRetryingIterator<Integer> iterator = ThrottledRetryingIterator
                .builder(this.fdb, cursorFactory, itemHandler)
                .withTransactionInitNotification(onTransactionInit)
                .build()) {
            iteration = iterator.iterateAll(storeBuilder);
            // Wait for the first handler call, then fall out of the block to close the iterator.
            handlerCalled.acquire();
        }

        // All checks run after both resources were closed exactly once.
        Assertions.assertThat(handlerFutures).hasSize(1);
        final CompletableFuture<Void> pending = handlerFutures.get(0);
        Assertions.assertThat(pending.isCompletedExceptionally()).isTrue();
        Assertions.assertThatThrownBy(() -> pending.get()).hasCauseInstanceOf(FDBDatabaseRunner.RunnerClosed.class);
        Assertions.assertThatThrownBy(iteration::join).hasCauseInstanceOf(FDBDatabaseRunner.RunnerClosed.class);
        Assertions.assertThat(transactionInitCount.get()).isOne();
        Assertions.assertThat(lastCursor.get().isClosed()).isTrue();
    }

    /**
     * Verifies what happens when the iterator is closed while the cursor's future is pending.
     *
     * <p>The cursor factory hands a never-completed future to a {@code SingleItemCursor} and
     * signals the test through a semaphore. Once a cursor has been created the iterator is
     * closed, and the test then expects:
     * <ul>
     *   <li>that future to be completed exceptionally with a
     *       {@link FDBDatabaseRunner.RunnerClosed} cause;</li>
     *   <li>the future returned by {@code iterateAll} to fail with the same cause type;</li>
     *   <li>the most recently created cursor to be closed.</li>
     * </ul>
     * The item handler returns a failed future with the message "Should not be called".
     *
     * @throws Exception if waiting on the semaphore is interrupted or closing a resource fails
     */
    @Test
    void testIteratorClosesOnNextCloses() throws Exception {
        // NOTE(review): SingleItemCursor is declared outside this method and its type parameters
        // are not visible here, so the future passed to it and the factory/handler pair stay raw.
        final CompletableFuture cursorFuture = new CompletableFuture();
        final AtomicReference<RecordCursor<?>> lastCursor = new AtomicReference<>();
        final Semaphore cursorCreated = new Semaphore(0);

        final CursorFactory cursorFactory = (store, lastResult, rowLimit) -> {
            lastCursor.set(new SingleItemCursor(store.getExecutor(), cursorFuture));
            cursorCreated.release();
            return lastCursor.get();
        };
        final ItemHandler itemHandler = (store, item, quotaManager) ->
                CompletableFuture.failedFuture(new RuntimeException("Should not be called"));

        final FDBRecordStore.Builder storeBuilder;
        try (FDBRecordContext context = openContext()) {
            openSimpleRecordStore(context);
            storeBuilder = this.recordStore.asBuilder();
            commit(context);
        }

        final CompletableFuture<Void> iteration;
        try (ThrottledRetryingIterator iterator =
                     ThrottledRetryingIterator.builder(this.fdb, cursorFactory, itemHandler).build()) {
            iteration = iterator.iterateAll(storeBuilder);
            // Wait until a cursor exists, then fall out of the block to close the iterator.
            cursorCreated.acquire();
        }

        // All checks run after both resources were closed exactly once.
        Assertions.assertThat(cursorFuture).isCompletedExceptionally();
        Assertions.assertThatThrownBy(() -> cursorFuture.get()).hasCauseInstanceOf(FDBDatabaseRunner.RunnerClosed.class);
        Assertions.assertThatThrownBy(iteration::join).hasCauseInstanceOf(FDBDatabaseRunner.RunnerClosed.class);
        Assertions.assertThat(lastCursor.get().isClosed()).isTrue();
    }

    /**
     * Creates a {@link ThrottledRetryingIterator.Builder} over the cursor factory returned by
     * {@code intCursor(i, atomicInteger)}, applying only the optional settings that were supplied.
     *
     * @param i number of integers, forwarded to {@code intCursor}
     * @param itemHandler handler given to the builder
     * @param consumer transaction-init notification, skipped when {@code null}
     * @param consumer2 transaction-success notification, skipped when {@code null}
     * @param i2 max records scanned per second, skipped when {@code -1}
     * @param i3 max record deletes per transaction, skipped when {@code -1}
     * @param i4 number of retries, skipped when {@code -1}
     * @param i5 transaction time quota in milliseconds, skipped when {@code -1}
     * @param atomicInteger forwarded to {@code intCursor}; may be {@code null}
     * @return the configured builder; {@code build()} has not been called on it
     */
    private ThrottledRetryingIterator.Builder<Integer> iteratorBuilder(int i, ItemHandler<Integer> itemHandler, Consumer<ThrottledRetryingIterator.QuotaManager> consumer, Consumer<ThrottledRetryingIterator.QuotaManager> consumer2, int i2, int i3, int i4, int i5, AtomicInteger atomicInteger) {
        // Sentinel meaning "leave this numeric setting untouched".
        final int unset = -1;
        final Consumer<ThrottledRetryingIterator.QuotaManager> onInit = consumer;
        final Consumer<ThrottledRetryingIterator.QuotaManager> onSuccess = consumer2;
        final int scannedPerSec = i2;
        final int deletesPerTransaction = i3;
        final int retries = i4;
        final int timeQuotaMillis = i5;

        final CursorFactory<Integer> cursorFactory = intCursor(i, atomicInteger);
        final ThrottledRetryingIterator.Builder<Integer> result =
                ThrottledRetryingIterator.builder(this.fdb, cursorFactory, itemHandler);

        // Notifications: success first, then init.
        if (onSuccess != null) {
            result.withTransactionSuccessNotification(onSuccess);
        }
        if (onInit != null) {
            result.withTransactionInitNotification(onInit);
        }

        // Numeric limits.
        if (scannedPerSec != unset) {
            result.withMaxRecordsScannedPerSec(scannedPerSec);
        }
        if (deletesPerTransaction != unset) {
            result.withMaxRecordsDeletesPerTransaction(deletesPerTransaction);
        }
        if (retries != unset) {
            result.withNumOfRetries(retries);
        }
        if (timeQuotaMillis != unset) {
            result.withTransactionTimeQuotaMillis(timeQuotaMillis);
        }
        return result;
    }

    /**
     * Creates a cursor factory over the integers {@code 0} (inclusive) to {@code i} (exclusive).
     *
     * @param i exclusive upper bound of the generated integers
     * @param atomicInteger forwarded to {@code listCursor}; may be {@code null}
     * @return the factory returned by {@code listCursor} for the generated list
     */
    private CursorFactory<Integer> intCursor(int i, AtomicInteger atomicInteger) {
        // Typed list instead of a raw cast, so the call to listCursor is checked by the compiler.
        final List<Integer> values = IntStream.range(0, i).boxed().collect(Collectors.toList());
        return listCursor(values, atomicInteger);
    }

    /**
     * Creates a cursor factory that serves the given list.
     *
     * <p>Each cursor is built with {@code RecordCursor.fromList}, using the continuation of the
     * previous result (or {@code null} when there is none), and is then limited with
     * {@code limitRowsTo} to the limit passed to the factory.
     *
     * @param list items the created cursors read from
     * @param atomicInteger if non-null, set to the limit passed to the factory on every call
     * @param <T> item type
     * @return a factory producing row-limited list cursors
     */
    private <T> CursorFactory<T> listCursor(List<T> list, AtomicInteger atomicInteger) {
        return (store, lastResult, rowLimit) -> {
            // Record the requested limit before the cursor is created.
            if (atomicInteger != null) {
                atomicInteger.set(rowLimit);
            }
            final byte[] continuation;
            if (lastResult == null) {
                continuation = null;
            } else {
                continuation = lastResult.getContinuation().toBytes();
            }
            return RecordCursor.fromList(list, continuation).limitRowsTo(rowLimit);
        };
    }

    /**
     * Creates a future that is already completed exceptionally.
     *
     * @return a new future failed with a {@link RuntimeException} whose message is
     *         "intentionally failed while testing"
     */
    private CompletableFuture<Void> futureFailure() {
        final CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("intentionally failed while testing"));
        return failed;
    }
}
