package org.infinispan.stress;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.infinispan.Cache;
import org.infinispan.commons.io.ByteBufferFactoryImpl;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.test.CommonsTestingUtil;
import org.infinispan.commons.time.DefaultTimeService;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.ProcessorInfo;
import org.infinispan.commons.util.Util;
import org.infinispan.commons.util.concurrent.BlockingRejectedExecutionHandler;
import org.infinispan.commons.util.concurrent.NonBlockingRejectedExecutionHandler;
import org.infinispan.configuration.cache.StoreConfiguration;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalEntryFactory;
import org.infinispan.container.impl.InternalEntryFactoryImpl;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.distribution.ch.impl.SingleSegmentKeyPartitioner;
import org.infinispan.executors.LimitedExecutorTest;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.impl.TestComponentAccessors;
import org.infinispan.factories.threads.NonBlockingThreadFactory;
import org.infinispan.marshall.TestObjectStreamMarshaller;
import org.infinispan.marshall.persistence.impl.MarshalledEntryFactoryImpl;
import org.infinispan.marshall.persistence.impl.MarshalledEntryUtil;
import org.infinispan.metadata.EmbeddedMetadata;
import org.infinispan.persistence.DummyInitializationContext;
import org.infinispan.persistence.PersistenceUtil;
import org.infinispan.persistence.async.AsyncNonBlockingStore;
import org.infinispan.persistence.dummy.DummyInMemoryStore;
import org.infinispan.persistence.dummy.DummyInMemoryStoreConfigurationBuilder;
import org.infinispan.persistence.file.SingleFileStore;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.spi.NonBlockingStore;
import org.infinispan.persistence.spi.PersistenceException;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.concurrent.BlockingManagerImpl;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.concurrent.locks.impl.LockContainer;
import org.infinispan.util.concurrent.locks.impl.PerKeyLockContainer;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.mockito.Mockito;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(testName = "stress.AsyncStoreStressTest", groups = {"stress"})
/* loaded from: input_file:org/infinispan/stress/AsyncStoreStressTest.class */
public class AsyncStoreStressTest extends AbstractInfinispanTest {
    // Multiplier for the generated key sequence length; presumably the constant behind the
    // literal 10 used in generateKeyList (inlined by the compiler) — TODO confirm.
    static final int LOOP_FACTOR = 10;
    // Start gate: worker threads block on it in waitForStart() until the run is released.
    private volatile CountDownLatch latch;
    // Key sequence the worker threads cycle through; rebuilt by generateKeyList().
    private List<String> keys = new ArrayList();
    // Factory used by the write operation to build the entries it stores.
    private final InternalEntryFactory entryFactory = new InternalEntryFactoryImpl();
    // Entries the writers/removers believe are in the store; compared with the store contents after a run.
    private final Map<Object, InternalCacheEntry> expectedState = new ConcurrentHashMap();
    // Marshaller handed to the stores and used to marshal written entries; created in startMarshaller().
    private TestObjectStreamMarshaller marshaller;
    // Executors created in startMarshaller() and shut down in stopMarshaller().
    private ExecutorService nonBlockingExecutor;
    private ExecutorService blockingExecutor;
    private TimeService timeService;
    // Temporary directory used as store location and global persistent location.
    protected String location;
    // Per-key locks serializing store updates with the matching expectedState updates.
    private LockContainer locks;
    static final Log log = LogFactory.getLog(AsyncStoreStressTest.class);
    // Value of the "size" system property, defaulting to 100000.
    static final int CAPACITY = Integer.getInteger("size", 100000).intValue();
    // Measured run length in milliseconds: the "time" system property (default 1) multiplied by 60 * 1000.
    static final long RUNNING_TIME = (Integer.getInteger("time", 1).intValue() * 60) * 1000;
    // Fixed seed, shared by key generation and the workers' starting offsets.
    static final Random RANDOM = new Random(12345);

    /**
     * Immutable totals for one operation name: how many threads reported, how many
     * operations they ran, their summed running time and their summed miss count.
     * Instances are only created through the private constructors, either for a first
     * report or by folding a further report into an existing instance.
     */
    public static class OpStats {
        public final String opName;
        public final int threadCount;
        public final long opCount;
        public final long runningTime;
        public final long missCount;

