package org.infinispan.notifications.cachelistener.cluster;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.distribution.MagicKey;
import org.infinispan.notifications.cachelistener.cluster.AbstractClusterListenerUtilTest;
import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.util.concurrent.CommandAckCollector;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Base class for non-transactional cluster-listener tests.
 *
 * <p>Adds a scenario in which the node that handles a write is killed after the
 * listener notification has been raised, plus two helpers ({@link #checkEvent} and
 * {@link #awaitForBackups}) that subclasses can reuse or override.
 */
@Test(groups = {"functional"})
public abstract class AbstractClusterListenerNonTxTest extends AbstractClusterListenerTest {

    /**
     * Forwards both arguments unchanged to the superclass constructor.
     *
     * @param tx        flag handed to the superclass (presumably the transactional flag that is
     *                  later read as {@code this.tx} - confirm against the superclass)
     * @param cacheMode cache mode handed to the superclass
     */
    public AbstractClusterListenerNonTxTest(boolean tx, CacheMode cacheMode) {
        super(tx, cacheMode);
    }

    /**
     * Writes a single entry, kills the cache manager of node 1 once the
     * {@code post_raise_notification_invoked} checkpoint has been reached, and then verifies the
     * events collected by a cluster listener registered on node 0: between 2 and 6 events in
     * total, the first a non-retried creation, every following one a retried modification.
     */
    @Test
    public void testPrimaryOwnerGoesDownAfterSendingEvent() throws InterruptedException, ExecutionException, TimeoutException {
        // Typed as <Object, String> so that put(MagicKey, String) below is legal; a wildcard-typed
        // cache cannot accept arguments for its captured key/value types.
        Cache<Object, String> cache0 = cache(0, "cluster-listener");
        Cache<Object, String> cache1 = cache(1, "cluster-listener");
        Cache<Object, String> cache2 = cache(2, "cluster-listener");

        AbstractClusterListenerUtilTest.ClusterListener listener = listener();
        cache0.addListener(listener);

        CheckPoint checkPoint = new CheckPoint();
        waitUntilNotificationRaised(cache1, checkPoint);
        // Never hold the notification back before it is raised; only the "post" side is awaited.
        checkPoint.triggerForever("pre_raise_notification_release");

        // NOTE(review): MagicKey(cache1, cache2) is expected to yield a key whose primary owner is
        // node 1 and whose backup is node 2 - confirm against MagicKey.
        MagicKey key = new MagicKey(cache1, cache2);
        Future<String> pendingPut = fork(() -> cache0.put(key, "first-value"));

        checkPoint.awaitStrict("post_raise_notification_invoked", 10L, TimeUnit.SECONDS);
        awaitForBackups(cache0);

        // Take node 1 down while the write issued from node 0 is still in flight.
        TestingUtil.killCacheManagers(cache1.getCacheManager());

        pendingPut.get(10L, TimeUnit.SECONDS);
        TestingUtil.waitForNoRebalance(cache0, cache2);

        // Read the size once so both bounds are checked against the same value.
        int eventCount = listener.events.size();
        AssertJUnit.assertTrue("Expected 2 - 6 events, but received " + listener.events,
                eventCount >= 2 && eventCount <= 6);

        checkEvent(listener.events.get(0), key, true, false);

        // Both surviving nodes must agree on who the primary owner of the key is now.
        AssertJUnit.assertEquals(
                TestingUtil.extractCacheTopology(cache0).getDistribution(key).primary(),
                TestingUtil.extractCacheTopology(cache2).getDistribution(key).primary());

        listener.events.stream()
                .skip(1L)
                .forEach(event -> checkEvent(event, key, false, true));
    }

    /**
     * Asserts the type, retry flag, key and value of a single listener event.
     *
     * <p>For a creation event the retry flag must equal {@code isRetried}. For a modification
     * event the retry flag must always be {@code true}; {@code isRetried} is not consulted in
     * that branch.
     *
     * @param event     the event to verify
     * @param key       the key the event must carry
     * @param isCreated {@code true} to expect {@code CACHE_ENTRY_CREATED}, {@code false} to expect
     *                  {@code CACHE_ENTRY_MODIFIED}
     * @param isRetried expected value of {@code isCommandRetried()} for creation events
     */
    public void checkEvent(CacheEntryEvent<Object, String> event, MagicKey key, boolean isCreated, boolean isRetried) {
        if (isCreated) {
            AssertJUnit.assertEquals(Event.Type.CACHE_ENTRY_CREATED, event.getType());
            CacheEntryCreatedEvent<Object, String> created = (CacheEntryCreatedEvent<Object, String>) event;
            // AssertJUnit takes (expected, actual); keeping that order makes failure messages accurate.
            AssertJUnit.assertEquals(isRetried, created.isCommandRetried());
        } else {
            AssertJUnit.assertEquals(Event.Type.CACHE_ENTRY_MODIFIED, event.getType());
            CacheEntryModifiedEvent<Object, String> modified = (CacheEntryModifiedEvent<Object, String>) event;
            AssertJUnit.assertTrue(modified.isCommandRetried());
        }
        AssertJUnit.assertEquals(key, event.getKey());
        AssertJUnit.assertEquals("first-value", event.getValue());
    }

    /**
     * When {@code TestingUtil.isTriangleAlgorithm(cacheMode, tx)} holds, asserts that the given
     * cache's {@link CommandAckCollector} has exactly one pending command and then polls until
     * that command no longer has pending backup acknowledgements. Does nothing otherwise.
     *
     * @param cache the cache whose {@link CommandAckCollector} is inspected
     */
    public void awaitForBackups(Cache<?, ?> cache) {
        if (!TestingUtil.isTriangleAlgorithm(this.cacheMode, this.tx)) {
            return;
        }
        CommandAckCollector collector = TestingUtil.extractComponent(cache, CommandAckCollector.class);
        List<Long> pendingCommands = collector.getPendingCommands();
        AssertJUnit.assertEquals(1, pendingCommands.size());
        // Resolve the id once up front instead of re-reading the list on every poll.
        long commandId = pendingCommands.get(0);
        eventually(() -> !collector.hasPendingBackupAcks(commandId));
    }
}
