package org.infinispan.distribution.rehash;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commands.triangle.BackupWriteCommand;
import org.infinispan.commands.write.BackupAckCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.ClearCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.globalstate.NoOpGlobalConfigurationManager;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.Mocks;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CacheEntryDelegator;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.ClusteringDependentLogicDelegator;
import org.infinispan.test.op.TestWriteOperation;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.ControlledRpcManager;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Runs a write operation on the first node of a non-transactional {@code DIST_SYNC} cache while a
 * second node is joining and receiving state for the same key, and asserts that the value produced
 * by the write operation is the one visible on both nodes once the rebalance has finished.
 *
 * <p>The interleaving is forced with a {@link CheckPoint}: entry commits on the joiner and rebalance
 * confirmations on the first node are paused and resumed in a fixed order by {@link #doTest}.
 */
@Test(groups = {"functional"}, testName = "distribution.rehash.NonTxStateTransferOverwritingValue2Test")
public class NonTxStateTransferOverwritingValue2Test extends MultipleCacheManagersTest {

    /** The single key written by both the state transfer and the tested operation. */
    private static final String KEY = "key";

    /** Each test method adds a joiner, so the cluster is rebuilt after every method. */
    public NonTxStateTransferOverwritingValue2Test() {
        this.cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    /**
     * Starts the cluster with a single node; the second node is added by {@link #doTest}.
     *
     * @throws Throwable if the cache manager cannot be created or the cluster does not form
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        addClusterEnabledCacheManager(getConfigurationBuilder());
        waitForClusterToForm();
    }

    /**
     * Hands every cache manager to {@link NoOpGlobalConfigurationManager#amendCacheManager} before it
     * is started.
     *
     * @param embeddedCacheManager the cache manager that is about to be started
     */
    @Override
    public void amendCacheManagerBeforeStart(EmbeddedCacheManager embeddedCacheManager) {
        NoOpGlobalConfigurationManager.amendCacheManager(embeddedCacheManager);
    }

    /**
     * Builds the cache configuration shared by both nodes.
     *
     * @return a new builder configured for {@code DIST_SYNC} and non-transactional mode
     */
    protected ConfigurationBuilder getConfigurationBuilder() {
        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.clustering().cacheMode(CacheMode.DIST_SYNC);
        builder.transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL);
        return builder;
    }

    public void testBackupOwnerJoiningDuringPutOverwrite() throws Exception {
        doTest(TestWriteOperation.PUT_OVERWRITE);
    }

    public void testBackupOwnerJoiningDuringReplace() throws Exception {
        doTest(TestWriteOperation.REPLACE);
    }

    public void testBackupOwnerJoiningDuringReplaceWithPreviousValue() throws Exception {
        doTest(TestWriteOperation.REPLACE_EXACT);
    }

    public void testBackupOwnerJoiningDuringRemove() throws Exception {
        doTest(TestWriteOperation.REMOVE);
    }

    public void testBackupOwnerJoiningDuringRemoveWithPreviousValue() throws Exception {
        doTest(TestWriteOperation.REMOVE_EXACT);
    }

    /**
     * Drives one scenario: a joiner receives state for {@link #KEY} while {@code testWriteOperation}
     * writes the same key from node 0, with the two commits on the joiner released in a fixed order
     * (state transfer first, then the user operation).
     *
     * @param testWriteOperation the write operation to run against node 0
     * @throws Exception if a checkpoint is not reached in time, the operation fails, or an assertion
     *                   about the final state does not hold
     */
    private void doTest(TestWriteOperation testWriteOperation) throws Exception {
        AdvancedCache<Object, Object> cache0 = advancedCache(0);

        // Seed the key on node 0 when the operation defines a previous value.
        Object previousValue = testWriteOperation.getPreviousValue();
        if (previousValue != null) {
            cache0.put(KEY, previousValue);
            AssertJUnit.assertEquals(previousValue, cache0.get(KEY));
            log.tracef("Previous value inserted: %s = %s", KEY, previousValue);
        }

        int preJoinTopologyId = cache0.getDistributionManager().getCacheTopology().getTopologyId();
        // The confirmation that gets blocked is the one for the topology right after the current one.
        int rebalanceTopologyId = preJoinTopologyId + 1;

        CheckPoint checkPoint = new CheckPoint();

        // Take control of the RPCs sent by node 0, except for the backup write/ack commands.
        ControlledRpcManager controlledRpcManager0 = ControlledRpcManager.replaceRpcManager(cache0);
        controlledRpcManager0.excludeCommands(BackupWriteCommand.class, BackupAckCommand.class);

        // Hold back rebalance confirmations for the upcoming topology until explicitly resumed.
        blockRebalanceConfirmation(manager(0), checkPoint, rebalanceTopologyId);

        log.tracef("Starting the cache on the joiner");
        ConfigurationBuilder joinerConfig = getConfigurationBuilder();
        // The joiner must not wait for state: the test itself controls when state is applied.
        joinerConfig.clustering().stateTransfer().awaitInitialTransfer(false);
        addClusterEnabledCacheManager(joinerConfig);
        AdvancedCache<Object, Object> cache1 = advancedCache(1);

        // Wait until both nodes see a two-member cluster.
        eventually(() -> cache0.getRpcManager().getMembers().size() == 2
                && cache1.getRpcManager().getMembers().size() == 2);

        // From now on every (non-clear) entry commit on the joiner pauses on the checkpoint.
        blockEntryCommit(checkPoint, cache1);

        // Let the state response go out; its reply is only consumed after all commits are done.
        ControlledRpcManager.SentRequest sentStateResponse =
                controlledRpcManager0.expectCommand(StateResponseCommand.class).send();

        // The commit awaited first is reported with a null origin (event names end in "_from_null");
        // it is the one expected after the state response has been sent.
        Address stateTransferOrigin = null;
        Address userOrigin = address(0);

        checkPoint.awaitStrict(commitEntryEvent("pre", KEY, stateTransferOrigin), 5L, TimeUnit.SECONDS);

        // Run the user operation on node 0 in the background.
        Future<Object> operationResult = fork(() -> testWriteOperation.perform(cache0, KEY));

        // While the first commit is still paused, the commit originating from node 0 must not have
        // reached the joiner's commit point.
        AssertJUnit.assertNull("Commit from " + userOrigin + " was not blocked behind the pending commit",
                checkPoint.peek(1L, TimeUnit.SECONDS, commitEntryEvent("pre", KEY, userOrigin)));

        // Release the first commit, then wait for and release the commit coming from node 0.
        checkPoint.trigger(commitEntryEvent("resume", KEY, stateTransferOrigin));
        checkPoint.awaitStrict(commitEntryEvent("pre", KEY, userOrigin), 5L, TimeUnit.SECONDS);
        checkPoint.trigger(commitEntryEvent("resume", KEY, userOrigin));

        // Both commits must complete.
        checkPoint.awaitStrict(commitEntryEvent("post", KEY, stateTransferOrigin), 10L, TimeUnit.SECONDS);
        checkPoint.awaitStrict(commitEntryEvent("post", KEY, userOrigin), 10L, TimeUnit.SECONDS);

        AssertJUnit.assertEquals(testWriteOperation.getReturnValue(), operationResult.get(10L, TimeUnit.SECONDS));
        log.tracef("%s operation is done", testWriteOperation);

        sentStateResponse.receiveAll();

        // Unblock the rebalance confirmations of both nodes and wait for the rebalance to end.
        checkPoint.trigger(rebalanceConfirmationEvent("resume", rebalanceTopologyId, address(0)));
        checkPoint.trigger(rebalanceConfirmationEvent("resume", rebalanceTopologyId, address(1)));
        TestingUtil.waitForNoRebalance(cache0, cache1);

        // The value written by the operation must be the one seen through both nodes.
        AssertJUnit.assertEquals(testWriteOperation.getValue(), cache0.get(KEY));
        AssertJUnit.assertEquals(testWriteOperation.getValue(), cache1.get(KEY));

        controlledRpcManager0.stopBlocking();
    }

    /**
     * Replaces the {@link ClusteringDependentLogic} of {@code advancedCache} with a delegator that
     * pauses every entry commit (except for {@link ClearCacheEntry}) on {@code checkPoint}.
     *
     * <p>For key {@code k} and invocation origin {@code o}, the commit triggers
     * {@code pre_commit_entry_k_from_o}, waits up to 10 seconds for {@code resume_commit_entry_k_from_o},
     * performs the real commit and finally triggers {@code post_commit_entry_k_from_o}.
     *
     * @param checkPoint    the checkpoint used to signal and resume commits
     * @param advancedCache the cache whose commit logic is wrapped
     */
    private void blockEntryCommit(final CheckPoint checkPoint, AdvancedCache<Object, Object> advancedCache) {
        ClusteringDependentLogic realLogic = TestingUtil.extractComponent(advancedCache, ClusteringDependentLogic.class);
        ClusteringDependentLogic blockingLogic = new ClusteringDependentLogicDelegator(realLogic) {
            @Override
            public CompletionStage<Void> commitEntry(CacheEntry cacheEntry, FlagAffectedCommand flagAffectedCommand,
                                                     InvocationContext invocationContext, Flag flag, boolean z) {
                // Clear entries are committed untouched.
                if (cacheEntry instanceof ClearCacheEntry) {
                    return super.commitEntry(cacheEntry, flagAffectedCommand, invocationContext, flag, z);
                }
                final Address origin = invocationContext.getOrigin();
                CacheEntry pausingEntry = new CacheEntryDelegator(cacheEntry) {
                    @Override
                    public void commit(DataContainer dataContainer) {
                        checkPoint.trigger(commitEntryEvent("pre", getKey(), origin));
                        try {
                            checkPoint.awaitStrict(commitEntryEvent("resume", getKey(), origin), 10L, TimeUnit.SECONDS);
                        } catch (InterruptedException e) {
                            // Keep the interrupt visible to the caller's thread before failing the commit.
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        } catch (TimeoutException e) {
                            throw new RuntimeException(e);
                        }
                        super.commit(dataContainer);
                        checkPoint.trigger(commitEntryEvent("post", getKey(), origin));
                    }
                };
                return super.commitEntry(pausingEntry, flagAffectedCommand, invocationContext, flag, z);
            }
        };
        TestingUtil.replaceComponent(advancedCache, ClusteringDependentLogic.class, blockingLogic, true);
    }

    /**
     * Replaces the {@link ClusterTopologyManager} of {@code embeddedCacheManager} with a Mockito mock
     * that forwards every call to the real instance, except that
     * {@code handleRebalancePhaseConfirm} calls for topology {@code i} are delayed.
     *
     * <p>A delayed call triggers {@code pre_rebalance_confirmation_<topology>_from_<address>} and is
     * forwarded to the real instance only once
     * {@code resume_rebalance_confirmation_<topology>_from_<address>} is triggered on
     * {@code checkPoint} (the future is created with a 10 second timeout).
     *
     * @param embeddedCacheManager the cache manager whose topology manager is replaced
     * @param checkPoint           the checkpoint used to signal and resume confirmations
     * @param i                    the topology id whose confirmations are delayed
     * @throws Exception declared for callers; the visible body does not throw checked exceptions itself
     */
    @SuppressWarnings("unchecked") // the forwarded answer is cast to the stubbed method's CompletionStage result
    private void blockRebalanceConfirmation(EmbeddedCacheManager embeddedCacheManager, CheckPoint checkPoint, int i) throws Exception {
        ClusterTopologyManager realTopologyManager =
                TestingUtil.extractGlobalComponent(embeddedCacheManager, ClusterTopologyManager.class);
        Answer<?> forwardToReal = AdditionalAnswers.delegatesTo(realTopologyManager);
        ClusterTopologyManager topologyManagerMock =
                Mockito.mock(ClusterTopologyManager.class, Mockito.withSettings().defaultAnswer(forwardToReal));

        Mockito.doAnswer(invocation -> {
            Object[] arguments = invocation.getArguments();
            Address source = (Address) arguments[1];
            int topologyId = (Integer) arguments[2];
            if (topologyId != i) {
                // Confirmations for other topologies go straight through.
                return forwardToReal.answer(invocation);
            }
            checkPoint.trigger(rebalanceConfirmationEvent("pre", topologyId, source));
            return checkPoint.future(rebalanceConfirmationEvent("resume", topologyId, source), 10L, TimeUnit.SECONDS, testExecutor())
                    .thenCompose(ignored -> (CompletionStage<Object>) Mocks.callAnotherAnswer(forwardToReal, invocation));
        }).when(topologyManagerMock).handleRebalancePhaseConfirm(ArgumentMatchers.anyString(),
                ArgumentMatchers.any(Address.class), ArgumentMatchers.anyInt(), ArgumentMatchers.isNull(),
                ArgumentMatchers.anyInt());

        TestingUtil.replaceComponent(embeddedCacheManager, ClusterTopologyManager.class, topologyManagerMock, true);
    }

    /** Builds the checkpoint event name {@code <phase>_commit_entry_<key>_from_<origin>}. */
    private static String commitEntryEvent(String phase, Object key, Object origin) {
        return phase + "_commit_entry_" + key + "_from_" + origin;
    }

    /** Builds the checkpoint event name {@code <phase>_rebalance_confirmation_<topologyId>_from_<source>}. */
    private static String rebalanceConfirmationEvent(String phase, int topologyId, Object source) {
        return phase + "_rebalance_confirmation_" + topologyId + "_from_" + source;
    }
}
