package org.infinispan.conflict.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.statetransfer.ConflictResolutionStartCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.conflict.ConflictManager;
import org.infinispan.conflict.ConflictManagerFactory;
import org.infinispan.conflict.EntryMergePolicy;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.container.entries.NullCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.MagicKey;
import org.infinispan.executors.LimitedExecutorTest;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.DataRehashed;
import org.infinispan.notifications.cachelistener.event.DataRehashedEvent;
import org.infinispan.partitionhandling.BasePartitionHandlingTest;
import org.infinispan.partitionhandling.PartitionHandling;
import org.infinispan.remoting.inboundhandler.AbstractDelegatingHandler;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.TestException;
import org.infinispan.test.TestingUtil;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "conflict.resolution.ConflictManagerTest")
/* loaded from: input_file:org/infinispan/conflict/impl/ConflictManagerTest.class */
public class ConflictManagerTest extends BasePartitionHandlingTest {
    private static final String CACHE_NAME = "conflict-cache";
    private static final int NUMBER_OF_OWNERS = 2;
    private static final int NUMBER_OF_CACHE_ENTRIES = 100;
    private static final int INCONSISTENT_VALUE_INCREMENT = 10;
    private static final int NULL_VALUE_FREQUENCY = 20;

    /**
     * Inbound handler that pauses on the supplied latch before delegating any
     * {@link StateResponseCommand} that carries a chunk flagged as the last chunk.
     * Every other command is delegated immediately.
     */
    public class DelayStateResponseCommandHandler extends AbstractDelegatingHandler {
        final CountDownLatch latch;

        /**
         * @param countDownLatch latch that, once counted down, ends the pause early
         * @param perCacheInboundInvocationHandler handler that receives every command after the optional pause
         */
        DelayStateResponseCommandHandler(CountDownLatch countDownLatch, PerCacheInboundInvocationHandler perCacheInboundInvocationHandler) {
            super(perCacheInboundInvocationHandler);
            this.latch = countDownLatch;
        }

