package org.infinispan.statetransfer;

import jakarta.transaction.TransactionManager;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commons.executors.BlockingThreadPoolExecutorFactory;
import org.infinispan.commons.util.IntSet;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.container.DataContainer;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestException;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.impl.TransactionTable;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Runs {@link #NUM_TXS} transactions against one node while a second node's request for
 * in-flight transactions is held open by a {@link CheckPoint}, then verifies that state transfer
 * completes and that every write is present in the data containers of both nodes.
 *
 * <p>Both cache managers are created with a custom remote-command thread pool factory
 * (see {@link #getGlobalConfigurationBuilder()}).
 */
// NOTE(review): the testName prefix "lock." differs from this class's package
// (org.infinispan.statetransfer) - confirm this is intentional before changing it.
@CleanupAfterMethod
@Test(testName = "lock.ManyTxsDuringStateTransferTest", groups = {"functional"})
public class ManyTxsDuringStateTransferTest extends MultipleCacheManagersTest {
    public static final String CACHE_NAME = "testCache";
    private static final int NUM_TXS = 20;

    /** Timeout, in seconds, used for checkpoint waits and for joining the forked transactions. */
    private static final long TIMEOUT_SECONDS = 10L;

    private static final String POST_GET_TRANSACTIONS = "post_get_transactions_";
    private static final String RESUME_GET_TRANSACTIONS = "resume_get_transactions_";

    /**
     * Creates two clustered cache managers sharing one (empty) default cache configuration and
     * waits for the cluster to form.
     */
    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder defaultConfig = new ConfigurationBuilder();
        addClusterEnabledCacheManager(getGlobalConfigurationBuilder(), defaultConfig);
        addClusterEnabledCacheManager(getGlobalConfigurationBuilder(), defaultConfig);
        waitForClusterToForm();
    }

    /**
     * Builds a default clustered global configuration whose remote-command thread pool uses a
     * {@link BlockingThreadPoolExecutorFactory}.
     *
     * @return a fresh global configuration builder for one cache manager
     */
    private GlobalConfigurationBuilder getGlobalConfigurationBuilder() {
        GlobalConfigurationBuilder globalBuilder = GlobalConfigurationBuilder.defaultClusteredBuilder();
        // NOTE(review): the arguments presumably mean (maxThreads=1, coreThreads=1, queueLength=0,
        // keepAlive=5), i.e. a single remote-command thread with no queue - confirm against the
        // BlockingThreadPoolExecutorFactory constructor.
        BlockingThreadPoolExecutorFactory remotePoolFactory = new BlockingThreadPoolExecutorFactory(1, 1, 0, 5L);
        globalBuilder.transport().remoteCommandThreadPool().threadPoolFactory(remotePoolFactory);
        return globalBuilder;
    }

    /**
     * Holds node 1's transaction request open on node 0, commits {@link #NUM_TXS} transactions
     * on node 0 in the meantime, then resumes the request and checks the end state.
     *
     * @throws Throwable if any step of the scenario fails or times out
     */
    public void testManyTxs() throws Throwable {
        // NOTE(review): the 'true' argument presumably requests a transactional configuration - confirm.
        ConfigurationBuilder cacheConfig = TestCacheManagerFactory.getDefaultCacheConfiguration(true);
        cacheConfig.clustering().cacheMode(CacheMode.DIST_SYNC)
                .stateTransfer().awaitInitialTransfer(false)
                .transaction().lockingMode(LockingMode.OPTIMISTIC);
        // NOTE(review): 'mo175manager' looks like a decompiler-renamed accessor; it is kept as-is
        // because its declaration is outside this file.
        mo175manager(0).defineConfiguration(CACHE_NAME, cacheConfig.build());
        mo175manager(1).defineConfiguration(CACHE_NAME, cacheConfig.build());

        CheckPoint checkPoint = new CheckPoint();
        AdvancedCache<String, String> cache0 = advancedCache(0, CACHE_NAME);
        TransactionManager tm0 = cache0.getTransactionManager();

        // Wrap node 0's StateProvider so that the stage returned by getTransactionsForSegments
        // announces itself and then waits for the test to release it.
        StateProvider spiedProvider = Mockito.spy(TestingUtil.extractComponent(cache0, StateProvider.class));
        Mockito.doAnswer(invocation -> {
            Object[] args = invocation.getArguments();
            Address requestor = (Address) args[0];
            int requestTopologyId = (Integer) args[1];
            return ((CompletionStage<?>) invocation.callRealMethod()).thenApply(result -> {
                blockUntilResumed(checkPoint, requestTopologyId, requestor);
                return result;
            });
        }).when(spiedProvider).getTransactionsForSegments(
                ArgumentMatchers.any(Address.class), ArgumentMatchers.anyInt(), ArgumentMatchers.any());
        TestingUtil.replaceComponent(cache0, StateProvider.class, spiedProvider, true);

        // The request the test waits for carries the topology id following the current one.
        int rebalanceTopologyId = cache0.getDistributionManager().getCacheTopology().getTopologyId() + 1;
        AdvancedCache<String, String> cache1 = advancedCache(1, CACHE_NAME);
        checkPoint.awaitStrict(eventName(POST_GET_TRANSACTIONS, rebalanceTopologyId, address(1)),
                TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // Start the transactions while node 1's request is still held open.
        Future<?>[] txFutures = new Future<?>[NUM_TXS];
        for (int i = 0; i < NUM_TXS; i++) {
            int txIndex = i;
            txFutures[i] = fork(() -> {
                tm0.begin();
                cache0.put("testkey" + txIndex, "v" + txIndex);
                tm0.commit();
                return null;
            });
        }

        // Give the forked transactions time to run before inspecting node 1.
        Thread.sleep(1000L);
        StateConsumer stateConsumer1 = TestingUtil.extractComponent(cache1, StateConsumer.class);
        AssertJUnit.assertTrue(stateConsumer1.isStateTransferInProgress());
        AssertJUnit.assertTrue(stateConsumer1.inflightTransactionSegmentCount() > 0);

        // Release the held request and let the rebalance finish.
        checkPoint.trigger(eventName(RESUME_GET_TRANSACTIONS, rebalanceTopologyId, address(1)));
        TestingUtil.waitForNoRebalance(caches(CACHE_NAME));
        AssertJUnit.assertFalse(stateConsumer1.isStateTransferInProgress());
        // AssertJUnit takes (expected, actual).
        AssertJUnit.assertEquals(0L, stateConsumer1.inflightTransactionSegmentCount());

        DataContainer<?, ?> container0 = TestingUtil.extractComponent(cache0, InternalDataContainer.class);
        DataContainer<?, ?> container1 = TestingUtil.extractComponent(cache1, InternalDataContainer.class);
        for (int i = 0; i < NUM_TXS; i++) {
            txFutures[i].get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
            String key = "testkey" + i;
            String expectedValue = "v" + i;
            AssertJUnit.assertEquals(expectedValue, container0.get(key).getValue());
            AssertJUnit.assertEquals(expectedValue, container1.get(key).getValue());
        }

        // No transaction may be left behind: local ones on node 0, remote ones on node 1.
        TransactionTable txTable0 = TestingUtil.extractComponent(cache0, TransactionTable.class);
        TransactionTable txTable1 = TestingUtil.extractComponent(cache1, TransactionTable.class);
        eventuallyEquals(0, txTable0::getLocalTxCount);
        eventuallyEquals(0, txTable1::getRemoteTxCount);
    }

    /**
     * Signals that a transaction request was served and waits until the test resumes it.
     *
     * @param checkPoint the checkpoint shared with the test thread
     * @param topologyId the topology id passed to the intercepted request
     * @param requestor  the address passed to the intercepted request
     * @throws TestException if the wait is interrupted or times out
     */
    private static void blockUntilResumed(CheckPoint checkPoint, int topologyId, Address requestor) {
        try {
            checkPoint.trigger(eventName(POST_GET_TRANSACTIONS, topologyId, requestor));
            checkPoint.awaitStrict(eventName(RESUME_GET_TRANSACTIONS, topologyId, requestor),
                    TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            throw new TestException(e);
        } catch (TimeoutException e) {
            throw new TestException(e);
        }
    }

    /** Builds a checkpoint event name of the form {@code <prefix><topologyId>_from_<address>}. */
    private static String eventName(String prefix, int topologyId, Address address) {
        return prefix + topologyId + "_from_" + address;
    }
}
