package org.infinispan.stream.stress;

import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.infinispan.Cache;
import org.infinispan.commands.StressTest;
import org.infinispan.commons.executors.BlockingThreadPoolExecutorFactory;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.partitionhandling.impl.PreferAvailabilityStrategyTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.InCacheMode;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TransportFlags;
import org.infinispan.util.function.SerializablePredicate;
import org.testng.Assert;
import org.testng.annotations.Test;

@InCacheMode({CacheMode.DIST_SYNC, CacheMode.REPL_SYNC})
@Test(groups = {"stress"}, testName = "stream.stress.DistributedStreamRehashStressTest", timeOut = 900000)
/* loaded from: input_file:org/infinispan/stream/stress/DistributedStreamRehashStressTest.class */
public class DistributedStreamRehashStressTest extends StressTest {
    protected final String CACHE_NAME = "testCache";
    protected static final int CACHE_COUNT = 5;
    protected static final int THREAD_MULTIPLIER = 5;
    protected static final long CACHE_ENTRY_COUNT = 250000;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/infinispan/stream/stress/DistributedStreamRehashStressTest$PerformOperation.class */
    public interface PerformOperation {
        void perform(Cache<Integer, Integer> cache, Map<Integer, Integer> map, int i);
    }

    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        this.builderUsed = new ConfigurationBuilder();
        this.builderUsed.clustering().cacheMode(this.cacheMode);
        this.builderUsed.clustering().hash().numOwners(3);
        this.builderUsed.clustering().stateTransfer().chunkSize(25000);
        this.builderUsed.clustering().remoteTimeout(12000000L);
        this.builderUsed.clustering().stateTransfer().timeout(240L, TimeUnit.SECONDS);
        createClusteredCaches(5, "testCache", this.builderUsed);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.infinispan.test.MultipleCacheManagersTest
    public EmbeddedCacheManager addClusterEnabledCacheManager(TransportFlags transportFlags) {
        GlobalConfigurationBuilder defaultClusteredBuilder = GlobalConfigurationBuilder.defaultClusteredBuilder();
        TestCacheManagerFactory.amendGlobalConfiguration(defaultClusteredBuilder, transportFlags);
        BlockingThreadPoolExecutorFactory blockingThreadPoolExecutorFactory = new BlockingThreadPoolExecutorFactory(25, 25, 10000, 30000L);
        defaultClusteredBuilder.transport().transportThreadPool().threadPoolFactory(blockingThreadPoolExecutorFactory);
        defaultClusteredBuilder.transport().remoteCommandThreadPool().threadPoolFactory(blockingThreadPoolExecutorFactory);
        EmbeddedCacheManager newDefaultCacheManager = TestCacheManagerFactory.newDefaultCacheManager(true, defaultClusteredBuilder, new ConfigurationBuilder());
        this.cacheManagers.add(newDefaultCacheManager);
        return newDefaultCacheManager;
    }

