package alluxio.master.journal.raft;

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.LatestSnapshotInfoPRequest;
import alluxio.grpc.RaftJournalServiceGrpc;
import alluxio.grpc.SnapshotData;
import alluxio.grpc.SnapshotMetadata;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.util.compression.DirectoryMarshaller;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.File;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/master/journal/raft/RaftJournalServiceHandler.class */
public class RaftJournalServiceHandler extends RaftJournalServiceGrpc.RaftJournalServiceImplBase {
    private static final Logger LOG = LoggerFactory.getLogger(RaftJournalServiceHandler.class);
    private final StateMachineStorage mStateMachineStorage;
    private volatile long mLastSnapshotUploadDurationMs = -1;
    private volatile long mLastSnapshotUploadSize = -1;
    private volatile long mLastSnapshotUploadDiskSize = -1;

    /* loaded from: input_file:alluxio/master/journal/raft/RaftJournalServiceHandler$SnapshotGrpcOutputStream.class */
    static class SnapshotGrpcOutputStream extends OutputStream {
        private final StreamObserver<SnapshotData> mObserver;
        private final int mSnapshotReplicationChunkSize = (int) Configuration.getBytes(PropertyKey.MASTER_EMBEDDED_JOURNAL_SNAPSHOT_REPLICATION_CHUNK_SIZE);
        private long mTotalBytesSent = 0;
        private byte[] mBuffer = new byte[this.mSnapshotReplicationChunkSize];
        private int mBufferPosition = 0;

        public SnapshotGrpcOutputStream(StreamObserver<SnapshotData> streamObserver) {
            this.mObserver = streamObserver;
        }

        @Override // java.io.OutputStream
        public void write(int i) {
            byte[] bArr = this.mBuffer;
            int i2 = this.mBufferPosition;
            this.mBufferPosition = i2 + 1;
            bArr[i2] = (byte) i;
            if (this.mBufferPosition == this.mBuffer.length) {
                flushBuffer();
            }
        }

        @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            if (this.mBufferPosition > 0) {
                flushBuffer();
            }
        }

        private void flushBuffer() {
            ByteString unsafeWrap = UnsafeByteOperations.unsafeWrap(this.mBuffer, 0, this.mBufferPosition);
            this.mBuffer = new byte[this.mSnapshotReplicationChunkSize];
            RaftJournalServiceHandler.LOG.debug("Sending chunk of size {}: {}", Integer.valueOf(this.mBufferPosition), unsafeWrap);
            this.mObserver.onNext(SnapshotData.newBuilder().setChunk(unsafeWrap).build());
            this.mTotalBytesSent += this.mBufferPosition;
            this.mBufferPosition = 0;
        }

