Skip to content

Commit 583202f

Browse files
committed
address client comments
1 parent f9f3807 commit 583202f

File tree

1 file changed

+29
-24
lines changed

1 file changed

+29
-24
lines changed

src/client/streamableHttp.ts

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
1-
import { log } from "node:console";
21
import { Transport } from "../shared/transport.js";
32
import { isJSONRPCNotification, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
43
import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js";
54
import { EventSourceParserStream } from "eventsource-parser/stream";
65

6+
// Default reconnection options for StreamableHTTP connections
7+
const DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS: StreamableHTTPReconnectionOptions = {
8+
initialReconnectionDelay: 1000,
9+
maxReconnectionDelay: 30000,
10+
reconnectionDelayGrowFactor: 1.5,
11+
maxRetries: 2,
12+
};
13+
714
export class StreamableHTTPError extends Error {
815
constructor(
916
public readonly code: number | undefined,
@@ -13,6 +20,16 @@ export class StreamableHTTPError extends Error {
1320
}
1421
}
1522

23+
/**
24+
* Options for starting or authenticating an SSE connection
25+
*/
26+
export interface StartSSEOptions {
27+
/**
28+
* The ID of the last received event, used for resuming a disconnected stream
29+
*/
30+
lastEventId?: string;
31+
}
32+
1633
/**
1734
* Configuration options for reconnection behavior of the StreamableHTTPClientTransport.
1835
*/
@@ -37,7 +54,7 @@ export interface StreamableHTTPReconnectionOptions {
3754

3855
/**
3956
* Maximum number of reconnection attempts before giving up.
40-
* Default is 0 (unlimited).
57+
* Default is 2.
4158
*/
4259
maxRetries: number;
4360
}
@@ -97,7 +114,7 @@ export class StreamableHTTPClientTransport implements Transport {
97114
this._url = url;
98115
this._requestInit = opts?.requestInit;
99116
this._authProvider = opts?.authProvider;
100-
this._reconnectionOptions = opts?.reconnectionOptions || this._defaultReconnectionOptions;
117+
this._reconnectionOptions = opts?.reconnectionOptions ?? DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS;
101118
}
102119

103120
private async _authThenStart(): Promise<void> {
@@ -117,7 +134,7 @@ export class StreamableHTTPClientTransport implements Transport {
117134
throw new UnauthorizedError();
118135
}
119136

120-
return await this._startOrAuthStandaloneSSE();
137+
return await this._startOrAuthStandaloneSSE({ lastEventId: undefined });
121138
}
122139

123140
private async _commonHeaders(): Promise<Headers> {
@@ -138,7 +155,9 @@ export class StreamableHTTPClientTransport implements Transport {
138155
);
139156
}
140157

141-
private async _startOrAuthStandaloneSSE(lastEventId?: string): Promise<void> {
158+
159+
private async _startOrAuthStandaloneSSE(options: StartSSEOptions): Promise<void> {
160+
const { lastEventId } = options;
142161
try {
143162
// Try to open an initial SSE stream with GET to listen for server messages
144163
// This is optional according to the spec - server may not support it
@@ -181,19 +200,9 @@ export class StreamableHTTPClientTransport implements Transport {
181200
}
182201
}
183202

184-
// Default reconnection options
185-
private readonly _defaultReconnectionOptions: StreamableHTTPReconnectionOptions = {
186-
initialReconnectionDelay: 1000,
187-
maxReconnectionDelay: 30000,
188-
reconnectionDelayGrowFactor: 1.5,
189-
maxRetries: 2,
190-
};
191-
192-
// We no longer need global reconnection state as it will be maintained per stream
193203

194204
/**
195-
* Calculates the next reconnection delay using exponential backoff algorithm
196-
* with jitter for more effective reconnections in high load scenarios.
205+
* Calculates the next reconnection delay using backoff algorithm
197206
*
198207
* @param attempt Current reconnection attempt count for the specific stream
199208
* @returns Time to wait in milliseconds before next reconnection attempt
@@ -227,12 +236,11 @@ export class StreamableHTTPClientTransport implements Transport {
227236

228237
// Calculate next delay based on current attempt count
229238
const delay = this._getNextReconnectionDelay(attemptCount);
230-
log(`Reconnection attempt ${attemptCount + 1} in ${delay}ms...`);
231239

232240
// Schedule the reconnection
233241
setTimeout(() => {
234242
// Use the last event ID to resume where we left off
235-
this._startOrAuthStandaloneSSE(lastEventId).catch(error => {
243+
this._startOrAuthStandaloneSSE({ lastEventId }).catch(error => {
236244
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
237245
// Schedule another attempt if this one failed, incrementing the attempt counter
238246
this._scheduleReconnection(lastEventId, attemptCount + 1);
@@ -247,7 +255,7 @@ export class StreamableHTTPClientTransport implements Transport {
247255

248256
let lastEventId: string | undefined;
249257
const processStream = async () => {
250-
// this is the closest we can get to trying to cath network errors
258+
// this is the closest we can get to trying to catch network errors
251259
// if something happens reader will throw
252260
try {
253261
// Create a pipeline: binary stream -> text decoder -> SSE parser
@@ -279,7 +287,7 @@ export class StreamableHTTPClientTransport implements Transport {
279287
}
280288
} catch (error) {
281289
// Handle stream errors - likely a network disconnect
282-
this.onerror?.(new Error(`SSE stream disconnected: ${error instanceof Error ? error.message : String(error)}`));
290+
this.onerror?.(new Error(`SSE stream disconnected: ${error}`));
283291

284292
// Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing
285293
if (this._abortController && !this._abortController.signal.aborted) {
@@ -375,7 +383,7 @@ export class StreamableHTTPClientTransport implements Transport {
375383
// if it's supported by the server
376384
if (isJSONRPCNotification(message) && message.method === "notifications/initialized") {
377385
// Start without a lastEventId since this is a fresh connection
378-
this._startOrAuthStandaloneSSE().catch(err => this.onerror?.(err));
386+
this._startOrAuthStandaloneSSE({ lastEventId: undefined }).catch(err => this.onerror?.(err));
379387
}
380388
return;
381389
}
@@ -390,9 +398,6 @@ export class StreamableHTTPClientTransport implements Transport {
390398

391399
if (hasRequests) {
392400
if (contentType?.includes("text/event-stream")) {
393-
// Handle SSE stream responses for requests
394-
// We use the same handler as standalone streams, which now supports
395-
// reconnection with the last event ID
396401
this._handleSseStream(response.body);
397402
} else if (contentType?.includes("application/json")) {
398403
// For non-streaming servers, we might get direct JSON responses

0 commit comments

Comments
 (0)