package org.infinispan.statetransfer;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.configuration.cache.BiasAcquisition;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.Configurations;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.interceptors.BaseAsyncInterceptor;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.interceptors.impl.BiasedEntryWrappingInterceptor;
import org.infinispan.interceptors.impl.CallInterceptor;
import org.infinispan.interceptors.impl.EntryWrappingInterceptor;
import org.infinispan.interceptors.impl.InvocationContextInterceptor;
import org.infinispan.interceptors.impl.RetryingEntryWrappingInterceptor;
import org.infinispan.interceptors.impl.VersionedEntryWrappingInterceptor;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.transaction.lookup.EmbeddedTransactionManagerLookup;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "statetransfer.OperationsDuringStateTransferTest")
/* loaded from: input_file:org/infinispan/statetransfer/OperationsDuringStateTransferTest.class */
public class OperationsDuringStateTransferTest extends MultipleCacheManagersTest {
    /** Logger shared by all test methods of this class. */
    private static final Log log = LogFactory.getLog(OperationsDuringStateTransferTest.class);
    /**
     * Cache configuration created in {@link #createCacheManagers()}. The test methods add their
     * latch interceptors to this builder before using it to start the second cache manager.
     */
    private ConfigurationBuilder cacheConfigBuilder;

    /* loaded from: input_file:org/infinispan/statetransfer/OperationsDuringStateTransferTest$GetLatchInterceptor.class */
    /**
     * Interceptor that pauses a {@link GetKeyValueCommand} while the "started" latch is still open.
     *
     * <p>When such a command arrives, the started latch is counted down to notify the test and the
     * calling thread then waits (at most 10 seconds) on the proceed latch before the command is
     * passed on. Once the started latch has reached zero, later commands pass straight through.
     */
    static class GetLatchInterceptor extends BaseAsyncInterceptor {
        private final CountDownLatch getKeyStartedLatch;
        private final CountDownLatch getKeyProceedLatch;

        /**
         * @param startedLatch counted down when a get command reaches this interceptor
         * @param proceedLatch awaited before the get command is allowed to continue
         */
        public GetLatchInterceptor(CountDownLatch startedLatch, CountDownLatch proceedLatch) {
            getKeyStartedLatch = startedLatch;
            getKeyProceedLatch = proceedLatch;
        }

        /**
         * Blocks a get command on the proceed latch while the started latch is non-zero, then
         * delegates to the next interceptor.
         *
         * @throws TimeoutException if the proceed latch is not released within 10 seconds
         */
        public Object visitCommand(InvocationContext ctx, VisitableCommand command) throws Throwable {
            boolean mustBlock = command instanceof GetKeyValueCommand && getKeyStartedLatch.getCount() != 0;
            if (!mustBlock) {
                return invokeNext(ctx, command);
            }
            // Tell the test the get is in flight, then hold it until the test lets it go.
            getKeyStartedLatch.countDown();
            boolean released = getKeyProceedLatch.await(10L, TimeUnit.SECONDS);
            if (!released) {
                throw new TimeoutException();
            }
            return invokeNext(ctx, command);
        }
    }

    /* loaded from: input_file:org/infinispan/statetransfer/OperationsDuringStateTransferTest$PutLatchInterceptor.class */
    /**
     * Interceptor that pauses every {@link PutKeyValueCommand} that does not carry the
     * {@code PUT_FOR_STATE_TRANSFER} flag.
     *
     * <p>The started latch is counted down when such a put arrives, and the calling thread then
     * waits (at most 10 seconds) on the proceed latch before the command is passed on.
     */
    static class PutLatchInterceptor extends BaseAsyncInterceptor {
        private final CountDownLatch putStartedLatch;
        private final CountDownLatch putProceedLatch;

        /**
         * @param startedLatch counted down when a matching put reaches this interceptor
         * @param proceedLatch awaited before the put is allowed to continue
         */
        public PutLatchInterceptor(CountDownLatch startedLatch, CountDownLatch proceedLatch) {
            putStartedLatch = startedLatch;
            putProceedLatch = proceedLatch;
        }

