package org.infinispan.distribution.rehash;

import jakarta.transaction.TransactionManager;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commands.statetransfer.StateTransferGetTransactionsCommand;
import org.infinispan.commands.statetransfer.StateTransferStartCommand;
import org.infinispan.commands.triangle.BackupWriteCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.BackupAckCommand;
import org.infinispan.distribution.BaseDistFunctionalTest;
import org.infinispan.distribution.BlockingInterceptor;
import org.infinispan.distribution.MagicKey;
import org.infinispan.globalstate.NoOpGlobalConfigurationManager;
import org.infinispan.interceptors.impl.EntryWrappingInterceptor;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateConsumer;
import org.infinispan.statetransfer.StateTransferInterceptor;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.Mocks;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.op.TestWriteOperation;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.util.ControlledRpcManager;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional"})
/* loaded from: input_file:org/infinispan/distribution/rehash/BaseTxStateTransferOverwriteTest.class */
public abstract class BaseTxStateTransferOverwriteTest extends BaseDistFunctionalTest<Object, Object> {
    public BaseTxStateTransferOverwriteTest() {
        this.INIT_CLUSTER_SIZE = 3;
        this.numOwners = 2;
        this.transactional = true;
        this.performRehashing = true;
        this.cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    protected boolean l1Enabled() {
        return cache(0, this.cacheName).getCacheConfiguration().clustering().l1().enabled();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.infinispan.test.MultipleCacheManagersTest
    public void amendCacheManagerBeforeStart(EmbeddedCacheManager embeddedCacheManager) {
        NoOpGlobalConfigurationManager.amendCacheManager(embeddedCacheManager);
    }

    protected Class<? extends VisitableCommand> getVisitableCommand(TestWriteOperation testWriteOperation) {
        return PrepareCommand.class;
    }

    protected Callable<?> runWithTx(TransactionManager transactionManager, Callable<?> callable) {
        return () -> {
            return TestingUtil.withTx(transactionManager, callable);
        };
    }

    @Test
    public void testStateTransferInBetweenPrepareCommitWithPut() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.PUT_OVERWRITE, true);
    }

