package cn.hserver.modelcontextprotocol.server.transport;

import cn.hserver.modelcontextprotocol.spec.McpError;
import cn.hserver.modelcontextprotocol.spec.McpSchema;
import cn.hserver.modelcontextprotocol.spec.McpServerSession;
import cn.hserver.modelcontextprotocol.spec.McpServerTransport;
import cn.hserver.modelcontextprotocol.spec.McpServerTransportProvider;
import cn.hserver.plugin.web.context.WebConstConfig;
import cn.hserver.plugin.web.context.sse.SSeEvent;
import cn.hserver.plugin.web.context.sse.SSeStream;
import cn.hserver.plugin.web.interfaces.HttpRequest;
import cn.hserver.plugin.web.interfaces.HttpResponse;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:cn/hserver/modelcontextprotocol/server/transport/HServerSseServerTransportProvider.class */
/**
 * {@link McpServerTransportProvider} that exposes MCP sessions over HServer's
 * server-sent-event support.
 *
 * <p>Flow implemented by this class:
 * <ol>
 *   <li>{@link #doGet} handles a GET on the SSE endpoint: it creates a session keyed by a
 *       random UUID and sends an {@code endpoint} event containing
 *       {@code <messageEndpoint>?sessionId=<uuid>}.</li>
 *   <li>{@link #doPost} handles a POST on the message endpoint: it looks the session up by
 *       its {@code sessionId} query parameter and hands the deserialized JSON-RPC message
 *       to it.</li>
 *   <li>Messages going to the client are written to the session's SSE stream as
 *       {@code message} events.</li>
 * </ol>
 *
 * <p>{@link #setSessionFactory} must be called before {@link #doGet} can create sessions.
 */
public class HServerSseServerTransportProvider implements McpServerTransportProvider {
    private static final Logger logger = LoggerFactory.getLogger(HServerSseServerTransportProvider.class);

    /** SSE event name used for JSON-RPC messages sent to the client. */
    public static final String MESSAGE_EVENT_TYPE = "message";

    /** SSE event name used to tell the client which URI to POST messages to. */
    public static final String ENDPOINT_EVENT_TYPE = "endpoint";

    /** URI that {@link #doPost} answers; always {@code sseEndpoint + "/message"}. */
    private final String messageEndpoint;

    /** URI that {@link #doGet} answers. */
    private final String sseEndpoint;

    /** Creates sessions for new SSE connections; {@code null} until {@link #setSessionFactory} is called. */
    private McpServerSession.Factory sessionFactory;

    private final ObjectMapper objectMapper = WebConstConfig.JSON;

    /** Active sessions keyed by session id. */
    private final Map<String, McpServerSession> sessions = new ConcurrentHashMap<>();

    /**
     * Per-session transport that writes outgoing messages to one SSE stream and removes the
     * session from the provider's session map when it is closed.
     */
    private class HServerMcpSessionTransport implements McpServerTransport {
        private final String sessionId;
        private final SSeStream writer;

        HServerMcpSessionTransport(String sessionId, SSeStream writer) {
            this.sessionId = sessionId;
            this.writer = writer;
            logger.debug("Session transport {} initialized with SSE writer", sessionId);
        }

        /**
         * Serializes the message to JSON and sends it as a {@code message} SSE event.
         *
         * <p>A failure is logged and the session is removed from the session map; the
         * returned {@link Mono} still completes normally, it does not signal the error.
         *
         * @param jSONRPCMessage the message to send
         * @return a {@link Mono} that performs the send when subscribed
         */
        @Override
        public Mono<Void> sendMessage(McpSchema.JSONRPCMessage jSONRPCMessage) {
            return Mono.fromRunnable(() -> {
                try {
                    String json = objectMapper.writeValueAsString(jSONRPCMessage);
                    sendEvent(this.writer, MESSAGE_EVENT_TYPE, json);
                    logger.debug("Message sent to session {}", this.sessionId);
                } catch (Exception e) {
                    // Trailing throwable argument makes SLF4J log the stack trace as well.
                    logger.error("Failed to send message to session {}: {}", this.sessionId, e.getMessage(), e);
                    sessions.remove(this.sessionId);
                }
            });
        }

        /**
         * Converts {@code obj} to the requested type using the provider's {@link ObjectMapper}.
         *
         * @param obj the value to convert
         * @param typeReference the target type
         * @return the converted value
         */
        @Override
        public <T> T unmarshalFrom(Object obj, TypeReference<T> typeReference) {
            return objectMapper.convertValue(obj, typeReference);
        }

        /**
         * Returns a {@link Mono} that, when subscribed, removes this session from the
         * provider's session map.
         */
        @Override
        public Mono<Void> closeGracefully() {
            return Mono.fromRunnable(() -> {
                logger.debug("Closing session transport: {}", this.sessionId);
                removeSession();
            });
        }

        /** Removes this session from the provider's session map immediately. */
        @Override
        public void close() {
            removeSession();
        }

        /** Shared by {@link #close()} and {@link #closeGracefully()}: removes the session and logs the outcome. */
        private void removeSession() {
            try {
                sessions.remove(this.sessionId);
                logger.debug("Successfully completed async context for session {}", this.sessionId);
            } catch (Exception e) {
                logger.warn("Failed to complete async context for session {}: {}", this.sessionId, e.getMessage(), e);
            }
        }
    }

    /**
     * Creates a provider for the given SSE endpoint.
     *
     * @param str the URI answered by {@link #doGet}; the message endpoint is derived from it
     *            by appending {@code "/message"}
     */
    public HServerSseServerTransportProvider(String str) {
        this.sseEndpoint = str;
        this.messageEndpoint = str + "/message";
    }