        /**
         * Waits on the latch when the command is a state response containing a last chunk,
         * then forwards the command to the delegate in all cases.
         */
        public void handle(CacheRpcCommand cacheRpcCommand, Reply reply, DeliverOrder deliverOrder) {
            boolean carriesLastChunk = (cacheRpcCommand instanceof StateResponseCommand)
                    && ((StateResponseCommand) cacheRpcCommand).getStateChunks().stream().anyMatch(chunk -> chunk.isLastChunk());
            if (carriesLastChunk) {
                try {
                    // NOTE(review): the pause is capped at 60 milliseconds - confirm the unit is intended.
                    this.latch.await(60L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Keep the interruption visible to the calling thread instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
            }
            this.delegate.handle(cacheRpcCommand, reply, deliverOrder);
        }
    }

    /**
     * Inbound handler that discards every {@link ClusteredGetCommand} without delegating it
     * and passes all other commands through to the wrapped handler.
     */
    private class DropClusteredGetCommandHandler extends AbstractDelegatingHandler {
        DropClusteredGetCommandHandler(PerCacheInboundInvocationHandler perCacheInboundInvocationHandler) {
            super(perCacheInboundInvocationHandler);
        }

        /** Delegates the command unless it is a clustered get, which is dropped. */
        public void handle(CacheRpcCommand cacheRpcCommand, Reply reply, DeliverOrder deliverOrder) {
            boolean isClusteredGet = cacheRpcCommand instanceof ClusteredGetCommand;
            if (!isClusteredGet) {
                this.delegate.handle(cacheRpcCommand, reply, deliverOrder);
            }
            // A clustered get falls through: nothing is delegated and the reply is never used.
        }
    }

    /**
     * Cache listener exposing a single-count latch that is released on the first
     * {@link DataRehashedEvent} reported as a pre-event.
     */
    @Listener
    private class RehashListener {
        final CountDownLatch latch = new CountDownLatch(1);

        private RehashListener() {
        }

        /** Counts the latch down for pre-events; post-events are ignored. */
        @DataRehashed
        public void onDataRehashed(DataRehashedEvent dataRehashedEvent) {
            if (!dataRehashedEvent.isPre()) {
                return;
            }
            this.latch.countDown();
        }
    }

    /**
     * Inbound handler that reacts to a {@link ConflictResolutionStartCommand} by cancelling the
     * transfer tasks registered for the command's segments before delegating the command.
     * All other commands are delegated unchanged.
     */
    public class StateTransferCancellation extends AbstractDelegatingHandler {
        private final CountDownLatch latch;

        /**
         * @param countDownLatch latch counted down after the cancellation and delegation have run
         * @param perCacheInboundInvocationHandler handler that ultimately receives every command
         */
        protected StateTransferCancellation(CountDownLatch countDownLatch, PerCacheInboundInvocationHandler perCacheInboundInvocationHandler) {
            super(perCacheInboundInvocationHandler);
            this.latch = countDownLatch;
        }

        public void handle(CacheRpcCommand cacheRpcCommand, Reply reply, DeliverOrder deliverOrder) {
            if (cacheRpcCommand instanceof ConflictResolutionStartCommand) {
                // The state receiver is always looked up on node 0 for the command's cache.
                String targetCacheName = cacheRpcCommand.getCacheName().toString();
                StateReceiverImpl receiver = (StateReceiverImpl) TestingUtil.extractComponent(ConflictManagerTest.this.cache(0, targetCacheName), StateReceiver.class);
                HashMap pendingTasks = new HashMap();
                ((ConflictResolutionStartCommand) cacheRpcCommand).getSegments().forEach(segment -> {
                    pendingTasks.putAll(receiver.getTransferTaskMap(segment));
                });
                // Cancel first, then delegate, then signal - all on the receiver's executor.
                receiver.nonBlockingExecutor.execute(() -> {
                    pendingTasks.forEach((address, inboundTransferTask) -> {
                        inboundTransferTask.cancel();
                    });
                    this.delegate.handle(cacheRpcCommand, reply, deliverOrder);
                    this.latch.countDown();
                });
                return;
            }
            this.delegate.handle(cacheRpcCommand, reply, deliverOrder);
        }
    }

    /**
     * Sets the inherited test settings: synchronous distributed cache mode and the
     * {@code ALLOW_READ_WRITES} partition handling strategy.
     */
    public ConflictManagerTest() {
        this.partitionHandling = PartitionHandling.ALLOW_READ_WRITES;
        this.cacheMode = CacheMode.DIST_SYNC;
    }

    /**
     * Creates the managers via the superclass and then defines {@code CACHE_NAME} on all of them
     * as a DIST_SYNC cache with this test's partition handling, a {@code null} merge policy and
     * in-memory state fetching enabled.
     */
    @Override // org.infinispan.partitionhandling.BasePartitionHandlingTest, org.infinispan.test.MultipleCacheManagersTest
    public void createCacheManagers() throws Throwable {
        super.createCacheManagers();
        ConfigurationBuilder conflictCacheConfig = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC);
        conflictCacheConfig.clustering()
                .partitionHandling()
                .whenSplit(this.partitionHandling)
                .mergePolicy((EntryMergePolicy) null)
                .stateTransfer()
                .fetchInMemoryState(true);
        defineConfigurationOnAllManagers(CACHE_NAME, conflictCacheConfig);
    }

    /**
     * Requests all versions of a key while a partition merge is in progress and checks that the
     * request is still pending before the delay latch is released, then that it yields a
     * two-entry version map.
     */
    public void testGetAllVersionsDuringStateTransfer() throws Exception {
        createCluster();
        getCache(2).put(1, 1);
        splitCluster();

        RehashListener listener = new RehashListener();
        getCache(0).addListener(listener);
        CountDownLatch releaseStateTransfer = new CountDownLatch(1);
        delayStateTransferCompletion(releaseStateTransfer);

        Future<Void> mergeFuture = fork(() -> {
            partition(0).merge(partition(1));
        });
        AssertJUnit.assertTrue(listener.latch.await(10L, TimeUnit.SECONDS));

        Future<Map<Address, InternalCacheValue<Object>>> versionsFuture = fork(() -> {
            return getAllVersions(0, 1);
        });
        // The lookup must still be pending when the delay latch is released.
        TestingUtil.assertNotDone((Future<?>) versionsFuture);
        releaseStateTransfer.countDown();
        mergeFuture.get(30L, TimeUnit.SECONDS);

        Map<Address, InternalCacheValue<Object>> versions = versionsFuture.get(60L, TimeUnit.SECONDS);
        AssertJUnit.assertNotNull(versions);
        AssertJUnit.assertFalse(versions.isEmpty());
        AssertJUnit.assertEquals(String.format("Returned versionMap %s", versions), 2, versions.size());
    }

