package org.infinispan.statetransfer;

import java.util.Arrays;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
import org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.interceptors.impl.CallInterceptor;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.concurrent.StateSequencer;
import org.infinispan.test.concurrent.StateSequencerUtil;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.transaction.lookup.EmbeddedTransactionManagerLookup;
import org.infinispan.transaction.tm.EmbeddedTransaction;
import org.infinispan.transaction.tm.EmbeddedTransactionManager;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.ByteString;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "statetransfer.TxReplay2Test")
/* loaded from: input_file:org/infinispan/statetransfer/TxReplay2Test.class */
public class TxReplay2Test extends MultipleCacheManagersTest {
    private static final String VALUE = "value";
    ControlledConsistentHashFactory consistentHashFactory = new ControlledConsistentHashFactory.Default(0, 1, 2);

    /* loaded from: input_file:org/infinispan/statetransfer/TxReplay2Test$CountingInterceptor.class */
    static class CountingInterceptor extends DDAsyncInterceptor {
        private static final Log log = LogFactory.getLog(CountingInterceptor.class);
        private final AtomicInteger numberPrepares = new AtomicInteger(0);
        private final AtomicInteger numberCommits = new AtomicInteger(0);
        private final AtomicInteger numberRollbacks = new AtomicInteger(0);

        CountingInterceptor() {
        }

        public Object visitPrepareCommand(TxInvocationContext txInvocationContext, PrepareCommand prepareCommand) throws Throwable {
            if (!txInvocationContext.isOriginLocal()) {
                log.debugf("Received remote prepare for transaction %s", prepareCommand.getGlobalTransaction());
                this.numberPrepares.incrementAndGet();
            }
            return invokeNext(txInvocationContext, prepareCommand);
        }

        public Object visitCommitCommand(TxInvocationContext txInvocationContext, CommitCommand commitCommand) throws Throwable {
            if (!txInvocationContext.isOriginLocal()) {
                log.debugf("Received remote commit for transaction %s", commitCommand.getGlobalTransaction());
                this.numberCommits.incrementAndGet();
            }
            return invokeNext(txInvocationContext, commitCommand);
        }

        public Object visitRollbackCommand(TxInvocationContext txInvocationContext, RollbackCommand rollbackCommand) throws Throwable {
            if (!txInvocationContext.isOriginLocal()) {
                log.debugf("Received remote rollback for transaction %s", rollbackCommand.getGlobalTransaction());
                this.numberRollbacks.incrementAndGet();
            }
            return invokeNext(txInvocationContext, rollbackCommand);
        }

        public static CountingInterceptor inject(Cache cache) {
            AsyncInterceptorChain extractInterceptorChain = TestingUtil.extractInterceptorChain(cache);
            if (extractInterceptorChain.containsInterceptorType(CountingInterceptor.class)) {
                return extractInterceptorChain.findInterceptorWithClass(CountingInterceptor.class);
            }
            CountingInterceptor countingInterceptor = new CountingInterceptor();
            extractInterceptorChain.addInterceptorBefore(countingInterceptor, CallInterceptor.class);
            return countingInterceptor;
        }
    }