        /**
         * Holds puts that lack the state-transfer flag on the proceed latch, then delegates to the
         * next interceptor. All other commands are delegated immediately.
         *
         * @throws TimeoutException if the proceed latch is not released within 10 seconds
         */
        public Object visitCommand(InvocationContext ctx, VisitableCommand command) throws Throwable {
            if (command instanceof PutKeyValueCommand) {
                PutKeyValueCommand put = (PutKeyValueCommand) command;
                if (!put.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
                    putStartedLatch.countDown();
                    boolean released = putProceedLatch.await(10L, TimeUnit.SECONDS);
                    if (!released) {
                        throw new TimeoutException();
                    }
                }
            }
            return invokeNext(ctx, command);
        }
    }

    /* loaded from: input_file:org/infinispan/statetransfer/OperationsDuringStateTransferTest$RemoveLatchInterceptor.class */
    /**
     * Interceptor that pauses every {@link RemoveCommand}.
     *
     * <p>The started latch is counted down when a remove arrives, and the calling thread then
     * waits (at most 10 seconds) on the proceed latch before the command is passed on.
     */
    static class RemoveLatchInterceptor extends BaseAsyncInterceptor {
        private final CountDownLatch removeStartedLatch;
        private final CountDownLatch removeProceedLatch;

        /**
         * @param startedLatch counted down when a remove reaches this interceptor
         * @param proceedLatch awaited before the remove is allowed to continue
         */
        public RemoveLatchInterceptor(CountDownLatch startedLatch, CountDownLatch proceedLatch) {
            removeStartedLatch = startedLatch;
            removeProceedLatch = proceedLatch;
        }

        /**
         * Holds remove commands on the proceed latch, then delegates to the next interceptor.
         * All other commands are delegated immediately.
         *
         * @throws TimeoutException if the proceed latch is not released within 10 seconds
         */
        public Object visitCommand(InvocationContext ctx, VisitableCommand command) throws Throwable {
            if (!(command instanceof RemoveCommand)) {
                return invokeNext(ctx, command);
            }
            removeStartedLatch.countDown();
            boolean released = removeProceedLatch.await(10L, TimeUnit.SECONDS);
            if (!released) {
                throw new TimeoutException();
            }
            return invokeNext(ctx, command);
        }
    }

    /* loaded from: input_file:org/infinispan/statetransfer/OperationsDuringStateTransferTest$ReplaceLatchInterceptor.class */
    /**
     * Interceptor that pauses every {@link ReplaceCommand}.
     *
     * <p>The started latch is counted down when a replace arrives, and the calling thread then
     * waits (at most 10 seconds) on the proceed latch before the command is passed on.
     */
    static class ReplaceLatchInterceptor extends BaseAsyncInterceptor {
        private final CountDownLatch replaceStartedLatch;
        private final CountDownLatch replaceProceedLatch;

        /**
         * @param startedLatch counted down when a replace reaches this interceptor
         * @param proceedLatch awaited before the replace is allowed to continue
         */
        public ReplaceLatchInterceptor(CountDownLatch startedLatch, CountDownLatch proceedLatch) {
            replaceStartedLatch = startedLatch;
            replaceProceedLatch = proceedLatch;
        }

        /**
         * Holds replace commands on the proceed latch, then delegates to the next interceptor.
         * All other commands are delegated immediately.
         *
         * @throws TimeoutException if the proceed latch is not released within 10 seconds
         */
        public Object visitCommand(InvocationContext ctx, VisitableCommand command) throws Throwable {
            if (!(command instanceof ReplaceCommand)) {
                return invokeNext(ctx, command);
            }
            replaceStartedLatch.countDown();
            boolean released = replaceProceedLatch.await(10L, TimeUnit.SECONDS);
            if (!released) {
                throw new TimeoutException();
            }
            return invokeNext(ctx, command);
        }
    }

    /* loaded from: input_file:org/infinispan/statetransfer/OperationsDuringStateTransferTest$StateTransferLatchInterceptor.class */
    /**
     * Interceptor that pauses every {@link PutKeyValueCommand} carrying the
     * {@code PUT_FOR_STATE_TRANSFER} flag.
     *
     * <p>The started latch is counted down when such a put arrives, and the calling thread then
     * waits (at most 10 seconds) on the proceed latch before the command is passed on.
     */
    static class StateTransferLatchInterceptor extends BaseAsyncInterceptor {
        private final CountDownLatch applyStateStartedLatch;
        private final CountDownLatch applyStateProceedLatch;

        /**
         * @param startedLatch counted down when a state-transfer put reaches this interceptor
         * @param proceedLatch awaited before the state-transfer put is allowed to continue
         */
        public StateTransferLatchInterceptor(CountDownLatch startedLatch, CountDownLatch proceedLatch) {
            applyStateStartedLatch = startedLatch;
            applyStateProceedLatch = proceedLatch;
        }

