package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.functions.BiFunction;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.reactive.publisher.impl.SegmentAwarePublisherSupplier;
import org.infinispan.reactive.publisher.impl.commands.reduction.PublisherResult;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.InCacheMode;
import org.infinispan.util.concurrent.BlockingManager;
import org.testng.AssertJUnit;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@InCacheMode({CacheMode.REPL_SYNC, CacheMode.DIST_SYNC})
@Test(groups = {"functional"}, testName = "reactive.publisher.impl.SimpleLocalPublisherManagerTest")
/* loaded from: input_file:org/infinispan/reactive/publisher/impl/SimpleLocalPublisherManagerTest.class */
public class SimpleLocalPublisherManagerTest extends MultipleCacheManagersTest {
    /**
     * Segment count applied to the cache hash configuration in {@code cacheConfiguration()};
     * the tests also use it as the size of the segment range they request from the publisher.
     */
    private static final int SEGMENT_COUNT = 128;

    /**
     * Creates the configuration used for the caches of this test: the default clustered
     * configuration for the current cache mode with the number of hash segments set to
     * {@link #SEGMENT_COUNT}.
     *
     * @return the configured builder
     */
    // NOTE(review): the decompiler reports this method was originally package-private.
    public ConfigurationBuilder cacheConfiguration() {
        ConfigurationBuilder builder = getDefaultClusteredCacheConfig(cacheMode, false);
        builder.clustering()
                .hash()
                .numSegments(SEGMENT_COUNT);
        return builder;
    }

    /**
     * Creates the cluster from {@link #cacheConfiguration()} and then waits for it to form.
     * The value 3 passed to {@code createCluster} is presumably the number of nodes.
     *
     * @throws Throwable if cluster creation fails
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder clusterConfig = cacheConfiguration();
        createCluster(clusterConfig, 3);
        waitForClusterToForm();
    }

    /**
     * Writes the entries {@code i -> "value-" + i} for {@code i} in {@code [0, 100)} into the
     * given cache with a single {@code putAll}.
     *
     * @param cache the cache to populate
     * @return the map of entries that was written
     */
    private Map<Integer, String> insert(Cache<Integer, String> cache) {
        final int entryCount = 100;
        Map<Integer, String> entries = new HashMap<>(entryCount);
        for (int key = 0; key < entryCount; key++) {
            entries.put(key, "value-" + key);
        }
        cache.putAll(entries);
        return entries;
    }

    /**
     * Looks up the {@link LocalPublisherManager} component registered for the given cache.
     *
     * @param cache the cache whose component is extracted
     * @return the cache's local publisher manager
     */
    @SuppressWarnings("unchecked") // the component is looked up by its raw class
    private LocalPublisherManager<Integer, String> lpm(Cache<Integer, String> cache) {
        LocalPublisherManager<Integer, String> manager =
                (LocalPublisherManager<Integer, String>) TestingUtil.extractComponent(cache, LocalPublisherManager.class);
        return manager;
    }

    /**
     * Data provider yielding one row per combination of {@link DeliveryGuarantee} value and
     * boolean flag, as {@code {guarantee, flag}}.
     *
     * @return the parameter rows
     */
    @DataProvider(name = "GuaranteeEntry")
    public Object[][] deliveryGuaranteeAndEntryProvider() {
        return Arrays.stream(DeliveryGuarantee.values())
                .flatMap(guarantee -> Stream.of(Boolean.TRUE, Boolean.FALSE)
                        .map(flag -> new Object[]{guarantee, flag}))
                // The generator must create an Object[][]; an Object[] cannot be cast to Object[][].
                .toArray(Object[][]::new);
    }

    /**
     * Subscribes to a key or entry publisher over all segments with an identity transformer
     * and checks that the number of published items matches what
     * {@code SimpleClusterPublisherManagerTest.findHowManyInSegments} computes for the local
     * read segments, and that every published item corresponds to inserted data.
     *
     * @param deliveryGuarantee the delivery guarantee passed to the publisher
     * @param z {@code true} to use {@code keyPublisher}, {@code false} to use {@code entryPublisher}
     */
    @Test(dataProvider = "GuaranteeEntry")
    public void testNoIntermediateOps(DeliveryGuarantee deliveryGuarantee, boolean z) {
        // NOTE(review): mo363cache looks like a decompiler-renamed cache(int) accessor - confirm
        // against MultipleCacheManagersTest.
        Cache<Integer, String> cache = mo363cache(0);
        Map<Integer, String> inserted = insert(cache);
        LocalPublisherManager<Integer, String> lpm = lpm(cache);
        IntSet allSegments = IntSets.immutableRangeSet(SEGMENT_COUNT);

        SegmentAwarePublisherSupplier<?> publisherSupplier;
        Consumer<Object> verifier;
        if (z) {
            publisherSupplier = lpm.keyPublisher(allSegments, null, null, 0L, deliveryGuarantee, Function.identity());
            verifier = key -> AssertJUnit.assertTrue(inserted.containsKey(key));
        } else {
            publisherSupplier = lpm.entryPublisher(allSegments, null, null, 0L, deliveryGuarantee, Function.identity());
            verifier = item -> {
                Map.Entry<?, ?> entry = (Map.Entry<?, ?>) item;
                AssertJUnit.assertEquals(inserted.get(entry.getKey()), entry.getValue());
            };
        }

        DistributionManager distributionManager = TestingUtil.extractComponent(cache, DistributionManager.class);
        IntSet localSegments = distributionManager.getCacheTopology().getLocalReadSegments();
        KeyPartitioner keyPartitioner = TestingUtil.extractComponent(cache, KeyPartitioner.class);
        int expectedCount = SimpleClusterPublisherManagerTest.findHowManyInSegments(inserted.size(), localSegments, keyPartitioner);

        // A typed seed set lets the accumulator lambda call add() without raw types.
        Set<Object> published = Flowable.fromPublisher(publisherSupplier.publisherWithoutSegments())
                .collectInto(new HashSet<Object>(), (set, item) -> set.add(item))
                .blockingGet();

        AssertJUnit.assertEquals(expectedCount, published.size());
        published.forEach(verifier);
    }

