package alluxio.master.journal.raft;

import alluxio.grpc.DownloadSnapshotPRequest;
import alluxio.grpc.DownloadSnapshotPResponse;
import alluxio.grpc.SnapshotData;
import alluxio.grpc.UploadSnapshotPRequest;
import alluxio.grpc.UploadSnapshotPResponse;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.util.MD5FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/master/journal/raft/SnapshotDownloader.class */
/**
 * Stream observer that receives a snapshot from a remote peer chunk by chunk and writes it to a
 * temporary file obtained from {@link RaftJournalUtils#createTempSnapshotFile}.
 *
 * <p>Message flow, as implemented by {@link #onNext}:
 * <ol>
 *   <li>The first message received fixes the term/index of the snapshot being downloaded, creates
 *       the temporary file and is acknowledged with an offset of {@code 0}. No chunk data is
 *       written for this first message.</li>
 *   <li>Every following message must carry the same term/index and a chunk whose offset equals the
 *       current position in the temporary file. Each chunk that is not the last one is
 *       acknowledged with the total number of bytes written so far.</li>
 *   <li>When a chunk is flagged as end-of-file, the file is closed, its MD5 is computed, the
 *       resulting {@link SnapshotInfo} is stored, the future returned by {@link #getFuture()} is
 *       completed with the term/index and the outgoing stream is completed.</li>
 * </ol>
 *
 * <p>On any failure the future is completed exceptionally and the partially written temporary
 * file is closed and deleted. A snapshot that has already been downloaded successfully is never
 * deleted by late stream errors.
 *
 * @param <S> type of the messages sent back to the peer (acknowledgements)
 * @param <R> type of the messages received from the peer (carrying {@link SnapshotData})
 */
