package org.springframework.messaging.rsocket;

import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:WEB-INF/lib/spring-messaging-5.2.0.M1.jar:org/springframework/messaging/rsocket/MessagingRSocket.class */
class MessagingRSocket extends AbstractRSocket {
    private final Function<Message<?>, Mono<Void>> handler;
    private final RSocketRequester requester;

    @Nullable
    private MimeType dataMimeType;
    private final RSocketStrategies strategies;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MessagingRSocket(Function<Message<?>, Mono<Void>> function, RSocket rSocket, @Nullable MimeType mimeType, RSocketStrategies rSocketStrategies) {
        Assert.notNull(function, "'handler' is required");
        Assert.notNull(rSocket, "'sendingRSocket' is required");
        this.handler = function;
        this.requester = RSocketRequester.create(rSocket, mimeType, rSocketStrategies);
        this.dataMimeType = mimeType;
        this.strategies = rSocketStrategies;
    }

    public Mono<Void> handleConnectionSetupPayload(ConnectionSetupPayload connectionSetupPayload) {
        if (StringUtils.hasText(connectionSetupPayload.dataMimeType())) {
            this.dataMimeType = MimeTypeUtils.parseMimeType(connectionSetupPayload.dataMimeType());
        }
        connectionSetupPayload.retain();
        return handle(connectionSetupPayload);
    }

    public Mono<Void> fireAndForget(Payload payload) {
        return handle(payload);
    }

    public Mono<Payload> requestResponse(Payload payload) {
        return handleAndReply(payload, Flux.just(payload)).next();
    }

    public Flux<Payload> requestStream(Payload payload) {
        return handleAndReply(payload, Flux.just(payload));
    }

    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        return Flux.from(publisher).switchOnFirst((signal, flux) -> {
            Payload payload = (Payload) signal.get();
            return payload == null ? flux : handleAndReply(payload, flux);
        });
    }

    public Mono<Void> metadataPush(Payload payload) {
        return handle(payload);
    }

    private Mono<Void> handle(Payload payload) {
        MessageHeaders createHeaders = createHeaders(getDestination(payload), null);
        DataBuffer retainDataAndReleasePayload = retainDataAndReleasePayload(payload);
        int refCount = refCount(retainDataAndReleasePayload);
        Message createMessage = MessageBuilder.createMessage(retainDataAndReleasePayload, createHeaders);
        return Mono.defer(() -> {
            return this.handler.apply(createMessage);
        }).doFinally(signalType -> {
            if (refCount(retainDataAndReleasePayload) == refCount) {
                DataBufferUtils.release(retainDataAndReleasePayload);
            }
        });
    }

    private int refCount(DataBuffer dataBuffer) {
        if (dataBuffer instanceof NettyDataBuffer) {
            return ((NettyDataBuffer) dataBuffer).getNativeBuffer().refCnt();
        }
        return 1;
    }

    private Flux<Payload> handleAndReply(Payload payload, Flux<Payload> flux) {
        MonoProcessor<?> create = MonoProcessor.create();
        MessageHeaders createHeaders = createHeaders(getDestination(payload), create);
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        Flux doOnSubscribe = flux.map(this::retainDataAndReleasePayload).doOnSubscribe(subscription -> {
            atomicBoolean.set(true);
        });
        Message createMessage = MessageBuilder.createMessage(doOnSubscribe, createHeaders);
        return Mono.defer(() -> {
            return this.handler.apply(createMessage);
        }).doFinally(signalType -> {
            if (atomicBoolean.get()) {
                return;
            }
            doOnSubscribe.subscribe(DataBufferUtils::release);
        }).thenMany(Flux.defer(() -> {
            return create.isTerminated() ? create.flatMapMany(Function.identity()) : Mono.error(new IllegalStateException("Something went wrong: reply Mono not set"));
        }));
    }

    private String getDestination(Payload payload) {
        return payload.getMetadataUtf8();
    }

    private DataBuffer retainDataAndReleasePayload(Payload payload) {
        return PayloadUtils.retainDataAndReleasePayload(payload, this.strategies.dataBufferFactory());
    }

    private MessageHeaders createHeaders(String str, @Nullable MonoProcessor<?> monoProcessor) {
        MessageHeaderAccessor messageHeaderAccessor = new MessageHeaderAccessor();
        messageHeaderAccessor.setLeaveMutable(true);
        messageHeaderAccessor.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, str);
        if (this.dataMimeType != null) {
            messageHeaderAccessor.setContentType(this.dataMimeType);
        }
        messageHeaderAccessor.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, this.requester);
        if (monoProcessor != null) {
            messageHeaderAccessor.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, monoProcessor);
        }
        messageHeaderAccessor.setHeader(HandlerMethodReturnValueHandler.DATA_BUFFER_FACTORY_HEADER, this.strategies.dataBufferFactory());
        return messageHeaderAccessor.getMessageHeaders();
    }
}
