package org.infinispan.statetransfer;

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.infinispan.Cache;
import org.infinispan.commons.test.TestResourceTracker;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.partitionhandling.PartitionHandling;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.JGroupsConfigBuilder;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TransportFlags;
import org.jgroups.BytesMessage;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.blocks.RequestCorrelator;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.fork.ForkChannel;
import org.jgroups.fork.UnknownForkHandler;
import org.jgroups.protocols.FORK;
import org.jgroups.stack.Protocol;
import org.testng.annotations.Test;

/**
 * Verifies that a new node can join and obtain the replicated cache while one of the existing
 * nodes has stopped its cache manager but its underlying (main) JGroups channel is still
 * connected, and that the remaining nodes settle once that channel is finally closed.
 *
 * <p>Every cache manager runs on a {@link ForkChannel} created on top of a main {@link JChannel}
 * that the test owns, so the cache manager and the main channel can be stopped independently.
 */
@CleanupAfterMethod
@Test(testName = "statetransfer.ForkChannelRestartTest", groups = {"functional"})
public class ForkChannelRestartTest extends MultipleCacheManagersTest {
    /**
     * Payload of the reply sent for requests that target an unknown fork stack/channel.
     * NOTE(review): an empty buffer is presumably decoded as a CacheNotFoundResponse by the
     * receiving transport (see the log message in the handler) - confirm against JGroupsTransport.
     */
    private static final byte[] FORK_NOT_FOUND_BUFFER = Util.EMPTY_BYTE_ARRAY;
    private static final String CACHE_NAME = "repl";
    /** Number of nodes started initially; one more node joins later in the test. */
    private static final int CLUSTER_SIZE = 3;

    /**
     * Intentionally empty: the cache managers are created inside the test method because each
     * one needs its own explicitly managed main channel.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
    }

    /**
     * Scenario:
     * <ol>
     *   <li>start {@link #CLUSTER_SIZE} managers and their caches;</li>
     *   <li>stop the cache manager at index 1 while leaving its main channel open;</li>
     *   <li>start an additional manager and request its cache on a forked thread;</li>
     *   <li>close the main channel of the stopped manager;</li>
     *   <li>wait for the remaining managers to receive the view and finish rebalancing, and for
     *       the forked {@code getCache} call to complete.</li>
     * </ol>
     *
     * @throws Exception if any step fails or the joiner does not finish within 10 seconds
     */
    public void testRestart() throws Exception {
        TestResourceTracker.testThreadStarted(getTestName());

        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.clustering().cacheMode(CacheMode.REPL_SYNC).stateTransfer().timeout(30L, TimeUnit.SECONDS);
        builder.clustering().partitionHandling().whenSplit(PartitionHandling.DENY_READ_WRITES);

        // Index of the node whose cache manager is stopped, and slot of the late joiner.
        final int crashedIndex = 1;
        final int joinerIndex = CLUSTER_SIZE;

        // One extra slot for the node that joins after the crash.
        String[] names = new String[CLUSTER_SIZE + 1];
        JChannel[] channels = new JChannel[CLUSTER_SIZE + 1];
        EmbeddedCacheManager[] managers = new EmbeddedCacheManager[CLUSTER_SIZE + 1];

        for (int i = 0; i < CLUSTER_SIZE; i++) {
            configureManager(builder, names, channels, managers, i);
        }
        for (int i = 0; i < CLUSTER_SIZE; i++) {
            managers[i].getCache(CACHE_NAME);
        }

        log.debugf("Cache managers created. Crashing manager %s but keeping the channel in the view", names[crashedIndex]);
        TestingUtil.getDiscardForCache(managers[crashedIndex]).discardAll(true);
        TestingUtil.installNewView(managers[crashedIndex]);
        managers[crashedIndex].stop();

        configureManager(builder, names, channels, managers, joinerIndex);
        // getCache() on the joiner is run on a separate thread; its result is checked at the end.
        Future<Cache<Object, Object>> joinerFuture = fork(() -> managers[joinerIndex].getCache(CACHE_NAME));

        // NOTE(review): presumably gives the joiner time to start before the stale channel
        // disappears - confirm the intent of this fixed delay.
        Thread.sleep(1000L);

        log.debugf("Stopping channel %s", names[crashedIndex]);
        channels[crashedIndex].close();

        ArrayList<EmbeddedCacheManager> liveManagers = new ArrayList<>(Arrays.asList(managers));
        liveManagers.remove(crashedIndex);
        TestingUtil.blockUntilViewsReceived(10000, false, liveManagers);
        TestingUtil.waitForNoRebalance(
                liveManagers.stream().map(manager -> manager.getCache(CACHE_NAME)).collect(Collectors.toList()));
        log.debug("Rebalance finished successfully");

        joinerFuture.get(10L, TimeUnit.SECONDS);
    }