    /**
     * Subscribes to {@code publisherWithLostSegments()} of a key or entry publisher over all
     * segments and verifies the notification stream:
     * <ul>
     *   <li>the total number of notifications equals the expected value count plus
     *       {@link #SEGMENT_COUNT};</li>
     *   <li>consecutive value notifications all report the same segment, and the next
     *       non-value notification reports that segment as completed;</li>
     *   <li>a lost segment is never one of the local read segments;</li>
     *   <li>a completed segment that is not a local read segment is only accepted for
     *       {@code DeliveryGuarantee.AT_MOST_ONCE}.</li>
     * </ul>
     *
     * @param deliveryGuarantee the delivery guarantee passed to the publisher
     * @param z {@code true} to use {@code keyPublisher}, {@code false} to use {@code entryPublisher}
     */
    @Test(dataProvider = "GuaranteeEntry")
    public void testProperOrderingGuarantees(DeliveryGuarantee deliveryGuarantee, boolean z) {
        // NOTE(review): mo363cache looks like a decompiler-renamed cache(int) accessor - confirm
        // against MultipleCacheManagersTest.
        Cache<Integer, String> cache = mo363cache(0);
        Map<Integer, String> inserted = insert(cache);
        LocalPublisherManager<Integer, String> lpm = lpm(cache);
        IntSet allSegments = IntSets.immutableRangeSet(SEGMENT_COUNT);

        SegmentAwarePublisherSupplier<?> publisherSupplier;
        if (z) {
            publisherSupplier = lpm.keyPublisher(allSegments, null, null, 0L, deliveryGuarantee, Function.identity());
        } else {
            publisherSupplier = lpm.entryPublisher(allSegments, null, null, 0L, deliveryGuarantee, Function.identity());
        }

        DistributionManager distributionManager = TestingUtil.extractComponent(cache, DistributionManager.class);
        IntSet localSegments = distributionManager.getCacheTopology().getLocalReadSegments();
        KeyPartitioner keyPartitioner = TestingUtil.extractComponent(cache, KeyPartitioner.class);
        int expectedValues = SimpleClusterPublisherManagerTest.findHowManyInSegments(inserted.size(), localSegments, keyPartitioner);

        List<? extends SegmentAwarePublisherSupplier.NotificationWithLost<?>> notifications =
                Flowable.fromPublisher(publisherSupplier.publisherWithLostSegments())
                        .collect(Collectors.toList())
                        .blockingGet();
        AssertJUnit.assertEquals(expectedValues + SEGMENT_COUNT, notifications.size());

        // Segment of the run of values currently being read; -1 means no run is open.
        int currentSegment = -1;
        for (SegmentAwarePublisherSupplier.NotificationWithLost<?> notification : notifications) {
            if (notification.isValue()) {
                int valueSegment = notification.valueSegment();
                if (currentSegment == -1) {
                    currentSegment = valueSegment;
                } else {
                    AssertJUnit.assertEquals(currentSegment, valueSegment);
                }
                continue;
            }

            if (notification.isSegmentComplete()) {
                if (!localSegments.contains(notification.completedSegment())) {
                    // Expected value first, actual second, so a failure message reads correctly.
                    AssertJUnit.assertEquals("Only at most once can say the segment is complete without having it",
                            DeliveryGuarantee.AT_MOST_ONCE, deliveryGuarantee);
                }
            } else {
                AssertJUnit.assertFalse(localSegments.contains(notification.lostSegment()));
            }

            // A run of values must be closed by the completion of that same segment.
            if (currentSegment != -1) {
                AssertJUnit.assertEquals(currentSegment, notification.completedSegment());
                currentSegment = -1;
            }
        }
    }