        /**
         * Holds puts flagged {@code PUT_FOR_STATE_TRANSFER} on the proceed latch, then delegates to
         * the next interceptor. All other commands are delegated immediately.
         *
         * @throws TimeoutException if the proceed latch is not released within 10 seconds
         */
        public Object visitCommand(InvocationContext ctx, VisitableCommand command) throws Throwable {
            if (command instanceof PutKeyValueCommand) {
                PutKeyValueCommand put = (PutKeyValueCommand) command;
                if (put.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
                    applyStateStartedLatch.countDown();
                    boolean released = applyStateProceedLatch.await(10L, TimeUnit.SECONDS);
                    if (!released) {
                        throw new TimeoutException();
                    }
                }
            }
            return invokeNext(ctx, command);
        }
    }

    /**
     * Builds the set of test instances to run: distributed and replicated caches, each in
     * non-transactional, pessimistic and optimistic variants, plus non-transactional scattered
     * caches with bias acquisition {@code NEVER} and {@code ON_WRITE}.
     *
     * @return one configured instance of this test per cache mode / transaction / bias combination
     */
    @Override // org.infinispan.test.MultipleCacheManagersTest
    public Object[] factory() {
        Object[] instances = {
            // Distributed
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.DIST_SYNC).transactional(false),
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.DIST_SYNC).transactional(true).lockingMode(LockingMode.PESSIMISTIC),
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.DIST_SYNC).transactional(true).lockingMode(LockingMode.OPTIMISTIC),
            // Replicated
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.REPL_SYNC).transactional(false),
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.REPL_SYNC).transactional(true).lockingMode(LockingMode.PESSIMISTIC),
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.REPL_SYNC).transactional(true).lockingMode(LockingMode.OPTIMISTIC),
            // Scattered
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.SCATTERED_SYNC).transactional(false).biasAcquisition(BiasAcquisition.NEVER),
            new OperationsDuringStateTransferTest().cacheMode(CacheMode.SCATTERED_SYNC).transactional(false).biasAcquisition(BiasAcquisition.ON_WRITE),
        };
        return instances;
    }

    /**
     * Creates the shared cache configuration and starts the first cache manager.
     *
     * <p>The configuration is kept in {@code cacheConfigBuilder}. Transactional runs use the
     * embedded transaction manager and the configured locking mode, with REPEATABLE_READ isolation
     * for optimistic locking. All runs use 10 segments, disable L1, fetch in-memory state and do
     * not await the initial state transfer.
     */
    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() {
        boolean tx = this.transactional.booleanValue();
        ConfigurationBuilder builder = getDefaultClusteredCacheConfig(this.cacheMode, tx, true);
        this.cacheConfigBuilder = builder;

        if (tx) {
            builder.transaction()
                    .transactionMode(TransactionMode.TRANSACTIONAL)
                    .transactionManagerLookup(new EmbeddedTransactionManagerLookup());
            builder.transaction().lockingMode(this.lockingMode);
            if (this.lockingMode == LockingMode.OPTIMISTIC) {
                builder.locking().isolationLevel(IsolationLevel.REPEATABLE_READ);
            }
        }

        // Bias acquisition is only set by the factory for the scattered-cache variants.
        if (this.biasAcquisition != null) {
            builder.clustering().biasAcquisition(this.biasAcquisition);
        }

        builder.clustering()
                .hash().numSegments(10)
                .l1().disable()
                .locking().lockAcquisitionTimeout(TestingUtil.shortTimeoutMillis());
        builder.clustering()
                .stateTransfer().fetchInMemoryState(true).awaitInitialTransfer(false);

        addClusterEnabledCacheManager(builder);
        waitForClusterToForm();
    }

    /**
     * Checks a {@code remove} issued on a joining node while state transfer is pending.
     *
     * <p>Flow: node 0 holds {@code myKey}; rebalancing is disabled and node 1 is started with a
     * {@link RemoveLatchInterceptor} placed after the entry wrapping interceptor. A forked
     * {@code remove} on node 1 is held by the interceptor while rebalancing is re-enabled and
     * completes. The remove is then released; it must return the previous value and the key must
     * be gone from both nodes.
     *
     * @throws Exception if a latch or the forked operation times out, or the operation fails
     */
    public void testRemove() throws Exception {
        cache(0).put("myKey", "myValue");

        // The interceptor is only added to the configuration used for the second node.
        CountDownLatch removeStarted = new CountDownLatch(1);
        CountDownLatch removeProceed = new CountDownLatch(1);
        this.cacheConfigBuilder.customInterceptors().addInterceptor().after(ewi())
                .interceptor(new RemoveLatchInterceptor(removeStarted, removeProceed));

        // Keep the joiner out of the consistent hash until the remove is in flight.
        ClusterTopologyManager clusterTopologyManager = (ClusterTopologyManager) TestingUtil.extractGlobalComponent(mo194manager(0), ClusterTopologyManager.class);
        clusterTopologyManager.setRebalancingEnabled(false);

        log.info("Adding a new node ..");
        addClusterEnabledCacheManager(this.cacheConfigBuilder);
        log.info("Added a new node");

        // Node 1 sees a topology without a pending CH and is not a member of it yet.
        LocalizedCacheTopology cacheTopology = advancedCache(1).getDistributionManager().getCacheTopology();
        AssertJUnit.assertNull(cacheTopology.getPendingCH());
        AssertJUnit.assertTrue(cacheTopology.getMembers().contains(address(0)));
        AssertJUnit.assertFalse(cacheTopology.getMembers().contains(address(1)));
        AssertJUnit.assertFalse(cacheTopology.getCurrentCH().getMembers().contains(address(1)));

        // No state has arrived on node 1.
        AssertJUnit.assertTrue(cache(1).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).keySet().isEmpty());

        Future<Object> removeFuture = fork(() -> {
            try {
                return cache(1).remove("myKey");
            } catch (Exception e) {
                log.errorf(e, "REMOVE failed: %s", e.getMessage());
                throw e;
            }
        });

        // Wait until the remove is parked inside the latch interceptor.
        if (!removeStarted.await(10L, TimeUnit.SECONDS)) {
            throw new TimeoutException();
        }
        AssertJUnit.assertTrue(cache(1).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).keySet().isEmpty());

        // Let the rebalance run to completion while the remove is still blocked.
        clusterTopologyManager.setRebalancingEnabled(true);
        TestingUtil.waitForNoRebalance(cache(0), cache(1));
        AssertJUnit.assertEquals(1, cache(1).keySet().size());

        // Release the remove and verify its result and the final state on both nodes.
        removeProceed.countDown();
        Object previous = removeFuture.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertNotNull(previous);
        AssertJUnit.assertEquals("myValue", previous);
        AssertJUnit.assertNull(cache(0).get("myKey"));
        AssertJUnit.assertNull(cache(1).get("myKey"));
    }

    /**
     * Selects the entry wrapping interceptor class that the latch interceptors are placed after.
     *
     * <ul>
     *   <li>Scattered cache: {@link RetryingEntryWrappingInterceptor} when bias acquisition is
     *       {@code NEVER}, otherwise {@link BiasedEntryWrappingInterceptor}.</li>
     *   <li>Otherwise: {@link VersionedEntryWrappingInterceptor} when
     *       {@code Configurations.isTxVersioned} holds for the configuration of cache 0, else
     *       {@link EntryWrappingInterceptor}.</li>
     * </ul>
     *
     * @return the entry wrapping interceptor class matching this test's configuration
     */
    public Class<? extends DDAsyncInterceptor> ewi() {
        // Typed returns (no raw Class local) let the compiler verify each class against the bound.
        if (this.cacheMode.isScattered()) {
            return this.biasAcquisition == BiasAcquisition.NEVER
                    ? RetryingEntryWrappingInterceptor.class
                    : BiasedEntryWrappingInterceptor.class;
        }
        if (Configurations.isTxVersioned(cache(0).getCacheConfiguration())) {
            return VersionedEntryWrappingInterceptor.class;
        }
        return EntryWrappingInterceptor.class;
    }

    /**
     * Checks a {@code put} issued on a joining node while state transfer is pending.
     *
     * <p>Flow: node 0 holds {@code myKey}; rebalancing is disabled and node 1 is started with a
     * {@link PutLatchInterceptor} placed after the entry wrapping interceptor. A forked
     * {@code put} on node 1 is held by the interceptor while rebalancing is re-enabled and
     * completes. The put is then released; it must return the previous value and both nodes must
     * see the new value.
     *
     * @throws Exception if a latch or the forked operation times out, or the operation fails
     */
    public void testPut() throws Exception {
        cache(0).put("myKey", "myValue");

        // The interceptor is only added to the configuration used for the second node.
        CountDownLatch putStarted = new CountDownLatch(1);
        CountDownLatch putProceed = new CountDownLatch(1);
        this.cacheConfigBuilder.customInterceptors().addInterceptor().after(ewi())
                .interceptor(new PutLatchInterceptor(putStarted, putProceed));

        // Keep the joiner out of the consistent hash until the put is in flight.
        ClusterTopologyManager topologyManager = (ClusterTopologyManager) TestingUtil.extractGlobalComponent(mo194manager(0), ClusterTopologyManager.class);
        topologyManager.setRebalancingEnabled(false);

        log.info("Adding a new node ..");
        addClusterEnabledCacheManager(this.cacheConfigBuilder);
        log.info("Added a new node");

        // Node 1 sees a topology without a pending CH and is not a member of it yet.
        LocalizedCacheTopology joinerTopology = advancedCache(1).getDistributionManager().getCacheTopology();
        AssertJUnit.assertNull(joinerTopology.getPendingCH());
        AssertJUnit.assertTrue(joinerTopology.getMembers().contains(address(0)));
        AssertJUnit.assertFalse(joinerTopology.getMembers().contains(address(1)));
        AssertJUnit.assertFalse(joinerTopology.getCurrentCH().getMembers().contains(address(1)));

        // No state has arrived on node 1.
        AssertJUnit.assertTrue(cache(1).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).keySet().isEmpty());

        Future<Object> putFuture = fork(() -> {
            try {
                return cache(1).put("myKey", "newValue");
            } catch (Exception e) {
                log.errorf(e, "PUT failed: %s", e.getMessage());
                throw e;
            }
        });

        // Wait until the put is parked inside the latch interceptor.
        boolean putInFlight = putStarted.await(10L, TimeUnit.SECONDS);
        if (!putInFlight) {
            throw new TimeoutException();
        }
        AssertJUnit.assertTrue(cache(1).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).keySet().isEmpty());

        // Let the rebalance run to completion while the put is still blocked.
        topologyManager.setRebalancingEnabled(true);
        TestingUtil.waitForNoRebalance(cache(0), cache(1));
        AssertJUnit.assertEquals(1, cache(1).keySet().size());

        // Release the put and verify its result and the final state on both nodes.
        putProceed.countDown();
        Object previous = putFuture.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertNotNull(previous);
        AssertJUnit.assertEquals("myValue", previous);
        AssertJUnit.assertEquals("newValue", cache(0).get("myKey"));
        AssertJUnit.assertEquals("newValue", cache(1).get("myKey"));
    }

    /**
     * Checks a {@code replace} issued on a joining node while state transfer is pending.
     *
     * <p>Flow: node 0 holds {@code myKey}; rebalancing is disabled and node 1 is started with a
     * {@link ReplaceLatchInterceptor} placed after the entry wrapping interceptor. A forked
     * {@code replace} on node 1 is held by the interceptor while rebalancing is re-enabled and
     * completes. The replace is then released; it must return the previous value and both nodes
     * must see the new value.
     *
     * @throws Exception if a latch or the forked operation times out, or the operation fails
     */
    public void testReplace() throws Exception {
        cache(0).put("myKey", "myValue");

        // The interceptor is only added to the configuration used for the second node.
        CountDownLatch replaceStarted = new CountDownLatch(1);
        CountDownLatch replaceProceed = new CountDownLatch(1);
        this.cacheConfigBuilder.customInterceptors().addInterceptor().after(ewi())
                .interceptor(new ReplaceLatchInterceptor(replaceStarted, replaceProceed));

        // Keep the joiner out of the consistent hash until the replace is in flight.
        ClusterTopologyManager topologyManager = (ClusterTopologyManager) TestingUtil.extractGlobalComponent(mo194manager(0), ClusterTopologyManager.class);
        topologyManager.setRebalancingEnabled(false);

        log.info("Adding a new node ..");
        addClusterEnabledCacheManager(this.cacheConfigBuilder);
        log.info("Added a new node");

        // Node 1 sees a topology without a pending CH and is not a member of it yet.
        LocalizedCacheTopology joinerTopology = advancedCache(1).getDistributionManager().getCacheTopology();
        AssertJUnit.assertNull(joinerTopology.getPendingCH());
        AssertJUnit.assertTrue(joinerTopology.getMembers().contains(address(0)));
        AssertJUnit.assertFalse(joinerTopology.getMembers().contains(address(1)));
        AssertJUnit.assertFalse(joinerTopology.getCurrentCH().getMembers().contains(address(1)));

        // No state has arrived on node 1.
        AssertJUnit.assertTrue(cache(1).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).keySet().isEmpty());

        Future<Object> replaceFuture = fork(() -> {
            try {
                return cache(1).replace("myKey", "newValue");
            } catch (Exception e) {
                log.errorf(e, "REPLACE failed: %s", e.getMessage());
                throw e;
            }
        });

        // Wait until the replace is parked inside the latch interceptor.
        boolean replaceInFlight = replaceStarted.await(10L, TimeUnit.SECONDS);
        if (!replaceInFlight) {
            throw new TimeoutException();
        }
        AssertJUnit.assertTrue(cache(1).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).keySet().isEmpty());

        // Let the rebalance run to completion while the replace is still blocked.
        topologyManager.setRebalancingEnabled(true);
        TestingUtil.waitForNoRebalance(cache(0), cache(1));
        AssertJUnit.assertEquals(1, cache(1).keySet().size());

        // Release the replace and verify its result and the final state on both nodes.
        replaceProceed.countDown();
        Object previous = replaceFuture.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertNotNull(previous);
        AssertJUnit.assertEquals("myValue", previous);
        AssertJUnit.assertEquals("newValue", cache(0).get("myKey"));
        AssertJUnit.assertEquals("newValue", cache(1).get("myKey"));
    }

    /**
     * Checks a {@code get} issued on a joining node while incoming state is being applied.
     *
     * <p>Node 1 is started with two latch interceptors: a {@link StateTransferLatchInterceptor}
     * before the invocation context interceptor, which holds state-transfer puts, and a
     * {@link GetLatchInterceptor} before the call interceptor, which holds the get. The get is
     * started while state application is blocked; state application is then released, the
     * rebalance completes, and finally the get is released and must return the transferred value.
     *
     * @throws Exception if a latch or the forked operation times out, or the operation fails
     */
    public void testGet() throws Exception {
        cache(0).put("myKey", "myValue");

        // Latches controlling when incoming state is applied on the new node.
        CountDownLatch applyStateStarted = new CountDownLatch(1);
        CountDownLatch applyStateProceed = new CountDownLatch(1);
        this.cacheConfigBuilder.customInterceptors().addInterceptor().before(InvocationContextInterceptor.class)
                .interceptor(new StateTransferLatchInterceptor(applyStateStarted, applyStateProceed));

        // Latches controlling when the get on the new node may reach the call interceptor.
        CountDownLatch getStarted = new CountDownLatch(1);
        CountDownLatch getProceed = new CountDownLatch(1);
        this.cacheConfigBuilder.customInterceptors().addInterceptor().before(CallInterceptor.class)
                .interceptor(new GetLatchInterceptor(getStarted, getProceed));

        log.info("Adding a new node ..");
        addClusterEnabledCacheManager(this.cacheConfigBuilder);
        log.info("Added a new node");

        // Nothing is stored on node 1 before state application begins...
        AssertJUnit.assertEquals(0, cache(1).getAdvancedCache().getDataContainer().size());
        boolean stateArrived = applyStateStarted.await(10L, TimeUnit.SECONDS);
        if (!stateArrived) {
            throw new TimeoutException();
        }
        // ...nor while the state-transfer put is held by the interceptor.
        AssertJUnit.assertEquals(0, cache(1).getAdvancedCache().getDataContainer().size());

        Future<Object> getFuture = fork(() -> {
            return cache(1).get("myKey");
        });
        boolean getInFlight = getStarted.await(10L, TimeUnit.SECONDS);
        if (!getInFlight) {
            throw new TimeoutException();
        }

        // Let state transfer finish while the get is still blocked.
        applyStateProceed.countDown();
        TestingUtil.waitForNoRebalance(cache(0), cache(1));
        AssertJUnit.assertEquals(1, cache(1).getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).keySet().size());

        // Release the get; it must observe the transferred value.
        getProceed.countDown();
        AssertJUnit.assertEquals("myValue", getFuture.get(10L, TimeUnit.SECONDS));
    }
}
