package io.reactivesocket;

import io.reactivesocket.Frame;
import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.events.EventListener;
import io.reactivesocket.events.EventPublishingSocket;
import io.reactivesocket.events.EventPublishingSocketImpl;
import io.reactivesocket.exceptions.CancelException;
import io.reactivesocket.exceptions.Exceptions;
import io.reactivesocket.frame.ErrorFrameFlyweight;
import io.reactivesocket.internal.DisabledEventPublisher;
import io.reactivesocket.internal.EventPublisher;
import io.reactivesocket.internal.KnownErrorFilter;
import io.reactivesocket.internal.RemoteReceiver;
import io.reactivesocket.internal.RemoteSender;
import io.reactivesocket.lease.Lease;
import io.reactivesocket.lease.LeaseImpl;
import io.reactivesocket.reactivestreams.extensions.DefaultSubscriber;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.internal.FlowControlHelper;
import io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.CancellableSubscriber;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import org.agrona.collections.Int2ObjectHashMap;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/reactivesocket/ClientReactiveSocket.class */
public class ClientReactiveSocket implements ReactiveSocket {
    private final DuplexConnection connection;
    private final Consumer<Throwable> errorConsumer;
    private final StreamIdSupplier streamIdSupplier;
    private final KeepAliveProvider keepAliveProvider;
    private final EventPublishingSocket eventPublishingSocket;
    private final Int2ObjectHashMap<Subscription> senders;
    private final Int2ObjectHashMap<Subscriber<Frame>> receivers;
    private final BufferingSubscription transportReceiveSubscription;
    private CancellableSubscriber<Void> keepAliveSendSub;
    private volatile Consumer<Lease> leaseConsumer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.reactivesocket.ClientReactiveSocket$1, reason: invalid class name */
    /* loaded from: input_file:io/reactivesocket/ClientReactiveSocket$1.class */
    /**
     * Lookup table translating {@code FrameType.ordinal()} into small, dense case labels (1-8).
     * The {@code synthetic} marker and the {@code $SwitchMap$} name suggest this was generated by
     * the compiler for a {@code switch} over {@link FrameType} and recovered by the decompiler.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per FrameType constant; slots never assigned below keep the default value 0.
        static final /* synthetic */ int[] $SwitchMap$io$reactivesocket$FrameType = new int[FrameType.values().length];

