package org.infinispan.topology;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.infinispan.Cache;
import org.infinispan.commands.topology.RebalanceStatusRequestCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.Mocks;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(testName = "topology.ClusterTopologyViewChangesTest", groups = {"functional"})
/* loaded from: input_file:org/infinispan/topology/ClusterTopologyViewChangesTest.class */
public class ClusterTopologyViewChangesTest extends MultipleCacheManagersTest {
    private final int dataSize = 100;

    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        createCluster(defaultGlobalConfig(), defaultCacheConfig(), 2);
    }

    private GlobalConfigurationBuilder defaultGlobalConfig() {
        GlobalConfigurationBuilder defaultClusteredBuilder = GlobalConfigurationBuilder.defaultClusteredBuilder();
        defaultClusteredBuilder.transport().distributedSyncTimeout(30L, TimeUnit.SECONDS);
        return defaultClusteredBuilder;
    }

    private ConfigurationBuilder defaultCacheConfig() {
        ConfigurationBuilder defaultClusteredCacheConfig = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false);
        defaultClusteredCacheConfig.clustering().stateTransfer().timeout(15L, TimeUnit.SECONDS);
        return defaultClusteredCacheConfig;
    }

    private EmbeddedCacheManager addNewMember() {
        return addClusterEnabledCacheManager(defaultGlobalConfig(), defaultCacheConfig());
    }

    public void nodeLeftDuringCacheJoin() throws Exception {
        executeJoinTest(findNonCoordinatorIndex());
    }

    public void coordinatorLeftDuringCacheJoin() throws Exception {
        executeJoinTest(findCoordinatorIndex());
    }

    public void concurrentJoin() throws Exception {
        executeJoinTest(-1);
    }

    public void nodeLeftDuringCacheJoinWithRebalanceDisabled() throws Exception {
        waitForClusterToForm();
        TestingUtil.extractGlobalComponentRegistry(findCoordinator()).getClusterTopologyManager().setRebalancingEnabled(false).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        executeJoinTest(findNonCoordinatorIndex());
    }

    public void coordinatorLeftDuringCacheJoinWithRebalanceDisabled() throws Exception {
        waitForClusterToForm();
        TestingUtil.extractGlobalComponentRegistry(findCoordinator()).getClusterTopologyManager().setRebalancingEnabled(false).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        executeJoinTest(findCoordinatorIndex());
    }

    private void executeJoinTest(int i) throws Exception {
        populateCache();
        CheckPoint checkPoint = new CheckPoint();
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        Mocks.blockInboundGlobalCommand(findCoordinator(), checkPoint, replicableCommand -> {
            return (replicableCommand instanceof RebalanceStatusRequestCommand) && ((RebalanceStatusRequestCommand) replicableCommand).getCacheName() == null && atomicBoolean.getAndSet(false);
        });
        waitForClusterToForm();
        Future fork = fork(this::addNewMember);
        checkPoint.awaitStrict(Mocks.BEFORE_INVOCATION, 10L, TimeUnit.SECONDS);
        if (i < 0) {
            addNewMember();
        } else {
            TestingUtil.killCacheManagers(this.cacheManagers.remove(i));
        }
        checkPoint.trigger(Mocks.BEFORE_RELEASE);
        checkPoint.trigger(Mocks.AFTER_RELEASE);
        log.info("Waiting for joiner to finish");
        boolean isRebalancingEnabled = TestingUtil.extractGlobalComponentRegistry((EmbeddedCacheManager) fork.get(10L, TimeUnit.SECONDS)).getClusterTopologyManager().isRebalancingEnabled();
        Assertions.assertThat(TestingUtil.extractGlobalComponentRegistry(findCoordinator()).getClusterTopologyManager().isRebalancingEnabled()).isEqualTo(isRebalancingEnabled);
        if (isRebalancingEnabled) {
            assertCacheData();
        }
    }

    private void populateCache() {
        Cache cache = mo363cache(0);
        IntStream.range(0, 100).parallel().forEach(i -> {
            cache.put("key-" + i, "value-" + i);
        });
    }

    private void assertCacheData() {
        for (int i = 0; i < managers().length; i++) {
            Cache cache = mo363cache(i);
            int size = cache.size();
            Assertions.assertThat(size).withFailMessage(String.format("Cache %d has %d/%d entries", Integer.valueOf(i), Integer.valueOf(size), 100), new Object[0]).isEqualTo(100);
            for (int i2 = 0; i2 < 100; i2++) {
                Assertions.assertThat((String) cache.get("key-" + i2)).isEqualTo("value-" + i2);
            }
        }
    }

    private EmbeddedCacheManager findCoordinator() {
        return mo178manager(findCoordinatorIndex());
    }

    private int findCoordinatorIndex() {
        for (int i = 0; i < managers().length; i++) {
            if (mo178manager(i).isCoordinator()) {
                return i;
            }
        }
        throw new IllegalStateException("Coordinator node not found");
    }

    private int findNonCoordinatorIndex() {
        for (int i = 0; i < managers().length; i++) {
            if (!mo178manager(i).isCoordinator()) {
                return i;
            }
        }
        throw new IllegalStateException("There are only coordinators?");
    }
}
