Skip to content

Commit 09e5d5b

Browse files
authored
Merge pull request #333 from modelcontextprotocol/ihrpr/resume-long-running-requests
StreamableHttp transport - resume long running requests
2 parents 9ed2254 + f9f2ba4 commit 09e5d5b

File tree

9 files changed

+469
-60
lines changed

9 files changed

+469
-60
lines changed

src/client/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,11 @@ export class Client<
126126

127127
override async connect(transport: Transport, options?: RequestOptions): Promise<void> {
128128
await super.connect(transport);
129-
129+
// When transport sessionId is already set this means we are trying to reconnect.
130+
// In this case we don't need to initialize again.
131+
if (transport.sessionId !== undefined) {
132+
return;
133+
}
130134
try {
131135
const result = await this.request(
132136
{

src/client/streamableHttp.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ describe("StreamableHTTPClientTransport", () => {
164164
// We expect the 405 error to be caught and handled gracefully
165165
// This should not throw an error that breaks the transport
166166
await transport.start();
167-
await expect(transport["_startOrAuthStandaloneSSE"]({})).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed");
167+
await expect(transport["_startOrAuthSse"]({})).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed");
168168
// Check that GET was attempted
169169
expect(global.fetch).toHaveBeenCalledWith(
170170
expect.anything(),
@@ -208,7 +208,7 @@ describe("StreamableHTTPClientTransport", () => {
208208
transport.onmessage = messageSpy;
209209

210210
await transport.start();
211-
await transport["_startOrAuthStandaloneSSE"]({});
211+
await transport["_startOrAuthSse"]({});
212212

213213
// Give time for the SSE event to be processed
214214
await new Promise(resolve => setTimeout(resolve, 50));
@@ -313,9 +313,9 @@ describe("StreamableHTTPClientTransport", () => {
313313
await transport.start();
314314
// Type assertion to access private method
315315
const transportWithPrivateMethods = transport as unknown as {
316-
_startOrAuthStandaloneSSE: (options: { lastEventId?: string }) => Promise<void>
316+
_startOrAuthSse: (options: { resumptionToken?: string }) => Promise<void>
317317
};
318-
await transportWithPrivateMethods._startOrAuthStandaloneSSE({ lastEventId: "test-event-id" });
318+
await transportWithPrivateMethods._startOrAuthSse({ resumptionToken: "test-event-id" });
319319

320320
// Verify fetch was called with the lastEventId header
321321
expect(fetchSpy).toHaveBeenCalled();
@@ -382,7 +382,7 @@ describe("StreamableHTTPClientTransport", () => {
382382

383383
await transport.start();
384384

385-
await transport["_startOrAuthStandaloneSSE"]({});
385+
await transport["_startOrAuthSse"]({});
386386
expect((actualReqInit.headers as Headers).get("x-custom-header")).toBe("CustomValue");
387387

388388
requestInit.headers["X-Custom-Header"] = "SecondCustomValue";

src/client/streamableHttp.ts

Lines changed: 64 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Transport } from "../shared/transport.js";
2-
import { isJSONRPCNotification, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
2+
import { isJSONRPCNotification, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
33
import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js";
44
import { EventSourceParserStream } from "eventsource-parser/stream";
55

@@ -23,11 +23,26 @@ export class StreamableHTTPError extends Error {
2323
/**
2424
* Options for starting or authenticating an SSE connection
2525
*/
26-
export interface StartSSEOptions {
26+
interface StartSSEOptions {
2727
/**
28-
* The ID of the last received event, used for resuming a disconnected stream
28+
* The resumption token used to continue long-running requests that were interrupted.
29+
*
30+
* This allows clients to reconnect and continue from where they left off.
31+
*/
32+
resumptionToken?: string;
33+
34+
/**
35+
* A callback that is invoked when the resumption token changes.
36+
*
37+
* This allows clients to persist the latest token for potential reconnection.
2938
*/
30-
lastEventId?: string;
39+
onresumptiontoken?: (token: string) => void;
40+
41+
/**
42+
* Override Message ID to associate with the replay message
43+
* so that response can be associate with the new resumed request.
44+
*/
45+
replayMessageId?: string | number;
3146
}
3247

3348
/**
@@ -88,6 +103,12 @@ export type StreamableHTTPClientTransportOptions = {
88103
* Options to configure the reconnection behavior.
89104
*/
90105
reconnectionOptions?: StreamableHTTPReconnectionOptions;
106+
107+
/**
108+
* Session ID for the connection. This is used to identify the session on the server.
109+
* When not provided and connecting to a server that supports session IDs, the server will generate a new session ID.
110+
*/
111+
sessionId?: string;
91112
};
92113

93114
/**
@@ -114,6 +135,7 @@ export class StreamableHTTPClientTransport implements Transport {
114135
this._url = url;
115136
this._requestInit = opts?.requestInit;
116137
this._authProvider = opts?.authProvider;
138+
this._sessionId = opts?.sessionId;
117139
this._reconnectionOptions = opts?.reconnectionOptions ?? DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS;
118140
}
119141

@@ -134,7 +156,7 @@ export class StreamableHTTPClientTransport implements Transport {
134156
throw new UnauthorizedError();
135157
}
136158

137-
return await this._startOrAuthStandaloneSSE({ lastEventId: undefined });
159+
return await this._startOrAuthSse({ resumptionToken: undefined });
138160
}
139161

140162
private async _commonHeaders(): Promise<Headers> {
@@ -156,17 +178,17 @@ export class StreamableHTTPClientTransport implements Transport {
156178
}
157179

158180

159-
private async _startOrAuthStandaloneSSE(options: StartSSEOptions): Promise<void> {
160-
const { lastEventId } = options;
181+
private async _startOrAuthSse(options: StartSSEOptions): Promise<void> {
182+
const { resumptionToken } = options;
161183
try {
162184
// Try to open an initial SSE stream with GET to listen for server messages
163185
// This is optional according to the spec - server may not support it
164186
const headers = await this._commonHeaders();
165187
headers.set("Accept", "text/event-stream");
166188

167189
// Include Last-Event-ID header for resumable streams if provided
168-
if (lastEventId) {
169-
headers.set("last-event-id", lastEventId);
190+
if (resumptionToken) {
191+
headers.set("last-event-id", resumptionToken);
170192
}
171193

172194
const response = await fetch(this._url, {
@@ -193,7 +215,7 @@ export class StreamableHTTPClientTransport implements Transport {
193215
);
194216
}
195217

196-
this._handleSseStream(response.body);
218+
this._handleSseStream(response.body, options);
197219
} catch (error) {
198220
this.onerror?.(error as Error);
199221
throw error;
@@ -224,7 +246,7 @@ export class StreamableHTTPClientTransport implements Transport {
224246
* @param lastEventId The ID of the last received event for resumability
225247
* @param attemptCount Current reconnection attempt count for this specific stream
226248
*/
227-
private _scheduleReconnection(lastEventId: string, attemptCount = 0): void {
249+
private _scheduleReconnection(options: StartSSEOptions, attemptCount = 0): void {
228250
// Use provided options or default options
229251
const maxRetries = this._reconnectionOptions.maxRetries;
230252

@@ -240,18 +262,19 @@ export class StreamableHTTPClientTransport implements Transport {
240262
// Schedule the reconnection
241263
setTimeout(() => {
242264
// Use the last event ID to resume where we left off
243-
this._startOrAuthStandaloneSSE({ lastEventId }).catch(error => {
265+
this._startOrAuthSse(options).catch(error => {
244266
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
245267
// Schedule another attempt if this one failed, incrementing the attempt counter
246-
this._scheduleReconnection(lastEventId, attemptCount + 1);
268+
this._scheduleReconnection(options, attemptCount + 1);
247269
});
248270
}, delay);
249271
}
250272

251-
private _handleSseStream(stream: ReadableStream<Uint8Array> | null): void {
273+
private _handleSseStream(stream: ReadableStream<Uint8Array> | null, options: StartSSEOptions): void {
252274
if (!stream) {
253275
return;
254276
}
277+
const { onresumptiontoken, replayMessageId } = options;
255278

256279
let lastEventId: string | undefined;
257280
const processStream = async () => {
@@ -274,11 +297,15 @@ export class StreamableHTTPClientTransport implements Transport {
274297
// Update last event ID if provided
275298
if (event.id) {
276299
lastEventId = event.id;
300+
onresumptiontoken?.(event.id);
277301
}
278302

279303
if (!event.event || event.event === "message") {
280304
try {
281305
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
306+
if (replayMessageId !== undefined && isJSONRPCResponse(message)) {
307+
message.id = replayMessageId;
308+
}
282309
this.onmessage?.(message);
283310
} catch (error) {
284311
this.onerror?.(error as Error);
@@ -294,7 +321,11 @@ export class StreamableHTTPClientTransport implements Transport {
294321
// Use the exponential backoff reconnection strategy
295322
if (lastEventId !== undefined) {
296323
try {
297-
this._scheduleReconnection(lastEventId, 0);
324+
this._scheduleReconnection({
325+
resumptionToken: lastEventId,
326+
onresumptiontoken,
327+
replayMessageId
328+
}, 0);
298329
}
299330
catch (error) {
300331
this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));
@@ -338,8 +369,16 @@ export class StreamableHTTPClientTransport implements Transport {
338369
this.onclose?.();
339370
}
340371

341-
async send(message: JSONRPCMessage | JSONRPCMessage[]): Promise<void> {
372+
async send(message: JSONRPCMessage | JSONRPCMessage[], options?: { resumptionToken?: string, onresumptiontoken?: (token: string) => void }): Promise<void> {
342373
try {
374+
const { resumptionToken, onresumptiontoken } = options || {};
375+
376+
if (resumptionToken) {
377+
// If we have at last event ID, we need to reconnect the SSE stream
378+
this._startOrAuthSse({ resumptionToken, replayMessageId: isJSONRPCRequest(message) ? message.id : undefined }).catch(err => this.onerror?.(err));
379+
return;
380+
}
381+
343382
const headers = await this._commonHeaders();
344383
headers.set("content-type", "application/json");
345384
headers.set("accept", "application/json, text/event-stream");
@@ -383,7 +422,7 @@ export class StreamableHTTPClientTransport implements Transport {
383422
// if it's supported by the server
384423
if (isJSONRPCNotification(message) && message.method === "notifications/initialized") {
385424
// Start without a lastEventId since this is a fresh connection
386-
this._startOrAuthStandaloneSSE({ lastEventId: undefined }).catch(err => this.onerror?.(err));
425+
this._startOrAuthSse({ resumptionToken: undefined }).catch(err => this.onerror?.(err));
387426
}
388427
return;
389428
}
@@ -398,7 +437,10 @@ export class StreamableHTTPClientTransport implements Transport {
398437

399438
if (hasRequests) {
400439
if (contentType?.includes("text/event-stream")) {
401-
this._handleSseStream(response.body);
440+
// Handle SSE stream responses for requests
441+
// We use the same handler as standalone streams, which now supports
442+
// reconnection with the last event ID
443+
this._handleSseStream(response.body, { onresumptiontoken });
402444
} else if (contentType?.includes("application/json")) {
403445
// For non-streaming servers, we might get direct JSON responses
404446
const data = await response.json();
@@ -421,4 +463,8 @@ export class StreamableHTTPClientTransport implements Transport {
421463
throw error;
422464
}
423465
}
466+
467+
get sessionId(): string | undefined {
468+
return this._sessionId;
469+
}
424470
}

src/examples/client/simpleStreamableHttp.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ let notificationCount = 0;
2929
let client: Client | null = null;
3030
let transport: StreamableHTTPClientTransport | null = null;
3131
let serverUrl = 'http://localhost:3000/mcp';
32+
let notificationsToolLastEventId: string | undefined = undefined;
33+
let sessionId: string | undefined = undefined;
3234

3335
async function main(): Promise<void> {
3436
console.log('MCP Interactive Client');
@@ -109,7 +111,7 @@ function commandLoop(): void {
109111

110112
case 'start-notifications': {
111113
const interval = args[1] ? parseInt(args[1], 10) : 2000;
112-
const count = args[2] ? parseInt(args[2], 10) : 0;
114+
const count = args[2] ? parseInt(args[2], 10) : 10;
113115
await startNotifications(interval, count);
114116
break;
115117
}
@@ -186,7 +188,10 @@ async function connect(url?: string): Promise<void> {
186188
}
187189

188190
transport = new StreamableHTTPClientTransport(
189-
new URL(serverUrl)
191+
new URL(serverUrl),
192+
{
193+
sessionId: sessionId
194+
}
190195
);
191196

192197
// Set up notification handlers
@@ -218,6 +223,8 @@ async function connect(url?: string): Promise<void> {
218223

219224
// Connect the client
220225
await client.connect(transport);
226+
sessionId = transport.sessionId
227+
console.log('Transport created with session ID:', sessionId);
221228
console.log('Connected to MCP server');
222229
} catch (error) {
223230
console.error('Failed to connect:', error);
@@ -291,7 +298,12 @@ async function callTool(name: string, args: Record<string, unknown>): Promise<vo
291298
};
292299

293300
console.log(`Calling tool '${name}' with args:`, args);
294-
const result = await client.request(request, CallToolResultSchema);
301+
const onLastEventIdUpdate = (event: string) => {
302+
notificationsToolLastEventId = event;
303+
};
304+
const result = await client.request(request, CallToolResultSchema, {
305+
resumptionToken: notificationsToolLastEventId, onresumptiontoken: onLastEventIdUpdate
306+
});
295307

296308
console.log('Tool result:');
297309
result.content.forEach(item => {

src/examples/server/simpleStreamableHttp.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -181,14 +181,18 @@ server.tool(
181181

182182
while (count === 0 || counter < count) {
183183
counter++;
184-
await sendNotification({
185-
method: "notifications/message",
186-
params: {
187-
level: "info",
188-
data: `Periodic notification #${counter} at ${new Date().toISOString()}`
189-
}
190-
});
191-
184+
try {
185+
await sendNotification({
186+
method: "notifications/message",
187+
params: {
188+
level: "info",
189+
data: `Periodic notification #${counter} at ${new Date().toISOString()}`
190+
}
191+
});
192+
}
193+
catch (error) {
194+
console.error("Error sending notification:", error);
195+
}
192196
// Wait for the specified interval
193197
await sleep(interval);
194198
}

0 commit comments

Comments
 (0)