    public void testStressNodesLeavingWhileMultipleCollectors() throws Throwable {
        testStressNodesLeavingWhilePerformingCallable((cache, map, i) -> {
            SerializablePredicate serializablePredicate = entry -> {
                return (((Integer) entry.getKey()).intValue() & 1) == 1;
            };
            findMismatchedSegments((KeyPartitioner) TestingUtil.extractComponent(cache, KeyPartitioner.class), (Map) map.entrySet().stream().filter(serializablePredicate).collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            })), (Map) cache.entrySet().stream().filter(serializablePredicate).collect(() -> {
                return Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, (v0) -> {
                    return v0.getValue();
                });
            }), i);
            Assert.assertEquals(125000L, r0.size());
        });
    }

    public void testStressNodesLeavingWhileMultipleCount() throws Throwable {
        testStressNodesLeavingWhilePerformingCallable((cache, map, i) -> {
            long count = cache.entrySet().stream().count();
            Assert.assertEquals(CACHE_ENTRY_COUNT, count, "We didn't get a matching size! Expected 250000 but was " + count);
        });
    }

    public void testStressNodesLeavingWhileMultipleIterators() throws Throwable {
        testStressNodesLeavingWhilePerformingCallable((cache, map, i) -> {
            HashMap hashMap = new HashMap();
            for (Map.Entry entry : cache.entrySet().stream().distributedBatchSize(50000)) {
                if (hashMap.containsKey(entry.getKey())) {
                    log.tracef("Seen values were: %s", hashMap);
                    throw new IllegalArgumentException(String.valueOf(Thread.currentThread()) + "-Found duplicate value: " + String.valueOf(entry.getKey()) + " on iteration " + i);
                }
                if (!((Integer) map.get(entry.getKey())).equals(entry.getValue())) {
                    log.tracef("Seen values were: %s", hashMap);
                    throw new IllegalArgumentException(String.valueOf(Thread.currentThread()) + "-Found incorrect value: " + String.valueOf(entry.getKey()) + " with value " + String.valueOf(entry.getValue()) + " on iteration " + i);
                }
                hashMap.put((Integer) entry.getKey(), (Integer) entry.getValue());
            }
            if (hashMap.size() != map.size()) {
                findMismatchedSegments((KeyPartitioner) TestingUtil.extractComponent(cache, KeyPartitioner.class), map, hashMap, i);
            }
        });
    }

    public void testStressNodesLeavingWhileMultipleIteratorsLocalSegments() throws Throwable {
        testStressNodesLeavingWhilePerformingCallable((cache, map, i) -> {
            HashMap hashMap = new HashMap();
            KeyPartitioner keyPartitioner = (KeyPartitioner) TestingUtil.extractComponent(cache, KeyPartitioner.class);
            LocalizedCacheTopology cacheTopology = cache.getAdvancedCache().getDistributionManager().getCacheTopology();
            Set segmentsForOwner = cacheTopology.getWriteConsistentHash().getSegmentsForOwner(cacheTopology.getLocalAddress());
            Map<Integer, Integer> map = (Map) map.entrySet().stream().filter(entry -> {
                return segmentsForOwner.contains(Integer.valueOf(keyPartitioner.getSegment(entry.getKey())));
            }).collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            }));
            for (Map.Entry entry2 : cache.entrySet().stream().distributedBatchSize(50000).filterKeySegments(segmentsForOwner)) {
                if (hashMap.containsKey(entry2.getKey())) {
                    log.tracef("Seen values were: %s", hashMap);
                    throw new IllegalArgumentException(String.valueOf(Thread.currentThread()) + "-Found duplicate value: " + String.valueOf(entry2.getKey()) + " on iteration " + i);
                }
                if (!map.get(entry2.getKey()).equals(entry2.getValue())) {
                    log.tracef("Seen values were: %s", hashMap);
                    throw new IllegalArgumentException(String.valueOf(Thread.currentThread()) + "-Found incorrect value: " + String.valueOf(entry2.getKey()) + " with value " + String.valueOf(entry2.getValue()) + " on iteration " + i);
                }
                hashMap.put((Integer) entry2.getKey(), (Integer) entry2.getValue());
            }
            if (hashMap.size() != map.size()) {
                findMismatchedSegments(keyPartitioner, map, hashMap, i);
            }
        });
    }

    private void findMismatchedSegments(KeyPartitioner keyPartitioner, Map<Integer, Integer> map, Map<Integer, Integer> map2, int i) {
        Map generateEntriesPerSegment = generateEntriesPerSegment(keyPartitioner, map.entrySet());
        Map generateEntriesPerSegment2 = generateEntriesPerSegment(keyPartitioner, map2.entrySet());
        for (Map.Entry entry : generateEntriesPerSegment.entrySet()) {
            Set set = (Set) entry.getValue();
            Set set2 = (Set) generateEntriesPerSegment2.get(entry.getKey());
            if (set2 != null) {
                set.removeAll(set2);
            }
            if (!set.isEmpty()) {
                throw new IllegalArgumentException(String.valueOf(Thread.currentThread()) + "-Found incorrect amount " + (set2 != null ? set2.size() : 0) + " of entries, expected " + set.size() + " for segment " + String.valueOf(entry.getKey()) + " missing entries " + String.valueOf(set) + " on iteration " + i);
            }
        }
    }

    void testStressNodesLeavingWhilePerformingCallable(PerformOperation performOperation) throws Throwable {
        HashMap hashMap = new HashMap();
        for (int i = 0; i < CACHE_ENTRY_COUNT; i++) {
            hashMap.put(Integer.valueOf(i), Integer.valueOf(i));
        }
        cache(0, "testCache").putAll(hashMap);
        System.out.println("Done with inserts!");
        List<Future<Void>> forkWorkerThreads = forkWorkerThreads("testCache", 5, 5, new Object[25], (cache, obj, i2) -> {
            performOperation.perform(cache, hashMap, i2);
        });
        forkWorkerThreads.add(forkRestartingThread(5));
        waitAndFinish(forkWorkerThreads, 1, TimeUnit.MINUTES);
    }

    private <K, V> Map<Integer, Set<Map.Entry<K, V>>> generateEntriesPerSegment(KeyPartitioner keyPartitioner, Iterable<Map.Entry<K, V>> iterable) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<K, V> entry : iterable) {
            hashMap.computeIfAbsent(Integer.valueOf(keyPartitioner.getSegment(entry.getKey())), num -> {
                return new HashSet();
            }).add(new ImmortalCacheEntry(entry.getKey(), entry.getValue()));
        }
        return hashMap;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -398324816:
                if (implMethodName.equals("lambda$testStressNodesLeavingWhileMultipleCollectors$97f080aa$1")) {
                    z = false;
                    break;
                }
                break;
            case 1201132668:
                if (implMethodName.equals("lambda$testStressNodesLeavingWhileMultipleCollectors$abcc3b40$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/infinispan/util/function/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/infinispan/stream/stress/DistributedStreamRehashStressTest") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/stream/Collector;")) {
                    return () -> {
                        return Collectors.toMap((v0) -> {
                            return v0.getKey();
                        }, (v0) -> {
                            return v0.getValue();
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/infinispan/util/function/SerializablePredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals(PreferAvailabilityStrategyTest.CACHE_NAME) && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/infinispan/stream/stress/DistributedStreamRehashStressTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map$Entry;)Z")) {
                    return entry -> {
                        return (((Integer) entry.getKey()).intValue() & 1) == 1;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