    /**
     * Sets the factory used by {@link #doGet} to create a session for each new SSE connection.
     *
     * @param factory the session factory
     */
    @Override
    public void setSessionFactory(McpServerSession.Factory factory) {
        this.sessionFactory = factory;
    }

    /**
     * Sends a notification to every active session.
     *
     * <p>A failure for one session is logged and suppressed, so it does not stop the
     * notification from being sent to the remaining sessions.
     *
     * @param str the notification method name
     * @param map the notification parameters
     * @return a {@link Mono} that completes once every session has been attempted
     */
    @Override
    public Mono<Void> notifyClients(String str, Map<String, Object> map) {
        if (this.sessions.isEmpty()) {
            logger.debug("No active sessions to broadcast message to");
            return Mono.empty();
        }
        logger.debug("Attempting to broadcast message to {} active sessions", this.sessions.size());
        return Flux.fromIterable(this.sessions.values())
                .flatMap(session -> session.sendNotification(str, map)
                        .doOnError(th -> logger.error("Failed to send message to session {}: {}",
                                session.getId(), th.getMessage(), th))
                        .onErrorComplete())
                .then();
    }

    /**
     * Handles a GET request. When the request URI equals the SSE endpoint, opens a new
     * session on the response's SSE stream and sends the {@code endpoint} event telling the
     * client where to POST messages. Requests for any other URI are ignored.
     *
     * <p>If no session factory has been set yet, the request is answered with
     * {@code 503 Service Unavailable} instead of failing with a NullPointerException.
     *
     * @param httpRequest the incoming request
     * @param httpResponse the response whose SSE stream is used for the session
     */
    public void doGet(HttpRequest httpRequest, HttpResponse httpResponse) {
        if (!this.sseEndpoint.equals(httpRequest.getUri())) {
            return;
        }
        McpServerSession.Factory factory = this.sessionFactory;
        if (factory == null) {
            logger.error("Rejecting SSE connection: session factory has not been set");
            httpResponse.sendStatusCode(HttpResponseStatus.SERVICE_UNAVAILABLE);
            httpResponse.sendJson(new McpError("Session factory not configured"));
            return;
        }
        httpResponse.setHeader("Cache-Control", "no-cache");
        httpResponse.setHeader("Connection", "keep-alive");
        httpResponse.setHeader("Access-Control-Allow-Origin", "*");

        String sessionId = UUID.randomUUID().toString();
        SSeStream stream = httpResponse.getSSeStream();
        HServerMcpSessionTransport transport = new HServerMcpSessionTransport(sessionId, stream);
        // Drop the session from the map when the stream reports that it closed.
        stream.addCloseListener(transport::close);
        this.sessions.put(sessionId, factory.create(transport));
        try {
            sendEvent(stream, ENDPOINT_EVENT_TYPE, this.messageEndpoint + "?sessionId=" + sessionId);
        } catch (RuntimeException e) {
            // The client never learned its session id, so do not leave the session registered.
            this.sessions.remove(sessionId);
            throw e;
        }
    }

    /**
     * Handles a POST request. When the request URI equals the message endpoint, dispatches
     * the JSON-RPC message in the request body to the session named by the
     * {@code sessionId} query parameter. Requests for any other URI are ignored.
     *
     * <p>Responses: {@code 400} when {@code sessionId} is missing, {@code 404} when no such
     * session exists, {@code 200} with an empty body on success, and {@code 500} with an
     * {@link McpError} body when deserialization or handling throws.
     *
     * @param httpRequest the incoming request
     * @param httpResponse the response to write the status and body to
     */
    public void doPost(HttpRequest httpRequest, HttpResponse httpResponse) {
        if (!this.messageEndpoint.equals(httpRequest.getUri())) {
            return;
        }
        String sessionId = httpRequest.query("sessionId");
        if (sessionId == null) {
            httpResponse.sendStatusCode(HttpResponseStatus.BAD_REQUEST);
            httpResponse.sendJson(new McpError("Session ID missing in message endpoint"));
            return;
        }
        McpServerSession session = this.sessions.get(sessionId);
        if (session == null) {
            httpResponse.sendStatusCode(HttpResponseStatus.NOT_FOUND);
            httpResponse.sendJson(new McpError("Session not found: " + sessionId));
            return;
        }
        try {
            McpSchema.JSONRPCMessage message =
                    McpSchema.deserializeJsonRpcMessage(this.objectMapper, httpRequest.getRawData());
            // Blocks the calling thread until the session has finished handling the message.
            session.handle(message).block();
            httpResponse.sendStatusCode(HttpResponseStatus.OK);
            httpResponse.sendText("");
        } catch (Exception e) {
            logger.error("Error processing message: {}", e.getMessage(), e);
            McpError mcpError = new McpError(e.getMessage());
            httpResponse.sendStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR);
            httpResponse.sendJson(mcpError);
        }
    }

    /**
     * Asks every active session to close gracefully.
     *
     * @return a {@link Mono} that completes when all sessions have completed their close
     */
    @Override
    public Mono<Void> closeGracefully() {
        logger.debug("Initiating graceful shutdown with {} active sessions", this.sessions.size());
        return Flux.fromIterable(this.sessions.values())
                .flatMap(McpServerSession::closeGracefully)
                .then();
    }

    /**
     * Builds an SSE event with the given name and data and sends it on the stream.
     *
     * <p>Within this file it is called only by this class and its inner session transport.
     *
     * @param sSeStream the stream to write to
     * @param str the SSE event name
     * @param str2 the SSE event data
     */
    public void sendEvent(SSeStream sSeStream, String str, String str2) {
        sSeStream.sendSseEvent(new SSeEvent.Builder().event(str).data(str2).build());
    }
}
