package org.infinispan.reactive.publisher.impl;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Stream;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commons.test.ExceptionRunnable;
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.MagicKey;
import org.infinispan.reactive.publisher.PublisherReducers;
import org.infinispan.reactive.publisher.impl.commands.reduction.ReductionPublisherRequestCommand;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.test.Mocks;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.AssertJUnit;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "reactive.publisher.impl.RehashClusterPublisherManagerTest")
/* loaded from: input_file:org/infinispan/reactive/publisher/impl/RehashClusterPublisherManagerTest.class */
public class RehashClusterPublisherManagerTest extends MultipleCacheManagersTest {
    private static final int[][] START_SEGMENT_OWNERS = {new int[]{0, 1}, new int[]{1, 2}, new int[]{2, 3}, new int[]{3, 0}};
    protected ControlledConsistentHashFactory factory;

    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        this.factory = new ControlledConsistentHashFactory.Default(START_SEGMENT_OWNERS);
        configurationBuilder.clustering().cacheMode(CacheMode.DIST_SYNC).hash().consistentHashFactory(this.factory).numSegments(4);
        createClusteredCaches(4, TestDataSCI.INSTANCE, configurationBuilder);
    }

    @BeforeMethod
    protected void beforeMethod() throws Exception {
        this.factory.setOwnerIndexes(START_SEGMENT_OWNERS);
        this.factory.triggerRebalance(mo375cache(0));
        TestingUtil.waitForNoRebalance(caches());
    }

    @DataProvider(name = "GuaranteeParallelEntry")
    public Object[][] collectionAndVersionsProvider() {
        return (Object[][]) Arrays.stream(DeliveryGuarantee.values()).flatMap(deliveryGuarantee -> {
            return Stream.of((Object[]) new Boolean[]{Boolean.TRUE, Boolean.FALSE}).flatMap(bool -> {
                return Stream.of((Object[]) new Boolean[]{Boolean.TRUE, Boolean.FALSE}).map(bool -> {
                    return new Object[]{deliveryGuarantee, bool, bool};
                });
            });
        }).toArray(i -> {
            return new Object[i];
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r1v1, types: [int[], int[][]] */
    private void triggerRebalanceSegment2MovesToNode0() throws Exception {
        this.factory.setOwnerIndexes(new int[]{new int[]{0, 1}, new int[]{1, 2}, new int[]{0, 3}, new int[]{3, 0}});
        this.factory.triggerRebalance(mo375cache(0));
        TestingUtil.waitForNoRebalance(caches());
    }

    Function<Map<MagicKey, Object>, Set<MagicKey>> toKeys(boolean z) {
        return z ? map -> {
            HashSet hashSet = new HashSet();
            Stream filter = map.keySet().stream().filter(magicKey -> {
                return magicKey.getSegment() != 1;
            });
            Objects.requireNonNull(hashSet);
            filter.forEach((v1) -> {
                r1.add(v1);
            });
            hashSet.add(new MagicKey(mo375cache(3)));
            return hashSet;
        } : map2 -> {
            return null;
        };
    }

    @Test(dataProvider = "GuaranteeParallelEntry")
    public void testSegmentMovesToOriginatorWhileRetrievingPublisher(DeliveryGuarantee deliveryGuarantee, boolean z, boolean z2) throws Exception {
        Cache cache = mo375cache(2);
        CheckPoint checkPoint = new CheckPoint();
        Mocks.blockingMock(checkPoint, InternalDataContainer.class, cache, (stubber, internalDataContainer) -> {
            ((InternalDataContainer) stubber.when(internalDataContainer)).publisher(Mockito.eq(2));
        }, new Class[0]);
        Mocks.blockingMock(checkPoint, InternalDataContainer.class, cache, (stubber2, internalDataContainer2) -> {
            ((InternalDataContainer) stubber2.when(internalDataContainer2)).publisher((IntSet) Mockito.eq(IntSets.immutableSet(2)));
        }, new Class[0]);
        runCommand(deliveryGuarantee, z, z2, caches().size(), () -> {
            checkPoint.triggerForever(Mocks.BEFORE_RELEASE);
            Future<Void> fork = fork(this::triggerRebalanceSegment2MovesToNode0);
            checkPoint.awaitStrict(Mocks.AFTER_INVOCATION, 10L, TimeUnit.SECONDS);
            checkPoint.triggerForever(Mocks.AFTER_RELEASE);
            fork.get(10L, TimeUnit.SECONDS);
        });
    }

    @Test(dataProvider = "GuaranteeParallelEntry")
    public void testSegmentMovesToOriginatorJustBeforeSendingRemoteKey(DeliveryGuarantee deliveryGuarantee, boolean z, boolean z2) throws Exception {
        testSegmentMovesToOriginatorJustBeforeSendingRemote(deliveryGuarantee, z, z2, true);
    }

    @Test(dataProvider = "GuaranteeParallelEntry")
    public void testSegmentMovesToOriginatorJustBeforeSendingRemoteNoKey(DeliveryGuarantee deliveryGuarantee, boolean z, boolean z2) throws Exception {
        testSegmentMovesToOriginatorJustBeforeSendingRemote(deliveryGuarantee, z, z2, false);
    }

    private void testSegmentMovesToOriginatorJustBeforeSendingRemote(DeliveryGuarantee deliveryGuarantee, boolean z, boolean z2, boolean z3) throws Exception {
        Cache cache = mo375cache(0);
        Address address = address(2);
        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever(Mocks.AFTER_RELEASE);
        RpcManager rpcManager = (RpcManager) Mocks.blockingMock(checkPoint, RpcManager.class, cache, (stubber, rpcManager2) -> {
            ((RpcManager) stubber.when(rpcManager2)).invokeCommand((Address) ArgumentMatchers.eq(address), (ReplicableCommand) ArgumentMatchers.isA(ReductionPublisherRequestCommand.class), (ResponseCollector) Mockito.any(), (RpcOptions) Mockito.any());
        }, new Class[0]);
        int size = caches().size();
        if (deliveryGuarantee == DeliveryGuarantee.AT_MOST_ONCE) {
            size--;
        }
        if (z3) {
            size--;
        }
        try {
            runCommand(deliveryGuarantee, z, z2, size, () -> {
                checkPoint.awaitStrict(Mocks.BEFORE_INVOCATION, 10L, TimeUnit.SECONDS);
                triggerRebalanceSegment2MovesToNode0();
                checkPoint.triggerForever(Mocks.BEFORE_RELEASE);
            }, toKeys(z3));
            if (rpcManager != null) {
                TestingUtil.replaceComponent((Cache<?, ?>) cache, (Class<? extends RpcManager>) RpcManager.class, rpcManager, true);
            }
        } catch (Throwable th) {
            if (rpcManager != null) {
                TestingUtil.replaceComponent((Cache<?, ?>) cache, (Class<? extends RpcManager>) RpcManager.class, rpcManager, true);
            }
            throw th;
        }
    }

    @Test(dataProvider = "GuaranteeParallelEntry")
    public void testSegmentMovesToOriginatorJustBeforePublisherCompletes(DeliveryGuarantee deliveryGuarantee, boolean z, boolean z2) throws Exception {
        Cache cache = mo375cache(2);
        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever(Mocks.BEFORE_RELEASE);
        LocalPublisherManager localPublisherManager = (LocalPublisherManager) Mockito.spy((LocalPublisherManager) TestingUtil.extractComponent(cache, LocalPublisherManager.class));
        ((LocalPublisherManager) Mockito.doAnswer(invocationOnMock -> {
            return Mocks.blockingPublisherAware((SegmentAwarePublisherSupplier) invocationOnMock.callRealMethod(), checkPoint);
        }).when(localPublisherManager)).entryPublisher((IntSet) ArgumentMatchers.eq(IntSets.immutableSet(2)), (Set) Mockito.any(), (Set) Mockito.any(), ArgumentMatchers.eq(EnumUtil.bitSetOf(Flag.STATE_TRANSFER_PROGRESS)), (DeliveryGuarantee) Mockito.any(), (Function) Mockito.any());
        TestingUtil.replaceComponent((Cache<?, ?>) cache, (Class<? extends LocalPublisherManager>) LocalPublisherManager.class, localPublisherManager, true);
        runCommand(deliveryGuarantee, z, z2, caches().size(), () -> {
            Future<Void> fork = fork(this::triggerRebalanceSegment2MovesToNode0);
            checkPoint.awaitStrict(Mocks.AFTER_INVOCATION, 10L, TimeUnit.SECONDS);
            checkPoint.triggerForever(Mocks.AFTER_RELEASE);
            fork.get(10L, TimeUnit.SECONDS);
        });
    }

    private void runCommand(DeliveryGuarantee deliveryGuarantee, boolean z, boolean z2, int i, ExceptionRunnable exceptionRunnable) throws Exception {
        runCommand(deliveryGuarantee, z, z2, i, exceptionRunnable, map -> {
            return null;
        });
    }

    private void runCommand(DeliveryGuarantee deliveryGuarantee, boolean z, boolean z2, int i, ExceptionRunnable exceptionRunnable, Function<Map<MagicKey, Object>, Set<MagicKey>> function) throws Exception {
        HashMap hashMap = new HashMap();
        for (Cache cache : caches()) {
            MagicKey magicKey = new MagicKey(cache);
            String magicKey2 = magicKey.toString();
            cache.put(magicKey, magicKey2);
            hashMap.put(magicKey, magicKey2);
        }
        Set<MagicKey> apply = function.apply(hashMap);
        Future fork = fork(() -> {
            ClusterPublisherManager clusterPublisherManager = (ClusterPublisherManager) TestingUtil.extractComponent(mo375cache(0), ClusterPublisherManager.class);
            return z2 ? clusterPublisherManager.entryReduction(z, (IntSet) null, apply, (InvocationContext) null, 0L, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add()) : clusterPublisherManager.keyReduction(z, (IntSet) null, apply, (InvocationContext) null, 0L, deliveryGuarantee, PublisherReducers.count(), PublisherReducers.add());
        });
        exceptionRunnable.run();
        AssertJUnit.assertEquals(i, ((Long) ((CompletionStage) fork.get(10L, TimeUnit.SECONDS)).toCompletableFuture().get(10L, TimeUnit.SECONDS)).intValue());
    }
}
