package org.apache.iotdb.consensus.ratis;

import java.io.File;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.config.RatisConfig;
import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
import org.apache.iotdb.consensus.ratis.TestUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/consensus/ratis/RecoverReadTest.class */
/**
 * Tests reads against a {@link TestUtils.MiniCluster} that has just been restarted with a state
 * machine whose {@code write} is artificially slowed down.
 *
 * <p>Every test follows the same outline: create the consensus group on all servers, write a
 * number of entries through server 0, verify them, stop the cluster, swap in a stalling
 * {@link SlowRecoverStateMachine}, restart, and then check how a read on server 0 behaves.
 */
public class RecoverReadTest {
    private static final Logger logger = LoggerFactory.getLogger(RecoverReadTest.class);

    /** Delay injected before each {@code write} of a stalling {@link SlowRecoverStateMachine}. */
    private static final TimeDuration STALL_DURATION = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);

    private TestUtils.MiniCluster miniCluster;

    /**
     * An {@link TestUtils.IntegerCounter} that can sleep for a fixed duration before every
     * {@code write} and that never takes a snapshot.
     *
     * <p>NOTE(review): only instantiated from within this file (the constructors are private);
     * the class visibility could likely be narrowed to private — confirm there are no external
     * users.
     */
    public static class SlowRecoverStateMachine extends TestUtils.IntegerCounter {
        private final TimeDuration stallDuration;
        private final AtomicBoolean stallApply;

        /**
         * Creates a state machine that does not stall.
         *
         * @param stallDuration the delay that would be used if stalling were enabled
         */
        private SlowRecoverStateMachine(TimeDuration stallDuration) {
            this(stallDuration, false);
        }

        /**
         * Creates a state machine.
         *
         * @param stallDuration how long to sleep before each {@code write} when stalling
         * @param stallApply whether {@code write} should sleep before delegating
         */
        private SlowRecoverStateMachine(TimeDuration stallDuration, boolean stallApply) {
            this.stallDuration = stallDuration;
            this.stallApply = new AtomicBoolean(stallApply);
        }

        /**
         * Optionally sleeps for the configured duration, then delegates to the parent
         * implementation.
         *
         * <p>If the sleep is interrupted, the interrupt flag is restored and the write is still
         * delegated.
         *
         * @param iConsensusRequest the request to apply
         * @return the status returned by the parent {@code write}
         */
        @Override // org.apache.iotdb.consensus.ratis.TestUtils.IntegerCounter
        public TSStatus write(IConsensusRequest iConsensusRequest) {
            if (stallApply.get()) {
                try {
                    stallDuration.sleep();
                    // Logged after the sleep and before the delegate call, so the value shown is
                    // the counter as it was before this request is applied.
                    logger.info("Apply i={} when recovering", integer.get());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted when stalling for write operations", e);
                    Thread.currentThread().interrupt();
                }
            }
            return super.write(iConsensusRequest);
        }

        /**
         * Always returns {@code false}.
         *
         * <p>Presumably this signals that no snapshot was taken, so a restarted server has to
         * replay its log through {@link #write} — verify against the parent contract.
         *
         * @param file the snapshot target, ignored
         * @return {@code false}
         */
        @Override // org.apache.iotdb.consensus.ratis.TestUtils.IntegerCounter
        public boolean takeSnapshot(File file) {
            return false;
        }
    }

    /** Builds and starts a mini cluster backed by non-stalling state machines. */
    @Before
    public void setUp() throws Exception {
        logger.info("[RECOVER TEST] start setting up the test env");
        RatisConfig ratisConfig =
            RatisConfig.newBuilder()
                .setLog(
                    RatisConfig.Log.newBuilder()
                        .setPurgeUptoSnapshotIndex(false)
                        .setPreserveNumsWhenPurge(1024L)
                        .build())
                .setRead(
                    RatisConfig.Read.newBuilder()
                        .setReadTimeout(TimeDuration.valueOf(20L, TimeUnit.SECONDS))
                        .build())
                .setRpc(
                    RatisConfig.Rpc.newBuilder()
                        .setFirstElectionTimeoutMin(TimeDuration.valueOf(1L, TimeUnit.SECONDS))
                        .setFirstElectionTimeoutMax(TimeDuration.valueOf(2L, TimeUnit.SECONDS))
                        .setRequestTimeout(TimeDuration.valueOf(20L, TimeUnit.SECONDS))
                        .build())
                .setImpl(
                    RatisConfig.Impl.newBuilder()
                        .setRetryTimesMax(1)
                        .setRetryWaitMillis(500L)
                        .build())
                .build();
        miniCluster =
            new TestUtils.MiniClusterFactory()
                .setSMProvider(() -> new SlowRecoverStateMachine(STALL_DURATION))
                .setRatisConfig(ratisConfig)
                .create();
        miniCluster.start();
        logger.info("[RECOVER TEST] end setting up the test env");
    }

    /** Cleans up the mini cluster after each test. */
    @After
    public void tearUp() throws Exception {
        logger.info("[RECOVER TEST] start tearing down the test env");
        miniCluster.cleanUp();
        logger.info("[RECOVER TEST] end tearing down the test env");
    }

    /**
     * After a restart with stalling state machines and stale reads allowed on server 0, a
     * read-through on server 0 must not return the value written before the restart.
     */
    @Test
    public void inconsistentReadAfterRestart() throws Exception {
        ConsensusGroupId gid = createGroupOnAllServers();
        writeThenRestartWithSlowRecovery(10);

        miniCluster.getServer(0).allowStaleRead(gid);
        Assert.assertNotEquals(10L, ((TestUtils.TestDataSet) miniCluster.readThrough(0)).getNumber());
    }

    /** After waiting for an active leader, a read on server 0 returns all 10 writes. */
    @Test
    public void consistentRead() throws Exception {
        assertConsistentReadAfterRecovery(10);
    }

    /**
     * Reading immediately after the restart, without waiting for a leader, must fail with
     * {@link RatisReadUnavailableException}.
     */
    @Test
    public void consistentReadFailedWithNoLeader() throws Exception {
        createGroupOnAllServers();
        writeThenRestartWithSlowRecovery(10);

        Assert.assertThrows(RatisReadUnavailableException.class, () -> miniCluster.readThrough(0));
    }

    /** Same as {@link #consistentRead()} but with 50 writes. */
    @Test
    public void consistentReadWithSlowApply() throws Exception {
        assertConsistentReadAfterRecovery(50);
    }

    /**
     * Creates the cluster's consensus group as a local peer on every server.
     *
     * @return the consensus group id that was used
     */
    private ConsensusGroupId createGroupOnAllServers() throws Exception {
        ConsensusGroupId gid = miniCluster.getGid();
        List<Peer> peers = miniCluster.getPeers();
        for (RatisConsensus server : miniCluster.getServers()) {
            server.createLocalPeer(gid, peers);
        }
        return gid;
    }

    /**
     * Writes {@code writeCount} entries through server 0, verifies that they are readable, then
     * stops the cluster and restarts it with stalling state machines.
     *
     * @param writeCount number of serial writes to issue before the restart
     */
    private void writeThenRestartWithSlowRecovery(int writeCount) throws Exception {
        // Widen explicitly so the assertion compares as long, like the literal it replaces.
        final long expected = writeCount;
        miniCluster.writeManySerial(0, writeCount);
        Assert.assertEquals(expected, miniCluster.mustRead(0));

        miniCluster.stop();
        miniCluster.resetSMProviderBeforeRestart(() -> new SlowRecoverStateMachine(STALL_DURATION, true));
        miniCluster.restart();
    }

    /**
     * Runs the full scenario for a consistent read: write, restart with stalling state machines,
     * wait for an active leader, and assert that server 0 still reads {@code writeCount}.
     *
     * @param writeCount number of serial writes to issue before the restart
     */
    private void assertConsistentReadAfterRecovery(int writeCount) throws Exception {
        final long expected = writeCount;
        createGroupOnAllServers();
        writeThenRestartWithSlowRecovery(writeCount);

        miniCluster.waitUntilActiveLeaderElected();
        Assert.assertEquals(expected, miniCluster.mustRead(0));
    }
}
