package io.reactivesocket.server;

import io.reactivesocket.ClientReactiveSocket;
import io.reactivesocket.ConnectionSetupPayload;
import io.reactivesocket.DuplexConnection;
import io.reactivesocket.FrameType;
import io.reactivesocket.ServerReactiveSocket;
import io.reactivesocket.StreamIdSupplier;
import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.events.AbstractEventSource;
import io.reactivesocket.events.ConnectionEventInterceptor;
import io.reactivesocket.events.ServerEventListener;
import io.reactivesocket.internal.ClientServerInputMultiplexer;
import io.reactivesocket.lease.DefaultLeaseHonoringSocket;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
import io.reactivesocket.server.ReactiveSocketServer;
import io.reactivesocket.transport.TransportServer;
import io.reactivesocket.util.Clock;

/* loaded from: input_file:io/reactivesocket/server/DefaultReactiveSocketServer.class */
public final class DefaultReactiveSocketServer extends AbstractEventSource<ServerEventListener> implements ReactiveSocketServer {
    private final TransportServer transportServer;

    public DefaultReactiveSocketServer(TransportServer transportServer) {
        this.transportServer = transportServer;
    }

    @Override // io.reactivesocket.server.ReactiveSocketServer
    public TransportServer.StartedServer start(ReactiveSocketServer.SocketAcceptor socketAcceptor) {
        return this.transportServer.start(duplexConnection -> {
            DuplexConnection duplexConnection;
            if (isEventPublishingEnabled()) {
                long now = Clock.now();
                duplexConnection = new ConnectionEventInterceptor(duplexConnection, this);
                getEventListener().socketAccepted();
                duplexConnection.onClose().subscribe(Subscribers.doOnTerminate(() -> {
                    if (isEventPublishingEnabled()) {
                        getEventListener().socketClosed(Clock.elapsedSince(now), Clock.unit());
                    }
                }));
            } else {
                duplexConnection = duplexConnection;
            }
            DuplexConnection duplexConnection2 = duplexConnection;
            return Px.from(duplexConnection.receive()).switchTo(frame -> {
                if (frame.getType() != FrameType.SETUP) {
                    return Px.error(new IllegalStateException("Invalid first frame on the connection: " + duplexConnection2 + ", frame type received: " + frame.getType()));
                }
                ClientServerInputMultiplexer clientServerInputMultiplexer = new ClientServerInputMultiplexer(duplexConnection2);
                ConnectionSetupPayload create = ConnectionSetupPayload.create(frame);
                ClientReactiveSocket clientReactiveSocket = new ClientReactiveSocket(clientServerInputMultiplexer.asServerConnection(), (v0) -> {
                    v0.printStackTrace();
                }, StreamIdSupplier.serverSupplier(), KeepAliveProvider.never(), this);
                clientReactiveSocket.start(new DefaultLeaseHonoringSocket(clientReactiveSocket));
                new ServerReactiveSocket(clientServerInputMultiplexer.asClientConnection(), socketAcceptor.accept(create, clientReactiveSocket), create.willClientHonorLease(), (v0) -> {
                    v0.printStackTrace();
                }, this).start();
                return duplexConnection2.onClose();
            });
        });
    }
}