    /**
     * Data provider yielding one row per combination of {@link DeliveryGuarantee} value and
     * two independent boolean flags, as {@code {guarantee, firstFlag, secondFlag}}.
     *
     * @return the parameter rows
     */
    @DataProvider(name = "GuaranteeParallelEntry")
    public Object[][] deliveryGuaranteeParallelEntryProvider() {
        return Arrays.stream(DeliveryGuarantee.values())
                .flatMap(guarantee -> Stream.of(Boolean.TRUE, Boolean.FALSE)
                        // Distinct lambda parameter names keep both flags available for the row.
                        .flatMap(firstFlag -> Stream.of(Boolean.TRUE, Boolean.FALSE)
                                .map(secondFlag -> new Object[]{guarantee, firstFlag, secondFlag})))
                // The generator must create an Object[][]; an Object[] cannot be cast to Object[][].
                .toArray(Object[][]::new);
    }

    /**
     * Runs a key or entry reduction over all segments in which every published item is passed
     * through {@code BlockingManager.supplyBlocking} before being collected into a set, and
     * the per-publisher sets are merged by the finalizer. Verifies that the resulting set has
     * the size computed by {@code SimpleClusterPublisherManagerTest.findHowManyInSegments} for
     * the local read segments and that every element corresponds to inserted data.
     *
     * @param deliveryGuarantee the delivery guarantee passed to the reduction
     * @param z forwarded as the first argument of {@code keyReduction}/{@code entryReduction}
     *          (presumably the parallel-publisher flag, going by the data provider name - confirm)
     * @param z2 {@code true} to use {@code keyReduction}, {@code false} to use {@code entryReduction}
     */
    @Test(dataProvider = "GuaranteeParallelEntry")
    public void testWithAsyncOperation(DeliveryGuarantee deliveryGuarantee, boolean z, boolean z2) {
        // NOTE(review): mo363cache looks like a decompiler-renamed cache(int) accessor - confirm
        // against MultipleCacheManagersTest.
        Cache<Integer, String> cache = mo363cache(0);
        Map<Integer, String> inserted = insert(cache);
        BlockingManager blockingManager = TestingUtil.extractComponent(cache, BlockingManager.class);
        LocalPublisherManager<Integer, String> lpm = lpm(cache);
        IntSet allSegments = IntSets.immutableRangeSet(SEGMENT_COUNT);

        Collector<Object, ?, Set<Object>> toSet = Collectors.toSet();
        // Merges the right-hand set into the left-hand one and returns the left-hand set.
        BiFunction<Set<Object>, Set<Object>, Set<Object>> mergeSets = (left, right) -> {
            left.addAll(right);
            return left;
        };
        // Hands each value to the blocking manager and emits it once that stage completes.
        io.reactivex.rxjava3.functions.Function<Object, Single<Object>> viaBlockingManager = value ->
                Single.fromCompletionStage(blockingManager.supplyBlocking(() -> value, "test-blocking-thread"));

        CompletionStage<PublisherResult<Set<Object>>> reduction;
        Consumer<Object> verifier;
        if (z2) {
            reduction = lpm.keyReduction(z, allSegments, null, null, 0L, deliveryGuarantee,
                    publisher -> Flowable.fromPublisher(publisher)
                            .concatMapSingle(viaBlockingManager)
                            .collect(toSet)
                            .toCompletionStage(),
                    publisher -> Flowable.fromPublisher(publisher)
                            .reduce(mergeSets)
                            .toCompletionStage(Collections.emptySet()));
            verifier = key -> AssertJUnit.assertTrue(inserted.containsKey(key));
        } else {
            reduction = lpm.entryReduction(z, allSegments, null, null, 0L, deliveryGuarantee,
                    publisher -> Flowable.fromPublisher(publisher)
                            .concatMapSingle(viaBlockingManager)
                            .collect(toSet)
                            .toCompletionStage(),
                    publisher -> Flowable.fromPublisher(publisher)
                            .reduce(mergeSets)
                            .toCompletionStage(Collections.emptySet()));
            verifier = item -> {
                Map.Entry<?, ?> entry = (Map.Entry<?, ?>) item;
                AssertJUnit.assertEquals(inserted.get(entry.getKey()), entry.getValue());
            };
        }

        DistributionManager distributionManager = TestingUtil.extractComponent(cache, DistributionManager.class);
        IntSet localSegments = distributionManager.getCacheTopology().getLocalReadSegments();
        KeyPartitioner keyPartitioner = TestingUtil.extractComponent(cache, KeyPartitioner.class);
        int expectedCount = SimpleClusterPublisherManagerTest.findHowManyInSegments(inserted.size(), localSegments, keyPartitioner);

        Set<Object> reduced = CompletionStages.join(reduction).getResult();
        AssertJUnit.assertEquals(expectedCount, reduced.size());
        reduced.forEach(verifier);
    }
}
