package org.infinispan.statetransfer;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.BaseDistFunctionalTest;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.TestException;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

/**
 * Checks the in-flight segment counters exposed by {@link StateTransferManager} while a state transfer is
 * paused at several points with a {@link CheckPoint}.
 *
 * <p>The test wraps the {@link StateProvider} of one cache and the {@link StateConsumer} of another in Mockito
 * mocks that delegate to the real components but trigger/await named checkpoint events around selected calls.
 * The test thread then steps through those events and asserts the counters between steps.
 */
@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "statetransfer.StateTransferSegmentMetricsTest")
public class StateTransferSegmentMetricsTest extends BaseDistFunctionalTest<String, String> {
    /** Number of hash segments configured for the cache; also the counter value expected while segments are in flight. */
    private static final int MAX_NUM_SEGMENTS = 6;

    /** Maximum time, in seconds, to wait for any single checkpoint event. */
    private static final long CHECKPOINT_TIMEOUT_SECONDS = 10L;

    /** Owner indexes {0, 1} for every segment; the initial layout and the one restored after each method. */
    private final int[][] owners1And2 = ownersForEverySegment(0, 1);

    /** Owner indexes {0, 2} for every segment; installed by the test right before the extra member is added. */
    private final int[][] owners1And3 = ownersForEverySegment(0, 2);

    private final ControlledConsistentHashFactory factory = new ControlledConsistentHashFactory.Default(this.owners1And2);

