From 111586c523389ed72a0f80a37232e55d35c78d04 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Thu, 10 Apr 2025 14:23:34 +0100 Subject: [PATCH 01/11] support sse though get --- src/examples/client/simpleStreamableHttp.ts | 104 +++++++++----- .../standaloneSseWithGetStreamableHttp.ts | 136 ++++++++++++++++++ src/server/streamableHttp.test.ts | 113 ++++++++++++++- src/server/streamableHttp.ts | 97 ++++++++++++- 4 files changed, 403 insertions(+), 47 deletions(-) create mode 100644 src/examples/server/standaloneSseWithGetStreamableHttp.ts diff --git a/src/examples/client/simpleStreamableHttp.ts b/src/examples/client/simpleStreamableHttp.ts index b17add14..0321bea4 100644 --- a/src/examples/client/simpleStreamableHttp.ts +++ b/src/examples/client/simpleStreamableHttp.ts @@ -11,7 +11,8 @@ import { GetPromptResultSchema, ListResourcesRequest, ListResourcesResultSchema, - LoggingMessageNotificationSchema + LoggingMessageNotificationSchema, + ResourceListChangedNotificationSchema } from '../../types.js'; async function main(): Promise { @@ -24,50 +25,79 @@ async function main(): Promise { const transport = new StreamableHTTPClientTransport( new URL('http://localhost:3000/mcp') ); + const supportsStandaloneSse = false; // Connect the client using the transport and initialize the server await client.connect(transport); + console.log('Connected to MCP server'); + // Open a standalone SSE stream to receive server-initiated messages + console.log('Opening SSE stream to receive server notifications...'); + try { + await transport.openSseStream(); + const supportsStandaloneSse = false; + console.log('SSE stream established successfully. Waiting for notifications...'); + } + catch (error) { + console.error('Failed to open SSE stream:', error); + } + + // Set up notification handlers for server-initiated messages client.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => { console.log(`Notification received: ${notification.params.level} - ${notification.params.data}`); }); + client.setNotificationHandler(ResourceListChangedNotificationSchema, async (_) => { + console.log(`Resource list changed notification received!`); + const resourcesRequest: ListResourcesRequest = { + method: 'resources/list', + params: {} + }; + const resourcesResult = await client.request(resourcesRequest, ListResourcesResultSchema); + console.log('Available resources count:', resourcesResult.resources.length); + }); - - console.log('Connected to MCP server'); // List available tools - const toolsRequest: ListToolsRequest = { - method: 'tools/list', - params: {} - }; - const toolsResult = await client.request(toolsRequest, ListToolsResultSchema); - console.log('Available tools:', toolsResult.tools); + try { + const toolsRequest: ListToolsRequest = { + method: 'tools/list', + params: {} + }; + const toolsResult = await client.request(toolsRequest, ListToolsResultSchema); + console.log('Available tools:', toolsResult.tools); - // Call the 'greet' tool - const greetRequest: CallToolRequest = { - method: 'tools/call', - params: { - name: 'greet', - arguments: { name: 'MCP User' } - } - }; - const greetResult = await client.request(greetRequest, CallToolResultSchema); - console.log('Greeting result:', greetResult.content[0].text); + if (toolsResult.tools.length === 0) { + console.log('No tools available from the server'); + } else { + // Call the 'greet' tool + const greetRequest: CallToolRequest = { + method: 'tools/call', + params: { + name: 'greet', + arguments: { name: 'MCP User' } + } + }; + const greetResult = await client.request(greetRequest, CallToolResultSchema); + console.log('Greeting result:', greetResult.content[0].text); - // Call the new 'multi-greet' tool - console.log('\nCalling multi-greet tool (with notifications)...'); - const multiGreetRequest: CallToolRequest = { - method: 'tools/call', - params: { - name: 'multi-greet', - arguments: { name: 'MCP User' } + // Call the new 'multi-greet' tool + console.log('\nCalling multi-greet tool (with notifications)...'); + const multiGreetRequest: CallToolRequest = { + method: 'tools/call', + params: { + name: 'multi-greet', + arguments: { name: 'MCP User' } + } + }; + const multiGreetResult = await client.request(multiGreetRequest, CallToolResultSchema); + console.log('Multi-greet results:'); + multiGreetResult.content.forEach(item => { + if (item.type === 'text') { + console.log(`- ${item.text}`); + } + }); } - }; - const multiGreetResult = await client.request(multiGreetRequest, CallToolResultSchema); - console.log('Multi-greet results:'); - multiGreetResult.content.forEach(item => { - if (item.type === 'text') { - console.log(`- ${item.text}`); - } - }); + } catch (error) { + console.log(`Tools not supported by this server (${error})`); + } // List available prompts try { @@ -107,9 +137,11 @@ async function main(): Promise { } catch (error) { console.log(`Resources not supported by this server (${error})`); } + if (supportsStandaloneSse) { + // Instead of closing immediately, keep the connection open to receive notifications + console.log('\nKeeping connection open to receive notifications. Press Ctrl+C to exit.'); + } - // Close the connection - await client.close(); } main().catch((error: unknown) => { diff --git a/src/examples/server/standaloneSseWithGetStreamableHttp.ts b/src/examples/server/standaloneSseWithGetStreamableHttp.ts new file mode 100644 index 00000000..bfe5b2af --- /dev/null +++ b/src/examples/server/standaloneSseWithGetStreamableHttp.ts @@ -0,0 +1,136 @@ +import express, { Request, Response } from 'express'; +import { randomUUID } from 'node:crypto'; +import { McpServer } from '../../server/mcp.js'; +import { StreamableHTTPServerTransport } from '../../server/streamableHttp.js'; +import { ReadResourceResult } from '../../types.js'; + +// Create an MCP server with implementation details +const server = new McpServer({ + name: 'resource-list-changed-notification-server', + version: '1.0.0', +}, { + capabilities: { + resources: { + listChanged: true, // Support notifications for resource list changes + }, + } +}); + +// Store transports by session ID to send notifications +const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {}; + +const addResource = (name: string, content: string) => { + const uri = `https://mcp-example.com/dynamic/${encodeURIComponent(name)}`; + server.resource( + name, + uri, + { mimeType: 'text/plain', description: `Dynamic resource: ${name}` }, + async (): Promise => { + return { + contents: [{ uri, text: content }], + }; + } + ); + +}; + +addResource('example-resource', 'Initial content for example-resource'); + +const resourceChangeInterval = setInterval(() => { + const name = randomUUID(); + addResource(name, `Content for ${name}`); +}, 5000); // Change resources every 5 seconds for testing + +const app = express(); +app.use(express.json()); + +app.post('/mcp', async (req: Request, res: Response) => { + console.log('Received MCP request:', req.body); + try { + // Check for existing session ID + const sessionId = req.headers['mcp-session-id'] as string | undefined; + let transport: StreamableHTTPServerTransport; + + if (sessionId && transports[sessionId]) { + // Reuse existing transport + transport = transports[sessionId]; + } else if (!sessionId && isInitializeRequest(req.body)) { + // New initialization request + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + }); + + // Connect the transport to the MCP server + await server.connect(transport); + + await transport.handleRequest(req, res, req.body); + + // Store the transport by session ID for future requests + if (transport.sessionId) { + transports[transport.sessionId] = transport; + } + return; // Already handled + } else { + // Invalid request - no session ID or not initialization request + res.status(400).json({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Bad Request: No valid session ID provided', + }, + id: null, + }); + return; + } + + // Handle the request with existing transport + await transport.handleRequest(req, res, req.body); + } catch (error) { + console.error('Error handling MCP request:', error); + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: '2.0', + error: { + code: -32603, + message: 'Internal server error', + }, + id: null, + }); + } + } +}); + +// Handle GET requests for SSE streams (now using built-in support from StreamableHTTP) +app.get('/mcp', async (req: Request, res: Response) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !transports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + console.log(`Establishing SSE stream for session ${sessionId}`); + const transport = transports[sessionId]; + await transport.handleRequest(req, res); +}); + +// Helper function to detect initialize requests +function isInitializeRequest(body: unknown): boolean { + if (Array.isArray(body)) { + return body.some(msg => typeof msg === 'object' && msg !== null && 'method' in msg && msg.method === 'initialize'); + } + return typeof body === 'object' && body !== null && 'method' in body && body.method === 'initialize'; +} + +// Start the server +const PORT = 3000; +app.listen(PORT, () => { + console.log(`Server listening on port ${PORT}`); +}); + +// Handle server shutdown +process.on('SIGINT', async () => { + console.log('Shutting down server...'); + clearInterval(resourceChangeInterval); + await server.close(); + process.exit(0); +}); \ No newline at end of file diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index ad80ea62..69a77f44 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -564,22 +564,123 @@ describe("StreamableHTTPServerTransport", () => { mockResponse.writeHead.mockClear(); }); - it("should reject GET requests for SSE with 405 Method Not Allowed", async () => { + it("should accept GET requests for SSE with proper Accept header", async () => { const req = createMockRequest({ method: "GET", headers: { - "accept": "application/json, text/event-stream", + "accept": "text/event-stream", "mcp-session-id": transport.sessionId, }, }); await transport.handleRequest(req, mockResponse); - expect(mockResponse.writeHead).toHaveBeenCalledWith(405, expect.objectContaining({ - "Allow": "POST, DELETE" + expect(mockResponse.writeHead).toHaveBeenCalledWith(200, expect.objectContaining({ + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "mcp-session-id": transport.sessionId, })); - expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"jsonrpc":"2.0"')); - expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('Method not allowed')); + }); + + it("should reject GET requests without Accept: text/event-stream header", async () => { + const req = createMockRequest({ + method: "GET", + headers: { + "accept": "application/json", + "mcp-session-id": transport.sessionId, + }, + }); + + await transport.handleRequest(req, mockResponse); + + expect(mockResponse.writeHead).toHaveBeenCalledWith(406); + expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"message":"Not Acceptable: Client must accept text/event-stream"')); + }); + + it("should send server-initiated requests to GET SSE stream", async () => { + // Open a standalone SSE stream with GET + const req = createMockRequest({ + method: "GET", + headers: { + "accept": "text/event-stream", + "mcp-session-id": transport.sessionId, + }, + }); + + const sseResponse = createMockResponse(); + await transport.handleRequest(req, sseResponse); + + // Send a notification without a related request ID + const notification: JSONRPCMessage = { + jsonrpc: "2.0", + method: "notifications/resources/updated", + params: { uri: "someuri" } + }; + + await transport.send(notification); + + // Verify notification was sent on SSE stream + expect(sseResponse.write).toHaveBeenCalledWith( + expect.stringContaining(`event: message\ndata: ${JSON.stringify(notification)}\n\n`) + ); + }); + + it("should not close GET SSE stream after sending server requests or notifications", async () => { + // Open a standalone SSE stream + const req = createMockRequest({ + method: "GET", + headers: { + "accept": "text/event-stream", + "mcp-session-id": transport.sessionId, + }, + }); + + const sseResponse = createMockResponse(); + await transport.handleRequest(req, sseResponse); + + // Send multiple notifications + const notification1: JSONRPCMessage = { jsonrpc: "2.0", method: "event1", params: {} }; + const notification2: JSONRPCMessage = { jsonrpc: "2.0", method: "event2", params: {} }; + + await transport.send(notification1); + await transport.send(notification2); + + // Stream should remain open + expect(sseResponse.end).not.toHaveBeenCalled(); + }); + + it("should reject second GET SSE stream for the same session", async () => { + // Open first SSE stream - should succeed + const req1 = createMockRequest({ + method: "GET", + headers: { + "accept": "text/event-stream", + "mcp-session-id": transport.sessionId, + }, + }); + + const sseResponse1 = createMockResponse(); + await transport.handleRequest(req1, sseResponse1); + + // Try to open a second SSE stream - should be rejected + const req2 = createMockRequest({ + method: "GET", + headers: { + "accept": "text/event-stream", + "mcp-session-id": transport.sessionId, + }, + }); + + const sseResponse2 = createMockResponse(); + await transport.handleRequest(req2, sseResponse2); + + // First stream should be good + expect(sseResponse1.writeHead).toHaveBeenCalledWith(200, expect.anything()); + + // Second stream should get 409 Conflict + expect(sseResponse2.writeHead).toHaveBeenCalledWith(409); + expect(sseResponse2.end).toHaveBeenCalledWith(expect.stringContaining('"message":"Conflict: Only one SSE stream is allowed per session"')); }); it("should reject POST requests without proper Accept header", async () => { diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index e8844529..58fc79c0 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -68,6 +68,7 @@ export class StreamableHTTPServerTransport implements Transport { private _requestResponseMap: Map = new Map(); private _initialized: boolean = false; private _enableJsonResponse: boolean = false; + private _sseStreamKey = "standalone-sse"; sessionId?: string | undefined; @@ -97,6 +98,8 @@ export class StreamableHTTPServerTransport implements Transport { async handleRequest(req: IncomingMessage, res: ServerResponse, parsedBody?: unknown): Promise { if (req.method === "POST") { await this.handlePostRequest(req, res, parsedBody); + } else if (req.method === "GET") { + await this.handleGetRequest(req, res); } else if (req.method === "DELETE") { await this.handleDeleteRequest(req, res); } else { @@ -105,12 +108,80 @@ export class StreamableHTTPServerTransport implements Transport { } /** - * Handles unsupported requests (GET, PUT, PATCH, etc.) - * For now we support only POST and DELETE requests. Support for GET for SSE connections will be added later. + * Handles GET requests for SSE stream + */ + private async handleGetRequest(req: IncomingMessage, res: ServerResponse): Promise { + // The client MUST include an Accept header, listing text/event-stream as a supported content type. + const acceptHeader = req.headers.accept; + if (!acceptHeader?.includes("text/event-stream")) { + res.writeHead(406).end(JSON.stringify({ + jsonrpc: "2.0", + error: { + code: -32000, + message: "Not Acceptable: Client must accept text/event-stream" + }, + id: null + })); + return; + } + + // If an Mcp-Session-Id is returned by the server during initialization, + // clients using the Streamable HTTP transport MUST include it + // in the Mcp-Session-Id header on all of their subsequent HTTP requests. + if (!this.validateSession(req, res)) { + return; + } + + // The server MUST either return Content-Type: text/event-stream in response to this HTTP GET, + // or else return HTTP 405 Method Not Allowed + const headers: Record = { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }; + + // After initialization, always include the session ID if we have one + if (this.sessionId !== undefined) { + headers["mcp-session-id"] = this.sessionId; + } + // The server MAY include a Last-Event-ID header in the response to this HTTP GET. + // Resumability will be supported in the future + + // Check if there's already an active standalone SSE stream for this session + const existingStream = this._responseMapping.get(this._sseStreamKey); + + if (existingStream !== undefined) { + // Only one GET SSE stream is allowed per session + res.writeHead(409).end(JSON.stringify({ + jsonrpc: "2.0", + error: { + code: -32000, + message: "Conflict: Only one SSE stream is allowed per session" + }, + id: null + })); + return; + } + + res.writeHead(200, headers); + + // Store the response for this request so we can use it for standalone server notifications + // This response doesn't have an associated request ID, so we'll use a special string to track it + this._responseMapping.set(this._sseStreamKey, res); + + // Set up close handler for client disconnects + res.on("close", () => { + // Clean up resources associated with this connection + this._responseMapping.delete(this._sseStreamKey); + }); + } + + /** + * Handles unsupported requests (PUT, PATCH, etc.) */ private async handleUnsupportedRequest(res: ServerResponse): Promise { res.writeHead(405, { - "Allow": "POST, DELETE" + "Allow": "GET, POST, DELETE" }).end(JSON.stringify({ jsonrpc: "2.0", error: { @@ -379,8 +450,25 @@ export class StreamableHTTPServerTransport implements Transport { // If the message is a response, use the request ID from the message requestId = message.id; } + + // Check if this message should be sent on the standalone SSE stream (no request ID) + // Ignore notifications from tools (which have relatedRequestId set) + // Those will be sent via dedicated response SSE streams if (requestId === undefined) { - throw new Error("No request ID provided for the message"); + // For standalone SSE streams, we can only send requests and notifications + if ('result' in message || 'error' in message) { + throw new Error("Cannot send a response on a standalone SSE stream unless resuming a previous client request"); + } + + const standaloneStream = this._responseMapping.get(this._sseStreamKey); + if (standaloneStream === undefined) { + // The spec says the server MAY send messages on the stream, so it's ok to discard if no stream + return; + } + + // Send the message to the standalone SSE stream + standaloneStream.write(`event: message\ndata: ${JSON.stringify(message)}\n\n`); + return; } // Get the response for this request @@ -389,7 +477,6 @@ export class StreamableHTTPServerTransport implements Transport { throw new Error(`No connection established for request ID: ${String(requestId)}`); } - if (!this._enableJsonResponse) { response.write(`event: message\ndata: ${JSON.stringify(message)}\n\n`); } From 0c1a4f22be712b787275b422d604c2817765e084 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Thu, 10 Apr 2025 14:34:28 +0100 Subject: [PATCH 02/11] add get sse to README --- src/examples/README.md | 97 +++++++++++++++++++++++++----------------- 1 file changed, 57 insertions(+), 40 deletions(-) diff --git a/src/examples/README.md b/src/examples/README.md index cc6af51c..6ef35ba4 100644 --- a/src/examples/README.md +++ b/src/examples/README.md @@ -2,24 +2,37 @@ This directory contains example implementations of MCP clients and servers using the TypeScript SDK. +## Table of Contents + +- [Streamable HTTP Servers - Single Node Deployment](#streamable-http---single-node-deployment-with-basic-session-state-management) + - [Simple Server with Streamable HTTP](#simple-server-with-streamable-http-transport-serversimplestreamablehttpts) + - [Server Supporting SSE via GET](#server-supporting-with-sse-via-get-serverstandalonessewithgetstreamablehttpts) + - [Server with JSON Response Mode](#server-with-json-response-mode-serverjsonresponsestreamablehttpts) +- [Client Example - Streamable HTTP](#client-clientsimplestreamablehttpts) + ## Streamable HTTP - single node deployment with basic session state management -Multi node with stete management example will be added soon after we add support. +Multi node with state management example will be added soon after we add support. -### Server with JSON response mode (`server/jsonResponseStreamableHttp.ts`) -A simple MCP server that uses the Streamable HTTP transport with JSON response mode enabled, implemented with Express. The server provides a simple `greet` tool that returns a greeting for a name. +### Simple Server with Streamable Http transport (`server/simpleStreamableHttp.ts`) + +A simple MCP server that uses the Streamable HTTP transport, implemented with Express. The server provides: + +- A simple `greet` tool that returns a greeting for a name +- A `greeting-template` prompt that generates a greeting template +- A static `greeting-resource` resource #### Running the server ```bash -npx tsx src/examples/server/jsonResponseStreamableHttp.ts +npx tsx src/examples/server/simpleStreamableHttp.ts ``` -The server will start on port 3000. You can test the initialization and tool calling: +The server will start on port 3000. You can test the initialization and tool listing: ```bash -# Initialize the server and get the session ID from headers +# First initialize the server and save the session ID to a variable SESSION_ID=$(curl -X POST \ -H "Content-Type: application/json" \ -H "Accept: application/json" \ @@ -38,48 +51,42 @@ SESSION_ID=$(curl -X POST \ "id": "1" }' \ -i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\r') -echo "Session ID: $SESSION_ID" +echo "Session ID: $SESSION_ID -# Call the greet tool using the saved session ID -curl -X POST \ - -H "Content-Type: application/json" \ - -H "Accept: application/json" \ - -H "Accept: text/event-stream" \ +# Then list tools using the saved session ID +curl -X POST -H "Content-Type: application/json" -H "Accept: application/json, text/event-stream" \ -H "mcp-session-id: $SESSION_ID" \ - -d '{ - "jsonrpc": "2.0", - "method": "tools/call", - "params": { - "name": "greet", - "arguments": { - "name": "World" - } - }, - "id": "2" - }' \ + -d '{"jsonrpc":"2.0","method":"tools/list","params":{},"id":"2"}' \ http://localhost:3000/mcp ``` -Note that in this example, we're using plain JSON response mode by setting `Accept: application/json` header. +### Server supporting with SSE via GET (`server/standaloneSseWithGetStreamableHttp.ts`) -### Server (`server/simpleStreamableHttp.ts`) +An MCP server that demonstrates how to support SSE notifications via GET requests using the Streamable HTTP transport with Express. The server dynamically adds resources at regular intervals and supports notifications for resource list changes (server notifications are available through the standalone SSE connection established by GET request). -A simple MCP server that uses the Streamable HTTP transport, implemented with Express. The server provides: +#### Running the server -- A simple `greet` tool that returns a greeting for a name -- A `greeting-template` prompt that generates a greeting template -- A static `greeting-resource` resource +```bash +npx tsx src/examples/server/standaloneSseWithGetStreamableHttp.ts +``` + +The server will start on port 3000 and automatically create new resources every 5 seconds. + +### Server with JSON response mode (`server/jsonResponseStreamableHttp.ts`) + +This is not recommented way to use the transport, as its quite limiting and not supporting features like logging and progress notifications on tool execution. +A simple MCP server that uses the Streamable HTTP transport with JSON response mode enabled, implemented with Express. The server provides a simple `greet` tool that returns a greeting for a name. #### Running the server ```bash -npx tsx src/examples/server/simpleStreamableHttp.ts +npx tsx src/examples/server/jsonResponseStreamableHttp.ts ``` -The server will start on port 3000. You can test the initialization and tool listing: +The server will start on port 3000. You can test the initialization and tool calling: ```bash -# First initialize the server and save the session ID to a variable +# Initialize the server and get the session ID from headers SESSION_ID=$(curl -X POST \ -H "Content-Type: application/json" \ -H "Accept: application/json" \ @@ -98,15 +105,29 @@ SESSION_ID=$(curl -X POST \ "id": "1" }' \ -i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\r') -echo "Session ID: $SESSION_ID +echo "Session ID: $SESSION_ID" -# Then list tools using the saved session ID -curl -X POST -H "Content-Type: application/json" -H "Accept: application/json, text/event-stream" \ +# Call the greet tool using the saved session ID +curl -X POST \ + -H "Content-Type: application/json" \ + -H "Accept: application/json" \ + -H "Accept: text/event-stream" \ -H "mcp-session-id: $SESSION_ID" \ - -d '{"jsonrpc":"2.0","method":"tools/list","params":{},"id":"2"}' \ + -d '{ + "jsonrpc": "2.0", + "method": "tools/call", + "params": { + "name": "greet", + "arguments": { + "name": "World" + } + }, + "id": "2" + }' \ http://localhost:3000/mcp ``` + ### Client (`client/simpleStreamableHttp.ts`) A client that connects to the server, initializes it, and demonstrates how to: @@ -123,8 +144,4 @@ npx tsx src/examples/client/simpleStreamableHttp.ts Make sure the server is running before starting the client. -## Notes -- These examples demonstrate the basic usage of the Streamable HTTP transport -- The server manages sessions between the calls -- The client handles both direct HTTP responses and SSE streaming responses From 755534ee1ae685c963d96306431990cb67a89844 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Thu, 10 Apr 2025 14:41:48 +0100 Subject: [PATCH 03/11] clean up examples README --- src/examples/README.md | 92 +++++++++++++++++++----------------------- 1 file changed, 41 insertions(+), 51 deletions(-) diff --git a/src/examples/README.md b/src/examples/README.md index 6ef35ba4..22e36de9 100644 --- a/src/examples/README.md +++ b/src/examples/README.md @@ -9,6 +9,7 @@ This directory contains example implementations of MCP clients and servers using - [Server Supporting SSE via GET](#server-supporting-with-sse-via-get-serverstandalonessewithgetstreamablehttpts) - [Server with JSON Response Mode](#server-with-json-response-mode-serverjsonresponsestreamablehttpts) - [Client Example - Streamable HTTP](#client-clientsimplestreamablehttpts) +- [Useful bash commands for testing](#useful-commands-for-testing) ## Streamable HTTP - single node deployment with basic session state management @@ -31,35 +32,6 @@ npx tsx src/examples/server/simpleStreamableHttp.ts The server will start on port 3000. You can test the initialization and tool listing: -```bash -# First initialize the server and save the session ID to a variable -SESSION_ID=$(curl -X POST \ - -H "Content-Type: application/json" \ - -H "Accept: application/json" \ - -H "Accept: text/event-stream" \ - -d '{ - "jsonrpc": "2.0", - "method": "initialize", - "params": { - "capabilities": {}, - "protocolVersion": "2025-03-26", - "clientInfo": { - "name": "test", - "version": "1.0.0" - } - }, - "id": "1" - }' \ - -i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\r') -echo "Session ID: $SESSION_ID - -# Then list tools using the saved session ID -curl -X POST -H "Content-Type: application/json" -H "Accept: application/json, text/event-stream" \ - -H "mcp-session-id: $SESSION_ID" \ - -d '{"jsonrpc":"2.0","method":"tools/list","params":{},"id":"2"}' \ - http://localhost:3000/mcp -``` - ### Server supporting with SSE via GET (`server/standaloneSseWithGetStreamableHttp.ts`) An MCP server that demonstrates how to support SSE notifications via GET requests using the Streamable HTTP transport with Express. The server dynamically adds resources at regular intervals and supports notifications for resource list changes (server notifications are available through the standalone SSE connection established by GET request). @@ -83,10 +55,31 @@ A simple MCP server that uses the Streamable HTTP transport with JSON response m npx tsx src/examples/server/jsonResponseStreamableHttp.ts ``` -The server will start on port 3000. You can test the initialization and tool calling: + +### Client (`client/simpleStreamableHttp.ts`) + +A client that connects to the server, initializes it, and demonstrates how to: + +- List available tools and call the `greet` tool +- List available prompts and get the `greeting-template` prompt +- List available resources + +#### Running the client + +```bash +npx tsx src/examples/client/simpleStreamableHttp.ts +``` + +Make sure the server is running before starting the client. + + +### Useful commands for testing + +#### Initialize +Streamable HTTP transport requires to do the initialization first. ```bash -# Initialize the server and get the session ID from headers +# First initialize the server and save the session ID to a variable SESSION_ID=$(curl -X POST \ -H "Content-Type: application/json" \ -H "Accept: application/json" \ @@ -105,8 +98,24 @@ SESSION_ID=$(curl -X POST \ "id": "1" }' \ -i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\r') -echo "Session ID: $SESSION_ID" +echo "Session ID: $SESSION_ID + +``` + +Once thre is a session we can send POST requests + +#### List tools +```bash +# Then list tools using the saved session ID +curl -X POST -H "Content-Type: application/json" -H "Accept: application/json, text/event-stream" \ + -H "mcp-session-id: $SESSION_ID" \ + -d '{"jsonrpc":"2.0","method":"tools/list","params":{},"id":"2"}' \ + http://localhost:3000/mcp +``` + +#### Call tool +```bash # Call the greet tool using the saved session ID curl -X POST \ -H "Content-Type: application/json" \ @@ -126,22 +135,3 @@ curl -X POST \ }' \ http://localhost:3000/mcp ``` - - -### Client (`client/simpleStreamableHttp.ts`) - -A client that connects to the server, initializes it, and demonstrates how to: - -- List available tools and call the `greet` tool -- List available prompts and get the `greeting-template` prompt -- List available resources - -#### Running the client - -```bash -npx tsx src/examples/client/simpleStreamableHttp.ts -``` - -Make sure the server is running before starting the client. - - From 5c5a33021cf3e09123895744882fcf2948ad331b Mon Sep 17 00:00:00 2001 From: ihrpr Date: Thu, 10 Apr 2025 15:28:39 +0100 Subject: [PATCH 04/11] add GET to examples servers --- src/client/streamableHttp.ts | 1 - src/examples/client/simpleStreamableHttp.ts | 4 ++-- src/examples/server/jsonResponseStreamableHttp.ts | 7 +++++++ src/examples/server/simpleStreamableHttp.ts | 13 +++++++++++++ 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 5ea537c7..ae54bb26 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -131,7 +131,6 @@ export class StreamableHTTPClientTransport implements Transport { `Failed to open SSE stream: ${response.statusText}`, ); } - // Successful connection, handle the SSE stream as a standalone listener this._handleSseStream(response.body); } catch (error) { diff --git a/src/examples/client/simpleStreamableHttp.ts b/src/examples/client/simpleStreamableHttp.ts index 0321bea4..3eb2485d 100644 --- a/src/examples/client/simpleStreamableHttp.ts +++ b/src/examples/client/simpleStreamableHttp.ts @@ -25,7 +25,7 @@ async function main(): Promise { const transport = new StreamableHTTPClientTransport( new URL('http://localhost:3000/mcp') ); - const supportsStandaloneSse = false; + let supportsStandaloneSse = false; // Connect the client using the transport and initialize the server await client.connect(transport); @@ -34,7 +34,7 @@ async function main(): Promise { console.log('Opening SSE stream to receive server notifications...'); try { await transport.openSseStream(); - const supportsStandaloneSse = false; + supportsStandaloneSse = true; console.log('SSE stream established successfully. Waiting for notifications...'); } catch (error) { diff --git a/src/examples/server/jsonResponseStreamableHttp.ts b/src/examples/server/jsonResponseStreamableHttp.ts index 1d322112..34ab65d1 100644 --- a/src/examples/server/jsonResponseStreamableHttp.ts +++ b/src/examples/server/jsonResponseStreamableHttp.ts @@ -138,6 +138,13 @@ app.post('/mcp', async (req: Request, res: Response) => { } }); +// Handle GET requests for SSE streams according to spec +app.get('/mcp', async (req: Request, res: Response) => { + // Since this is a very simple example, we don't support GET requests for this server + // The spec requires returning 405 Method Not Allowed in this case + res.status(405).set('Allow', 'POST').send('Method Not Allowed'); +}); + // Helper function to detect initialize requests function isInitializeRequest(body: unknown): boolean { if (Array.isArray(body)) { diff --git a/src/examples/server/simpleStreamableHttp.ts b/src/examples/server/simpleStreamableHttp.ts index 5b228cbd..6e4b2548 100644 --- a/src/examples/server/simpleStreamableHttp.ts +++ b/src/examples/server/simpleStreamableHttp.ts @@ -174,6 +174,19 @@ app.post('/mcp', async (req: Request, res: Response) => { } }); +// Handle GET requests for SSE streams (now using built-in support from StreamableHTTP) +app.get('/mcp', async (req: Request, res: Response) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !transports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + console.log(`Establishing SSE stream for session ${sessionId}`); + const transport = transports[sessionId]; + await transport.handleRequest(req, res); +}); + // Helper function to detect initialize requests function isInitializeRequest(body: unknown): boolean { if (Array.isArray(body)) { From 3dd4ff1542cf2a42332a18b4b6432b2433717770 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Thu, 10 Apr 2025 15:53:05 +0100 Subject: [PATCH 05/11] fix hanging issue --- src/server/streamableHttp.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 58fc79c0..081e1982 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -162,8 +162,9 @@ export class StreamableHTTPServerTransport implements Transport { })); return; } - - res.writeHead(200, headers); + // We need to send headers immediately as message will arrive much later, + // otherwise the client will just wait for the first message + res.writeHead(200, headers).flushHeaders(); // Store the response for this request so we can use it for standalone server notifications // This response doesn't have an associated request ID, so we'll use a special string to track it From 3adca3f3bc19c03b7e625d3dec273d2d7c0280b6 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Thu, 10 Apr 2025 16:01:21 +0100 Subject: [PATCH 06/11] fix tests --- src/server/streamableHttp.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 69a77f44..d6f7cb8a 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -32,6 +32,7 @@ function createMockResponse(): jest.Mocked { emit: jest.fn().mockReturnThis(), getHeader: jest.fn(), setHeader: jest.fn(), + flushHeaders: jest.fn(), } as unknown as jest.Mocked; return response; } From 5730f3e92af60f727e90529d407b30e9814cd8c4 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Fri, 11 Apr 2025 10:19:09 +0100 Subject: [PATCH 07/11] fixes in README --- src/examples/README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/examples/README.md b/src/examples/README.md index 22e36de9..2ea1a596 100644 --- a/src/examples/README.md +++ b/src/examples/README.md @@ -16,7 +16,7 @@ This directory contains example implementations of MCP clients and servers using Multi node with state management example will be added soon after we add support. -### Simple Server with Streamable Http transport (`server/simpleStreamableHttp.ts`) +### Simple server with Streamable HTTP transport (`server/simpleStreamableHttp.ts`) A simple MCP server that uses the Streamable HTTP transport, implemented with Express. The server provides: @@ -32,7 +32,7 @@ npx tsx src/examples/server/simpleStreamableHttp.ts The server will start on port 3000. You can test the initialization and tool listing: -### Server supporting with SSE via GET (`server/standaloneSseWithGetStreamableHttp.ts`) +### Server supporting SSE via GET (`server/standaloneSseWithGetStreamableHttp.ts`) An MCP server that demonstrates how to support SSE notifications via GET requests using the Streamable HTTP transport with Express. The server dynamically adds resources at regular intervals and supports notifications for resource list changes (server notifications are available through the standalone SSE connection established by GET request). @@ -46,9 +46,10 @@ The server will start on port 3000 and automatically create new resources every ### Server with JSON response mode (`server/jsonResponseStreamableHttp.ts`) -This is not recommented way to use the transport, as its quite limiting and not supporting features like logging and progress notifications on tool execution. A simple MCP server that uses the Streamable HTTP transport with JSON response mode enabled, implemented with Express. The server provides a simple `greet` tool that returns a greeting for a name. +_NOTE: This demonstrates a server that does not use SSE at all. Note that this limits its support for MCP features; for example, it cannot provide logging and progress notifications for tool execution._ + #### Running the server ```bash @@ -101,8 +102,7 @@ SESSION_ID=$(curl -X POST \ echo "Session ID: $SESSION_ID ``` - -Once thre is a session we can send POST requests +Once a sessionĀ is established, we can send POST requests: #### List tools ```bash From e3b4496579dc8406e4e72265d6d1b03985fa164e Mon Sep 17 00:00:00 2001 From: ihrpr Date: Fri, 11 Apr 2025 10:42:41 +0100 Subject: [PATCH 08/11] clean up and small fixes to comments --- src/examples/client/simpleStreamableHttp.ts | 1 - src/examples/server/simpleStreamableHttp.ts | 2 +- src/examples/server/standaloneSseWithGetStreamableHttp.ts | 6 ------ src/server/streamableHttp.ts | 2 +- 4 files changed, 2 insertions(+), 9 deletions(-) diff --git a/src/examples/client/simpleStreamableHttp.ts b/src/examples/client/simpleStreamableHttp.ts index 3eb2485d..027db881 100644 --- a/src/examples/client/simpleStreamableHttp.ts +++ b/src/examples/client/simpleStreamableHttp.ts @@ -30,7 +30,6 @@ async function main(): Promise { // Connect the client using the transport and initialize the server await client.connect(transport); console.log('Connected to MCP server'); - // Open a standalone SSE stream to receive server-initiated messages console.log('Opening SSE stream to receive server notifications...'); try { await transport.openSseStream(); diff --git a/src/examples/server/simpleStreamableHttp.ts b/src/examples/server/simpleStreamableHttp.ts index 6e4b2548..f0f74439 100644 --- a/src/examples/server/simpleStreamableHttp.ts +++ b/src/examples/server/simpleStreamableHttp.ts @@ -174,7 +174,7 @@ app.post('/mcp', async (req: Request, res: Response) => { } }); -// Handle GET requests for SSE streams (now using built-in support from StreamableHTTP) +// Handle GET requests for SSE streams (using built-in support from StreamableHTTP) app.get('/mcp', async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; if (!sessionId || !transports[sessionId]) { diff --git a/src/examples/server/standaloneSseWithGetStreamableHttp.ts b/src/examples/server/standaloneSseWithGetStreamableHttp.ts index bfe5b2af..f9d3696b 100644 --- a/src/examples/server/standaloneSseWithGetStreamableHttp.ts +++ b/src/examples/server/standaloneSseWithGetStreamableHttp.ts @@ -8,12 +8,6 @@ import { ReadResourceResult } from '../../types.js'; const server = new McpServer({ name: 'resource-list-changed-notification-server', version: '1.0.0', -}, { - capabilities: { - resources: { - listChanged: true, // Support notifications for resource list changes - }, - } }); // Store transports by session ID to send notifications diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 081e1982..05ba6146 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -136,7 +136,7 @@ export class StreamableHTTPServerTransport implements Transport { // or else return HTTP 405 Method Not Allowed const headers: Record = { "Content-Type": "text/event-stream", - "Cache-Control": "no-cache", + "Cache-Control": "no-cache, no-transform", Connection: "keep-alive", }; From 18e4981bd3d0fde152659b95d4250828872718f8 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Fri, 11 Apr 2025 14:06:51 +0100 Subject: [PATCH 09/11] move get sse stream from a separate method to automatic connection when notifications/initialized received --- src/client/streamableHttp.test.ts | 11 ++++---- src/client/streamableHttp.ts | 30 +++++++++------------ src/examples/client/simpleStreamableHttp.ts | 21 +++------------ src/server/streamableHttp.test.ts | 2 +- 4 files changed, 23 insertions(+), 41 deletions(-) diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index 40f22139..1572e912 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -164,8 +164,7 @@ describe("StreamableHTTPClientTransport", () => { // We expect the 405 error to be caught and handled gracefully // This should not throw an error that breaks the transport await transport.start(); - await expect(transport.openSseStream()).rejects.toThrow("Failed to open SSE stream: Method Not Allowed"); - + await expect(transport["_startOrAuthStandaloneSSE"]()).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed"); // Check that GET was attempted expect(global.fetch).toHaveBeenCalledWith( expect.anything(), @@ -209,7 +208,7 @@ describe("StreamableHTTPClientTransport", () => { transport.onmessage = messageSpy; await transport.start(); - await transport.openSseStream(); + await transport["_startOrAuthStandaloneSSE"](); // Give time for the SSE event to be processed await new Promise(resolve => setTimeout(resolve, 50)); @@ -295,7 +294,7 @@ describe("StreamableHTTPClientTransport", () => { }); await transport.start(); - await transport.openSseStream(); + await transport["_startOrAuthStandaloneSSE"](); await new Promise(resolve => setTimeout(resolve, 50)); // Now simulate attempting to reconnect @@ -306,7 +305,7 @@ describe("StreamableHTTPClientTransport", () => { body: null }); - await transport.openSseStream(); + await transport["_startOrAuthStandaloneSSE"](); // Check that Last-Event-ID was included const calls = (global.fetch as jest.Mock).mock.calls; @@ -366,7 +365,7 @@ describe("StreamableHTTPClientTransport", () => { await transport.start(); - await transport.openSseStream(); + await transport["_startOrAuthStandaloneSSE"](); expect((actualReqInit.headers as Headers).get("x-custom-header")).toBe("CustomValue"); requestInit.headers["X-Custom-Header"] = "SecondCustomValue"; diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index ae54bb26..ea69ee77 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -1,5 +1,5 @@ import { Transport } from "../shared/transport.js"; -import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; +import { isJSONRPCNotification, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js"; import { EventSourceParserStream } from "eventsource-parser/stream"; @@ -126,6 +126,12 @@ export class StreamableHTTPClientTransport implements Transport { return await this._authThenStart(); } + // 405 indicates that the server does not offer an SSE stream at GET endpoint + // This is an expected case that should not trigger an error + if (response.status === 405) { + return; + } + throw new StreamableHTTPError( response.status, `Failed to open SSE stream: ${response.statusText}`, @@ -243,6 +249,12 @@ export class StreamableHTTPClientTransport implements Transport { // If the response is 202 Accepted, there's no body to process if (response.status === 202) { + // if the accepted notification is initialized, we start the SSE stream + // if it's supported by the server + if (isJSONRPCNotification(message) && message.method === "notifications/initialized") { + // We don't need to handle 405 here anymore as it's handled in _startOrAuthStandaloneSSE + this._startOrAuthStandaloneSSE().catch(err => this.onerror?.(err)); + } return; } @@ -279,20 +291,4 @@ export class StreamableHTTPClientTransport implements Transport { throw error; } } - - /** - * Opens SSE stream to receive messages from the server. - * - * This allows the server to push messages to the client without requiring the client - * to first send a request via HTTP POST. Some servers may not support this feature. - * If authentication is required but fails, this method will throw an UnauthorizedError. - */ - async openSseStream(): Promise { - if (!this._abortController) { - throw new Error( - "StreamableHTTPClientTransport not started! Call connect() before openSseStream().", - ); - } - await this._startOrAuthStandaloneSSE(); - } } diff --git a/src/examples/client/simpleStreamableHttp.ts b/src/examples/client/simpleStreamableHttp.ts index 027db881..c23c17f8 100644 --- a/src/examples/client/simpleStreamableHttp.ts +++ b/src/examples/client/simpleStreamableHttp.ts @@ -25,20 +25,10 @@ async function main(): Promise { const transport = new StreamableHTTPClientTransport( new URL('http://localhost:3000/mcp') ); - let supportsStandaloneSse = false; - - // Connect the client using the transport and initialize the server + + // Connect the client using the transport and initialize the server await client.connect(transport); console.log('Connected to MCP server'); - console.log('Opening SSE stream to receive server notifications...'); - try { - await transport.openSseStream(); - supportsStandaloneSse = true; - console.log('SSE stream established successfully. Waiting for notifications...'); - } - catch (error) { - console.error('Failed to open SSE stream:', error); - } // Set up notification handlers for server-initiated messages client.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => { @@ -136,11 +126,8 @@ async function main(): Promise { } catch (error) { console.log(`Resources not supported by this server (${error})`); } - if (supportsStandaloneSse) { - // Instead of closing immediately, keep the connection open to receive notifications - console.log('\nKeeping connection open to receive notifications. Press Ctrl+C to exit.'); - } - + // Keep the connection open to receive notifications + console.log('\nKeeping connection open to receive notifications. Press Ctrl+C to exit.'); } main().catch((error: unknown) => { diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index d6f7cb8a..85fcae2f 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -578,7 +578,7 @@ describe("StreamableHTTPServerTransport", () => { expect(mockResponse.writeHead).toHaveBeenCalledWith(200, expect.objectContaining({ "Content-Type": "text/event-stream", - "Cache-Control": "no-cache", + "Cache-Control": "no-cache, no-transform", "Connection": "keep-alive", "mcp-session-id": transport.sessionId, })); From 2c2cf5b4a2c09c336558cee3078320044e875c16 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Fri, 11 Apr 2025 14:15:43 +0100 Subject: [PATCH 10/11] cleam up example client --- src/examples/client/simpleStreamableHttp.ts | 104 ++++++++++++-------- 1 file changed, 61 insertions(+), 43 deletions(-) diff --git a/src/examples/client/simpleStreamableHttp.ts b/src/examples/client/simpleStreamableHttp.ts index c23c17f8..739e1164 100644 --- a/src/examples/client/simpleStreamableHttp.ts +++ b/src/examples/client/simpleStreamableHttp.ts @@ -12,7 +12,7 @@ import { ListResourcesRequest, ListResourcesResultSchema, LoggingMessageNotificationSchema, - ResourceListChangedNotificationSchema + ResourceListChangedNotificationSchema, } from '../../types.js'; async function main(): Promise { @@ -25,7 +25,7 @@ async function main(): Promise { const transport = new StreamableHTTPClientTransport( new URL('http://localhost:3000/mcp') ); - + // Connect the client using the transport and initialize the server await client.connect(transport); console.log('Connected to MCP server'); @@ -44,49 +44,12 @@ async function main(): Promise { console.log('Available resources count:', resourcesResult.resources.length); }); - // List available tools - try { - const toolsRequest: ListToolsRequest = { - method: 'tools/list', - params: {} - }; - const toolsResult = await client.request(toolsRequest, ListToolsResultSchema); - console.log('Available tools:', toolsResult.tools); + // List and call tools + await listTools(client); - if (toolsResult.tools.length === 0) { - console.log('No tools available from the server'); - } else { - // Call the 'greet' tool - const greetRequest: CallToolRequest = { - method: 'tools/call', - params: { - name: 'greet', - arguments: { name: 'MCP User' } - } - }; - const greetResult = await client.request(greetRequest, CallToolResultSchema); - console.log('Greeting result:', greetResult.content[0].text); + await callGreetTool(client); + await callMultiGreetTool(client); - // Call the new 'multi-greet' tool - console.log('\nCalling multi-greet tool (with notifications)...'); - const multiGreetRequest: CallToolRequest = { - method: 'tools/call', - params: { - name: 'multi-greet', - arguments: { name: 'MCP User' } - } - }; - const multiGreetResult = await client.request(multiGreetRequest, CallToolResultSchema); - console.log('Multi-greet results:'); - multiGreetResult.content.forEach(item => { - if (item.type === 'text') { - console.log(`- ${item.text}`); - } - }); - } - } catch (error) { - console.log(`Tools not supported by this server (${error})`); - } // List available prompts try { @@ -130,6 +93,61 @@ async function main(): Promise { console.log('\nKeeping connection open to receive notifications. Press Ctrl+C to exit.'); } +async function listTools(client: Client): Promise { + try { + const toolsRequest: ListToolsRequest = { + method: 'tools/list', + params: {} + }; + const toolsResult = await client.request(toolsRequest, ListToolsResultSchema); + console.log('Available tools:', toolsResult.tools); + if (toolsResult.tools.length === 0) { + console.log('No tools available from the server'); + } + } catch (error) { + console.log(`Tools not supported by this server (${error})`); + return + } +} + +async function callGreetTool(client: Client): Promise { + try { + const greetRequest: CallToolRequest = { + method: 'tools/call', + params: { + name: 'greet', + arguments: { name: 'MCP User' } + } + }; + const greetResult = await client.request(greetRequest, CallToolResultSchema); + console.log('Greeting result:', greetResult.content[0].text); + } catch (error) { + console.log(`Error calling greet tool: ${error}`); + } +} + +async function callMultiGreetTool(client: Client): Promise { + try { + console.log('\nCalling multi-greet tool (with notifications)...'); + const multiGreetRequest: CallToolRequest = { + method: 'tools/call', + params: { + name: 'multi-greet', + arguments: { name: 'MCP User' } + } + }; + const multiGreetResult = await client.request(multiGreetRequest, CallToolResultSchema); + console.log('Multi-greet results:'); + multiGreetResult.content.forEach(item => { + if (item.type === 'text') { + console.log(`- ${item.text}`); + } + }); + } catch (error) { + console.log(`Error calling multi-greet tool: ${error}`); + } +} + main().catch((error: unknown) => { console.error('Error running MCP client:', error); process.exit(1); From 44330448da66295857172d0587ac3d33f1a94276 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Fri, 11 Apr 2025 14:21:12 +0100 Subject: [PATCH 11/11] separate field on the class for standaloneSSE --- src/server/streamableHttp.ts | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 05ba6146..ec8d2aa7 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -68,7 +68,7 @@ export class StreamableHTTPServerTransport implements Transport { private _requestResponseMap: Map = new Map(); private _initialized: boolean = false; private _enableJsonResponse: boolean = false; - private _sseStreamKey = "standalone-sse"; + private _standaloneSSE: ServerResponse | undefined; sessionId?: string | undefined; @@ -148,9 +148,8 @@ export class StreamableHTTPServerTransport implements Transport { // Resumability will be supported in the future // Check if there's already an active standalone SSE stream for this session - const existingStream = this._responseMapping.get(this._sseStreamKey); - if (existingStream !== undefined) { + if (this._standaloneSSE !== undefined) { // Only one GET SSE stream is allowed per session res.writeHead(409).end(JSON.stringify({ jsonrpc: "2.0", @@ -166,14 +165,12 @@ export class StreamableHTTPServerTransport implements Transport { // otherwise the client will just wait for the first message res.writeHead(200, headers).flushHeaders(); - // Store the response for this request so we can use it for standalone server notifications - // This response doesn't have an associated request ID, so we'll use a special string to track it - this._responseMapping.set(this._sseStreamKey, res); + // Assing the response to the standalone SSE stream + this._standaloneSSE = res; // Set up close handler for client disconnects res.on("close", () => { - // Clean up resources associated with this connection - this._responseMapping.delete(this._sseStreamKey); + this._standaloneSSE = undefined; }); } @@ -461,14 +458,13 @@ export class StreamableHTTPServerTransport implements Transport { throw new Error("Cannot send a response on a standalone SSE stream unless resuming a previous client request"); } - const standaloneStream = this._responseMapping.get(this._sseStreamKey); - if (standaloneStream === undefined) { + if (this._standaloneSSE === undefined) { // The spec says the server MAY send messages on the stream, so it's ok to discard if no stream return; } // Send the message to the standalone SSE stream - standaloneStream.write(`event: message\ndata: ${JSON.stringify(message)}\n\n`); + this._standaloneSSE.write(`event: message\ndata: ${JSON.stringify(message)}\n\n`); return; }