package org.infinispan.persistence.sifs;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.internal.subscriptions.AsyncSubscription;
import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.nio.file.Paths;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.infinispan.commons.test.CommonsTestingUtil;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.persistence.BaseNonBlockingStoreTest;
import org.infinispan.persistence.sifs.Compactor;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.spi.NonBlockingStore;
import org.infinispan.test.Mocks;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"unit"}, testName = "persistence.sifs.SoftIndexFileStoreTest")
/* loaded from: input_file:org/infinispan/persistence/sifs/SoftIndexFileStoreTest.class */
public class SoftIndexFileStoreTest extends BaseNonBlockingStoreTest {
    protected String tmpDirectory;

    @BeforeClass(alwaysRun = true)
    protected void setUpTempDir() {
        this.tmpDirectory = CommonsTestingUtil.tmpDirectory(getClass());
    }

    @AfterClass(alwaysRun = true)
    protected void clearTempDir() {
        Util.recursiveFileRemove(this.tmpDirectory);
    }

    @Override // org.infinispan.persistence.BaseNonBlockingStoreTest
    protected NonBlockingStore createStore() {
        return new NonBlockingSoftIndexFileStore();
    }

    @Override // org.infinispan.persistence.BaseNonBlockingStoreTest
    protected Configuration buildConfig(ConfigurationBuilder configurationBuilder) {
        configurationBuilder.clustering().hash().numSegments(2);
        return configurationBuilder.persistence().addSoftIndexFileStore().dataLocation(Paths.get(this.tmpDirectory, "data").toString()).indexLocation(Paths.get(this.tmpDirectory, "index").toString()).maxFileSize(1000).build();
    }

    public void testOverrideWithExpirableAndCompaction() {
        this.store.write(marshalledEntry(internalCacheEntry("key", "value1", -1L)));
        writeGibberish(-1L, true);
        this.store.write(marshalledEntry(internalCacheEntry("key", "value2", 1L)));
        this.timeService.advance(2L);
        writeGibberish(-1L, true);
        this.store.stop();
        startStore(this.store);
        MarshallableEntry<Object, Object> loadEntry = this.store.loadEntry("key");
        AssertJUnit.assertNull(loadEntry != null ? String.valueOf(loadEntry.getKey()) + "=" + String.valueOf(loadEntry.getValue()) : null, loadEntry);
    }

    private void writeGibberish(long j, boolean z) {
        for (int i = 0; i < 100; i++) {
            this.store.write(marshalledEntry(internalCacheEntry("foo" + i, "bar", j)));
            if (z) {
                this.store.delete("foo" + i);
            }
        }
    }

    public void testStopWithCompactorIndexNotComplete() throws InterruptedException, ExecutionException, TimeoutException {
        this.store.write(marshalledEntry(internalCacheEntry("never", "dies", -1L)));
        writeGibberish(10L, false);
        this.store.write(marshalledEntry(internalCacheEntry("foo0", "bar", -1L)));
        this.timeService.advance(10 + 1);
        Compactor compactor = (Compactor) TestingUtil.extractField(this.store.delegate(), "compactor");
        if (compactor.getFiles().isEmpty()) {
            AssertJUnit.fail("Compactor needs to have more than one file, had: " + String.valueOf(compactor.getFileStats()));
        }
        FlowableProcessor[] flowableProcessorArr = (FlowableProcessor[]) TestingUtil.extractField((Index) TestingUtil.extractField(compactor, "index"), "flowableProcessors");
        AssertJUnit.assertEquals(2, flowableProcessorArr.length);
        FlowableProcessor flowableProcessor = flowableProcessorArr[0];
        ArrayDeque arrayDeque = new ArrayDeque();
        UnicastProcessor create = UnicastProcessor.create();
        Flowable serialize = create.serialize();
        Objects.requireNonNull(arrayDeque);
        serialize.subscribe((v1) -> {
            r1.add(v1);
        });
        flowableProcessorArr[0] = create;
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Compactor.CompactionExpirationSubscriber compactionExpirationSubscriber = new Compactor.CompactionExpirationSubscriber() { // from class: org.infinispan.persistence.sifs.SoftIndexFileStoreTest.1
            public void onEntryPosition(EntryPosition entryPosition) {
            }

            public void onEntryEntryRecord(EntryRecord entryRecord) {
            }

            public void onComplete() {
                countDownLatch.countDown();
            }

            public void onError(Throwable th) {
            }
        };
        fork(() -> {
            compactor.performExpirationCompaction(compactionExpirationSubscriber);
        });
        AssertJUnit.assertFalse(countDownLatch.await(100L, TimeUnit.MILLISECONDS));
        flowableProcessorArr[0] = flowableProcessor;
        Objects.requireNonNull(flowableProcessor);
        arrayDeque.forEach((v1) -> {
            r1.onNext(v1);
        });
        AssertJUnit.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
        ((CompletionStage) fork(() -> {
            return this.store.stop();
        }).get(10L, TimeUnit.SECONDS)).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        startStore(this.store);
    }

