diff --git a/src/examples/server/simpleStreamableHttp.ts b/src/examples/server/simpleStreamableHttp.ts index 35c04fe5..2b25f4b2 100644 --- a/src/examples/server/simpleStreamableHttp.ts +++ b/src/examples/server/simpleStreamableHttp.ts @@ -12,6 +12,21 @@ const server = new McpServer({ version: '1.0.0', }, { capabilities: { logging: {} } }); +// Log the capability invocation details +server.onCapabilityChange((event) => { + switch (event.action) { + case 'invoked': + console.log(`${event.capabilityType} invocation ${event.invocationIndex}: '${event.capabilityName}' started`); + break; + case 'completed': + console.log(`${event.capabilityType} invocation ${event.invocationIndex}: '${event.capabilityName}' completed in ${event.durationMs}ms`); + break; + case 'error': + console.log(`${event.capabilityType} invocation ${event.invocationIndex}: '${event.capabilityName}' failed in ${event.durationMs}ms: ${event.error}`); + break; + } +}); + // Register a simple tool that returns a greeting server.tool( 'greet', @@ -291,4 +306,4 @@ process.on('SIGINT', async () => { await server.close(); console.log('Server shutdown complete'); process.exit(0); -}); \ No newline at end of file +}); diff --git a/src/server/index.ts b/src/server/index.ts index 3901099e..befaade2 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -104,6 +104,13 @@ export class Server< ); } + /** + * The server's name and version. + */ + getVersion(): { readonly name: string; readonly version: string } { + return this._serverInfo; + } + /** * Registers new capabilities. This can only be called before connecting to a transport. * diff --git a/src/server/mcp.test.ts b/src/server/mcp.test.ts index eaac5c71..4e6270cc 100644 --- a/src/server/mcp.test.ts +++ b/src/server/mcp.test.ts @@ -14,7 +14,7 @@ import { LoggingMessageNotificationSchema, Notification, } from "../types.js"; -import { ResourceTemplate } from "./mcp.js"; +import { ResourceTemplate, CapabilityEvent } from "./mcp.js"; import { completable } from "./completable.js"; import { UriTemplate } from "../shared/uriTemplate.js"; @@ -26,6 +26,10 @@ describe("McpServer", () => { }); expect(mcpServer.server).toBeDefined(); + expect(mcpServer.server.getVersion()).toEqual({ + name: "test server", + version: "1.0", + }); }); test("should allow sending notifications via Server", async () => { @@ -862,6 +866,95 @@ describe("tool()", () => { ), ).rejects.toThrow(/Tool nonexistent-tool not found/); }); + + test("should include duration in completed and error events", async () => { + const mcpServer = new McpServer({ + name: "test server", + version: "1.0", + }); + + const client = new Client({ + name: "test client", + version: "1.0", + }); + + const events: Array = []; + const subscription = mcpServer.onCapabilityChange((event) => { + events.push(event); + }); + + // Register tools with different behaviors + mcpServer.tool("success-tool", async () => { + // Simulate some work + await new Promise(resolve => setTimeout(resolve, 10)); + return { content: [{ type: "text", text: "Success" }] }; + }); + + mcpServer.tool("error-tool", async () => { + // Simulate some work + await new Promise(resolve => setTimeout(resolve, 10)); + throw new Error("Simulated error"); + }); + + const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); + + await Promise.all([ + client.connect(clientTransport), + mcpServer.connect(serverTransport), + ]); + + // Clear events from registration + events.length = 0; + + // Call the success tool + await client.request( + { + method: "tools/call", + params: { + name: "success-tool", + }, + }, + CallToolResultSchema, + ); + + // Find the completed event + const completedEvent = events.find(e => + e.capabilityType === "tool" && + e.capabilityName === "success-tool" && + e.action === "completed" + ) as Extract; + + expect(completedEvent).toBeDefined(); + expect(completedEvent.durationMs).toBeDefined(); + expect(completedEvent.durationMs).toBeGreaterThan(0); + + // Clear events + events.length = 0; + + // Call the error tool + await client.request( + { + method: "tools/call", + params: { + name: "error-tool", + }, + }, + CallToolResultSchema, + ); + + // Find the error event + const errorEvent = events.find(e => + e.capabilityType === "tool" && + e.capabilityName === "error-tool" && + e.action === "error" + ) as Extract; + + expect(errorEvent).toBeDefined(); + expect(errorEvent.durationMs).toBeDefined(); + expect(errorEvent.durationMs).toBeGreaterThan(0); + + subscription.close(); + }); }); describe("resource()", () => { @@ -2512,3 +2605,322 @@ describe("prompt()", () => { expect(result.completion.total).toBe(1); }); }); + +describe("CapabilityEvents", () => { + test("should emit capability events when registering and interacting with tools", async () => { + const mcpServer = new McpServer({ + name: "test server", + version: "1.0", + }); + + const events: Array = []; + const subscription = mcpServer.onCapabilityChange((event) => { + events.push(event); + }); + + // Register a tool (should trigger "added" event) + const tool = mcpServer.tool("test-tool", async () => ({ + content: [{ type: "text", text: "Test response" }], + })); + + expect(events.length).toBeGreaterThan(0); + expect(events[0]).toMatchObject({ + serverInfo: { name: "test server", version: "1.0" }, + capabilityType: "tool", + capabilityName: "test-tool", + action: "added", + }); + + // Update the tool (should trigger "updated" event) + tool.update({ + description: "Updated description", + }); + + expect(events.some(e => + e.capabilityType === "tool" && + e.capabilityName === "test-tool" && + e.action === "updated" + )).toBe(true); + + // Disable the tool (should trigger "disabled" event) + tool.disable(); + + expect(events.some(e => + e.capabilityType === "tool" && + e.capabilityName === "test-tool" && + e.action === "disabled" + )).toBe(true); + + // Enable the tool (should trigger "enabled" event) + tool.enable(); + + expect(events.some(e => + e.capabilityType === "tool" && + e.capabilityName === "test-tool" && + e.action === "enabled" + )).toBe(true); + + // Clean up + subscription.close(); + }); + + test("should emit capability events when tools are invoked", async () => { + const mcpServer = new McpServer({ + name: "test server", + version: "1.0", + }); + + const client = new Client({ + name: "test client", + version: "1.0", + }); + + const events: Array = []; + const subscription = mcpServer.onCapabilityChange((event) => { + events.push(event); + }); + + mcpServer.tool("test-tool", async () => ({ + content: [{ type: "text", text: "Test response" }], + })); + + // Clear events from registration + events.length = 0; + + const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); + + await Promise.all([ + client.connect(clientTransport), + mcpServer.connect(serverTransport), + ]); + + await client.request( + { + method: "tools/call", + params: { + name: "test-tool", + }, + }, + CallToolResultSchema, + ); + + // Should have "invoked" and "completed" events + expect(events.some(e => + e.capabilityType === "tool" && + e.capabilityName === "test-tool" && + e.action === "invoked" + )).toBe(true); + + expect(events.some(e => + e.capabilityType === "tool" && + e.capabilityName === "test-tool" && + e.action === "completed" + )).toBe(true); + + // The invoked and completed events should have the same invocationIndex + const invokedEvent = events.find(e => e.action === "invoked"); + const completedEvent = events.find(e => e.action === "completed"); + expect(invokedEvent?.invocationIndex).toBe(completedEvent?.invocationIndex); + + subscription.close(); + }); + + test("should emit capability events for resources", async () => { + const mcpServer = new McpServer({ + name: "test server", + version: "1.0", + }); + + const client = new Client({ + name: "test client", + version: "1.0", + }); + + const events: Array = []; + const subscription = mcpServer.onCapabilityChange((event) => { + events.push(event); + }); + + // Register a resource + const resource = mcpServer.resource("test-resource", "test://resource", async () => ({ + contents: [{ uri: "test://resource", text: "Test content" }], + })); + + expect(events.some(e => + e.capabilityType === "resource" && + e.capabilityName === "test-resource" && + e.action === "added" + )).toBe(true); + + // Clear events from registration + events.length = 0; + + const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); + + await Promise.all([ + client.connect(clientTransport), + mcpServer.connect(serverTransport), + ]); + + // Read the resource + await client.request( + { + method: "resources/read", + params: { + uri: "test://resource", + }, + }, + ReadResourceResultSchema, + ); + + // Should have "invoked" event + expect(events.some(e => + e.capabilityType === "resource" && + e.capabilityName === "test://resource" && + e.action === "invoked" + )).toBe(true); + + // Clear events + events.length = 0; + + // Remove the resource + resource.remove(); + + // Should have "removed" event + expect(events.some(e => + e.capabilityType === "resource" && + e.capabilityName === "test-resource" && + e.action === "removed" + )).toBe(true); + + subscription.close(); + }); + + test("should emit capability events for prompts", async () => { + const mcpServer = new McpServer({ + name: "test server", + version: "1.0", + }); + + const client = new Client({ + name: "test client", + version: "1.0", + }); + + const events: Array = []; + const subscription = mcpServer.onCapabilityChange((event) => { + events.push(event); + }); + + // Register a prompt + mcpServer.prompt("test-prompt", async () => ({ + messages: [ + { + role: "assistant", + content: { + type: "text", + text: "Test response", + }, + }, + ], + })); + + expect(events.some(e => + e.capabilityType === "prompt" && + e.capabilityName === "test-prompt" && + e.action === "added" + )).toBe(true); + + // Clear events from registration + events.length = 0; + + const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); + + await Promise.all([ + client.connect(clientTransport), + mcpServer.connect(serverTransport), + ]); + + // Get the prompt + await client.request( + { + method: "prompts/get", + params: { + name: "test-prompt", + }, + }, + GetPromptResultSchema, + ); + + // Should have "invoked" and "completed" events + expect(events.some(e => + e.capabilityType === "prompt" && + e.capabilityName === "test-prompt" && + e.action === "invoked" + )).toBe(true); + + expect(events.some(e => + e.capabilityType === "prompt" && + e.capabilityName === "test-prompt" && + e.action === "completed" + )).toBe(true); + + subscription.close(); + }); + + test("should emit error events when tool execution fails", async () => { + const mcpServer = new McpServer({ + name: "test server", + version: "1.0", + }); + + const client = new Client({ + name: "test client", + version: "1.0", + }); + + const events: Array = []; + const subscription = mcpServer.onCapabilityChange((event) => { + // Only capture error events + if (event.action === "error") { + events.push(event); + } + }); + + mcpServer.tool("error-tool", async () => { + throw new Error("Simulated error"); + }); + + const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); + + await Promise.all([ + client.connect(clientTransport), + mcpServer.connect(serverTransport), + ]); + + // Call the tool that throws an error + await client.request( + { + method: "tools/call", + params: { + name: "error-tool", + }, + }, + CallToolResultSchema, + ); + + // Should have error event + expect(events.length).toBe(1); + expect(events[0]).toMatchObject({ + capabilityType: "tool", + capabilityName: "error-tool", + action: "error", + }); + + // Type assert to the specific variant of CapabilityEvent that has the error property + const errorEvent = events[0] as Extract; + expect(errorEvent.error).toBeDefined(); + + subscription.close(); + }); +}); diff --git a/src/server/mcp.ts b/src/server/mcp.ts index 652f9774..b713d0a2 100644 --- a/src/server/mcp.ts +++ b/src/server/mcp.ts @@ -44,6 +44,7 @@ import { Completable, CompletableDef } from "./completable.js"; import { UriTemplate, Variables } from "../shared/uriTemplate.js"; import { RequestHandlerExtra } from "../shared/protocol.js"; import { Transport } from "../shared/transport.js"; +import { createEventNotifier } from "../shared/eventNotifier.js"; /** * High-level MCP server that provides a simpler API for working with resources, tools, and prompts. @@ -63,6 +64,14 @@ export class McpServer { private _registeredTools: { [name: string]: RegisteredTool } = {}; private _registeredPrompts: { [name: string]: RegisteredPrompt } = {}; + private _onCapabilityChange = createEventNotifier(); + /** Counter for unique resource invocation indexes, used to correlate resource invocation events */ + private _resourceInvocationIndex = 0; + /** Counter for unique tool invocation indexes, used to correlate tool invocation events */ + private _toolInvocationIndex = 0; + /** Counter for unique prompt invocation indexes, used to correlate prompt invocation events */ + private _promptInvocationIndex = 0; + constructor(serverInfo: Implementation, options?: ServerOptions) { this.server = new Server(serverInfo, options); } @@ -80,16 +89,40 @@ export class McpServer { * Closes the connection. */ async close(): Promise { + this._onCapabilityChange.close(); await this.server.close(); } + /** + * Event notifier for capability changes. Listeners will be notified when capabilities are added, updated, removed, + * enabled, disabled, invoked, completed, or when errors occur. + * + * This provides a way to monitor and respond to all capability-related activities in the server, + * including both lifecycle changes and invocation events. Each capability type (resource, tool, prompt) + * maintains its own sequence of invocation indexes, which can be used to correlate invocations + * with their completions or errors. + * + * @example + * const subscription = server.onCapabilityChange((event) => { + * if (event.action === "invoked") { + * console.log(`${event.capabilityType} ${event.capabilityName} invoked with index ${event.invocationIndex}`); + * } else if (event.action === "completed" || event.action === "error") { + * console.log(`${event.capabilityType} operation completed in ${event.durationMs}ms`); + * } + * }); + * + * // Later, to stop listening: + * subscription.close(); + */ + public readonly onCapabilityChange = this._onCapabilityChange.onEvent; + private _toolHandlersInitialized = false; private setToolRequestHandlers() { if (this._toolHandlersInitialized) { return; } - + this.server.assertCanSetRequestHandler( ListToolsRequestSchema.shape.method.value, ); @@ -135,11 +168,35 @@ export class McpServer { ); } + const invocationIndex = this._toolInvocationIndex++; + const startTime = performance.now(); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "tool", + capabilityName: request.params.name, + action: "invoked", + invocationIndex, + arguments: request.params.arguments, + })); + if (!tool.enabled) { - throw new McpError( + const error = new McpError( ErrorCode.InvalidParams, `Tool ${request.params.name} disabled`, ); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "tool", + capabilityName: request.params.name, + action: "error", + invocationIndex, + error, + durationMs: performance.now() - startTime, + })); + + throw error; } if (tool.inputSchema) { @@ -147,17 +204,51 @@ export class McpServer { request.params.arguments, ); if (!parseResult.success) { - throw new McpError( + const error = new McpError( ErrorCode.InvalidParams, `Invalid arguments for tool ${request.params.name}: ${parseResult.error.message}`, ); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "tool", + capabilityName: request.params.name, + action: "error", + invocationIndex, + error, + durationMs: performance.now() - startTime, + })); + + throw error; } const args = parseResult.data; const cb = tool.callback as ToolCallback; try { - return await Promise.resolve(cb(args, extra)); + return await Promise.resolve(cb(args, extra)).then((result) => { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "tool", + capabilityName: request.params.name, + action: "completed", + invocationIndex, + result, + durationMs: performance.now() - startTime, + })); + + return result; + }); } catch (error) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "tool", + capabilityName: request.params.name, + action: "error", + invocationIndex, + error, + durationMs: performance.now() - startTime, + })); + return { content: [ { @@ -171,8 +262,30 @@ export class McpServer { } else { const cb = tool.callback as ToolCallback; try { - return await Promise.resolve(cb(extra)); + const result = await Promise.resolve(cb(extra)); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "tool", + capabilityName: request.params.name, + action: "completed", + invocationIndex, + result, + durationMs: performance.now() - startTime, + })); + + return result; } catch (error) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "tool", + capabilityName: request.params.name, + action: "error", + invocationIndex, + error, + durationMs: performance.now() - startTime, + })); + return { content: [ { @@ -235,24 +348,79 @@ export class McpServer { ); } + const invocationIndex = this._promptInvocationIndex++; + const startTime = performance.now(); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: ref.name, + action: "invoked", + invocationIndex, + arguments: request.params.argument.name, + })); + if (!prompt.enabled) { - throw new McpError( + const error = new McpError( ErrorCode.InvalidParams, `Prompt ${ref.name} disabled`, ); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: ref.name, + action: "error", + invocationIndex, + error, + durationMs: performance.now() - startTime, + })); + + throw error; } if (!prompt.argsSchema) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: ref.name, + action: "completed", + invocationIndex, + result: EMPTY_COMPLETION_RESULT, + durationMs: performance.now() - startTime, + })); + return EMPTY_COMPLETION_RESULT; } const field = prompt.argsSchema.shape[request.params.argument.name]; if (!(field instanceof Completable)) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: ref.name, + action: "completed", + invocationIndex, + result: EMPTY_COMPLETION_RESULT, + durationMs: performance.now() - startTime, + })); + return EMPTY_COMPLETION_RESULT; } const def: CompletableDef = field._def; const suggestions = await def.complete(request.params.argument.value); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: ref.name, + action: "completed", + invocationIndex, + result: suggestions, + durationMs: performance.now() - startTime, + })); + return createCompletionResult(suggestions); } @@ -264,8 +432,21 @@ export class McpServer { (t) => t.resourceTemplate.uriTemplate.toString() === ref.uri, ); + const invocationIndex = this._resourceInvocationIndex++; + const startTime = performance.now(); + if (!template) { if (this._registeredResources[ref.uri]) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: ref.uri, + action: "completed", + invocationIndex, + result: EMPTY_COMPLETION_RESULT, + durationMs: performance.now() - startTime, + })); + // Attempting to autocomplete a fixed resource URI is not an error in the spec (but probably should be). return EMPTY_COMPLETION_RESULT; } @@ -280,11 +461,46 @@ export class McpServer { request.params.argument.name, ); if (!completer) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: ref.uri, + action: "completed", + invocationIndex, + result: EMPTY_COMPLETION_RESULT, + durationMs: performance.now() - startTime, + })); + return EMPTY_COMPLETION_RESULT; } - const suggestions = await completer(request.params.argument.value); - return createCompletionResult(suggestions); + try { + const suggestions = await completer(request.params.argument.value); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: ref.uri, + action: "completed", + invocationIndex, + result: suggestions, + durationMs: performance.now() - startTime, + })); + + return createCompletionResult(suggestions); + } catch (error) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: ref.uri, + action: "error", + invocationIndex, + error, + durationMs: performance.now() - startTime, + })); + + throw error; + } } private _resourceHandlersInitialized = false; @@ -367,13 +583,62 @@ export class McpServer { // First check for exact resource match const resource = this._registeredResources[uri.toString()]; if (resource) { + const invocationIndex = this._resourceInvocationIndex++; + const startTime = performance.now(); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: uri.toString(), + action: "invoked", + invocationIndex, + })); + if (!resource.enabled) { - throw new McpError( + const error = new McpError( ErrorCode.InvalidParams, `Resource ${uri} disabled`, ); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: uri.toString(), + action: "error", + invocationIndex, + error, + durationMs: performance.now() - startTime, + })); + + throw error; + } + try { + const result = await resource.readCallback(uri, extra); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: uri.toString(), + action: "completed", + invocationIndex, + result, + durationMs: performance.now() - startTime, + })); + + return result; + } catch (error) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: uri.toString(), + action: "error", + invocationIndex, + error, + durationMs: performance.now() - startTime, + })); + + throw error; } - return resource.readCallback(uri, extra); } // Then check templates @@ -384,7 +649,44 @@ export class McpServer { uri.toString(), ); if (variables) { - return template.readCallback(uri, variables, extra); + const invocationIndex = this._resourceInvocationIndex++; + const startTime = performance.now(); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: uri.toString(), + action: "invoked", + invocationIndex, + })); + + try { + const result = await template.readCallback(uri, variables, extra); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: uri.toString(), + action: "completed", + invocationIndex, + result, + durationMs: performance.now() - startTime, + })); + + return result; + } catch (error) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: uri.toString(), + action: "error", + invocationIndex, + error, + durationMs: performance.now() - startTime, + })); + + throw error; + } } } @@ -396,7 +698,7 @@ export class McpServer { ); this.setCompletionRequestHandler(); - + this._resourceHandlersInitialized = true; } @@ -450,11 +752,35 @@ export class McpServer { ); } + const invocationIndex = this._promptInvocationIndex++; + const startTime = performance.now(); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: request.params.name, + action: "invoked", + invocationIndex, + arguments: request.params.arguments, + })); + if (!prompt.enabled) { - throw new McpError( + const error = new McpError( ErrorCode.InvalidParams, `Prompt ${request.params.name} disabled`, ); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: request.params.name, + action: "error", + invocationIndex, + error, + durationMs: performance.now() - startTime, + })); + + throw error; } if (prompt.argsSchema) { @@ -462,24 +788,90 @@ export class McpServer { request.params.arguments, ); if (!parseResult.success) { - throw new McpError( + const error = new McpError( ErrorCode.InvalidParams, `Invalid arguments for prompt ${request.params.name}: ${parseResult.error.message}`, ); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: request.params.name, + action: "error", + invocationIndex, + error, + durationMs: performance.now() - startTime, + })); + + throw error; } const args = parseResult.data; const cb = prompt.callback as PromptCallback; - return await Promise.resolve(cb(args, extra)); + + try { + const result = await Promise.resolve(cb(args, extra)); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: request.params.name, + action: "completed", + invocationIndex, + result, + durationMs: performance.now() - startTime, + })); + + return result; + } catch (error) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: request.params.name, + action: "error", + invocationIndex, + error, + durationMs: performance.now() - startTime, + })); + + throw error; + } } else { const cb = prompt.callback as PromptCallback; - return await Promise.resolve(cb(extra)); + + try { + const result = await Promise.resolve(cb(extra)); + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: request.params.name, + action: "completed", + invocationIndex, + result, + durationMs: performance.now() - startTime, + })); + + return result; + } catch (error) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: request.params.name, + action: "error", + invocationIndex, + error, + durationMs: performance.now() - startTime, + })); + + throw error; + } } - }, + } ); this.setCompletionRequestHandler(); - + this._promptHandlersInitialized = true; } @@ -541,23 +933,128 @@ export class McpServer { metadata, readCallback: readCallback as ReadResourceCallback, enabled: true, - disable: () => registeredResource.update({ enabled: false }), - enable: () => registeredResource.update({ enabled: true }), - remove: () => registeredResource.update({ uri: null }), + disable: () => { + if (!registeredResource.enabled) return; + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: name, + action: "disabled", + })); + + registeredResource.update({ enabled: false }); + }, + enable: () => { + if (registeredResource.enabled) return; + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: name, + action: "enabled", + })); + + registeredResource.update({ enabled: true }); + }, + remove: () => { + if (uriOrTemplate === null) return; + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: name, + action: "removed", + })); + + registeredResource.update({ uri: null }); + }, update: (updates) => { - if (typeof updates.uri !== "undefined" && updates.uri !== uriOrTemplate) { - delete this._registeredResources[uriOrTemplate] - if (updates.uri) this._registeredResources[updates.uri] = registeredResource + let added = false; + let removed = false; + let updated = false; + let enabled = false; + + if ( + typeof updates.uri !== "undefined" && + updates.uri !== uriOrTemplate + ) { + removed = true; + delete this._registeredResources[uriOrTemplate]; + + if (updates.uri) { + added = true; + this._registeredResources[updates.uri] = registeredResource; + } + } + + if (typeof updates.name !== "undefined" && updates.name !== name) { + updated = true; + registeredResource.name = updates.name; + } + + if (typeof updates.metadata !== "undefined" && updates.metadata !== metadata) { + updated = true; + registeredResource.metadata = updates.metadata; } - if (typeof updates.name !== "undefined") registeredResource.name = updates.name - if (typeof updates.metadata !== "undefined") registeredResource.metadata = updates.metadata - if (typeof updates.callback !== "undefined") registeredResource.readCallback = updates.callback - if (typeof updates.enabled !== "undefined") registeredResource.enabled = updates.enabled - this.sendResourceListChanged() + + if (typeof updates.callback !== "undefined" && updates.callback !== registeredResource.readCallback) { + updated = true; + registeredResource.readCallback = updates.callback; + } + + if (typeof updates.enabled !== "undefined" && updates.enabled !== registeredResource.enabled) { + enabled = true; + registeredResource.enabled = updates.enabled; + } + + if (removed) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: name, + action: "removed", + })); + } + + if (added) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: registeredResource.name, + action: "added", + })); + } + + if (updated) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: registeredResource.name, + action: "updated", + })); + } + + if (enabled) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: registeredResource.name, + action: registeredResource.enabled ? "enabled" : "disabled", + })); + } + this.sendResourceListChanged(); }, }; this._registeredResources[uriOrTemplate] = registeredResource; + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: name, + action: "added", + })); + this.setResourceRequestHandlers(); this.sendResourceListChanged(); return registeredResource; @@ -571,23 +1068,136 @@ export class McpServer { metadata, readCallback: readCallback as ReadResourceTemplateCallback, enabled: true, - disable: () => registeredResourceTemplate.update({ enabled: false }), - enable: () => registeredResourceTemplate.update({ enabled: true }), - remove: () => registeredResourceTemplate.update({ name: null }), + disable: () => { + if (!registeredResourceTemplate.enabled) return; + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: name, + action: "disabled", + })); + + registeredResourceTemplate.update({ enabled: false }); + }, + enable: () => { + if (registeredResourceTemplate.enabled) return; + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: name, + action: "enabled", + })); + + registeredResourceTemplate.update({ enabled: true }); + }, + remove: () => { + if (name === null) return; + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: name, + action: "removed", + })); + + registeredResourceTemplate.update({ name: null }); + }, update: (updates) => { + let added = false; + let removed = false; + let updated = false; + let enabled = false; + if (typeof updates.name !== "undefined" && updates.name !== name) { - delete this._registeredResourceTemplates[name] - if (updates.name) this._registeredResourceTemplates[updates.name] = registeredResourceTemplate + removed = true; + delete this._registeredResourceTemplates[name]; + + if (updates.name) { + added = true; + this._registeredResourceTemplates[updates.name] = + registeredResourceTemplate; + } } - if (typeof updates.template !== "undefined") registeredResourceTemplate.resourceTemplate = updates.template - if (typeof updates.metadata !== "undefined") registeredResourceTemplate.metadata = updates.metadata - if (typeof updates.callback !== "undefined") registeredResourceTemplate.readCallback = updates.callback - if (typeof updates.enabled !== "undefined") registeredResourceTemplate.enabled = updates.enabled - this.sendResourceListChanged() + + if (typeof updates.template !== "undefined" && updates.template !== registeredResourceTemplate.resourceTemplate) { + updated = true; + registeredResourceTemplate.resourceTemplate = updates.template; + } + + if ( + typeof updates.metadata !== "undefined" && + updates.metadata !== metadata + ) { + updated = true; + registeredResourceTemplate.metadata = updates.metadata; + } + + if ( + typeof updates.callback !== "undefined" && + updates.callback !== registeredResourceTemplate.readCallback + ) { + updated = true; + registeredResourceTemplate.readCallback = updates.callback; + } + + if ( + typeof updates.enabled !== "undefined" && + updates.enabled !== registeredResourceTemplate.enabled + ) { + enabled = true; + registeredResourceTemplate.enabled = updates.enabled; + } + + if (removed) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: name, + action: "removed", + })); + } + + if (added) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: updates.name || name, + action: "added", + })); + } + + if (updated) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: updates.name || name, + action: "updated", + })); + } + + if (enabled) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: updates.name || name, + action: registeredResourceTemplate.enabled ? "enabled" : "disabled", + })); + } + + this.sendResourceListChanged(); }, }; this._registeredResourceTemplates[name] = registeredResourceTemplate; + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "resource", + capabilityName: name, + action: "added", + })); + this.setResourceRequestHandlers(); this.sendResourceListChanged(); return registeredResourceTemplate; @@ -645,23 +1255,138 @@ export class McpServer { paramsSchema === undefined ? undefined : z.object(paramsSchema), callback: cb, enabled: true, - disable: () => registeredTool.update({ enabled: false }), - enable: () => registeredTool.update({ enabled: true }), - remove: () => registeredTool.update({ name: null }), + disable: () => { + if (!registeredTool.enabled) return; + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "tool", + capabilityName: name, + action: "disabled", + })); + + registeredTool.update({ enabled: false }); + }, + enable: () => { + if (registeredTool.enabled) return; + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "tool", + capabilityName: name, + action: "enabled", + })); + + registeredTool.update({ enabled: true }); + }, + remove: () => { + if (name === null) return; + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "tool", + capabilityName: name, + action: "removed", + })); + + registeredTool.update({ name: null }); + }, update: (updates) => { + let added = false; + let removed = false; + let updated = false; + let enabled = false; + if (typeof updates.name !== "undefined" && updates.name !== name) { - delete this._registeredTools[name] - if (updates.name) this._registeredTools[updates.name] = registeredTool + removed = true; + delete this._registeredTools[name]; + + if (updates.name) { + added = true; + this._registeredTools[updates.name] = registeredTool; + } + } + + if ( + typeof updates.description !== "undefined" && + updates.description !== description + ) { + updated = true; + registeredTool.description = updates.description; + } + + if ( + typeof updates.paramsSchema !== "undefined" && + updates.paramsSchema !== paramsSchema + ) { + updated = true; + registeredTool.inputSchema = z.object(updates.paramsSchema); + } + + if ( + typeof updates.callback !== "undefined" && + updates.callback !== registeredTool.callback + ) { + updated = true; + registeredTool.callback = updates.callback; } - if (typeof updates.description !== "undefined") registeredTool.description = updates.description - if (typeof updates.paramsSchema !== "undefined") registeredTool.inputSchema = z.object(updates.paramsSchema) - if (typeof updates.callback !== "undefined") registeredTool.callback = updates.callback - if (typeof updates.enabled !== "undefined") registeredTool.enabled = updates.enabled - this.sendToolListChanged() + + if ( + typeof updates.enabled !== "undefined" && + updates.enabled !== registeredTool.enabled + ) { + enabled = true; + registeredTool.enabled = updates.enabled; + } + + if (removed) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "tool", + capabilityName: name, + action: "removed", + })); + } + + if (added) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "tool", + capabilityName: updates.name || name, + action: "added", + })); + } + + if (updated) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "tool", + capabilityName: updates.name || name, + action: "updated", + })); + } + + if (enabled) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "tool", + capabilityName: updates.name || name, + action: registeredTool.enabled ? "enabled" : "disabled", + })); + } + + this.sendToolListChanged(); }, }; this._registeredTools[name] = registeredTool; + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "tool", + capabilityName: name, + action: "added", + })); + this.setToolRequestHandlers(); this.sendToolListChanged() @@ -718,23 +1443,138 @@ export class McpServer { argsSchema: argsSchema === undefined ? undefined : z.object(argsSchema), callback: cb, enabled: true, - disable: () => registeredPrompt.update({ enabled: false }), - enable: () => registeredPrompt.update({ enabled: true }), - remove: () => registeredPrompt.update({ name: null }), + disable: () => { + if (!registeredPrompt.enabled) return; + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: name, + action: "disabled", + })); + + registeredPrompt.update({ enabled: false }); + }, + enable: () => { + if (registeredPrompt.enabled) return; + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: name, + action: "enabled", + })); + + registeredPrompt.update({ enabled: true }); + }, + remove: () => { + if (name === null) return; + + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: name, + action: "removed", + })); + + registeredPrompt.update({ name: null }); + }, update: (updates) => { + let added = false; + let removed = false; + let updated = false; + let enabled = false; + if (typeof updates.name !== "undefined" && updates.name !== name) { - delete this._registeredPrompts[name] - if (updates.name) this._registeredPrompts[updates.name] = registeredPrompt + removed = true; + delete this._registeredPrompts[name]; + + if (updates.name) { + added = true; + this._registeredPrompts[updates.name!] = registeredPrompt; + } + } + + if ( + typeof updates.description !== "undefined" && + updates.description !== description + ) { + updated = true; + registeredPrompt.description = updates.description; + } + + if ( + typeof updates.argsSchema !== "undefined" && + updates.argsSchema !== argsSchema + ) { + updated = true; + registeredPrompt.argsSchema = z.object(updates.argsSchema); + } + + if ( + typeof updates.callback !== "undefined" && + updates.callback !== registeredPrompt.callback + ) { + updated = true; + registeredPrompt.callback = updates.callback; + } + + if ( + typeof updates.enabled !== "undefined" && + updates.enabled !== registeredPrompt.enabled + ) { + enabled = true; + registeredPrompt.enabled = updates.enabled; + } + + if (removed) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: name, + action: "removed", + })); + } + + if (added) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: updates.name || name, + action: "added", + })); } - if (typeof updates.description !== "undefined") registeredPrompt.description = updates.description - if (typeof updates.argsSchema !== "undefined") registeredPrompt.argsSchema = z.object(updates.argsSchema) - if (typeof updates.callback !== "undefined") registeredPrompt.callback = updates.callback - if (typeof updates.enabled !== "undefined") registeredPrompt.enabled = updates.enabled - this.sendPromptListChanged() + + if (updated) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: updates.name || name, + action: "updated", + })); + } + + if (enabled) { + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: updates.name || name, + action: registeredPrompt.enabled ? "enabled" : "disabled", + })); + } + + this.sendPromptListChanged(); }, }; this._registeredPrompts[name] = registeredPrompt; + this._onCapabilityChange.notify(() => ({ + serverInfo: this.server.getVersion(), + capabilityType: "prompt", + capabilityName: name, + action: "added", + })); + this.setPromptRequestHandlers(); this.sendPromptListChanged() @@ -970,3 +1810,72 @@ const EMPTY_COMPLETION_RESULT: CompleteResult = { hasMore: false, }, }; + +/** + * Represents events emitted when capabilities (tools, resources, prompts) change state or are invoked. + * + * These events allow tracking the lifecycle and usage of all capabilities registered with an McpServer. + * Events include capability registration, updates, invocation, completion, and errors. + * + * Each capability type (tool, resource, prompt) maintains its own sequence of invocation indexes. + * The invocationIndex can be used to correlate "invoked" events with their corresponding + * "completed" or "error" events for the same capability type. + */ +export type CapabilityEvent = { + /** Information about the server that generated this event */ + readonly serverInfo: { readonly name: string; readonly version: string }; + /** The type of capability this event relates to */ + readonly capabilityType: "resource" | "tool" | "prompt"; + /** The name (or URI for resources) of the specific capability */ + readonly capabilityName: string; +} & ( + | { + /** + * Lifecycle events for capability registration and status changes. + * - "added": The capability was registered + * - "updated": The capability was modified + * - "removed": The capability was unregistered + * - "enabled": The capability was enabled + * - "disabled": The capability was disabled + */ + readonly action: "added" | "updated" | "removed" | "enabled" | "disabled"; + } + | { + /** Emitted when a capability is invoked */ + readonly action: "invoked"; + /** + * Monotonically increasing index for each invocation, per capability type. + * This index can be used to correlate this "invoked" event with a later + * "completed" or "error" event with the same capabilityType and invocationIndex. + */ + readonly invocationIndex: number; + /** The arguments passed to the capability, if any */ + readonly arguments?: unknown; + } + | { + /** Emitted when a capability invocation completes successfully */ + readonly action: "completed"; + /** + * The invocationIndex from the corresponding "invoked" event. + * This allows correlating the completion with its invocation. + */ + readonly invocationIndex: number; + /** The result returned by the capability, if any */ + readonly result?: unknown; + /** The duration of the operation in milliseconds, measured from invocation to completion */ + readonly durationMs?: number; + } + | { + /** Emitted when a capability invocation fails with an error */ + readonly action: "error"; + /** + * The invocationIndex from the corresponding "invoked" event. + * This allows correlating the error with its invocation. + */ + readonly invocationIndex: number; + /** The error that occurred during capability execution */ + readonly error: unknown; + /** The duration from invocation to error in milliseconds */ + readonly durationMs?: number; + } +); diff --git a/src/shared/eventNotifier.test.ts b/src/shared/eventNotifier.test.ts new file mode 100644 index 00000000..2923f7a0 --- /dev/null +++ b/src/shared/eventNotifier.test.ts @@ -0,0 +1,145 @@ +import { createEventNotifier } from "./eventNotifier.js"; + +describe("EventNotifier", () => { + let notifier: ReturnType>; + + beforeEach(() => { + notifier = createEventNotifier(); + }); + + test("should notify listeners in registration order", () => { + const events: string[] = []; + notifier.onEvent((event) => events.push(`first: ${event}`)); + notifier.onEvent((event) => events.push(`second: ${event}`)); + notifier.onEvent((event) => events.push(`third: ${event}`)); + + notifier.notify("test event"); + + expect(events).toEqual([ + "first: test event", + "second: test event", + "third: test event", + ]); + }); + + test("should not notify unsubscribed listeners", () => { + const events: string[] = []; + const subscription = notifier.onEvent((event) => events.push(event)); + + notifier.notify("first event"); + subscription.close(); + notifier.notify("second event"); + + expect(events).toEqual(["first event"]); + }); + + test("should handle function events", () => { + const events: string[] = []; + notifier.onEvent((event) => events.push(event)); + + notifier.notify(() => "dynamic event"); + + expect(events).toEqual(["dynamic event"]); + }); + + test("should handle errors through error handler", () => { + const errors: Error[] = []; + notifier.onError((error) => errors.push(error)); + + notifier.onEvent(() => { + throw new Error("test error"); + }); + + notifier.notify("test event"); + + expect(errors).toHaveLength(1); + expect(errors[0]).toBeInstanceOf(Error); + expect(errors[0].message).toBe("test error"); + }); + + test("should not notify after close", () => { + const events: string[] = []; + notifier.onEvent((event) => events.push(event)); + + notifier.notify("first event"); + notifier.close(); + notifier.notify("second event"); + + expect(events).toEqual(["first event"]); + }); + + test("should handle multiple subscriptions and unsubscriptions", () => { + const events: string[] = []; + const subscription1 = notifier.onEvent((event) => + events.push(`1: ${event}`) + ); + const subscription2 = notifier.onEvent((event) => + events.push(`2: ${event}`) + ); + + notifier.notify("first event"); + subscription1.close(); + notifier.notify("second event"); + subscription2.close(); + notifier.notify("third event"); + + expect(events).toEqual([ + "1: first event", + "2: first event", + "2: second event", + ]); + }); + + test("should handle error handler after close", () => { + const errors: Error[] = []; + notifier.onError((error) => errors.push(error)); + notifier.close(); + + notifier.onEvent(() => { + throw new Error("test error"); + }); + + notifier.notify("test event"); + + expect(errors).toHaveLength(0); + }); + + test("should clear error handler on close", () => { + const errors: Error[] = []; + notifier.onError((error) => errors.push(error)); + + // Close should clear the error handler + notifier.close(); + + // Setting up a new listener after close + notifier.onEvent(() => { + throw new Error("test error"); + }); + + // This should not trigger the error handler since the notifier is closed + notifier.notify("test event"); + + expect(errors).toHaveLength(0); + }); + + test("should use the last set error handler", () => { + const errors1: Error[] = []; + const errors2: Error[] = []; + + // First error handler + notifier.onError((error) => errors1.push(error)); + + // Second error handler should replace the first one + notifier.onError((error) => errors2.push(error)); + + notifier.onEvent(() => { + throw new Error("test error"); + }); + + notifier.notify("test event"); + + expect(errors1).toHaveLength(0); + expect(errors2).toHaveLength(1); + expect(errors2[0].message).toBe("test error"); + }); +}); diff --git a/src/shared/eventNotifier.ts b/src/shared/eventNotifier.ts new file mode 100644 index 00000000..a1f53238 --- /dev/null +++ b/src/shared/eventNotifier.ts @@ -0,0 +1,152 @@ +/** + * Provides a simple, type-safe event notification implementation. This module allows components to implement the + * observer pattern with minimal boilerplate and proper type checking. + */ + +/** + * A type-safe event notifier that manages event listeners and notifications. + * + * @template T The type of events this notifier will handle + * @template E The type of error that can be handled (defaults to Error) + * + * EventNotifier provides: + * + * - Type-safe event subscriptions via `onEvent` + * - Synchronized event notifications via `notify` + * - Automatic cleanup of resources via `close` + * - Status tracking via `active` property + * - Error handling via optional error callback + */ +export type EventNotifier = { + /** + * Registers a listener function to be called when events are notified. Listeners are notified in the order they + * were registered. + * + * @example + * + * ```ts + * const notifier = createEventNotifier(); + * const subscription = notifier.onEvent((message) => { + * console.log(`Received message: ${message}`); + * }); + * + * // Later, to stop listening: + * subscription.close(); + * ``` + * + * @param listener A function that will be called with the notified event + * @returns A closeable to unregister the listener (all listeners are unregistered when the notifier is closed). + */ + onEvent: (listener: (event: T) => unknown) => { close: () => void }; + + /** + * Notifies all registered listeners with the provided event. + * + * This method: + * + * - Calls all registered listeners with the event in their registration order + * - Ignores errors thrown by listeners (they won't affect other listeners) + * - Ignores returned promises (results are not awaited) + * - Does nothing if there are no listeners + * - If the event is a function, it will be called if there are listeners and its return value will be + * used as the event. + * + * @example + * + * ```ts + * const notifier = createEventNotifier<{ type: string; value: number }>(); + * notifier.onEvent((event) => { + * console.log(`Received ${event.type} with value:`, event.value); + * }); + * + * notifier.notify({ type: 'progress', value: 75 }); + * ``` + * + * @param event The event to send to all listeners or a function that returns such event. + */ + notify: (event: T | (() => T)) => void; + + /** + * Sets an error handler for the notifier. This handler will be called when a listener throws an error. + * + * @param handler A function that will be called with any errors thrown by listeners. + */ + onError: (handler: (error: E) => void) => void; + + /** + * Closes the notifier and removes all listeners. + * + * @warning Failing to call close() on subscriptions or the notifier itself may lead to memory leaks. + */ + close: () => void; +}; + +/** + * Creates a type-safe event notifier. + * + * @example + * + * ```ts + * // Simple string event notifier + * const stringNotifier = createEventNotifier(); + * + * // Complex object event notifier + * interface TaskEvent { + * type: 'started' | 'completed' | 'failed'; + * taskId: number; + * details: { + * name: string; + * duration?: number; + * }; + * } + * const taskNotifier = createEventNotifier(); + * ``` + * + * @template T The type of events this notifier will handle + * @template E The type of error that can be handled (defaults to Error) + * @returns A new EventNotifier instance. + */ +export const createEventNotifier = (): EventNotifier => { + const listeners = new Set<(event: T) => unknown>(); + let errorHandler: ((error: E) => void) | undefined; + + return { + close: () => { + listeners.clear(); + errorHandler = undefined; + }, + + onEvent: (listener) => { + listeners.add(listener); + return { + close: () => { + listeners.delete(listener); + }, + }; + }, + + notify: (event: T | (() => T)) => { + if (!listeners.size) { + return; + } + + if (typeof event === "function") { + event = (event as () => T)(); + } + + for (const listener of listeners) { + try { + void listener(event); + } catch (error) { + if (errorHandler) { + errorHandler(error as E); + } + } + } + }, + + onError: (handler) => { + errorHandler = handler; + }, + }; +};