        /** First report for an operation: a single thread with the given counters. */
        private OpStats(String name, long ops, long elapsed, long misses) {
            this(name, 1, ops, elapsed, misses);
        }

        /** Folds one more thread's report into {@code previous}, leaving {@code previous} untouched. */
        private OpStats(OpStats previous, long ops, long elapsed, long misses) {
            this(previous.opName,
                    previous.threadCount + 1,
                    previous.opCount + ops,
                    previous.runningTime + elapsed,
                    previous.missCount + misses);
        }

        /** Canonical constructor assigning every field. */
        private OpStats(String name, int threads, long ops, long elapsed, long misses) {
            this.opName = name;
            this.threadCount = threads;
            this.opCount = ops;
            this.runningTime = elapsed;
            this.missCount = misses;
        }
    }

    /**
     * A named store operation executed repeatedly by a worker thread.
     *
     * @param <K> type of the key the operation is invoked with
     * @param <V> value type (not referenced by this class itself)
     */
    public static abstract class Operation<K, V> {
        /** Name under which the operation's statistics are recorded. */
        protected final String name;

        public Operation(String str) {
            name = str;
        }

        /** @return the name given at construction */
        public String getName() {
            return name;
        }

        /**
         * Runs the operation once.
         *
         * @param k key to operate on
         * @param j the caller's run counter for this invocation
         * @return {@code false} if the invocation should be counted as a miss
         */
        public abstract boolean call(K k, long j);
    }

    /**
     * Accumulates the statistics reported by worker threads, keyed by operation name.
     * Reports may arrive concurrently; each report is folded into the map atomically.
     */
    public static class TotalStats {
        private final ConcurrentHashMap<String, OpStats> statsMap = new ConcurrentHashMap<>();

        private TotalStats() {
        }

        /**
         * Records one thread's report for an operation.
         *
         * @param str operation name
         * @param j   number of operations the thread executed
         * @param j2  time the thread spent running
         * @param j3  number of operations that reported a miss
         */
        public void addStats(String str, long j, long j2, long j3) {
            // compute() is atomic per key, so concurrent reports for the same name are never lost.
            this.statsMap.compute(str, (name, previous) ->
                    previous == null ? new OpStats(name, j, j2, j3) : new OpStats(previous, j, j2, j3));
        }

        /**
         * @param str operation name
         * @return operation count * 1000 / summed running time * thread count, or {@code 0} when
         *         nothing was recorded under that name
         */
        public double getOpsPerSec(String str) {
            OpStats stats = this.statsMap.get(str);
            if (stats == null) {
                return 0.0d;
            }
            return ((stats.opCount * 1000.0d) / stats.runningTime) * stats.threadCount;
        }

        /**
         * Same formula as {@link #getOpsPerSec(String)}, applied to the sums over all operations.
         * The result is not a finite number when no running time has been recorded.
         */
        public double getTotalOpsPerSec() {
            long totalOps = 0;
            long totalRunningTime = 0;
            long totalThreads = 0;
            for (OpStats stats : this.statsMap.values()) {
                totalOps += stats.opCount;
                totalRunningTime += stats.runningTime;
                totalThreads += stats.threadCount;
            }
            return ((totalOps * 1000.0d) / totalRunningTime) * totalThreads;
        }

        /**
         * @param str operation name
         * @return {@code 1 - misses / operations} for that operation, or {@code 0} when nothing
         *         was recorded under that name
         */
        public double getHitRatio(String str) {
            OpStats stats = this.statsMap.get(str);
            if (stats == null) {
                return 0.0d;
            }
            return 1.0d - ((1.0d * stats.missCount) / stats.opCount);
        }

        /**
         * @return {@code 1 - misses / operations} over all operations; not a finite number when
         *         no operations have been recorded
         */
        public double getTotalHitRatio() {
            long totalOps = 0;
            long totalMisses = 0;
            for (OpStats stats : this.statsMap.values()) {
                totalOps += stats.opCount;
                totalMisses += stats.missCount;
            }
            return 1.0d - ((1.0d * totalMisses) / totalOps);
        }
    }

    /**
     * Thread that waits for the start latch, then invokes a single {@link Operation} over the
     * shared key list until its time budget is used up, and finally reports its counters.
     */
    public class WorkerThread extends Thread {
        private final long runningTimeout;
        private final TotalStats perf;
        private final Operation<String, Integer> op;