    public void testCompactLogFileNotInTemporaryTable() throws InterruptedException, TimeoutException, ExecutionException {
        Compactor compactor = (Compactor) TestingUtil.extractField(this.store.delegate(), "compactor");
        LogAppender logAppender = (LogAppender) TestingUtil.extractField(this.store.delegate(), "logAppender");
        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever(Mocks.AFTER_RELEASE);
        TemporaryTable temporaryTable = (TemporaryTable) Mocks.blockingFieldMock(checkPoint, TemporaryTable.class, logAppender, LogAppender.class, "temporaryTable", (stubber, temporaryTable2) -> {
            ((TemporaryTable) stubber.when(temporaryTable2)).set(Mockito.anyInt(), ArgumentMatchers.any(), Mockito.anyInt(), Mockito.anyInt());
        }, new Class[0]);
        Future<Void> fork = fork(() -> {
            this.store.write(marshalledEntry(internalCacheEntry("foo", "bar", -1L)));
        });
        checkPoint.awaitStrict(Mocks.BEFORE_INVOCATION, 10L, TimeUnit.SECONDS);
        Exceptions.expectException(TimeoutException.class, () -> {
            fork.get(10L, TimeUnit.MILLISECONDS);
        });
        TestingUtil.replaceField(temporaryTable, "temporaryTable", logAppender, (Class<?>) LogAppender.class);
        final TestSubscriber create = TestSubscriber.create();
        create.onSubscribe(new AsyncSubscription());
        compactor.performExpirationCompaction(new Compactor.CompactionExpirationSubscriber() { // from class: org.infinispan.persistence.sifs.SoftIndexFileStoreTest.2
            public void onEntryPosition(EntryPosition entryPosition) {
            }

            public void onEntryEntryRecord(EntryRecord entryRecord) {
            }

            public void onComplete() {
                create.onComplete();
            }

            public void onError(Throwable th) {
                create.onError(th);
            }
        });
        create.awaitDone(10L, TimeUnit.SECONDS).assertComplete().assertNoErrors();
        checkPoint.triggerForever(Mocks.BEFORE_RELEASE);
        fork.get(10L, TimeUnit.SECONDS);
    }

    public void testWriteDuringCompaction() throws Exception {
        Compactor compactor = (Compactor) TestingUtil.extractField(this.store.delegate(), "compactor");
        CheckPoint checkPoint = new CheckPoint();
        this.store.write(marshalledEntry(internalCacheEntry("foo", "bar", 10L)));
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final TestSubscriber create = TestSubscriber.create();
        create.onSubscribe(new AsyncSubscription());
        Compactor.CompactionExpirationSubscriber compactionExpirationSubscriber = new Compactor.CompactionExpirationSubscriber() { // from class: org.infinispan.persistence.sifs.SoftIndexFileStoreTest.3
            public void onEntryPosition(EntryPosition entryPosition) {
            }

            public void onEntryEntryRecord(EntryRecord entryRecord) {
                atomicInteger.incrementAndGet();
            }

            public void onComplete() {
                create.onComplete();
            }

            public void onError(Throwable th) {
                create.onError(th);
            }
        };
        this.timeService.advance(11L);
        Future<Void> fork = fork(() -> {
            compactor.performExpirationCompaction(compactionExpirationSubscriber);
        });
        checkPoint.awaitStrict(Mocks.BEFORE_INVOCATION, 10L, TimeUnit.SECONDS);
        create.assertNotComplete();
        this.store.write(marshalledEntry(internalCacheEntry("newer", "entry", 10L)));
        checkPoint.trigger(Mocks.BEFORE_RELEASE);
        checkPoint.awaitStrict(Mocks.AFTER_INVOCATION, 10L, TimeUnit.SECONDS);
        checkPoint.triggerForever(Mocks.AFTER_RELEASE);
        Objects.requireNonNull(fork);
        eventually(fork::isDone);
        fork.get(10L, TimeUnit.SECONDS);
        create.awaitDone(10L, TimeUnit.SECONDS).assertComplete().assertNoErrors();
        Assertions.assertThat(atomicInteger.get()).isEqualTo(1);
    }

