package io.kurrent.dbclient;

import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.kurrent.dbclient.proto.shared.Shared;
import io.kurrent.dbclient.proto.streams.StreamsOuterClass;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kurrent/dbclient/ReadResponseObserver.class */
class ReadResponseObserver implements ClientResponseObserver<StreamsOuterClass.ReadReq, StreamsOuterClass.ReadResp> {
    private static final Logger logger = LoggerFactory.getLogger(ReadResponseObserver.class);
    private final OptionsWithBackPressure<?> options;
    private final AtomicInteger requested = new AtomicInteger(0);
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final StreamConsumer consumer;
    private ClientCallStreamObserver<StreamsOuterClass.ReadReq> requestStream;
    private int outstandingRequests;
    private WorkItemArgs args;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/kurrent/dbclient/ReadResponseObserver$InternalSubscription.class */
    public static class InternalSubscription implements org.reactivestreams.Subscription {
        private final ReadResponseObserver observer;

        InternalSubscription(ReadResponseObserver readResponseObserver) {
            this.observer = readResponseObserver;
        }

        public void request(long j) {
            if (j <= 0) {
                throw new IllegalArgumentException("non-positive subscription request");
            }
            this.observer.requested.set((int) j);
        }

        public void cancel() {
            this.observer.cancel("subscription was manually cancelled", null);
        }
    }

    public ReadResponseObserver(OptionsWithBackPressure<?> optionsWithBackPressure, StreamConsumer streamConsumer) {
        this.options = optionsWithBackPressure;
        this.consumer = streamConsumer;
    }

    public org.reactivestreams.Subscription getSubscription() {
        return new InternalSubscription(this);
    }

    public void onConnected(WorkItemArgs workItemArgs) {
        this.args = workItemArgs;
    }

    void cancel(String str, Throwable th) {
        if (this.completed.compareAndSet(false, true) && this.requestStream != null) {
            this.requestStream.cancel(str, th);
            if (th instanceof StreamNotFoundException) {
                this.consumer.onStreamNotFound(((StreamNotFoundException) th).getStreamName());
            }
        }
    }

    void manageFlowControl() {
        int andSet = this.requested.getAndSet(0);
        int computeRequestThreshold = this.options.computeRequestThreshold();
        this.outstandingRequests = Math.max(this.outstandingRequests, 0);
        int batchSize = this.options.getBatchSize() - this.outstandingRequests;
        if (andSet > 0) {
            int min = Math.min(andSet, batchSize);
            this.requestStream.request(min);
            this.outstandingRequests += min;
        } else {
            if (batchSize < computeRequestThreshold) {
                return;
            }
            this.requestStream.request(batchSize);
            this.outstandingRequests += batchSize;
        }
    }

    public void beforeStart(ClientCallStreamObserver<StreamsOuterClass.ReadReq> clientCallStreamObserver) {
        this.requestStream = clientCallStreamObserver;
        if (this.completed.get()) {
            this.requestStream.cancel("the streaming operation was cancelled manually", (Throwable) null);
            return;
        }
        this.requestStream.disableAutoRequestWithInitial(this.options.getBatchSize());
        this.outstandingRequests = this.options.getBatchSize();
        this.consumer.onSubscribe(getSubscription());
    }

    public void onNext(StreamsOuterClass.ReadResp readResp) {
        if (this.completed.get()) {
            return;
        }
        this.outstandingRequests--;
        if (readResp.hasStreamNotFound()) {
            String byteString = readResp.getStreamNotFound().getStreamIdentifier().getStreamName().toString(Charset.defaultCharset());
            cancel(String.format("stream '%s' is not found", byteString), new StreamNotFoundException(byteString));
            return;
        }
        if (readResp.hasEvent()) {
            this.consumer.onEvent(ResolvedEvent.fromWire(readResp.getEvent()));
        } else if (readResp.hasConfirmation()) {
            this.consumer.onSubscriptionConfirmation(readResp.getConfirmation().getSubscriptionId());
        } else if (readResp.hasCheckpoint()) {
            StreamsOuterClass.ReadResp.Checkpoint checkpoint = readResp.getCheckpoint();
            this.consumer.onCheckpoint(checkpoint.getCommitPosition(), checkpoint.getPreparePosition());
        } else if (readResp.hasFirstStreamPosition()) {
            this.consumer.onFirstStreamPosition(readResp.getFirstStreamPosition());
        } else if (readResp.hasLastStreamPosition()) {
            this.consumer.onLastStreamPosition(readResp.getLastStreamPosition());
        } else if (readResp.hasLastAllStreamPosition()) {
            Shared.AllStreamPosition lastAllStreamPosition = readResp.getLastAllStreamPosition();
            this.consumer.onLastAllStreamPosition(lastAllStreamPosition.getCommitPosition(), lastAllStreamPosition.getPreparePosition());
        } else if (readResp.hasCaughtUp()) {
            StreamsOuterClass.ReadResp.CaughtUp caughtUp = readResp.getCaughtUp();
            this.consumer.onCaughtUp(Instant.ofEpochSecond(caughtUp.getTimestamp().getSeconds(), caughtUp.getTimestamp().getNanos()), caughtUp.hasStreamRevision() ? Long.valueOf(caughtUp.getStreamRevision()) : null, caughtUp.hasPosition() ? new Position(caughtUp.getPosition().getCommitPosition(), caughtUp.getPosition().getPreparePosition()) : null);
        } else if (readResp.hasFellBehind()) {
            StreamsOuterClass.ReadResp.FellBehind fellBehind = readResp.getFellBehind();
            this.consumer.onFellBehind(Instant.ofEpochSecond(fellBehind.getTimestamp().getSeconds(), fellBehind.getTimestamp().getNanos()), fellBehind.hasStreamRevision() ? Long.valueOf(fellBehind.getStreamRevision()) : null, fellBehind.hasPosition() ? new Position(fellBehind.getPosition().getCommitPosition(), fellBehind.getPosition().getPreparePosition()) : null);
        } else {
            logger.warn("received unknown message variant");
        }
        manageFlowControl();
    }

    public void onError(Throwable th) {
        if (this.completed.compareAndSet(false, true)) {
            if (th instanceof StatusRuntimeException) {
                StatusRuntimeException statusRuntimeException = (StatusRuntimeException) th;
                if (statusRuntimeException.getStatus().getCode() == Status.Code.CANCELLED) {
                    return;
                }
                Metadata trailers = statusRuntimeException.getTrailers();
                if (trailers != null) {
                    String str = (String) trailers.get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER));
                    String str2 = (String) trailers.get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));
                    if (str != null && str2 != null) {
                        int parseInt = Integer.parseInt(str2);
                        this.args.reportNewLeader(str, parseInt);
                        th = new NotLeaderException(str, parseInt);
                    }
                }
            }
            this.consumer.onCancelled(th);
        }
    }

    public void onCompleted() {
        if (this.completed.compareAndSet(false, true)) {
            this.consumer.onComplete();
        }
    }
}
