package org.infinispan.statetransfer;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.BaseCustomAsyncInterceptor;
import org.infinispan.interceptors.impl.EntryWrappingInterceptor;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.ReplicatedControlledConsistentHashFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "statetransfer.ReplCommandForwardingTest")
/* Functional test checking which nodes of a replicated cache execute a non-transactional put that is still in flight while another node joins. */
public class ReplCommandForwardingTest extends MultipleCacheManagersTest {
    private static final String CACHE_NAME = "testCache";

    /**
     * Test interceptor that holds back commands of one chosen class until the test releases them.
     *
     * <p>Every held command gets a sequence number from {@link #counter}. The interceptor signals
     * {@code blocked_<n>_on_<cache>} on its {@link CheckPoint} and then waits for
     * {@code resume_<n>_on_<cache>}. The blocking happens in the callback registered through
     * {@code invokeNextThenAccept}.
     */
    static class DelayInterceptor extends BaseCustomAsyncInterceptor {
        /** Number of commands that have entered the blocking section so far. */
        private final AtomicInteger counter = new AtomicInteger(0);
        /** Rendezvous point shared between the blocked command and the test thread. */
        private final CheckPoint checkPoint = new CheckPoint();
        /** Exact command class to hold back; commands of any other class pass through. */
        private final Class<?> commandToBlock;

        public DelayInterceptor(Class<?> cls) {
            commandToBlock = cls;
        }

        /** @return how many commands have been held back so far */
        public int getCounter() {
            return counter.get();
        }

        /**
         * Asserts that the {@code i}-th command reports itself as blocked within five seconds.
         * The event is only peeked at, so {@link #unblock(int)} can still await it afterwards.
         */
        public void waitUntilBlocked(int i) throws TimeoutException, InterruptedException {
            String expectedEvent = blockedEvent(i);
            AssertJUnit.assertEquals(expectedEvent, checkPoint.peek(5L, TimeUnit.SECONDS, expectedEvent));
        }

        /**
         * Waits (up to five seconds) for the {@code i}-th command to be blocked and then triggers its resume event.
         */
        public void unblock(int i) throws InterruptedException, TimeoutException, BrokenBarrierException {
            ReplCommandForwardingTest.log.tracef("Unblocking command on cache %s", cache);
            checkPoint.awaitStrict(blockedEvent(i), 5L, TimeUnit.SECONDS);
            checkPoint.trigger(resumeEvent(i));
        }

        /** Holds back puts that are neither transactional nor flagged as state-transfer writes. */
        public Object visitPutKeyValueCommand(InvocationContext invocationContext, PutKeyValueCommand putKeyValueCommand) throws Throwable {
            return invokeNextThenAccept(invocationContext, putKeyValueCommand, (rCtx, rCommand, rv) -> {
                boolean inTx = invocationContext.isInTxScope();
                if (!inTx && !putKeyValueCommand.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
                    doBlock(invocationContext, putKeyValueCommand);
                }
            });
        }

        public Object visitLockControlCommand(TxInvocationContext txInvocationContext, LockControlCommand lockControlCommand) throws Throwable {
            return invokeNextThenAccept(txInvocationContext, lockControlCommand,
                    (rCtx, rCommand, rv) -> blockUnlessFromStateTransfer(txInvocationContext, lockControlCommand));
        }

        public Object visitPrepareCommand(TxInvocationContext txInvocationContext, PrepareCommand prepareCommand) throws Throwable {
            return invokeNextThenAccept(txInvocationContext, prepareCommand,
                    (rCtx, rCommand, rv) -> blockUnlessFromStateTransfer(txInvocationContext, prepareCommand));
        }

        public Object visitCommitCommand(TxInvocationContext txInvocationContext, CommitCommand commitCommand) throws Throwable {
            return invokeNextThenAccept(txInvocationContext, commitCommand,
                    (rCtx, rCommand, rv) -> blockUnlessFromStateTransfer(txInvocationContext, commitCommand));
        }