    /** Configures the inherited test fixture: transactional, three initial members, two owners, rehashing enabled. */
    public StateTransferSegmentMetricsTest() {
        this.transactional = true;
        this.INIT_CLUSTER_SIZE = 3;
        this.numOwners = 2;
        this.performRehashing = true;
        this.cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    /** Restores the initial owner layout on the shared factory, since the test method replaces it. */
    @AfterMethod
    public void resetFactory() {
        this.factory.setOwnerIndexes(this.owners1And2);
    }

    /**
     * Extends the parent configuration with the controlled consistent hash factory, {@link #MAX_NUM_SEGMENTS}
     * segments, and optimistic transactions.
     *
     * @return the builder returned by the parent, after the settings above were applied
     */
    @Override // org.infinispan.distribution.BaseDistFunctionalTest
    public ConfigurationBuilder buildConfiguration() {
        ConfigurationBuilder builder = super.buildConfiguration();
        builder.clustering().hash()
                .consistentHashFactory(this.factory)
                .numSegments(MAX_NUM_SEGMENTS)
                .transaction()
                .transactionMode(TransactionMode.TRANSACTIONAL)
                .lockingMode(LockingMode.OPTIMISTIC);
        return builder;
    }

    /**
     * Steps a state transfer through its checkpoints and asserts the in-flight counters reported by the
     * {@link StateTransferManager} of {@code c3} at each step.
     *
     * @throws Exception if a checkpoint event is not observed within {@link #CHECKPOINT_TIMEOUT_SECONDS} seconds
     *                   or the test thread is interrupted
     */
    @Test
    public void testSegmentCounterDuringStateTransfer() throws Exception {
        StateTransferManager manager = TestingUtil.extractComponent(this.c3, StateTransferManager.class);
        CheckPoint checkPoint = new CheckPoint();
        waitTransactionRequest(this.c1, checkPoint);
        waitRequestingSegments(this.c3, checkPoint);
        waitApplyingSegmentBatch(this.c3, checkPoint);

        // Switch the owner layout, then add another member in the background to start a topology change.
        this.factory.setOwnerIndexes(this.owners1And3);
        addClusterEnabledCacheManager().defineConfiguration(this.cacheName, this.configuration.build());
        Future<Void> joiner = fork(() -> {
            waitForClusterToForm(this.cacheName);
            log.debug("4th has joined");
            return null;
        });

        // Step 1: c3 saw the topology update and c1 received the transactions request, which is still held.
        awaitEvent(checkPoint, "topology_update_notify_invoked_" + this.c3);
        awaitEvent(checkPoint, "transactions_requested_invoked_" + this.c1);
        Assert.assertEquals(manager.getInflightTransactionalSegmentCount(), MAX_NUM_SEGMENTS);
        Assert.assertTrue(manager.isStateTransferInProgress());

        // Step 2: release the transactions request; the stage returned by onTopologyUpdate then completes.
        checkPoint.triggerForever("transactions_requested_released_" + this.c1);
        awaitEvent(checkPoint, "topology_update_notify_executed_" + this.c3);
        Assert.assertEquals(manager.getInflightTransactionalSegmentCount(), 0L);
        Assert.assertEquals(manager.getInflightSegmentTransferCount(), MAX_NUM_SEGMENTS);

        // Step 3: applyState was invoked on c3 but is held before delegating to the real consumer.
        checkPoint.triggerForever("topology_update_notify_released_" + this.c3);
        awaitEvent(checkPoint, "state_installed_invoked_" + this.c3);
        Assert.assertEquals(manager.getInflightSegmentTransferCount(), MAX_NUM_SEGMENTS);

        // Step 4: let applyState run; once its stage completes no segment transfer is in flight.
        checkPoint.triggerForever("state_installed_invoked_release_" + this.c3);
        awaitEvent(checkPoint, "state_applied_" + this.c3);
        Assert.assertEquals(manager.getInflightSegmentTransferCount(), 0L);

        // The result of the background join is not inspected; stop waiting for it.
        joiner.cancel(true);
    }

    /**
     * Replaces the {@link StateProvider} of {@code cache} with a delegating mock whose
     * {@code getTransactionsForSegments} triggers {@code transactions_requested_invoked_<cache>}, runs the real
     * call, and then blocks until {@code transactions_requested_released_<cache>} before returning its result.
     *
     * @param cache      cache whose state provider is wrapped
     * @param checkPoint checkpoint used to signal and to hold the call
     */
    private void waitTransactionRequest(Cache<?, ?> cache, CheckPoint checkPoint) {
        Answer<Object> delegate = AdditionalAnswers.delegatesTo(TestingUtil.extractComponent(cache, StateProvider.class));
        StateProvider provider = Mockito.mock(StateProvider.class, Mockito.withSettings().defaultAnswer(delegate));
        Mockito.doAnswer(invocation -> {
            checkPoint.trigger("transactions_requested_invoked_" + cache);
            Object result = delegate.answer(invocation);
            awaitOrFail(checkPoint, "transactions_requested_released_" + cache);
            return result;
        }).when(provider).getTransactionsForSegments(
                ArgumentMatchers.any(Address.class), ArgumentMatchers.anyInt(), ArgumentMatchers.any(IntSet.class));
        TestingUtil.replaceComponent(cache, StateProvider.class, provider, true);
    }

    /**
     * Replaces the {@link StateConsumer} of {@code cache} with a delegating mock whose {@code onTopologyUpdate}
     * triggers {@code topology_update_notify_invoked_<cache>}, runs the real call, and appends a step to the
     * returned stage that triggers {@code topology_update_notify_executed_<cache>} and then blocks until
     * {@code topology_update_notify_released_<cache>}.
     *
     * <p>A timeout or interrupt while blocked fails the appended step with a {@link TestException} instead of
     * being silently ignored.
     *
     * @param cache      cache whose state consumer is wrapped
     * @param checkPoint checkpoint used to signal and to hold the stage
     */
    private void waitRequestingSegments(Cache<?, ?> cache, CheckPoint checkPoint) {
        Answer<Object> delegate = AdditionalAnswers.delegatesTo(TestingUtil.extractComponent(cache, StateConsumer.class));
        StateConsumer consumer = Mockito.mock(StateConsumer.class, Mockito.withSettings().defaultAnswer(delegate));
        Mockito.doAnswer(invocation -> {
            checkPoint.trigger("topology_update_notify_invoked_" + cache);
            CompletionStage<?> stage = (CompletionStage<?>) delegate.answer(invocation);
            return stage.thenRun(() -> {
                checkPoint.trigger("topology_update_notify_executed_" + cache);
                awaitOrFail(checkPoint, "topology_update_notify_released_" + cache);
            });
        }).when(consumer).onTopologyUpdate(ArgumentMatchers.any(CacheTopology.class), ArgumentMatchers.anyBoolean());
        TestingUtil.replaceComponent(cache, StateConsumer.class, consumer, true);
    }

    /**
     * Replaces the {@link StateConsumer} of {@code cache} with a delegating mock whose {@code applyState}
     * triggers {@code state_installed_invoked_<cache>}, blocks until {@code state_installed_invoked_release_<cache>},
     * runs the real call, and triggers {@code state_applied_<cache>} when the returned stage completes normally.
     *
     * <p>The component extracted here is whatever is currently registered, so when another wrapper was installed
     * first this mock delegates to that wrapper.
     *
     * @param cache      cache whose state consumer is wrapped
     * @param checkPoint checkpoint used to signal and to hold the call
     */
    private void waitApplyingSegmentBatch(Cache<?, ?> cache, CheckPoint checkPoint) {
        Answer<Object> delegate = AdditionalAnswers.delegatesTo(TestingUtil.extractComponent(cache, StateConsumer.class));
        StateConsumer consumer = Mockito.mock(StateConsumer.class, Mockito.withSettings().defaultAnswer(delegate));
        Mockito.doAnswer(invocation -> {
            checkPoint.trigger("state_installed_invoked_" + cache);
            awaitOrFail(checkPoint, "state_installed_invoked_release_" + cache);
            CompletionStage<?> stage = (CompletionStage<?>) delegate.answer(invocation);
            return stage.thenRun(() -> checkPoint.trigger("state_applied_" + cache));
        }).when(consumer).applyState(
                ArgumentMatchers.any(Address.class), ArgumentMatchers.anyInt(),
                ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyCollection());
        TestingUtil.replaceComponent(cache, StateConsumer.class, consumer, true);
    }

    /**
     * Waits on the test thread for {@code event}, letting checked failures propagate to the test method.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws TimeoutException     if the event is not triggered within {@link #CHECKPOINT_TIMEOUT_SECONDS} seconds
     */
    private static void awaitEvent(CheckPoint checkPoint, String event) throws InterruptedException, TimeoutException {
        checkPoint.awaitStrict(event, CHECKPOINT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Waits for {@code event} from inside an intercepted call, converting checked failures into
     * {@link TestException} so they can be thrown from lambdas that cannot declare them.
     *
     * @throws TestException if the wait times out or the thread is interrupted; on interrupt the thread's
     *                       interrupt flag is restored before throwing
     */
    private static void awaitOrFail(CheckPoint checkPoint, String event) {
        try {
            checkPoint.awaitStrict(event, CHECKPOINT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TestException(e);
        } catch (TimeoutException e) {
            throw new TestException(e);
        }
    }

    /**
     * Builds an owner table with one row per segment, each row being an independent copy of {@code owners}.
     *
     * @param owners owner indexes to assign to every segment
     * @return a {@code MAX_NUM_SEGMENTS x owners.length} table
     */
    private static int[][] ownersForEverySegment(int... owners) {
        int[][] table = new int[MAX_NUM_SEGMENTS][];
        for (int segment = 0; segment < MAX_NUM_SEGMENTS; segment++) {
            table[segment] = owners.clone();
        }
        return table;
    }
}