        static {
            // Every assignment has its own try/catch so that a NoSuchFieldError raised for one
            // constant only skips that entry instead of aborting the whole initializer.
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.ERROR.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.LEASE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.KEEPALIVE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.NEXT_COMPLETE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.CANCEL.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.NEXT.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.REQUEST_N.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.COMPLETE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/reactivesocket/ClientReactiveSocket$BufferingSubscription.class */
    /**
     * Placeholder {@link Subscription} that records demand and cancellation until the real
     * upstream subscription is supplied through {@link #switchTo(Subscription)}; from then on
     * every signal is relayed straight to that subscription.
     *
     * <p>All state is guarded by the instance monitor. The "is a delegate present?" check and
     * the matching state update happen inside the same critical section, so a signal racing
     * with {@code switchTo} is either buffered before the hand-over (and replayed) or relayed
     * after it, never dropped. The delegate itself is always invoked outside the lock.
     */
    public static class BufferingSubscription implements Subscription {
        private int requested;
        private boolean cancelled;
        private Subscription delegate;

        private BufferingSubscription() {
        }

        /**
         * Relays {@code j} to the delegate when present, otherwise adds it to the buffered demand.
         *
         * @param j number of additional elements requested
         */
        public void request(long j) {
            Subscription target;
            synchronized (this) {
                target = this.delegate;
                if (target == null) {
                    this.requested = FlowControlHelper.incrementRequestN(this.requested, j);
                    return;
                }
            }
            target.request(j);
        }

        /**
         * Cancels the delegate when present, otherwise remembers the cancellation for replay.
         */
        public void cancel() {
            Subscription target;
            synchronized (this) {
                target = this.delegate;
                if (target == null) {
                    this.cancelled = true;
                    return;
                }
            }
            target.cancel();
        }

        /**
         * Installs the real subscription and replays whatever was buffered so far: first the
         * accumulated demand (if positive), then the cancellation (if one was recorded).
         *
         * @param subscription the upstream subscription to relay to from now on
         */
        /* JADX INFO: Access modifiers changed from: private */
        public void switchTo(Subscription subscription) {
            int pendingDemand;
            boolean pendingCancel;
            synchronized (this) {
                this.delegate = subscription;
                // Snapshot under the lock; after this point request()/cancel() relay directly.
                pendingDemand = this.requested;
                pendingCancel = this.cancelled;
            }
            if (pendingDemand > 0) {
                subscription.request(pendingDemand);
            }
            if (pendingCancel) {
                subscription.cancel();
            }
        }

        /* synthetic */ BufferingSubscription(AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /**
     * Creates a client socket on top of {@code duplexConnection}.
     *
     * <p>The supplied error consumer is wrapped in a {@link KnownErrorFilter}. Event publishing
     * is only wired up when {@code eventPublisher} reports it as enabled; otherwise the shared
     * {@link EventPublishingSocket#DISABLED} instance is used. {@link #cleanup()} is registered
     * against the connection's {@code onClose()} publisher.
     *
     * @param duplexConnection  transport used to send and receive frames
     * @param consumer          receives errors, after wrapping in {@link KnownErrorFilter}
     * @param streamIdSupplier  source of stream ids for new requests
     * @param keepAliveProvider source of keep-alive ticks and receiver of keep-alive acks
     * @param eventPublisher    event publisher, consulted for whether publishing is enabled
     */
    public ClientReactiveSocket(DuplexConnection duplexConnection, Consumer<Throwable> consumer, StreamIdSupplier streamIdSupplier, KeepAliveProvider keepAliveProvider, EventPublisher<? extends EventListener> eventPublisher) {
        this.transportReceiveSubscription = new BufferingSubscription(null);
        this.connection = duplexConnection;
        this.errorConsumer = new KnownErrorFilter(consumer);
        this.streamIdSupplier = streamIdSupplier;
        this.keepAliveProvider = keepAliveProvider;
        if (eventPublisher.isEventPublishingEnabled()) {
            this.eventPublishingSocket = new EventPublishingSocketImpl(eventPublisher, true);
        } else {
            this.eventPublishingSocket = EventPublishingSocket.DISABLED;
        }
        this.senders = new Int2ObjectHashMap<>(256, 0.9f);
        this.receivers = new Int2ObjectHashMap<>(256, 0.9f);
        duplexConnection.onClose().subscribe(Subscribers.cleanup(() -> cleanup()));
    }

    /**
     * Convenience constructor that delegates to the five-argument constructor with a fresh
     * {@link DisabledEventPublisher}.
     *
     * @param duplexConnection  transport used to send and receive frames
     * @param consumer          receives errors reported by this socket
     * @param streamIdSupplier  source of stream ids for new requests
     * @param keepAliveProvider source of keep-alive ticks and receiver of keep-alive acks
     */
    public ClientReactiveSocket(DuplexConnection duplexConnection, Consumer<Throwable> consumer, StreamIdSupplier streamIdSupplier, KeepAliveProvider keepAliveProvider) {
        this(duplexConnection, consumer, streamIdSupplier, keepAliveProvider, new DisabledEventPublisher());
    }

    /**
     * Sends {@code payload} as a {@code FIRE_AND_FORGET} request frame on a newly allocated
     * stream id. The stream id and frame are created inside the deferred supplier rather than
     * when this method is called.
     *
     * @param payload the request payload
     * @return the publisher produced by the connection's {@code sendOne}, wrapped in {@code Px.defer}
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Void> fireAndForget(Payload payload) {
        return Px.defer(() -> {
            int streamId = nextStreamId();
            Frame requestFrame = Frame.Request.from(streamId, FrameType.FIRE_AND_FORGET, payload, 0);
            return this.connection.sendOne(requestFrame);
        });
    }

    /**
     * Issues a request/response interaction by delegating to
     * {@link #handleRequestResponse(Payload)}.
     *
     * @param payload the request payload
     * @return publisher of the response
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Payload> requestResponse(Payload payload) {
        return handleRequestResponse(payload);
    }

    /**
     * Issues a {@code REQUEST_STREAM} interaction: wraps {@code payload} in a single-element
     * {@code Px} and delegates to {@code handleStreamResponse}.
     *
     * @param payload the request payload
     * @return publisher of the response payloads
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Payload> requestStream(Payload payload) {
        return handleStreamResponse(Px.just(payload), FrameType.REQUEST_STREAM);
    }

    /**
     * Issues a {@code REQUEST_SUBSCRIPTION} interaction: wraps {@code payload} in a
     * single-element {@code Px} and delegates to {@code handleStreamResponse}.
     *
     * @param payload the request payload
     * @return publisher of the response payloads
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Payload> requestSubscription(Payload payload) {
        return handleStreamResponse(Px.just(payload), FrameType.REQUEST_SUBSCRIPTION);
    }

    /**
     * Issues a {@code REQUEST_CHANNEL} interaction: adapts {@code publisher} with
     * {@code Px.from} and delegates to {@code handleStreamResponse}.
     *
     * @param publisher the outgoing request payloads
     * @return publisher of the response payloads
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Payload> requestChannel(Publisher<Payload> publisher) {
        return handleStreamResponse(Px.from(publisher), FrameType.REQUEST_CHANNEL);
    }

    /**
     * Sends {@code payload} as a {@code METADATA_PUSH} frame on stream 0. Unlike
     * {@link #fireAndForget(Payload)}, the frame is built immediately when this method is
     * called, not deferred.
     *
     * @param payload the metadata payload
     * @return the publisher returned by the connection's {@code sendOne}
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Void> metadataPush(Payload payload) {
        Frame pushFrame = Frame.Request.from(0, FrameType.METADATA_PUSH, payload, 0);
        return this.connection.sendOne(pushFrame);
    }

    /**
     * Reports the availability of the underlying connection.
     *
     * @return the value of {@code connection.availability()}
     */
    @Override // io.reactivesocket.ReactiveSocket, io.reactivesocket.Availability
    public double availability() {
        return this.connection.availability();
    }

    /**
     * Closes this socket: a deferred step that runs {@link #cleanup()} is combined, via
     * {@code Px.concatEmpty}, with the connection's own {@code close()} publisher.
     *
     * @return publisher representing the combined cleanup and connection close
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Void> close() {
        return Px.concatEmpty(Px.defer(() -> {
            // Local teardown (keep-alive and transport receive subscriptions) before the
            // connection close publisher; NOTE(review): ordering relies on concatEmpty
            // subscribing to its arguments sequentially - confirm against Px.
            cleanup();
            return Px.empty();
        }), this.connection.close());
    }

    /**
     * Exposes the close notification of the underlying connection.
     *
     * @return the connection's {@code onClose()} publisher
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Void> onClose() {
        return this.connection.onClose();
    }

    /**
     * Starts this socket: stores the lease consumer, begins sending keep-alive frames and
     * subscribes to incoming frames from the connection.
     *
     * @param consumer receives a {@link Lease} for every LEASE frame seen on stream 0; may be
     *                 {@code null}, in which case LEASE frames are ignored
     * @return this socket, for call chaining
     */
    public ClientReactiveSocket start(Consumer<Lease> consumer) {
        // Stored before receiving starts so LEASE frames handled on stream 0 can see it.
        this.leaseConsumer = consumer;
        startKeepAlive();
        startReceivingRequests();
        return this;
    }

    /**
     * Builds the publisher backing {@link #requestResponse(Payload)}.
     *
     * <p>For each subscriber it allocates a stream id, creates a {@code REQUEST_RESPONSE} frame,
     * registers the subscriber in {@code receivers} under that id (so {@code handleFrame} can
     * deliver incoming frames to it) and then subscribes the subscriber to the decorated send
     * pipeline.
     *
     * @param payload the request payload
     * @return publisher of the response
     */
    private Publisher<Payload> handleRequestResponse(Payload payload) {
        return Px.create(subscriber -> {
            int nextStreamId = nextStreamId();
            Frame from = Frame.Request.from(nextStreamId, FrameType.REQUEST_RESPONSE, payload, 1);
            // Register before sending so a response frame always finds its receiver.
            synchronized (this) {
                this.receivers.put(nextStreamId, subscriber);
            }
            // The send is concatenated with Px.never(); presumably this keeps the pipeline open
            // after the frame is written so that completion comes from the response frame
            // rather than from the send finishing - TODO confirm against Px.concatEmpty.
            // On cancellation: send a CANCEL frame if the connection reports availability > 0,
            // and always drop the receiver registration.
            this.eventPublishingSocket.decorateReceive(nextStreamId, Px.concatEmpty(this.eventPublishingSocket.decorateSend(nextStreamId, this.connection.sendOne(from), 0L, EventListener.RequestType.RequestResponse), Px.never()).cast(Payload.class).doOnCancel(() -> {
                if (this.connection.availability() > 0.0d) {
                    this.connection.sendOne(Frame.Cancel.from(nextStreamId)).subscribe(DefaultSubscriber.defaultInstance());
                }
                removeReceiver(nextStreamId);
            }), EventListener.RequestType.RequestResponse).subscribe(subscriber);
        });
    }

    /**
     * Builds the publisher backing the stream-style interactions (request stream, request
     * subscription and request channel).
     *
     * <p>Inside a deferred supplier it allocates a stream id, creates a {@link RemoteSender}
     * over the outgoing payloads mapped to request frames of {@code frameType}, creates a
     * {@link RemoteReceiver} for the incoming frames, registers both under the stream id and
     * returns the receiver decorated for event publishing.
     *
     * @param px        outgoing request payloads
     * @param frameType request frame type used for every outgoing payload
     * @return publisher of the response payloads
     */
    private Publisher<Payload> handleStreamResponse(Px<Payload> px, FrameType frameType) {
        return Px.defer(() -> {
            int nextStreamId = nextStreamId();
            // Outgoing side: each payload becomes a request frame on this stream; the cleanup
            // callback removes the sender registration.
            RemoteSender remoteSender = new RemoteSender(px.map(payload -> {
                return Frame.Request.from(nextStreamId, frameType, payload, 1);
            }), removeSenderLambda(nextStreamId), nextStreamId, 1);
            // Incoming side: the callback below runs with the downstream subscriber (presumably
            // once it subscribes to the receiver - confirm against RemoteReceiver).
            Subscriber<Frame> remoteReceiver = new RemoteReceiver(subscriber -> {
                // Errors from the send pipeline are forwarded to the downstream subscriber.
                CancellableSubscriber doOnError = Subscribers.doOnError(th -> {
                    subscriber.onError(th);
                });
                // Downstream cancel cancels the send; downstream demand is forwarded to the
                // transport receive subscription.
                ValidatingSubscription create = ValidatingSubscription.create(subscriber, () -> {
                    doOnError.cancel();
                }, j -> {
                    this.transportReceiveSubscription.request(j);
                });
                // The send is started before onSubscribe is signalled to the subscriber.
                this.eventPublishingSocket.decorateSend(nextStreamId, this.connection.send(remoteSender), 0L, EventListener.RequestType.fromFrameType(frameType)).subscribe(doOnError);
                subscriber.onSubscribe(create);
            }, this.connection, nextStreamId, removeReceiverLambda(nextStreamId), true);
            registerSenderReceiver(nextStreamId, remoteSender, remoteReceiver);
            return this.eventPublishingSocket.decorateReceive(nextStreamId, remoteReceiver, EventListener.RequestType.fromFrameType(frameType));
        });
    }

    /**
     * Starts the keep-alive sender: every tick from the {@link KeepAliveProvider} is mapped to
     * a KEEPALIVE frame and written to the connection. The subscriber used for the send is kept
     * in {@code keepAliveSendSub} so {@link #cleanup()} can cancel it; send errors go to
     * {@code errorConsumer}.
     */
    private void startKeepAlive() {
        CancellableSubscriber<Void> sendSubscriber = Subscribers.doOnError(this.errorConsumer);
        this.keepAliveSendSub = sendSubscriber;
        this.connection
                .send(Px.from(this.keepAliveProvider.ticks())
                        .map(l -> Frame.Keepalive.from(Frame.NULL_BYTEBUFFER, true)))
                .subscribe(sendSubscriber);
    }

    /**
     * Subscribes to the connection's incoming frames. The upstream subscription is handed to
     * {@code transportReceiveSubscription}, and every frame received is dispatched through
     * {@link #handleIncomingFrames(Frame)}.
     */
    private void startReceivingRequests() {
        Px.from(this.connection.receive())
                .doOnSubscribe(subscription -> this.transportReceiveSubscription.switchTo(subscription))
                .doOnNext(this::handleIncomingFrames)
                .subscribe();
    }

    /**
     * Releases this socket's subscriptions: cancels the keep-alive send (when it was started)
     * and cancels the transport receive subscription.
     */
    protected void cleanup() {
        CancellableSubscriber<Void> keepAlive = this.keepAliveSendSub;
        if (keepAlive != null) {
            keepAlive.cancel();
        }
        this.transportReceiveSubscription.cancel();
    }

    /**
     * Entry point for every frame read from the connection: frames on stream 0 go to
     * {@code handleStreamZero}, all others to {@code handleFrame}.
     *
     * @param frame the received frame
     */
    private void handleIncomingFrames(Frame frame) {
        final int id = frame.getStreamId();
        final FrameType kind = frame.getType();
        if (id != 0) {
            handleFrame(id, kind, frame);
            return;
        }
        handleStreamZero(kind, frame);
    }

    /**
     * Handles a frame received on stream 0.
     *
     * <ul>
     *   <li>{@code ERROR}: the exception decoded from the frame is thrown to the caller.</li>
     *   <li>{@code LEASE}: forwarded to the lease consumer when one is registered.</li>
     *   <li>{@code KEEPALIVE}: a frame without the respond flag is treated as an ack and
     *       reported to the {@link KeepAliveProvider}; frames with the flag are ignored.</li>
     *   <li>anything else: reported to {@code errorConsumer} as an
     *       {@link IllegalStateException} rather than thrown.</li>
     * </ul>
     *
     * @param frameType type of {@code frame}
     * @param frame     the received frame
     */
    private void handleStreamZero(FrameType frameType, Frame frame) {
        switch (frameType) {
            case ERROR:
                throw Exceptions.from(frame);
            case LEASE: {
                // Read the volatile field once so the null check and the call see the same value.
                Consumer<Lease> consumer = this.leaseConsumer;
                if (consumer != null) {
                    consumer.accept(new LeaseImpl(frame));
                }
                return;
            }
            case KEEPALIVE:
                if (!Frame.Keepalive.hasRespondFlag(frame)) {
                    this.keepAliveProvider.ack();
                }
                return;
            default:
                this.errorConsumer.accept(new IllegalStateException("Client received unsupported frame on stream 0: " + frame.toString()));
                return;
        }
    }

    /**
     * Handles a frame received on a non-zero stream by dispatching it to the receiver (and,
     * for flow-control and cancel frames, the sender) registered for that stream.
     *
     * <p>When no receiver is registered the frame is passed to
     * {@code handleMissingResponseProcessor}. Terminal frames ({@code ERROR},
     * {@code NEXT_COMPLETE}, {@code COMPLETE}) signal the receiver and then remove its
     * registration. {@code CANCEL} removes both registrations, cancels the sender and fails the
     * receiver with a {@link CancelException}. {@code REQUEST_N} forwards the requested amount
     * to the sender, if one is registered.
     *
     * @param i         stream id of the frame
     * @param frameType type of {@code frame}
     * @param frame     the received frame
     * @throws IllegalStateException if the frame type is not valid on a client stream
     */
    private void handleFrame(int i, FrameType frameType, Frame frame) {
        Subscriber<Frame> receiver;
        synchronized (this) {
            receiver = this.receivers.get(i);
        }
        if (receiver == null) {
            handleMissingResponseProcessor(i, frameType, frame);
            return;
        }
        switch (frameType) {
            case ERROR:
                receiver.onError(Exceptions.from(frame));
                removeReceiver(i);
                return;
            case NEXT_COMPLETE:
                receiver.onNext(frame);
                receiver.onComplete();
                removeReceiver(i);
                return;
            case CANCEL: {
                Subscription sender;
                // Drop both registrations atomically before signalling either side.
                synchronized (this) {
                    sender = this.senders.remove(i);
                    this.receivers.remove(i);
                }
                if (sender != null) {
                    sender.cancel();
                }
                receiver.onError(new CancelException("cancelling stream id " + i));
                return;
            }
            case NEXT:
                receiver.onNext(frame);
                return;
            case REQUEST_N: {
                Subscription sender;
                synchronized (this) {
                    sender = this.senders.get(i);
                }
                if (sender != null) {
                    sender.request(Frame.RequestN.requestN(frame));
                }
                return;
            }
            case COMPLETE:
                receiver.onComplete();
                removeReceiver(i);
                return;
            default:
                // Every other frame type, including LEASE and KEEPALIVE, is invalid here.
                throw new IllegalStateException("Client received unsupported frame on stream " + i + ": " + frame.toString());
        }
    }

    /**
     * Handles a frame whose stream has no registered receiver. Frames for stream ids the
     * {@link StreamIdSupplier} reports as "before or current" are silently ignored; any other
     * stream id results in an {@link IllegalStateException}, whose message includes the frame
     * data when the frame is an {@code ERROR}.
     *
     * @param i         stream id of the frame
     * @param frameType type of {@code frame}
     * @param frame     the received frame
     * @throws IllegalStateException if the stream id is not before or equal to the current one
     */
    private void handleMissingResponseProcessor(int i, FrameType frameType, Frame frame) {
        if (!this.streamIdSupplier.isBeforeOrCurrent(i)) {
            String message;
            if (frameType == FrameType.ERROR) {
                message = "Client received error for non-existent stream: " + i + " Message: " + getByteBufferAsString(frame.getData());
            } else {
                message = "Client received message for non-existent stream: " + i + ", frame type: " + frameType;
            }
            throw new IllegalStateException(message);
        }
    }

    /**
     * Obtains the stream id for a new request from the {@link StreamIdSupplier}.
     *
     * @return the supplier's next stream id
     */
    private int nextStreamId() {
        return this.streamIdSupplier.nextStreamId();
    }

    /**
     * Decodes the remaining bytes of {@code byteBuffer} as UTF-8.
     *
     * <p>The bytes are read through a duplicate, so the position and limit of the caller's
     * buffer are left untouched and the buffer can be read again afterwards.
     *
     * @param byteBuffer buffer whose remaining bytes are decoded; must not be {@code null}
     * @return the decoded text, empty when the buffer has no remaining bytes
     */
    private static String getByteBufferAsString(ByteBuffer byteBuffer) {
        // duplicate() shares the content but has an independent position/limit.
        ByteBuffer view = byteBuffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Creates a callback that unregisters the receiver of stream {@code i} when run.
     *
     * @param i stream id
     * @return a {@link Runnable} invoking {@code removeReceiver(i)}
     */
    private Runnable removeReceiverLambda(int i) {
        return () -> removeReceiver(i);
    }

    /**
     * Unregisters the receiver of stream {@code i}, holding this socket's monitor.
     *
     * @param i stream id
     */
    private void removeReceiver(int i) {
        synchronized (this) {
            this.receivers.remove(i);
        }
    }

    /**
     * Creates a callback that unregisters the sender of stream {@code i} when run.
     *
     * @param i stream id
     * @return a {@link Runnable} invoking {@code removeSender(i)}
     */
    private Runnable removeSenderLambda(int i) {
        return () -> removeSender(i);
    }

    /**
     * Unregisters the sender of stream {@code i}, holding this socket's monitor.
     *
     * @param i stream id
     */
    private void removeSender(int i) {
        synchronized (this) {
            this.senders.remove(i);
        }
    }

    /**
     * Registers the sender and the receiver of stream {@code i} in one critical section, so
     * frame handling never observes one registration without the other.
     *
     * @param i            stream id
     * @param subscription sender-side subscription for the stream
     * @param subscriber   receiver of incoming frames for the stream
     */
    private void registerSenderReceiver(int i, Subscription subscription, Subscriber<Frame> subscriber) {
        synchronized (this) {
            this.senders.put(i, subscription);
            this.receivers.put(i, subscriber);
        }
    }
}