    public void testReplay() throws Exception {
        StateSequencer stateSequencer = new StateSequencer();
        stateSequencer.logicalThread("tx", "tx:before_prepare_replay", "tx:resume_prepare_replay", "tx:mark_tx_completed");
        stateSequencer.logicalThread("sim", "sim:before_extra_commit", "sim:during_extra_commit", "sim:after_extra_commit");
        stateSequencer.order("tx:before_prepare_replay", "sim:before_extra_commit", new String[0]);
        stateSequencer.order("sim:during_extra_commit", "tx:resume_prepare_replay", new String[0]);
        stateSequencer.order("sim:after_extra_commit", "tx:mark_tx_completed", new String[0]);
        AssertJUnit.assertEquals(Arrays.asList(address(0), address(1), address(2)), cacheTopology(0).getDistribution("key").writeOwners());
        Cache cache = mo375cache(0);
        Cache<Object, Object> cache2 = mo375cache(3);
        CountingInterceptor inject = CountingInterceptor.inject(cache2);
        CountingInterceptor inject2 = CountingInterceptor.inject(cache);
        CountingInterceptor inject3 = CountingInterceptor.inject(mo375cache(2));
        StateSequencerUtil.advanceOnInterceptor(stateSequencer, cache2, CallInterceptor.class, StateSequencerUtil.matchCommand(PrepareCommand.class).matchCount(0).build()).before("tx:before_prepare_replay", "tx:resume_prepare_replay");
        StateSequencerUtil.advanceOnInterceptor(stateSequencer, cache2, TransactionSynchronizerInterceptor.class, StateSequencerUtil.matchCommand(CommitCommand.class).matchCount(1).build()).before("sim:during_extra_commit", new String[0]);
        StateSequencerUtil.advanceOnInboundRpc(stateSequencer, cache2, StateSequencerUtil.matchCommand(TxCompletionNotificationCommand.class).build()).before("tx:mark_tx_completed", new String[0]);
        EmbeddedTransactionManager tm = tm(0);
        tm.begin();
        cache.put("key", "value");
        EmbeddedTransaction transaction = tm.getTransaction();
        GlobalTransaction globalTransaction = TestingUtil.getTransactionTable(cache).getLocalTransaction(transaction).getGlobalTransaction();
        transaction.runPrepare();
        AssertJUnit.assertEquals("Wrong transaction status before killing backup owner.", 2, transaction.getStatus());
        killMember(1);
        int topologyId = cache.getAdvancedCache().getDistributionManager().getCacheTopology().getTopologyId();
        Future fork = fork(() -> {
            stateSequencer.advance("sim:before_extra_commit");
            CommitCommand commitCommand = new CommitCommand(ByteString.fromString(cache2.getName()), globalTransaction);
            commitCommand.setTopologyId(topologyId);
            commitCommand.markTransactionAsRemote(true);
            try {
                commitCommand.invokeAsync(TestingUtil.extractComponentRegistry(cache2));
                stateSequencer.advance("sim:after_extra_commit");
                return null;
            } catch (Throwable th) {
                throw new CacheException(th);
            }
        });
        checkIfTransactionExists(cache2);
        AssertJUnit.assertEquals("Wrong transaction status after killing backup owner.", 2, transaction.getStatus());
        transaction.runCommit(false);
        fork.get(10L, TimeUnit.SECONDS);
        assertNoTransactions();
        AssertJUnit.assertEquals("Wrong number of prepares!", 2, inject.numberPrepares.get());
        AssertJUnit.assertEquals("Wrong number of commits!", 2, inject.numberCommits.get());
        AssertJUnit.assertEquals("Wrong number of rollbacks!", 0, inject.numberRollbacks.get());
        AssertJUnit.assertEquals("Wrong number of prepares!", 2, inject3.numberPrepares.get());
        AssertJUnit.assertEquals("Wrong number of commits!", 1, inject3.numberCommits.get());
        AssertJUnit.assertEquals("Wrong number of rollbacks!", 0, inject3.numberRollbacks.get());
        AssertJUnit.assertEquals("Wrong number of prepares!", 0, inject2.numberPrepares.get());
        AssertJUnit.assertEquals("Wrong number of commits!", 0, inject2.numberCommits.get());
        AssertJUnit.assertEquals("Wrong number of rollbacks!", 0, inject2.numberRollbacks.get());
        checkKeyInDataContainer("key");
    }

    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder defaultClusteredCacheConfig = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true);
        defaultClusteredCacheConfig.transaction().useSynchronization(false).transactionManagerLookup(new EmbeddedTransactionManagerLookup()).recovery().disable();
        defaultClusteredCacheConfig.clustering().hash().numOwners(3).numSegments(1).consistentHashFactory(this.consistentHashFactory).stateTransfer().fetchInMemoryState(true);
        defaultClusteredCacheConfig.locking().isolationLevel(IsolationLevel.READ_COMMITTED);
        createClusteredCaches(4, defaultClusteredCacheConfig);
    }

    private void checkKeyInDataContainer(Object obj) {
        for (Cache<?, ?> cache : caches()) {
            InternalCacheEntry internalCacheEntry = cache.getAdvancedCache().getDataContainer().get(obj);
            AssertJUnit.assertNotNull("Cache '" + String.valueOf(address(cache)) + "' does not contain key!", internalCacheEntry);
            AssertJUnit.assertEquals("Cache '" + String.valueOf(address(cache)) + "' has wrong value!", "value", internalCacheEntry.getValue());
        }
    }

    private void checkIfTransactionExists(Cache<Object, Object> cache) {
        AssertJUnit.assertFalse("Expected a remote transaction.", ((TransactionTable) TestingUtil.extractComponent(cache, TransactionTable.class)).getRemoteTransactions().isEmpty());
    }
}