    @Test
    public void testStateTransferInBetweenPrepareCommitMultipleEntryWithPut() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.PUT_OVERWRITE, false);
    }

    @Test
    public void testStateTransferInBetweenPrepareCommitWithPutCreate() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.PUT_CREATE, true);
    }

    @Test
    public void testStateTransferInBetweenPrepareCommitMultipleEntryWithPutCreate() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.PUT_CREATE, false);
    }

    @Test
    public void testStateTransferInBetweenPrepareCommitWithPutIfAbsent() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.PUT_IF_ABSENT, true);
    }

    @Test
    public void testStateTransferInBetweenPrepareCommitMultipleEntryWithPutIfAbsent() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.PUT_IF_ABSENT, false);
    }

    @Test
    public void testStateTransferInBetweenPrepareCommitWithRemoveExact() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REMOVE_EXACT, true);
    }

    @Test
    public void testStateTransferInBetweenPrepareCommitMultipleEntryWithRemoveExact() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REMOVE_EXACT, false);
    }

    @Test
    public void testStateTransferInBetweenPrepareCommitWithRemove() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REMOVE, true);
    }

    @Test
    public void testStateTransferInBetweenPrepareCommitMultipleEntryWithRemove() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REMOVE, false);
    }

    @Test
    public void testStateTransferInBetweenPrepareCommitWithReplace() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REPLACE, true);
    }

    @Test
    public void testStateTransferInBetweenPrepareCommitMultipleEntryWithReplace() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REPLACE, false);
    }

    @Test
    public void testStateTransferInBetweenPrepareCommitWithReplaceExact() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REPLACE_EXACT, true);
    }

    @Test
    public void testStateTransferInBetweenPrepareCommitMultipleEntryWithReplaceExact() throws Exception {
        doStateTransferInBetweenPrepareCommit(TestWriteOperation.REPLACE_EXACT, false);
    }

    @Test
    public void testNonCoordinatorOwnerLeavingDuringPut() throws Exception {
        doTestWhereCommitOccursAfterStateTransferBeginsBeforeCompletion(TestWriteOperation.PUT_CREATE);
    }

    @Test
    public void testNonCoordinatorOwnerLeavingDuringPutIfAbsent() throws Exception {
        doTestWhereCommitOccursAfterStateTransferBeginsBeforeCompletion(TestWriteOperation.PUT_IF_ABSENT);
    }

    @Test
    public void testNonCoordinatorOwnerLeavingDuringPut2() throws Exception {
        doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation.PUT_CREATE);
    }

    @Test
    public void testNonCoordinatorOwnerLeavingDuringPutOverwrite2() throws Exception {
        doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation.PUT_OVERWRITE);
    }

    @Test
    public void testNonCoordinatorOwnerLeavingDuringPutIfAbsent2() throws Exception {
        doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation.PUT_IF_ABSENT);
    }

    @Test
    public void testNonCoordinatorOwnerLeavingDuringReplace2() throws Exception {
        doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation.REPLACE);
    }

    @Test
    public void testNonCoordinatorOwnerLeavingDuringReplaceWithPreviousValue2() throws Exception {
        doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation.REPLACE_EXACT);
    }

    @Test
    public void testNonCoordinatorOwnerLeavingDuringRemove2() throws Exception {
        doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation.REMOVE);
    }

    @Test
    public void testNonCoordinatorOwnerLeavingDuringRemoveWithPreviousValue2() throws Exception {
        doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation.REMOVE_EXACT);
    }

    protected void doStateTransferInBetweenPrepareCommit(TestWriteOperation testWriteOperation, boolean z) throws Exception {
        Cache<?, ?> advancedCache = advancedCache(0, this.cacheName);
        AdvancedCache advancedCache2 = advancedCache(1, this.cacheName);
        Cache<?, ?> advancedCache3 = advancedCache(2, this.cacheName);
        MagicKey magicKey = new MagicKey(String.valueOf(testWriteOperation) + "-key", cache(0, this.cacheName), cache(1, this.cacheName));
        Object previousValue = testWriteOperation.getPreviousValue();
        if (previousValue != null) {
            advancedCache.put(magicKey, previousValue);
            AssertJUnit.assertEquals(previousValue, advancedCache.get(magicKey));
            log.tracef("Previous value inserted: %s = %s", magicKey, previousValue);
            AssertJUnit.assertEquals(previousValue, advancedCache3.get(magicKey));
            if (l1Enabled()) {
                assertIsInL1(advancedCache3, magicKey);
            }
        }
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        try {
            Future fork = fork(runWithTx(advancedCache.getTransactionManager(), () -> {
                if (z) {
                    MagicKey magicKey2 = new MagicKey("placeholder", (Cache<?, ?>) advancedCache3);
                    advancedCache.put(magicKey2, "somevalue");
                    log.tracef("Adding additional value on nonOwner value inserted: %s = %s", magicKey2, "somevalue");
                }
                TestingUtil.extractInterceptorChain(advancedCache).addInterceptorBefore(new BlockingInterceptor(cyclicBarrier, (Class) getVisitableCommand(testWriteOperation), true, false), StateTransferInterceptor.class);
                return testWriteOperation.perform(advancedCache, magicKey);
            }));
            cyclicBarrier.await(10L, TimeUnit.SECONDS);
            removeAllBlockingInterceptorsFromCache(advancedCache);
            CheckPoint checkPoint = new CheckPoint();
            log.trace("Adding proxy to state transfer");
            waitUntilStateBeingTransferred(advancedCache3, checkPoint);
            advancedCache2.getCacheManager().stop();
            checkPoint.awaitStrict("pre_state_apply_invoked_for_" + String.valueOf(advancedCache3), 10L, TimeUnit.SECONDS);
            cyclicBarrier.await(10L, TimeUnit.SECONDS);
            AssertJUnit.assertEquals(testWriteOperation.getReturnValue(), fork.get(10L, TimeUnit.SECONDS));
            checkPoint.trigger("pre_state_apply_release_for_" + String.valueOf(advancedCache3));
            TestingUtil.waitForNoRebalance(advancedCache, advancedCache3);
            switch (testWriteOperation) {
                case REMOVE:
                case REMOVE_EXACT:
                    break;
                default:
                    assertIsInContainerImmortal(advancedCache, magicKey);
                    assertIsInContainerImmortal(advancedCache3, magicKey);
                    break;
            }
            AssertJUnit.assertEquals(testWriteOperation.getValue(), advancedCache.get(magicKey));
            AssertJUnit.assertEquals(testWriteOperation.getValue(), advancedCache3.get(magicKey));
            removeAllBlockingInterceptorsFromCache(advancedCache);
        } catch (Throwable th) {
            removeAllBlockingInterceptorsFromCache(advancedCache);
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void doTestWhereCommitOccursAfterStateTransferBeginsBeforeCompletion(TestWriteOperation testWriteOperation) throws Exception {
        if (l1Enabled() && testWriteOperation.getPreviousValue() != null) {
            AssertJUnit.fail("This test cannot be ran with L1 when a previous value is set");
        }
        Cache<?, ?> advancedCache = cache(0, this.cacheName).getAdvancedCache();
        Cache advancedCache2 = cache(1, this.cacheName).getAdvancedCache();
        Cache<?, ?> advancedCache3 = cache(2, this.cacheName).getAdvancedCache();
        MagicKey magicKey = new MagicKey(advancedCache, (Cache<?, ?>[]) new Cache[]{advancedCache2});
        Object previousValue = testWriteOperation.getPreviousValue();
        if (previousValue != null) {
            advancedCache.put(magicKey, previousValue);
            AssertJUnit.assertEquals(previousValue, advancedCache.get(magicKey));
            log.tracef("Previous value inserted: %s = %s", magicKey, previousValue);
            AssertJUnit.assertEquals(previousValue, advancedCache3.get(magicKey));
            if (l1Enabled()) {
                assertIsInL1(advancedCache3, magicKey);
            }
        }
        int topologyId = advancedCache.getDistributionManager().getCacheTopology().getTopologyId();
        CheckPoint checkPoint = new CheckPoint();
        ControlledRpcManager replaceRpcManager = ControlledRpcManager.replaceRpcManager(advancedCache);
        ControlledRpcManager replaceRpcManager2 = ControlledRpcManager.replaceRpcManager(advancedCache3);
        replaceRpcManager.excludeCommands(BackupWriteCommand.class, PrepareCommand.class, CommitCommand.class, TxCompletionNotificationCommand.class);
        replaceRpcManager2.excludeCommands(BackupAckCommand.class);
        int i = topologyId + 2;
        blockRebalanceConfirmation(advancedCache.getCacheManager(), checkPoint, i);
        AssertJUnit.assertEquals(advancedCache.getCacheManager().getCoordinator(), advancedCache.getCacheManager().getAddress());
        log.trace("Stopping the cache");
        advancedCache2.getCacheManager().stop();
        eventuallyEquals(2, () -> {
            return Integer.valueOf(advancedCache.getRpcManager().getMembers().size());
        });
        eventuallyEquals(2, () -> {
            return Integer.valueOf(advancedCache3.getRpcManager().getMembers().size());
        });
        AssertJUnit.assertEquals(advancedCache.getCacheManager().getCoordinator(), advancedCache.getCacheManager().getAddress());
        if (this.transactional.booleanValue()) {
            replaceRpcManager.expectCommand(StateTransferGetTransactionsCommand.class).send().receiveAll();
            replaceRpcManager2.expectCommand(StateTransferGetTransactionsCommand.class).send().receiveAll();
        }
        ControlledRpcManager.BlockedRequest expectCommand = replaceRpcManager.expectCommand(StateTransferStartCommand.class);
        replaceRpcManager2.expectCommand(StateTransferStartCommand.class).send().receiveAllAsync();
        ControlledRpcManager.BlockedRequest expectCommand2 = replaceRpcManager.expectCommand(StateResponseCommand.class);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        TestingUtil.extractInterceptorChain(advancedCache3).addInterceptorAfter(new BlockingInterceptor(cyclicBarrier, (Class) testWriteOperation.getCommandClass(), true, false), EntryWrappingInterceptor.class);
        Future fork = fork(() -> {
            return testWriteOperation.perform(advancedCache, magicKey);
        });
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        removeAllBlockingInterceptorsFromCache(advancedCache3);
        expectCommand2.send().receiveAll();
        expectCommand.send().receiveAllAsync();
        replaceRpcManager2.expectCommand(StateResponseCommand.class).send().receiveAll();
        checkPoint.awaitStrict("pre_rebalance_confirmation_" + i + "_from_" + String.valueOf(advancedCache.getCacheManager().getAddress()), 10L, TimeUnit.SECONDS);
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals(testWriteOperation.getReturnValue(), fork.get(10L, TimeUnit.SECONDS));
        log.tracef("%s operation is done", testWriteOperation);
        checkPoint.trigger("resume_rebalance_confirmation_" + i + "_from_" + String.valueOf(advancedCache.getCacheManager().getAddress()));
        checkPoint.trigger("resume_rebalance_confirmation_" + i + "_from_" + String.valueOf(advancedCache3.getCacheManager().getAddress()));
        TestingUtil.waitForNoRebalance(advancedCache, advancedCache3);
        switch (testWriteOperation) {
            case REMOVE:
            case REMOVE_EXACT:
                break;
            default:
                assertIsInContainerImmortal(advancedCache, magicKey);
                assertIsInContainerImmortal(advancedCache3, magicKey);
                break;
        }
        AssertJUnit.assertEquals(testWriteOperation.getValue(), advancedCache.get(magicKey));
        AssertJUnit.assertEquals(testWriteOperation.getValue(), advancedCache3.get(magicKey));
    }

    private void doL1InvalidationOldTopologyComesAfterRebalance(TestWriteOperation testWriteOperation) throws Exception {
        Cache<?, ?> advancedCache = advancedCache(0, this.cacheName);
        AdvancedCache advancedCache2 = advancedCache(1, this.cacheName);
        Cache<?, ?> advancedCache3 = advancedCache(2, this.cacheName);
        MagicKey magicKey = new MagicKey(String.valueOf(testWriteOperation) + "-key", cache(0, this.cacheName), cache(1, this.cacheName));
        Object previousValue = testWriteOperation.getPreviousValue();
        if (previousValue != null) {
            advancedCache.put(magicKey, previousValue);
            AssertJUnit.assertEquals(previousValue, advancedCache.get(magicKey));
            log.tracef("Previous value inserted: %s = %s", magicKey, previousValue);
            AssertJUnit.assertEquals(previousValue, advancedCache3.get(magicKey));
            if (l1Enabled()) {
                assertIsInL1(advancedCache3, magicKey);
            }
        }
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        TestingUtil.extractInterceptorChain(advancedCache).addInterceptorAfter(new BlockingInterceptor(cyclicBarrier, (Class) getVisitableCommand(testWriteOperation), false, false), StateTransferInterceptor.class);
        Future fork = fork(() -> {
            try {
                Object perform = testWriteOperation.perform(advancedCache, magicKey);
                log.tracef("%s operation is done", testWriteOperation);
                return perform;
            } catch (Throwable th) {
                log.tracef("%s operation is done", testWriteOperation);
                throw th;
            }
        });
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        removeAllBlockingInterceptorsFromCache(advancedCache);
        log.tracef("Stopping the cache", new Object[0]);
        advancedCache2.getCacheManager().stop();
        eventually(() -> {
            return advancedCache.getRpcManager().getMembers().size() == 2 && advancedCache3.getRpcManager().getMembers().size() == 2;
        });
        TestingUtil.waitForNoRebalance(advancedCache, advancedCache3);
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals(testWriteOperation.getReturnValue(), fork.get(10L, TimeUnit.SECONDS));
        log.tracef("%s operation is done", testWriteOperation);
        switch (testWriteOperation) {
            case REMOVE:
            case REMOVE_EXACT:
                break;
            default:
                assertIsInContainerImmortal(advancedCache, magicKey);
                assertIsInContainerImmortal(advancedCache3, magicKey);
                break;
        }
        AssertJUnit.assertEquals(testWriteOperation.getValue(), advancedCache.get(magicKey));
        AssertJUnit.assertEquals(testWriteOperation.getValue(), advancedCache3.get(magicKey));
    }

    private void blockRebalanceConfirmation(EmbeddedCacheManager embeddedCacheManager, CheckPoint checkPoint, int i) throws Exception {
        Answer delegatesTo = AdditionalAnswers.delegatesTo((ClusterTopologyManager) TestingUtil.extractGlobalComponent(embeddedCacheManager, ClusterTopologyManager.class));
        ClusterTopologyManager clusterTopologyManager = (ClusterTopologyManager) Mockito.mock(ClusterTopologyManager.class, Mockito.withSettings().defaultAnswer(delegatesTo));
        ((ClusterTopologyManager) Mockito.doAnswer(invocationOnMock -> {
            Object[] arguments = invocationOnMock.getArguments();
            Address address = (Address) arguments[1];
            int intValue = ((Integer) arguments[2]).intValue();
            if (intValue != i) {
                return delegatesTo.answer(invocationOnMock);
            }
            checkPoint.trigger("pre_rebalance_confirmation_" + intValue + "_from_" + String.valueOf(address));
            return checkPoint.future("resume_rebalance_confirmation_" + intValue + "_from_" + String.valueOf(address), 20L, TimeUnit.SECONDS, testExecutor()).thenCompose(r5 -> {
                return (CompletionStage) Mocks.callAnotherAnswer(delegatesTo, invocationOnMock);
            });
        }).when(clusterTopologyManager)).handleRebalancePhaseConfirm(ArgumentMatchers.anyString(), (Address) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (Throwable) ArgumentMatchers.isNull(), ArgumentMatchers.anyInt());
        TestingUtil.replaceComponent((CacheContainer) embeddedCacheManager, (Class<ClusterTopologyManager>) ClusterTopologyManager.class, clusterTopologyManager, true);
    }

    protected void waitUntilStateBeingTransferred(Cache<?, ?> cache, CheckPoint checkPoint) {
        Answer delegatesTo = AdditionalAnswers.delegatesTo((StateConsumer) TestingUtil.extractComponent(cache, StateConsumer.class));
        StateConsumer stateConsumer = (StateConsumer) Mockito.mock(StateConsumer.class, Mockito.withSettings().defaultAnswer(delegatesTo));
        ((StateConsumer) Mockito.doAnswer(invocationOnMock -> {
            checkPoint.trigger("pre_state_apply_invoked_for_" + String.valueOf(cache));
            checkPoint.awaitStrict("pre_state_apply_release_for_" + String.valueOf(cache), 20L, TimeUnit.SECONDS);
            return delegatesTo.answer(invocationOnMock);
        }).when(stateConsumer)).applyState((Address) ArgumentMatchers.any(Address.class), ArgumentMatchers.anyInt(), ArgumentMatchers.anyCollection());
        TestingUtil.replaceComponent(cache, (Class<? extends StateConsumer>) StateConsumer.class, stateConsumer, true);
    }
}
