package io.scalecube.services.gateway.client.websocket;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.scalecube.services.Address;
import io.scalecube.services.ServiceReference;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.gateway.client.GatewayClientCodec;
import io.scalecube.services.gateway.client.ServiceMessageCodec;
import io.scalecube.services.transport.api.ClientChannel;
import io.scalecube.services.transport.api.ClientTransport;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;

/* loaded from: input_file:io/scalecube/services/gateway/client/websocket/WebsocketGatewayClientTransport.class */
/**
 * Client transport that talks to a gateway over a single websocket connection.
 *
 * <p>The instance acts both as the {@link ClientTransport} (see {@link #create(ServiceReference)})
 * and as the {@link ClientChannel} returned from it. One {@link WebsocketGatewayClientSession} is
 * created lazily on the first {@code create} call and reused by every request afterwards; requests
 * are told apart by a per-transport stream id sent in the {@code "sid"} header.
 */
public final class WebsocketGatewayClientTransport implements ClientChannel, ClientTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketGatewayClientTransport.class);

    /** Name of the message header that carries the per-request stream id. */
    private static final String STREAM_ID = "sid";

    /** Default value of the HTTP {@code Content-Type} header set on the base client. */
    private static final String CONTENT_TYPE = "application/json";

    /** Codec used when the builder is not given one explicitly. */
    private static final WebsocketGatewayClientCodec CLIENT_CODEC = new WebsocketGatewayClientCodec();

    /** Default connect timeout (5 seconds) set on the base client. */
    private static final int CONNECT_TIMEOUT_MILLIS = (int) Duration.ofSeconds(5).toMillis();

    private final GatewayClientCodec clientCodec;
    private final LoopResources loopResources;
    private final Duration keepAliveInterval;
    private final Function<HttpClient, HttpClient> operator;
    /** True when {@link #loopResources} was created by this transport rather than supplied. */
    private final boolean ownsLoopResources;
    /** Source of stream ids; incremented once per request. */
    private final AtomicLong sidCounter = new AtomicLong();
    /** Holds the lazily created session; also used as the monitor guarding its creation. */
    private final AtomicReference<WebsocketGatewayClientSession> clientSessionReference = new AtomicReference<>();

    /** Fluent builder for {@link WebsocketGatewayClientTransport}. */
    public static class Builder {
        private LoopResources loopResources;
        private GatewayClientCodec clientCodec = WebsocketGatewayClientTransport.CLIENT_CODEC;
        private Duration keepAliveInterval = Duration.ZERO;
        private Function<HttpClient, HttpClient> operator = client -> client;

        /**
         * Sets the codec used to encode requests and handed to the session.
         *
         * @param gatewayClientCodec codec to use instead of the default one
         * @return this builder
         */
        public Builder clientCodec(GatewayClientCodec gatewayClientCodec) {
            this.clientCodec = gatewayClientCodec;
            return this;
        }

        /**
         * Sets externally managed loop resources. When set, the transport does not dispose them
         * on {@link WebsocketGatewayClientTransport#close()}.
         *
         * @param loopResources loop resources the HTTP client runs on
         * @return this builder
         */
        public Builder loopResources(LoopResources loopResources) {
            this.loopResources = loopResources;
            return this;
        }

        /**
         * Registers a customization of the underlying {@link HttpClient}. Customizations are
         * chained and applied in registration order, after the transport's own defaults.
         *
         * @param unaryOperator function receiving the client configured so far and returning the
         *     client to use from then on
         * @return this builder
         */
        public Builder httpClient(UnaryOperator<HttpClient> unaryOperator) {
            this.operator = this.operator.andThen(unaryOperator);
            return this;
        }

        /**
         * Sets the remote host and port to connect to.
         *
         * @param address gateway address
         * @return this builder
         */
        public Builder address(Address address) {
            return httpClient(client -> client.host(address.host()).port(address.port()));
        }

        /**
         * Sets the connect timeout channel option.
         *
         * @param duration timeout; its millisecond value is narrowed to {@code int}
         * @return this builder
         */
        public Builder connectTimeout(Duration duration) {
            return httpClient(client -> client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) duration.toMillis()));
        }

        /**
         * Sets the HTTP {@code Content-Type} request header.
         *
         * @param str header value
         * @return this builder
         */
        public Builder contentType(String str) {
            return httpClient(client -> client.headers(httpHeaders -> httpHeaders.set(HttpHeaderNames.CONTENT_TYPE, str)));
        }

        /**
         * Sets the idle interval after which a websocket ping is sent. {@link Duration#ZERO}
         * (the default) disables keepalive pings.
         *
         * @param duration read/write idle interval
         * @return this builder
         */
        public Builder keepAliveInterval(Duration duration) {
            this.keepAliveInterval = duration;
            return this;
        }

        /**
         * Sets every entry of the given map as an HTTP request header.
         *
         * @param map header names mapped to header values
         * @return this builder
         */
        public Builder headers(Map<String, String> map) {
            // The map is read when the client is configured, not when this method is called.
            return httpClient(client -> client.headers(httpHeaders -> map.forEach(httpHeaders::set)));
        }

        /**
         * Creates the transport from the current builder state.
         *
         * @return new transport instance
         */
        public WebsocketGatewayClientTransport build() {
            return new WebsocketGatewayClientTransport(this);
        }
    }

    private WebsocketGatewayClientTransport(Builder builder) {
        this.clientCodec = builder.clientCodec;
        this.keepAliveInterval = builder.keepAliveInterval;
        this.operator = builder.operator;
        this.ownsLoopResources = builder.loopResources == null;
        this.loopResources = this.ownsLoopResources
                ? LoopResources.create("websocket-gateway-client", 1, true)
                : builder.loopResources;
    }

    /**
     * Ensures the websocket session exists and returns this transport as the channel.
     *
     * <p>The first call connects and blocks until the session is established. The connect is
     * done under a lock rather than inside {@code AtomicReference.getAndUpdate}, whose update
     * function may be re-applied under contention and would then open more than one connection.
     *
     * @param serviceReference not used by this implementation
     * @return this instance
     * @throws RuntimeException if the connection cannot be established
     */
    public ClientChannel create(ServiceReference serviceReference) {
        if (this.clientSessionReference.get() == null) {
            synchronized (this.clientSessionReference) {
                if (this.clientSessionReference.get() == null) {
                    this.clientSessionReference.set(clientSession(newHttpClient()));
                }
            }
        }
        return this;
    }

    /** Builds the base HTTP client with the transport defaults, then applies builder customizations. */
    private HttpClient newHttpClient() {
        HttpClient base = HttpClient.create(ConnectionProvider.newConnection())
                .runOn(this.loopResources)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MILLIS)
                .option(ChannelOption.TCP_NODELAY, true)
                .headers(httpHeaders -> httpHeaders.set(HttpHeaderNames.CONTENT_TYPE, CONTENT_TYPE));
        return this.operator.apply(base);
    }

    /**
     * Opens a websocket connection on URI {@code "/"} and wraps it in a session, blocking the
     * calling thread until the connection attempt completes.
     *
     * @param httpClient fully configured client
     * @return the established session
     * @throws RuntimeException wrapping the root cause of any connect failure or interruption
     */
    private WebsocketGatewayClientSession clientSession(HttpClient httpClient) {
        try {
            return httpClient.websocket().uri("/").connect()
                    .map(this::withKeepAlive)
                    .map(connection -> {
                        WebsocketGatewayClientSession session = new WebsocketGatewayClientSession(this.clientCodec, connection);
                        LOGGER.info("Created session: {}", session);
                        // Observe session termination only for logging purposes.
                        session.onClose()
                                .doOnTerminate(() -> LOGGER.info("Closed session: {}", session))
                                .subscribe(null, th -> LOGGER.warn("Exception on closing session: {}, cause: {}", session, th.toString()));
                        return session;
                    })
                    .doOnError(th -> LOGGER.warn("Failed to connect, cause: {}", th.toString()))
                    .toFuture()
                    .get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(getRootCause(e));
        } catch (Exception e) {
            throw new RuntimeException(getRootCause(e));
        }
    }

    /** Registers read/write idle handlers that send pings, unless the keepalive interval is zero. */
    private Connection withKeepAlive(Connection connection) {
        if (this.keepAliveInterval.isZero()) {
            return connection;
        }
        long idleMillis = this.keepAliveInterval.toMillis();
        return connection
                .onReadIdle(idleMillis, () -> onReadIdle(connection))
                .onWriteIdle(idleMillis, () -> onWriteIdle(connection));
    }

    /**
     * Sends the request under a fresh stream id and emits the single decoded response.
     *
     * <p>On cancellation the session's {@code cancel} is invoked for the stream id; in every
     * termination case the processor registered for the stream id is removed.
     *
     * @param serviceMessage request message
     * @param type type passed to {@link ServiceMessageCodec#decodeData} for the response
     * @return mono of the decoded response
     */
    public Mono<ServiceMessage> requestResponse(ServiceMessage serviceMessage, Type type) {
        return Mono.defer(() -> {
            long sid = this.sidCounter.incrementAndGet();
            WebsocketGatewayClientSession session = this.clientSessionReference.get();
            return session.send(encodeRequest(serviceMessage, sid))
                    .doOnSubscribe(subscription -> LOGGER.debug("Sending request {}", serviceMessage))
                    .then(session.newMonoProcessor(sid).asMono())
                    .map(response -> ServiceMessageCodec.decodeData(response, type))
                    .doOnCancel(() -> session.cancel(sid, serviceMessage.qualifier()))
                    .doFinally(signalType -> session.removeProcessor(sid));
        });
    }

    /**
     * Sends the request under a fresh stream id and emits every decoded response of the stream.
     *
     * <p>On cancellation the session's {@code cancel} is invoked for the stream id; in every
     * termination case the processor registered for the stream id is removed.
     *
     * @param serviceMessage request message
     * @param type type passed to {@link ServiceMessageCodec#decodeData} for each response
     * @return flux of decoded responses
     */
    public Flux<ServiceMessage> requestStream(ServiceMessage serviceMessage, Type type) {
        return Flux.defer(() -> {
            long sid = this.sidCounter.incrementAndGet();
            WebsocketGatewayClientSession session = this.clientSessionReference.get();
            return session.send(encodeRequest(serviceMessage, sid))
                    .doOnSubscribe(subscription -> LOGGER.debug("Sending request {}", serviceMessage))
                    .thenMany(session.newUnicastProcessor(sid).asFlux())
                    .map(response -> ServiceMessageCodec.decodeData(response, type))
                    .doOnCancel(() -> session.cancel(sid, serviceMessage.qualifier()))
                    .doFinally(signalType -> session.removeProcessor(sid));
        });
    }

    /**
     * Not supported by this transport.
     *
     * @return a flux that fails with {@link UnsupportedOperationException}
     */
    public Flux<ServiceMessage> requestChannel(Publisher<ServiceMessage> publisher, Type type) {
        return Flux.error(new UnsupportedOperationException("requestChannel is not supported"));
    }

    /**
     * Write-idle handler: sends a websocket ping frame.
     *
     * <p>Decompiler note: access was widened from {@code private}.
     */
    public static void onWriteIdle(Connection connection) {
        sendPing(connection);
    }

    /**
     * Read-idle handler: sends a websocket ping frame.
     *
     * <p>Decompiler note: access was widened from {@code private}.
     */
    public static void onReadIdle(Connection connection) {
        sendPing(connection);
    }

    /** Sends a ping frame on a best-effort basis; a failure is only logged at debug level. */
    private static void sendPing(Connection connection) {
        connection.outbound()
                .sendObject(new PingWebSocketFrame())
                .then()
                .subscribe(null, th -> LOGGER.debug("Failed to send keepalive ping, cause: {}", th.toString()));
    }

    /** Copies the message, adds the stream id header and encodes it with the client codec. */
    private ByteBuf encodeRequest(ServiceMessage serviceMessage, long j) {
        ServiceMessage withSid = ServiceMessage.from(serviceMessage).header(STREAM_ID, Long.valueOf(j)).build();
        return this.clientCodec.encode(withSid);
    }

    /** Follows the cause chain to its last element; returns {@code th} itself when it has no cause. */
    private static Throwable getRootCause(Throwable th) {
        Throwable root = th;
        for (Throwable cause = root.getCause(); cause != null; cause = root.getCause()) {
            root = cause;
        }
        return root;
    }

    /**
     * Disposes the loop resources if, and only if, they were created by this transport.
     * Loop resources supplied through the builder are left to their owner.
     */
    public void close() {
        if (this.ownsLoopResources) {
            this.loopResources.dispose();
        }
    }
}
