package alluxio.master.journal.raft;

import alluxio.ClientContext;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.status.AbortedException;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.NotFoundException;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.DownloadSnapshotPRequest;
import alluxio.grpc.DownloadSnapshotPResponse;
import alluxio.grpc.GetSnapshotInfoRequest;
import alluxio.grpc.GetSnapshotInfoResponse;
import alluxio.grpc.GetSnapshotRequest;
import alluxio.grpc.JournalQueryRequest;
import alluxio.grpc.JournalQueryResponse;
import alluxio.grpc.QuorumServerState;
import alluxio.grpc.SnapshotData;
import alluxio.grpc.SnapshotMetadata;
import alluxio.grpc.UploadSnapshotPRequest;
import alluxio.grpc.UploadSnapshotPResponse;
import alluxio.master.MasterClientContext;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.security.authentication.ClientIpAddressInjector;
import alluxio.util.CommonUtils;
import alluxio.util.LogUtils;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.ratis.util.MD5FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/master/journal/raft/SnapshotReplicationManager.class */
public class SnapshotReplicationManager {
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotReplicationManager.class);
    /**
     * Timeout in milliseconds, read once from
     * {@code MASTER_EMBEDDED_JOURNAL_TRANSPORT_REQUEST_TIMEOUT_MS}. Used by
     * {@code checkRequestTimeout()} to abandon a snapshot data request that stays pending too long.
     */
    private static final long SNAPSHOT_REQUEST_TIMEOUT_MS = ServerConfiguration.getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_TRANSPORT_REQUEST_TIMEOUT_MS);
    /** Snapshot storage: queried for the latest snapshot and for target snapshot file locations. */
    private final SimpleStateMachineStorage mStorage;
    /** Journal system used for leadership checks, quorum membership and peer-to-peer messages. */
    private final RaftJournalSystem mJournalSystem;
    /** Time (from {@code CommonUtils.getCurrentMs()}) at which the last snapshot data request was issued. */
    private volatile long mSnapshotRequestTime;
    /** Journal service client; created on first use by {@code getJournalServiceClient()} and released by {@code close()}. */
    private volatile RaftJournalServiceClient mJournalServiceClient;
    /** Snapshot produced by the most recent completed download; read by {@code installDownloadedSnapshot()}. */
    private volatile SnapshotInfo mDownloadedSnapshot;
    /** Current download state; updated with compare-and-set through {@code transitionState}. */
    private final AtomicReference<DownloadState> mDownloadState;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:alluxio/master/journal/raft/SnapshotReplicationManager$DownloadState.class */
    /**
     * States of the snapshot download/installation workflow. Transitions are performed with
     * compare-and-set by {@code transitionState}, so only one workflow can advance at a time.
     */
    public enum DownloadState {
        /** No download or installation in progress; this is the initial state. */
        IDLE,
        /** Followers are being asked for their latest snapshot metadata. */
        REQUEST_INFO,
        /** A follower was asked to send its snapshot; waiting for its upload stream (subject to timeout). */
        REQUEST_DATA,
        /** Snapshot data is being streamed to this server. */
        STREAM_DATA,
        /** The download completed and the snapshot is waiting to be installed. */
        DOWNLOADED,
        /** The downloaded snapshot is being moved into snapshot storage. */
        INSTALLING
    }

    /**
     * Creates a replication manager that starts in the {@code IDLE} state.
     *
     * @param raftJournalSystem the journal system this manager belongs to
     * @param simpleStateMachineStorage the storage holding the state machine snapshots
     */
    public SnapshotReplicationManager(
            RaftJournalSystem raftJournalSystem,
            SimpleStateMachineStorage simpleStateMachineStorage) {
        this.mJournalSystem = raftJournalSystem;
        this.mStorage = simpleStateMachineStorage;
        // No snapshot request has been issued yet and nothing is being downloaded.
        this.mDownloadState = new AtomicReference<>(DownloadState.IDLE);
        this.mSnapshotRequestTime = 0L;
    }

    /**
     * Test-only constructor that injects a pre-built journal service client instead of letting
     * the manager create one on first use.
     *
     * @param raftJournalSystem the journal system this manager belongs to
     * @param simpleStateMachineStorage the storage holding the state machine snapshots
     * @param raftJournalServiceClient the client to use for snapshot transfers
     */
    @VisibleForTesting
    SnapshotReplicationManager(
            RaftJournalSystem raftJournalSystem,
            SimpleStateMachineStorage simpleStateMachineStorage,
            RaftJournalServiceClient raftJournalServiceClient) {
        this(raftJournalSystem, simpleStateMachineStorage);
        this.mJournalServiceClient = raftJournalServiceClient;
    }

    /**
     * Downloads the latest snapshot from the leader and installs it locally.
     *
     * <p>The download only starts when this server is not the leader and the state can be moved
     * from {@code IDLE} to {@code STREAM_DATA}. Installation runs asynchronously once the download
     * future completes.
     *
     * @return a future holding the term/index of the installed snapshot; it completes
     *         exceptionally when the preconditions fail, the download fails, or the installed
     *         snapshot index differs from the downloaded one
     */
    public CompletableFuture<TermIndex> installSnapshotFromLeader() {
        if (this.mJournalSystem.isLeader()) {
            return RaftJournalUtils.completeExceptionally(new IllegalStateException("Abort snapshot installation after becoming a leader"));
        }
        if (!transitionState(DownloadState.IDLE, DownloadState.STREAM_DATA)) {
            return RaftJournalUtils.completeExceptionally(new IllegalStateException("State is not IDLE when starting a snapshot installation"));
        }
        try {
            RaftJournalServiceClient client = getJournalServiceClient();
            String leaderAddress = String.valueOf(client.getAddress());
            SnapshotDownloader<DownloadSnapshotPRequest, DownloadSnapshotPResponse> downloader = SnapshotDownloader.forFollower(this.mStorage, leaderAddress);
            // The timer is stopped only when the download succeeds (see the continuation below).
            Timer.Context downloadTimer = MetricsSystem.timer(MetricKey.MASTER_EMBEDDED_JOURNAL_SNAPSHOT_DOWNLOAD_TIMER.getName()).time();
            client.downloadSnapshot(downloader);
            return downloader.getFuture().thenApplyAsync(termIndex -> {
                downloadTimer.close();
                this.mDownloadedSnapshot = downloader.getSnapshotToInstall();
                transitionState(DownloadState.STREAM_DATA, DownloadState.DOWNLOADED);
                long installedIndex = installDownloadedSnapshot();
                // -1 is the value installDownloadedSnapshot() returns when nothing was installed.
                if (installedIndex == -1) {
                    throw new CompletionException(new RuntimeException(String.format("Failed to install the downloaded snapshot %s", termIndex)));
                }
                if (installedIndex != termIndex.getIndex()) {
                    throw new CompletionException(new IllegalStateException(String.format("Mismatched snapshot installed - downloaded %d, installed %d", termIndex.getIndex(), installedIndex)));
                }
                return termIndex;
            }).whenComplete((installed, error) -> {
                if (error != null) {
                    LOG.error("Unexpected exception downloading snapshot from leader {}.", leaderAddress, error);
                    // Release the state so that a later attempt can start from IDLE.
                    transitionState(DownloadState.STREAM_DATA, DownloadState.IDLE);
                }
            });
        } catch (Exception e) {
            transitionState(DownloadState.STREAM_DATA, DownloadState.IDLE);
            return RaftJournalUtils.completeExceptionally(e);
        }
    }

    /**
     * Starts uploading the latest local snapshot to the leader by opening an upload stream and
     * sending the first message, which carries the snapshot term and index at offset 0.
     *
     * @throws IllegalStateException if this server is the leader
     * @throws NotFoundException if there is no local snapshot
     * @throws IOException if the journal service client cannot be obtained
     */
    public void sendSnapshotToLeader() throws IOException {
        if (this.mJournalSystem.isLeader()) {
            throw new IllegalStateException("Server is no longer a follower");
        }
        LOG.debug("Checking latest snapshot to send");
        SingleFileSnapshotInfo snapshot = this.mStorage.getLatestSnapshot();
        if (snapshot == null) {
            throw new NotFoundException("No snapshot available");
        }
        SnapshotUploader<UploadSnapshotPRequest, UploadSnapshotPResponse> uploader = SnapshotUploader.forFollower(this.mStorage, snapshot);
        LOG.info("Sending stream request to {} for snapshot {}", getJournalServiceClient().getAddress(), snapshot.getTermIndex());
        StreamObserver<UploadSnapshotPRequest> requestStream = getJournalServiceClient().uploadSnapshot(uploader);
        // The first message only identifies the snapshot being uploaded.
        SnapshotData.Builder header = SnapshotData.newBuilder()
                .setSnapshotTerm(snapshot.getTerm())
                .setSnapshotIndex(snapshot.getIndex())
                .setOffset(0L);
        requestStream.onNext(UploadSnapshotPRequest.newBuilder().setData(header).build());
    }

    /**
     * Advances the copy-from-follower workflow by one step.
     *
     * <p>Installs a snapshot that has already been downloaded, expires a data request that has
     * been pending too long, and starts an asynchronous request to the followers when idle.
     *
     * @return the index of the snapshot installed by this call, or -1 if none was installed
     */
    public long maybeCopySnapshotFromFollower() {
        if (this.mDownloadState.get() == DownloadState.DOWNLOADED) {
            return installDownloadedSnapshot();
        }
        if (this.mDownloadState.get() == DownloadState.REQUEST_DATA) {
            checkRequestTimeout();
        }
        // The state is re-read on purpose: the timeout check above may have reset it to IDLE.
        if (this.mDownloadState.get() == DownloadState.IDLE) {
            CompletableFuture.runAsync(this::requestSnapshotFromFollowers);
        }
        return -1L;
    }

    /**
     * Accepts a snapshot upload stream opened by a follower.
     *
     * <p>The upload is only tracked when the state can be moved from {@code REQUEST_DATA} to
     * {@code STREAM_DATA}; otherwise the response stream is completed immediately.
     *
     * @param streamObserver the observer used to send responses back to the follower
     * @return the observer that consumes the follower's upload requests
     */
    public StreamObserver<UploadSnapshotPRequest> receiveSnapshotFromFollower(StreamObserver<UploadSnapshotPResponse> streamObserver) {
        String followerIp = ClientIpAddressInjector.getIpAddress();
        LOG.info("Received upload snapshot request from follower {}", followerIp);
        SnapshotDownloader<UploadSnapshotPResponse, UploadSnapshotPRequest> downloader = SnapshotDownloader.forLeader(this.mStorage, streamObserver, followerIp);
        if (!transitionState(DownloadState.REQUEST_DATA, DownloadState.STREAM_DATA)) {
            // No upload was requested (or one is already running): reject this stream.
            streamObserver.onCompleted();
            return downloader;
        }
        downloader.getFuture().thenApply(termIndex -> {
            this.mDownloadedSnapshot = downloader.getSnapshotToInstall();
            transitionState(DownloadState.STREAM_DATA, DownloadState.DOWNLOADED);
            return termIndex;
        }).exceptionally(error -> {
            LOG.error("Unexpected exception downloading snapshot from follower {}.", followerIp, error);
            transitionState(DownloadState.STREAM_DATA, DownloadState.IDLE);
            return null;
        });
        return downloader;
    }

    /**
     * Handles a snapshot-related journal query.
     *
     * <p>A snapshot info request is answered with the term/index of the latest local snapshot.
     * A snapshot request triggers an upload of the latest snapshot to the leader.
     *
     * @param journalQueryRequest the query to handle
     * @return the reply message, or {@code null} if the query is neither kind of snapshot request
     * @throws IOException if starting the upload to the leader fails
     */
    public Message handleRequest(JournalQueryRequest journalQueryRequest) throws IOException {
        if (journalQueryRequest.hasSnapshotInfoRequest()) {
            SingleFileSnapshotInfo snapshot = this.mStorage.getLatestSnapshot();
            if (snapshot == null) {
                LOG.debug("No snapshot to send");
                // NOTE(review): this reply is a bare GetSnapshotInfoResponse rather than one
                // wrapped in a JournalQueryResponse like the reply below - confirm this is intended.
                return toMessage(GetSnapshotInfoResponse.getDefaultInstance());
            }
            GetSnapshotInfoResponse.Builder info = GetSnapshotInfoResponse.newBuilder()
                    .setLatest(toSnapshotMetadata(snapshot.getTermIndex()));
            JournalQueryResponse response = JournalQueryResponse.newBuilder()
                    .setSnapshotInfoResponse(info)
                    .build();
            LOG.debug("Sent snapshot info response {}", response);
            return toMessage(response);
        }
        if (journalQueryRequest.hasSnapshotRequest()) {
            LOG.debug("Start sending snapshot to leader");
            sendSnapshotToLeader();
            return Message.EMPTY;
        }
        return null;
    }

    /**
     * Serves a snapshot download request from a follower.
     *
     * <p>When a local snapshot exists, the first response carries its term and index at offset 0.
     * When there is none, the response stream is failed with {@code NOT_FOUND}.
     *
     * @param streamObserver the observer used to send responses to the follower
     * @return the observer that consumes the follower's download requests
     */
    public StreamObserver<DownloadSnapshotPRequest> sendSnapshotToFollower(StreamObserver<DownloadSnapshotPResponse> streamObserver) {
        SingleFileSnapshotInfo snapshot = this.mStorage.getLatestSnapshot();
        LOG.debug("Received snapshot download request from {}", ClientIpAddressInjector.getIpAddress());
        SnapshotUploader<DownloadSnapshotPResponse, DownloadSnapshotPRequest> uploader = SnapshotUploader.forLeader(this.mStorage, snapshot, streamObserver);
        if (snapshot != null) {
            SnapshotData.Builder header = SnapshotData.newBuilder()
                    .setSnapshotTerm(snapshot.getTerm())
                    .setSnapshotIndex(snapshot.getIndex())
                    .setOffset(0L);
            streamObserver.onNext(DownloadSnapshotPResponse.newBuilder().setData(header).build());
        } else {
            streamObserver.onError(Status.NOT_FOUND.withDescription("Cannot find a valid snapshot to download.").asException());
        }
        return uploader;
    }

    /**
     * Wraps the serialized form of a protobuf message in a Ratis {@link Message}.
     *
     * @param messageLite the protobuf message to serialize
     * @return a Ratis message whose content is the serialized bytes
     */
    private static Message toMessage(MessageLite messageLite) {
        return Message.valueOf(
                UnsafeByteOperations.unsafeWrap(
                        messageLite.toByteString().asReadOnlyByteBuffer()));
    }

    /**
     * Converts a Ratis term/index pair into its protobuf representation.
     *
     * @param termIndex the term/index to convert, may be {@code null}
     * @return the equivalent snapshot metadata, or {@code null} when the input is {@code null}
     */
    private SnapshotMetadata toSnapshotMetadata(TermIndex termIndex) {
        return termIndex == null
                ? null
                : SnapshotMetadata.newBuilder()
                        .setSnapshotTerm(termIndex.getTerm())
                        .setSnapshotIndex(termIndex.getIndex())
                        .build();
    }

    /**
     * Atomically moves the download state from one value to another.
     *
     * @param downloadState the state the manager is expected to be in
     * @param downloadState2 the state to move to
     * @return {@code true} if the state was changed; {@code false} (after logging a warning) if
     *         the current state was not the expected one
     */
    private boolean transitionState(DownloadState downloadState, DownloadState downloadState2) {
        boolean changed = this.mDownloadState.compareAndSet(downloadState, downloadState2);
        if (!changed) {
            LOG.warn("Failed to transition from {} to {}: current state is {}", downloadState, downloadState2, this.mDownloadState.get());
        }
        return changed;
    }

    /**
     * Installs the snapshot referenced by {@code mDownloadedSnapshot} into snapshot storage.
     *
     * <p>The downloaded temporary file is renamed to the storage's snapshot file for its
     * term/index, an MD5 file is written next to it, and the storage reloads its latest snapshot.
     * A snapshot older than the one currently in storage is rejected. The state is always moved
     * from {@code INSTALLING} back to {@code IDLE} when this method finishes.
     *
     * @return the index of the installed snapshot, or -1 if the state was not {@code DOWNLOADED}
     *         or the installation failed
     */
    private long installDownloadedSnapshot() {
        if (!transitionState(DownloadState.DOWNLOADED, DownloadState.INSTALLING)) {
            return -1L;
        }
        // Tracked outside the try block so that a failed installation can clean it up.
        File tempFile = null;
        try (Timer.Context installTimer = MetricsSystem.timer(MetricKey.MASTER_EMBEDDED_JOURNAL_SNAPSHOT_INSTALL_TIMER.getName()).time()) {
            SnapshotInfo downloaded = this.mDownloadedSnapshot;
            if (downloaded == null) {
                throw new IllegalStateException("Snapshot is not completed");
            }
            FileInfo fileInfo = downloaded.getFiles().get(0);
            tempFile = fileInfo.getPath().toFile();
            if (!tempFile.exists()) {
                throw new FileNotFoundException(String.format("Snapshot file %s is not found", tempFile));
            }
            SingleFileSnapshotInfo current = this.mStorage.getLatestSnapshot();
            TermIndex currentTermIndex = current == null ? null : current.getTermIndex();
            TermIndex downloadedTermIndex = downloaded.getTermIndex();
            if (currentTermIndex != null && downloadedTermIndex.compareTo(currentTermIndex) < 0) {
                throw new AbortedException(String.format("Snapshot to be installed %s is older than current snapshot %s", downloadedTermIndex, currentTermIndex));
            }
            File snapshotFile = this.mStorage.getSnapshotFile(downloadedTermIndex.getTerm(), downloadedTermIndex.getIndex());
            LOG.debug("Moving temp snapshot {} to file {}", tempFile, snapshotFile);
            MD5FileUtil.saveMD5File(snapshotFile, fileInfo.getFileDigest());
            if (!tempFile.renameTo(snapshotFile)) {
                throw new IOException(String.format("Failed to rename %s to %s", tempFile, snapshotFile));
            }
            this.mStorage.loadLatestSnapshot();
            LOG.info("Completed storing snapshot at {} to file {}", downloadedTermIndex, snapshotFile);
            return downloadedTermIndex.getIndex();
        } catch (Exception e) {
            LOG.error("Failed to install snapshot", e);
            // Remove the leftover download. After a successful rename the temp path no longer
            // exists, so this cannot delete an installed snapshot.
            if (tempFile != null && tempFile.exists() && !tempFile.delete()) {
                LOG.warn("Failed to delete temp snapshot file {}", tempFile);
            }
            return -1L;
        } finally {
            transitionState(DownloadState.INSTALLING, DownloadState.IDLE);
        }
    }

    /**
     * Asks the available followers for their latest snapshot metadata and requests the snapshot
     * data from the follower holding the most recent one.
     *
     * <p>Runs only when the state can be moved from {@code IDLE} to {@code REQUEST_INFO}. A
     * follower is selected when there is no local snapshot, or when its snapshot has a term not
     * lower than, and an index greater than, the best candidate so far. On success the state is
     * left in {@code REQUEST_DATA}; on any failure it is moved back to {@code IDLE}.
     */
    private void requestSnapshotFromFollowers() {
        if (!transitionState(DownloadState.IDLE, DownloadState.REQUEST_INFO)) {
            return;
        }
        try {
            SingleFileSnapshotInfo localSnapshot = this.mStorage.getLatestSnapshot();
            // Best snapshot known so far; starts from the local snapshot, if any.
            SnapshotMetadata newest = localSnapshot == null ? null : SnapshotMetadata.newBuilder().setSnapshotTerm(localSnapshot.getTerm()).setSnapshotIndex(localSnapshot.getIndex()).build();
            // Send the info request to every other available quorum member before waiting on any reply.
            Map<RaftPeerId, CompletableFuture<RaftClientReply>> pendingReplies = this.mJournalSystem.getQuorumServerInfoList().stream()
                    .filter(serverInfo -> serverInfo.getServerState() == QuorumServerState.AVAILABLE)
                    .map(serverInfo -> RaftJournalUtils.getPeerId(serverInfo.getServerAddress().getHost(), serverInfo.getServerAddress().getRpcPort()))
                    .filter(peerId -> !peerId.equals(this.mJournalSystem.getLocalPeerId()))
                    .collect(Collectors.toMap(Function.identity(), peerId -> this.mJournalSystem.sendMessageAsync(peerId, toMessage(JournalQueryRequest.newBuilder().setSnapshotInfoRequest(GetSnapshotInfoRequest.getDefaultInstance()).build()))));
            RaftPeerId peerToRequest = null;
            for (Map.Entry<RaftPeerId, CompletableFuture<RaftClientReply>> pending : pendingReplies.entrySet()) {
                RaftPeerId peerId = pending.getKey();
                try {
                    RaftClientReply reply = pending.getValue().get();
                    if (reply.getException() != null) {
                        LOG.warn("Received exception requesting snapshot info {}", reply.getException().getMessage());
                        continue;
                    }
                    try {
                        JournalQueryResponse response = JournalQueryResponse.parseFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
                        LOG.debug("Received snapshot info from follower {} - {}", peerId, response);
                        if (!response.hasSnapshotInfoResponse()) {
                            LOG.warn("Invalid response for GetSnapshotInfoRequest {}", response);
                            continue;
                        }
                        GetSnapshotInfoResponse infoResponse = response.getSnapshotInfoResponse();
                        // Protobuf getters never return null: an unset field yields the default
                        // instance, so presence has to be checked with hasLatest().
                        if (!infoResponse.hasLatest()) {
                            LOG.debug("Follower {} does not have a snapshot", peerId);
                            continue;
                        }
                        SnapshotMetadata latest = infoResponse.getLatest();
                        if (newest == null || (latest.getSnapshotTerm() >= newest.getSnapshotTerm() && latest.getSnapshotIndex() > newest.getSnapshotIndex())) {
                            newest = latest;
                            peerToRequest = peerId;
                        }
                    } catch (InvalidProtocolBufferException e) {
                        LOG.warn("Failed to parse response {}", e.toString());
                    }
                } catch (Exception e) {
                    // One unreachable follower must not prevent using the others.
                    LOG.warn("Exception thrown while requesting snapshot info {}", e.toString());
                }
            }
            if (peerToRequest == null) {
                throw new UnavailableException("No recent snapshot found from followers");
            }
            LOG.info("Request snapshot data from follower {}", peerToRequest);
            // Recorded before the transition so the timeout check sees a fresh timestamp.
            this.mSnapshotRequestTime = CommonUtils.getCurrentMs();
            transitionState(DownloadState.REQUEST_INFO, DownloadState.REQUEST_DATA);
            try {
                RaftClientReply dataReply = this.mJournalSystem.sendMessageAsync(peerToRequest, toMessage(JournalQueryRequest.newBuilder().setSnapshotRequest(GetSnapshotRequest.getDefaultInstance()).build())).get();
                if (dataReply.getException() != null) {
                    throw dataReply.getException();
                }
            } catch (Exception e) {
                LOG.error("Failed to request snapshot data from {}", peerToRequest, e);
                transitionState(DownloadState.REQUEST_DATA, DownloadState.IDLE);
            }
        } catch (Exception e) {
            LogUtils.warnWithException(LOG, "Failed to request snapshot info from followers", e);
            transitionState(DownloadState.REQUEST_INFO, DownloadState.IDLE);
        }
    }

    /**
     * Moves the state from {@code REQUEST_DATA} back to {@code IDLE} when more than
     * {@code SNAPSHOT_REQUEST_TIMEOUT_MS} milliseconds have passed since the snapshot data
     * request was issued.
     */
    private void checkRequestTimeout() {
        long elapsedMs = CommonUtils.getCurrentMs() - this.mSnapshotRequestTime;
        if (elapsedMs <= SNAPSHOT_REQUEST_TIMEOUT_MS) {
            return;
        }
        transitionState(DownloadState.REQUEST_DATA, DownloadState.IDLE);
    }

    /**
     * Returns the journal service client, creating it on first use, and calls {@code connect()}
     * on it before returning.
     *
     * @return the journal service client
     * @throws AlluxioStatusException if connecting the client fails
     */
    private synchronized RaftJournalServiceClient getJournalServiceClient() throws AlluxioStatusException {
        RaftJournalServiceClient client = this.mJournalServiceClient;
        if (client == null) {
            client = new RaftJournalServiceClient(MasterClientContext.newBuilder(ClientContext.create(ServerConfiguration.global())).build());
            this.mJournalServiceClient = client;
        }
        client.connect();
        return client;
    }

    /**
     * Closes the journal service client, if one was created, and clears the reference so that a
     * later call to {@code getJournalServiceClient()} creates a new one.
     */
    public synchronized void close() {
        if (this.mJournalServiceClient == null) {
            return;
        }
        this.mJournalServiceClient.close();
        this.mJournalServiceClient = null;
    }
}