        public WorkerThread(String str, long j, TotalStats totalStats, Operation<String, Integer> operation) {
            super(str);
            this.runningTimeout = j;
            this.perf = totalStats;
            this.op = operation;
        }

        @Override
        public void run() {
            waitForStart();
            final long startedAt = System.currentTimeMillis();
            final long deadline = startedAt + runningTimeout;
            // Each worker starts at a random position in the key list.
            int keyIndex = RANDOM.nextInt(keys.size());
            long runs = 0;
            long misses = 0;
            // The clock is consulted only when the low 14 bits of the run counter are zero,
            // i.e. once every 16384 invocations (including before the very first one).
            while ((runs & 16383) != 0 || System.currentTimeMillis() < deadline) {
                boolean hit = op.call(keys.get(keyIndex), runs);
                if (!hit) {
                    misses++;
                }
                runs++;
                keyIndex++;
                if (keyIndex >= keys.size()) {
                    // Wrap around to the beginning of the key list.
                    keyIndex = 0;
                }
            }
            perf.addStats(op.getName(), runs, System.currentTimeMillis() - startedAt, misses);
        }
    }

    /**
     * Class-level setup: creates the marshaller, the temporary directory name, both executors
     * and the per-key lock container used by the write and remove operations.
     */
    @BeforeClass(alwaysRun = true)
    void startMarshaller() {
        marshaller = new TestObjectStreamMarshaller();
        location = CommonsTestingUtil.tmpDirectory(getClass());

        // Non-blocking pool: at most two threads per available processor, bounded queue.
        final int maxNonBlockingThreads = ProcessorInfo.availableProcessors() * 2;
        ArrayBlockingQueue<Runnable> nonBlockingQueue =
                new ArrayBlockingQueue<>(KnownComponentNames.getDefaultQueueSize("org.infinispan.executors.non-blocking"));
        NonBlockingThreadFactory nonBlockingThreads =
                new NonBlockingThreadFactory(5, "%c-%n-p%f-t%t", LimitedExecutorTest.NAME, "non-blocking");
        nonBlockingExecutor = new ThreadPoolExecutor(0, maxNonBlockingThreads, 60L, TimeUnit.SECONDS,
                nonBlockingQueue, nonBlockingThreads, NonBlockingRejectedExecutionHandler.getInstance());

        // Blocking pool: at most 150 threads, bounded queue.
        ArrayBlockingQueue<Runnable> blockingQueue =
                new ArrayBlockingQueue<>(KnownComponentNames.getDefaultQueueSize("org.infinispan.executors.blocking"));
        blockingExecutor = new ThreadPoolExecutor(0, 150, 60L, TimeUnit.SECONDS,
                blockingQueue, getTestThreadFactory("Blocking"), BlockingRejectedExecutionHandler.getInstance());

        // The lock container is wired twice: first with its own time service, then with the
        // shared time service and the non-blocking executor.
        PerKeyLockContainer lockContainer = new PerKeyLockContainer();
        TestingUtil.inject(lockContainer, new DefaultTimeService());
        locks = lockContainer;
        timeService = new DefaultTimeService();
        TestingUtil.inject(locks, timeService, nonBlockingExecutor);
    }

    /**
     * Class-level teardown: stops the marshaller, removes the temporary directory and shuts
     * both executors down, waiting up to ten seconds for each.
     * <p>
     * This runs even when setup failed part-way, so every resource is null-checked and the
     * executors are shut down even if an earlier cleanup step throws.
     *
     * @throws InterruptedException if interrupted while waiting for an executor to terminate
     */
    @AfterClass(alwaysRun = true)
    void stopMarshaller() throws InterruptedException {
        try {
            if (this.marshaller != null) {
                this.marshaller.stop();
            }
            if (this.location != null) {
                Util.recursiveFileRemove(this.location);
            }
        } finally {
            try {
                if (this.nonBlockingExecutor != null) {
                    this.nonBlockingExecutor.shutdown();
                    this.nonBlockingExecutor.awaitTermination(10L, TimeUnit.SECONDS);
                }
            } finally {
                // Still shut the second pool down if waiting on the first one failed.
                if (this.blockingExecutor != null) {
                    this.blockingExecutor.shutdown();
                    this.blockingExecutor.awaitTermination(10L, TimeUnit.SECONDS);
                }
            }
        }
    }

