package org.infinispan.statetransfer;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.VersionedPrepareCommand;
import org.infinispan.commons.marshall.SerializeWith;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.DistributionTestHelper;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.BaseAsyncInterceptor;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TransportFlags;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.impl.WriteSkewHelper;
import org.infinispan.util.BaseControlledConsistentHashFactory;
import org.infinispan.util.BlockingLocalTopologyManager;
import org.infinispan.util.ControlledRpcManager;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.logging.Log;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "statetransfer.WriteSkewDuringStateTransferTest", singleThreaded = true)
/* loaded from: input_file:org/infinispan/statetransfer/WriteSkewDuringStateTransferTest.class */
public class WriteSkewDuringStateTransferTest extends MultipleCacheManagersTest {
    /** Blocking topology managers registered through {@code replaceTopologyManager}; released and cleared by {@code unblockAll()}. */
    private final List<BlockingLocalTopologyManager> topologyManagerList = Collections.synchronizedList(new ArrayList<>(4));

    /**
     * Hook that {@link ControlledCommandInterceptor} runs around commands travelling through the interceptor chain.
     */
    public interface Action {
        /**
         * Decides whether this action takes part in the given invocation.
         *
         * @param ctx     invocation context of the command
         * @param command command being visited
         * @return {@code true} if {@link #before} and {@link #after} should run for this command
         */
        boolean isApplicable(InvocationContext ctx, VisitableCommand command);

        /**
         * Called before the command is handed to the next interceptor.
         *
         * @param ctx     invocation context of the command
         * @param command command being visited
         * @param cache   cache held by the calling interceptor
         */
        void before(InvocationContext ctx, VisitableCommand command, Cache<?, ?> cache);

        /**
         * Called from the callback the interceptor registers with {@code invokeNextThenAccept}.
         *
         * @param ctx     invocation context of the command
         * @param command command being visited
         * @param cache   cache held by the calling interceptor
         */
        void after(InvocationContext ctx, VisitableCommand command, Cache<?, ?> cache);
    }

    /**
     * Consistent hash factory whose owner assignment depends only on the number of members.
     * It is constructed with {@code super(1)} — presumably the segment count, matching
     * {@code numSegments(1)} in {@code configuration()}; confirm against
     * {@code BaseControlledConsistentHashFactory}.
     */
    @SerializeWith(ConsistentHashFactoryImpl.Externalizer.class)
    public static class ConsistentHashFactoryImpl extends BaseControlledConsistentHashFactory.Default {

        /**
         * Externalizer for the stateless factory: nothing is written, and reading always
         * yields a fresh instance.
         */
        public static class Externalizer implements org.infinispan.commons.marshall.Externalizer<ConsistentHashFactoryImpl> {
            /**
             * Writes nothing because the factory carries no state of its own.
             *
             * @param objectOutput              stream to write to (unused)
             * @param consistentHashFactoryImpl factory being serialized (unused)
             * @throws IOException declared by the externalizer contract; never thrown here
             */
            public void writeObject(ObjectOutput objectOutput, ConsistentHashFactoryImpl consistentHashFactoryImpl) throws IOException {
            }

            /**
             * Recreates the factory. The method must be named {@code readObject} to implement
             * the externalizer interface; the decompiler-generated name did not.
             *
             * @param objectInput stream to read from (unused)
             * @return a new factory instance
             * @throws IOException            declared by the externalizer contract; never thrown here
             * @throws ClassNotFoundException declared by the externalizer contract; never thrown here
             */
            public ConsistentHashFactoryImpl readObject(ObjectInput objectInput) throws IOException, ClassNotFoundException {
                return new ConsistentHashFactoryImpl();
            }
        }

        /** Creates the factory, passing {@code 1} to the base class constructor. */
        public ConsistentHashFactoryImpl() {
            super(1);
        }

        /**
         * Returns one owner list whose content depends only on the member count.
         *
         * @param i    unused by this implementation
         * @param list current members; only its size is read
         * @return a single row of member indexes: {@code {0}} for one member, {@code {1, 0}}
         *         for two, otherwise {@code {size - 1, 0, 1}}
         */
        @Override // org.infinispan.util.BaseControlledConsistentHashFactory
        protected final int[][] assignOwners(int i, List<Address> list) {
            switch (list.size()) {
                case 1:
                    return new int[][]{{0}};
                case 2:
                    return new int[][]{{1, 0}};
                default:
                    // The last member is listed first, followed by the first two members.
                    return new int[][]{{list.size() - 1, 0, 1}};
            }
        }
    }

