Skip to content

Commit d4ad976

Browse files
committed
use EventSourceParserStream
1 parent f1827ce commit d4ad976

File tree

2 files changed

+21
-111
lines changed

2 files changed

+21
-111
lines changed

src/client/streamableHttp.test.ts

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -124,46 +124,6 @@ describe("StreamableHTTPClientTransport", () => {
124124
expect(errorSpy).toHaveBeenCalled();
125125
});
126126

127-
it("should handle session termination via DELETE request", async () => {
128-
// First set the session ID by mocking initialization
129-
(global.fetch as jest.Mock).mockResolvedValueOnce({
130-
ok: true,
131-
status: 200,
132-
headers: new Headers({ "mcp-session-id": "session-to-terminate" }),
133-
});
134-
135-
await transport.send({
136-
jsonrpc: "2.0",
137-
method: "initialize",
138-
params: {
139-
clientInfo: { name: "test-client", version: "1.0" },
140-
protocolVersion: "2025-03-26"
141-
},
142-
id: "init-id"
143-
} as JSONRPCMessage);
144-
145-
// Mock DELETE request for session termination
146-
(global.fetch as jest.Mock).mockResolvedValueOnce({
147-
ok: true,
148-
status: 200,
149-
headers: new Headers()
150-
});
151-
152-
const closeSpy = jest.fn();
153-
transport.onclose = closeSpy;
154-
155-
await transport.close();
156-
157-
// Check that DELETE request was sent
158-
const calls = (global.fetch as jest.Mock).mock.calls;
159-
const lastCall = calls[calls.length - 1];
160-
expect(lastCall[1].method).toBe("DELETE");
161-
// The headers may be a plain object in tests
162-
expect(lastCall[1].headers["mcp-session-id"]).toBe("session-to-terminate");
163-
164-
expect(closeSpy).toHaveBeenCalled();
165-
});
166-
167127
it("should handle non-streaming JSON response", async () => {
168128
const message: JSONRPCMessage = {
169129
jsonrpc: "2.0",

src/client/streamableHttp.ts

Lines changed: 21 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Transport } from "../shared/transport.js";
22
import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
33
import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js";
44
import { type ErrorEvent } from "eventsource";
5-
5+
import { EventSourceParserStream } from 'eventsource-parser/stream';
66
export class StreamableHTTPError extends Error {
77
constructor(
88
public readonly code: number | undefined,
@@ -45,7 +45,7 @@ export type StreamableHTTPClientTransportOptions = {
4545
* for receiving messages.
4646
*/
4747
export class StreamableHTTPClientTransport implements Transport {
48-
private _activeStreams: Map<string, ReadableStreamDefaultReader<Uint8Array>> = new Map();
48+
private _activeStreams: Map<string, ReadableStreamDefaultReader<any>> = new Map();
4949
private _abortController?: AbortController;
5050
private _url: URL;
5151
private _requestInit?: RequestInit;
@@ -186,30 +186,6 @@ export class StreamableHTTPClientTransport implements Transport {
186186
// Abort any pending requests
187187
this._abortController?.abort();
188188

189-
// If we have a session ID, send a DELETE request to explicitly terminate the session
190-
if (this._sessionId) {
191-
try {
192-
const commonHeaders = await this._commonHeaders();
193-
const response = await fetch(this._url, {
194-
method: "DELETE",
195-
headers: commonHeaders,
196-
signal: this._abortController?.signal,
197-
});
198-
199-
if (!response.ok) {
200-
// Server might respond with 405 if it doesn't support explicit session termination
201-
// We don't throw an error in that case
202-
if (response.status !== 405) {
203-
const text = await response.text().catch(() => null);
204-
throw new Error(`Error terminating session (HTTP ${response.status}): ${text}`);
205-
}
206-
}
207-
} catch (error) {
208-
// We still want to invoke onclose even if the session termination fails
209-
this.onerror?.(error as Error);
210-
}
211-
}
212-
213189
this.onclose?.();
214190
}
215191

@@ -300,62 +276,36 @@ export class StreamableHTTPClientTransport implements Transport {
300276
return;
301277
}
302278

303-
// Set up stream handling for server-sent events
304-
const reader = stream.getReader();
279+
// Create a pipeline: binary stream -> text decoder -> SSE parser
280+
const eventStream = stream
281+
.pipeThrough(new TextDecoderStream())
282+
.pipeThrough(new EventSourceParserStream());
283+
284+
const reader = eventStream.getReader();
305285
this._activeStreams.set(streamId, reader);
306-
const decoder = new TextDecoder();
307-
let buffer = '';
308286

309287
const processStream = async () => {
310288
try {
311289
while (true) {
312-
const { done, value } = await reader.read();
290+
const { done, value: event } = await reader.read();
313291
if (done) {
314-
// Stream closed by server
315292
this._activeStreams.delete(streamId);
316293
break;
317294
}
318295

319-
buffer += decoder.decode(value, { stream: true });
320-
321-
// Process SSE messages in the buffer
322-
const events = buffer.split('\n\n');
323-
buffer = events.pop() || '';
324-
325-
for (const event of events) {
326-
const lines = event.split('\n');
327-
let id: string | undefined;
328-
let eventType: string | undefined;
329-
let data: string | undefined;
330-
331-
// Parse SSE message according to the format
332-
for (const line of lines) {
333-
if (line.startsWith('id:')) {
334-
id = line.slice(3).trim();
335-
} else if (line.startsWith('event:')) {
336-
eventType = line.slice(6).trim();
337-
} else if (line.startsWith('data:')) {
338-
data = line.slice(5).trim();
339-
}
340-
}
341-
342-
// Update last event ID if provided by server
343-
// As per spec: the ID MUST be globally unique across all streams within that session
344-
if (id) {
345-
this._lastEventId = id;
346-
}
296+
// Update last event ID if provided
297+
if (event.id) {
298+
this._lastEventId = event.id;
299+
}
347300

348-
// Handle message event
349-
if (data) {
350-
// Default event type is 'message' per SSE spec if not specified
351-
if (!eventType || eventType === 'message') {
352-
try {
353-
const message = JSONRPCMessageSchema.parse(JSON.parse(data));
354-
this.onmessage?.(message);
355-
} catch (error) {
356-
this.onerror?.(error as Error);
357-
}
358-
}
301+
// Handle message events (default event type is undefined per docs)
302+
// or explicit 'message' event type
303+
if (!event.event || event.event === 'message') {
304+
try {
305+
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
306+
this.onmessage?.(message);
307+
} catch (error) {
308+
this.onerror?.(error as Error);
359309
}
360310
}
361311
}

0 commit comments

Comments
 (0)