    /**
     * Creates node {@code index}: picks a node name, connects its main channel, creates the
     * cache manager on top of it and defines the {@link #CACHE_NAME} configuration. The three
     * arrays are filled in at position {@code index}.
     *
     * @param builder  cache configuration used both as the default and for {@link #CACHE_NAME}
     * @param names    receives the generated node name
     * @param channels receives the connected main channel
     * @param managers receives the created cache manager
     * @param index    slot to populate in the three arrays
     * @throws Exception if the channel or the cache manager cannot be created
     */
    private void configureManager(ConfigurationBuilder builder, String[] names, JChannel[] channels,
                                  EmbeddedCacheManager[] managers, int index) throws Exception {
        names[index] = TestResourceTracker.getNextNodeName();
        channels[index] = createChannel(names[index]);
        managers[index] = createCacheManager(builder, names[index], channels[index]);
        managers[index].defineConfiguration(CACHE_NAME, builder.build());
    }

    /**
     * Creates and registers a cache manager whose transport uses a {@link ForkChannel}
     * ("stack1"/"channel1") built on the given main channel.
     *
     * @param builder     default cache configuration for the manager
     * @param nodeName    transport node name
     * @param mainChannel already connected main channel hosting the fork
     * @return the registered cache manager
     * @throws Exception if the fork channel cannot be created
     */
    private EmbeddedCacheManager createCacheManager(ConfigurationBuilder builder, String nodeName,
                                                    JChannel mainChannel) throws Exception {
        ForkChannel forkChannel = new ForkChannel(mainChannel, "stack1", "channel1", new Protocol[0]);

        GlobalConfigurationBuilder globalBuilder = new GlobalConfigurationBuilder();
        globalBuilder.transport().nodeName(nodeName);
        globalBuilder.transport().transport(new JGroupsTransport(forkChannel));
        globalBuilder.transport().distributedSyncTimeout(40L, TimeUnit.SECONDS);

        EmbeddedCacheManager manager = TestCacheManagerFactory.newDefaultCacheManager(true, globalBuilder, builder);
        registerCacheManager(manager);
        return manager;
    }

    /**
     * Creates the main channel for a node, adds a {@link FORK} protocol to its stack and
     * connects it to the "FORKISPN" cluster. The channel is registered with
     * {@link TestResourceTracker} so that it is closed during test cleanup.
     *
     * <p>The installed {@link UnknownForkHandler} answers requests addressed to a fork
     * stack/channel that does not exist on this node, instead of leaving them unanswered.
     *
     * @param name logical name assigned to the channel
     * @return the connected channel
     * @throws Exception if the channel cannot be configured or connected
     */
    private JChannel createChannel(final String name) throws Exception {
        String jgroupsConfig = JGroupsConfigBuilder.getJGroupsConfig(
                ForkChannelRestartTest.class.getName(), new TransportFlags().withFD(true));
        JChannel channel = new JChannel(new ByteArrayInputStream(jgroupsConfig.getBytes()));
        TestResourceTracker.addResource(new TestResourceTracker.Cleaner<JChannel>(channel) {
            @Override
            public void close() {
                ref.close();
            }
        });
        channel.setName(name);
        channel.addAddressGenerator(Address::randomUUID);

        final FORK fork = new FORK();
        fork.setUnknownForkHandler(new UnknownForkHandler() {
            @Override
            public Object handleUnknownForkStack(Message message, String forkStackId) {
                return handle(message);
            }

            @Override
            public Object handleUnknownForkChannel(Message message, String forkChannelId) {
                return handle(message);
            }

            /**
             * Replies to the sender of {@code message} with an empty response carrying the same
             * FORK header and request id. Messages without a RequestCorrelator header are ignored.
             */
            private Object handle(Message message) {
                short correlatorId = ClassConfigurator.getProtocolId(RequestCorrelator.class);
                RequestCorrelator.Header requestHeader = message.getHeader(correlatorId);
                if (requestHeader == null) {
                    // No correlator header: there is no request id to reply to.
                    return null;
                }

                log.debugf("Sending CacheNotFoundResponse reply from %s for %s", name, requestHeader);
                Message response = new BytesMessage(message.getSrc()).setFlag(JGroupsTransport.REPLY_FLAGS, false);
                // Echo the FORK header so the reply is routed to the sender's fork stack/channel.
                response.putHeader(FORK.ID, message.getHeader(FORK.ID));
                response.putHeader(correlatorId,
                        new RequestCorrelator.Header(RequestCorrelator.Header.RSP, requestHeader.req_id, correlatorId));
                response.setArray(FORK_NOT_FOUND_BUFFER);
                fork.down(response);
                return null;
            }
        });
        channel.getProtocolStack().addProtocol(fork);
        channel.connect("FORKISPN");
        log.tracef("Channel %s connected: %s", channel, channel.getViewAsString());
        return channel;
    }
}