    /**
     * Interceptor that runs the registered {@link Action}s around every command they declare
     * themselves applicable to.
     */
    public static class ControlledCommandInterceptor extends BaseAsyncInterceptor {
        private final List<Action> actionList = new ArrayList<>(3);
        // NOTE(review): stays null when the no-arg constructor is used and nothing visible in this
        // file assigns it afterwards, so actions then receive a null cache — confirm whether the
        // component registry is expected to populate it.
        private Cache<Object, Object> cache;

        /**
         * Creates the interceptor for a running cache and inserts it at position 0 of that
         * cache's interceptor chain.
         *
         * @param cache cache whose chain receives this interceptor
         */
        public ControlledCommandInterceptor(Cache<Object, Object> cache) {
            this.cache = cache;
            this.cacheConfiguration = cache.getCacheConfiguration();
            TestingUtil.extractInterceptorChain(cache).addInterceptor(this, 0);
        }

        /** Creates an interceptor that is not yet attached to any cache. */
        public ControlledCommandInterceptor() {
        }

        /**
         * Registers an action to be consulted for every visited command.
         *
         * @param action action to add
         */
        public void addAction(Action action) {
            this.actionList.add(action);
        }

        /**
         * Runs {@link Action#before} for each applicable action, invokes the next interceptor,
         * and runs {@link Action#after} from the {@code invokeNextThenAccept} callback.
         *
         * @param invocationContext context of the command
         * @param visitableCommand  command being visited
         * @return whatever {@code invokeNext} / {@code invokeNextThenAccept} returns
         * @throws Throwable propagated from the rest of the chain or from an action
         */
        public Object visitCommand(InvocationContext invocationContext, VisitableCommand visitableCommand) throws Throwable {
            final List<Action> applicable = extractActions(invocationContext, visitableCommand);
            if (applicable.isEmpty()) {
                return invokeNext(invocationContext, visitableCommand);
            }
            for (Action action : applicable) {
                action.before(invocationContext, visitableCommand, this.cache);
            }
            // The callback deliberately uses the captured context and command, as the original did.
            return invokeNextThenAccept(invocationContext, visitableCommand, (rCtx, rCommand, rv) -> {
                for (Action action : applicable) {
                    action.after(invocationContext, visitableCommand, this.cache);
                }
            });
        }

        /**
         * Collects the registered actions that apply to the given invocation.
         *
         * @param invocationContext context of the command
         * @param visitableCommand  command being visited
         * @return applicable actions in registration order; empty if none are registered
         */
        private List<Action> extractActions(InvocationContext invocationContext, VisitableCommand visitableCommand) {
            if (this.actionList.isEmpty()) {
                return Collections.emptyList();
            }
            List<Action> applicable = new ArrayList<>(this.actionList.size());
            for (Action action : this.actionList) {
                if (action.isApplicable(invocationContext, visitableCommand)) {
                    applicable.add(action);
                }
            }
            return applicable;
        }
    }

    /**
     * Handles for the node started by {@code addNode}: its controller, the future of the forked
     * start, and a single-count latch that the joiner's {@code after} hook counts down.
     */
    public static class NewNode {
        NodeController controller;
        Future<Void> joinerFuture;
        CountDownLatch commandLatch;

        private NewNode() {
            this.commandLatch = new CountDownLatch(1);
        }
    }

    /**
     * Pairs the two control points installed on a node: its blocking topology manager and its
     * command interceptor.
     */
    public static class NodeController {
        BlockingLocalTopologyManager topologyManager;
        ControlledCommandInterceptor interceptor;

        private NodeController() {
            // Instances are created only by the enclosing test.
        }
    }

    /**
     * Serialization context initializer for this test, declared for {@link ConsistentHashFactoryImpl}.
     */
    @AutoProtoSchemaBuilder(
            includeClasses = {ConsistentHashFactoryImpl.class},
            schemaFileName = "test.core.WriteSkewDuringStateTransferTest.proto",
            schemaFilePath = "proto/generated",
            schemaPackageName = "org.infinispan.test.core.WriteSkewDuringStateTransferTest",
            service = false)
    public interface WriteSkewDuringStateTransferSCI extends SerializationContextInitializer {
        /** Shared instance; interface fields are implicitly {@code public static final}. */
        WriteSkewDuringStateTransferSCI INSTANCE = new WriteSkewDuringStateTransferSCIImpl();
    }