public class SnapshotDownloader<S, R> implements ClientResponseObserver<S, R> {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotDownloader.class);

    private final SimpleStateMachineStorage mStorage;
    /** Builds an acknowledgement message from the number of bytes received so far. */
    private final Function<Long, S> mMessageBuilder;
    /** Extracts the snapshot payload from a received message. */
    private final Function<R, SnapshotData> mDataGetter;
    /** Description of the peer the snapshot comes from; only used for logging. */
    private final String mSource;
    private final CompletableFuture<TermIndex> mFuture = new CompletableFuture<>();

    /** Term/index of the snapshot being downloaded; {@code null} until the first message. */
    private TermIndex mTermIndex;
    private File mTempFile;
    /** Open only while chunks are being written; {@code null} before, after, and once cleaned. */
    private FileOutputStream mOutputStream;
    /** Outgoing stream; supplied at construction or later through {@link #beforeStart}. */
    private StreamObserver<S> mStream;
    private SnapshotInfo mSnapshotToInstall;
    private long mBytesWritten = 0;

    /**
     * Creates a downloader that receives {@link UploadSnapshotPRequest} messages and acknowledges
     * them with {@link UploadSnapshotPResponse} messages on the given stream.
     *
     * @param storage the state machine storage used to create the temporary snapshot file
     * @param stream the stream on which acknowledgements are sent
     * @param source description of the peer sending the snapshot, used in log messages
     * @return the downloader
     */
    public static SnapshotDownloader<UploadSnapshotPResponse, UploadSnapshotPRequest> forLeader(
            SimpleStateMachineStorage storage,
            StreamObserver<UploadSnapshotPResponse> stream,
            String source) {
        return new SnapshotDownloader<>(
                storage,
                offset -> UploadSnapshotPResponse.newBuilder().setOffsetReceived(offset).build(),
                UploadSnapshotPRequest::getData,
                stream,
                source);
    }

    /**
     * Creates a downloader that receives {@link DownloadSnapshotPResponse} messages and
     * acknowledges them with {@link DownloadSnapshotPRequest} messages. The outgoing stream is
     * not known at construction time and is expected to be supplied via {@link #beforeStart}.
     *
     * @param storage the state machine storage used to create the temporary snapshot file
     * @param source description of the peer sending the snapshot, used in log messages
     * @return the downloader
     */
    public static SnapshotDownloader<DownloadSnapshotPRequest, DownloadSnapshotPResponse> forFollower(
            SimpleStateMachineStorage storage, String source) {
        return new SnapshotDownloader<>(
                storage,
                offset -> DownloadSnapshotPRequest.newBuilder().setOffsetReceived(offset).build(),
                DownloadSnapshotPResponse::getData,
                null,
                source);
    }

    private SnapshotDownloader(
            SimpleStateMachineStorage storage,
            Function<Long, S> messageBuilder,
            Function<R, SnapshotData> dataGetter,
            StreamObserver<S> stream,
            String source) {
        mStorage = storage;
        mMessageBuilder = messageBuilder;
        mDataGetter = dataGetter;
        mStream = stream;
        mSource = source;
    }

    /**
     * Handles one message from the peer. Any exception raised while processing it is reported on
     * the outgoing stream and fails the download.
     *
     * @param r the received message
     */
    public void onNext(R r) {
        try {
            onNextInternal(r);
        } catch (Exception e) {
            try {
                mStream.onError(e);
            } finally {
                // Runs even if reporting the error to the peer throws, so that waiters on the
                // future are always released and the partial file is always removed.
                failAndCleanup(e);
            }
        }
    }

    /**
     * Completes the future exceptionally and, unless the download had already succeeded,
     * releases the partially written temporary file.
     */
    private void failAndCleanup(Throwable cause) {
        mFuture.completeExceptionally(cause);
        // If the future was already completed normally, the temporary file is the finished
        // snapshot that the future's consumer may be about to install; it must not be deleted.
        if (mFuture.isCompletedExceptionally()) {
            cleanup();
        }
    }

    /**
     * Closes the output stream if it is open and deletes the temporary file if it exists.
     * Safe to call more than once.
     */
    private void cleanup() {
        if (mOutputStream != null) {
            try {
                mOutputStream.close();
            } catch (IOException e) {
                LOG.error("Error closing snapshot file {}", mTempFile, e);
            }
            mOutputStream = null;
        }
        // The exists() check keeps a repeated cleanup from logging a bogus deletion failure.
        if (mTempFile != null && mTempFile.exists() && !mTempFile.delete()) {
            LOG.error("Error deleting snapshot file {}", mTempFile.getPath());
        }
    }

    /**
     * Processes one received message according to the protocol described on the class.
     *
     * @throws IOException if the term/index or offset does not match the download in progress,
     *         if the chunk is missing, or if writing to the temporary file fails
     */
    private void onNextInternal(R r) throws IOException {
        SnapshotData data = mDataGetter.apply(r);
        TermIndex termIndex =
                TermIndex.newTermIndex(data.getSnapshotTerm(), data.getSnapshotIndex());

        if (mTermIndex == null) {
            // First message: record which snapshot is being downloaded and ask for offset 0.
            LOG.info("Downloading new snapshot {} from {}", termIndex, mSource);
            mTermIndex = termIndex;
            mTempFile = RaftJournalUtils.createTempSnapshotFile(mStorage);
            mTempFile.deleteOnExit();
            mStream.onNext(mMessageBuilder.apply(0L));
            return;
        }

        if (!termIndex.equals(mTermIndex)) {
            throw new IOException(String.format(
                    "Mismatched term index when downloading the snapshot. expected: %s actual: %s",
                    mTermIndex, termIndex));
        }
        if (!data.hasChunk()) {
            throw new IOException(String.format(
                    "A chunk for file %s is missing from the response %s.", mTempFile, r));
        }
        if (mOutputStream == null) {
            LOG.info("Start writing to temporary file {}", mTempFile.getPath());
            mOutputStream = new FileOutputStream(mTempFile);
        }

        long position = mOutputStream.getChannel().position();
        if (position != data.getOffset()) {
            throw new IOException(String.format(
                    "Mismatched offset in file %d, expect %d, bytes written %d",
                    position, data.getOffset(), mBytesWritten));
        }

        byte[] chunk = data.getChunk().toByteArray();
        mOutputStream.write(chunk);
        mBytesWritten += chunk.length;
        LOG.debug("Written {} bytes to snapshot file {}", mBytesWritten, mTempFile.getPath());

        if (!data.getEof()) {
            // Acknowledge the chunk with the total number of bytes received so far.
            mStream.onNext(mMessageBuilder.apply(mBytesWritten));
            return;
        }
        finishDownload();
    }

    /**
     * Closes the temporary file, records it as the snapshot to install, completes the future and
     * completes the outgoing stream.
     */
    private void finishDownload() throws IOException {
        LOG.debug("Completed writing to temporary file {} with size {}",
                mTempFile.getPath(), mOutputStream.getChannel().position());
        mOutputStream.close();
        mOutputStream = null;
        FileInfo fileInfo =
                new FileInfo(mTempFile.toPath(), MD5FileUtil.computeMd5ForFile(mTempFile));
        mSnapshotToInstall =
                new SingleFileSnapshotInfo(fileInfo, mTermIndex.getTerm(), mTermIndex.getIndex());
        mFuture.complete(mTermIndex);
        LOG.info("Finished copying snapshot to local file {}.", mTempFile);
        mStream.onCompleted();
    }

    /**
     * Fails the download with the given error. Has no effect on a snapshot that has already been
     * downloaded successfully.
     *
     * @param th the error reported by the stream
     */
    public void onError(Throwable th) {
        failAndCleanup(th);
    }

    /**
     * Handles completion of the incoming stream. If the download has neither succeeded nor failed
     * by this point, it is failed with an {@link IllegalStateException} so that callers waiting
     * on {@link #getFuture()} are not left waiting.
     */
    public void onCompleted() {
        if (!mFuture.isDone()) {
            failAndCleanup(new IllegalStateException("Request completed with unfinished upload"));
        }
    }

    /**
     * Stores the outgoing stream on which acknowledgements are sent.
     *
     * @param clientCallStreamObserver the outgoing stream
     */
    public void beforeStart(ClientCallStreamObserver<S> clientCallStreamObserver) {
        mStream = clientCallStreamObserver;
    }

    /**
     * @return a future completed with the term/index of the snapshot once it has been fully
     *         written to the temporary file, or completed exceptionally if the download fails
     */
    public CompletableFuture<TermIndex> getFuture() {
        return mFuture;
    }

    /**
     * @return the downloaded snapshot, or {@code null} if the download has not finished
     *         successfully
     */
    public SnapshotInfo getSnapshotToInstall() {
        return mSnapshotToInstall;
    }
}
