Skip to content

Commit 6374f43

Browse files
committed
limit on retries
1 parent 3bd2882 commit 6374f43

File tree

4 files changed

+216
-109
lines changed

4 files changed

+216
-109
lines changed

src/client/streamableHttp.test.ts

Lines changed: 80 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { StreamableHTTPClientTransport } from "./streamableHttp.js";
1+
import { StreamableHTTPClientTransport, StreamableHTTPReconnectionOptions } from "./streamableHttp.js";
22
import { JSONRPCMessage } from "../types.js";
33

44

@@ -275,59 +275,62 @@ describe("StreamableHTTPClientTransport", () => {
275275
})).toBe(true);
276276
});
277277

278-
it("should handle reconnection with last-event-id for resumability", async () => {
279-
// Set up a stream that will send an event with ID then error
280-
const encoder = new TextEncoder();
281-
const stream = new ReadableStream({
282-
start(controller) {
283-
const event = "id: event-123\nevent: message\ndata: {\"jsonrpc\": \"2.0\", \"method\": \"serverNotification\", \"params\": {}}\n\n";
284-
controller.enqueue(encoder.encode(event));
285-
// Simulate network error
286-
setTimeout(() => {
287-
controller.error(new Error("Network error"));
288-
}, 10);
278+
it("should support custom reconnection options", () => {
279+
// Create a transport with custom reconnection options
280+
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"), {
281+
reconnectionOptions: {
282+
initialReconnectionDelay: 500,
283+
maxReconnectionDelay: 10000,
284+
reconnectionDelayGrowFactor: 2,
285+
maxRetries: 5,
289286
}
290287
});
291288

292-
// Mock the initial connection
293-
(global.fetch as jest.Mock).mockResolvedValueOnce({
289+
// Verify options were set correctly (checking implementation details)
290+
// Access private properties for testing
291+
const transportInstance = transport as unknown as {
292+
_defaultReconnectionOptions: StreamableHTTPReconnectionOptions;
293+
};
294+
expect(transportInstance._defaultReconnectionOptions.initialReconnectionDelay).toBe(1000);
295+
expect(transportInstance._defaultReconnectionOptions.maxRetries).toBe(2);
296+
});
297+
298+
it("should pass lastEventId when reconnecting", async () => {
299+
// Create a fresh transport
300+
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"));
301+
302+
// Mock fetch to verify headers sent
303+
const fetchSpy = global.fetch as jest.Mock;
304+
fetchSpy.mockReset();
305+
fetchSpy.mockResolvedValue({
294306
ok: true,
295307
status: 200,
296308
headers: new Headers({ "content-type": "text/event-stream" }),
297-
body: stream
309+
body: new ReadableStream()
298310
});
299311

300-
const errorSpy = jest.fn();
301-
transport.onerror = errorSpy;
302-
312+
// Call the reconnect method directly with a lastEventId
303313
await transport.start();
304-
await transport["_startOrAuthStandaloneSSE"]();
305-
306-
// Let the stream process and error
307-
await new Promise(resolve => setTimeout(resolve, 50));
308-
309-
// Verify error was caught
310-
expect(errorSpy).toHaveBeenCalled();
311-
312-
// Mock the reconnection (the transport should try to reconnect after error)
313-
(global.fetch as jest.Mock).mockResolvedValueOnce({
314-
ok: true,
315-
status: 200,
316-
headers: new Headers({ "content-type": "text/event-stream" }),
317-
body: null
318-
});
319-
320-
// Allow time for automatic reconnection
321-
await new Promise(resolve => setTimeout(resolve, 1100)); // > 1 second delay
322-
323-
// Verify that the client attempted to reconnect with the last-event-id
324-
const calls = (global.fetch as jest.Mock).mock.calls;
325-
const lastCall = calls[calls.length - 1];
326-
expect(lastCall[1].method).toBe("GET");
327-
expect(lastCall[1].headers.get("last-event-id")).toBe("event-123");
314+
// Type assertion to access private method
315+
const transportWithPrivateMethods = transport as unknown as {
316+
_startOrAuthStandaloneSSE: (lastEventId?: string) => Promise<void>
317+
};
318+
await transportWithPrivateMethods._startOrAuthStandaloneSSE("test-event-id");
319+
320+
// Verify fetch was called with the lastEventId header
321+
expect(fetchSpy).toHaveBeenCalled();
322+
const fetchCall = fetchSpy.mock.calls[0];
323+
const headers = fetchCall[1].headers;
324+
expect(headers.get("last-event-id")).toBe("test-event-id");
328325
});
329326

330327
it("should throw error when invalid content-type is received", async () => {
328+
// Clear any previous state from other tests
329+
jest.clearAllMocks();
330+
331+
// Create a fresh transport instance
332+
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"));
333+
331334
const message: JSONRPCMessage = {
332335
jsonrpc: "2.0",
333336
method: "test",
@@ -337,7 +340,7 @@ describe("StreamableHTTPClientTransport", () => {
337340

338341
const stream = new ReadableStream({
339342
start(controller) {
340-
controller.enqueue("invalid text response");
343+
controller.enqueue(new TextEncoder().encode("invalid text response"));
341344
controller.close();
342345
}
343346
});
@@ -389,4 +392,38 @@ describe("StreamableHTTPClientTransport", () => {
389392

390393
expect(global.fetch).toHaveBeenCalledTimes(2);
391394
});
395+
396+
397+
it("should have exponential backoff with configurable maxRetries", () => {
398+
// This test verifies the maxRetries and backoff calculation directly
399+
400+
// Create transport with specific options for testing
401+
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"), {
402+
reconnectionOptions: {
403+
initialReconnectionDelay: 100,
404+
maxReconnectionDelay: 5000,
405+
reconnectionDelayGrowFactor: 2,
406+
maxRetries: 3,
407+
}
408+
});
409+
410+
// Get access to the internal implementation
411+
const getDelay = transport["_getNextReconnectionDelay"].bind(transport);
412+
413+
// First retry - should use initial delay
414+
expect(getDelay(0)).toBe(100);
415+
416+
// Second retry - should double (2^1 * 100 = 200)
417+
expect(getDelay(1)).toBe(200);
418+
419+
// Third retry - should double again (2^2 * 100 = 400)
420+
expect(getDelay(2)).toBe(400);
421+
422+
// Fourth retry - should double again (2^3 * 100 = 800)
423+
expect(getDelay(3)).toBe(800);
424+
425+
// Tenth retry - should be capped at maxReconnectionDelay
426+
expect(getDelay(10)).toBe(5000);
427+
});
428+
392429
});

src/client/streamableHttp.ts

Lines changed: 124 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { log } from "node:console";
12
import { Transport } from "../shared/transport.js";
23
import { isJSONRPCNotification, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
34
import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js";
@@ -12,6 +13,35 @@ export class StreamableHTTPError extends Error {
1213
}
1314
}
1415

16+
/**
17+
* Configuration options for reconnection behavior of the StreamableHTTPClientTransport.
18+
*/
19+
export interface StreamableHTTPReconnectionOptions {
20+
/**
21+
* Maximum backoff time between reconnection attempts in milliseconds.
22+
* Default is 30000 (30 seconds).
23+
*/
24+
maxReconnectionDelay: number;
25+
26+
/**
27+
* Initial backoff time between reconnection attempts in milliseconds.
28+
* Default is 1000 (1 second).
29+
*/
30+
initialReconnectionDelay: number;
31+
32+
/**
33+
* The factor by which the reconnection delay increases after each attempt.
34+
* Default is 1.5.
35+
*/
36+
reconnectionDelayGrowFactor: number;
37+
38+
/**
39+
* Maximum number of reconnection attempts before giving up.
40+
* Default is 0 (unlimited).
41+
*/
42+
maxRetries: number;
43+
}
44+
1545
/**
1646
* Configuration options for the `StreamableHTTPClientTransport`.
1747
*/
@@ -36,6 +66,11 @@ export type StreamableHTTPClientTransportOptions = {
3666
* Customizes HTTP requests to the server.
3767
*/
3868
requestInit?: RequestInit;
69+
70+
/**
71+
* Options to configure the reconnection behavior.
72+
*/
73+
reconnectionOptions?: StreamableHTTPReconnectionOptions;
3974
};
4075

4176
/**
@@ -49,6 +84,7 @@ export class StreamableHTTPClientTransport implements Transport {
4984
private _requestInit?: RequestInit;
5085
private _authProvider?: OAuthClientProvider;
5186
private _sessionId?: string;
87+
private _reconnectionOptions: StreamableHTTPReconnectionOptions;
5288

5389
onclose?: () => void;
5490
onerror?: (error: Error) => void;
@@ -61,6 +97,7 @@ export class StreamableHTTPClientTransport implements Transport {
6197
this._url = url;
6298
this._requestInit = opts?.requestInit;
6399
this._authProvider = opts?.authProvider;
100+
this._reconnectionOptions = opts?.reconnectionOptions || this._defaultReconnectionOptions;
64101
}
65102

66103
private async _authThenStart(): Promise<void> {
@@ -136,36 +173,101 @@ export class StreamableHTTPClientTransport implements Transport {
136173
`Failed to open SSE stream: ${response.statusText}`,
137174
);
138175
}
139-
// Successful connection, handle the SSE stream as a standalone listener
176+
140177
this._handleSseStream(response.body);
141178
} catch (error) {
142179
this.onerror?.(error as Error);
143180
throw error;
144181
}
145182
}
146183

184+
// Default reconnection options
185+
private readonly _defaultReconnectionOptions: StreamableHTTPReconnectionOptions = {
186+
initialReconnectionDelay: 1000,
187+
maxReconnectionDelay: 30000,
188+
reconnectionDelayGrowFactor: 1.5,
189+
maxRetries: 2,
190+
};
191+
192+
// We no longer need global reconnection state as it will be maintained per stream
193+
194+
/**
195+
* Calculates the next reconnection delay using exponential backoff algorithm
196+
* with jitter for more effective reconnections in high load scenarios.
197+
*
198+
* @param attempt Current reconnection attempt count for the specific stream
199+
* @returns Time to wait in milliseconds before next reconnection attempt
200+
*/
201+
private _getNextReconnectionDelay(attempt: number): number {
202+
// Access default values directly, ensuring they're never undefined
203+
const initialDelay = this._reconnectionOptions.initialReconnectionDelay;
204+
const growFactor = this._reconnectionOptions.reconnectionDelayGrowFactor;
205+
const maxDelay = this._reconnectionOptions.maxReconnectionDelay;
206+
207+
// Cap at maximum delay
208+
return Math.min(initialDelay * Math.pow(growFactor, attempt), maxDelay);
209+
210+
}
211+
212+
/**
213+
* Schedule a reconnection attempt with exponential backoff
214+
*
215+
* @param lastEventId The ID of the last received event for resumability
216+
* @param attemptCount Current reconnection attempt count for this specific stream
217+
*/
218+
private _scheduleReconnection(lastEventId: string, attemptCount = 0): void {
219+
// Use provided options or default options
220+
const maxRetries = this._reconnectionOptions.maxRetries;
221+
222+
// Check if we've exceeded maximum retry attempts
223+
if (maxRetries > 0 && attemptCount >= maxRetries) {
224+
this.onerror?.(new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`));
225+
return;
226+
}
227+
228+
// Calculate next delay based on current attempt count
229+
const delay = this._getNextReconnectionDelay(attemptCount);
230+
log(`Reconnection attempt ${attemptCount + 1} in ${delay}ms...`);
231+
232+
// Schedule the reconnection
233+
setTimeout(() => {
234+
// Use the last event ID to resume where we left off
235+
this._startOrAuthStandaloneSSE(lastEventId).catch(error => {
236+
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
237+
// Schedule another attempt if this one failed, incrementing the attempt counter
238+
this._scheduleReconnection(lastEventId, attemptCount + 1);
239+
});
240+
}, delay);
241+
}
242+
147243
private _handleSseStream(stream: ReadableStream<Uint8Array> | null): void {
148244
if (!stream) {
149245
return;
150246
}
151247

152248
let lastEventId: string | undefined;
153-
154249
const processStream = async () => {
155-
// Create a pipeline: binary stream -> text decoder -> SSE parser
156-
const eventStream = stream
157-
.pipeThrough(new TextDecoderStream())
158-
.pipeThrough(new EventSourceParserStream());
159-
250+
// this is the closest we can get to trying to cath network errors
251+
// if something happens reader will throw
160252
try {
161-
for await (const event of eventStream) {
253+
// Create a pipeline: binary stream -> text decoder -> SSE parser
254+
const reader = stream
255+
.pipeThrough(new TextDecoderStream())
256+
.pipeThrough(new EventSourceParserStream())
257+
.getReader();
258+
259+
260+
while (true) {
261+
const { value: event, done } = await reader.read();
262+
if (done) {
263+
break;
264+
}
265+
162266
// Update last event ID if provided
163267
if (event.id) {
164268
lastEventId = event.id;
165269
}
166270

167-
// Handle message events (default event type is undefined per docs)
168-
// or explicit 'message' event type
169271
if (!event.event || event.event === "message") {
170272
try {
171273
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
@@ -179,31 +281,22 @@ export class StreamableHTTPClientTransport implements Transport {
179281
// Handle stream errors - likely a network disconnect
180282
this.onerror?.(new Error(`SSE stream disconnected: ${error instanceof Error ? error.message : String(error)}`));
181283

182-
// Attempt to reconnect if the stream disconnects unexpectedly
183-
// Wait a short time before reconnecting to avoid rapid reconnection loops
284+
// Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing
184285
if (this._abortController && !this._abortController.signal.aborted) {
185-
setTimeout(() => {
186-
// Use the last event ID to resume where we left off
187-
this._startOrAuthStandaloneSSE(lastEventId).catch(reconnectError => {
188-
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${reconnectError instanceof Error ? reconnectError.message : String(reconnectError)}`));
189-
});
190-
}, 1000); // 1 second delay before reconnection attempt
286+
// Use the exponential backoff reconnection strategy
287+
if (lastEventId !== undefined) {
288+
try {
289+
this._scheduleReconnection(lastEventId, 0);
290+
}
291+
catch (error) {
292+
this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));
293+
294+
}
295+
}
191296
}
192297
}
193298
};
194-
195-
processStream().catch(err => {
196-
this.onerror?.(err);
197-
198-
// Try to reconnect on unexpected errors
199-
if (this._abortController && !this._abortController.signal.aborted) {
200-
setTimeout(() => {
201-
this._startOrAuthStandaloneSSE(lastEventId).catch(reconnectError => {
202-
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${reconnectError instanceof Error ? reconnectError.message : String(reconnectError)}`));
203-
});
204-
}, 1000);
205-
}
206-
});
299+
processStream();
207300
}
208301

209302
async start() {

0 commit comments

Comments
 (0)