    /**
     * Stops blocking on every tracked topology manager and empties the tracking list.
     * Runs after each test method, including failed ones ({@code alwaysRun = true}).
     */
    @AfterMethod(alwaysRun = true)
    public final void unblockAll() {
        for (BlockingLocalTopologyManager manager : topologyManagerList) {
            manager.stopBlocking();
        }
        topologyManagerList.clear();
    }

    /**
     * Runs a transactional put of {@code "key1"} from node 0 while a third node joins, stepping
     * through the topology phases by hand, and then checks that nodes 1 and 2 hold the entry
     * with a non-null version.
     *
     * @throws Exception if any step fails or times out
     */
    public void testVersionsAfterStateTransfer() throws Exception {
        // Preconditions: a two-node cluster and the expected owners for the key.
        // NOTE(review): presumably cache(1) is the expected primary owner and cache(0) the backup —
        // confirm against DistributionTestHelper.hasOwners.
        assertClusterSize("Wrong cluster size", 2);
        assertKeyOwnership("key1", mo375cache(1), mo375cache(0));
        int currentTopologyId = currentTopologyId(mo375cache(0));
        // Take control of node 0's outgoing commands so they can be awaited and released below.
        ControlledRpcManager replaceRpcManager = ControlledRpcManager.replaceRpcManager(mo375cache(0), new Class[0]);
        // Install interceptors and blocking topology managers on the two existing nodes.
        NodeController nodeControllerIn = setNodeControllerIn(mo375cache(0));
        setInitialPhaseForNodeA(nodeControllerIn, currentTopologyId);
        NodeController nodeControllerIn2 = setNodeControllerIn(mo375cache(1));
        setInitialPhaseForNodeB(nodeControllerIn2, currentTopologyId);
        // Start the third node; addNode forks its start and returns immediately.
        NewNode addNode = addNode(currentTopologyId);
        // Confirm the READ_OLD_WRITE_ALL phase on all three nodes.
        BlockingLocalTopologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_OLD_WRITE_ALL, nodeControllerIn.topologyManager, nodeControllerIn2.topologyManager, addNode.controller.topologyManager);
        // Only node 1 confirms READ_ALL_WRITE_ALL here; nodes 0 and 2 confirm it after the prepare was sent.
        nodeControllerIn2.topologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_ALL_WRITE_ALL);
        // Run the transaction on node 0 in a forked thread.
        Future<Object> executeTransaction = executeTransaction(mo375cache(0), "key1");
        // Send the versioned prepare and hold on to its responses; they are released via receive() further down.
        ControlledRpcManager.BlockedResponseMap expectAllResponses = replaceRpcManager.expectCommand(VersionedPrepareCommand.class).send().expectAllResponses();
        // The joiner's after-hook counts the latch down for a remote PrepareCommand originating
        // from address(0), so a count of 0 means the joiner has already processed that prepare.
        AssertJUnit.assertEquals(0L, addNode.commandLatch.getCount());
        // Let nodes 0 and 2 catch up to READ_ALL_WRITE_ALL, then confirm the remaining phases everywhere.
        nodeControllerIn.topologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_ALL_WRITE_ALL);
        addNode.controller.topologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_ALL_WRITE_ALL);
        BlockingLocalTopologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_NEW_WRITE_ALL, nodeControllerIn.topologyManager, nodeControllerIn2.topologyManager, addNode.controller.topologyManager);
        BlockingLocalTopologyManager.confirmTopologyUpdate(CacheTopology.Phase.NO_REBALANCE, nodeControllerIn.topologyManager, nodeControllerIn2.topologyManager, addNode.controller.topologyManager);
        // Wait until node 0 reports the topology id four steps past the starting one.
        awaitForTopology(currentTopologyId + 4, mo375cache(0));
        // Only now deliver the prepare responses that were collected under the older topology.
        expectAllResponses.receive();
        // NOTE(review): a second PrepareCommand is expected next — presumably the prepare is
        // retried after the topology change; confirm.
        replaceRpcManager.expectCommand(PrepareCommand.class).send().receiveAll();
        replaceRpcManager.expectCommand(CommitCommand.class).send().receiveAll();
        replaceRpcManager.expectCommand(TxCompletionNotificationCommand.class).send();
        // The put must report no previous value.
        AssertJUnit.assertNull("Wrong put() return value.", executeTransaction.get());
        // Release all topology managers and wait up to 30 seconds for the joiner's start to finish.
        nodeControllerIn.topologyManager.stopBlocking();
        nodeControllerIn2.topologyManager.stopBlocking();
        addNode.controller.topologyManager.stopBlocking();
        addNode.joinerFuture.get(30L, TimeUnit.SECONDS);
        // All three nodes must report the same final topology id.
        awaitForTopology(currentTopologyId + 4, mo375cache(0));
        awaitForTopology(currentTopologyId + 4, mo375cache(1));
        awaitForTopology(currentTopologyId + 4, mo375cache(2));
        // Nodes 1 and 2 must hold the entry together with a version.
        assertKeyVersionInDataContainer("key1", mo375cache(1), mo375cache(2));
        // Final plain put, issued after RPC blocking on node 0 has been switched off.
        replaceRpcManager.stopBlocking();
        mo375cache(0).put("key1", "v2");
    }

    /**
     * Creates the initial two-node cluster using this test's serialization context and cache configuration.
     *
     * @throws Throwable if cluster creation fails
     */
    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        final int initialNodes = 2;
        WriteSkewDuringStateTransferSCI contextInitializer = WriteSkewDuringStateTransferSCI.INSTANCE;
        ConfigurationBuilder cacheConfig = configuration();
        createClusteredCaches(initialNodes, contextInitializer, cacheConfig);
    }

    /**
     * Asserts that every given cache holds an entry for the key in its data container and that
     * the entry carries a version.
     *
     * @param obj      key to look up
     * @param cacheArr caches whose data containers are inspected
     */
    private void assertKeyVersionInDataContainer(Object obj, Cache<?, ?>... cacheArr) {
        for (int idx = 0; idx < cacheArr.length; idx++) {
            Cache<?, ?> current = cacheArr[idx];
            DataContainer<?, ?> container = (DataContainer<?, ?>) TestingUtil.extractComponent(current, InternalDataContainer.class);
            // peek is used to read the stored entry straight from the container.
            InternalCacheEntry<?, ?> entry = container.peek(obj);
            String missingEntryMessage = "Entry cannot be null in " + address(current) + ".";
            AssertJUnit.assertNotNull(missingEntryMessage, entry);
            AssertJUnit.assertNotNull("Version cannot be null.", WriteSkewHelper.versionFromEntry(entry));
        }
    }

    /**
     * Waits, via {@code eventually}, until the cache reports the given topology id.
     *
     * @param i     expected topology id
     * @param cache cache to poll
     */
    private void awaitForTopology(int i, Cache<?, ?> cache) {
        eventually(() -> i == currentTopologyId(cache));
    }

    /**
     * Reads the topology id from the cache's distribution manager.
     *
     * @param cache cache to query
     * @return id of the cache topology currently returned by the distribution manager
     */
    private int currentTopologyId(Cache<?, ?> cache) {
        CacheTopology topology = cache.getAdvancedCache().getDistributionManager().getCacheTopology();
        return topology.getTopologyId();
    }

    /**
     * Forks a task that puts {@code "value"} under the given key inside a transaction.
     *
     * @param cache cache to write to; its transaction manager is used by the task
     * @param obj   key to write
     * @return future holding the value returned by {@code put}
     */
    private Future<Object> executeTransaction(Cache<Object, Object> cache, Object obj) {
        return fork(() -> TestingUtil.withTx(
                cache.getAdvancedCache().getTransactionManager(),
                () -> cache.put(obj, "value")));
    }

    /**
     * Creates a new cache manager with a {@link ControlledCommandInterceptor} registered first
     * for the default cache and a blocking topology manager, then forks its start.
     *
     * <p>The registered action applies to remote {@link PrepareCommand}s only. Before a prepare
     * originating from node 1 it waits for topology {@code i + 2}; after a prepare originating
     * from node 0 it counts down {@link NewNode#commandLatch}.
     *
     * @param i topology id the waits are measured from
     * @return handles for the joining node; {@code joinerFuture} completes when the manager has started
     */
    private NewNode addNode(final int i) {
        final NewNode newNode = new NewNode();
        ConfigurationBuilder configuration = configuration();
        newNode.controller = new NodeController();
        newNode.controller.interceptor = new ControlledCommandInterceptor();
        GlobalConfigurationBuilder defaultClusteredBuilder = GlobalConfigurationBuilder.defaultClusteredBuilder();
        defaultClusteredBuilder.serialization().addContextInitializer(WriteSkewDuringStateTransferSCI.INSTANCE);
        // Register the interceptor only for the default cache. The decompiled lambda referenced
        // an undefined variable here; the predicate is an equality check against the cache name.
        final String defaultCacheName = TestCacheManagerFactory.DEFAULT_CACHE_NAME;
        TestCacheManagerFactory.addInterceptor(defaultClusteredBuilder, (Predicate<String>) defaultCacheName::equals, (AsyncInterceptor) newNode.controller.interceptor, TestCacheManagerFactory.InterceptorPosition.FIRST, (Class<? extends AsyncInterceptor>) null);
        // The manager is created unstarted (first argument false); it is started in the forked task below.
        EmbeddedCacheManager createClusteredCacheManager = TestCacheManagerFactory.createClusteredCacheManager(false, defaultClusteredBuilder, configuration, new TransportFlags());
        registerCacheManager(createClusteredCacheManager);
        newNode.controller.topologyManager = replaceTopologyManager(createClusteredCacheManager);
        newNode.controller.interceptor.addAction(new Action() { // from class: org.infinispan.statetransfer.WriteSkewDuringStateTransferTest.1
            @Override // org.infinispan.statetransfer.WriteSkewDuringStateTransferTest.Action
            public boolean isApplicable(InvocationContext invocationContext, VisitableCommand visitableCommand) {
                return !invocationContext.isOriginLocal() && (visitableCommand instanceof PrepareCommand);
            }

            @Override // org.infinispan.statetransfer.WriteSkewDuringStateTransferTest.Action
            public void before(InvocationContext invocationContext, VisitableCommand visitableCommand, Cache<?, ?> cache) {
                WriteSkewDuringStateTransferTest.log.tracef("Before: command=%s. origin=%s", visitableCommand, invocationContext.getOrigin());
                // Only prepares sent by node 1 are delayed until topology i + 2 is installed.
                if (invocationContext.getOrigin().equals(WriteSkewDuringStateTransferTest.this.address((Cache<?, ?>) WriteSkewDuringStateTransferTest.this.mo375cache(1)))) {
                    try {
                        ComponentRegistry.of(cache).getStateTransferLock().waitForTopology(i + 2, 10L, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and let the command proceed.
                        Thread.currentThread().interrupt();
                    } catch (TimeoutException e2) {
                        throw Log.CLUSTER.failedWaitingForTopology(i + 2);
                    }
                }
            }

            @Override // org.infinispan.statetransfer.WriteSkewDuringStateTransferTest.Action
            public void after(InvocationContext invocationContext, VisitableCommand visitableCommand, Cache<?, ?> cache) {
                WriteSkewDuringStateTransferTest.log.tracef("After: command=%s. origin=%s", visitableCommand, invocationContext.getOrigin());
                // Signal that the prepare sent by node 0 has been processed on the joiner.
                if (invocationContext.getOrigin().equals(WriteSkewDuringStateTransferTest.this.address(0))) {
                    newNode.commandLatch.countDown();
                }
            }
        });
        newNode.joinerFuture = fork(() -> {
            createClusteredCacheManager.start();
            return null;
        });
        return newNode;
    }

    /**
     * Builds the cache configuration shared by all nodes: REPL_SYNC with in-memory state
     * transfer, one segment, three owners, the controlled consistent hash factory and
     * REPEATABLE_READ isolation.
     *
     * @return a new configuration builder
     */
    private ConfigurationBuilder configuration() {
        ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC, true);
        builder.clustering().stateTransfer().fetchInMemoryState(true);
        builder.clustering().hash()
                .numSegments(1)
                .numOwners(3)
                .consistentHashFactory(new ConsistentHashFactoryImpl());
        builder.locking().isolationLevel(IsolationLevel.REPEATABLE_READ);
        return builder;
    }

    /**
     * Asserts that {@code DistributionTestHelper.hasOwners} holds for the key and the given caches.
     *
     * @param obj      key to check
     * @param cache    first cache passed to {@code hasOwners}
     * @param cacheArr remaining caches passed to {@code hasOwners}
     */
    private void assertKeyOwnership(Object obj, Cache<?, ?> cache, Cache<?, ?>... cacheArr) {
        String failureMessage = "Wrong ownership for " + obj + ".";
        boolean ownedAsExpected = DistributionTestHelper.hasOwners(obj, cache, cacheArr);
        AssertJUnit.assertTrue(failureMessage, ownedAsExpected);
    }

    /**
     * Replaces the default cache's topology manager with a blocking one and records it in
     * {@code topologyManagerList}.
     *
     * @param embeddedCacheManager manager whose default-cache topology manager is replaced
     * @return the installed blocking topology manager
     */
    private BlockingLocalTopologyManager replaceTopologyManager(EmbeddedCacheManager embeddedCacheManager) {
        BlockingLocalTopologyManager blockingManager =
                BlockingLocalTopologyManager.replaceTopologyManagerDefaultCache(embeddedCacheManager);
        topologyManagerList.add(blockingManager);
        return blockingManager;
    }

    /**
     * Installs a {@link ControlledCommandInterceptor} and a blocking topology manager on an
     * already running node.
     *
     * @param cache cache of the node to take control of
     * @return controller holding the installed interceptor and topology manager
     */
    private NodeController setNodeControllerIn(Cache<Object, Object> cache) {
        NodeController nodeController = new NodeController();
        nodeController.interceptor = new ControlledCommandInterceptor(cache);
        // Go through replaceTopologyManager so the manager is tracked in topologyManagerList and
        // unblockAll() can release it even if the test aborts before its own stopBlocking() calls.
        nodeController.topologyManager = replaceTopologyManager(cache.getCacheManager());
        return nodeController;
    }

    /**
     * Registers an action on node A that delays locally originated {@link PrepareCommand}s
     * until topology {@code i + 1} is installed, waiting at most 10 seconds.
     *
     * @param nodeController controller whose interceptor receives the action
     * @param i              topology id the wait is measured from
     */
    private static void setInitialPhaseForNodeA(NodeController nodeController, final int i) {
        final int awaitedTopologyId = i + 1;
        Action delayLocalPrepare = new Action() {
            @Override // org.infinispan.statetransfer.WriteSkewDuringStateTransferTest.Action
            public void after(InvocationContext ctx, VisitableCommand command, Cache<?, ?> cache) {
                // Nothing to do once the command has completed.
            }

            @Override // org.infinispan.statetransfer.WriteSkewDuringStateTransferTest.Action
            public void before(InvocationContext ctx, VisitableCommand command, Cache<?, ?> cache) {
                try {
                    ComponentRegistry.of(cache).getStateTransferLock().waitForTopology(awaitedTopologyId, 10L, TimeUnit.SECONDS);
                } catch (InterruptedException interrupted) {
                    // Restore the interrupt flag and let the command continue.
                    Thread.currentThread().interrupt();
                } catch (TimeoutException timedOut) {
                    throw Log.CLUSTER.failedWaitingForTopology(awaitedTopologyId);
                }
            }

            @Override // org.infinispan.statetransfer.WriteSkewDuringStateTransferTest.Action
            public boolean isApplicable(InvocationContext ctx, VisitableCommand command) {
                if (!ctx.isOriginLocal()) {
                    return false;
                }
                return command instanceof PrepareCommand;
            }
        };
        nodeController.interceptor.addAction(delayLocalPrepare);
    }

    /**
     * Registers an action on node B that delays remotely originated {@link PrepareCommand}s
     * until topology {@code i + 2} is installed, waiting at most 10 seconds.
     *
     * @param nodeController controller whose interceptor receives the action
     * @param i              topology id the wait is measured from
     */
    private static void setInitialPhaseForNodeB(NodeController nodeController, final int i) {
        final int awaitedTopologyId = i + 2;
        Action delayRemotePrepare = new Action() {
            @Override // org.infinispan.statetransfer.WriteSkewDuringStateTransferTest.Action
            public void after(InvocationContext ctx, VisitableCommand command, Cache<?, ?> cache) {
                // Nothing to do once the command has completed.
            }

            @Override // org.infinispan.statetransfer.WriteSkewDuringStateTransferTest.Action
            public void before(InvocationContext ctx, VisitableCommand command, Cache<?, ?> cache) {
                try {
                    ComponentRegistry.of(cache).getStateTransferLock().waitForTopology(awaitedTopologyId, 10L, TimeUnit.SECONDS);
                } catch (InterruptedException interrupted) {
                    // Restore the interrupt flag and let the command continue.
                    Thread.currentThread().interrupt();
                } catch (TimeoutException timedOut) {
                    throw Log.CLUSTER.failedWaitingForTopology(awaitedTopologyId);
                }
            }

            @Override // org.infinispan.statetransfer.WriteSkewDuringStateTransferTest.Action
            public boolean isApplicable(InvocationContext ctx, VisitableCommand command) {
                if (ctx.isOriginLocal()) {
                    return false;
                }
                return command instanceof PrepareCommand;
            }
        };
        nodeController.interceptor.addAction(delayRemotePrepare);
    }
}
