Skip to content

Commit 621e65c

Browse files
committed
Merge branch 'resumability' into ihrpr/resume-long-running-requests
2 parents 7aeb679 + 67a4595 commit 621e65c

File tree

6 files changed

+100
-100
lines changed

6 files changed

+100
-100
lines changed

src/client/streamableHttp.test.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ describe("StreamableHTTPClientTransport", () => {
164164
// We expect the 405 error to be caught and handled gracefully
165165
// This should not throw an error that breaks the transport
166166
await transport.start();
167-
await expect(transport["_startOrAuthSse"]()).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed");
167+
await expect(transport["_startOrAuthSse"]({})).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed");
168168
// Check that GET was attempted
169169
expect(global.fetch).toHaveBeenCalledWith(
170170
expect.anything(),
@@ -208,7 +208,7 @@ describe("StreamableHTTPClientTransport", () => {
208208
transport.onmessage = messageSpy;
209209

210210
await transport.start();
211-
await transport["_startOrAuthSse"]();
211+
await transport["_startOrAuthSse"]({});
212212

213213
// Give time for the SSE event to be processed
214214
await new Promise(resolve => setTimeout(resolve, 50));
@@ -289,10 +289,10 @@ describe("StreamableHTTPClientTransport", () => {
289289
// Verify options were set correctly (checking implementation details)
290290
// Access private properties for testing
291291
const transportInstance = transport as unknown as {
292-
_defaultReconnectionOptions: StreamableHTTPReconnectionOptions;
292+
_reconnectionOptions: StreamableHTTPReconnectionOptions;
293293
};
294-
expect(transportInstance._defaultReconnectionOptions.initialReconnectionDelay).toBe(1000);
295-
expect(transportInstance._defaultReconnectionOptions.maxRetries).toBe(2);
294+
expect(transportInstance._reconnectionOptions.initialReconnectionDelay).toBe(500);
295+
expect(transportInstance._reconnectionOptions.maxRetries).toBe(5);
296296
});
297297

298298
it("should pass lastEventId when reconnecting", async () => {
@@ -313,9 +313,9 @@ describe("StreamableHTTPClientTransport", () => {
313313
await transport.start();
314314
// Type assertion to access private method
315315
const transportWithPrivateMethods = transport as unknown as {
316-
_startOrAuthSse: (lastEventId?: string) => Promise<void>
316+
_startOrAuthSse: (options: { lastEventId?: string }) => Promise<void>
317317
};
318-
await transportWithPrivateMethods._startOrAuthSse("test-event-id");
318+
await transportWithPrivateMethods._startOrAuthSse({ lastEventId: "test-event-id" });
319319

320320
// Verify fetch was called with the lastEventId header
321321
expect(fetchSpy).toHaveBeenCalled();
@@ -382,7 +382,7 @@ describe("StreamableHTTPClientTransport", () => {
382382

383383
await transport.start();
384384

385-
await transport["_startOrAuthSse"]();
385+
await transport["_startOrAuthSse"]({});
386386
expect((actualReqInit.headers as Headers).get("x-custom-header")).toBe("CustomValue");
387387

388388
requestInit.headers["X-Custom-Header"] = "SecondCustomValue";

src/client/streamableHttp.ts

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
1-
import { log } from "node:console";
21
import { Transport } from "../shared/transport.js";
32
import { isJSONRPCNotification, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
43
import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js";
54
import { EventSourceParserStream } from "eventsource-parser/stream";
65

6+
// Default reconnection options for StreamableHTTP connections
7+
const DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS: StreamableHTTPReconnectionOptions = {
8+
initialReconnectionDelay: 1000,
9+
maxReconnectionDelay: 30000,
10+
reconnectionDelayGrowFactor: 1.5,
11+
maxRetries: 2,
12+
};
13+
714
export class StreamableHTTPError extends Error {
815
constructor(
916
public readonly code: number | undefined,
@@ -13,6 +20,16 @@ export class StreamableHTTPError extends Error {
1320
}
1421
}
1522

23+
/**
24+
* Options for starting or authenticating an SSE connection
25+
*/
26+
export interface StartSSEOptions {
27+
/**
28+
* The ID of the last received event, used for resuming a disconnected stream
29+
*/
30+
lastEventId?: string;
31+
}
32+
1633
/**
1734
* Configuration options for reconnection behavior of the StreamableHTTPClientTransport.
1835
*/
@@ -37,7 +54,7 @@ export interface StreamableHTTPReconnectionOptions {
3754

3855
/**
3956
* Maximum number of reconnection attempts before giving up.
40-
* Default is 0 (unlimited).
57+
* Default is 2.
4158
*/
4259
maxRetries: number;
4360
}
@@ -102,8 +119,8 @@ export class StreamableHTTPClientTransport implements Transport {
102119
this._url = url;
103120
this._requestInit = opts?.requestInit;
104121
this._authProvider = opts?.authProvider;
105-
this._reconnectionOptions = opts?.reconnectionOptions || this._defaultReconnectionOptions;
106122
this._sessionId = opts?.sessionId;
123+
this._reconnectionOptions = opts?.reconnectionOptions ?? DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS;
107124
}
108125

109126
private async _authThenStart(): Promise<void> {
@@ -123,7 +140,7 @@ export class StreamableHTTPClientTransport implements Transport {
123140
throw new UnauthorizedError();
124141
}
125142

126-
return await this._startOrAuthSse();
143+
return await this._startOrAuthSse({ lastEventId: undefined });
127144
}
128145

129146
private async _commonHeaders(): Promise<Headers> {
@@ -144,7 +161,9 @@ export class StreamableHTTPClientTransport implements Transport {
144161
);
145162
}
146163

147-
private async _startOrAuthSse(lastEventId?: string): Promise<void> {
164+
165+
private async _startOrAuthSse(options: StartSSEOptions): Promise<void> {
166+
const { lastEventId } = options;
148167
try {
149168
// Try to open an initial SSE stream with GET to listen for server messages
150169
// This is optional according to the spec - server may not support it
@@ -187,19 +206,9 @@ export class StreamableHTTPClientTransport implements Transport {
187206
}
188207
}
189208

190-
// Default reconnection options
191-
private readonly _defaultReconnectionOptions: StreamableHTTPReconnectionOptions = {
192-
initialReconnectionDelay: 1000,
193-
maxReconnectionDelay: 30000,
194-
reconnectionDelayGrowFactor: 1.5,
195-
maxRetries: 2,
196-
};
197-
198-
// We no longer need global reconnection state as it will be maintained per stream
199209

200210
/**
201-
* Calculates the next reconnection delay using exponential backoff algorithm
202-
* with jitter for more effective reconnections in high load scenarios.
211+
* Calculates the next reconnection delay using backoff algorithm
203212
*
204213
* @param attempt Current reconnection attempt count for the specific stream
205214
* @returns Time to wait in milliseconds before next reconnection attempt
@@ -233,12 +242,11 @@ export class StreamableHTTPClientTransport implements Transport {
233242

234243
// Calculate next delay based on current attempt count
235244
const delay = this._getNextReconnectionDelay(attemptCount);
236-
log(`Reconnection attempt ${attemptCount + 1} in ${delay}ms...`);
237245

238246
// Schedule the reconnection
239247
setTimeout(() => {
240248
// Use the last event ID to resume where we left off
241-
this._startOrAuthSse(lastEventId).catch(error => {
249+
this._startOrAuthSse({ lastEventId }).catch(error => {
242250
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
243251
// Schedule another attempt if this one failed, incrementing the attempt counter
244252
this._scheduleReconnection(lastEventId, attemptCount + 1);
@@ -253,7 +261,7 @@ export class StreamableHTTPClientTransport implements Transport {
253261

254262
let lastEventId: string | undefined;
255263
const processStream = async () => {
256-
// this is the closest we can get to trying to cath network errors
264+
// this is the closest we can get to trying to catch network errors
257265
// if something happens reader will throw
258266
try {
259267
// Create a pipeline: binary stream -> text decoder -> SSE parser
@@ -286,7 +294,7 @@ export class StreamableHTTPClientTransport implements Transport {
286294
}
287295
} catch (error) {
288296
// Handle stream errors - likely a network disconnect
289-
this.onerror?.(new Error(`SSE stream disconnected: ${error instanceof Error ? error.message : String(error)}`));
297+
this.onerror?.(new Error(`SSE stream disconnected: ${error}`));
290298

291299
// Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing
292300
if (this._abortController && !this._abortController.signal.aborted) {
@@ -343,7 +351,7 @@ export class StreamableHTTPClientTransport implements Transport {
343351
const { lastEventId, onLastEventIdUpdate } = options ?? {};
344352
if (lastEventId) {
345353
// If we have at last event ID, we need to reconnect the SSE stream
346-
this._startOrAuthSse(lastEventId).catch(err => this.onerror?.(err));
354+
this._startOrAuthSse({ lastEventId }).catch(err => this.onerror?.(err));
347355
return;
348356
}
349357

@@ -390,7 +398,7 @@ export class StreamableHTTPClientTransport implements Transport {
390398
// if it's supported by the server
391399
if (isJSONRPCNotification(message) && message.method === "notifications/initialized") {
392400
// Start without a lastEventId since this is a fresh connection
393-
this._startOrAuthSse().catch(err => this.onerror?.(err));
401+
this._startOrAuthSse({ lastEventId: undefined }).catch(err => this.onerror?.(err));
394402
}
395403
return;
396404
}

src/examples/server/simpleStreamableHttp.ts

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,18 @@ class InMemoryEventStore implements EventStore {
1212
/**
1313
* Generates a unique event ID for a given stream ID
1414
*/
15-
generateEventId(streamId: string): string {
15+
private generateEventId(streamId: string): string {
1616
return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`;
1717
}
1818

19-
getStreamIdFromEventId(eventId: string): string {
19+
private getStreamIdFromEventId(eventId: string): string {
2020
const parts = eventId.split('_');
2121
return parts.length > 0 ? parts[0] : '';
2222
}
2323

24-
2524
/**
2625
* Stores an event with a generated event ID
26+
* Implements EventStore.storeEvent
2727
*/
2828
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
2929
const eventId = this.generateEventId(streamId);
@@ -33,17 +33,26 @@ class InMemoryEventStore implements EventStore {
3333
}
3434

3535
/**
36-
* Retrieves events that occurred after a specific event
36+
* Replays events that occurred after a specific event ID
37+
* Implements EventStore.replayEventsAfter
3738
*/
38-
async getEventsAfter(lastEventId: string): Promise<Array<{ eventId: string, message: JSONRPCMessage }>> {
39+
async replayEventsAfter(lastEventId: string,
40+
{ send }: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
41+
): Promise<string> {
3942
if (!lastEventId || !this.events.has(lastEventId)) {
40-
return [];
43+
console.log(`No events found for lastEventId: ${lastEventId}`);
44+
return '';
4145
}
4246

4347
// Extract the stream ID from the event ID
44-
const streamId = lastEventId.split('_')[0];
45-
const result: Array<{ eventId: string, message: JSONRPCMessage }> = [];
48+
const streamId = this.getStreamIdFromEventId(lastEventId);
49+
if (!streamId) {
50+
console.log(`Could not extract streamId from lastEventId: ${lastEventId}`);
51+
return '';
52+
}
53+
4654
let foundLastEvent = false;
55+
let eventCount = 0;
4756

4857
// Sort events by eventId for chronological ordering
4958
const sortedEvents = [...this.events.entries()].sort((a, b) => a[0].localeCompare(b[0]));
@@ -54,21 +63,21 @@ class InMemoryEventStore implements EventStore {
5463
continue;
5564
}
5665

57-
// Start collecting events after we find the lastEventId
66+
// Start sending events after we find the lastEventId
5867
if (eventId === lastEventId) {
5968
foundLastEvent = true;
6069
continue;
6170
}
6271

6372
if (foundLastEvent) {
64-
result.push({ eventId, message });
73+
await send(eventId, message);
74+
eventCount++;
6575
}
6676
}
6777

68-
console.log(`Found ${result.length} events after ${lastEventId} for replay`);
69-
return result;
78+
console.log(`Replayed ${eventCount} events after ${lastEventId} for stream ${streamId}`);
79+
return streamId;
7080
}
71-
7281
}
7382

7483
// Create an MCP server with implementation details

src/integration-tests/taskResumability.test.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ import { z } from 'zod';
1414
class InMemoryEventStore implements EventStore {
1515
private events: Map<string, { streamId: string, message: JSONRPCMessage }> = new Map();
1616

17-
generateEventId(streamId: string): string {
17+
private generateEventId(streamId: string): string {
1818
return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`;
1919
}
2020

21-
getStreamIdFromEventId(eventId: string): string {
21+
private getStreamIdFromEventId(eventId: string): string {
2222
const parts = eventId.split('_');
2323
return parts.length > 0 ? parts[0] : '';
2424
}
@@ -29,14 +29,18 @@ class InMemoryEventStore implements EventStore {
2929
return eventId;
3030
}
3131

32-
async getEventsAfter(lastEventId: string): Promise<Array<{ eventId: string, message: JSONRPCMessage }>> {
32+
async replayEventsAfter(lastEventId: string,
33+
{ send }: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
34+
): Promise<string> {
3335
if (!lastEventId || !this.events.has(lastEventId)) {
34-
return [];
36+
return '';
3537
}
3638

3739
// Extract the stream ID from the event ID
3840
const streamId = this.getStreamIdFromEventId(lastEventId);
39-
const result: Array<{ eventId: string, message: JSONRPCMessage }> = [];
41+
if (!streamId) {
42+
return '';
43+
}
4044
let foundLastEvent = false;
4145

4246
// Sort events by eventId for chronological ordering
@@ -55,11 +59,11 @@ class InMemoryEventStore implements EventStore {
5559
}
5660

5761
if (foundLastEvent) {
58-
result.push({ eventId, message });
62+
await send(eventId, message);
5963
}
6064
}
6165

62-
return result;
66+
return streamId;
6367
}
6468
}
6569

src/server/streamableHttp.test.ts

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { createServer, type Server, IncomingMessage, ServerResponse } from "node:http";
22
import { AddressInfo } from "node:net";
33
import { randomUUID } from "node:crypto";
4-
import { EventStore, StreamableHTTPServerTransport } from "./streamableHttp.js";
4+
import { EventStore, StreamableHTTPServerTransport, EventId, StreamId } from "./streamableHttp.js";
55
import { McpServer } from "./mcp.js";
66
import { CallToolResult, JSONRPCMessage } from "../types.js";
77
import { z } from "zod";
@@ -912,31 +912,25 @@ describe("StreamableHTTPServerTransport with resumability", () => {
912912

913913
// Simple implementation of EventStore
914914
const eventStore: EventStore = {
915-
generateEventId(streamId: string): string {
916-
return `${streamId}_${randomUUID()}`;
917-
},
918-
getStreamIdFromEventId(eventId: string): string {
919-
return eventId.split('_')[0]; // Extract stream ID from the event ID
920-
},
915+
921916
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
922-
const eventId = this.generateEventId(streamId);
917+
const eventId = `${streamId}_${randomUUID()}`;
923918
storedEvents.set(eventId, { eventId, message });
924919
return eventId;
925920
},
926921

927-
async getEventsAfter(lastEventId: string): Promise<Array<{ eventId: string, message: JSONRPCMessage }>> {
928-
const streamId = lastEventId.split('_')[0]; // Extract stream ID from the event ID
929-
const result: Array<{ eventId: string, message: JSONRPCMessage }> = [];
930-
922+
async replayEventsAfter(lastEventId: EventId, { send }: {
923+
send: (eventId: EventId, message: JSONRPCMessage) => Promise<void>
924+
}): Promise<StreamId> {
925+
const streamId = lastEventId.split('_')[0];
926+
// Extract stream ID from the event ID
931927
// For test simplicity, just return all events with matching streamId that aren't the lastEventId
932-
// This avoids issues with event ordering in tests
933928
for (const [eventId, { message }] of storedEvents.entries()) {
934929
if (eventId.startsWith(streamId) && eventId !== lastEventId) {
935-
result.push({ eventId, message });
930+
await send(eventId, message);
936931
}
937932
}
938-
939-
return result;
933+
return streamId;
940934
},
941935
};
942936

0 commit comments

Comments
 (0)