        /** Shared guard for the transactional commands: transactions marked as coming from state transfer are never held back. */
        private void blockUnlessFromStateTransfer(TxInvocationContext txContext, ReplicableCommand command) throws InterruptedException, TimeoutException {
            if (!txContext.getCacheTransaction().isFromStateTransfer()) {
                doBlock(txContext, command);
            }
        }

        /**
         * Holds back {@code replicableCommand} if its class is exactly {@link #commandToBlock}: announces the
         * blocked event and waits up to fifteen seconds for the matching resume event.
         */
        private void doBlock(InvocationContext invocationContext, ReplicableCommand replicableCommand) throws InterruptedException, TimeoutException {
            if (commandToBlock == replicableCommand.getClass()) {
                ReplCommandForwardingTest.log.tracef("Delaying command %s originating from %s", replicableCommand, invocationContext.getOrigin());
                int sequence = counter.incrementAndGet();
                checkPoint.trigger(blockedEvent(sequence));
                checkPoint.awaitStrict(resumeEvent(sequence), 15L, TimeUnit.SECONDS);
                ReplCommandForwardingTest.log.tracef("Command unblocked: %s", replicableCommand);
            }
        }

        /** Name of the event signalled when the command with the given sequence number starts waiting. */
        private String blockedEvent(int sequence) {
            return "blocked_" + sequence + "_on_" + cache;
        }

        /** Name of the event that releases the command with the given sequence number. */
        private String resumeEvent(int sequence) {
            return "resume_" + sequence + "_on_" + cache;
        }

        public String toString() {
            return "DelayInterceptor{counter=" + counter.get() + "}";
        }
    }

