From c3f08567a2d2078f1f7b890612ef5244e92f8a48 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Wed, 16 Apr 2025 14:48:47 +0100 Subject: [PATCH 1/6] share implementation of in memory store between examples and integration tests --- src/examples/server/simpleStreamableHttp.ts | 80 +------------------ .../sseAndStreamableHttpCompatibleServer.ts | 79 +----------------- src/examples/shared/inMemoryEventStore.ts | 77 ++++++++++++++++++ .../taskResumability.test.ts | 63 +-------------- 4 files changed, 86 insertions(+), 213 deletions(-) create mode 100644 src/examples/shared/inMemoryEventStore.ts diff --git a/src/examples/server/simpleStreamableHttp.ts b/src/examples/server/simpleStreamableHttp.ts index 0ae0f910..3e7e7c52 100644 --- a/src/examples/server/simpleStreamableHttp.ts +++ b/src/examples/server/simpleStreamableHttp.ts @@ -1,84 +1,10 @@ import express, { Request, Response } from 'express'; import { randomUUID } from 'node:crypto'; import { McpServer } from '../../server/mcp.js'; -import { EventStore, StreamableHTTPServerTransport } from '../../server/streamableHttp.js'; +import { StreamableHTTPServerTransport } from '../../server/streamableHttp.js'; import { z } from 'zod'; -import { CallToolResult, GetPromptResult, isInitializeRequest, JSONRPCMessage, ReadResourceResult } from '../../types.js'; - -// Create a simple in-memory EventStore for resumability -class InMemoryEventStore implements EventStore { - private events: Map = new Map(); - - /** - * Generates a unique event ID for a given stream ID - */ - private generateEventId(streamId: string): string { - return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`; - } - - private getStreamIdFromEventId(eventId: string): string { - const parts = eventId.split('_'); - return parts.length > 0 ? parts[0] : ''; - } - - /** - * Stores an event with a generated event ID - * Implements EventStore.storeEvent - */ - async storeEvent(streamId: string, message: JSONRPCMessage): Promise { - const eventId = this.generateEventId(streamId); - console.log(`Storing event ${eventId} for stream ${streamId}`); - this.events.set(eventId, { streamId, message }); - return eventId; - } - - /** - * Replays events that occurred after a specific event ID - * Implements EventStore.replayEventsAfter - */ - async replayEventsAfter(lastEventId: string, - { send }: { send: (eventId: string, message: JSONRPCMessage) => Promise } - ): Promise { - if (!lastEventId || !this.events.has(lastEventId)) { - console.log(`No events found for lastEventId: ${lastEventId}`); - return ''; - } - - // Extract the stream ID from the event ID - const streamId = this.getStreamIdFromEventId(lastEventId); - if (!streamId) { - console.log(`Could not extract streamId from lastEventId: ${lastEventId}`); - return ''; - } - - let foundLastEvent = false; - let eventCount = 0; - - // Sort events by eventId for chronological ordering - const sortedEvents = [...this.events.entries()].sort((a, b) => a[0].localeCompare(b[0])); - - for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) { - // Only include events from the same stream - if (eventStreamId !== streamId) { - continue; - } - - // Start sending events after we find the lastEventId - if (eventId === lastEventId) { - foundLastEvent = true; - continue; - } - - if (foundLastEvent) { - await send(eventId, message); - eventCount++; - } - } - - console.log(`Replayed ${eventCount} events after ${lastEventId} for stream ${streamId}`); - return streamId; - } -} +import { CallToolResult, GetPromptResult, isInitializeRequest, ReadResourceResult } from '../../types.js'; +import { InMemoryEventStore } from '../shared/inMemoryEventStore.js'; // Create an MCP server with implementation details const server = new McpServer({ diff --git a/src/examples/server/sseAndStreamableHttpCompatibleServer.ts b/src/examples/server/sseAndStreamableHttpCompatibleServer.ts index 5a79a4e5..378c0e1f 100644 --- a/src/examples/server/sseAndStreamableHttpCompatibleServer.ts +++ b/src/examples/server/sseAndStreamableHttpCompatibleServer.ts @@ -1,10 +1,11 @@ import express, { Request, Response } from 'express'; import { randomUUID } from "node:crypto"; import { McpServer } from '../../server/mcp.js'; -import { EventStore, StreamableHTTPServerTransport } from '../../server/streamableHttp.js'; +import { StreamableHTTPServerTransport } from '../../server/streamableHttp.js'; import { SSEServerTransport } from '../../server/sse.js'; import { z } from 'zod'; -import { CallToolResult, isInitializeRequest, JSONRPCMessage } from '../../types.js'; +import { CallToolResult, isInitializeRequest } from '../../types.js'; +import { InMemoryEventStore } from '../shared/inMemoryEventStore.js'; /** * This example server demonstrates backwards compatibility with both: @@ -17,81 +18,7 @@ import { CallToolResult, isInitializeRequest, JSONRPCMessage } from '../../types * - /request: The deprecated POST endpoint for older clients (POST to send messages) */ -// Simple in-memory event store for resumability -class InMemoryEventStore implements EventStore { - private events: Map = new Map(); - /** - * Generates a unique event ID for a given stream ID - */ - private generateEventId(streamId: string): string { - return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`; - } - - private getStreamIdFromEventId(eventId: string): string { - const parts = eventId.split('_'); - return parts.length > 0 ? parts[0] : ''; - } - - /** - * Stores an event with a generated event ID - * Implements EventStore.storeEvent - */ - async storeEvent(streamId: string, message: JSONRPCMessage): Promise { - const eventId = this.generateEventId(streamId); - console.log(`Storing event ${eventId} for stream ${streamId}`); - this.events.set(eventId, { streamId, message }); - return eventId; - } - - /** - * Replays events that occurred after a specific event ID - * Implements EventStore.replayEventsAfter - */ - async replayEventsAfter(lastEventId: string, - { send }: { send: (eventId: string, message: JSONRPCMessage) => Promise } - ): Promise { - if (!lastEventId || !this.events.has(lastEventId)) { - console.log(`No events found for lastEventId: ${lastEventId}`); - return ''; - } - - const streamId = this.getStreamIdFromEventId(lastEventId); - if (!streamId) { - console.log(`Could not extract streamId from lastEventId: ${lastEventId}`); - return ''; - } - - let foundLastEvent = false; - let eventCount = 0; - - // Sort events by eventId for chronological ordering - const sortedEvents = [...this.events.entries()].sort((a, b) => a[0].localeCompare(b[0])); - - for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) { - // Only include events from the same stream - if (eventStreamId !== streamId) { - continue; - } - - // Start sending events after we find the lastEventId - if (eventId === lastEventId) { - foundLastEvent = true; - continue; - } - - if (foundLastEvent) { - await send(eventId, message); - eventCount++; - } - } - - console.log(`Replayed ${eventCount} events after ${lastEventId} for stream ${streamId}`); - return streamId; - } -} - -// Create a shared MCP server instance const server = new McpServer({ name: 'backwards-compatible-server', version: '1.0.0', diff --git a/src/examples/shared/inMemoryEventStore.ts b/src/examples/shared/inMemoryEventStore.ts new file mode 100644 index 00000000..fbdebe12 --- /dev/null +++ b/src/examples/shared/inMemoryEventStore.ts @@ -0,0 +1,77 @@ +import { JSONRPCMessage } from '../../types.js'; +import { EventStore } from '../../server/streamableHttp.js'; + +/** + * Simple in-memory implementation of the EventStore interface for resumability + * This is primarily intended for examples and testing, not for production use + * where a persistent storage solution would be more appropriate. + */ +export class InMemoryEventStore implements EventStore { + private events: Map = new Map(); + + /** + * Generates a unique event ID for a given stream ID + */ + private generateEventId(streamId: string): string { + return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`; + } + + /** + * Extracts the stream ID from an event ID + */ + private getStreamIdFromEventId(eventId: string): string { + const parts = eventId.split('_'); + return parts.length > 0 ? parts[0] : ''; + } + + /** + * Stores an event with a generated event ID + * Implements EventStore.storeEvent + */ + async storeEvent(streamId: string, message: JSONRPCMessage): Promise { + const eventId = this.generateEventId(streamId); + this.events.set(eventId, { streamId, message }); + return eventId; + } + + /** + * Replays events that occurred after a specific event ID + * Implements EventStore.replayEventsAfter + */ + async replayEventsAfter(lastEventId: string, + { send }: { send: (eventId: string, message: JSONRPCMessage) => Promise } + ): Promise { + if (!lastEventId || !this.events.has(lastEventId)) { + return ''; + } + + // Extract the stream ID from the event ID + const streamId = this.getStreamIdFromEventId(lastEventId); + if (!streamId) { + return ''; + } + + let foundLastEvent = false; + + // Sort events by eventId for chronological ordering + const sortedEvents = [...this.events.entries()].sort((a, b) => a[0].localeCompare(b[0])); + + for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) { + // Only include events from the same stream + if (eventStreamId !== streamId) { + continue; + } + + // Start sending events after we find the lastEventId + if (eventId === lastEventId) { + foundLastEvent = true; + continue; + } + + if (foundLastEvent) { + await send(eventId, message); + } + } + return streamId; + } +} \ No newline at end of file diff --git a/src/integration-tests/taskResumability.test.ts b/src/integration-tests/taskResumability.test.ts index 3bd23498..efd2611f 100644 --- a/src/integration-tests/taskResumability.test.ts +++ b/src/integration-tests/taskResumability.test.ts @@ -4,68 +4,11 @@ import { randomUUID } from 'node:crypto'; import { Client } from '../client/index.js'; import { StreamableHTTPClientTransport } from '../client/streamableHttp.js'; import { McpServer } from '../server/mcp.js'; -import { EventStore, StreamableHTTPServerTransport } from '../server/streamableHttp.js'; -import { CallToolResultSchema, JSONRPCMessage, LoggingMessageNotificationSchema } from '../types.js'; +import { StreamableHTTPServerTransport } from '../server/streamableHttp.js'; +import { CallToolResultSchema, LoggingMessageNotificationSchema } from '../types.js'; import { z } from 'zod'; +import { InMemoryEventStore } from '../examples/shared/inMemoryEventStore.js'; -/** - * Simple in-memory event store implementation for resumability - */ -class InMemoryEventStore implements EventStore { - private events: Map = new Map(); - - private generateEventId(streamId: string): string { - return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`; - } - - private getStreamIdFromEventId(eventId: string): string { - const parts = eventId.split('_'); - return parts.length > 0 ? parts[0] : ''; - } - - async storeEvent(streamId: string, message: JSONRPCMessage): Promise { - const eventId = this.generateEventId(streamId); - this.events.set(eventId, { streamId, message }); - return eventId; - } - - async replayEventsAfter(lastEventId: string, - { send }: { send: (eventId: string, message: JSONRPCMessage) => Promise } - ): Promise { - if (!lastEventId || !this.events.has(lastEventId)) { - return ''; - } - - // Extract the stream ID from the event ID - const streamId = this.getStreamIdFromEventId(lastEventId); - if (!streamId) { - return ''; - } - let foundLastEvent = false; - - // Sort events by eventId for chronological ordering - const sortedEvents = [...this.events.entries()].sort((a, b) => a[0].localeCompare(b[0])); - - for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) { - // Only include events from the same stream - if (eventStreamId !== streamId) { - continue; - } - - // Start collecting events after we find the lastEventId - if (eventId === lastEventId) { - foundLastEvent = true; - continue; - } - - if (foundLastEvent) { - await send(eventId, message); - } - } - - return streamId; - } -} describe('Transport resumability', () => { From 16df308d4fa30b32467606337f435dcf924f72d7 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Wed, 16 Apr 2025 14:51:09 +0100 Subject: [PATCH 2/6] remove logs how to ititialize with curl - not needed anymore as we have client --- .../server/jsonResponseStreamableHttp.ts | 22 ------------------- src/examples/server/simpleSseServer.ts | 2 +- src/examples/server/simpleStreamableHttp.ts | 22 ------------------- 3 files changed, 1 insertion(+), 45 deletions(-) diff --git a/src/examples/server/jsonResponseStreamableHttp.ts b/src/examples/server/jsonResponseStreamableHttp.ts index 101a581f..bcd6a960 100644 --- a/src/examples/server/jsonResponseStreamableHttp.ts +++ b/src/examples/server/jsonResponseStreamableHttp.ts @@ -148,28 +148,6 @@ app.get('/mcp', async (req: Request, res: Response) => { const PORT = 3000; app.listen(PORT, () => { console.log(`MCP Streamable HTTP Server listening on port ${PORT}`); - console.log(`Initialize session with the command below id you are using curl for testing: - ----------------------------- - SESSION_ID=$(curl -X POST \ - -H "Content-Type: application/json" \ - -H "Accept: application/json" \ - -H "Accept: text/event-stream" \ - -d '{ - "jsonrpc": "2.0", - "method": "initialize", - "params": { - "capabilities": {}, - "protocolVersion": "2025-03-26", - "clientInfo": { - "name": "test", - "version": "1.0.0" - } - }, - "id": "1" - }' \ - -i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\\r') - echo "Session ID: $SESSION_ID" - -----------------------------`); }); // Handle server shutdown diff --git a/src/examples/server/simpleSseServer.ts b/src/examples/server/simpleSseServer.ts index 4c5aa446..74cdcaac 100644 --- a/src/examples/server/simpleSseServer.ts +++ b/src/examples/server/simpleSseServer.ts @@ -9,7 +9,7 @@ import { CallToolResult } from '../../types.js'; * (protocol version 2024-11-05). It mainly used for testing backward compatible clients. * * The server exposes two endpoints: - * - /sse: For establishing the SSE stream (GET) + * - /mcp: For establishing the SSE stream (GET) * - /messages: For receiving client messages (POST) * */ diff --git a/src/examples/server/simpleStreamableHttp.ts b/src/examples/server/simpleStreamableHttp.ts index 3e7e7c52..35c04fe5 100644 --- a/src/examples/server/simpleStreamableHttp.ts +++ b/src/examples/server/simpleStreamableHttp.ts @@ -272,28 +272,6 @@ app.delete('/mcp', async (req: Request, res: Response) => { const PORT = 3000; app.listen(PORT, () => { console.log(`MCP Streamable HTTP Server listening on port ${PORT}`); - console.log(`Initialize session with the command below id you are using curl for testing: - ----------------------------- - SESSION_ID=$(curl -X POST \ - -H "Content-Type: application/json" \ - -H "Accept: application/json" \ - -H "Accept: text/event-stream" \ - -d '{ - "jsonrpc": "2.0", - "method": "initialize", - "params": { - "capabilities": {}, - "protocolVersion": "2025-03-26", - "clientInfo": { - "name": "test", - "version": "1.0.0" - } - }, - "id": "1" - }' \ - -i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\\r') - echo "Session ID: $SESSION_ID" - -----------------------------`); }); // Handle server shutdown From 8806c0ecb4f5c5dce7bc8ee1b7362cbcbc024fec Mon Sep 17 00:00:00 2001 From: ihrpr Date: Wed, 16 Apr 2025 17:49:22 +0100 Subject: [PATCH 3/6] examples readme improvements --- src/examples/README.md | 330 ++++++++++++++++++++++++++++++----------- 1 file changed, 245 insertions(+), 85 deletions(-) diff --git a/src/examples/README.md b/src/examples/README.md index 2ea1a596..2083f4ed 100644 --- a/src/examples/README.md +++ b/src/examples/README.md @@ -4,134 +4,294 @@ This directory contains example implementations of MCP clients and servers using ## Table of Contents -- [Streamable HTTP Servers - Single Node Deployment](#streamable-http---single-node-deployment-with-basic-session-state-management) - - [Simple Server with Streamable HTTP](#simple-server-with-streamable-http-transport-serversimplestreamablehttpts) - - [Server Supporting SSE via GET](#server-supporting-with-sse-via-get-serverstandalonessewithgetstreamablehttpts) - - [Server with JSON Response Mode](#server-with-json-response-mode-serverjsonresponsestreamablehttpts) -- [Client Example - Streamable HTTP](#client-clientsimplestreamablehttpts) -- [Useful bash commands for testing](#useful-commands-for-testing) +- [Client Implementations](#client-implementations) + - [Streamable HTTP Client](#streamable-http-client) + - [Backwards Compatible Client](#backwards-compatible-client) +- [Server Implementations](#server-implementations) + - [Single Node Deployment](#single-node-deployment) + - [Streamable HTTP Transport](#streamable-http-transport) + - [Deprecated SSE Transport](#deprecated-sse-transport) + - [Backwards Compatible Server](#streamable-http-backwards-compatible-server-with-sse) + - [Multi-Node Deployment](#multi-node-deployment) +- [Backwards Compatibility](#testing-streamable-http-backwards-compatibility-with-sse) + +## Client Implementations + +### Streamable HTTP Client + +A full-featured interactive client that connects to a Streamable HTTP server, demonstrating how to: + +- Establish and manage a connection to an MCP server +- List and call tools with arguments +- Handle notifications through the SSE stream +- List and get prompts with arguments +- List available resources +- Handle session termination and reconnection +- Support for resumability with Last-Event-ID tracking + +```bash +npx tsx src/examples/client/simpleStreamableHttp.ts +``` + +### Backwards Compatible Client + +A client that implements backwards compatibility according to the [MCP specification](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#backwards-compatibility), allowing it to work with both new and legacy servers. This client demonstrates: -## Streamable HTTP - single node deployment with basic session state management +- The client first POSTs an initialize request to the server URL: + - If successful, it uses the Streamable HTTP transport + - If it fails with a 4xx status, it attempts a GET request to establish an SSE stream -Multi node with state management example will be added soon after we add support. +```bash +npx tsx src/examples/client/streamableHttpWithSseFallbackClient.ts +``` + +## Server Implementations + +### Single Node Deployment +These examples demonstrate how to set up an MCP server on a single node with different transport options. -### Simple server with Streamable HTTP transport (`server/simpleStreamableHttp.ts`) +#### Streamable HTTP Transport -A simple MCP server that uses the Streamable HTTP transport, implemented with Express. The server provides: +##### Simple Streamable HTTP Server -- A simple `greet` tool that returns a greeting for a name -- A `greeting-template` prompt that generates a greeting template -- A static `greeting-resource` resource +A server that implements the Streamable HTTP transport (protocol version 2025-03-26). -#### Running the server +- Basic server setup with Express and the Streamable HTTP transport +- Session management with an in-memory event store for resumability +- Tool implementation with the `greet` and `multi-greet` tools +- Prompt implementation with the `greeting-template` prompt +- Static resource exposure +- Support for notifications via SSE stream established by GET requests +- Session termination via DELETE requests ```bash npx tsx src/examples/server/simpleStreamableHttp.ts ``` -The server will start on port 3000. You can test the initialization and tool listing: +##### JSON Response Mode Server + +A server that uses Streamable HTTP transport with JSON response mode enabled (no SSE). + +- Streamable HTTP with JSON response mode, which returns responses directly in the response body +- Limited support for notifications (since SSE is disabled) +- Proper response handling according to the MCP specification for servers that don't support SSE +- Returning appropriate HTTP status codes for unsupported methods + +```bash +npx tsx src/examples/server/jsonResponseStreamableHttp.ts +``` + +##### Streamable HTTP with server notifications -### Server supporting SSE via GET (`server/standaloneSseWithGetStreamableHttp.ts`) +A server that demonstrates server notifications using Streamable HTTP. -An MCP server that demonstrates how to support SSE notifications via GET requests using the Streamable HTTP transport with Express. The server dynamically adds resources at regular intervals and supports notifications for resource list changes (server notifications are available through the standalone SSE connection established by GET request). +- Resource list change notifications with dynamically added resources +- Automatic resource creation on a timed interval -#### Running the server ```bash npx tsx src/examples/server/standaloneSseWithGetStreamableHttp.ts ``` -The server will start on port 3000 and automatically create new resources every 5 seconds. +#### Deprecated SSE Transport -### Server with JSON response mode (`server/jsonResponseStreamableHttp.ts`) +A server that implements the deprecated HTTP+SSE transport (protocol version 2024-11-05). This example only used for testing backwards compatibility for clients. + +- Two separate endpoints: `/mcp` for the SSE stream (GET) and `/messages` for client messages (POST) +- Tool implementation with a `start-notification-stream` tool that demonstrates sending periodic notifications + +```bash +npx tsx src/examples/server/simpleSseServer.ts +``` -A simple MCP server that uses the Streamable HTTP transport with JSON response mode enabled, implemented with Express. The server provides a simple `greet` tool that returns a greeting for a name. +#### Streamable Http Backwards Compatible Server with SSE -_NOTE: This demonstrates a server that does not use SSE at all. Note that this limits its support for MCP features; for example, it cannot provide logging and progress notifications for tool execution._ +A server that supports both Streamable HTTP and SSE transports, adhering to the [MCP specification for backwards compatibility](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#backwards-compatibility). -#### Running the server +- Single MCP server instance with multiple transport options +- Support for Streamable HTTP requests at `/mcp` endpoint (GET/POST/DELETE) +- Support for deprecated SSE transport with `/sse` (GET) and `/messages` (POST) +- Session type tracking to avoid mixing transport types +- Notifications and tool execution across both transport types ```bash -npx tsx src/examples/server/jsonResponseStreamableHttp.ts +npx tsx src/examples/server/sseAndStreamableHttpCompatibleServer.ts ``` +### Multi-Node Deployment -### Client (`client/simpleStreamableHttp.ts`) +When deploying MCP servers in a horizontally scaled environment (multiple server instances), there are a few different options that can be useful for different use cases: +- **Stateless mode** - No need to maintain state between calls to MCP servers. Useful for simple API wrapper servers. +- **Persistent storage mode** - No local state needed, but session data is stored in a database. Example: an MCP server for online ordering where the shopping cart is stored in a database. +- **Local state with message routing** - Local state is needed, and all requests for a session must be routed to the correct node. This can be done with a message queue and pub/sub system. -A client that connects to the server, initializes it, and demonstrates how to: +#### Stateless Mode -- List available tools and call the `greet` tool -- List available prompts and get the `greeting-template` prompt -- List available resources +The Streamable HTTP transport can be configured to operate without tracking sessions. This is perfect for simple API proxies or when each request is completely independent. -#### Running the client +##### Implementation -```bash -npx tsx src/examples/client/simpleStreamableHttp.ts +To enable stateless mode, configure the `StreamableHTTPServerTransport` with: +```typescript +sessionIdGenerator: () => undefined ``` -Make sure the server is running before starting the client. +This disables session management entirely, and the server won't generate or expect session IDs. +- No session ID headers are sent or expected +- Any server node can process any request +- No state is preserved between requests +- Perfect for RESTful or stateless API scenarios +- Simplest deployment model with minimal infrastructure requirements -### Useful commands for testing +``` +┌─────────────────────────────────────────────┐ +│ Client │ +└─────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────┐ +│ Load Balancer │ +└─────────────────────────────────────────────┘ + │ │ + ▼ ▼ +┌─────────────────┐ ┌─────────────────────┐ +│ MCP Server #1 │ │ MCP Server #2 │ +│ (Node.js) │ │ (Node.js) │ +└─────────────────┘ └─────────────────────┘ +``` -#### Initialize -Streamable HTTP transport requires to do the initialization first. -```bash -# First initialize the server and save the session ID to a variable -SESSION_ID=$(curl -X POST \ - -H "Content-Type: application/json" \ - -H "Accept: application/json" \ - -H "Accept: text/event-stream" \ - -d '{ - "jsonrpc": "2.0", - "method": "initialize", - "params": { - "capabilities": {}, - "protocolVersion": "2025-03-26", - "clientInfo": { - "name": "test", - "version": "1.0.0" - } - }, - "id": "1" - }' \ - -i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\r') -echo "Session ID: $SESSION_ID +#### Persistent Storage Mode + +For cases where you need session continuity but don't need to maintain in-memory state on specific nodes, you can use a database to persist session data while still allowing any node to handle requests. + +##### Implementation + +Configure the transport with session management, but retrieve and store all state in an external persistent storage: + +```typescript +sessionIdGenerator: () => randomUUID(), +eventStore: databaseEventStore ``` -Once a session is established, we can send POST requests: -#### List tools -```bash -# Then list tools using the saved session ID -curl -X POST -H "Content-Type: application/json" -H "Accept: application/json, text/event-stream" \ - -H "mcp-session-id: $SESSION_ID" \ - -d '{"jsonrpc":"2.0","method":"tools/list","params":{},"id":"2"}' \ - http://localhost:3000/mcp +All session state is stored in the database, and any node can serve any client by retrieving the state when needed. + +- Maintains sessions with unique IDs +- Stores all session data in an external database +- Provides resumability through the database-backed EventStore +- Any node can handle any request for the same session +- No node-specific memory state means no need for message routing +- Good for applications where state can be fully externalized +- Somewhat higher latency due to database access for each request + + +``` +┌─────────────────────────────────────────────┐ +│ Client │ +└─────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────┐ +│ Load Balancer │ +└─────────────────────────────────────────────┘ + │ │ + ▼ ▼ +┌─────────────────┐ ┌─────────────────────┐ +│ MCP Server #1 │ │ MCP Server #2 │ +│ (Node.js) │ │ (Node.js) │ +└─────────────────┘ └─────────────────────┘ + │ │ + │ │ + ▼ ▼ +┌─────────────────────────────────────────────┐ +│ Database (PostgreSQL) │ +│ │ +│ • Session state │ +│ • Event storage for resumability │ +└─────────────────────────────────────────────┘ ``` -#### Call tool -```bash -# Call the greet tool using the saved session ID -curl -X POST \ - -H "Content-Type: application/json" \ - -H "Accept: application/json" \ - -H "Accept: text/event-stream" \ - -H "mcp-session-id: $SESSION_ID" \ - -d '{ - "jsonrpc": "2.0", - "method": "tools/call", - "params": { - "name": "greet", - "arguments": { - "name": "World" - } - }, - "id": "2" - }' \ - http://localhost:3000/mcp + +#### Streamable HTTP with Distributed Message Routing + +For scenarios where local in-memory state must be maintained on specific nodes (such as Computer Use or complex session state), the Streamable HTTP transport can be combined with a pub/sub system to route messages to the correct node handling each session. + +1. **Bidirectional Message Queue Integration**: + - All nodes both publish to and subscribe from the message queue + - Each node registers the sessions it's actively handling + - Messages are routed based on session ownership + +2. **Request Handling Flow**: + - When a client connects to Node A with an existing `mcp-session-id` + - If Node A doesn't own this session, it: + - Establishes and maintains the SSE connection with the client + - Publishes the request to the message queue with the session ID + - Node B (which owns the session) receives the request from the queue + - Node B processes the request with its local session state + - Node B publishes responses/notifications back to the queue + - Node A subscribes to the response channel and forwards to the client + +3. **Channel Identification**: + - Each message channel combines both `mcp-session-id` and `stream-id` + - This ensures responses are correctly routed back to the originating connection + +``` +┌─────────────────────────────────────────────┐ +│ Client │ +└─────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────┐ +│ Load Balancer │ +└─────────────────────────────────────────────┘ + │ │ + ▼ ▼ +┌─────────────────┐ ┌─────────────────────┐ +│ MCP Server #1 │◄───►│ MCP Server #2 │ +│ (Has Session A) │ │ (Has Session B) │ +└─────────────────┘ └─────────────────────┘ + ▲│ ▲│ + │▼ │▼ +┌─────────────────────────────────────────────┐ +│ Message Queue / Pub-Sub │ +│ │ +│ • Session ownership registry │ +│ • Bidirectional message routing │ +│ • Request/response forwarding │ +└─────────────────────────────────────────────┘ ``` + + +- Maintains session affinity for stateful operations without client redirection +- Enables horizontal scaling while preserving complex in-memory state +- Provides fault tolerance through the message queue as intermediary + + +## Backwards Compatibility + +### Testing Streamable HTTP Backwards Compatibility with SSE + +To test the backwards compatibility features: + +1. Start one of the server implementations: + ```bash + # Legacy SSE server (protocol version 2024-11-05) + npx tsx src/examples/server/simpleSseServer.ts + + # Streamable HTTP server (protocol version 2025-03-26) + npx tsx src/examples/server/simpleStreamableHttp.ts + + # Backwards compatible server (supports both protocols) + npx tsx src/examples/server/sseAndStreamableHttpCompatibleServer.ts + ``` + +2. Then run the backwards compatible client: + ```bash + npx tsx src/examples/client/streamableHttpWithSseFallbackClient.ts + ``` + +This demonstrates how the MCP ecosystem ensures interoperability between clients and servers regardless of which protocol version they were built for. \ No newline at end of file From 864aa2155c46f094d2284a3c86adce5c82e1e196 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Wed, 16 Apr 2025 18:22:55 +0100 Subject: [PATCH 4/6] update readme with StreamableHttp --- README.md | 241 ++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 218 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 4fcdff17..ab16e100 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ - [Prompts](#prompts) - [Running Your Server](#running-your-server) - [stdio](#stdio) - - [HTTP with SSE](#http-with-sse) + - [Streamable HTTP](#streamable-http) - [Testing and Debugging](#testing-and-debugging) - [Examples](#examples) - [Echo Server](#echo-server) @@ -22,6 +22,7 @@ - [Writing MCP Clients](#writing-mcp-clients) - [Server Capabilities](#server-capabilities) - [Proxy OAuth Server](#proxy-authorization-requests-upstream) + - [Backwards Compatibility](#backwards-compatibility) ## Overview @@ -29,7 +30,7 @@ The Model Context Protocol allows applications to provide context for LLMs in a - Build MCP clients that can connect to any MCP server - Create MCP servers that expose resources, prompts and tools -- Use standard transports like stdio and SSE +- Use standard transports like stdio and Streamable HTTP - Handle all MCP protocol messages and lifecycle events ## Installation @@ -207,14 +208,18 @@ const transport = new StdioServerTransport(); await server.connect(transport); ``` -### HTTP with SSE +### Streamable HTTP -For remote servers, start a web server with a Server-Sent Events (SSE) endpoint, and a separate endpoint for the client to send its messages to: +For remote servers, set up a Streamable HTTP transport that handles both client requests and server-to-client notifications. + +#### With Session Management ```typescript -import express, { Request, Response } from "express"; +import express from "express"; +import { randomUUID } from "node:crypto"; import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; -import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; +import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; +import { InMemoryEventStore } from "@modelcontextprotocol/sdk/inMemory.js"; const server = new McpServer({ name: "example-server", @@ -224,33 +229,128 @@ const server = new McpServer({ // ... set up server resources, tools, and prompts ... const app = express(); +app.use(express.json()); + +// Map to store transports by session ID +const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {}; + +// Handle POST requests for client-to-server communication +app.post('/mcp', async (req, res) => { + // Check for existing session ID + const sessionId = req.headers['mcp-session-id'] as string | undefined; + let transport: StreamableHTTPServerTransport; + + if (sessionId && transports[sessionId]) { + // Reuse existing transport + transport = transports[sessionId]; + } else if (!sessionId && isInitializeRequest(req.body)) { + // New initialization request + const eventStore = new InMemoryEventStore(); + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + eventStore, // Enable resumability + onsessioninitialized: (sessionId) => { + // Store the transport by session ID + transports[sessionId] = transport; + } + }); -// to support multiple simultaneous connections we have a lookup object from -// sessionId to transport -const transports: {[sessionId: string]: SSEServerTransport} = {}; + // Clean up transport when closed + transport.onclose = () => { + if (transport.sessionId) { + delete transports[transport.sessionId]; + } + }; -app.get("/sse", async (_: Request, res: Response) => { - const transport = new SSEServerTransport('/messages', res); - transports[transport.sessionId] = transport; - res.on("close", () => { - delete transports[transport.sessionId]; - }); - await server.connect(transport); + // Connect to the MCP server + await server.connect(transport); + } else { + // Invalid request + res.status(400).json({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Bad Request: No valid session ID provided', + }, + id: null, + }); + return; + } + + // Handle the request + await transport.handleRequest(req, res, req.body); }); -app.post("/messages", async (req: Request, res: Response) => { - const sessionId = req.query.sessionId as string; +// Handle GET requests for server-to-client notifications via SSE +app.get('/mcp', async (req, res) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !transports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + // Support resumability with Last-Event-ID header const transport = transports[sessionId]; - if (transport) { - await transport.handlePostMessage(req, res); - } else { - res.status(400).send('No transport found for sessionId'); + await transport.handleRequest(req, res); +}); + +// Handle DELETE requests for session termination +app.delete('/mcp', async (req, res) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !transports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; } + + const transport = transports[sessionId]; + await transport.handleRequest(req, res); }); -app.listen(3001); +app.listen(3000); ``` +#### Without Session Management (Stateless) + +For simpler use cases where session management isn't needed: + +```typescript +import express from "express"; +import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; + +const server = new McpServer({ + name: "stateless-server", + version: "1.0.0" +}); + +// ... set up server resources, tools, and prompts ... + +const app = express(); +app.use(express.json()); + +// Handle all MCP requests (GET, POST, DELETE) at a single endpoint +app.all('/mcp', async (req, res) => { + // Create a transport with sessionIdGenerator set to return undefined + // This disables session tracking completely + const transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => undefined, + req, + res + }); + + // Connect to server and handle the request + await server.connect(transport); + await transport.handleRequest(req, res); +}); + +app.listen(3000); +``` + +This stateless approach is useful for: +- Simple API wrappers +- RESTful scenarios where each request is independent +- Horizontally scaled deployments without shared session state + ### Testing and Debugging To test your server, you can use the [MCP Inspector](https://github.com/modelcontextprotocol/inspector). See its README for more information. @@ -596,6 +696,101 @@ This setup allows you to: - Provide custom documentation URLs - Maintain control over the OAuth flow while delegating to an external provider +### Backwards Compatibility + +The SDK provides support for backwards compatibility between different protocol versions: + +#### Client-Side Compatibility + +For clients that need to work with both Streamable HTTP and older SSE servers: + +```typescript +import { Client } from "@modelcontextprotocol/sdk/client/index.js"; +import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; +import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"; + +// First try connecting with Streamable HTTP transport +try { + const transport = new StreamableHTTPClientTransport( + new URL("http://localhost:3000/mcp") + ); + await client.connect(transport); + console.log("Connected using Streamable HTTP transport"); +} catch (error) { + // If that fails with a 4xx error, try the older SSE transport + console.log("Streamable HTTP connection failed, falling back to SSE transport"); + const sseTransport = new SSEClientTransport({ + sseUrl: new URL("http://localhost:3000/sse"), + postUrl: new URL("http://localhost:3000/messages") + }); + await client.connect(sseTransport); + console.log("Connected using SSE transport"); +} +``` + +#### Server-Side Compatibility + +For servers that need to support both Streamable HTTP and older clients: + +```typescript +import express from "express"; +import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; +import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; +import { InMemoryEventStore } from "@modelcontextprotocol/sdk/inMemory.js"; + +const server = new McpServer({ + name: "backwards-compatible-server", + version: "1.0.0" +}); + +// ... set up server resources, tools, and prompts ... + +const app = express(); +app.use(express.json()); + +// Store transports for each session type +const transports = { + streamable: {} as Record, + sse: {} as Record +}; + +// Modern Streamable HTTP endpoint +app.all('/mcp', async (req, res) => { + // Handle Streamable HTTP transport for modern clients + // Implementation as shown in the "With Session Management" example + // ... +}); + +// Legacy SSE endpoint for older clients +app.get('/sse', async (req, res) => { + // Create SSE transport for legacy clients + const transport = new SSEServerTransport('/messages', res); + transports.sse[transport.sessionId] = transport; + + res.on("close", () => { + delete transports.sse[transport.sessionId]; + }); + + await server.connect(transport); +}); + +// Legacy message endpoint for older clients +app.post('/messages', async (req, res) => { + const sessionId = req.query.sessionId as string; + const transport = transports.sse[sessionId]; + if (transport) { + await transport.handlePostMessage(req, res); + } else { + res.status(400).send('No transport found for sessionId'); + } +}); + +app.listen(3000); +``` + +**Note**: The SSE transport is now deprecated in favor of Streamable HTTP. New implementations should use Streamable HTTP, and existing SSE implementations should plan to migrate. + ## Documentation - [Model Context Protocol documentation](https://modelcontextprotocol.io) From 1781d31efc0d595f5ae709aa1ffcc5b9151debcd Mon Sep 17 00:00:00 2001 From: ihrpr Date: Thu, 17 Apr 2025 18:08:58 +0100 Subject: [PATCH 5/6] readme suggested changes --- README.md | 56 +++++++++++++++++++++++++++---------------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index ab16e100..07d9eb30 100644 --- a/README.md +++ b/README.md @@ -214,6 +214,8 @@ For remote servers, set up a Streamable HTTP transport that handles both client #### With Session Management +In some cases, servers need to be stateful. This is achieved by [session management](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#session-management). + ```typescript import express from "express"; import { randomUUID } from "node:crypto"; @@ -221,12 +223,6 @@ import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; import { InMemoryEventStore } from "@modelcontextprotocol/sdk/inMemory.js"; -const server = new McpServer({ - name: "example-server", - version: "1.0.0" -}); - -// ... set up server resources, tools, and prompts ... const app = express(); app.use(express.json()); @@ -261,6 +257,12 @@ app.post('/mcp', async (req, res) => { delete transports[transport.sessionId]; } }; + const server = new McpServer({ + name: "example-server", + version: "1.0.0" + }); + + // ... set up server resources, tools, and prompts ... // Connect to the MCP server await server.connect(transport); @@ -281,30 +283,23 @@ app.post('/mcp', async (req, res) => { await transport.handleRequest(req, res, req.body); }); -// Handle GET requests for server-to-client notifications via SSE -app.get('/mcp', async (req, res) => { +// Reusable handler for GET and DELETE requests +const handleSessionRequest = async (req: express.Request, res: express.Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; if (!sessionId || !transports[sessionId]) { res.status(400).send('Invalid or missing session ID'); return; } - - // Support resumability with Last-Event-ID header + const transport = transports[sessionId]; await transport.handleRequest(req, res); -}); +}; + +// Handle GET requests for server-to-client notifications via SSE +app.get('/mcp', handleSessionRequest); // Handle DELETE requests for session termination -app.delete('/mcp', async (req, res) => { - const sessionId = req.headers['mcp-session-id'] as string | undefined; - if (!sessionId || !transports[sessionId]) { - res.status(400).send('Invalid or missing session ID'); - return; - } - - const transport = transports[sessionId]; - await transport.handleRequest(req, res); -}); +app.delete('/mcp', handleSessionRequest); app.listen(3000); ``` @@ -698,7 +693,7 @@ This setup allows you to: ### Backwards Compatibility -The SDK provides support for backwards compatibility between different protocol versions: +Clients and servers with StreamableHttp tranport can maintain [backwards compatibility](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#backwards-compatibility) with the deprecated HTTP+SSE transport (from protocol version 2024-11-05) as follows #### Client-Side Compatibility @@ -708,21 +703,26 @@ For clients that need to work with both Streamable HTTP and older SSE servers: import { Client } from "@modelcontextprotocol/sdk/client/index.js"; import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"; - -// First try connecting with Streamable HTTP transport +let client: Client|undefined = undefined +const baseUrl = new URL(url); try { + client = new Client({ + name: 'streamable-http-client', + version: '1.0.0' + }); const transport = new StreamableHTTPClientTransport( - new URL("http://localhost:3000/mcp") + new URL(baseUrl) ); await client.connect(transport); console.log("Connected using Streamable HTTP transport"); } catch (error) { // If that fails with a 4xx error, try the older SSE transport console.log("Streamable HTTP connection failed, falling back to SSE transport"); - const sseTransport = new SSEClientTransport({ - sseUrl: new URL("http://localhost:3000/sse"), - postUrl: new URL("http://localhost:3000/messages") + client = new Client({ + name: 'sse-client', + version: '1.0.0' }); + const sseTransport = new SSEClientTransport(baseUrl); await client.connect(sseTransport); console.log("Connected using SSE transport"); } From f10d1ff93f6ea9daba663c44478db64088389df9 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Thu, 17 Apr 2025 18:22:11 +0100 Subject: [PATCH 6/6] change sessionIdGenerator to be either undefined or generate a string --- README.md | 5 ++--- src/examples/README.md | 2 +- .../stateManagementStreamableHttp.test.ts | 2 +- src/server/streamableHttp.test.ts | 13 +++++++------ src/server/streamableHttp.ts | 6 +++--- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 07d9eb30..ed97b83a 100644 --- a/README.md +++ b/README.md @@ -325,10 +325,9 @@ app.use(express.json()); // Handle all MCP requests (GET, POST, DELETE) at a single endpoint app.all('/mcp', async (req, res) => { - // Create a transport with sessionIdGenerator set to return undefined - // This disables session tracking completely + // Disable session tracking by setting sessionIdGenerator to undefined const transport = new StreamableHTTPServerTransport({ - sessionIdGenerator: () => undefined, + sessionIdGenerator: undefined, req, res }); diff --git a/src/examples/README.md b/src/examples/README.md index 2083f4ed..611c081e 100644 --- a/src/examples/README.md +++ b/src/examples/README.md @@ -134,7 +134,7 @@ The Streamable HTTP transport can be configured to operate without tracking sess To enable stateless mode, configure the `StreamableHTTPServerTransport` with: ```typescript -sessionIdGenerator: () => undefined +sessionIdGenerator: undefined ``` This disables session management entirely, and the server won't generate or expect session IDs. diff --git a/src/integration-tests/stateManagementStreamableHttp.test.ts b/src/integration-tests/stateManagementStreamableHttp.test.ts index 1e80b7b8..6d553727 100644 --- a/src/integration-tests/stateManagementStreamableHttp.test.ts +++ b/src/integration-tests/stateManagementStreamableHttp.test.ts @@ -68,7 +68,7 @@ describe('Streamable HTTP Transport Session Management', () => { const serverTransport = new StreamableHTTPServerTransport({ sessionIdGenerator: withSessionManagement ? () => randomUUID() // With session management, generate UUID - : () => undefined // Without session management, return undefined + : undefined // Without session management, return undefined }); await mcpServer.connect(serverTransport); diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 0794e4bb..a0f2e0bb 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -10,7 +10,7 @@ import { z } from "zod"; * Test server configuration for StreamableHTTPServerTransport tests */ interface TestServerConfig { - sessionIdGenerator?: () => string | undefined; + sessionIdGenerator: (() => string) | undefined; enableJsonResponse?: boolean; customRequestHandler?: (req: IncomingMessage, res: ServerResponse, parsedBody?: unknown) => Promise; eventStore?: EventStore; @@ -19,7 +19,7 @@ interface TestServerConfig { /** * Helper to create and start test HTTP server with MCP setup */ -async function createTestServer(config: TestServerConfig = {}): Promise<{ +async function createTestServer(config: TestServerConfig = { sessionIdGenerator: (() => randomUUID()) }): Promise<{ server: Server; transport: StreamableHTTPServerTransport; mcpServer: McpServer; @@ -40,7 +40,7 @@ async function createTestServer(config: TestServerConfig = {}): Promise<{ ); const transport = new StreamableHTTPServerTransport({ - sessionIdGenerator: config.sessionIdGenerator ?? (() => randomUUID()), + sessionIdGenerator: config.sessionIdGenerator, enableJsonResponse: config.enableJsonResponse ?? false, eventStore: config.eventStore }); @@ -681,7 +681,7 @@ describe("StreamableHTTPServerTransport with JSON Response Mode", () => { let sessionId: string; beforeEach(async () => { - const result = await createTestServer({ enableJsonResponse: true }); + const result = await createTestServer({ sessionIdGenerator: (() => randomUUID()), enableJsonResponse: true }); server = result.server; transport = result.transport; baseUrl = result.baseUrl; @@ -784,7 +784,8 @@ describe("StreamableHTTPServerTransport with pre-parsed body", () => { console.error("Error handling request:", error); if (!res.headersSent) res.writeHead(500).end(); } - } + }, + sessionIdGenerator: (() => randomUUID()) }); server = result.server; @@ -1063,7 +1064,7 @@ describe("StreamableHTTPServerTransport in stateless mode", () => { let baseUrl: URL; beforeEach(async () => { - const result = await createTestServer({ sessionIdGenerator: () => undefined }); + const result = await createTestServer({ sessionIdGenerator: undefined }); server = result.server; transport = result.transport; baseUrl = result.baseUrl; diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 82fcf321..d9205ac1 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -37,7 +37,7 @@ export interface StreamableHTTPServerTransportOptions { * * Return undefined to disable session management. */ - sessionIdGenerator: () => string | undefined; + sessionIdGenerator: (() => string) | undefined; /** * A callback for session initialization events @@ -98,7 +98,7 @@ export interface StreamableHTTPServerTransportOptions { */ export class StreamableHTTPServerTransport implements Transport { // when sessionId is not set (undefined), it means the transport is in stateless mode - private sessionIdGenerator: () => string | undefined; + private sessionIdGenerator: (() => string) | undefined; private _started: boolean = false; private _streamMapping: Map = new Map(); private _requestToStreamMapping: Map = new Map(); @@ -365,7 +365,7 @@ export class StreamableHTTPServerTransport implements Transport { })); return; } - this.sessionId = this.sessionIdGenerator(); + this.sessionId = this.sessionIdGenerator?.(); this._initialized = true; // If we have a session ID and an onsessioninitialized handler, call it immediately