        public long totalBytes() {
            return this.mTotalBytesSent + this.mBufferPosition;
        }
    }

    public RaftJournalServiceHandler(StateMachineStorage stateMachineStorage) {
        this.mStateMachineStorage = stateMachineStorage;
        MetricsSystem.registerGaugeIfAbsent(MetricKey.MASTER_EMBEDDED_JOURNAL_LAST_SNAPSHOT_UPLOAD_DURATION_MS.getName(), () -> {
            return Long.valueOf(this.mLastSnapshotUploadDurationMs);
        });
        MetricsSystem.registerGaugeIfAbsent(MetricKey.MASTER_EMBEDDED_JOURNAL_LAST_SNAPSHOT_UPLOAD_SIZE.getName(), () -> {
            return Long.valueOf(this.mLastSnapshotUploadSize);
        });
        MetricsSystem.registerGaugeIfAbsent(MetricKey.MASTER_EMBEDDED_JOURNAL_LAST_SNAPSHOT_UPLOAD_DISK_SIZE.getName(), () -> {
            return Long.valueOf(this.mLastSnapshotUploadDiskSize);
        });
    }

    public void requestLatestSnapshotInfo(LatestSnapshotInfoPRequest latestSnapshotInfoPRequest, StreamObserver<SnapshotMetadata> streamObserver) {
        LOG.info("Received request for latest snapshot info");
        if (Context.current().isCancelled()) {
            streamObserver.onError(Status.CANCELLED.withDescription("Cancelled by client").asRuntimeException());
            return;
        }
        SnapshotInfo latestSnapshot = this.mStateMachineStorage.getLatestSnapshot();
        SnapshotMetadata.Builder newBuilder = SnapshotMetadata.newBuilder();
        if (latestSnapshot == null) {
            LOG.info("No snapshot to send");
            newBuilder.setExists(false);
        } else {
            LOG.info("Found snapshot {}", latestSnapshot.getTermIndex());
            newBuilder.setExists(true).setSnapshotTerm(latestSnapshot.getTerm()).setSnapshotIndex(latestSnapshot.getIndex());
        }
        streamObserver.onNext(newBuilder.build());
        streamObserver.onCompleted();
    }

    public void requestLatestSnapshotData(SnapshotMetadata snapshotMetadata, StreamObserver<SnapshotData> streamObserver) {
        TermIndex valueOf = TermIndex.valueOf(snapshotMetadata.getSnapshotTerm(), snapshotMetadata.getSnapshotIndex());
        LOG.info("Received request for snapshot data {}", valueOf);
        if (Context.current().isCancelled()) {
            streamObserver.onError(Status.CANCELLED.withDescription("Cancelled by client").asRuntimeException());
            return;
        }
        Path path = new File(this.mStateMachineStorage.getSnapshotDir(), SimpleStateMachineStorage.getSnapshotFileName(snapshotMetadata.getSnapshotTerm(), snapshotMetadata.getSnapshotIndex())).toPath();
        LOG.info("Begin snapshot upload of {}", valueOf);
        Instant now = Instant.now();
        try {
            SnapshotGrpcOutputStream snapshotGrpcOutputStream = new SnapshotGrpcOutputStream(streamObserver);
            Throwable th = null;
            try {
                try {
                    long write = DirectoryMarshaller.Factory.create().write(path, snapshotGrpcOutputStream);
                    long j = snapshotGrpcOutputStream.totalBytes();
                    if (snapshotGrpcOutputStream != null) {
                        if (0 != 0) {
                            try {
                                snapshotGrpcOutputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            snapshotGrpcOutputStream.close();
                        }
                    }
                    streamObserver.onCompleted();
                    this.mLastSnapshotUploadDurationMs = Duration.between(now, Instant.now()).toMillis();
                    MetricsSystem.timer(MetricKey.MASTER_EMBEDDED_JOURNAL_SNAPSHOT_UPLOAD_TIMER.getName()).update(this.mLastSnapshotUploadDurationMs, TimeUnit.MILLISECONDS);
                    LOG.info("Total milliseconds to upload {}: {}", valueOf, Long.valueOf(this.mLastSnapshotUploadDurationMs));
                    this.mLastSnapshotUploadDiskSize = write;
                    MetricsSystem.histogram(MetricKey.MASTER_EMBEDDED_JOURNAL_SNAPSHOT_UPLOAD_DISK_HISTOGRAM.getName()).update(this.mLastSnapshotUploadDiskSize);
                    LOG.info("Total snapshot uncompressed bytes for {}: {}", valueOf, Long.valueOf(this.mLastSnapshotUploadDiskSize));
                    this.mLastSnapshotUploadSize = j;
                    MetricsSystem.histogram(MetricKey.MASTER_EMBEDDED_JOURNAL_SNAPSHOT_UPLOAD_HISTOGRAM.getName()).update(this.mLastSnapshotUploadSize);
                    LOG.info("Total bytes sent for {}: {}", valueOf, Long.valueOf(this.mLastSnapshotUploadSize));
                    LOG.info("Uploaded snapshot {}", valueOf);
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            LOG.warn("Failed to upload snapshot {}", valueOf, e);
            streamObserver.onError(Status.INTERNAL.withCause(e).asRuntimeException());
        }
    }
}