    /**
     * Drops all clustered get commands and expects {@code getAllVersions} to fail with a
     * {@link CacheException} whose message matches the pattern below.
     */
    public void testGetAllVersionsTimeout() throws Throwable {
        final String timeoutCacheName = CACHE_NAME + "2";
        ConfigurationBuilder timeoutConfig = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC);
        timeoutConfig.clustering().remoteTimeout(5000L).stateTransfer().fetchInMemoryState(true);
        defineConfigurationOnAllManagers(timeoutCacheName, timeoutConfig);
        waitForClusterToForm(timeoutCacheName);
        // NOTE(review): dropClusteredGetCommands() and getAllVersions() both go through getCache(),
        // which resolves CACHE_NAME rather than the cache defined above - confirm this is intended.
        dropClusteredGetCommands();
        Exceptions.expectException(CacheException.class, ".* encountered when attempting '.*.' on cache '.*.'", () -> {
            getAllVersions(0, LimitedExecutorTest.NAME);
        });
    }

    /**
     * Starts a partition merge and, once a pre-rehash event has been observed, expects
     * {@code getConflicts} to be rejected with an {@link IllegalStateException}.
     *
     * <p>The wait for the rehash event is bounded so that a missing event fails the test
     * instead of blocking indefinitely, and the delay latch is always released on exit.
     */
    public void testGetConflictsDuringStateTransfer() throws Throwable {
        createCluster();
        splitCluster();
        RehashListener rehashListener = new RehashListener();
        getCache(0).addListener(rehashListener);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        delayStateTransferCompletion(countDownLatch);
        try {
            fork(() -> {
                partition(0).merge(partition(1), false);
            });
            // Same 10 second bound as testGetAllVersionsDuringStateTransfer.
            AssertJUnit.assertTrue("Rehash did not start within 10 seconds", rehashListener.latch.await(10L, TimeUnit.SECONDS));
            Exceptions.expectException(IllegalStateException.class, ".* Unable to retrieve conflicts as StateTransfer is currently in progress for cache .*", () -> {
                getConflicts(0);
            });
        } finally {
            // Release the delayed state responses even when an assertion above fails.
            countDownLatch.countDown();
        }
    }

    /**
     * Installs the cancelling handler, counts conflicts on a forked thread and expects zero
     * conflicts once the handler has signalled that a cancellation took place.
     */
    public void testGetConflictAfterCancellation() throws Exception {
        waitForClusterToForm(CACHE_NAME);
        CountDownLatch cancellationLatch = new CountDownLatch(1);
        cancelStateTransfer(cancellationLatch);
        Future<Long> conflictCount = fork(() -> {
            return Long.valueOf(getConflicts(0).count());
        });
        boolean cancelled = cancellationLatch.await(10L, TimeUnit.SECONDS);
        if (!cancelled) {
            throw new TestException("No state transfer cancelled");
        }
        AssertJUnit.assertEquals(0L, conflictCount.get(10L, TimeUnit.SECONDS).longValue());
    }

    /**
     * Populates the cache and compares the per-node version maps of selected keys, first
     * expecting equal values and then, after conflicts are introduced, differing ones.
     */
    public void testAllVersionsOfKeyReturned() {
        waitForClusterToForm(CACHE_NAME);
        for (int key = 0; key < NUMBER_OF_CACHE_ENTRIES; key++) {
            getCache(0).put(Integer.valueOf(key), "v" + key);
        }
        compareCacheValuesForKey(INCONSISTENT_VALUE_INCREMENT, true);
        introduceCacheConflicts();
        compareCacheValuesForKey(INCONSISTENT_VALUE_INCREMENT, false);
        compareCacheValuesForKey(NULL_VALUE_FREQUENCY, false);
    }

