package alluxio.client.block.stream;

import alluxio.exception.status.CancelledException;
import alluxio.exception.status.DeadlineExceededException;
import alluxio.exception.status.UnauthenticatedException;
import alluxio.grpc.WriteRequest;
import alluxio.grpc.WriteResponse;
import alluxio.util.ThreadFactoryUtils;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:alluxio/client/block/stream/GrpcBlockingStreamTest.class */
public final class GrpcBlockingStreamTest {
    private static final int BUFFER_SIZE = 5;
    private static final long TIMEOUT = 10000;
    private static final long SHORT_TIMEOUT = 500;
    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(4, ThreadFactoryUtils.build("test-executor-%d", true));
    private static final String TEST_MESSAGE = "test message";
    private BlockWorkerClient mClient;
    private ClientCallStreamObserver<WriteRequest> mRequestObserver;
    private ClientResponseObserver<WriteRequest, WriteResponse> mResponseObserver;
    private GrpcBlockingStream<WriteRequest, WriteResponse> mStream;
    private Runnable mOnReadyHandler;

    @Before
    public void before() {
        this.mClient = (BlockWorkerClient) Mockito.mock(BlockWorkerClient.class);
        this.mRequestObserver = (ClientCallStreamObserver) Mockito.mock(ClientCallStreamObserver.class);
        Mockito.when(this.mClient.writeBlock((StreamObserver) ArgumentMatchers.any(StreamObserver.class))).thenAnswer(invocationOnMock -> {
            this.mResponseObserver = (ClientResponseObserver) invocationOnMock.getArgument(0, ClientResponseObserver.class);
            return this.mRequestObserver;
        });
        Mockito.when(Boolean.valueOf(this.mRequestObserver.isReady())).thenReturn(true);
        BlockWorkerClient blockWorkerClient = this.mClient;
        blockWorkerClient.getClass();
        this.mStream = new GrpcBlockingStream<>(blockWorkerClient::writeBlock, BUFFER_SIZE, TEST_MESSAGE);
    }

    @Test
    public void send() throws Exception {
        WriteRequest build = WriteRequest.newBuilder().build();
        this.mStream.send(build, TIMEOUT);
        ((ClientCallStreamObserver) Mockito.verify(this.mRequestObserver)).onNext(build);
    }

    @Test
    public void receive() throws Exception {
        WriteResponse build = WriteResponse.newBuilder().build();
        this.mResponseObserver.onNext(build);
        Assert.assertEquals(build, (WriteResponse) this.mStream.receive(TIMEOUT));
    }

    @Test
    public void close() throws Exception {
        this.mStream.close();
        Assert.assertTrue(this.mStream.isClosed());
        Assert.assertFalse(this.mStream.isOpen());
        ((ClientCallStreamObserver) Mockito.verify(this.mRequestObserver)).onCompleted();
    }

    @Test
    public void cancel() throws Exception {
        this.mStream.cancel();
        Assert.assertTrue(this.mStream.isCanceled());
        Assert.assertFalse(this.mStream.isOpen());
        ((ClientCallStreamObserver) Mockito.verify(this.mRequestObserver)).cancel((String) ArgumentMatchers.any(String.class), (Throwable) ArgumentMatchers.eq((Object) null));
    }

    @Test
    public void onCompleted() throws Exception {
        this.mResponseObserver.onCompleted();
        Assert.assertNull((WriteResponse) this.mStream.receive(TIMEOUT));
    }

    @Test
    public void sendError() throws Exception {
        this.mResponseObserver.onError(Status.UNAUTHENTICATED.asRuntimeException());
        Assert.assertTrue(((Exception) Assert.assertThrows(UnauthenticatedException.class, () -> {
            this.mStream.send(WriteRequest.newBuilder().build(), TIMEOUT);
        })).getMessage().contains(TEST_MESSAGE));
    }

    @Test
    public void sendFailsAfterClosed() throws Exception {
        this.mStream.close();
        Assert.assertTrue(((Exception) Assert.assertThrows(CancelledException.class, () -> {
            this.mStream.send(WriteRequest.newBuilder().build(), TIMEOUT);
        })).getMessage().contains(TEST_MESSAGE));
    }

    @Test
    public void sendFailsAfterCanceled() throws Exception {
        this.mStream.cancel();
        Assert.assertTrue(((Exception) Assert.assertThrows(CancelledException.class, () -> {
            this.mStream.send(WriteRequest.newBuilder().build(), TIMEOUT);
        })).getMessage().contains(TEST_MESSAGE));
    }

    @Test
    public void receiveFailsAfterCanceled() throws Exception {
        this.mStream.cancel();
        Assert.assertTrue(((Exception) Assert.assertThrows(CancelledException.class, () -> {
        })).getMessage().contains(TEST_MESSAGE));
    }

    @Test
    public void receiveError() throws Exception {
        this.mResponseObserver.onError(Status.UNAUTHENTICATED.asRuntimeException());
        Assert.assertTrue(((Exception) Assert.assertThrows(UnauthenticatedException.class, () -> {
        })).getMessage().contains(TEST_MESSAGE));
    }

    @Test
    public void sendFailsAfterTimeout() throws Exception {
        Mockito.when(Boolean.valueOf(this.mRequestObserver.isReady())).thenReturn(false);
        Assert.assertTrue(((Exception) Assert.assertThrows(DeadlineExceededException.class, () -> {
            this.mStream.send(WriteRequest.newBuilder().build(), SHORT_TIMEOUT);
        })).getMessage().contains(TEST_MESSAGE));
    }

