package io.reactivesocket;

import io.reactivesocket.Frame;
import io.reactivesocket.internal.KnownErrorFilter;
import io.reactivesocket.internal.RemoteReceiver;
import io.reactivesocket.internal.RemoteSender;
import io.reactivesocket.lease.LeaseEnforcingSocket;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
import java.util.ArrayList;
import java.util.Collection;
import java.util.function.Consumer;
import org.agrona.collections.Int2ObjectHashMap;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/reactivesocket/ServerReactiveSocket.class */
public class ServerReactiveSocket implements ReactiveSocket {
    /** Connection that frames are sent on; its close signal triggers {@code cleanup()}. */
    private final DuplexConnection connection;
    /** Inbound frame stream obtained from {@code connection.receive()}; consumed by {@code start()}. */
    private final Publisher<Frame> serverInput;
    /** Caller-supplied error callback wrapped in a {@code KnownErrorFilter}. */
    private final Consumer<Throwable> errorConsumer;
    /**
     * Active outbound subscriptions keyed by the int id passed to the handle* methods
     * (presumably the stream id). Most accessors in this class hold the {@code this} monitor.
     */
    private final Int2ObjectHashMap<Subscription> subscriptions;
    /** Receivers created by {@code handleChannel}, keyed by the same int id. */
    private final Int2ObjectHashMap<RemoteReceiver> channelProcessors;
    /** Socket that every request method of this class delegates to. */
    private final ReactiveSocket requestHandler;
    /** Wrapper around the input-stream subscription; assigned in {@code start()} and handed to each {@code RemoteReceiver}. */
    private Subscription receiversSubscription;

    /*
     * Synthetic lookup table for a switch over FrameType: each FrameType ordinal is
     * mapped to a dense case label (1..16), and slots for any other constant stay 0.
     * Presumably consumed by the switch inside handleFrame, whose body was not
     * decompiled -- confirm against the bytecode.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.reactivesocket.ServerReactiveSocket$2, reason: invalid class name */
    /* loaded from: input_file:io/reactivesocket/ServerReactiveSocket$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        // Indexed by FrameType.ordinal(); sized to the enum as it exists at class-initialisation time.
        static final /* synthetic */ int[] $SwitchMap$io$reactivesocket$FrameType = new int[FrameType.values().length];