    /**
     * Calls {@code getAllVersions} twice for the same key and expects two distinct map
     * instances with equal content.
     */
    public void testConsecutiveInvocationOfAllVersionsForKey() throws Exception {
        waitForClusterToForm(CACHE_NAME);
        Map<Address, InternalCacheValue<Object>> firstCall = getAllVersions(0, 1);
        Map<Address, InternalCacheValue<Object>> secondCall = getAllVersions(0, 1);
        AssertJUnit.assertNotSame(firstCall, secondCall);
        AssertJUnit.assertEquals(firstCall, secondCall);
    }

    /**
     * Verifies that no conflicts are reported on a freshly populated cache and that, after
     * {@code introduceCacheConflicts()}, one conflict per modified key is reported.
     *
     * <p>For each conflict the key is read from a non-null entry. Keys divisible by
     * {@code NULL_VALUE_FREQUENCY} must have a {@link NullCacheEntry} among their versions;
     * all other keys must have two distinct values, one of them {@code "INCONSISTENT"}.
     */
    public void testConflictsDetected() {
        waitForClusterToForm(CACHE_NAME);
        for (int key = 0; key < NUMBER_OF_CACHE_ENTRIES; key++) {
            getCache(0).put(Integer.valueOf(key), "v" + key);
        }
        int lastNodeIndex = this.numMembersInCluster - 1;
        AssertJUnit.assertEquals(0L, getConflicts(lastNodeIndex).count());

        introduceCacheConflicts();
        List<Map<Address, CacheEntry<Object, Object>>> conflicts = getConflicts(lastNodeIndex).collect(Collectors.toList());
        // introduceCacheConflicts() touches every INCONSISTENT_VALUE_INCREMENT-th key.
        AssertJUnit.assertEquals(NUMBER_OF_CACHE_ENTRIES / INCONSISTENT_VALUE_INCREMENT, conflicts.size());

        for (Map<Address, CacheEntry<Object, Object>> versions : conflicts) {
            AssertJUnit.assertEquals(NUMBER_OF_OWNERS, versions.keySet().size());
            int key = versions.values().stream()
                    .filter(entry -> !(entry instanceof NullCacheEntry))
                    .mapToInt(entry -> (Integer) entry.getKey())
                    .findAny()
                    .orElse(-1);
            AssertJUnit.assertTrue(key > -1);
            if (key % NULL_VALUE_FREQUENCY == 0) {
                AssertJUnit.assertTrue(versions.values().stream().anyMatch(NullCacheEntry.class::isInstance));
            } else {
                List<Object> distinctValues = versions.values().stream()
                        .map(CacheEntry::getValue)
                        .distinct()
                        .collect(Collectors.toList());
                AssertJUnit.assertEquals(NUMBER_OF_OWNERS, distinctValues.size());
                AssertJUnit.assertTrue("Expected one of the conflicting string values to be 'INCONSISTENT'", distinctValues.contains("INCONSISTENT"));
            }
        }
    }

    /**
     * Creates a single conflict with a local-only write and resolves it with a merge policy
     * that returns its first argument, after which no conflicts may remain.
     */
    public void testConflictsResolvedWithProvidedMergePolicy() {
        createCluster();
        AdvancedCache<Object, Object> node0 = getCache(0);
        ConflictManager<Object, Object> conflictManager = ConflictManagerFactory.get(node0);
        MagicKey key = new MagicKey((Cache<?, ?>) mo376cache(0), (Cache<?, ?>[]) new Cache[]{mo376cache(1)});
        node0.put(key, 1);
        // The second write uses CACHE_MODE_LOCAL, so it is applied with that flag on node 0 only.
        node0.withFlags(Flag.CACHE_MODE_LOCAL).put(key, 2);
        AssertJUnit.assertEquals(1L, getConflicts(0).count());
        conflictManager.resolveConflicts((preferredEntry, otherEntries) -> preferredEntry);
        AssertJUnit.assertEquals(0L, getConflicts(0).count());
    }

