package org.infinispan.xsite.statetransfer.failures;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.infinispan.distribution.DistributionTestHelper;
import org.infinispan.manager.CacheContainer;
import org.infinispan.remoting.transport.AbstractDelegatingTransport;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.XSiteResponse;
import org.infinispan.remoting.transport.impl.XSiteResponseImpl;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.util.BlockingLocalTopologyManager;
import org.infinispan.xsite.XSiteBackup;
import org.infinispan.xsite.commands.remote.XSiteRequest;
import org.infinispan.xsite.statetransfer.XSiteProviderDelegator;
import org.infinispan.xsite.statetransfer.XSiteStateProvider;
import org.infinispan.xsite.statetransfer.XSiteStatePushCommand;
import org.infinispan.xsite.statetransfer.failures.AbstractTopologyChangeTest;
import org.testng.annotations.Test;

@Test(groups = {"xsite", "unstable"}, testName = "xsite.statetransfer.failures.SiteProviderTopologyChangeTest")
/* loaded from: input_file:org/infinispan/xsite/statetransfer/failures/SiteProviderTopologyChangeTest.class */
public class SiteProviderTopologyChangeTest extends AbstractTopologyChangeTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/xsite/statetransfer/failures/SiteProviderTopologyChangeTest$StateTransferRequest.class */
    public static class StateTransferRequest {
        private final String siteName;
        private final Address requestor;
        private final int minTopologyId;
        private final XSiteStateProvider provider;

        private StateTransferRequest(String str, Address address, int i, XSiteStateProvider xSiteStateProvider) {
            this.siteName = str;
            this.requestor = address;
            this.minTopologyId = i;
            this.provider = xSiteStateProvider;
        }

        public void execute() {
            this.provider.startStateTransfer(this.siteName, this.requestor, this.minTopologyId);
        }
    }

    public void testJoinAfterXSiteST() throws Exception {
        doTopologyChangeAfterXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.JOIN);
    }

    public void testLeaveAfterXSiteST() throws Exception {
        doTopologyChangeAfterXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.LEAVE);
    }

    public void testCoordinatorLeaveAfterXSiteST() throws Exception {
        doTopologyChangeAfterXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.COORDINATOR_LEAVE);
    }

    public void testSiteMasterLeaveAfterXSiteST() throws Exception {
        doTopologyChangeAfterXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.SITE_MASTER_LEAVE);
    }

    public void testJoinDuringXSiteST() throws Exception {
        doTopologyChangeDuringXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.JOIN);
    }

    public void testLeaveDuringXSiteST() throws Exception {
        doTopologyChangeDuringXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.LEAVE);
    }

    public void testCoordinatorLeaveDuringXSiteST() throws Exception {
        doTopologyChangeDuringXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.COORDINATOR_LEAVE);
    }

    public void testSiteMasterLeaveDuringXSiteST() throws Exception {
        doTopologyChangeDuringXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.SITE_MASTER_LEAVE);
    }

    public void testXSiteSTDuringJoin() throws Exception {
        doXSiteStateTransferDuringTopologyChange(AbstractTopologyChangeTest.TopologyEvent.JOIN);
    }

    public void testXSiteSTDuringLeave() throws Exception {
        doXSiteStateTransferDuringTopologyChange(AbstractTopologyChangeTest.TopologyEvent.LEAVE);
    }

    @Test(groups = {"xsite", "unstable"}, description = "See ISPN-6749")
    public void testXSiteSTDuringSiteMasterLeave() throws Exception {
        doXSiteStateTransferDuringTopologyChange(AbstractTopologyChangeTest.TopologyEvent.SITE_MASTER_LEAVE);
    }

    private void doTopologyChangeAfterXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent topologyEvent) throws Exception {
        log.debugf("Start topology change after x-site state transfer with %s", topologyEvent);
        initBeforeTest();
        log.debug("Setting blocking conditions");
        AbstractTopologyChangeTest.TestCaches createTestCache = createTestCache(topologyEvent, "LON-1");
        AtomicReference atomicReference = new AtomicReference(null);
        log.debugf("Controlled cache=%s, Coordinator cache=%s, Cache to remove=%s", DistributionTestHelper.addressOf(createTestCache.controllerCache), DistributionTestHelper.addressOf(createTestCache.coordinator), createTestCache.removeIndex < 0 ? "NONE" : DistributionTestHelper.addressOf(cache("LON-1", createTestCache.removeIndex)));
        if (createTestCache.removeIndex >= 0) {
            log.debugf("Discard x-site state transfer start command in cache %s to remove", DistributionTestHelper.addressOf(cache("LON-1", createTestCache.removeIndex)));
            TestingUtil.wrapComponent(cache("LON-1", createTestCache.removeIndex), XSiteStateProvider.class, (cache, xSiteStateProvider) -> {
                return new XSiteProviderDelegator(xSiteStateProvider) { // from class: org.infinispan.xsite.statetransfer.failures.SiteProviderTopologyChangeTest.1
                    @Override // org.infinispan.xsite.statetransfer.XSiteProviderDelegator
                    public void startStateTransfer(String str, Address address, int i) {
                        SiteProviderTopologyChangeTest.log.debugf("Discard state transfer request to %s from %s", str, address);
                    }
                };
            }, true);
        } else {
            log.debugf("Block x-site state transfer start command in cache %s", DistributionTestHelper.addressOf(cache("LON-1", 1)));
            TestingUtil.wrapComponent(cache("LON-1", 1), XSiteStateProvider.class, (cache2, xSiteStateProvider2) -> {
                return new XSiteProviderDelegator(this, xSiteStateProvider2) { // from class: org.infinispan.xsite.statetransfer.failures.SiteProviderTopologyChangeTest.2
                    final /* synthetic */ SiteProviderTopologyChangeTest this$0;

                    {
                        this.this$0 = this;
                    }

                    @Override // org.infinispan.xsite.statetransfer.XSiteProviderDelegator
                    public void startStateTransfer(String str, Address address, int i) {
                        SiteProviderTopologyChangeTest.log.debugf("Blocking state transfer request to %s from %s", str, address);
                        atomicReference.set(new StateTransferRequest(str, address, i, this.xSiteStateProvider));
                    }
                };
            }, true);
        }
        log.debug("Start x-site state transfer");
        startStateTransfer(createTestCache.coordinator, "NYC-2");
        assertOnline("LON-1", "NYC-2");
        log.debug("Await until X-Site state transfer is finished!");
        eventually(() -> {
            return ((XSiteStateProvider) TestingUtil.extractComponent(createTestCache.controllerCache, XSiteStateProvider.class)).getCurrentStateSending().isEmpty();
        }, TimeUnit.SECONDS.toMillis(30L));
        triggerTopologyChange("LON-1", createTestCache.removeIndex).get();
        awaitLocalStateTransfer("LON-1");
        if (atomicReference.get() != null) {
            log.debug("Let the blocked x-site state transfer request to proceed");
            ((StateTransferRequest) atomicReference.get()).execute();
        }
        awaitXSiteStateSent("LON-1");
        log.debug("Check data in both sites.");
        assertData();
    }

    private void doTopologyChangeDuringXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent topologyEvent) throws Exception {
        log.debugf("Start topology change during x-site state transfer with %s", topologyEvent);
        initBeforeTest();
        AbstractTopologyChangeTest.TestCaches createTestCache = createTestCache(topologyEvent, "LON-1");
        log.debugf("Controlled cache=%s, Coordinator cache=%s, Cache to remove=%s", DistributionTestHelper.addressOf(createTestCache.controllerCache), DistributionTestHelper.addressOf(createTestCache.coordinator), createTestCache.removeIndex < 0 ? "NONE" : DistributionTestHelper.addressOf(cache("LON-1", createTestCache.removeIndex)));
        final CheckPoint checkPoint = new CheckPoint();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        TestingUtil.wrapGlobalComponent((CacheContainer) createTestCache.controllerCache.getCacheManager(), Transport.class, (TestingUtil.WrapFactory) new TestingUtil.WrapFactory<Transport, Transport, CacheContainer>(this) { // from class: org.infinispan.xsite.statetransfer.failures.SiteProviderTopologyChangeTest.3
            final /* synthetic */ SiteProviderTopologyChangeTest this$0;

            {
                this.this$0 = this;
            }

            @Override // org.infinispan.test.TestingUtil.WrapFactory
            public Transport wrap(CacheContainer cacheContainer, Transport transport) {
                final AtomicBoolean atomicBoolean2 = atomicBoolean;
                final CheckPoint checkPoint2 = checkPoint;
                return new AbstractDelegatingTransport(this, transport) { // from class: org.infinispan.xsite.statetransfer.failures.SiteProviderTopologyChangeTest.3.1
                    final /* synthetic */ AnonymousClass3 this$1;

                    {
                        this.this$1 = this;
                    }

                    public void start() {
                    }

                    public <O> XSiteResponse<O> backupRemotely(XSiteBackup xSiteBackup, XSiteRequest<O> xSiteRequest) {
                        if ((xSiteRequest instanceof XSiteStatePushCommand) && atomicBoolean2.compareAndSet(false, true)) {
                            checkPoint2.trigger("before-second-chunk");
                            try {
                                checkPoint2.awaitStrict("second-chunk", 30L, TimeUnit.SECONDS);
                            } catch (InterruptedException | TimeoutException e) {
                                XSiteResponseImpl xSiteResponseImpl = new XSiteResponseImpl(AbstractInfinispanTest.TIME_SERVICE, xSiteBackup);
                                xSiteResponseImpl.completeExceptionally(e);
                                return xSiteResponseImpl;
                            }
                        }
                        return super.backupRemotely(xSiteBackup, xSiteRequest);
                    }
                };
            }
        }, true);
        log.debug("Start x-site state transfer");
        startStateTransfer(createTestCache.coordinator, "NYC-2");
        assertOnline("LON-1", "NYC-2");
        checkPoint.awaitStrict("before-second-chunk", 30L, TimeUnit.SECONDS);
        triggerTopologyChange("LON-1", createTestCache.removeIndex).get();
        checkPoint.triggerForever("second-chunk");
        awaitLocalStateTransfer("LON-1");
        awaitXSiteStateSent("LON-1");
        assertData();
    }

    private void doXSiteStateTransferDuringTopologyChange(AbstractTopologyChangeTest.TopologyEvent topologyEvent) throws Exception {
        log.debugf("Start topology change during x-site state transfer with %s", topologyEvent);
        initBeforeTest();
        AbstractTopologyChangeTest.TestCaches createTestCache = createTestCache(topologyEvent, "LON-1");
        log.debugf("Controlled cache=%s, Coordinator cache=%s, Cache to remove=%s", DistributionTestHelper.addressOf(createTestCache.controllerCache), DistributionTestHelper.addressOf(createTestCache.coordinator), createTestCache.removeIndex < 0 ? "NONE" : DistributionTestHelper.addressOf(cache("LON-1", createTestCache.removeIndex)));
        BlockingLocalTopologyManager replaceTopologyManagerDefaultCache = BlockingLocalTopologyManager.replaceTopologyManagerDefaultCache(createTestCache.controllerCache.getCacheManager());
        Future<Void> triggerTopologyChange = triggerTopologyChange("LON-1", createTestCache.removeIndex);
        BlockingLocalTopologyManager.BlockedTopology expectTopologyUpdate = replaceTopologyManagerDefaultCache.expectTopologyUpdate();
        log.debug("Start x-site state transfer");
        startStateTransfer(createTestCache.coordinator, "NYC-2");
        assertOnline("LON-1", "NYC-2");
        expectTopologyUpdate.unblock();
        triggerTopologyChange.get();
        awaitLocalStateTransfer("LON-1");
        awaitXSiteStateSent("LON-1");
        assertData();
    }
}
