Skip to content

Commit 67a4595

Browse files
committed
improve event store interface
1 parent 643ef5c commit 67a4595

File tree

4 files changed

+58
-68
lines changed

4 files changed

+58
-68
lines changed

src/client/streamableHttp.test.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ describe("StreamableHTTPClientTransport", () => {
164164
// We expect the 405 error to be caught and handled gracefully
165165
// This should not throw an error that breaks the transport
166166
await transport.start();
167-
await expect(transport["_startOrAuthStandaloneSSE"]()).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed");
167+
await expect(transport["_startOrAuthStandaloneSSE"]({})).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed");
168168
// Check that GET was attempted
169169
expect(global.fetch).toHaveBeenCalledWith(
170170
expect.anything(),
@@ -208,7 +208,7 @@ describe("StreamableHTTPClientTransport", () => {
208208
transport.onmessage = messageSpy;
209209

210210
await transport.start();
211-
await transport["_startOrAuthStandaloneSSE"]();
211+
await transport["_startOrAuthStandaloneSSE"]({});
212212

213213
// Give time for the SSE event to be processed
214214
await new Promise(resolve => setTimeout(resolve, 50));
@@ -289,10 +289,10 @@ describe("StreamableHTTPClientTransport", () => {
289289
// Verify options were set correctly (checking implementation details)
290290
// Access private properties for testing
291291
const transportInstance = transport as unknown as {
292-
_defaultReconnectionOptions: StreamableHTTPReconnectionOptions;
292+
_reconnectionOptions: StreamableHTTPReconnectionOptions;
293293
};
294-
expect(transportInstance._defaultReconnectionOptions.initialReconnectionDelay).toBe(1000);
295-
expect(transportInstance._defaultReconnectionOptions.maxRetries).toBe(2);
294+
expect(transportInstance._reconnectionOptions.initialReconnectionDelay).toBe(500);
295+
expect(transportInstance._reconnectionOptions.maxRetries).toBe(5);
296296
});
297297

298298
it("should pass lastEventId when reconnecting", async () => {
@@ -313,9 +313,9 @@ describe("StreamableHTTPClientTransport", () => {
313313
await transport.start();
314314
// Type assertion to access private method
315315
const transportWithPrivateMethods = transport as unknown as {
316-
_startOrAuthStandaloneSSE: (lastEventId?: string) => Promise<void>
316+
_startOrAuthStandaloneSSE: (options: { lastEventId?: string }) => Promise<void>
317317
};
318-
await transportWithPrivateMethods._startOrAuthStandaloneSSE("test-event-id");
318+
await transportWithPrivateMethods._startOrAuthStandaloneSSE({ lastEventId: "test-event-id" });
319319

320320
// Verify fetch was called with the lastEventId header
321321
expect(fetchSpy).toHaveBeenCalled();
@@ -382,7 +382,7 @@ describe("StreamableHTTPClientTransport", () => {
382382

383383
await transport.start();
384384

385-
await transport["_startOrAuthStandaloneSSE"]();
385+
await transport["_startOrAuthStandaloneSSE"]({});
386386
expect((actualReqInit.headers as Headers).get("x-custom-header")).toBe("CustomValue");
387387

388388
requestInit.headers["X-Custom-Header"] = "SecondCustomValue";

src/examples/server/simpleStreamableHttp.ts

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,18 @@ class InMemoryEventStore implements EventStore {
1212
/**
1313
* Generates a unique event ID for a given stream ID
1414
*/
15-
generateEventId(streamId: string): string {
15+
private generateEventId(streamId: string): string {
1616
return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`;
1717
}
1818

19-
getStreamIdFromEventId(eventId: string): string {
19+
private getStreamIdFromEventId(eventId: string): string {
2020
const parts = eventId.split('_');
2121
return parts.length > 0 ? parts[0] : '';
2222
}
2323

24-
2524
/**
2625
* Stores an event with a generated event ID
26+
* Implements EventStore.storeEvent
2727
*/
2828
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
2929
const eventId = this.generateEventId(streamId);
@@ -33,17 +33,26 @@ class InMemoryEventStore implements EventStore {
3333
}
3434

3535
/**
36-
* Retrieves events that occurred after a specific event
36+
* Replays events that occurred after a specific event ID
37+
* Implements EventStore.replayEventsAfter
3738
*/
38-
async getEventsAfter(lastEventId: string): Promise<Array<{ eventId: string, message: JSONRPCMessage }>> {
39+
async replayEventsAfter(lastEventId: string,
40+
{ send }: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
41+
): Promise<string> {
3942
if (!lastEventId || !this.events.has(lastEventId)) {
40-
return [];
43+
console.log(`No events found for lastEventId: ${lastEventId}`);
44+
return '';
4145
}
4246

4347
// Extract the stream ID from the event ID
44-
const streamId = lastEventId.split('_')[0];
45-
const result: Array<{ eventId: string, message: JSONRPCMessage }> = [];
48+
const streamId = this.getStreamIdFromEventId(lastEventId);
49+
if (!streamId) {
50+
console.log(`Could not extract streamId from lastEventId: ${lastEventId}`);
51+
return '';
52+
}
53+
4654
let foundLastEvent = false;
55+
let eventCount = 0;
4756

4857
// Sort events by eventId for chronological ordering
4958
const sortedEvents = [...this.events.entries()].sort((a, b) => a[0].localeCompare(b[0]));
@@ -54,21 +63,21 @@ class InMemoryEventStore implements EventStore {
5463
continue;
5564
}
5665

57-
// Start collecting events after we find the lastEventId
66+
// Start sending events after we find the lastEventId
5867
if (eventId === lastEventId) {
5968
foundLastEvent = true;
6069
continue;
6170
}
6271

6372
if (foundLastEvent) {
64-
result.push({ eventId, message });
73+
await send(eventId, message);
74+
eventCount++;
6575
}
6676
}
6777

68-
console.log(`Found ${result.length} events after ${lastEventId} for replay`);
69-
return result;
78+
console.log(`Replayed ${eventCount} events after ${lastEventId} for stream ${streamId}`);
79+
return streamId;
7080
}
71-
7281
}
7382

7483
// Create an MCP server with implementation details

src/server/streamableHttp.test.ts

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { createServer, type Server, IncomingMessage, ServerResponse } from "node:http";
22
import { AddressInfo } from "node:net";
33
import { randomUUID } from "node:crypto";
4-
import { EventStore, StreamableHTTPServerTransport } from "./streamableHttp.js";
4+
import { EventStore, StreamableHTTPServerTransport, EventId, StreamId } from "./streamableHttp.js";
55
import { McpServer } from "./mcp.js";
66
import { CallToolResult, JSONRPCMessage } from "../types.js";
77
import { z } from "zod";
@@ -912,31 +912,25 @@ describe("StreamableHTTPServerTransport with resumability", () => {
912912

913913
// Simple implementation of EventStore
914914
const eventStore: EventStore = {
915-
generateEventId(streamId: string): string {
916-
return `${streamId}_${randomUUID()}`;
917-
},
918-
getStreamIdFromEventId(eventId: string): string {
919-
return eventId.split('_')[0]; // Extract stream ID from the event ID
920-
},
915+
921916
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
922-
const eventId = this.generateEventId(streamId);
917+
const eventId = `${streamId}_${randomUUID()}`;
923918
storedEvents.set(eventId, { eventId, message });
924919
return eventId;
925920
},
926921

927-
async getEventsAfter(lastEventId: string): Promise<Array<{ eventId: string, message: JSONRPCMessage }>> {
928-
const streamId = lastEventId.split('_')[0]; // Extract stream ID from the event ID
929-
const result: Array<{ eventId: string, message: JSONRPCMessage }> = [];
930-
922+
async replayEventsAfter(lastEventId: EventId, { send }: {
923+
send: (eventId: EventId, message: JSONRPCMessage) => Promise<void>
924+
}): Promise<StreamId> {
925+
const streamId = lastEventId.split('_')[0];
926+
// Extract stream ID from the event ID
931927
// For test simplicity, just return all events with matching streamId that aren't the lastEventId
932-
// This avoids issues with event ordering in tests
933928
for (const [eventId, { message }] of storedEvents.entries()) {
934929
if (eventId.startsWith(streamId) && eventId !== lastEventId) {
935-
result.push({ eventId, message });
930+
await send(eventId, message);
936931
}
937932
}
938-
939-
return result;
933+
return streamId;
940934
},
941935
};
942936

src/server/streamableHttp.ts

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,30 @@
11
import { IncomingMessage, ServerResponse } from "node:http";
22
import { Transport } from "../shared/transport.js";
3-
import { isJSONRPCNotification, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema, RequestId } from "../types.js";
3+
import { isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema, RequestId } from "../types.js";
44
import getRawBody from "raw-body";
55
import contentType from "content-type";
66
import { randomUUID } from "node:crypto";
77

88
const MAXIMUM_MESSAGE_SIZE = "4mb";
99

10+
export type StreamId = string;
11+
export type EventId = string;
12+
1013
/**
1114
* Interface for resumability support via event storage
1215
*/
1316
export interface EventStore {
14-
/**
15-
* Generates a unique event ID for a given stream ID
16-
* @param streamId The stream ID to include in the event ID
17-
* @returns A unique event ID that includes the stream ID
18-
*/
19-
generateEventId(streamId: string): string;
20-
2117
/**
2218
* Stores an event for later retrieval
2319
* @param streamId ID of the stream the event belongs to
2420
* @param message The JSON-RPC message to store
2521
* @returns The generated event ID for the stored event
2622
*/
27-
storeEvent(streamId: string, message: JSONRPCMessage): Promise<string>;
23+
storeEvent(streamId: StreamId, message: JSONRPCMessage): Promise<EventId>;
2824

29-
/**
30-
* Retrieves events for a stream starting from a given event ID
31-
* @param lastEventId The event ID to start from
32-
* @returns Array of stored events with their event IDs
33-
*/
34-
getEventsAfter(lastEventId: string): Promise<Array<{ eventId: string, message: JSONRPCMessage }>>;
35-
36-
/**
37-
* Extracts the stream ID from an event ID
38-
* This is necessary for resumability to identify which stream the event belongs to
39-
* @param eventId The event ID to extract stream ID from
40-
* @returns The stream ID portion of the event ID
41-
*/
42-
getStreamIdFromEventId(eventId: string): string;
25+
replayEventsAfter(lastEventId: EventId, { send }: {
26+
send: (eventId: EventId, message: JSONRPCMessage) => Promise<void>
27+
}): Promise<StreamId>;
4328
}
4429

4530
/**
@@ -232,12 +217,7 @@ export class StreamableHTTPServerTransport implements Transport {
232217
if (!this._eventStore) {
233218
return;
234219
}
235-
236220
try {
237-
const events = await this._eventStore.getEventsAfter(lastEventId);
238-
const streamId = this._eventStore.getStreamIdFromEventId(lastEventId);
239-
240-
this._streamMapping.set(streamId, res);
241221
const headers: Record<string, string> = {
242222
"Content-Type": "text/event-stream",
243223
"Cache-Control": "no-cache, no-transform",
@@ -248,9 +228,16 @@ export class StreamableHTTPServerTransport implements Transport {
248228
headers["mcp-session-id"] = this.sessionId;
249229
}
250230
res.writeHead(200, headers).flushHeaders();
251-
for (const { eventId, message } of events) {
252-
this.writeSSEEvent(res, message, eventId);
253-
}
231+
232+
const streamId = await this._eventStore?.replayEventsAfter(lastEventId, {
233+
send: async (eventId: string, message: JSONRPCMessage) => {
234+
if (!this.writeSSEEvent(res, message, eventId)) {
235+
this.onerror?.(new Error("Failed replay events"));
236+
res.end();
237+
}
238+
}
239+
});
240+
this._streamMapping.set(streamId, res);
254241
} catch (error) {
255242
this.onerror?.(error as Error);
256243
}

0 commit comments

Comments
 (0)