package alluxio.master.journal.raft;

import alluxio.exception.ExceptionMessage;
import alluxio.master.Master;
import alluxio.master.PrimarySelector;
import alluxio.master.journal.AbstractJournalSystem;
import alluxio.master.journal.AsyncJournalWriter;
import alluxio.master.journal.Journal;
import alluxio.proto.journal.Journal;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;
import alluxio.util.io.FileUtils;
import com.google.common.base.Preconditions;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.netty.NettyTransport;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.client.RecoveryStrategies;
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:alluxio/master/journal/raft/RaftJournalSystem.class */
public final class RaftJournalSystem extends AbstractJournalSystem {
    private static final Logger LOG = LoggerFactory.getLogger(RaftJournalSystem.class);
    private final RaftJournalConfiguration mConf;
    private final ReadWriteLock mJournalStateLock;
    private final AtomicBoolean mSnapshotAllowed;
    private final RaftPrimarySelector mPrimarySelector;
    private final CompletableFuture<CopycatClient> mSnapshotClient;
    private final ConcurrentHashMap<String, RaftJournal> mJournals;
    private JournalStateMachine mStateMachine;
    private CopycatServer mServer;
    private RaftJournalWriter mRaftJournalWriter;
    private final AtomicReference<AsyncJournalWriter> mAsyncJournalWriter;

    private RaftJournalSystem(RaftJournalConfiguration raftJournalConfiguration) {
        raftJournalConfiguration.validate();
        this.mConf = raftJournalConfiguration;
        this.mJournals = new ConcurrentHashMap<>();
        this.mSnapshotAllowed = new AtomicBoolean(true);
        this.mJournalStateLock = new ReentrantReadWriteLock(true);
        this.mPrimarySelector = new RaftPrimarySelector();
        this.mAsyncJournalWriter = new AtomicReference<>();
        this.mSnapshotClient = createClient().connect();
    }

    public static RaftJournalSystem create(RaftJournalConfiguration raftJournalConfiguration) {
        RaftJournalSystem raftJournalSystem = new RaftJournalSystem(raftJournalConfiguration);
        raftJournalSystem.initServer();
        return raftJournalSystem;
    }

    private synchronized void initServer() {
        LOG.debug("Creating journal with max segment size {}", Long.valueOf(this.mConf.getMaxLogSize()));
        Storage build = Storage.builder().withDirectory(this.mConf.getPath()).withStorageLevel(StorageLevel.valueOf(this.mConf.getStorageLevel().name())).withMinorCompactionInterval(Duration.ofDays(200L)).withMaxSegmentSize((int) this.mConf.getMaxLogSize()).build();
        if (this.mStateMachine != null) {
            this.mStateMachine.close();
        }
        this.mStateMachine = new JournalStateMachine(this.mJournals);
        this.mServer = CopycatServer.builder(getLocalAddress(this.mConf)).withStorage(build).withElectionTimeout(Duration.ofMillis(this.mConf.getElectionTimeoutMs())).withHeartbeatInterval(Duration.ofMillis(this.mConf.getHeartbeatIntervalMs())).withSnapshotAllowed(this.mSnapshotAllowed).withSerializer(createSerializer()).withTransport(new NettyTransport()).withStateMachine(new OnceSupplier(this.mStateMachine)).build();
        this.mPrimarySelector.init(this.mServer);
    }

    private CopycatClient createClient() {
        return CopycatClient.builder(getClusterAddresses(this.mConf)).withRecoveryStrategy(RecoveryStrategies.RECOVER).withConnectionStrategy(attempt -> {
            attempt.retry(Duration.ofMillis(Math.min(Math.round(100.0d * Math.pow(2.0d, attempt.attempt())), 1000L)));
        }).build();
    }

    private static List<Address> getClusterAddresses(RaftJournalConfiguration raftJournalConfiguration) {
        return (List) raftJournalConfiguration.getClusterAddresses().stream().map(inetSocketAddress -> {
            return new Address(inetSocketAddress.getHostName(), inetSocketAddress.getPort());
        }).collect(Collectors.toList());
    }

    private static Address getLocalAddress(RaftJournalConfiguration raftJournalConfiguration) {
        return new Address(raftJournalConfiguration.getLocalAddress().getHostName(), raftJournalConfiguration.getLocalAddress().getPort());
    }

    public static Serializer createSerializer() {
        return new Serializer().register(JournalEntryCommand.class, 1);
    }

    @Override // alluxio.master.journal.JournalSystem
    public synchronized Journal createJournal(Master master) {
        RaftJournal raftJournal = new RaftJournal(master, this.mConf.getPath().toURI(), this.mAsyncJournalWriter, this.mJournalStateLock.readLock());
        this.mJournals.put(master.getName(), raftJournal);
        return raftJournal;
    }

