diff --git a/src/client/index.ts b/src/client/index.ts index 64ea62c4..a3edd0be 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -126,7 +126,11 @@ export class Client< override async connect(transport: Transport, options?: RequestOptions): Promise { await super.connect(transport); - + // When transport sessionId is already set this means we are trying to reconnect. + // In this case we don't need to initialize again. + if (transport.sessionId !== undefined) { + return; + } try { const result = await this.request( { diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index 073cc9ac..93e44150 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -164,7 +164,7 @@ describe("StreamableHTTPClientTransport", () => { // We expect the 405 error to be caught and handled gracefully // This should not throw an error that breaks the transport await transport.start(); - await expect(transport["_startOrAuthStandaloneSSE"]({})).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed"); + await expect(transport["_startOrAuthSse"]({})).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed"); // Check that GET was attempted expect(global.fetch).toHaveBeenCalledWith( expect.anything(), @@ -208,7 +208,7 @@ describe("StreamableHTTPClientTransport", () => { transport.onmessage = messageSpy; await transport.start(); - await transport["_startOrAuthStandaloneSSE"]({}); + await transport["_startOrAuthSse"]({}); // Give time for the SSE event to be processed await new Promise(resolve => setTimeout(resolve, 50)); @@ -313,9 +313,9 @@ describe("StreamableHTTPClientTransport", () => { await transport.start(); // Type assertion to access private method const transportWithPrivateMethods = transport as unknown as { - _startOrAuthStandaloneSSE: (options: { lastEventId?: string }) => Promise + _startOrAuthSse: (options: { resumptionToken?: string }) => Promise }; - await transportWithPrivateMethods._startOrAuthStandaloneSSE({ lastEventId: "test-event-id" }); + await transportWithPrivateMethods._startOrAuthSse({ resumptionToken: "test-event-id" }); // Verify fetch was called with the lastEventId header expect(fetchSpy).toHaveBeenCalled(); @@ -382,7 +382,7 @@ describe("StreamableHTTPClientTransport", () => { await transport.start(); - await transport["_startOrAuthStandaloneSSE"]({}); + await transport["_startOrAuthSse"]({}); expect((actualReqInit.headers as Headers).get("x-custom-header")).toBe("CustomValue"); requestInit.headers["X-Custom-Header"] = "SecondCustomValue"; diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 7bb88c0d..077b0f15 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -1,5 +1,5 @@ import { Transport } from "../shared/transport.js"; -import { isJSONRPCNotification, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; +import { isJSONRPCNotification, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js"; import { EventSourceParserStream } from "eventsource-parser/stream"; @@ -23,11 +23,26 @@ export class StreamableHTTPError extends Error { /** * Options for starting or authenticating an SSE connection */ -export interface StartSSEOptions { +interface StartSSEOptions { /** - * The ID of the last received event, used for resuming a disconnected stream + * The resumption token used to continue long-running requests that were interrupted. + * + * This allows clients to reconnect and continue from where they left off. + */ + resumptionToken?: string; + + /** + * A callback that is invoked when the resumption token changes. + * + * This allows clients to persist the latest token for potential reconnection. */ - lastEventId?: string; + onresumptiontoken?: (token: string) => void; + + /** + * Override Message ID to associate with the replay message + * so that response can be associate with the new resumed request. + */ + replayMessageId?: string | number; } /** @@ -88,6 +103,12 @@ export type StreamableHTTPClientTransportOptions = { * Options to configure the reconnection behavior. */ reconnectionOptions?: StreamableHTTPReconnectionOptions; + + /** + * Session ID for the connection. This is used to identify the session on the server. + * When not provided and connecting to a server that supports session IDs, the server will generate a new session ID. + */ + sessionId?: string; }; /** @@ -114,6 +135,7 @@ export class StreamableHTTPClientTransport implements Transport { this._url = url; this._requestInit = opts?.requestInit; this._authProvider = opts?.authProvider; + this._sessionId = opts?.sessionId; this._reconnectionOptions = opts?.reconnectionOptions ?? DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS; } @@ -134,7 +156,7 @@ export class StreamableHTTPClientTransport implements Transport { throw new UnauthorizedError(); } - return await this._startOrAuthStandaloneSSE({ lastEventId: undefined }); + return await this._startOrAuthSse({ resumptionToken: undefined }); } private async _commonHeaders(): Promise { @@ -156,8 +178,8 @@ export class StreamableHTTPClientTransport implements Transport { } - private async _startOrAuthStandaloneSSE(options: StartSSEOptions): Promise { - const { lastEventId } = options; + private async _startOrAuthSse(options: StartSSEOptions): Promise { + const { resumptionToken } = options; try { // Try to open an initial SSE stream with GET to listen for server messages // This is optional according to the spec - server may not support it @@ -165,8 +187,8 @@ export class StreamableHTTPClientTransport implements Transport { headers.set("Accept", "text/event-stream"); // Include Last-Event-ID header for resumable streams if provided - if (lastEventId) { - headers.set("last-event-id", lastEventId); + if (resumptionToken) { + headers.set("last-event-id", resumptionToken); } const response = await fetch(this._url, { @@ -193,7 +215,7 @@ export class StreamableHTTPClientTransport implements Transport { ); } - this._handleSseStream(response.body); + this._handleSseStream(response.body, options); } catch (error) { this.onerror?.(error as Error); throw error; @@ -224,7 +246,7 @@ export class StreamableHTTPClientTransport implements Transport { * @param lastEventId The ID of the last received event for resumability * @param attemptCount Current reconnection attempt count for this specific stream */ - private _scheduleReconnection(lastEventId: string, attemptCount = 0): void { + private _scheduleReconnection(options: StartSSEOptions, attemptCount = 0): void { // Use provided options or default options const maxRetries = this._reconnectionOptions.maxRetries; @@ -240,18 +262,19 @@ export class StreamableHTTPClientTransport implements Transport { // Schedule the reconnection setTimeout(() => { // Use the last event ID to resume where we left off - this._startOrAuthStandaloneSSE({ lastEventId }).catch(error => { + this._startOrAuthSse(options).catch(error => { this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`)); // Schedule another attempt if this one failed, incrementing the attempt counter - this._scheduleReconnection(lastEventId, attemptCount + 1); + this._scheduleReconnection(options, attemptCount + 1); }); }, delay); } - private _handleSseStream(stream: ReadableStream | null): void { + private _handleSseStream(stream: ReadableStream | null, options: StartSSEOptions): void { if (!stream) { return; } + const { onresumptiontoken, replayMessageId } = options; let lastEventId: string | undefined; const processStream = async () => { @@ -274,11 +297,15 @@ export class StreamableHTTPClientTransport implements Transport { // Update last event ID if provided if (event.id) { lastEventId = event.id; + onresumptiontoken?.(event.id); } if (!event.event || event.event === "message") { try { const message = JSONRPCMessageSchema.parse(JSON.parse(event.data)); + if (replayMessageId !== undefined && isJSONRPCResponse(message)) { + message.id = replayMessageId; + } this.onmessage?.(message); } catch (error) { this.onerror?.(error as Error); @@ -294,7 +321,11 @@ export class StreamableHTTPClientTransport implements Transport { // Use the exponential backoff reconnection strategy if (lastEventId !== undefined) { try { - this._scheduleReconnection(lastEventId, 0); + this._scheduleReconnection({ + resumptionToken: lastEventId, + onresumptiontoken, + replayMessageId + }, 0); } catch (error) { this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`)); @@ -338,8 +369,16 @@ export class StreamableHTTPClientTransport implements Transport { this.onclose?.(); } - async send(message: JSONRPCMessage | JSONRPCMessage[]): Promise { + async send(message: JSONRPCMessage | JSONRPCMessage[], options?: { resumptionToken?: string, onresumptiontoken?: (token: string) => void }): Promise { try { + const { resumptionToken, onresumptiontoken } = options || {}; + + if (resumptionToken) { + // If we have at last event ID, we need to reconnect the SSE stream + this._startOrAuthSse({ resumptionToken, replayMessageId: isJSONRPCRequest(message) ? message.id : undefined }).catch(err => this.onerror?.(err)); + return; + } + const headers = await this._commonHeaders(); headers.set("content-type", "application/json"); headers.set("accept", "application/json, text/event-stream"); @@ -383,7 +422,7 @@ export class StreamableHTTPClientTransport implements Transport { // if it's supported by the server if (isJSONRPCNotification(message) && message.method === "notifications/initialized") { // Start without a lastEventId since this is a fresh connection - this._startOrAuthStandaloneSSE({ lastEventId: undefined }).catch(err => this.onerror?.(err)); + this._startOrAuthSse({ resumptionToken: undefined }).catch(err => this.onerror?.(err)); } return; } @@ -398,7 +437,10 @@ export class StreamableHTTPClientTransport implements Transport { if (hasRequests) { if (contentType?.includes("text/event-stream")) { - this._handleSseStream(response.body); + // Handle SSE stream responses for requests + // We use the same handler as standalone streams, which now supports + // reconnection with the last event ID + this._handleSseStream(response.body, { onresumptiontoken }); } else if (contentType?.includes("application/json")) { // For non-streaming servers, we might get direct JSON responses const data = await response.json(); @@ -421,4 +463,8 @@ export class StreamableHTTPClientTransport implements Transport { throw error; } } + + get sessionId(): string | undefined { + return this._sessionId; + } } diff --git a/src/examples/client/simpleStreamableHttp.ts b/src/examples/client/simpleStreamableHttp.ts index 923ffbc2..d0d5408b 100644 --- a/src/examples/client/simpleStreamableHttp.ts +++ b/src/examples/client/simpleStreamableHttp.ts @@ -29,6 +29,8 @@ let notificationCount = 0; let client: Client | null = null; let transport: StreamableHTTPClientTransport | null = null; let serverUrl = 'http://localhost:3000/mcp'; +let notificationsToolLastEventId: string | undefined = undefined; +let sessionId: string | undefined = undefined; async function main(): Promise { console.log('MCP Interactive Client'); @@ -109,7 +111,7 @@ function commandLoop(): void { case 'start-notifications': { const interval = args[1] ? parseInt(args[1], 10) : 2000; - const count = args[2] ? parseInt(args[2], 10) : 0; + const count = args[2] ? parseInt(args[2], 10) : 10; await startNotifications(interval, count); break; } @@ -186,7 +188,10 @@ async function connect(url?: string): Promise { } transport = new StreamableHTTPClientTransport( - new URL(serverUrl) + new URL(serverUrl), + { + sessionId: sessionId + } ); // Set up notification handlers @@ -218,6 +223,8 @@ async function connect(url?: string): Promise { // Connect the client await client.connect(transport); + sessionId = transport.sessionId + console.log('Transport created with session ID:', sessionId); console.log('Connected to MCP server'); } catch (error) { console.error('Failed to connect:', error); @@ -291,7 +298,12 @@ async function callTool(name: string, args: Record): Promise { + notificationsToolLastEventId = event; + }; + const result = await client.request(request, CallToolResultSchema, { + resumptionToken: notificationsToolLastEventId, onresumptiontoken: onLastEventIdUpdate + }); console.log('Tool result:'); result.content.forEach(item => { diff --git a/src/examples/server/simpleStreamableHttp.ts b/src/examples/server/simpleStreamableHttp.ts index 153e35b7..98333730 100644 --- a/src/examples/server/simpleStreamableHttp.ts +++ b/src/examples/server/simpleStreamableHttp.ts @@ -181,14 +181,18 @@ server.tool( while (count === 0 || counter < count) { counter++; - await sendNotification({ - method: "notifications/message", - params: { - level: "info", - data: `Periodic notification #${counter} at ${new Date().toISOString()}` - } - }); - + try { + await sendNotification({ + method: "notifications/message", + params: { + level: "info", + data: `Periodic notification #${counter} at ${new Date().toISOString()}` + } + }); + } + catch (error) { + console.error("Error sending notification:", error); + } // Wait for the specified interval await sleep(interval); } diff --git a/src/integration-tests/taskResumability.test.ts b/src/integration-tests/taskResumability.test.ts new file mode 100644 index 00000000..3bd23498 --- /dev/null +++ b/src/integration-tests/taskResumability.test.ts @@ -0,0 +1,329 @@ +import { createServer, type Server } from 'node:http'; +import { AddressInfo } from 'node:net'; +import { randomUUID } from 'node:crypto'; +import { Client } from '../client/index.js'; +import { StreamableHTTPClientTransport } from '../client/streamableHttp.js'; +import { McpServer } from '../server/mcp.js'; +import { EventStore, StreamableHTTPServerTransport } from '../server/streamableHttp.js'; +import { CallToolResultSchema, JSONRPCMessage, LoggingMessageNotificationSchema } from '../types.js'; +import { z } from 'zod'; + +/** + * Simple in-memory event store implementation for resumability + */ +class InMemoryEventStore implements EventStore { + private events: Map = new Map(); + + private generateEventId(streamId: string): string { + return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`; + } + + private getStreamIdFromEventId(eventId: string): string { + const parts = eventId.split('_'); + return parts.length > 0 ? parts[0] : ''; + } + + async storeEvent(streamId: string, message: JSONRPCMessage): Promise { + const eventId = this.generateEventId(streamId); + this.events.set(eventId, { streamId, message }); + return eventId; + } + + async replayEventsAfter(lastEventId: string, + { send }: { send: (eventId: string, message: JSONRPCMessage) => Promise } + ): Promise { + if (!lastEventId || !this.events.has(lastEventId)) { + return ''; + } + + // Extract the stream ID from the event ID + const streamId = this.getStreamIdFromEventId(lastEventId); + if (!streamId) { + return ''; + } + let foundLastEvent = false; + + // Sort events by eventId for chronological ordering + const sortedEvents = [...this.events.entries()].sort((a, b) => a[0].localeCompare(b[0])); + + for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) { + // Only include events from the same stream + if (eventStreamId !== streamId) { + continue; + } + + // Start collecting events after we find the lastEventId + if (eventId === lastEventId) { + foundLastEvent = true; + continue; + } + + if (foundLastEvent) { + await send(eventId, message); + } + } + + return streamId; + } +} + + +describe('Transport resumability', () => { + let server: Server; + let mcpServer: McpServer; + let serverTransport: StreamableHTTPServerTransport; + let baseUrl: URL; + let eventStore: InMemoryEventStore; + + beforeEach(async () => { + // Create event store for resumability + eventStore = new InMemoryEventStore(); + + // Create a simple MCP server + mcpServer = new McpServer( + { name: 'test-server', version: '1.0.0' }, + { capabilities: { logging: {} } } + ); + + // Add a simple notification tool that completes quickly + mcpServer.tool( + 'send-notification', + 'Sends a single notification', + { + message: z.string().describe('Message to send').default('Test notification') + }, + async ({ message }, { sendNotification }) => { + // Send notification immediately + await sendNotification({ + method: "notifications/message", + params: { + level: "info", + data: message + } + }); + + return { + content: [{ type: 'text', text: 'Notification sent' }] + }; + } + ); + + // Add a long-running tool that sends multiple notifications + mcpServer.tool( + 'run-notifications', + 'Sends multiple notifications over time', + { + count: z.number().describe('Number of notifications to send').default(10), + interval: z.number().describe('Interval between notifications in ms').default(50) + }, + async ({ count, interval }, { sendNotification }) => { + // Send notifications at specified intervals + for (let i = 0; i < count; i++) { + await sendNotification({ + method: "notifications/message", + params: { + level: "info", + data: `Notification ${i + 1} of ${count}` + } + }); + + // Wait for the specified interval before sending next notification + if (i < count - 1) { + await new Promise(resolve => setTimeout(resolve, interval)); + } + } + + return { + content: [{ type: 'text', text: `Sent ${count} notifications` }] + }; + } + ); + + // Create a transport with the event store + serverTransport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + eventStore + }); + + // Connect the transport to the MCP server + await mcpServer.connect(serverTransport); + + // Create and start an HTTP server + server = createServer(async (req, res) => { + await serverTransport.handleRequest(req, res); + }); + + // Start the server on a random port + baseUrl = await new Promise((resolve) => { + server.listen(0, '127.0.0.1', () => { + const addr = server.address() as AddressInfo; + resolve(new URL(`http://127.0.0.1:${addr.port}`)); + }); + }); + }); + + afterEach(async () => { + // Clean up resources + await mcpServer.close().catch(() => { }); + await serverTransport.close().catch(() => { }); + server.close(); + }); + + it('should store session ID when client connects', async () => { + // Create and connect a client + const client = new Client({ + name: 'test-client', + version: '1.0.0' + }); + + const transport = new StreamableHTTPClientTransport(baseUrl); + await client.connect(transport); + + // Verify session ID was generated + expect(transport.sessionId).toBeDefined(); + + // Clean up + await transport.close(); + }); + + it('should have session ID functionality', async () => { + // The ability to store a session ID when connecting + const client = new Client({ + name: 'test-client-reconnection', + version: '1.0.0' + }); + + const transport = new StreamableHTTPClientTransport(baseUrl); + + // Make sure the client can connect and get a session ID + await client.connect(transport); + expect(transport.sessionId).toBeDefined(); + + // Clean up + await transport.close(); + }); + + // This test demonstrates the capability to resume long-running tools + // across client disconnection/reconnection + it('should resume long-running notifications with lastEventId', async () => { + // Create unique client ID for this test + const clientId = 'test-client-long-running'; + const notifications = []; + let lastEventId: string | undefined; + + // Create first client + const client1 = new Client({ + id: clientId, + name: 'test-client', + version: '1.0.0' + }); + + // Set up notification handler for first client + client1.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => { + if (notification.method === 'notifications/message') { + notifications.push(notification.params); + } + }); + + // Connect first client + const transport1 = new StreamableHTTPClientTransport(baseUrl); + await client1.connect(transport1); + const sessionId = transport1.sessionId; + expect(sessionId).toBeDefined(); + + // Start a long-running notification stream with tracking of lastEventId + const onLastEventIdUpdate = jest.fn((eventId: string) => { + lastEventId = eventId; + }); + expect(lastEventId).toBeUndefined(); + // Start the notification tool with event tracking using request + const toolPromise = client1.request({ + method: 'tools/call', + params: { + name: 'run-notifications', + arguments: { + count: 3, + interval: 10 + } + } + }, CallToolResultSchema, { + resumptionToken: lastEventId, + onresumptiontoken: onLastEventIdUpdate + }); + + // Wait for some notifications to arrive (not all) - shorter wait time + await new Promise(resolve => setTimeout(resolve, 20)); + + // Verify we received some notifications and lastEventId was updated + expect(notifications.length).toBeGreaterThan(0); + expect(notifications.length).toBeLessThan(4); + expect(onLastEventIdUpdate).toHaveBeenCalled(); + expect(lastEventId).toBeDefined(); + + + // Disconnect first client without waiting for completion + // When we close the connection, it will cause a ConnectionClosed error for + // any in-progress requests, which is expected behavior + await transport1.close(); + // Save the promise so we can catch it after closing + const catchPromise = toolPromise.catch(err => { + // This error is expected - the connection was intentionally closed + if (err?.code !== -32000) { // ConnectionClosed error code + console.error("Unexpected error type during transport close:", err); + } + }); + + + + // Add a short delay to ensure clean disconnect before reconnecting + await new Promise(resolve => setTimeout(resolve, 10)); + + // Wait for the rejection to be handled + await catchPromise; + + + // Create second client with same client ID + const client2 = new Client({ + id: clientId, + name: 'test-client', + version: '1.0.0' + }); + + // Set up notification handler for second client + client2.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => { + if (notification.method === 'notifications/message') { + notifications.push(notification.params); + } + }); + + // Connect second client with same session ID + const transport2 = new StreamableHTTPClientTransport(baseUrl, { + sessionId + }); + await client2.connect(transport2); + + // Resume the notification stream using lastEventId + // This is the key part - we're resuming the same long-running tool using lastEventId + await client2.request({ + method: 'tools/call', + params: { + name: 'run-notifications', + arguments: { + count: 1, + interval: 5 + } + } + }, CallToolResultSchema, { + resumptionToken: lastEventId, // Pass the lastEventId from the previous session + onresumptiontoken: onLastEventIdUpdate + }); + + // Verify we eventually received at leaset a few motifications + expect(notifications.length).toBeGreaterThan(1); + + + // Clean up + await transport2.close(); + + }); +}); \ No newline at end of file diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 7ddfa3ab..31aad09c 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -202,7 +202,6 @@ export class StreamableHTTPServerTransport implements Transport { // Assign the response to the standalone SSE stream this._streamMapping.set(this._standaloneSseStreamId, res); - // Set up close handler for client disconnects res.on("close", () => { this._streamMapping.delete(this._standaloneSseStreamId); @@ -333,6 +332,8 @@ export class StreamableHTTPServerTransport implements Transport { msg => 'method' in msg && msg.method === 'initialize' ); if (isInitializationRequest) { + // If it's a server with session management and the session ID is already set we should reject the request + // to avoid re-initialization. if (this._initialized) { res.writeHead(400).end(JSON.stringify({ jsonrpc: "2.0", @@ -404,17 +405,8 @@ export class StreamableHTTPServerTransport implements Transport { this._requestToStreamMapping.set(message.id, streamId); } } - // Set up close handler for client disconnects res.on("close", () => { - // find a stream ID for this response - // Remove all entries that reference this response - for (const [id, stream] of this._requestToStreamMapping.entries()) { - if (streamId === stream) { - this._requestToStreamMapping.delete(id); - this._requestResponseMap.delete(id); - } - } this._streamMapping.delete(streamId); }); @@ -562,7 +554,7 @@ export class StreamableHTTPServerTransport implements Transport { // Get the response for this request const streamId = this._requestToStreamMapping.get(requestId); const response = this._streamMapping.get(streamId!); - if (!streamId || !response) { + if (!streamId) { throw new Error(`No connection established for request ID: ${String(requestId)}`); } @@ -573,9 +565,10 @@ export class StreamableHTTPServerTransport implements Transport { if (this._eventStore) { eventId = await this._eventStore.storeEvent(streamId, message); } - - // Write the event to the response stream - this.writeSSEEvent(response, message, eventId); + if (response) { + // Write the event to the response stream + this.writeSSEEvent(response, message, eventId); + } } if (isJSONRPCResponse(message)) { @@ -588,6 +581,9 @@ export class StreamableHTTPServerTransport implements Transport { const allResponsesReady = relatedIds.every(id => this._requestResponseMap.has(id)); if (allResponsesReady) { + if (!response) { + throw new Error(`No connection established for request ID: ${String(requestId)}`); + } if (this._enableJsonResponse) { // All responses ready, send as JSON const headers: Record = { diff --git a/src/shared/protocol.ts b/src/shared/protocol.ts index 91fa8366..c9ea79fd 100644 --- a/src/shared/protocol.ts +++ b/src/shared/protocol.ts @@ -22,7 +22,7 @@ import { Result, ServerCapabilities, } from "../types.js"; -import { Transport } from "./transport.js"; +import { Transport, TransportSendOptions } from "./transport.js"; import { AuthInfo } from "../server/auth/types.js"; /** @@ -83,12 +83,7 @@ export type RequestOptions = { * If not specified, there is no maximum total timeout. */ maxTotalTimeout?: number; - - /** - * May be used to indicate to the transport which incoming request to associate this outgoing request with. - */ - relatedRequestId?: RequestId; -}; +} & TransportSendOptions; /** * Options that can be given per notification. @@ -507,7 +502,7 @@ export abstract class Protocol< resultSchema: T, options?: RequestOptions, ): Promise> { - const { relatedRequestId } = options ?? {}; + const { relatedRequestId, resumptionToken, onresumptiontoken } = options ?? {}; return new Promise((resolve, reject) => { if (!this._transport) { @@ -549,7 +544,7 @@ export abstract class Protocol< requestId: messageId, reason: String(reason), }, - }, { relatedRequestId }) + }, { relatedRequestId, resumptionToken, onresumptiontoken }) .catch((error) => this._onerror(new Error(`Failed to send cancellation: ${error}`)), ); @@ -587,7 +582,7 @@ export abstract class Protocol< this._setupTimeout(messageId, timeout, options?.maxTotalTimeout, timeoutHandler, options?.resetTimeoutOnProgress ?? false); - this._transport.send(jsonrpcRequest, { relatedRequestId }).catch((error) => { + this._transport.send(jsonrpcRequest, { relatedRequestId, resumptionToken, onresumptiontoken }).catch((error) => { this._cleanupTimeout(messageId); reject(error); }); diff --git a/src/shared/transport.ts b/src/shared/transport.ts index c2732391..fe0a60e6 100644 --- a/src/shared/transport.ts +++ b/src/shared/transport.ts @@ -1,6 +1,29 @@ import { AuthInfo } from "../server/auth/types.js"; import { JSONRPCMessage, RequestId } from "../types.js"; +/** + * Options for sending a JSON-RPC message. + */ +export type TransportSendOptions = { + /** + * If present, `relatedRequestId` is used to indicate to the transport which incoming request to associate this outgoing message with. + */ + relatedRequestId?: RequestId; + + /** + * The resumption token used to continue long-running requests that were interrupted. + * + * This allows clients to reconnect and continue from where they left off, if supported by the transport. + */ + resumptionToken?: string; + + /** + * A callback that is invoked when the resumption token changes, if supported by the transport. + * + * This allows clients to persist the latest token for potential reconnection. + */ + onresumptiontoken?: (token: string) => void; +} /** * Describes the minimal contract for a MCP transport that a client or server can communicate over. */ @@ -19,7 +42,7 @@ export interface Transport { * * If present, `relatedRequestId` is used to indicate to the transport which incoming request to associate this outgoing message with. */ - send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise; + send(message: JSONRPCMessage, options?: TransportSendOptions): Promise; /** * Closes the connection.