    /**
     * Creates and starts an async store wrapping a non-segmented {@link DummyInMemoryStore}
     * whose store name is {@code "async2"}.
     */
    private AsyncNonBlockingStore<Object, Object> createDummyAsyncStore() {
        DummyInMemoryStoreConfigurationBuilder builder = (DummyInMemoryStoreConfigurationBuilder) TestCacheManagerFactory
                .getDefaultCacheConfiguration(false)
                .persistence()
                .addStore(DummyInMemoryStoreConfigurationBuilder.class)
                .segmented(false);
        // create() is the builder's real method name; the decompiler had emitted a mangled alias here.
        return createAsyncStore(new DummyInMemoryStore(), builder.storeName("async2").create());
    }

    /**
     * Creates and starts an async store wrapping a non-segmented {@link SingleFileStore}
     * located in this test's temporary directory.
     */
    private AsyncNonBlockingStore<Object, Object> createFileAsyncStore() {
        NonBlockingStore fileStore = new SingleFileStore();
        StoreConfiguration fileStoreConfig = TestCacheManagerFactory
                .getDefaultCacheConfiguration(false)
                .persistence()
                .addSingleFileStore()
                .location(this.location)
                .segmented(false)
                .create();
        return createAsyncStore(fileStore, fileStoreConfig);
    }

    /**
     * Wraps the given store in an {@link AsyncNonBlockingStore} and starts the wrapper with a
     * hand-built initialization context.
     *
     * @param nonBlockingStore   the store that will act as the async store's delegate
     * @param storeConfiguration configuration passed to the initialization context
     * @return the started async store
     */
    private AsyncNonBlockingStore<Object, Object> createAsyncStore(NonBlockingStore nonBlockingStore, StoreConfiguration storeConfiguration) throws PersistenceException {
        AsyncNonBlockingStore<Object, Object> asyncStore = new AsyncNonBlockingStore<>(nonBlockingStore);

        // Blocking manager backed by this test's two executors.
        BlockingManagerImpl blockingManager = new BlockingManagerImpl();
        TestingUtil.inject(blockingManager,
                new TestComponentAccessors.NamedComponent("org.infinispan.executors.non-blocking", this.nonBlockingExecutor),
                new TestComponentAccessors.NamedComponent("org.infinispan.executors.blocking", this.blockingExecutor));
        TestingUtil.startComponent(blockingManager);

        // Deep-stub cache mock whose key partitioner is stubbed with the single-segment partitioner.
        Cache mockCache = (Cache) Mockito.mock(Cache.class, Mockito.RETURNS_DEEP_STUBS);
        Mockito.when((KeyPartitioner) ComponentRegistry.componentOf(mockCache, KeyPartitioner.class))
                .thenReturn(SingleSegmentKeyPartitioner.getInstance());

        DummyInitializationContext context = new DummyInitializationContext(
                storeConfiguration,
                mockCache,
                this.marshaller,
                new ByteBufferFactoryImpl(),
                new MarshalledEntryFactoryImpl(this.marshaller),
                this.nonBlockingExecutor,
                new GlobalConfigurationBuilder().globalState().persistentLocation(this.location).build(),
                blockingManager,
                null,
                new DefaultTimeService());
        CompletionStages.join(asyncStore.start(context));
        return asyncStore;
    }

    /**
     * Creates every store variant under test.
     *
     * @return started stores keyed by display name, sorted by that name
     */
    private Map<String, AsyncNonBlockingStore<Object, Object>> createAsyncStores() throws PersistenceException {
        Map<String, AsyncNonBlockingStore<Object, Object>> storesByName = new TreeMap<>();
        storesByName.put("Dummy-ASYNC", createDummyAsyncStore());
        storesByName.put("File-ASYNC", createFileAsyncStore());
        return storesByName;
    }

    /**
     * Parameter rows for {@code testReadWriteRemove}: capacity, number of keys, then the
     * number of reader, writer and remover threads.
     */
    @DataProvider(name = "readWriteRemove")
    public Object[][] independentReadWriteRemoveParams() {
        // Must be a two-dimensional array to match the declared return type.
        return new Object[][]{
                {CAPACITY, 3 * CAPACITY, 9, 20, 1},
                {CAPACITY, 3 * CAPACITY, 90, 1, 0},
        };
    }