    /**
     * Removes each conflicting key from the cache while iterating the conflict stream and
     * expects the cache to be empty afterwards.
     */
    public void testCacheOperationOnConflictStream() {
        createCluster();
        AdvancedCache<Object, Object> node0 = getCache(0);
        ConflictManager<Object, Object> conflictManager = ConflictManagerFactory.get(node0);
        MagicKey key = new MagicKey((Cache<?, ?>) mo376cache(0), (Cache<?, ?>[]) new Cache[]{mo376cache(1)});
        node0.put(key, 1);
        node0.withFlags(Flag.CACHE_MODE_LOCAL).put(key, 2);
        conflictManager.getConflicts().forEach(versions -> {
            // Any version of the conflict carries the key to remove.
            CacheEntry<Object, Object> anyVersion = versions.values().iterator().next();
            node0.remove(anyVersion.getKey());
        });
        AssertJUnit.assertTrue(node0.isEmpty());
    }

    /**
     * Expects the no-argument {@code resolveConflicts()} to fail with a {@link CacheException}.
     */
    public void testNoEntryMergePolicyConfigured() {
        Exceptions.expectException(CacheException.class, () -> {
            AdvancedCache<Object, Object> node0 = getCache(0);
            ConflictManagerFactory.get(node0).resolveConflicts();
        });
    }