    /**
     * Intentionally empty: the test method in this class adds its own cache managers one at a time through
     * {@code addClusterEnabledCacheManager}, so nothing is created up front.
     */
    @Override // declared in org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() {
    }

    /**
     * Builds the configuration used for the cache under test: an asynchronously replicated cache with a
     * 15 second remote timeout, a single segment whose ownership is decided by a
     * {@link ReplicatedControlledConsistentHashFactory}, and in-memory state fetching switched on.
     *
     * @return a fresh, not yet built, configuration builder
     */
    private ConfigurationBuilder buildConfig() {
        ConfigurationBuilder cacheConfig = getDefaultClusteredCacheConfig(CacheMode.REPL_ASYNC, false);
        // Factory is created with owner index 0 and no further owner indexes.
        ReplicatedControlledConsistentHashFactory hashFactory = new ReplicatedControlledConsistentHashFactory(0, new int[0]);

        cacheConfig.clustering().remoteTimeout(15000L);
        cacheConfig.clustering()
                .hash()
                .numSegments(1)
                .consistentHashFactory(hashFactory);
        cacheConfig.clustering()
                .stateTransfer()
                .fetchInMemoryState(true);
        return cacheConfig;
    }

    /**
     * Verifies that a non-transactional put which is still blocked on the two existing members while a third
     * node joins is not executed on the joiner.
     *
     * <p>Scenario:
     * <ol>
     *   <li>Start two nodes with a {@link DelayInterceptor} that holds back {@link PutKeyValueCommand}s.</li>
     *   <li>Fork a put on the first node and wait until it is blocked on both nodes.</li>
     *   <li>Start the test cache on a third node and wait for the topology to settle.</li>
     *   <li>Release the put on both original nodes and check that the third node's interceptor never saw it.</li>
     * </ol>
     *
     * @throws Exception if a wait times out, the forked put fails, or an assertion does not hold
     */
    public void testForwardToJoinerNonTransactional() throws Exception {
        // Node 1
        EmbeddedCacheManager manager1 = addClusterEnabledCacheManager(newGlobalConfigWithDelayInterceptor(), (ConfigurationBuilder) null);
        Cache<Object, Object> cache1 = manager1.createCache(CACHE_NAME, buildConfig().build());
        DelayInterceptor delay1 = TestingUtil.findInterceptor(cache1, DelayInterceptor.class);
        int initialTopologyId = cache1.getAdvancedCache().getDistributionManager().getCacheTopology().getTopologyId();

        // Node 2
        EmbeddedCacheManager manager2 = addClusterEnabledCacheManager(newGlobalConfigWithDelayInterceptor(), (ConfigurationBuilder) null);
        Cache<Object, Object> cache2 = manager2.createCache(CACHE_NAME, buildConfig().build());
        DelayInterceptor delay2 = TestingUtil.findInterceptor(cache2, DelayInterceptor.class);
        // The test expects the topology id to have advanced by 4 once the second node has joined.
        waitForStateTransfer(initialTopologyId + 4, cache1, cache2);

        // Node 3: start the manager and an unrelated cache now, but delay the cache under test
        // until the put below is blocked on the existing members.
        EmbeddedCacheManager manager3 = addClusterEnabledCacheManager(newGlobalConfigWithDelayInterceptor(), (ConfigurationBuilder) null);
        manager3.createCache("differentCache", getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC).build());

        Future<Object> putFuture = fork(() -> {
            log.tracef("Initiating a put command on %s", cache1);
            cache1.put("k", "v");
            return null;
        });

        // The put must be held back on both existing nodes before the joiner starts the cache.
        delay1.waitUntilBlocked(1);
        delay2.waitUntilBlocked(1);

        Cache<Object, Object> cache3 = manager3.createCache(CACHE_NAME, buildConfig().build());
        DelayInterceptor delay3 = TestingUtil.findInterceptor(cache3, DelayInterceptor.class);
        // ... and by another 4 once the third node has joined.
        waitForStateTransfer(initialTopologyId + 8, cache1, cache2, cache3);

        delay2.unblock(1);
        delay1.unblock(1);

        // Negative check: give a wrongly forwarded command time to reach the joiner before asserting none did.
        Thread.sleep(2000L);
        AssertJUnit.assertEquals("The command shouldn't have been forwarded to " + cache3, 0, delay3.getCounter());

        log.tracef("Waiting for the put command to finish on %s", cache1);
        Object putResult = putFuture.get(10L, TimeUnit.SECONDS);
        log.tracef("Put command finished on %s", cache1);

        AssertJUnit.assertNull(putResult);
        AssertJUnit.assertEquals(1, delay1.getCounter());
        AssertJUnit.assertEquals(1, delay2.getCounter());
        AssertJUnit.assertEquals(0, delay3.getCounter());
    }

    /**
     * Creates a clustered global configuration that registers the serialization context of
     * {@link ReplicatedControlledConsistentHashFactory} and installs a new {@link DelayInterceptor} for
     * {@link PutKeyValueCommand}s after the {@link EntryWrappingInterceptor} of the cache named {@link #CACHE_NAME}.
     */
    private GlobalConfigurationBuilder newGlobalConfigWithDelayInterceptor() {
        GlobalConfigurationBuilder globalConfig = GlobalConfigurationBuilder.defaultClusteredBuilder();
        globalConfig.serialization().addContextInitializer(ReplicatedControlledConsistentHashFactory.SCI.INSTANCE);
        // Only the cache under test gets the interceptor; other caches on the same manager are left alone.
        TestCacheManagerFactory.addInterceptor(globalConfig, CACHE_NAME::equals,
                new DelayInterceptor(PutKeyValueCommand.class),
                TestCacheManagerFactory.InterceptorPosition.AFTER, EntryWrappingInterceptor.class);
        return globalConfig;
    }

    /**
     * Calls {@code TestingUtil.waitForNoRebalance} for the given caches and then asserts that every one of them
     * reports the expected topology id.
     *
     * @param expectedTopologyId the topology id each cache must report
     * @param caches the caches to check
     */
    private void waitForStateTransfer(int expectedTopologyId, Cache<?, ?>... caches) {
        TestingUtil.waitForNoRebalance(caches);
        for (Cache<?, ?> cache : caches) {
            LocalizedCacheTopology topology = cache.getAdvancedCache().getDistributionManager().getCacheTopology();
            String message = String.format("Wrong topology on cache %s, expected %d and got %s", cache, expectedTopologyId, topology);
            // AssertJUnit expects (message, expected, actual); keep that order so failure reports are not inverted.
            AssertJUnit.assertEquals(message, expectedTopologyId, topology.getTopologyId());
        }
    }
}