    /**
     * Runs the read/write/remove stress scenario against every store variant, then stops the
     * stores and checks that all of them were stopped.
     *
     * @param i  capacity (only printed)
     * @param i2 number of keys
     * @param i3 number of reader threads
     * @param i4 number of writer threads
     * @param i5 number of remover threads
     */
    @Test(dataProvider = "readWriteRemove")
    public void testReadWriteRemove(int i, int i2, int i3, int i4, int i5) throws Exception {
        System.out.printf("Testing independent read/write/remove performance with capacity %d, keys %d, readers %d, writers %d, removers %d\n", i, i2, i3, i4, i5);
        generateKeyList(i2);
        Map<String, AsyncNonBlockingStore<Object, Object>> stores = createAsyncStores();
        try {
            for (Map.Entry<String, AsyncNonBlockingStore<Object, Object>> entry : stores.entrySet()) {
                mapTestReadWriteRemove(entry.getKey(), entry.getValue(), i2, i3, i4, i5);
            }
        } finally {
            // Stop every store; a store is removed from the map only once it stopped cleanly.
            Iterator<AsyncNonBlockingStore<Object, Object>> it = stores.values().iterator();
            while (it.hasNext()) {
                AsyncNonBlockingStore<Object, Object> store = it.next();
                try {
                    CompletionStages.join(store.stop());
                    it.remove();
                } catch (Exception e) {
                    log.error("Failed to stop cache store", e);
                }
            }
        }
        // Checked only after the shutdown loop: before it, the map still holds every store.
        AssertJUnit.assertTrue("Not all stores were properly shut down", stores.isEmpty());
    }

    /**
     * Runs a one-second warm-up and then the measured run against one store, waits until the
     * delegate store's key set equals the expected key set, and prints a statistics line.
     * The expected state and the delegate store are always cleared afterwards, exactly once.
     *
     * @param str                   display name of the store
     * @param asyncNonBlockingStore the async store under test
     * @param i                     number of keys (used for the standard deviation)
     * @param i2                    number of reader threads
     * @param i3                    number of writer threads
     * @param i4                    number of remover threads
     */
    private void mapTestReadWriteRemove(String str, AsyncNonBlockingStore<Object, Object> asyncNonBlockingStore, int i, int i2, int i3, int i4) throws Exception {
        NonBlockingStore delegate = asyncNonBlockingStore.delegate();
        try {
            System.out.printf("[store=%s] Warming up\n", str);
            runMapTestReadWriteRemove(str, asyncNonBlockingStore, i2, i3, i4, 1000L);
            System.out.printf("[store=%s] Testing...\n", str);
            TotalStats stats = runMapTestReadWriteRemove(str, asyncNonBlockingStore, i2, i3, i4, RUNNING_TIME);
            System.out.printf("[store=%s] Verify contents\n", str);
            // Verification reads the delegate store, so it has to be retried until the keys match.
            eventually((Supplier<String>) () -> {
                return "Store contains: " + PersistenceUtil.toKeySet(delegate, IntSets.immutableSet(0), (Predicate) null) + " but expected: " + this.expectedState.keySet();
            }, () -> {
                return PersistenceUtil.toKeySet(delegate, IntSets.immutableSet(0), (Predicate) null).equals(this.expectedState.keySet());
            });
            System.out.printf("Container %-12s  ", str);
            System.out.printf("Ops/s %10.2f  ", stats.getTotalOpsPerSec());
            System.out.printf("Gets/s %10.2f  ", stats.getOpsPerSec("GET"));
            System.out.printf("Puts/s %10.2f  ", stats.getOpsPerSec("PUT"));
            System.out.printf("Removes/s %10.2f  ", stats.getOpsPerSec("REMOVE"));
            System.out.printf("HitRatio %10.2f  ", stats.getTotalHitRatio() * 100.0d);
            System.out.printf("Size %10d  ", CompletionStages.join(asyncNonBlockingStore.size(IntSets.immutableSet(0))));
            System.out.printf("StdDev %10.2f\n", computeStdDev(asyncNonBlockingStore, i));
        } finally {
            this.expectedState.clear();
            CompletionStages.join(delegate.clear());
        }
    }

