package tech.ydb.core.impl.call;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcFlowControl;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcReadWriteStream;
import tech.ydb.core.grpc.GrpcStatuses;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.impl.auth.AuthCallOptions;
import tech.ydb.shaded.google.protobuf.Message;
import tech.ydb.shaded.google.protobuf.TextFormat;
import tech.ydb.shaded.grpc.ClientCall;
import tech.ydb.shaded.grpc.Metadata;
import tech.ydb.shaded.javax.annotation.Nullable;

/* loaded from: input_file:tech/ydb/core/impl/call/ReadWriteStreamCall.class */
public class ReadWriteStreamCall<R, W> extends ClientCall.Listener<R> implements GrpcReadWriteStream<R, W> {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) GrpcTransport.class);
    private final String traceId;
    private final ClientCall<W, R> call;
    private final GrpcStatusHandler statusConsumer;
    private final Metadata headers;
    private final AuthCallOptions callOptions;
    private final GrpcFlowControl.Call flow;
    private final Lock callLock = new ReentrantLock();
    private final Queue<W> messagesQueue = new ArrayDeque();
    private final CompletableFuture<Status> statusFuture = new CompletableFuture<>();
    private GrpcReadStream.Observer<R> consumer = null;

    public ReadWriteStreamCall(String str, ClientCall<W, R> clientCall, GrpcFlowControl grpcFlowControl, Metadata metadata, AuthCallOptions authCallOptions, GrpcStatusHandler grpcStatusHandler) {
        this.traceId = str;
        this.call = clientCall;
        this.headers = metadata;
        this.statusConsumer = grpcStatusHandler;
        this.callOptions = authCallOptions;
        this.flow = grpcFlowControl.newCall(this::nextRequest);
    }

    @Override // tech.ydb.core.grpc.GrpcReadWriteStream
    public String authToken() {
        return this.callOptions.getToken();
    }

    @Override // tech.ydb.core.grpc.GrpcReadStream
    public CompletableFuture<Status> start(GrpcReadStream.Observer<R> observer) {
        this.callLock.lock();
        try {
            try {
            } catch (Throwable th) {
                try {
                    this.call.cancel(null, th);
                } catch (Throwable th2) {
                    logger.error("Exception encountered while closing the unary call", th2);
                }
                this.statusFuture.completeExceptionally(th);
                this.callLock.unlock();
            }
            if (this.consumer != null) {
                throw new IllegalStateException("Read write stream call is already started");
            }
            if (observer == null) {
                throw new IllegalArgumentException("Observer must be not empty");
            }
            this.consumer = observer;
            this.call.start(this, this.headers);
            this.flow.onStart();
            this.callLock.unlock();
            return this.statusFuture;
        } catch (Throwable th3) {
            this.callLock.unlock();
            throw th3;
        }
    }

    @Override // tech.ydb.core.grpc.GrpcReadWriteStream
    public void sendNext(W w) {
        this.callLock.lock();
        try {
            if (flush()) {
                if (logger.isTraceEnabled()) {
                    logger.trace("ReadWriteStreamCall[{}] --> {}", this.traceId, TextFormat.shortDebugString((Message) w));
                }
                this.call.sendMessage(w);
            } else {
                this.messagesQueue.add(w);
            }
        } finally {
            this.callLock.unlock();
        }
    }

    private boolean flush() {
        while (this.call.isReady()) {
            W poll = this.messagesQueue.poll();
            if (poll == null) {
                return true;
            }
            if (logger.isTraceEnabled()) {
                logger.trace("ReadWriteStreamCall[{}] --> {}", this.traceId, TextFormat.shortDebugString((Message) poll));
            }
            this.call.sendMessage(poll);
        }
        return false;
    }

    @Override // tech.ydb.core.grpc.GrpcReadStream
    public void cancel() {
        this.callLock.lock();
        try {
            this.call.cancel("Cancelled on user request", new CancellationException());
        } finally {
            this.callLock.unlock();
        }
    }

    private void nextRequest(int i) {
        this.callLock.lock();
        try {
            this.call.request(i);
        } finally {
            this.callLock.unlock();
        }
    }

    @Override // tech.ydb.shaded.grpc.ClientCall.Listener
    public void onMessage(R r) {
        try {
            if (logger.isTraceEnabled()) {
                logger.trace("ReadWriteStreamCall[{}] <-- {}", this.traceId, TextFormat.shortDebugString((Message) r));
            }
            this.consumer.onNext(r);
            this.flow.onMessageRead();
        } catch (Exception e) {
            this.statusFuture.completeExceptionally(e);
            try {
                this.callLock.lock();
                try {
                    this.call.cancel("Canceled by exception from observer", e);
                    this.callLock.unlock();
                } catch (Throwable th) {
                    this.callLock.unlock();
                    throw th;
                }
            } catch (Throwable th2) {
                logger.error("Exception encountered while canceling the read write stream call", th2);
            }
        }
    }

    @Override // tech.ydb.shaded.grpc.ClientCall.Listener
    public void onReady() {
        this.callLock.lock();
        try {
            flush();
        } finally {
            this.callLock.unlock();
        }
    }

    @Override // tech.ydb.core.grpc.GrpcReadWriteStream
    public void close() {
        this.callLock.lock();
        try {
            this.call.halfClose();
        } finally {
            this.callLock.unlock();
        }
    }

    @Override // tech.ydb.shaded.grpc.ClientCall.Listener
    public void onClose(tech.ydb.shaded.grpc.Status status, @Nullable Metadata metadata) {
        if (logger.isTraceEnabled()) {
            logger.trace("ReadWriteStreamCall[{}] closed with status {}", this.traceId, status);
        }
        this.statusConsumer.accept(status, metadata);
        if (status.isOk()) {
            this.statusFuture.complete(Status.SUCCESS);
        } else {
            this.statusFuture.complete(GrpcStatuses.toStatus(status));
        }
    }
}