    @Override // alluxio.master.journal.JournalSystem
    public synchronized void gainPrimacy() {
        this.mSnapshotAllowed.set(false);
        CopycatClient createClient = createClient();
        try {
            createClient.connect().get();
            try {
                catchUp(this.mStateMachine, createClient);
                long upgrade = this.mStateMachine.upgrade() + 1;
                Preconditions.checkState(this.mRaftJournalWriter == null);
                this.mRaftJournalWriter = new RaftJournalWriter(upgrade, createClient);
                this.mAsyncJournalWriter.set(new AsyncJournalWriter(this.mRaftJournalWriter));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (TimeoutException e2) {
                throw new RuntimeException(e2);
            }
        } catch (InterruptedException e3) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e3);
        } catch (ExecutionException e4) {
            throw new RuntimeException(ExceptionMessage.FAILED_RAFT_CONNECT.getMessage(new Object[]{Arrays.toString(getClusterAddresses(this.mConf).toArray()), e4.getCause().toString()}), e4.getCause());
        }
    }

    @Override // alluxio.master.journal.JournalSystem
    public synchronized void losePrimacy() {
        try {
            this.mRaftJournalWriter.close();
        } catch (IOException e) {
            LOG.warn("Error closing journal writer: {}", e.toString());
        } finally {
            this.mAsyncJournalWriter.set(null);
            this.mRaftJournalWriter = null;
        }
        LOG.info("Shutting down Raft server");
        try {
            this.mServer.shutdown().get();
            LOG.info("Shut down Raft server");
            this.mSnapshotAllowed.set(true);
            initServer();
            LOG.info("Bootstrapping new Raft server");
            try {
                this.mServer.bootstrap(getClusterAddresses(this.mConf)).get();
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while rejoining Raft cluster");
            } catch (ExecutionException e3) {
                LOG.error("Fatal error: failed to rejoin Raft cluster with addresses {} while stepping down", getClusterAddresses(this.mConf), e3);
                System.exit(-1);
            }
            LOG.info("Raft server successfully restarted");
        } catch (InterruptedException e4) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while leaving Raft cluster");
        } catch (ExecutionException e5) {
            LOG.error("Fatal error: failed to leave Raft cluster while stepping down", e5);
            System.exit(-1);
            throw new IllegalStateException(e5);
        }
    }

    private void catchUp(JournalStateMachine journalStateMachine, CopycatClient copycatClient) throws TimeoutException, InterruptedException {
        long currentTimeMillis = System.currentTimeMillis();
        CommonUtils.waitFor("snapshotting to finish", () -> {
            return Boolean.valueOf(!journalStateMachine.isSnapshotting());
        }, WaitForOptions.defaults().setTimeoutMs(600000));
        while (this.mPrimarySelector.getState() == PrimarySelector.State.PRIMARY) {
            long lastAppliedSequenceNumber = journalStateMachine.getLastAppliedSequenceNumber();
            long nextLong = ThreadLocalRandom.current().nextLong(Long.MIN_VALUE, 0L);
            LOG.info("Performing catchup. Last applied SN: {}. Catchup ID: {}", Long.valueOf(lastAppliedSequenceNumber), Long.valueOf(nextLong));
            try {
                copycatClient.submit(new JournalEntryCommand(Journal.JournalEntry.newBuilder().setSequenceNumber(nextLong).build())).get(5L, TimeUnit.SECONDS);
                try {
                    CommonUtils.waitFor("term start entry " + nextLong + " to be applied to state machine", () -> {
                        return Boolean.valueOf(journalStateMachine.getLastPrimaryStartSequenceNumber() == nextLong);
                    }, WaitForOptions.defaults().setInterval(1000).setTimeoutMs(5000));
                    CommonUtils.sleepMs(2 * this.mConf.getElectionTimeoutMs());
                    if (journalStateMachine.getLastAppliedSequenceNumber() == lastAppliedSequenceNumber && journalStateMachine.getLastPrimaryStartSequenceNumber() == nextLong) {
                        LOG.info("Caught up in {}ms. Last sequence number from previous term: {}.", Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Long.valueOf(journalStateMachine.getLastAppliedSequenceNumber()));
                        return;
                    }
                } catch (TimeoutException e) {
                    LOG.info(e.toString());
                }
            } catch (ExecutionException | TimeoutException e2) {
                LOG.info("Exception submitting term start entry: {}", e2.toString());
            }
        }
    }

    @Override // alluxio.master.journal.AbstractJournalSystem
    public synchronized void startInternal() throws InterruptedException, IOException {
        List<Address> clusterAddresses = getClusterAddresses(this.mConf);
        LOG.info("Starting Raft journal system. Cluster addresses: {}. Local address: {}", clusterAddresses, getLocalAddress(this.mConf));
        long currentTimeMillis = System.currentTimeMillis();
        try {
            this.mServer.bootstrap(clusterAddresses).get();
            LOG.info("Started Raft Journal System in {}ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        } catch (ExecutionException e) {
            throw new IOException(ExceptionMessage.FAILED_RAFT_BOOTSTRAP.getMessage(new Object[]{Arrays.toString(clusterAddresses.toArray()), e.getCause().toString()}), e.getCause());
        }
    }

    @Override // alluxio.master.journal.AbstractJournalSystem
    public synchronized void stopInternal() throws InterruptedException, IOException {
        LOG.info("Shutting down raft journal");
        this.mRaftJournalWriter.close();
        try {
            this.mServer.shutdown().get(2L, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed to shut down Raft server", e);
        } catch (TimeoutException e2) {
            LOG.info("Timed out shutting down raft server");
        }
        LOG.info("Journal shutdown complete");
    }

    @Override // alluxio.master.journal.JournalSystem
    public synchronized boolean isEmpty() {
        return this.mRaftJournalWriter != null && this.mRaftJournalWriter.getNextSequenceNumberToWrite() == 0;
    }

    @Override // alluxio.master.journal.JournalSystem
    public boolean isFormatted() {
        return this.mConf.getPath().exists();
    }

    @Override // alluxio.master.journal.JournalSystem
    public void format() throws IOException {
        FileUtils.deletePathRecursively(this.mConf.getPath().getAbsolutePath());
        this.mConf.getPath().mkdirs();
    }

    public PrimarySelector getPrimarySelector() {
        return this.mPrimarySelector;
    }
}
