Skip to content

Commit c3b446d

Browse files
committed
multiple streams and initial stream with get
1 parent 640ae5a commit c3b446d

File tree

2 files changed

+235
-64
lines changed

2 files changed

+235
-64
lines changed

src/client/streamableHttp.test.ts

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ describe("StreamableHTTPClientTransport", () => {
1010
jest.spyOn(global, "fetch");
1111
});
1212

13-
afterEach(() => {
13+
afterEach(async () => {
14+
await transport.close().catch(() => { });
1415
jest.clearAllMocks();
1516
});
1617

@@ -191,4 +192,155 @@ describe("StreamableHTTPClientTransport", () => {
191192

192193
expect(messageSpy).toHaveBeenCalledWith(responseMessage);
193194
});
195+
196+
it("should attempt initial GET connection and handle 405 gracefully", async () => {
197+
// Mock the server not supporting GET for SSE (returning 405)
198+
(global.fetch as jest.Mock).mockResolvedValueOnce({
199+
ok: false,
200+
status: 405,
201+
statusText: "Method Not Allowed"
202+
});
203+
204+
await transport.start();
205+
206+
// Check that GET was attempted
207+
expect(global.fetch).toHaveBeenCalledWith(
208+
expect.anything(),
209+
expect.objectContaining({
210+
method: "GET",
211+
headers: expect.any(Headers)
212+
})
213+
);
214+
215+
// Verify transport still works after 405
216+
(global.fetch as jest.Mock).mockResolvedValueOnce({
217+
ok: true,
218+
status: 202,
219+
headers: new Headers()
220+
});
221+
222+
await transport.send({ jsonrpc: "2.0", method: "test", params: {} } as JSONRPCMessage);
223+
expect(global.fetch).toHaveBeenCalledTimes(2);
224+
});
225+
226+
it("should handle successful initial GET connection for SSE", async () => {
227+
// Set up readable stream for SSE events
228+
const encoder = new TextEncoder();
229+
const stream = new ReadableStream({
230+
start(controller) {
231+
// Send a server notification via SSE
232+
const event = 'event: message\ndata: {"jsonrpc": "2.0", "method": "serverNotification", "params": {}}\n\n';
233+
controller.enqueue(encoder.encode(event));
234+
}
235+
});
236+
237+
// Mock successful GET connection
238+
(global.fetch as jest.Mock).mockResolvedValueOnce({
239+
ok: true,
240+
status: 200,
241+
headers: new Headers({ "content-type": "text/event-stream" }),
242+
body: stream
243+
});
244+
245+
const messageSpy = jest.fn();
246+
transport.onmessage = messageSpy;
247+
248+
await transport.start();
249+
250+
// Give time for the SSE event to be processed
251+
await new Promise(resolve => setTimeout(resolve, 50));
252+
253+
expect(messageSpy).toHaveBeenCalledWith(
254+
expect.objectContaining({
255+
jsonrpc: "2.0",
256+
method: "serverNotification",
257+
params: {}
258+
})
259+
);
260+
});
261+
262+
it("should handle multiple concurrent SSE streams", async () => {
263+
// Mock two POST requests that return SSE streams
264+
const makeStream = (id: string) => {
265+
const encoder = new TextEncoder();
266+
return new ReadableStream({
267+
start(controller) {
268+
const event = `event: message\ndata: {"jsonrpc": "2.0", "result": {"id": "${id}"}, "id": "${id}"}\n\n`;
269+
controller.enqueue(encoder.encode(event));
270+
}
271+
});
272+
};
273+
274+
(global.fetch as jest.Mock)
275+
.mockResolvedValueOnce({
276+
ok: true,
277+
status: 200,
278+
headers: new Headers({ "content-type": "text/event-stream" }),
279+
body: makeStream("request1")
280+
})
281+
.mockResolvedValueOnce({
282+
ok: true,
283+
status: 200,
284+
headers: new Headers({ "content-type": "text/event-stream" }),
285+
body: makeStream("request2")
286+
});
287+
288+
const messageSpy = jest.fn();
289+
transport.onmessage = messageSpy;
290+
291+
// Send two concurrent requests
292+
await Promise.all([
293+
transport.send({ jsonrpc: "2.0", method: "test1", params: {}, id: "request1" }),
294+
transport.send({ jsonrpc: "2.0", method: "test2", params: {}, id: "request2" })
295+
]);
296+
297+
// Give time for SSE processing
298+
await new Promise(resolve => setTimeout(resolve, 50));
299+
300+
// Both streams should have delivered their messages
301+
expect(messageSpy).toHaveBeenCalledTimes(2);
302+
expect(messageSpy).toHaveBeenCalledWith(
303+
expect.objectContaining({ result: { id: "request1" }, id: "request1" })
304+
);
305+
expect(messageSpy).toHaveBeenCalledWith(
306+
expect.objectContaining({ result: { id: "request2" }, id: "request2" })
307+
);
308+
});
309+
310+
it("should include last-event-id header when resuming a broken connection", async () => {
311+
// First make a successful connection that provides an event ID
312+
const encoder = new TextEncoder();
313+
const stream = new ReadableStream({
314+
start(controller) {
315+
const event = 'id: event-123\nevent: message\ndata: {"jsonrpc": "2.0", "method": "serverNotification", "params": {}}\n\n';
316+
controller.enqueue(encoder.encode(event));
317+
controller.close();
318+
}
319+
});
320+
321+
(global.fetch as jest.Mock).mockResolvedValueOnce({
322+
ok: true,
323+
status: 200,
324+
headers: new Headers({ "content-type": "text/event-stream" }),
325+
body: stream
326+
});
327+
328+
await transport.start();
329+
await new Promise(resolve => setTimeout(resolve, 50));
330+
331+
// Now simulate attempting to reconnect
332+
(global.fetch as jest.Mock).mockResolvedValueOnce({
333+
ok: true,
334+
status: 200,
335+
headers: new Headers({ "content-type": "text/event-stream" }),
336+
body: null
337+
});
338+
339+
await transport.start();
340+
341+
// Check that Last-Event-ID was included
342+
const calls = (global.fetch as jest.Mock).mock.calls;
343+
const lastCall = calls[calls.length - 1];
344+
expect(lastCall[1].headers.get("last-event-id")).toBe("event-123");
345+
});
194346
});

0 commit comments

Comments
 (0)