package org.infinispan.statetransfer;

import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.infinispan.Cache;
import org.infinispan.commands.CommandInvocationId;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.statetransfer.StateTransferCancelCommand;
import org.infinispan.commands.statetransfer.StateTransferGetTransactionsCommand;
import org.infinispan.commands.statetransfer.StateTransferStartCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commons.util.IntSet;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.conflict.impl.InternalConflictManager;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextFactory;
import org.infinispan.context.impl.SingleKeyNonTxInvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.TestAddress;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.impl.DefaultConsistentHash;
import org.infinispan.distribution.ch.impl.DefaultConsistentHashFactory;
import org.infinispan.distribution.ch.impl.HashFunctionPartitioner;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.metadata.Metadata;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.persistence.ActivationDuringEvictTest;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.PersistentUUID;
import org.infinispan.topology.PersistentUUIDManager;
import org.infinispan.topology.PersistentUUIDManagerImpl;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.CommandAckCollector;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.statetransfer.XSiteStateTransferManager;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "statetransfer.StateConsumerTest")
/* Loaded from: input_file:org/infinispan/statetransfer/StateConsumerTest.class. Exercises StateConsumerImpl topology updates and state application using mocked collaborators. */
public class StateConsumerTest extends AbstractInfinispanTest {
    // Logger used to trace the consistent hashes and segment sets the tests work with.
    private static final Log log = LogFactory.getLog(StateConsumerTest.class);
    // Cache name returned by the mocked Cache and passed to the commands built by the mocked CommandsFactory.
    private static final ByteString CACHE_NAME = ByteString.fromString("test-cache");
    // Worker pool injected as the "org.infinispan.executors.non-blocking" component;
    // created before and shut down after every test method.
    private ExecutorService pooledExecutorService;