    public void testRemoveSegmentsCleansUpProperly() throws ExecutionException, InterruptedException, TimeoutException {
        ConcurrentMap<Integer, Compactor.Stats> fileStats = ((Compactor) TestingUtil.extractField(this.store.delegate(), "compactor")).getFileStats();
        AssertJUnit.assertEquals(0, fileStats.size());
        TestingUtil.join(this.store.write(0, marshalledEntry(internalCacheEntry("foo", "bar", 10L))));
        TestingUtil.join(this.store.removeSegments(IntSets.immutableSet(0)));
        AssertJUnit.assertNull(TestingUtil.join(this.store.load(0, "foo")));
        verifyStatsHaveNoData(-77L, fileStats);
        TestingUtil.join(this.store.addSegments(IntSets.immutableSet(0)));
        AssertJUnit.assertNull(TestingUtil.join(this.store.load(0, "foo")));
        verifyStatsHaveNoData(-77L, fileStats);
        this.store.stopAndWait();
        startStore(this.store);
        AssertJUnit.assertNull(TestingUtil.join(this.store.load(0, "foo")));
        ConcurrentMap fileStats2 = ((Compactor) TestingUtil.extractField(this.store.delegate(), "compactor")).getFileStats();
        AssertJUnit.assertTrue("fileStats were: " + String.valueOf(fileStats2), fileStats2.isEmpty());
        AssertJUnit.assertEquals(0L, SoftIndexFileStoreTestUtils.dataDirectorySize(this.tmpDirectory, "mock-cache"));
    }

    private void verifyStatsHaveNoData(long j, ConcurrentMap<Integer, Compactor.Stats> concurrentMap) {
        long j2 = 0;
        Iterator<Compactor.Stats> it = concurrentMap.values().iterator();
        while (it.hasNext()) {
            j2 -= r0.getFree();
            if (it.next().getTotal() > 0) {
                j2 += r0.getTotal();
            }
        }
        AssertJUnit.assertEquals(j, j2);
    }

    public void testFileStatsWriteNotOwnedSegment() throws ExecutionException, InterruptedException, TimeoutException {
        ConcurrentMap<Integer, Compactor.Stats> fileStats = ((Compactor) TestingUtil.extractField(this.store.delegate(), "compactor")).getFileStats();
        AssertJUnit.assertEquals(0, fileStats.size());
        TestingUtil.join(this.store.write(0, marshalledEntry(internalCacheEntry("foo-0", "bar-0", 10L))));
        AssertJUnit.assertTrue(fileStats.isEmpty());
        TestingUtil.join(this.store.removeSegments(IntSets.immutableSet(1)));
        TestingUtil.join(this.store.write(1, marshalledEntry(internalCacheEntry("foo-1", "bar-1", 10L))));
        verifyStatsHaveNoData(-81L, fileStats);
    }

    public void testFileStatsAfterRemovingSegment() throws ExecutionException, InterruptedException, TimeoutException {
        ConcurrentMap<Integer, Compactor.Stats> fileStats = ((Compactor) TestingUtil.extractField(this.store.delegate(), "compactor")).getFileStats();
        AssertJUnit.assertEquals(0, fileStats.size());
        TestingUtil.join(this.store.write(1, marshalledEntry(internalCacheEntry("foo-1", "bar-1", 10L))));
        TestingUtil.join(this.store.removeSegments(IntSets.immutableSet(1)));
        verifyStatsHaveNoData(-81L, fileStats);
    }

    public void testFileStatsAfterRemovingWithRemovedEntry() throws ExecutionException, InterruptedException, TimeoutException {
        ConcurrentMap<Integer, Compactor.Stats> fileStats = ((Compactor) TestingUtil.extractField(this.store.delegate(), "compactor")).getFileStats();
        AssertJUnit.assertEquals(0, fileStats.size());
        TestingUtil.join(this.store.write(1, marshalledEntry(internalCacheEntry("foo-1", "bar-1", 10L))));
        TestingUtil.join(this.store.delete(1, "foo-1"));
        verifyStatsHaveNoData(-81L, fileStats);
        TestingUtil.join(this.store.removeSegments(IntSets.immutableSet(1)));
        verifyStatsHaveNoData(-123L, fileStats);
    }
}