    /**
     * Starts the requested number of reader, writer and remover threads against the store,
     * releases them together and waits for all of them to finish.
     *
     * @param str              store name, used in the worker thread names
     * @param nonBlockingStore store the operations act on
     * @param i                number of reader threads
     * @param i2               number of writer threads
     * @param i3               number of remover threads
     * @param j                running time handed to each worker
     * @return the statistics reported by the workers
     */
    private TotalStats runMapTestReadWriteRemove(String str, NonBlockingStore<Object, Object> nonBlockingStore, int i, int i2, int i3, long j) throws Exception {
        this.latch = new CountDownLatch(1);
        TotalStats stats = new TotalStats();
        List<Thread> workers = new LinkedList<>();

        for (int reader = 0; reader < i; reader++) {
            workers.add(new WorkerThread("worker-" + str + "-get-" + reader, j, stats, readOperation(nonBlockingStore)));
        }
        for (int writer = 0; writer < i2; writer++) {
            workers.add(new WorkerThread("worker-" + str + "-put-" + writer, j, stats, writeOperation(nonBlockingStore)));
        }
        for (int remover = 0; remover < i3; remover++) {
            workers.add(new WorkerThread("worker-" + str + "-remove-" + remover, j, stats, removeOperation(nonBlockingStore)));
        }

        for (Thread worker : workers) {
            worker.start();
        }
        // All workers wait on the latch, so they begin their timed loops together.
        this.latch.countDown();
        for (Thread worker : workers) {
            worker.join();
        }
        return stats;
    }

    /**
     * Replaces the shared key list with {@code i * 10} freshly generated keys of the form
     * {@code "key<n>"}, where {@code n} comes from {@link #nextIntGaussian(int)}.
     *
     * @param i number of distinct keys the sequence is drawn from
     */
    private void generateKeyList(int i) {
        final int sequenceLength = i * LOOP_FACTOR;
        // The previous list is dropped before the new one is allocated.
        this.keys = null;
        this.keys = new ArrayList<>(sequenceLength);
        int generated = 0;
        while (generated < sequenceLength) {
            this.keys.add("key" + nextIntGaussian(i));
            generated++;
        }
    }

    /**
     * Draws a Gaussian sample, redrawing while it lies outside {@code [-3, 3]}, and maps it
     * linearly onto the range {@code 0..i}.
     *
     * @param i upper end of the target range
     * @return the scaled sample, truncated to an int
     */
    private int nextIntGaussian(int i) {
        double sample;
        do {
            sample = RANDOM.nextGaussian();
        } while (sample < -3.0d || sample > 3.0d);
        double scaled = ((sample + 3.0d) * i) / 6.0d;
        return (int) Math.abs(scaled);
    }