    /**
     * Modifies every {@code INCONSISTENT_VALUE_INCREMENT}-th key on its primary owner using the
     * {@code CACHE_MODE_LOCAL} flag: keys divisible by {@code NULL_VALUE_FREQUENCY} are removed,
     * the others are overwritten with {@code "INCONSISTENT"}.
     */
    private void introduceCacheConflicts() {
        LocalizedCacheTopology topology = getCache(0).getDistributionManager().getCacheTopology();
        for (int key = 0; key < NUMBER_OF_CACHE_ENTRIES; key += INCONSISTENT_VALUE_INCREMENT) {
            Address primaryOwner = topology.getDistribution(Integer.valueOf(key)).primary();
            AdvancedCache<Object, Object> localOnlyCache = manager(primaryOwner).getCache(CACHE_NAME).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL);
            boolean removeEntry = key % NULL_VALUE_FREQUENCY == 0;
            if (removeEntry) {
                localOnlyCache.remove(Integer.valueOf(key));
                continue;
            }
            localOnlyCache.put(Integer.valueOf(key), "INCONSISTENT");
        }
    }

    /**
     * Collects the version map of {@code key} from every node and checks each map.
     *
     * <p>Every map must have {@code NUMBER_OF_OWNERS} addresses. Keys divisible by
     * {@code NULL_VALUE_FREQUENCY} are expected to have one fewer non-null value; all other
     * keys must have no null values at all.
     * NOTE(review): the null allowance depends only on the key, not on {@code expectEquality}.
     *
     * @param key the cache key to look up on every node
     * @param expectEquality {@code true} if all non-null values must be equal; {@code false} if
     *        the map must contain more than one distinct value
     */
    private void compareCacheValuesForKey(int key, boolean expectEquality) {
        List<Map<Address, InternalCacheValue<Object>>> versionsPerNode = new ArrayList<>();
        for (int nodeIndex = 0; nodeIndex < this.numMembersInCluster; nodeIndex++) {
            versionsPerNode.add(getAllVersions(nodeIndex, Integer.valueOf(key)));
        }
        boolean allowNullValues = key % NULL_VALUE_FREQUENCY == 0;
        int expectedValueCount = allowNullValues ? NUMBER_OF_OWNERS - 1 : NUMBER_OF_OWNERS;
        for (Map<Address, InternalCacheValue<Object>> versions : versionsPerNode) {
            AssertJUnit.assertEquals(versions.toString(), NUMBER_OF_OWNERS, versions.keySet().size());
            if (!allowNullValues) {
                AssertJUnit.assertTrue("Version map contains null entries.", !versions.values().contains(null));
            }
            List<Object> values = versions.values().stream()
                    .filter(Objects::nonNull)
                    .map(InternalCacheValue::getValue)
                    .collect(Collectors.toList());
            AssertJUnit.assertEquals(values.toString(), expectedValueCount, values.size());
            if (expectEquality) {
                AssertJUnit.assertTrue("Inconsistent values returned, they should be the same", values.stream().allMatch(value -> {
                    return value.equals(values.get(0));
                }));
            } else {
                AssertJUnit.assertTrue("Expected inconsistent values, but all values were equal", versions.values().stream().distinct().count() > 1);
            }
        }
    }

    /**
     * Waits for the cluster to form on {@code CACHE_NAME} and for rebalancing to finish, then
     * asserts that the member list read from node 0 has exactly four entries.
     */
    private void createCluster() {
        waitForClusterToForm(CACHE_NAME);
        // The member list is read before waiting for the rebalance to settle.
        List<Address> clusterMembers = getCache(0).getRpcManager().getMembers();
        TestingUtil.waitForNoRebalance(caches());
        boolean hasFourMembers = clusterMembers.size() == 4;
        AssertJUnit.assertTrue(hasFourMembers);
    }

    /**
     * Splits the cluster into the partitions {0, 1} and {2, 3}, blocks until the view change
     * is seen for all four caches, and waits for each partition to finish rebalancing.
     */
    private void splitCluster() {
        // A proper int[][] literal replaces the decompiler's invalid "new int[]{new int[]{...}}" form.
        splitCluster(new int[][]{{0, 1}, {2, 3}});
        TestingUtil.blockUntilViewsChanged(10000L, 2, (Cache<?, ?>[]) new Cache[]{getCache(0), getCache(1), getCache(2), getCache(3)});
        TestingUtil.waitForNoRebalance(getCache(0), getCache(1));
        TestingUtil.waitForNoRebalance(getCache(2), getCache(3));
    }

    /** Returns the advanced cache named {@code CACHE_NAME} for the node at index {@code i}. */
    private AdvancedCache<Object, Object> getCache(int i) {
        AdvancedCache<Object, Object> conflictCache = advancedCache(i, CACHE_NAME);
        return conflictCache;
    }

    /** Returns the conflict stream obtained from the conflict manager of the node at index {@code i}. */
    private Stream<Map<Address, CacheEntry<Object, Object>>> getConflicts(int i) {
        ConflictManager<Object, Object> conflictManager = ConflictManagerFactory.get(getCache(i));
        return conflictManager.getConflicts();
    }

    /** Asks the conflict manager of the node at index {@code i} for all versions of key {@code obj}. */
    private Map<Address, InternalCacheValue<Object>> getAllVersions(int i, Object obj) {
        ConflictManager<Object, Object> conflictManager = ConflictManagerFactory.get(getCache(i));
        return conflictManager.getAllVersions(obj);
    }

    /** Wraps the inbound handler of every node's cache in a {@link DropClusteredGetCommandHandler}. */
    private void dropClusteredGetCommands() {
        for (int nodeIndex = 0; nodeIndex < this.numMembersInCluster; nodeIndex++) {
            TestingUtil.wrapInboundInvocationHandler(getCache(nodeIndex), handler -> new DropClusteredGetCommandHandler(handler));
        }
    }

    /**
     * Wraps the inbound handler of every node's cache in a {@link DelayStateResponseCommandHandler}
     * that shares the given latch.
     */
    private void delayStateTransferCompletion(CountDownLatch countDownLatch) {
        for (int nodeIndex = 0; nodeIndex < this.numMembersInCluster; nodeIndex++) {
            TestingUtil.wrapInboundInvocationHandler(getCache(nodeIndex), handler -> new DelayStateResponseCommandHandler(countDownLatch, handler));
        }
    }

    /**
     * Wraps the inbound handler of every node's cache in a {@link StateTransferCancellation}
     * that shares the given latch.
     */
    private void cancelStateTransfer(CountDownLatch countDownLatch) {
        for (int nodeIndex = 0; nodeIndex < this.numMembersInCluster; nodeIndex++) {
            TestingUtil.wrapInboundInvocationHandler(getCache(nodeIndex), handler -> new StateTransferCancellation(countDownLatch, handler));
        }
    }
}
