Skip to content

Commit 8bc49f9

Browse files
committed
change names to resumptionToken instead of last event id
1 parent f1c5ef1 commit 8bc49f9

File tree

2 files changed

+26
-20
lines changed

2 files changed

+26
-20
lines changed

src/client/streamableHttp.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,9 +313,9 @@ describe("StreamableHTTPClientTransport", () => {
313313
await transport.start();
314314
// Type assertion to access private method
315315
const transportWithPrivateMethods = transport as unknown as {
316-
_startOrAuthSse: (options: { lastEventId?: string }) => Promise<void>
316+
_startOrAuthSse: (options: { resumptionToken?: string }) => Promise<void>
317317
};
318-
await transportWithPrivateMethods._startOrAuthSse({ lastEventId: "test-event-id" });
318+
await transportWithPrivateMethods._startOrAuthSse({ resumptionToken: "test-event-id" });
319319

320320
// Verify fetch was called with the lastEventId header
321321
expect(fetchSpy).toHaveBeenCalled();

src/client/streamableHttp.ts

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,18 @@ export class StreamableHTTPError extends Error {
2525
*/
2626
interface StartSSEOptions {
2727
/**
28-
* The ID of the last received event, used for resuming a disconnected stream
28+
* The resumption token used to continue long-running requests that were interrupted.
29+
*
30+
* This allows clients to reconnect and continue from where they left off.
2931
*/
30-
lastEventId?: string;
32+
resumptionToken?: string;
3133

3234
/**
33-
* The callback function that is invoked when the last event ID changes
35+
* A callback that is invoked when the resumption token changes.
36+
*
37+
* This allows clients to persist the latest token for potential reconnection.
3438
*/
35-
onLastEventIdUpdate?: (event: string) => void
39+
onresumptiontoken?: (token: string) => void;
3640

3741
/**
3842
* Override Message ID to associate with the replay message
@@ -152,7 +156,7 @@ export class StreamableHTTPClientTransport implements Transport {
152156
throw new UnauthorizedError();
153157
}
154158

155-
return await this._startOrAuthSse({ lastEventId: undefined });
159+
return await this._startOrAuthSse({ resumptionToken: undefined });
156160
}
157161

158162
private async _commonHeaders(): Promise<Headers> {
@@ -175,16 +179,16 @@ export class StreamableHTTPClientTransport implements Transport {
175179

176180

177181
private async _startOrAuthSse(options: StartSSEOptions): Promise<void> {
178-
const { lastEventId } = options;
182+
const { resumptionToken } = options;
179183
try {
180184
// Try to open an initial SSE stream with GET to listen for server messages
181185
// This is optional according to the spec - server may not support it
182186
const headers = await this._commonHeaders();
183187
headers.set("Accept", "text/event-stream");
184188

185189
// Include Last-Event-ID header for resumable streams if provided
186-
if (lastEventId) {
187-
headers.set("last-event-id", lastEventId);
190+
if (resumptionToken) {
191+
headers.set("last-event-id", resumptionToken);
188192
}
189193

190194
const response = await fetch(this._url, {
@@ -270,7 +274,7 @@ export class StreamableHTTPClientTransport implements Transport {
270274
if (!stream) {
271275
return;
272276
}
273-
const { onLastEventIdUpdate, replayMessageId } = options;
277+
const { onresumptiontoken, replayMessageId } = options;
274278

275279
let lastEventId: string | undefined;
276280
const processStream = async () => {
@@ -293,7 +297,7 @@ export class StreamableHTTPClientTransport implements Transport {
293297
// Update last event ID if provided
294298
if (event.id) {
295299
lastEventId = event.id;
296-
onLastEventIdUpdate?.(lastEventId);
300+
onresumptiontoken?.(event.id);
297301
}
298302

299303
if (!event.event || event.event === "message") {
@@ -317,7 +321,11 @@ export class StreamableHTTPClientTransport implements Transport {
317321
// Use the exponential backoff reconnection strategy
318322
if (lastEventId !== undefined) {
319323
try {
320-
this._scheduleReconnection(options, 0);
324+
this._scheduleReconnection({
325+
resumptionToken: lastEventId,
326+
onresumptiontoken,
327+
replayMessageId
328+
}, 0);
321329
}
322330
catch (error) {
323331
this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));
@@ -363,13 +371,11 @@ export class StreamableHTTPClientTransport implements Transport {
363371

364372
async send(message: JSONRPCMessage | JSONRPCMessage[], options?: { resumptionToken?: string, onresumptiontoken?: (token: string) => void }): Promise<void> {
365373
try {
366-
// If client passes in a lastEventId in the request options, we need to reconnect the SSE stream
367-
const lastEventId = options?.resumptionToken
368-
const onLastEventIdUpdate = options?.onresumptiontoken;
369-
if (lastEventId) {
374+
const { resumptionToken, onresumptiontoken } = options || {};
370375

376+
if (resumptionToken) {
371377
// If we have at last event ID, we need to reconnect the SSE stream
372-
this._startOrAuthSse({ lastEventId, replayMessageId: isJSONRPCRequest(message) ? message.id : undefined }).catch(err => this.onerror?.(err));
378+
this._startOrAuthSse({ resumptionToken, replayMessageId: isJSONRPCRequest(message) ? message.id : undefined }).catch(err => this.onerror?.(err));
373379
return;
374380
}
375381

@@ -416,7 +422,7 @@ export class StreamableHTTPClientTransport implements Transport {
416422
// if it's supported by the server
417423
if (isJSONRPCNotification(message) && message.method === "notifications/initialized") {
418424
// Start without a lastEventId since this is a fresh connection
419-
this._startOrAuthSse({ lastEventId: undefined }).catch(err => this.onerror?.(err));
425+
this._startOrAuthSse({ resumptionToken: undefined }).catch(err => this.onerror?.(err));
420426
}
421427
return;
422428
}
@@ -434,7 +440,7 @@ export class StreamableHTTPClientTransport implements Transport {
434440
// Handle SSE stream responses for requests
435441
// We use the same handler as standalone streams, which now supports
436442
// reconnection with the last event ID
437-
this._handleSseStream(response.body, { onLastEventIdUpdate });
443+
this._handleSseStream(response.body, { onresumptiontoken });
438444
} else if (contentType?.includes("application/json")) {
439445
// For non-streaming servers, we might get direct JSON responses
440446
const data = await response.json();

0 commit comments

Comments
 (0)