        static {
            // Each constant is resolved inside its own try so that a NoSuchFieldError for one
            // constant only leaves that slot at 0 instead of failing class initialisation.
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.SETUP.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.REQUEST_RESPONSE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.CANCEL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.KEEPALIVE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.REQUEST_N.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.REQUEST_STREAM.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.FIRE_AND_FORGET.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.REQUEST_SUBSCRIPTION.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.REQUEST_CHANNEL.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.RESPONSE.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.METADATA_PUSH.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.LEASE.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.NEXT.ordinal()] = 13;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.COMPLETE.ordinal()] = 14;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.ERROR.ordinal()] = 15;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$io$reactivesocket$FrameType[FrameType.NEXT_COMPLETE.ordinal()] = 16;
            } catch (NoSuchFieldError e16) {
            }
        }
    }

    /**
     * Creates a server-side socket bound to the given connection.
     *
     * <p>Registers a subscriber on the connection's close signal that runs {@code cleanup()}.
     * When the handler is a {@link LeaseEnforcingSocket}, a lease sender is installed on it;
     * each lease it emits is written to the connection as a lease frame, but only if
     * {@code z} is {@code true}.
     *
     * @param duplexConnection connection used to receive and send frames
     * @param reactiveSocket   handler that every request method delegates to
     * @param z                whether leases produced by the handler are forwarded to the peer
     * @param consumer         error callback; stored wrapped in a {@code KnownErrorFilter}, and
     *                         used unwrapped for lease-send failures
     */
    public ServerReactiveSocket(DuplexConnection duplexConnection, ReactiveSocket reactiveSocket, boolean z, Consumer<Throwable> consumer) {
        this.requestHandler = reactiveSocket;
        this.connection = duplexConnection;
        this.serverInput = duplexConnection.receive();
        this.errorConsumer = new KnownErrorFilter(consumer);
        this.subscriptions = new Int2ObjectHashMap<>();
        this.channelProcessors = new Int2ObjectHashMap<>();

        // Release all per-stream state once the connection reports that it has closed.
        Px.from(duplexConnection.onClose()).subscribe(Subscribers.cleanup(this::cleanup));

        if (!(reactiveSocket instanceof LeaseEnforcingSocket)) {
            return;
        }
        LeaseEnforcingSocket leaseEnforcer = (LeaseEnforcingSocket) reactiveSocket;
        leaseEnforcer.acceptLeaseSender(lease -> {
            if (!z) {
                // Leases are still accepted from the handler, just never put on the wire.
                return;
            }
            Frame leaseFrame = Frame.Lease.from(lease.getTtl(), lease.getAllowedRequests(), lease.metadata());
            Px.from(duplexConnection.sendOne(leaseFrame)).doOnError(consumer).subscribe();
        });
    }

    /**
     * Convenience constructor equivalent to the four-argument form with the boolean
     * flag set to {@code true}, i.e. leases from the handler are forwarded to the peer.
     *
     * @param duplexConnection connection used to receive and send frames
     * @param reactiveSocket   handler that every request method delegates to
     * @param consumer         error callback
     */
    public ServerReactiveSocket(DuplexConnection duplexConnection, ReactiveSocket reactiveSocket, Consumer<Throwable> consumer) {
        this(duplexConnection, reactiveSocket, true, consumer);
    }

    /**
     * Delegates the fire-and-forget request to the request handler supplied at construction.
     *
     * @param payload request payload, passed through unchanged
     * @return whatever publisher the handler returns
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Void> fireAndForget(Payload payload) {
        return this.requestHandler.fireAndForget(payload);
    }

    /**
     * Delegates the request-response call to the request handler supplied at construction.
     *
     * @param payload request payload, passed through unchanged
     * @return whatever publisher the handler returns
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Payload> requestResponse(Payload payload) {
        return this.requestHandler.requestResponse(payload);
    }

    /**
     * Delegates the request-stream call to the request handler supplied at construction.
     *
     * @param payload request payload, passed through unchanged
     * @return whatever publisher the handler returns
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Payload> requestStream(Payload payload) {
        return this.requestHandler.requestStream(payload);
    }

    /**
     * Delegates the request-subscription call to the request handler supplied at construction.
     *
     * @param payload request payload, passed through unchanged
     * @return whatever publisher the handler returns
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Payload> requestSubscription(Payload payload) {
        return this.requestHandler.requestSubscription(payload);
    }

    /**
     * Delegates the request-channel call to the request handler supplied at construction.
     * Also used internally by {@code handleChannel}, which passes a {@code RemoteReceiver}
     * as the inbound publisher.
     *
     * @param publisher inbound payload stream, passed through unchanged
     * @return whatever publisher the handler returns
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Payload> requestChannel(Publisher<Payload> publisher) {
        return this.requestHandler.requestChannel(publisher);
    }

    /**
     * Delegates the metadata-push call to the request handler supplied at construction.
     *
     * @param payload metadata payload, passed through unchanged
     * @return whatever publisher the handler returns
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Void> metadataPush(Payload payload) {
        return this.requestHandler.metadataPush(payload);
    }

    /**
     * Builds the close sequence for this socket: a deferred step that runs
     * {@code cleanup()} and yields an empty publisher, combined via
     * {@code Px.concatEmpty} with the connection's own close publisher.
     *
     * @return publisher representing cleanup followed by closing the connection
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Void> close() {
        // Wrapped in defer so cleanup() runs from the supplier rather than while this method builds the chain.
        Px<Void> releaseState = Px.defer(() -> {
            cleanup();
            return Px.empty();
        });
        Publisher<Void> closeConnection = this.connection.close();
        return Px.concatEmpty(releaseState, closeConnection);
    }

    /**
     * Exposes the underlying connection's close signal.
     *
     * @return the publisher returned by {@code connection.onClose()}
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Publisher<Void> onClose() {
        return this.connection.onClose();
    }

    /**
     * Subscribes to the connection's inbound frame stream and starts processing it.
     *
     * <p>Every received frame is passed to {@code handleFrame}; the resulting publisher is
     * subscribed with a subscriber that forwards errors to the error consumer. If the
     * inbound stream itself fails, the error is reported and every subscription that is
     * registered at that moment is cancelled.
     *
     * <p>On subscription, a wrapper around the input-stream subscription is stored in
     * {@code receiversSubscription} so that channel receivers can drive demand/cancel.
     *
     * @return this socket, to allow call chaining
     */
    public ServerReactiveSocket start() {
        Px.from(this.serverInput).doOnNext(frame -> {
            handleFrame(frame).subscribe(Subscribers.doOnError(this.errorConsumer));
        }).doOnError(th -> {
            this.errorConsumer.accept(th);
            // Int2ObjectHashMap.values() is a live view of the map, so it must not be
            // iterated after the lock is released: copy it while holding the monitor and
            // cancel from the private snapshot instead.
            final Collection<Subscription> active;
            synchronized (this) {
                active = new ArrayList<>(this.subscriptions.values());
            }
            // Cancelled outside the lock so cancel callbacks never run under the monitor.
            active.forEach(Subscription::cancel);
        }).doOnSubscribe(subscription -> {
            this.receiversSubscription = new Subscription() { // from class: io.reactivesocket.ServerReactiveSocket.1
                @Override
                public void request(long j) {
                    subscription.request(j);
                }

                @Override
                public void cancel() {
                    subscription.cancel();
                }
            };
        }).subscribe();
        return this;
    }

    /*
     * Per-frame entry point invoked by start() for every frame received on the connection.
     *
     * NOTE(review): the decompiler could not reconstruct this method, so the body below is a
     * placeholder that always throws UnsupportedOperationException. The real logic must be
     * recovered from the bytecode (or the original sources) before this class can be used;
     * presumably it switches on the frame's FrameType and dispatches to the handle* helpers
     * in this class -- confirm against the instruction dump.
     */
    /* JADX WARN: Removed duplicated region for block: B:103:0x01ba  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private org.reactivestreams.Publisher<java.lang.Void> handleFrame(io.reactivesocket.Frame r8) {
        /*
            Method dump skipped, instructions count: 456
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.reactivesocket.ServerReactiveSocket.handleFrame(io.reactivesocket.Frame):org.reactivestreams.Publisher");
    }

    /**
     * Removes the channel receiver registered under the given id, holding the
     * {@code this} monitor. Passed to {@code RemoteReceiver} as a callback by
     * {@code handleChannel}.
     *
     * @param i id the receiver was registered under
     */
    private synchronized void removeChannelProcessor(int i) {
        this.channelProcessors.remove(i);
    }

    /**
     * Removes the subscription registered under the given id, holding the
     * {@code this} monitor. Passed to {@code RemoteSender} as a callback by
     * {@code handleChannel}.
     *
     * <p>NOTE(review): functionally identical to {@code removeSubscription(int)};
     * the two could be merged.
     *
     * @param i id the subscription was registered under
     */
    private synchronized void removeSubscriptions(int i) {
        this.subscriptions.remove(i);
    }

    /**
     * Releases all per-stream state: cancels and forgets every registered subscription,
     * cancels and forgets every channel receiver, then closes the request handler.
     *
     * <p>Runs under the {@code this} monitor. Invoked when the connection closes and as
     * the first step of {@code close()}.
     */
    private synchronized void cleanup() {
        // NOTE(review): cancel callbacks may re-enter removeSubscription/removeChannelProcessor
        // on this thread while the values view is being iterated -- confirm the map tolerates that.
        this.subscriptions.values().forEach(Subscription::cancel);
        this.subscriptions.clear();
        this.channelProcessors.values().forEach(receiver -> receiver.cancel());
        // Previously this cleared `subscriptions` a second time, leaving the cancelled
        // receivers in `channelProcessors` for the lifetime of the socket.
        this.channelProcessors.clear();
        this.requestHandler.close().subscribe(Subscribers.empty());
    }

    /**
     * Sends the payloads of a response publisher to the peer as RESPONSE frames.
     *
     * <p>The upstream subscription is registered under {@code i} when the publisher is
     * subscribed and unregistered on completion, cancellation or error. On cancellation
     * or error, the frame built by the corresponding callback (a cancel frame or an error
     * frame) is handed to {@code emitOnCancelOrError}.
     *
     * @param i         id used for the emitted frames and as the subscription key
     * @param publisher payloads to send
     * @return publisher for the send operation on the connection
     */
    private Publisher<Void> handleReceive(int i, Publisher<Payload> publisher) {
        // Same monitor and same map operation as an inline synchronized(this) block.
        final Runnable untrack = () -> removeSubscription(i);

        final Publisher<Frame> outbound = Px.from(publisher)
                .doOnSubscribe(upstream -> addSubscription(i, upstream))
                .map(item -> Frame.Response.from(i, FrameType.RESPONSE, item.getMetadata(), item.getData(), 4096))
                .doOnComplete(untrack)
                .emitOnCancelOrError(() -> {
                    untrack.run();
                    return Frame.Cancel.from(i);
                }, failure -> {
                    untrack.run();
                    return Frame.Error.from(i, failure);
                });

        return Px.from(this.connection.send(outbound));
    }

    /**
     * Wraps a response publisher in a {@code RemoteSender}, registers the sender as the
     * subscription for {@code i}, and sends its frames on the connection.
     *
     * @param i         id used for the emitted RESPONSE frames and as the subscription key
     * @param publisher payloads to send
     * @return publisher for the send operation on the connection
     */
    private Publisher<Void> doReceive(int i, Publisher<Payload> publisher) {
        // Declared as RemoteSender (not Publisher<Frame>) because it is used both as the
        // Subscription stored in the map and as the frame publisher handed to send().
        // The cleanup callback mirrors handleChannel; the previous no-op callback left the
        // entry in `subscriptions` until a CANCEL frame or connection close.
        // NOTE(review): assumes RemoteSender runs this callback when the stream ends -- confirm.
        RemoteSender remoteSender = new RemoteSender(Px.from(publisher).map(payload -> {
            return Frame.Response.from(i, FrameType.RESPONSE, payload);
        }), () -> {
            removeSubscription(i);
        }, i, 2);
        // Every other access to `subscriptions` holds the `this` monitor; this put must too.
        addSubscription(i, remoteSender);
        return this.connection.send(remoteSender);
    }

    /**
     * Sets up both directions of a channel request.
     *
     * <p>A {@code RemoteReceiver} is created for the inbound side (seeded with a NEXT
     * frame derived from the request) and registered under {@code i}. It is then passed
     * to {@code requestChannel}, and the handler's responses are wrapped in a
     * {@code RemoteSender}, which is registered as the subscription for {@code i} and
     * sent on the connection.
     *
     * @param i     id of the channel; used for emitted frames and as the key in both maps
     * @param frame the channel request frame
     * @return publisher for the send operation on the connection
     */
    private Publisher<Void> handleChannel(int i, Frame frame) {
        int initialRequestN = Frame.Request.initialRequestN(frame);
        RemoteReceiver remoteReceiver = new RemoteReceiver(this.connection, i, () -> {
            removeChannelProcessor(i);
        }, Frame.Request.from(i, FrameType.NEXT, frame, initialRequestN), this.receiversSubscription, true);
        // removeChannelProcessor() and cleanup() touch this map under the `this` monitor,
        // so the registration has to hold the same lock.
        synchronized (this) {
            this.channelProcessors.put(i, remoteReceiver);
        }
        // Declared as RemoteSender (not Publisher<Frame>) because it is used both as the
        // Subscription stored in the map and as the frame publisher handed to send().
        RemoteSender remoteSender = new RemoteSender(Px.from(requestChannel(remoteReceiver)).map(payload -> {
            return Frame.Response.from(i, FrameType.RESPONSE, payload);
        }), () -> {
            removeSubscriptions(i);
        }, i, initialRequestN);
        synchronized (this) {
            this.subscriptions.put(i, remoteSender);
        }
        return this.connection.send(remoteSender);
    }

    /**
     * Decorates a fire-and-forget result so that its subscription is registered under
     * {@code i} while it is in flight and unregistered once it completes or fails.
     * Failures are additionally reported to the error consumer.
     *
     * @param i         id used as the subscription key
     * @param publisher result of the handler's fire-and-forget call
     * @return the decorated publisher
     */
    private Publisher<Void> handleFireAndForget(int i, Publisher<Void> publisher) {
        final Runnable untrack = () -> removeSubscription(i);
        return Px.from(publisher)
                .doOnSubscribe(upstream -> addSubscription(i, upstream))
                .doOnError(failure -> {
                    // Unregister first, then report, matching the completion path's bookkeeping.
                    untrack.run();
                    this.errorConsumer.accept(failure);
                })
                .doOnComplete(untrack);
    }

    /**
     * Answers a keep-alive frame when the peer asked for a response.
     *
     * @param frame the received keep-alive frame
     * @return an empty publisher if the respond flag is not set; otherwise the publisher
     *         for sending a keep-alive frame (empty buffer, respond flag {@code false}),
     *         with send errors forwarded to the error consumer
     */
    private Publisher<Void> handleKeepAliveFrame(Frame frame) {
        if (!Frame.Keepalive.hasRespondFlag(frame)) {
            return Px.empty();
        }
        Frame reply = Frame.Keepalive.from(Frame.NULL_BYTEBUFFER, false);
        return Px.from(this.connection.sendOne(reply)).doOnError(this.errorConsumer);
    }

    /**
     * Handles a cancel request for the given id: unregisters the subscription stored
     * under it (if any) and cancels it.
     *
     * @param i id whose subscription should be cancelled
     * @return always an empty publisher
     */
    private Publisher<Void> handleCancelFrame(int i) {
        final Subscription cancelled;
        synchronized (this) {
            cancelled = this.subscriptions.remove(i);
        }
        if (cancelled == null) {
            // Nothing registered under this id.
            return Px.empty();
        }
        // Cancelled after the monitor has been released.
        cancelled.cancel();
        return Px.empty();
    }

    /**
     * Sends an error frame for the given id to the peer.
     *
     * @param i  id the error frame is built for
     * @param th error to encode in the frame
     * @return publisher for the send operation; send failures are forwarded to the error consumer
     */
    private Publisher<Void> handleError(int i, Throwable th) {
        Frame errorFrame = Frame.Error.from(i, th);
        Publisher<Void> sent = this.connection.sendOne(errorFrame);
        return Px.from(sent).doOnError(this.errorConsumer);
    }

    /**
     * Forwards the demand carried by a request-n frame to the subscription registered
     * under the given id. Does nothing if no subscription is registered.
     *
     * @param i     id whose subscription should receive the demand
     * @param frame frame from which the requested amount is read
     * @return always an empty publisher
     */
    private Px<Void> handleRequestN(int i, Frame frame) {
        final Subscription target;
        synchronized (this) {
            target = this.subscriptions.get(i);
        }
        if (target == null) {
            return Px.empty();
        }

        final int requested = Frame.RequestN.requestN(frame);
        final long demand;
        if (requested >= Integer.MAX_VALUE) {
            // The largest int value is widened to Long.MAX_VALUE.
            demand = Long.MAX_VALUE;
        } else {
            demand = requested;
        }
        target.request(demand);
        return Px.empty();
    }

    /**
     * Registers a subscription under the given id while holding the {@code this} monitor.
     *
     * @param i            id to register under
     * @param subscription subscription to store; replaces any existing entry for {@code i}
     */
    private synchronized void addSubscription(int i, Subscription subscription) {
        this.subscriptions.put(i, subscription);
    }

    /**
     * Unregisters the subscription stored under the given id while holding the
     * {@code this} monitor. The subscription itself is not cancelled.
     *
     * @param i id to unregister
     */
    private synchronized void removeSubscription(int i) {
        this.subscriptions.remove(i);
    }
}
