package org.infinispan.statetransfer;

import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.PrimitiveIterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.reactive.publisher.impl.Notifications;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.LocalModeAddress;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.test.TestException;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "statetransfer.OutboundTransferTaskTest")
/* loaded from: input_file:org/infinispan/statetransfer/OutboundTransferTaskTest.class */
public class OutboundTransferTaskTest {
    /* JADX WARN: Type inference failed for: r0v3, types: [java.util.PrimitiveIterator$OfInt] */
    public void shouldNotifyForAllSegments() throws InterruptedException {
        IntSet from = IntSets.from((PrimitiveIterator.OfInt) IntStream.range(0, 30).iterator());
        RpcManager rpcManager = (RpcManager) Mockito.mock(RpcManager.class);
        CommandsFactory commandsFactory = (CommandsFactory) Mockito.mock(CommandsFactory.class);
        OutboundTransferTask outboundTransferTask = new OutboundTransferTask(LocalModeAddress.INSTANCE, from, 30, 30, 1, collection -> {
        }, rpcManager, commandsFactory, 10000L, "mock-cache", true, false);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Collection.class);
        Mockito.when(commandsFactory.buildStateResponseCommand(ArgumentMatchers.anyInt(), (Collection) forClass.capture(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean())).thenReturn((StateResponseCommand) Mockito.mock(StateResponseCommand.class));
        Mockito.when(rpcManager.invokeCommand((Address) ArgumentMatchers.any(Address.class), (ReplicableCommand) ArgumentMatchers.any(), (ResponseCollector) ArgumentMatchers.any(), (RpcOptions) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            return CompletableFutures.completedNull();
        });
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 30; i++) {
            arrayList.add(Notifications.value(new ImmortalCacheEntry("key", "value"), i));
            arrayList.add(Notifications.segmentComplete(i));
        }
        CountDownLatch countDownLatch = new CountDownLatch(1);
        outboundTransferTask.execute(Flowable.fromIterable(arrayList)).whenComplete((r5, th) -> {
            if (th != null) {
                throw new TestException(th);
            }
            countDownLatch.countDown();
        });
        if (!countDownLatch.await(15L, TimeUnit.SECONDS)) {
            throw new TestException("Did not receive all segment notifications");
        }
        IntSet mutableEmptySet = IntSets.mutableEmptySet(30);
        Assert.assertEquals(forClass.getAllValues().size(), 2);
        for (Collection collection2 : forClass.getAllValues()) {
            Assert.assertEquals(collection2.size(), 15);
            mutableEmptySet.addAll((Collection) collection2.stream().map((v0) -> {
                return v0.getSegmentId();
            }).collect(Collectors.toList()));
        }
        Assert.assertEquals(mutableEmptySet, from);
    }
}