    /**
     * Blocks the calling thread until the start latch is released.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
     *                          interrupted; the thread's interrupt flag is set again first
     */
    private void waitForStart() {
        try {
            this.latch.await();
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever handles the wrapped exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the {@code "GET"} operation: loads the key from segment 0 of the store and
     * reports a hit when an entry came back.
     */
    private Operation<String, Integer> readOperation(final NonBlockingStore<Object, Object> nonBlockingStore) {
        return new Operation<String, Integer>("GET") {
            @Override
            public boolean call(String key, long run) {
                MarshallableEntry loaded = (MarshallableEntry) CompletionStages.join(nonBlockingStore.load(0, key));
                boolean hit = loaded != null;
                if (log.isTraceEnabled()) {
                    Object loadedValue;
                    if (hit) {
                        loadedValue = loaded.getValue();
                    } else {
                        loadedValue = "null";
                    }
                    log.tracef("Loaded key=%s, value=%s", key, loadedValue);
                }
                return hit;
            }
        };
    }

    /**
     * Builds the {@code "PUT"} operation: under the key's lock, writes an entry whose value is
     * the run counter (narrowed to int) to segment 0 and records it in the expected state.
     */
    private Operation<String, Integer> writeOperation(final NonBlockingStore<Object, Object> nonBlockingStore) {
        return new Operation<String, Integer>("PUT") {
            @Override
            public boolean call(String key, long run) {
                final int value = (int) run;
                final InternalCacheEntry entry = entryFactory.create(key, Integer.valueOf(value), new EmbeddedMetadata.Builder().build());
                // The store write and the expected-state update happen under the same key lock.
                Callable<Boolean> writeUnderLock = () -> {
                    CompletionStages.join(nonBlockingStore.write(0, MarshalledEntryUtil.create(entry, (Marshaller) marshaller)));
                    expectedState.put(key, entry);
                    if (log.isTraceEnabled()) {
                        log.tracef("Expected state updated with key=%s, value=%s", key, Integer.valueOf(value));
                    }
                    return true;
                };
                return withKeyLock(key, writeUnderLock);
            }
        };
    }

    /**
     * Builds the {@code "REMOVE"} operation: under the key's lock, deletes the key from
     * segment 0, asserts that the delete result is {@code null}, and drops the key from the
     * expected state.
     */
    private Operation<String, Integer> removeOperation(final NonBlockingStore<Object, Object> nonBlockingStore) {
        return new Operation<String, Integer>("REMOVE") {
            @Override
            public boolean call(String key, long run) {
                // The store delete and the expected-state update happen under the same key lock.
                Callable<Boolean> removeUnderLock = () -> {
                    Boolean deleteResult = CompletionStages.join(nonBlockingStore.delete(0, key));
                    AssertJUnit.assertNull(deleteResult);
                    expectedState.remove(key);
                    if (log.isTraceEnabled()) {
                        log.tracef("Expected state removed key=%s", key);
                    }
                    return true;
                };
                return withKeyLock(key, removeUnderLock);
            }
        };
    }

    /**
     * Runs {@code callable} while holding the per-key lock for {@code str}, waiting up to five
     * seconds for the lock.
     * <p>
     * Exceptions from acquiring the lock or from the callable are logged, not propagated, and
     * the method then returns {@code false}. The key is released exactly once in every case,
     * including when acquiring failed.
     *
     * @param str      key to lock
     * @param callable work to run under the lock
     * @return the callable's result, or {@code false} if an exception was logged
     */
    private boolean withKeyLock(String str, Callable<Boolean> callable) {
        final Thread owner = Thread.currentThread();
        boolean result = false;
        try {
            this.locks.acquire(str, owner, 5L, TimeUnit.SECONDS).lock();
            result = callable.call();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not swallow the interrupt together with the exception.
                Thread.currentThread().interrupt();
            }
            log.error("Operation under key lock failed for key " + str, e);
        } finally {
            this.locks.release(str, owner);
        }
        return result;
    }

    /**
     * Scores the store by how far its keys lie from the midpoint {@code i / 2}: the square
     * root of the mean squared distance between each key's numeric suffix and that midpoint.
     * Keys are expected to be strings with a three-character prefix followed by an integer.
     *
     * @param nonBlockingStore store whose segment-0 keys are examined
     * @param i                number of keys; {@code i / 2} (integer division) is the midpoint
     * @return the deviation, or {@code NaN} when the store holds no keys
     */
    private double computeStdDev(NonBlockingStore nonBlockingStore, int i) throws PersistenceException {
        final int midpoint = i / 2;
        double sumOfSquares = 0.0d;
        long keyCount = 0;
        for (Object key : PersistenceUtil.toKeySet(nonBlockingStore, IntSets.immutableSet(0), (Predicate) null)) {
            double keyNumber = Integer.parseInt(((String) key).substring(3));
            double distance = keyNumber - midpoint;
            sumOfSquares += distance * distance;
            keyCount++;
        }
        return Math.sqrt(sumOfSquares / keyCount);
    }

    /**
     * Stand-alone entry point that runs two stress scenarios outside TestNG.
     * <p>
     * TestNG is not driving the lifecycle here, so the class-level setup and teardown are
     * invoked explicitly; without them the marshaller, executors, lock container and
     * temporary location would all be {@code null}.
     * The annotation keeps TestNG from treating this method as a test.
     */
    @Test(enabled = false)
    public static void main(String[] strArr) throws Exception {
        AsyncStoreStressTest stressTest = new AsyncStoreStressTest();
        stressTest.startMarshaller();
        try {
            stressTest.testReadWriteRemove(100000, 300000, 90, 9, 1);
            stressTest.testReadWriteRemove(10000, 30000, 9, 1, 0);
        } finally {
            stressTest.stopMarshaller();
        }
        System.exit(0);
    }
}
