package org.springframework.web.reactive.socket.adapter;

import com.hazelcast.sql.impl.SqlErrorCode;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.websocket.api.Callback;
import org.eclipse.jetty.websocket.api.Session;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketMessage;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:BOOT-INF/lib/spring-webflux-6.2.6.jar:org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.class */
public class JettyWebSocketSession extends AbstractWebSocketSession<Session> {
    private final Flux<WebSocketMessage> flux;
    private final Sinks.One<CloseStatus> closeStatusSink;
    private final Lock lock;
    private long requested;
    private boolean awaitingMessage;

    @Nullable
    private FluxSink<WebSocketMessage> sink;

    @Nullable
    private final Sinks.Empty<Void> handlerCompletionSink;

    public JettyWebSocketSession(Session session, HandshakeInfo handshakeInfo, DataBufferFactory dataBufferFactory) {
        this(session, handshakeInfo, dataBufferFactory, null);
    }

    public JettyWebSocketSession(Session session, HandshakeInfo handshakeInfo, DataBufferFactory dataBufferFactory, @Nullable Sinks.Empty<Void> empty) {
        super(session, ObjectUtils.getIdentityHexString(session), handshakeInfo, dataBufferFactory);
        this.closeStatusSink = Sinks.one();
        this.lock = new ReentrantLock();
        this.requested = 0L;
        this.awaitingMessage = false;
        this.handlerCompletionSink = empty;
        this.flux = Flux.create(fluxSink -> {
            this.sink = fluxSink;
            fluxSink.onRequest(j -> {
                boolean z = false;
                this.lock.lock();
                try {
                    this.requested = Math.addExact(this.requested, j);
                    if (this.requested < 0) {
                        this.requested = Long.MAX_VALUE;
                    }
                    if (!this.awaitingMessage && this.requested > 0) {
                        if (this.requested != Long.MAX_VALUE) {
                            this.requested--;
                        }
                        this.awaitingMessage = true;
                        z = true;
                    }
                    if (z) {
                        getDelegate().demand();
                    }
                } finally {
                    this.lock.unlock();
                }
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleMessage(WebSocketMessage webSocketMessage) {
        Assert.state(this.sink != null, "No sink available");
        this.sink.next(webSocketMessage);
        boolean z = false;
        this.lock.lock();
        try {
            if (!this.awaitingMessage) {
                throw new IllegalStateException();
            }
            this.awaitingMessage = false;
            if (this.requested > 0) {
                if (this.requested != Long.MAX_VALUE) {
                    this.requested--;
                }
                this.awaitingMessage = true;
                z = true;
            }
            if (z) {
                getDelegate().demand();
            }
        } finally {
            this.lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleError(Throwable th) {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleClose(CloseStatus closeStatus) {
        this.closeStatusSink.tryEmitValue(closeStatus);
        if (this.sink != null) {
            this.sink.complete();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onHandlerError(Throwable th) {
        if (this.handlerCompletionSink != null) {
            this.handlerCompletionSink.tryEmitError(th);
        }
        getDelegate().close(SqlErrorCode.TOPOLOGY_CHANGE, th.getMessage(), Callback.NOOP);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onHandleComplete() {
        if (this.handlerCompletionSink != null) {
            this.handlerCompletionSink.tryEmitEmpty();
        }
        getDelegate().close(1000, (String) null, Callback.NOOP);
    }

    @Override // org.springframework.web.reactive.socket.WebSocketSession
    public boolean isOpen() {
        return getDelegate().isOpen();
    }

    @Override // org.springframework.web.reactive.socket.WebSocketSession
    public Mono<Void> close(CloseStatus closeStatus) {
        Callback.Completable completable = new Callback.Completable();
        getDelegate().close(closeStatus.getCode(), closeStatus.getReason(), completable);
        return Mono.fromFuture((CompletableFuture) completable);
    }

    @Override // org.springframework.web.reactive.socket.WebSocketSession
    public Mono<CloseStatus> closeStatus() {
        return this.closeStatusSink.asMono();
    }

    @Override // org.springframework.web.reactive.socket.adapter.AbstractWebSocketSession, org.springframework.web.reactive.socket.WebSocketSession
    public Flux<WebSocketMessage> receive() {
        return this.flux;
    }

    @Override // org.springframework.web.reactive.socket.adapter.AbstractWebSocketSession, org.springframework.web.reactive.socket.WebSocketSession
    public Mono<Void> send(Publisher<WebSocketMessage> publisher) {
        return Flux.from(publisher).flatMap(this::sendMessage, 1).then();
    }

    /* JADX WARN: Type inference failed for: r0v21, types: [org.springframework.web.reactive.socket.adapter.JettyWebSocketSession$1] */
    protected Mono<Void> sendMessage(WebSocketMessage webSocketMessage) {
        final Callback.Completable completable = new Callback.Completable();
        DataBuffer payload = webSocketMessage.getPayload();
        final Session delegate = getDelegate();
        if (WebSocketMessage.Type.TEXT.equals(webSocketMessage.getType())) {
            delegate.sendText(payload.toString(StandardCharsets.UTF_8), completable);
        } else {
            switch (webSocketMessage.getType()) {
                case BINARY:
                    final DataBuffer.ByteBufferIterator readableByteBuffers = payload.readableByteBuffers();
                    new IteratingCallback() { // from class: org.springframework.web.reactive.socket.adapter.JettyWebSocketSession.1
                        protected IteratingCallback.Action process() {
                            if (!readableByteBuffers.hasNext()) {
                                return IteratingCallback.Action.SUCCEEDED;
                            }
                            delegate.sendPartialBinary(readableByteBuffers.next(), readableByteBuffers.hasNext(), Callback.from(this::succeeded, this::failed));
                            return IteratingCallback.Action.SCHEDULED;
                        }

                        protected void onCompleteSuccess() {
                            readableByteBuffers.close();
                            completable.succeed();
                        }

                        protected void onCompleteFailure(Throwable th) {
                            readableByteBuffers.close();
                            completable.fail(th);
                        }
                    }.iterate();
                    break;
                case PING:
                    ByteBuffer allocate = BufferUtil.allocate(125);
                    payload.toByteBuffer(allocate);
                    delegate.sendPing(allocate, completable);
                    break;
                case PONG:
                    ByteBuffer allocate2 = BufferUtil.allocate(125);
                    payload.toByteBuffer(allocate2);
                    delegate.sendPong(allocate2, completable);
                    break;
                default:
                    throw new IllegalArgumentException("Unexpected message type: " + String.valueOf(webSocketMessage.getType()));
            }
        }
        return Mono.fromFuture((CompletableFuture) completable);
    }
}