    /**
     * Creates a fresh worker pool before each test method.
     * <p>
     * The pool keeps no core threads, allows at most 20 threads, hands tasks over through a
     * {@link SynchronousQueue} and uses {@link ThreadPoolExecutor.CallerRunsPolicy} for rejected tasks.
     */
    @BeforeMethod
    public void createExecutorService() {
        SynchronousQueue<Runnable> handoffQueue = new SynchronousQueue<>();
        this.pooledExecutorService = new ThreadPoolExecutor(
                0, 20,
                0L, TimeUnit.MILLISECONDS,
                handoffQueue,
                getTestThreadFactory("Worker"),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Stops the worker pool after each test method and clears the field.
     * Does nothing when no pool was created.
     */
    @AfterMethod
    public void shutdownExecutorService() {
        ExecutorService executor = this.pooledExecutorService;
        if (executor == null) {
            return;
        }
        executor.shutdownNow();
        this.pooledExecutorService = null;
    }

    /**
     * Builds four test addresses (ids 0 to 3) and registers a random persistent UUID for each of them.
     *
     * @param persistentUUIDManager manager that receives one address-to-UUID mapping per created address
     * @return the created addresses, indexed by their id
     */
    private static Address[] createMembers(PersistentUUIDManager persistentUUIDManager) {
        Address[] members = new Address[4];
        for (int index = 0; index < members.length; index++) {
            Address member = new TestAddress(index);
            members[index] = member;
            persistentUUIDManager.addPersistentAddressMapping(member, PersistentUUID.randomUUID());
        }
        return members;
    }

    /**
     * Creates an {@link XSiteStateTransferManager} mock whose {@code onTopologyUpdated} call is
     * explicitly stubbed to do nothing for any topology and any boolean flag.
     *
     * @return the stubbed mock
     */
    private static XSiteStateTransferManager mockXSiteStateTransferManager() {
        XSiteStateTransferManager manager = Mockito.mock(XSiteStateTransferManager.class);
        Mockito.doNothing()
                .when(manager)
                .onTopologyUpdated(ArgumentMatchers.any(CacheTopology.class), ArgumentMatchers.anyBoolean());
        return manager;
    }

    /**
     * Creates a {@link CommandsFactory} mock that builds real command instances for the four
     * factory methods stubbed here: state transfer start, get-transactions, cancel, and put-key-value.
     * <p>
     * Each stub forwards the arguments it was invoked with to the matching command constructor,
     * using {@link #CACHE_NAME} as the cache name for the state transfer commands.
     *
     * @return the stubbed mock
     */
    private static CommandsFactory mockCommandsFactory() {
        CommandsFactory factory = Mockito.mock(CommandsFactory.class);

        Mockito.when(factory.buildStateTransferStartCommand(ArgumentMatchers.anyInt(), ArgumentMatchers.any(IntSet.class)))
                .thenAnswer(invocation -> {
                    int intArg = invocation.getArgument(0);
                    IntSet segments = invocation.getArgument(1);
                    return new StateTransferStartCommand(CACHE_NAME, intArg, segments);
                });

        Mockito.when(factory.buildStateTransferGetTransactionsCommand(ArgumentMatchers.anyInt(), ArgumentMatchers.any(IntSet.class)))
                .thenAnswer(invocation -> {
                    int intArg = invocation.getArgument(0);
                    IntSet segments = invocation.getArgument(1);
                    return new StateTransferGetTransactionsCommand(CACHE_NAME, intArg, segments);
                });

        Mockito.when(factory.buildStateTransferCancelCommand(ArgumentMatchers.anyInt(), ArgumentMatchers.any(IntSet.class)))
                .thenAnswer(invocation -> {
                    int intArg = invocation.getArgument(0);
                    IntSet segments = invocation.getArgument(1);
                    return new StateTransferCancelCommand(CACHE_NAME, intArg, segments);
                });

        Mockito.when(factory.buildPutKeyValueCommand(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.any(Metadata.class), ArgumentMatchers.anyLong()))
                .thenAnswer(invocation -> {
                    Object key = invocation.getArgument(0);
                    Object value = invocation.getArgument(1);
                    Metadata metadata = invocation.getArgument(3);
                    int intArg = invocation.getArgument(2);
                    long longArg = invocation.getArgument(4);
                    // The two boolean constructor arguments are always false, and a dummy invocation id is used.
                    return new PutKeyValueCommand(key, value, false, false, metadata, intArg, longArg, CommandInvocationId.DUMMY_INVOCATION_ID);
                });

        return factory;
    }

    /**
     * Builds the cache configuration injected into the state consumer: invocation batching enabled,
     * {@code DIST_SYNC} cache mode, a 30000 state transfer timeout, the short test lock acquisition
     * timeout, and {@code REPEATABLE_READ} isolation.
     *
     * @return the built configuration
     */
    private static Configuration createConfiguration() {
        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.invocationBatching().enable()
                .clustering().cacheMode(CacheMode.DIST_SYNC)
                .clustering().stateTransfer().timeout(30000L)
                .locking().lockAcquisitionTimeout(TestingUtil.shortTimeoutMillis())
                .locking().isolationLevel(IsolationLevel.REPEATABLE_READ);
        return builder.build();
    }

    /**
     * Creates a {@link Cache} mock that reports {@link #CACHE_NAME} as its name and
     * {@link ComponentStatus#RUNNING} as its status.
     *
     * @return the stubbed mock
     */
    private static Cache<?, ?> mockCache() {
        Cache<?, ?> cacheMock = Mockito.mock(Cache.class);
        Mockito.when(cacheMock.getName())
                .thenReturn(CACHE_NAME.toString());
        Mockito.when(cacheMock.getStatus())
                .thenReturn(ComponentStatus.RUNNING);
        return cacheMock;
    }

    /**
     * Creates an {@link RpcManager} mock that answers the state transfer commands locally.
     * <ul>
     *   <li>Get-transactions commands record the target address and its segments in {@code map},
     *       add the segments to {@code set}, and complete with a successful response wrapping an
     *       empty list.</li>
     *   <li>Start and cancel commands complete with the shared successful empty response.</li>
     *   <li>{@code blocking} joins the stage it is given and returns its result.</li>
     * </ul>
     *
     * @param map     receives, per target address, the segments of the last get-transactions command sent to it
     * @param set     receives every segment seen in any get-transactions command
     * @param address the address the mock reports as its own
     * @return the stubbed mock, backed by a transport mock whose view id is 1
     */
    private static RpcManager mockRpcManager(Map<Address, Set<Integer>> map, Set<Integer> set, Address address) {
        Transport transportMock = Mockito.mock(Transport.class);
        Mockito.when(transportMock.getViewId()).thenReturn(1);

        RpcManager rpc = Mockito.mock(RpcManager.class);
        Answer<?> emptySuccess = invocation -> CompletableFuture.completedFuture(SuccessfulResponse.SUCCESSFUL_EMPTY_RESPONSE);

        Mockito.when(rpc.invokeCommand(ArgumentMatchers.any(Address.class), ArgumentMatchers.any(StateTransferGetTransactionsCommand.class), ArgumentMatchers.any(ResponseCollector.class), ArgumentMatchers.any(RpcOptions.class)))
                .thenAnswer(invocation -> {
                    Address target = invocation.getArgument(0);
                    StateTransferGetTransactionsCommand command = invocation.getArgument(1);
                    IntSet segments = command.getSegments();
                    map.put(target, segments);
                    set.addAll(segments);
                    return CompletableFuture.completedFuture(SuccessfulResponse.create(new ArrayList<>()));
                });
        Mockito.when(rpc.invokeCommand(ArgumentMatchers.any(Address.class), ArgumentMatchers.any(StateTransferStartCommand.class), ArgumentMatchers.any(ResponseCollector.class), ArgumentMatchers.any(RpcOptions.class)))
                .thenAnswer(emptySuccess);
        Mockito.when(rpc.invokeCommand(ArgumentMatchers.any(Address.class), ArgumentMatchers.any(StateTransferCancelCommand.class), ArgumentMatchers.any(ResponseCollector.class), ArgumentMatchers.any(RpcOptions.class)))
                .thenAnswer(emptySuccess);

        Mockito.when(rpc.getSyncRpcOptions())
                .thenReturn(new RpcOptions(DeliverOrder.NONE, 10000L, TimeUnit.MILLISECONDS));
        Mockito.when(rpc.blocking((CompletionStage) ArgumentMatchers.any()))
                .thenAnswer(invocation -> {
                    CompletionStage<?> stage = invocation.getArgument(0);
                    return stage.toCompletableFuture().join();
                });
        Mockito.when(rpc.getAddress()).thenReturn(address);
        Mockito.when(rpc.getTransport()).thenReturn(transportMock);
        return rpc;
    }

    /**
     * Creates a {@link PersistenceManager} mock whose segment add/remove calls complete immediately
     * with {@code false} and whose two-predicate {@code publishKeys} returns an empty publisher.
     *
     * @return the stubbed mock
     */
    private static PersistenceManager mockPersistenceManager() {
        PersistenceManager manager = Mockito.mock(PersistenceManager.class);
        Mockito.when(manager.removeSegments((IntSet) ArgumentMatchers.any()))
                .thenReturn(CompletableFuture.completedFuture(false));
        Mockito.when(manager.addSegments((IntSet) ArgumentMatchers.any()))
                .thenReturn(CompletableFuture.completedFuture(false));
        Mockito.when(manager.publishKeys((Predicate) ArgumentMatchers.any(), (Predicate) ArgumentMatchers.any()))
                .thenReturn(Flowable.empty());
        return manager;
    }

    /**
     * Creates a {@link TransactionTable} mock that reports no local and no remote transactions.
     *
     * @return the stubbed mock
     */
    private static TransactionTable mockTransactionTable() {
        TransactionTable table = Mockito.mock(TransactionTable.class);
        Mockito.when(table.getLocalTransactions())
                .thenReturn(Collections.emptyList());
        Mockito.when(table.getRemoteTransactions())
                .thenReturn(Collections.emptyList());
        return table;
    }

    /**
     * Creates an {@link InvocationContextFactory} mock that hands out a new
     * {@link SingleKeyNonTxInvocationContext}, built with a {@code null} address, on every
     * {@code createSingleKeyNonTxInvocationContext()} call.
     *
     * @return the stubbed mock
     */
    private static InvocationContextFactory mockInvocationContextFactory() {
        InvocationContextFactory factory = Mockito.mock(InvocationContextFactory.class);
        Mockito.when(factory.createSingleKeyNonTxInvocationContext())
                .thenAnswer(invocation -> new SingleKeyNonTxInvocationContext((Address) null));
        return factory;
    }

    /**
     * Delivers a {@code NO_REBALANCE} topology to the consumer.
     * <p>
     * The topology is built from the two ids, {@code consistentHash} as the first hash, a
     * {@code null} second hash, the hash's members, and the persistent UUIDs mapped from those
     * members. The boolean passed to {@code onTopologyUpdate} is {@code false}.
     *
     * @param stateConsumer         consumer receiving the update
     * @param persistentUUIDManager maps the hash members to persistent UUIDs
     * @param i                     first id passed to the {@link CacheTopology} constructor
     * @param i2                    second id passed to the {@link CacheTopology} constructor
     * @param consistentHash        hash used for the topology and its member list
     */
    private static void noRebalance(StateConsumer stateConsumer, PersistentUUIDManager persistentUUIDManager, int i, int i2, ConsistentHash consistentHash) {
        CacheTopology topology = new CacheTopology(
                i, i2,
                consistentHash, (ConsistentHash) null,
                CacheTopology.Phase.NO_REBALANCE,
                consistentHash.getMembers(),
                persistentUUIDManager.mapAddresses(consistentHash.getMembers()));
        stateConsumer.onTopologyUpdate(topology, false);
    }

    /**
     * Delivers a {@code READ_OLD_WRITE_ALL} topology to the consumer.
     * <p>
     * The three hashes are handed to the {@link CacheTopology} constructor in the order given;
     * the member list and the persistent UUIDs are taken from {@code consistentHash3}. The boolean
     * passed to {@code onTopologyUpdate} is {@code true}.
     *
     * @param stateConsumer         consumer receiving the update
     * @param persistentUUIDManager maps the members of {@code consistentHash3} to persistent UUIDs
     * @param i                     first id passed to the {@link CacheTopology} constructor
     * @param i2                    second id passed to the {@link CacheTopology} constructor
     * @param consistentHash        first hash of the topology
     * @param consistentHash2       second hash of the topology
     * @param consistentHash3       third hash of the topology; also supplies the members
     */
    private static void rebalanceStart(StateConsumer stateConsumer, PersistentUUIDManager persistentUUIDManager, int i, int i2, ConsistentHash consistentHash, ConsistentHash consistentHash2, ConsistentHash consistentHash3) {
        CacheTopology topology = new CacheTopology(
                i, i2,
                consistentHash, consistentHash2, consistentHash3,
                CacheTopology.Phase.READ_OLD_WRITE_ALL,
                consistentHash3.getMembers(),
                persistentUUIDManager.mapAddresses(consistentHash3.getMembers()));
        stateConsumer.onTopologyUpdate(topology, true);
    }

    /**
     * Asserts that a rebalance is in progress and that exactly the newly owned segments were requested.
     * <p>
     * The expected segments are those {@code address} owns in {@code consistentHash2} minus those it
     * owns in {@code consistentHash}. They must equal {@code set}, and the consumer's in-flight
     * request count must equal their number.
     *
     * @param stateConsumerImpl consumer under test
     * @param consistentHash    hash providing the already owned segments
     * @param consistentHash2   hash providing the segments owned after the rebalance
     * @param address           owner whose segments are compared
     * @param set               segments that were actually requested
     */
    private static void assertRebalanceStart(StateConsumerImpl stateConsumerImpl, ConsistentHash consistentHash, ConsistentHash consistentHash2, Address address, Set<Integer> set) {
        Set<Integer> alreadyOwned = consistentHash.getSegmentsForOwner(address);
        Set<Integer> addedSegments = consistentHash2.getSegmentsForOwner(address);
        // Reduce the second set in place so that only newly added segments remain.
        addedSegments.removeAll(alreadyOwned);
        log.debugf("Rebalancing. Added segments=%s, old segments=%s", addedSegments, alreadyOwned);

        AssertJUnit.assertTrue(stateConsumerImpl.hasActiveTransfers());
        Assert.assertEquals(set, addedSegments);
        Assert.assertEquals(stateConsumerImpl.inflightRequestCount(), addedSegments.size());
    }

    /**
     * Completes every requested segment and checks the in-flight request count after each step.
     * <p>
     * For each address and each of its requested segments, a single last, empty {@link StateChunk}
     * is applied and awaited for up to 10 seconds; the in-flight count must drop by one each time
     * and end at zero. Finally the method waits until the consumer reports no active transfers.
     *
     * @param stateConsumerImpl consumer under test
     * @param map               requested segments per source address
     * @param i                 topology id passed to {@code applyState}
     * @throws ExecutionException   if applying a chunk fails
     * @throws InterruptedException if the wait for a chunk is interrupted
     * @throws TimeoutException     if a chunk is not applied within 10 seconds
     */
    private static void completeAndCheckRebalance(StateConsumerImpl stateConsumerImpl, Map<Address, Set<Integer>> map, int i) throws ExecutionException, InterruptedException, TimeoutException {
        long remaining = map.values().stream().mapToLong(Set::size).sum();
        Assert.assertEquals(stateConsumerImpl.inflightRequestCount(), remaining);

        for (Map.Entry<Address, Set<Integer>> request : map.entrySet()) {
            for (Integer segment : request.getValue()) {
                List<StateChunk> lastChunk = Collections.singletonList(new StateChunk(segment, Collections.emptyList(), true));
                stateConsumerImpl.applyState(request.getKey(), i, lastChunk)
                        .toCompletableFuture()
                        .get(10L, TimeUnit.SECONDS);
                remaining--;
                Assert.assertEquals(stateConsumerImpl.inflightRequestCount(), remaining);
            }
        }

        Assert.assertEquals(stateConsumerImpl.inflightRequestCount(), 0L);
        eventually(() -> !stateConsumerImpl.hasActiveTransfers());
    }

    /**
     * Applies one last chunk containing {@code collection} for the first requested segment of the
     * first entry in {@code map}, using the fixed topology id 22. The returned stage is not awaited.
     *
     * @param stateConsumer consumer receiving the state
     * @param map           requested segments per source address; must contain at least one
     *                      entry with at least one segment
     * @param collection    cache entries carried by the chunk
     */
    private static void applyState(StateConsumer stateConsumer, Map<Address, Set<Integer>> map, Collection<InternalCacheEntry<?, ?>> collection) {
        Map.Entry<Address, Set<Integer>> firstRequest = map.entrySet().iterator().next();
        Address source = firstRequest.getKey();
        int segment = firstRequest.getValue().iterator().next();
        StateChunk chunk = new StateChunk(segment, collection, true);
        stateConsumer.applyState(source, 22, Collections.singletonList(chunk));
    }

    /**
     * Injects the collaborators into the state consumer via {@code TestingUtil.inject}.
     * <p>
     * The interceptor chain and RPC manager come from the caller. The worker pool is registered
     * under the name {@code org.infinispan.executors.non-blocking}. Everything else is either a
     * stubbed mock from the helper methods of this class, a plain Mockito mock, or a new instance.
     *
     * @param stateConsumer         consumer to wire
     * @param asyncInterceptorChain interceptor chain to inject
     * @param rpcManager            RPC manager to inject
     */
    private void injectComponents(StateConsumer stateConsumer, AsyncInterceptorChain asyncInterceptorChain, RpcManager rpcManager) {
        Object nonBlockingExecutor = TestingUtil.named("org.infinispan.executors.non-blocking", this.pooledExecutorService);
        TestingUtil.inject(stateConsumer,
                mockCache(),
                nonBlockingExecutor,
                asyncInterceptorChain,
                mockInvocationContextFactory(),
                createConfiguration(),
                rpcManager,
                mockCommandsFactory(),
                mockPersistenceManager(),
                Mockito.mock(InternalDataContainer.class),
                mockTransactionTable(),
                Mockito.mock(StateTransferLock.class),
                Mockito.mock(CacheNotifier.class),
                new CommitManager(),
                new CommandAckCollector(),
                new HashFunctionPartitioner(),
                Mockito.mock(InternalConflictManager.class),
                Mockito.mock(DistributionManager.class),
                Mockito.mock(LocalPublisherManager.class),
                Mockito.mock(PerCacheInboundInvocationHandler.class),
                mockXSiteStateTransferManager());
    }

    /**
     * Checks that a rebalance can be cancelled by a {@code NO_REBALANCE} topology and restarted.
     * <p>
     * Steps:
     * <ol>
     *   <li>Build a hash over four members, update it to the first three, and derive the rebalanced
     *       and union hashes.</li>
     *   <li>Install a {@code NO_REBALANCE} topology (id 1) and verify nothing is transferring.</li>
     *   <li>Start a rebalance (id 2) and verify the expected segments were requested.</li>
     *   <li>Deliver the same {@code NO_REBALANCE} topology (id 3) concurrently from a forked thread
     *       and from the test thread, then verify there are no active transfers.</li>
     *   <li>Start the rebalance again (id 4), verify the requests, and complete all segments.</li>
     * </ol>
     * The wait for the forked topology update is bounded to 10 seconds so that a stuck update
     * fails the test instead of hanging it.
     *
     * @throws Exception if any step fails, including a timeout waiting for the forked update
     */
    public void testClusterRecoverDuringStateTransfer() throws Exception {
        PersistentUUIDManager uuidManager = new PersistentUUIDManagerImpl();
        Address[] addresses = createMembers(uuidManager);
        List<Address> fourMembers = Arrays.asList(addresses[0], addresses[1], addresses[2], addresses[3]);
        List<Address> threeMembers = Arrays.asList(addresses[0], addresses[1], addresses[2]);

        DefaultConsistentHashFactory chFactory = new DefaultConsistentHashFactory();
        DefaultConsistentHash initialCh = chFactory.create(2, 40, fourMembers, (Map) null);
        DefaultConsistentHash currentCh = chFactory.updateMembers(initialCh, threeMembers, (Map) null);
        DefaultConsistentHash pendingCh = chFactory.rebalance(currentCh);
        DefaultConsistentHash unionCh = chFactory.union(currentCh, pendingCh);
        log.debug(initialCh);
        log.debug(currentCh);

        // Filled by the mocked RpcManager whenever a get-transactions command is sent.
        Map<Address, Set<Integer>> requestedSegments = new ConcurrentHashMap<>();
        Set<Integer> flatRequestedSegments = new ConcurrentSkipListSet<>();

        StateConsumerImpl consumer = new StateConsumerImpl();
        injectComponents(consumer, Mockito.mock(AsyncInterceptorChain.class), mockRpcManager(requestedSegments, flatRequestedSegments, addresses[0]));
        consumer.start();
        AssertJUnit.assertFalse(consumer.hasActiveTransfers());

        noRebalance(consumer, uuidManager, 1, 1, currentCh);
        AssertJUnit.assertFalse(consumer.hasActiveTransfers());

        rebalanceStart(consumer, uuidManager, 2, 2, currentCh, pendingCh, unionCh);
        assertRebalanceStart(consumer, currentCh, pendingCh, addresses[0], flatRequestedSegments);

        // Deliver the same topology from two threads at once.
        Future<?> concurrentUpdate = fork(() -> {
            noRebalance(consumer, uuidManager, 3, 2, currentCh);
            return null;
        });
        noRebalance(consumer, uuidManager, 3, 2, currentCh);
        // Bounded wait: an unbounded get() would hang the whole run if the forked update blocked.
        concurrentUpdate.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertFalse(consumer.hasActiveTransfers());

        // Forget the requests recorded for the cancelled rebalance before restarting it.
        requestedSegments.clear();
        flatRequestedSegments.clear();
        rebalanceStart(consumer, uuidManager, 4, 4, currentCh, pendingCh, unionCh);
        assertRebalanceStart(consumer, currentCh, pendingCh, addresses[0], flatRequestedSegments);
        completeAndCheckRebalance(consumer, requestedSegments, 4);
        consumer.stop();
    }

    /**
     * Checks that a rebalance can be restarted while an earlier state application is still pending.
     * <p>
     * The interceptor chain mock returns a future that stays incomplete, so the entry applied during
     * the first rebalance (topology 22) remains in progress. A {@code NO_REBALANCE} topology (23)
     * and a second rebalance (24) are then installed; only afterwards is the pending future
     * completed and the second rebalance driven to completion.
     *
     * @throws Exception if any step fails
     */
    public void testJoinDuringStateTransfer() throws Exception {
        PersistentUUIDManager uuidManager = new PersistentUUIDManagerImpl();
        Address[] addresses = createMembers(uuidManager);
        List<Address> threeMembers = Arrays.asList(addresses[0], addresses[1], addresses[2]);
        List<Address> twoMembers = Arrays.asList(addresses[1], addresses[2]);

        DefaultConsistentHashFactory chFactory = new DefaultConsistentHashFactory();
        DefaultConsistentHash initialCh = chFactory.create(2, 40, threeMembers, (Map) null);
        DefaultConsistentHash currentCh = chFactory.updateMembers(initialCh, twoMembers, (Map) null);
        DefaultConsistentHash pendingCh = chFactory.rebalance(currentCh);
        DefaultConsistentHash unionCh = chFactory.union(currentCh, pendingCh);
        log.debug(initialCh);
        log.debug(currentCh);
        log.debug(unionCh);

        // Filled by the mocked RpcManager whenever a get-transactions command is sent.
        Map<Address, Set<Integer>> requestedSegments = new ConcurrentHashMap<>();
        Set<Integer> flatRequestedSegments = new ConcurrentSkipListSet<>();

        // Every command invoked through the chain stays pending until this future is completed.
        CompletableFuture<Object> pendingInvocation = new CompletableFuture<>();
        AsyncInterceptorChain chain = Mockito.mock(AsyncInterceptorChain.class);
        Mockito.when(chain.invokeAsync((InvocationContext) ArgumentMatchers.any(), (VisitableCommand) ArgumentMatchers.any()))
                .thenReturn(pendingInvocation);

        StateConsumerImpl consumer = new StateConsumerImpl();
        injectComponents(consumer, chain, mockRpcManager(requestedSegments, flatRequestedSegments, addresses[1]));
        consumer.start();

        noRebalance(consumer, uuidManager, 21, 7, initialCh);
        AssertJUnit.assertFalse(consumer.hasActiveTransfers());

        rebalanceStart(consumer, uuidManager, 22, 8, currentCh, pendingCh, unionCh);
        assertRebalanceStart(consumer, currentCh, pendingCh, addresses[1], flatRequestedSegments);

        Collection<InternalCacheEntry<?, ?>> entries = Collections.singletonList(new ImmortalCacheEntry(ActivationDuringEvictTest.KEY, ActivationDuringEvictTest.VALUE));
        applyState(consumer, requestedSegments, entries);

        noRebalance(consumer, uuidManager, 23, 9, currentCh);

        // Forget the requests recorded for the first rebalance before starting the second one.
        requestedSegments.clear();
        flatRequestedSegments.clear();
        rebalanceStart(consumer, uuidManager, 24, 10, currentCh, pendingCh, unionCh);
        assertRebalanceStart(consumer, currentCh, pendingCh, addresses[1], flatRequestedSegments);

        pendingInvocation.complete(null);
        completeAndCheckRebalance(consumer, requestedSegments, 24);
        consumer.stop();
    }
}