    @Test
    public void receiveFailsAfterTimeout() throws Exception {
        Assert.assertTrue(((Exception) Assert.assertThrows(DeadlineExceededException.class, () -> {
        })).getMessage().contains(TEST_MESSAGE));
    }

    @Test
    public void sendAfterStreamReady() throws Exception {
        Mockito.when(Boolean.valueOf(this.mRequestObserver.isReady())).thenReturn(false);
        ((ClientCallStreamObserver) Mockito.doAnswer(invocationOnMock -> {
            this.mOnReadyHandler = (Runnable) invocationOnMock.getArgument(0, Runnable.class);
            return null;
        }).when(this.mRequestObserver)).setOnReadyHandler((Runnable) ArgumentMatchers.any(Runnable.class));
        this.mResponseObserver.beforeStart(this.mRequestObserver);
        EXECUTOR.submit(() -> {
            try {
                Thread.sleep(SHORT_TIMEOUT);
                Mockito.when(Boolean.valueOf(this.mRequestObserver.isReady())).thenReturn(true);
                this.mOnReadyHandler.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        WriteRequest build = WriteRequest.newBuilder().build();
        this.mStream.send(build, TIMEOUT);
        ((ClientCallStreamObserver) Mockito.verify(this.mRequestObserver)).onNext(build);
    }

    @Test
    public void receiveAfterResponseArrives() throws Exception {
        WriteResponse build = WriteResponse.newBuilder().build();
        EXECUTOR.submit(() -> {
            try {
                Thread.sleep(SHORT_TIMEOUT);
                this.mResponseObserver.onNext(build);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Assert.assertEquals(build, (WriteResponse) this.mStream.receive(TIMEOUT));
    }

    @Test
    public void receiveMoreThanBufferSize() throws Exception {
        WriteResponse[] writeResponseArr = (WriteResponse[]) Stream.generate(() -> {
            return WriteResponse.newBuilder().build();
        }).limit(10L).toArray(i -> {
            return new WriteResponse[i];
        });
        EXECUTOR.submit(() -> {
            for (WriteResponse writeResponse : writeResponseArr) {
                this.mResponseObserver.onNext(writeResponse);
            }
        });
        Thread.sleep(SHORT_TIMEOUT);
        for (WriteResponse writeResponse : writeResponseArr) {
            Assert.assertEquals(writeResponse, (WriteResponse) this.mStream.receive(TIMEOUT));
        }
    }

    @Test
    public void receiveErrorWhenBufferFull() throws Exception {
        WriteResponse[] writeResponseArr = (WriteResponse[]) Stream.generate(() -> {
            return WriteResponse.newBuilder().build();
        }).limit(5L).toArray(i -> {
            return new WriteResponse[i];
        });
        for (WriteResponse writeResponse : writeResponseArr) {
            this.mResponseObserver.onNext(writeResponse);
        }
        this.mResponseObserver.onError(Status.UNAUTHENTICATED.asRuntimeException());
        Assert.assertTrue(((Exception) Assert.assertThrows(UnauthenticatedException.class, () -> {
            for (WriteResponse writeResponse2 : writeResponseArr) {
                Assert.assertEquals(writeResponse2, (WriteResponse) this.mStream.receive(TIMEOUT));
            }
        })).getMessage().contains(TEST_MESSAGE));
    }

    @Test
    public void waitForComplete() throws Exception {
        WriteResponse[] writeResponseArr = (WriteResponse[]) Stream.generate(() -> {
            return WriteResponse.newBuilder().build();
        }).limit(10L).toArray(i -> {
            return new WriteResponse[i];
        });
        EXECUTOR.submit(() -> {
            for (WriteResponse writeResponse : writeResponseArr) {
                this.mResponseObserver.onNext(writeResponse);
            }
            try {
                Thread.sleep(SHORT_TIMEOUT);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            this.mResponseObserver.onCompleted();
        });
        Assert.assertEquals(writeResponseArr[0], (WriteResponse) this.mStream.receive(TIMEOUT));
        this.mStream.waitForComplete(TIMEOUT);
        Assert.assertEquals((Object) null, (WriteResponse) this.mStream.receive(TIMEOUT));
    }

    @Test
    public void waitForCompleteTimeout() throws Exception {
        WriteResponse[] writeResponseArr = (WriteResponse[]) Stream.generate(() -> {
            return WriteResponse.newBuilder().build();
        }).limit(10L).toArray(i -> {
            return new WriteResponse[i];
        });
        EXECUTOR.submit(() -> {
            for (WriteResponse writeResponse : writeResponseArr) {
                this.mResponseObserver.onNext(writeResponse);
            }
            try {
                Thread.sleep(TIMEOUT);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            this.mResponseObserver.onCompleted();
        });
        Assert.assertEquals(writeResponseArr[0], (WriteResponse) this.mStream.receive(SHORT_TIMEOUT));
        Assert.assertTrue(((Exception) Assert.assertThrows(DeadlineExceededException.class, () -> {
            this.mStream.waitForComplete(SHORT_TIMEOUT);
        })).getMessage().contains(TEST_MESSAGE));
    }
}
