package io.reactivesocket.client;

import io.reactivesocket.ClientReactiveSocket;
import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import io.reactivesocket.FrameType;
import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.ServerReactiveSocket;
import io.reactivesocket.StreamIdSupplier;
import io.reactivesocket.client.ReactiveSocketClient;
import io.reactivesocket.events.AbstractEventSource;
import io.reactivesocket.events.ClientEventListener;
import io.reactivesocket.events.ConnectionEventInterceptor;
import io.reactivesocket.internal.ClientServerInputMultiplexer;
import io.reactivesocket.internal.DisabledEventPublisher;
import io.reactivesocket.internal.EventPublisher;
import io.reactivesocket.lease.DisableLeaseSocket;
import io.reactivesocket.lease.LeaseHonoringSocket;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.internal.publishers.InstrumentingPublisher;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
import io.reactivesocket.util.Clock;
import io.reactivesocket.util.PayloadImpl;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;

/* loaded from: input_file:io/reactivesocket/client/SetupProviderImpl.class */
final class SetupProviderImpl extends AbstractEventSource<ClientEventListener> implements SetupProvider {
    private final Frame setupFrame;
    private final Function<ReactiveSocket, ? extends LeaseHonoringSocket> leaseDecorator;
    private final Consumer<Throwable> errorConsumer;
    private final KeepAliveProvider keepAliveProvider;

    /* loaded from: input_file:io/reactivesocket/client/SetupProviderImpl$ConnectInspector.class */
    private static class ConnectInspector {
        private static final ConnectInspector empty = new ConnectInspector(new DisabledEventPublisher());
        private final EventPublisher<ClientEventListener> publisher;
        private final long startTime = Clock.now();

        public ConnectInspector(EventPublisher<ClientEventListener> eventPublisher) {
            this.publisher = eventPublisher;
            if (eventPublisher.isEventPublishingEnabled()) {
                eventPublisher.getEventListener().connectStart();
            }
        }

        public void connectSuccess(ReactiveSocket reactiveSocket) {
            if (this.publisher.isEventPublishingEnabled()) {
                this.publisher.getEventListener().connectCompleted(() -> {
                    return reactiveSocket.availability();
                }, System.nanoTime() - this.startTime, TimeUnit.NANOSECONDS);
                reactiveSocket.onClose().subscribe(Subscribers.doOnTerminate(() -> {
                    if (this.publisher.isEventPublishingEnabled()) {
                        this.publisher.getEventListener().socketClosed(Clock.elapsedSince(this.startTime), Clock.unit());
                    }
                }));
            }
        }

        public void connectFailed(Throwable th) {
            if (this.publisher.isEventPublishingEnabled()) {
                this.publisher.getEventListener().connectFailed(System.nanoTime() - this.startTime, TimeUnit.NANOSECONDS, th);
            }
        }

        public void connectCancelled() {
            if (this.publisher.isEventPublishingEnabled()) {
                this.publisher.getEventListener().connectCancelled(System.nanoTime() - this.startTime, TimeUnit.NANOSECONDS);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SetupProviderImpl(Frame frame, Function<ReactiveSocket, ? extends LeaseHonoringSocket> function, KeepAliveProvider keepAliveProvider, Consumer<Throwable> consumer) {
        this.keepAliveProvider = keepAliveProvider;
        this.errorConsumer = consumer;
        Frame.ensureFrameType(FrameType.SETUP, frame);
        this.leaseDecorator = function;
        this.setupFrame = frame;
    }

    @Override // io.reactivesocket.client.SetupProvider
    public Publisher<ReactiveSocket> accept(DuplexConnection duplexConnection, ReactiveSocketClient.SocketAcceptor socketAcceptor) {
        return new InstrumentingPublisher(_setup(isEventPublishingEnabled() ? new ConnectionEventInterceptor(duplexConnection, this) : duplexConnection, socketAcceptor), subscriber -> {
            return !isEventPublishingEnabled() ? ConnectInspector.empty : new ConnectInspector(this);
        }, (v0, v1) -> {
            v0.connectFailed(v1);
        }, (Consumer) null, (v0) -> {
            v0.connectCancelled();
        }, (v0, v1) -> {
            v0.connectSuccess(v1);
        });
    }

    @Override // io.reactivesocket.client.SetupProvider
    public SetupProvider dataMimeType(String str) {
        return new SetupProviderImpl(Frame.Setup.from(Frame.Setup.getFlags(this.setupFrame), Frame.Setup.keepaliveInterval(this.setupFrame), Frame.Setup.maxLifetime(this.setupFrame), Frame.Setup.metadataMimeType(this.setupFrame), str, this.setupFrame), this.leaseDecorator, this.keepAliveProvider, this.errorConsumer);
    }

    @Override // io.reactivesocket.client.SetupProvider
    public SetupProvider metadataMimeType(String str) {
        return new SetupProviderImpl(Frame.Setup.from(Frame.Setup.getFlags(this.setupFrame), Frame.Setup.keepaliveInterval(this.setupFrame), Frame.Setup.maxLifetime(this.setupFrame), str, Frame.Setup.dataMimeType(this.setupFrame), this.setupFrame), this.leaseDecorator, this.keepAliveProvider, this.errorConsumer);
    }

    @Override // io.reactivesocket.client.SetupProvider
    public SetupProvider honorLease(Function<ReactiveSocket, LeaseHonoringSocket> function) {
        return new SetupProviderImpl(this.setupFrame, function, this.keepAliveProvider, this.errorConsumer);
    }

    @Override // io.reactivesocket.client.SetupProvider
    public SetupProvider disableLease() {
        return disableLease(DisableLeaseSocket::new);
    }

    @Override // io.reactivesocket.client.SetupProvider
    public SetupProvider disableLease(Function<ReactiveSocket, DisableLeaseSocket> function) {
        return new SetupProviderImpl(Frame.Setup.from(Frame.Setup.getFlags(this.setupFrame) & (-33), Frame.Setup.keepaliveInterval(this.setupFrame), Frame.Setup.maxLifetime(this.setupFrame), Frame.Setup.metadataMimeType(this.setupFrame), Frame.Setup.dataMimeType(this.setupFrame), this.setupFrame), function, this.keepAliveProvider, this.errorConsumer);
    }

    @Override // io.reactivesocket.client.SetupProvider
    public SetupProvider setupPayload(Payload payload) {
        return new SetupProviderImpl(Frame.Setup.from(Frame.Setup.getFlags(this.setupFrame) & (-33), Frame.Setup.keepaliveInterval(this.setupFrame), Frame.Setup.maxLifetime(this.setupFrame), Frame.Setup.metadataMimeType(this.setupFrame), Frame.Setup.dataMimeType(this.setupFrame), payload), reactiveSocket -> {
            return new DisableLeaseSocket(reactiveSocket);
        }, this.keepAliveProvider, this.errorConsumer);
    }

    private Frame copySetupFrame() {
        return Frame.Setup.from(Frame.Setup.getFlags(this.setupFrame), Frame.Setup.keepaliveInterval(this.setupFrame), Frame.Setup.maxLifetime(this.setupFrame), Frame.Setup.metadataMimeType(this.setupFrame), Frame.Setup.dataMimeType(this.setupFrame), new PayloadImpl(this.setupFrame.getData().duplicate(), this.setupFrame.getMetadata().duplicate()));
    }

    private Publisher<ReactiveSocket> _setup(DuplexConnection duplexConnection, ReactiveSocketClient.SocketAcceptor socketAcceptor) {
        return Px.from(duplexConnection.sendOne(copySetupFrame())).cast(ReactiveSocket.class).concatWith(Px.defer(() -> {
            ClientServerInputMultiplexer clientServerInputMultiplexer = new ClientServerInputMultiplexer(duplexConnection);
            ClientReactiveSocket clientReactiveSocket = new ClientReactiveSocket(clientServerInputMultiplexer.asClientConnection(), this.errorConsumer, StreamIdSupplier.clientSupplier(), this.keepAliveProvider, this);
            LeaseHonoringSocket apply = this.leaseDecorator.apply(clientReactiveSocket);
            clientReactiveSocket.start(apply);
            new ServerReactiveSocket(clientServerInputMultiplexer.asServerConnection(), socketAcceptor.accept(clientReactiveSocket), true, this.errorConsumer, this).start();
            return Px.just(apply);
        }));
    }
}
