diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java index ac487b6f..d71fe1ab 100644 --- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java @@ -4,6 +4,7 @@ package io.modelcontextprotocol; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -111,27 +112,28 @@ void testCreateMessageWithoutSamplingCapabilities(String clientType) { return Mono.just(mock(CallToolResult.class)); }); - McpServer.async(mcpServerTransportProvider).serverInfo("test-server", "1.0.0").tools(tool).build(); + var server = McpServer.async(mcpServerTransportProvider).serverInfo("test-server", "1.0.0").tools(tool).build(); - // Create client without sampling capabilities - var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample " + "client", "0.0.0")).build(); + try (var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample " + "client", "0.0.0")) + .build();) { - assertThat(client.initialize()).isNotNull(); + assertThat(client.initialize()).isNotNull(); - try { - client.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); - } - catch (McpError e) { - assertThat(e).isInstanceOf(McpError.class) - .hasMessage("Client must be configured with sampling capabilities"); + try { + client.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); + } + catch (McpError e) { + assertThat(e).isInstanceOf(McpError.class) + .hasMessage("Client must be configured with sampling capabilities"); + } } + server.close(); } @ParameterizedTest(name = "{0} : {displayName} ") @ValueSource(strings = { "httpclient", "webflux" }) void testCreateMessageSuccess(String clientType) throws InterruptedException { - // Client var clientBuilder = clientBuilders.get(clientType); Function samplingHandler = request -> { @@ -142,13 +144,6 @@ void testCreateMessageSuccess(String clientType) throws InterruptedException { CreateMessageResult.StopReason.STOP_SEQUENCE); }; - var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")) - .capabilities(ClientCapabilities.builder().sampling().build()) - .sampling(samplingHandler) - .build(); - - // Server - CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null); @@ -183,15 +178,19 @@ void testCreateMessageSuccess(String clientType) throws InterruptedException { .tools(tool) .build(); - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + try (var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")) + .capabilities(ClientCapabilities.builder().sampling().build()) + .sampling(samplingHandler) + .build()) { - CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - assertThat(response).isNotNull(); - assertThat(response).isEqualTo(callResponse); + CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); - mcpClient.close(); + assertThat(response).isNotNull(); + assertThat(response).isEqualTo(callResponse); + } mcpServer.close(); } @@ -206,41 +205,42 @@ void testRootsSuccess(String clientType) { List roots = List.of(new Root("uri1://", "root1"), new Root("uri2://", "root2")); AtomicReference> rootsRef = new AtomicReference<>(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) .build(); - var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) .roots(roots) - .build(); + .build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - assertThat(rootsRef.get()).isNull(); + assertThat(rootsRef.get()).isNull(); - mcpClient.rootsListChangedNotification(); + mcpClient.rootsListChangedNotification(); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(roots); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(roots); + }); - // Remove a root - mcpClient.removeRoot(roots.get(0).uri()); + // Remove a root + mcpClient.removeRoot(roots.get(0).uri()); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(List.of(roots.get(1))); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(roots.get(1))); + }); - // Add a new root - var root3 = new Root("uri3://", "root3"); - mcpClient.addRoot(root3); + // Add a new root + var root3 = new Root("uri3://", "root3"); + mcpClient.addRoot(root3); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(List.of(roots.get(1), root3)); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(roots.get(1), root3)); + }); + } - mcpClient.close(); mcpServer.close(); } @@ -261,21 +261,21 @@ void testRootsWithoutCapability(String clientType) { var mcpServer = McpServer.sync(mcpServerTransportProvider).rootsChangeHandler((exchange, rootsUpdate) -> { }).tools(tool).build(); - // Create client without roots capability - // No roots capability - var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().build()).build(); + try ( + // Create client without roots capability + var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().build()).build()) { - assertThat(mcpClient.initialize()).isNotNull(); + assertThat(mcpClient.initialize()).isNotNull(); - // Attempt to list roots should fail - try { - mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); - } - catch (McpError e) { - assertThat(e).isInstanceOf(McpError.class).hasMessage("Roots not supported"); + // Attempt to list roots should fail + try { + mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); + } + catch (McpError e) { + assertThat(e).isInstanceOf(McpError.class).hasMessage("Roots not supported"); + } } - mcpClient.close(); mcpServer.close(); } @@ -285,30 +285,31 @@ void testRootsNotifciationWithEmptyRootsList(String clientType) { var clientBuilder = clientBuilders.get(clientType); AtomicReference> rootsRef = new AtomicReference<>(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) .build(); - var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) .roots(List.of()) // Empty roots list - .build(); + .build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + assertThat(mcpClient.initialize()).isNotNull(); - mcpClient.rootsListChangedNotification(); + mcpClient.rootsListChangedNotification(); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).isEmpty(); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).isEmpty(); + }); + } - mcpClient.close(); mcpServer.close(); } @ParameterizedTest(name = "{0} : {displayName} ") @ValueSource(strings = { "httpclient", "webflux" }) void testRootsWithMultipleHandlers(String clientType) { + var clientBuilder = clientBuilders.get(clientType); List roots = List.of(new Root("uri1://", "root1")); @@ -321,21 +322,21 @@ void testRootsWithMultipleHandlers(String clientType) { .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef2.set(rootsUpdate)) .build(); - var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) .roots(roots) - .build(); + .build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - mcpClient.rootsListChangedNotification(); + mcpClient.rootsListChangedNotification(); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef1.get()).containsAll(roots); - assertThat(rootsRef2.get()).containsAll(roots); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef1.get()).containsAll(roots); + assertThat(rootsRef2.get()).containsAll(roots); + }); + } - mcpClient.close(); mcpServer.close(); } @@ -348,28 +349,26 @@ void testRootsServerCloseWithActiveSubscription(String clientType) { List roots = List.of(new Root("uri1://", "root1")); AtomicReference> rootsRef = new AtomicReference<>(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) .build(); - var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) .roots(roots) - .build(); + .build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - mcpClient.rootsListChangedNotification(); + mcpClient.rootsListChangedNotification(); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(roots); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(roots); + }); + } - // Close server while subscription is active mcpServer.close(); - - // Verify client can handle server closure gracefully - mcpClient.close(); } // --------------------------------------- @@ -378,9 +377,9 @@ void testRootsServerCloseWithActiveSubscription(String clientType) { String emptyJsonSchema = """ { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": {} + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {} } """; @@ -408,19 +407,19 @@ void testToolCallSuccess(String clientType) { .tools(tool1) .build(); - var mcpClient = clientBuilder.build(); + try (var mcpClient = clientBuilder.build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - assertThat(mcpClient.listTools().tools()).contains(tool1.tool()); + assertThat(mcpClient.listTools().tools()).contains(tool1.tool()); - CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); + CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); - assertThat(response).isNotNull(); - assertThat(response).isEqualTo(callResponse); + assertThat(response).isNotNull(); + assertThat(response).isEqualTo(callResponse); + } - mcpClient.close(); mcpServer.close(); } @@ -443,13 +442,14 @@ void testToolListChangeHandlingSuccess(String clientType) { return callResponse; }); + AtomicReference> rootsRef = new AtomicReference<>(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) .capabilities(ServerCapabilities.builder().tools(true).build()) .tools(tool1) .build(); - AtomicReference> rootsRef = new AtomicReference<>(); - var mcpClient = clientBuilder.toolsChangeConsumer(toolsUpdate -> { + try (var mcpClient = clientBuilder.toolsChangeConsumer(toolsUpdate -> { // perform a blocking call to a remote service String response = RestClient.create() .get() @@ -458,39 +458,40 @@ void testToolListChangeHandlingSuccess(String clientType) { .body(String.class); assertThat(response).isNotBlank(); rootsRef.set(toolsUpdate); - }).build(); + }).build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - assertThat(rootsRef.get()).isNull(); + assertThat(rootsRef.get()).isNull(); - assertThat(mcpClient.listTools().tools()).contains(tool1.tool()); + assertThat(mcpClient.listTools().tools()).contains(tool1.tool()); - mcpServer.notifyToolsListChanged(); + mcpServer.notifyToolsListChanged(); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(List.of(tool1.tool())); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(tool1.tool())); + }); - // Remove a tool - mcpServer.removeTool("tool1"); + // Remove a tool + mcpServer.removeTool("tool1"); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).isEmpty(); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).isEmpty(); + }); - // Add a new tool - McpServerFeatures.SyncToolSpecification tool2 = new McpServerFeatures.SyncToolSpecification( - new McpSchema.Tool("tool2", "tool2 description", emptyJsonSchema), (exchange, request) -> callResponse); + // Add a new tool + McpServerFeatures.SyncToolSpecification tool2 = new McpServerFeatures.SyncToolSpecification( + new McpSchema.Tool("tool2", "tool2 description", emptyJsonSchema), + (exchange, request) -> callResponse); - mcpServer.addTool(tool2); + mcpServer.addTool(tool2); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(List.of(tool2.tool())); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(tool2.tool())); + }); + } - mcpClient.close(); mcpServer.close(); } @@ -502,12 +503,115 @@ void testInitialize(String clientType) { var mcpServer = McpServer.sync(mcpServerTransportProvider).build(); - var mcpClient = clientBuilder.build(); + try (var mcpClient = clientBuilder.build()) { + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + } + + mcpServer.close(); + } + + // --------------------------------------- + // Logging Tests + // --------------------------------------- + + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = { "httpclient", "webflux" }) + void testLoggingNotification(String clientType) { + // Create a list to store received logging notifications + List receivedNotifications = new ArrayList<>(); - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + var clientBuilder = clientBuilders.get(clientType); + + // Create server with a tool that sends logging notifications + McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification( + new McpSchema.Tool("logging-test", "Test logging notifications", emptyJsonSchema), + (exchange, request) -> { + + // Create and send notifications with different levels + + //@formatter:off + return exchange // This should be filtered out (DEBUG < NOTICE) + .loggingNotification(McpSchema.LoggingMessageNotification.builder() + .level(McpSchema.LoggingLevel.DEBUG) + .logger("test-logger") + .data("Debug message") + .build()) + .then(exchange // This should be sent (NOTICE >= NOTICE) + .loggingNotification(McpSchema.LoggingMessageNotification.builder() + .level(McpSchema.LoggingLevel.NOTICE) + .logger("test-logger") + .data("Notice message") + .build())) + .then(exchange // This should be sent (ERROR > NOTICE) + .loggingNotification(McpSchema.LoggingMessageNotification.builder() + .level(McpSchema.LoggingLevel.ERROR) + .logger("test-logger") + .data("Error message") + .build())) + .then(exchange // This should be filtered out (INFO < NOTICE) + .loggingNotification(McpSchema.LoggingMessageNotification.builder() + .level(McpSchema.LoggingLevel.INFO) + .logger("test-logger") + .data("Another info message") + .build())) + .then(exchange // This should be sent (ERROR >= NOTICE) + .loggingNotification(McpSchema.LoggingMessageNotification.builder() + .level(McpSchema.LoggingLevel.ERROR) + .logger("test-logger") + .data("Another error message") + .build())) + .thenReturn(new CallToolResult("Logging test completed", false)); + //@formatter:on + }); - mcpClient.close(); + var mcpServer = McpServer.async(mcpServerTransportProvider) + .serverInfo("test-server", "1.0.0") + .capabilities(ServerCapabilities.builder().logging().tools(true).build()) + .tools(tool) + .build(); + + try ( + // Create client with logging notification handler + var mcpClient = clientBuilder.loggingConsumer(notification -> { + receivedNotifications.add(notification); + }).build()) { + + // Initialize client + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + // Set minimum logging level to NOTICE + mcpClient.setLoggingLevel(McpSchema.LoggingLevel.NOTICE); + + // Call the tool that sends logging notifications + CallToolResult result = mcpClient.callTool(new McpSchema.CallToolRequest("logging-test", Map.of())); + assertThat(result).isNotNull(); + assertThat(result.content().get(0)).isInstanceOf(McpSchema.TextContent.class); + assertThat(((McpSchema.TextContent) result.content().get(0)).text()).isEqualTo("Logging test completed"); + + // Wait for notifications to be processed + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + + // Should have received 3 notifications (1 NOTICE and 2 ERROR) + assertThat(receivedNotifications).hasSize(3); + + // First notification should be NOTICE level + assertThat(receivedNotifications.get(0).level()).isEqualTo(McpSchema.LoggingLevel.NOTICE); + assertThat(receivedNotifications.get(0).logger()).isEqualTo("test-logger"); + assertThat(receivedNotifications.get(0).data()).isEqualTo("Notice message"); + + // Second notification should be ERROR level + assertThat(receivedNotifications.get(1).level()).isEqualTo(McpSchema.LoggingLevel.ERROR); + assertThat(receivedNotifications.get(1).logger()).isEqualTo("test-logger"); + assertThat(receivedNotifications.get(1).data()).isEqualTo("Error message"); + + // Third notification should be ERROR level + assertThat(receivedNotifications.get(2).level()).isEqualTo(McpSchema.LoggingLevel.ERROR); + assertThat(receivedNotifications.get(2).logger()).isEqualTo("test-logger"); + assertThat(receivedNotifications.get(2).data()).isEqualTo("Another error message"); + }); + } mcpServer.close(); } diff --git a/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java b/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java index d5c9f90f..ceac4fa3 100644 --- a/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java +++ b/mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java @@ -30,6 +30,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.org.checkerframework.checker.units.qual.s; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -125,27 +126,34 @@ void testCreateMessageWithoutSamplingCapabilities() { return Mono.just(mock(CallToolResult.class)); }); - McpServer.async(mcpServerTransportProvider).serverInfo("test-server", "1.0.0").tools(tool).build(); + //@formatter:off + var server = McpServer.async(mcpServerTransportProvider) + .serverInfo("test-server", "1.0.0") + .tools(tool) + .build(); + + try ( + // Create client without sampling capabilities + var client = clientBuilder + .clientInfo(new McpSchema.Implementation("Sample " + "client", "0.0.0")) + .build()) {//@formatter:on - // Create client without sampling capabilities - var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample " + "client", "0.0.0")).build(); + assertThat(client.initialize()).isNotNull(); - assertThat(client.initialize()).isNotNull(); - - try { - client.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); - } - catch (McpError e) { - assertThat(e).isInstanceOf(McpError.class) - .hasMessage("Client must be configured with sampling capabilities"); + try { + client.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); + } + catch (McpError e) { + assertThat(e).isInstanceOf(McpError.class) + .hasMessage("Client must be configured with sampling capabilities"); + } } + server.close(); } @Test void testCreateMessageSuccess() throws InterruptedException { - // Client - Function samplingHandler = request -> { assertThat(request.messages()).hasSize(1); assertThat(request.messages().get(0).content()).isInstanceOf(McpSchema.TextContent.class); @@ -154,13 +162,6 @@ void testCreateMessageSuccess() throws InterruptedException { CreateMessageResult.StopReason.STOP_SEQUENCE); }; - var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")) - .capabilities(ClientCapabilities.builder().sampling().build()) - .sampling(samplingHandler) - .build(); - - // Server - CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null); @@ -190,19 +191,25 @@ void testCreateMessageSuccess() throws InterruptedException { return Mono.just(callResponse); }); + //@formatter:off var mcpServer = McpServer.async(mcpServerTransportProvider) - .serverInfo("test-server", "1.0.0") - .tools(tool) - .build(); + .serverInfo("test-server", "1.0.0") + .tools(tool) + .build(); - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + try ( + var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")) + .capabilities(ClientCapabilities.builder().sampling().build()) + .sampling(samplingHandler) + .build()) {//@formatter:on - CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - assertThat(response).isNotNull().isEqualTo(callResponse); + CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); - mcpClient.close(); + assertThat(response).isNotNull().isEqualTo(callResponse); + } mcpServer.close(); } @@ -214,41 +221,42 @@ void testRootsSuccess() { List roots = List.of(new Root("uri1://", "root1"), new Root("uri2://", "root2")); AtomicReference> rootsRef = new AtomicReference<>(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) .build(); - var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) .roots(roots) - .build(); + .build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - assertThat(rootsRef.get()).isNull(); + assertThat(rootsRef.get()).isNull(); - mcpClient.rootsListChangedNotification(); + mcpClient.rootsListChangedNotification(); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(roots); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(roots); + }); - // Remove a root - mcpClient.removeRoot(roots.get(0).uri()); + // Remove a root + mcpClient.removeRoot(roots.get(0).uri()); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(List.of(roots.get(1))); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(roots.get(1))); + }); - // Add a new root - var root3 = new Root("uri3://", "root3"); - mcpClient.addRoot(root3); + // Add a new root + var root3 = new Root("uri3://", "root3"); + mcpClient.addRoot(root3); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(List.of(roots.get(1), root3)); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(roots.get(1), root3)); + }); + } - mcpClient.close(); mcpServer.close(); } @@ -266,21 +274,22 @@ void testRootsWithoutCapability() { var mcpServer = McpServer.sync(mcpServerTransportProvider).rootsChangeHandler((exchange, rootsUpdate) -> { }).tools(tool).build(); - // Create client without roots capability - // No roots capability - var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().build()).build(); + try ( + // Create client without roots capability + // No roots capability + var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().build()).build()) { - assertThat(mcpClient.initialize()).isNotNull(); + assertThat(mcpClient.initialize()).isNotNull(); - // Attempt to list roots should fail - try { - mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); - } - catch (McpError e) { - assertThat(e).isInstanceOf(McpError.class).hasMessage("Roots not supported"); + // Attempt to list roots should fail + try { + mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); + } + catch (McpError e) { + assertThat(e).isInstanceOf(McpError.class).hasMessage("Roots not supported"); + } } - mcpClient.close(); mcpServer.close(); } @@ -292,20 +301,20 @@ void testRootsNotifciationWithEmptyRootsList() { .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) .build(); - var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) .roots(List.of()) // Empty roots list - .build(); + .build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - mcpClient.rootsListChangedNotification(); + mcpClient.rootsListChangedNotification(); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).isEmpty(); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).isEmpty(); + }); + } - mcpClient.close(); mcpServer.close(); } @@ -321,20 +330,20 @@ void testRootsWithMultipleHandlers() { .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef2.set(rootsUpdate)) .build(); - var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) .roots(roots) - .build(); + .build()) { - assertThat(mcpClient.initialize()).isNotNull(); + assertThat(mcpClient.initialize()).isNotNull(); - mcpClient.rootsListChangedNotification(); + mcpClient.rootsListChangedNotification(); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef1.get()).containsAll(roots); - assertThat(rootsRef2.get()).containsAll(roots); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef1.get()).containsAll(roots); + assertThat(rootsRef2.get()).containsAll(roots); + }); + } - mcpClient.close(); mcpServer.close(); } @@ -343,28 +352,26 @@ void testRootsServerCloseWithActiveSubscription() { List roots = List.of(new Root("uri1://", "root1")); AtomicReference> rootsRef = new AtomicReference<>(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) .build(); - var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) .roots(roots) - .build(); + .build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - mcpClient.rootsListChangedNotification(); + mcpClient.rootsListChangedNotification(); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(roots); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(roots); + }); + } - // Close server while subscription is active mcpServer.close(); - - // Verify client can handle server closure gracefully - mcpClient.close(); } // --------------------------------------- @@ -400,18 +407,18 @@ void testToolCallSuccess() { .tools(tool1) .build(); - var mcpClient = clientBuilder.build(); + try (var mcpClient = clientBuilder.build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - assertThat(mcpClient.listTools().tools()).contains(tool1.tool()); + assertThat(mcpClient.listTools().tools()).contains(tool1.tool()); - CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); + CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); - assertThat(response).isNotNull().isEqualTo(callResponse); + assertThat(response).isNotNull().isEqualTo(callResponse); + } - mcpClient.close(); mcpServer.close(); } @@ -431,13 +438,14 @@ void testToolListChangeHandlingSuccess() { return callResponse; }); + AtomicReference> rootsRef = new AtomicReference<>(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) .capabilities(ServerCapabilities.builder().tools(true).build()) .tools(tool1) .build(); - AtomicReference> rootsRef = new AtomicReference<>(); - var mcpClient = clientBuilder.toolsChangeConsumer(toolsUpdate -> { + try (var mcpClient = clientBuilder.toolsChangeConsumer(toolsUpdate -> { // perform a blocking call to a remote service String response = RestClient.create() .get() @@ -446,39 +454,40 @@ void testToolListChangeHandlingSuccess() { .body(String.class); assertThat(response).isNotBlank(); rootsRef.set(toolsUpdate); - }).build(); + }).build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - assertThat(rootsRef.get()).isNull(); + assertThat(rootsRef.get()).isNull(); - assertThat(mcpClient.listTools().tools()).contains(tool1.tool()); + assertThat(mcpClient.listTools().tools()).contains(tool1.tool()); - mcpServer.notifyToolsListChanged(); + mcpServer.notifyToolsListChanged(); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(List.of(tool1.tool())); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(tool1.tool())); + }); - // Remove a tool - mcpServer.removeTool("tool1"); + // Remove a tool + mcpServer.removeTool("tool1"); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).isEmpty(); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).isEmpty(); + }); - // Add a new tool - McpServerFeatures.SyncToolSpecification tool2 = new McpServerFeatures.SyncToolSpecification( - new McpSchema.Tool("tool2", "tool2 description", emptyJsonSchema), (exchange, request) -> callResponse); + // Add a new tool + McpServerFeatures.SyncToolSpecification tool2 = new McpServerFeatures.SyncToolSpecification( + new McpSchema.Tool("tool2", "tool2 description", emptyJsonSchema), + (exchange, request) -> callResponse); - mcpServer.addTool(tool2); + mcpServer.addTool(tool2); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(List.of(tool2.tool())); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(tool2.tool())); + }); + } - mcpClient.close(); mcpServer.close(); } @@ -487,12 +496,12 @@ void testInitialize() { var mcpServer = McpServer.sync(mcpServerTransportProvider).build(); - var mcpClient = clientBuilder.build(); + try (var mcpClient = clientBuilder.build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + } - mcpClient.close(); mcpServer.close(); } diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java index 71356351..5452c8ea 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -453,15 +454,10 @@ void testLoggingLevelsWithoutInitialization() { @Test void testLoggingLevels() { withClient(createMcpTransport(), mcpAsyncClient -> { - Mono testAllLevels = mcpAsyncClient.initialize().then(Mono.defer(() -> { - Mono chain = Mono.empty(); - for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) { - chain = chain.then(mcpAsyncClient.setLoggingLevel(level)); - } - return chain; - })); - - StepVerifier.create(testAllLevels).verifyComplete(); + StepVerifier + .create(mcpAsyncClient.initialize() + .thenMany(Flux.fromArray(McpSchema.LoggingLevel.values()).flatMap(mcpAsyncClient::setLoggingLevel))) + .verifyComplete(); }); } diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java index 7bcb9a8b..a91632c6 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java @@ -416,53 +416,4 @@ void testRootsChangeHandlers() { .doesNotThrowAnyException(); } - // --------------------------------------- - // Logging Tests - // --------------------------------------- - - @Test - void testLoggingLevels() { - var mcpAsyncServer = McpServer.async(createMcpTransportProvider()) - .serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().logging().build()) - .build(); - - // Test all logging levels - for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) { - var notification = McpSchema.LoggingMessageNotification.builder() - .level(level) - .logger("test-logger") - .data("Test message with level " + level) - .build(); - - StepVerifier.create(mcpAsyncServer.loggingNotification(notification)).verifyComplete(); - } - } - - @Test - void testLoggingWithoutCapability() { - var mcpAsyncServer = McpServer.async(createMcpTransportProvider()) - .serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().build()) // No logging capability - .build(); - - var notification = McpSchema.LoggingMessageNotification.builder() - .level(McpSchema.LoggingLevel.INFO) - .logger("test-logger") - .data("Test log message") - .build(); - - StepVerifier.create(mcpAsyncServer.loggingNotification(notification)).verifyComplete(); - } - - @Test - void testLoggingWithNullNotification() { - var mcpAsyncServer = McpServer.async(createMcpTransportProvider()) - .serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().logging().build()) - .build(); - - StepVerifier.create(mcpAsyncServer.loggingNotification(null)).verifyError(McpError.class); - } - } diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java index 7846e053..9a63143c 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java @@ -388,53 +388,4 @@ void testRootsChangeHandlers() { assertThatCode(() -> noConsumersServer.closeGracefully()).doesNotThrowAnyException(); } - // --------------------------------------- - // Logging Tests - // --------------------------------------- - - @Test - void testLoggingLevels() { - var mcpSyncServer = McpServer.sync(createMcpTransportProvider()) - .serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().logging().build()) - .build(); - - // Test all logging levels - for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) { - var notification = McpSchema.LoggingMessageNotification.builder() - .level(level) - .logger("test-logger") - .data("Test message with level " + level) - .build(); - - assertThatCode(() -> mcpSyncServer.loggingNotification(notification)).doesNotThrowAnyException(); - } - } - - @Test - void testLoggingWithoutCapability() { - var mcpSyncServer = McpServer.sync(createMcpTransportProvider()) - .serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().build()) // No logging capability - .build(); - - var notification = McpSchema.LoggingMessageNotification.builder() - .level(McpSchema.LoggingLevel.INFO) - .logger("test-logger") - .data("Test log message") - .build(); - - assertThatCode(() -> mcpSyncServer.loggingNotification(notification)).doesNotThrowAnyException(); - } - - @Test - void testLoggingWithNullNotification() { - var mcpSyncServer = McpServer.sync(createMcpTransportProvider()) - .serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().logging().build()) - .build(); - - assertThatThrownBy(() -> mcpSyncServer.loggingNotification(null)).isInstanceOf(McpError.class); - } - } diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index ce49b0a5..df099836 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -786,10 +786,9 @@ public Mono setLoggingLevel(LoggingLevel loggingLevel) { } return this.withInitializationCheck("setting logging level", initializedResult -> { - String levelName = this.transport.unmarshalFrom(loggingLevel, new TypeReference() { - }); - Map params = Map.of("level", levelName); - return this.mcpSession.sendNotification(McpSchema.METHOD_LOGGING_SET_LEVEL, params); + var params = new McpSchema.SetLevelRequest(loggingLevel); + return this.mcpSession.sendRequest(McpSchema.METHOD_LOGGING_SET_LEVEL, params, new TypeReference() { + }).then(); }); } diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java index 071d7646..32cf325e 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java @@ -6,7 +6,6 @@ import java.time.Duration; -import io.modelcontextprotocol.spec.McpClientTransport; import io.modelcontextprotocol.spec.McpSchema; import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities; import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest; diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java index ec2a04c9..062de13e 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java @@ -21,6 +21,7 @@ import io.modelcontextprotocol.spec.McpSchema.CallToolResult; import io.modelcontextprotocol.spec.McpSchema.LoggingLevel; import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification; +import io.modelcontextprotocol.spec.McpSchema.SetLevelRequest; import io.modelcontextprotocol.spec.McpSchema.Tool; import io.modelcontextprotocol.spec.McpServerSession; import io.modelcontextprotocol.spec.McpServerTransportProvider; @@ -216,11 +217,17 @@ public Mono notifyPromptsListChanged() { // --------------------------------------- /** - * Send a logging message notification to all connected clients. Messages below the - * current minimum logging level will be filtered out. + * This implementation would, incorrectly, broadcast the logging message to all + * connected clients, using a single minLoggingLevel for all of them. Similar to the + * sampling and roots, the logging level should be set per client session and use the + * ServerExchange to send the logging message to the right client. * @param loggingMessageNotification The logging message to send * @return A Mono that completes when the notification has been sent + * @deprecated Use + * {@link McpAsyncServerExchange#loggingNotification(LoggingMessageNotification)} + * instead. */ + @Deprecated public Mono loggingNotification(LoggingMessageNotification loggingMessageNotification) { return this.delegate.loggingNotification(loggingMessageNotification); } @@ -257,6 +264,8 @@ private static class AsyncServerImpl extends McpAsyncServer { private final ConcurrentHashMap prompts = new ConcurrentHashMap<>(); + // FIXME: this field is deprecated and should be remvoed together with the + // broadcasting loggingNotification. private LoggingLevel minLoggingLevel = LoggingLevel.DEBUG; private List protocolVersions = List.of(McpSchema.LATEST_PROTOCOL_VERSION); @@ -677,12 +686,22 @@ public Mono loggingNotification(LoggingMessageNotification loggingMessageN loggingMessageNotification); } - private McpServerSession.RequestHandler setLoggerRequestHandler() { + private McpServerSession.RequestHandler setLoggerRequestHandler() { return (exchange, params) -> { - this.minLoggingLevel = objectMapper.convertValue(params, new TypeReference() { - }); + return Mono.defer(() -> { - return Mono.empty(); + SetLevelRequest newMinLoggingLevel = objectMapper.convertValue(params, + new TypeReference() { + }); + + exchange.setMinLoggingLevel(newMinLoggingLevel.level()); + + // FIXME: this field is deprecated and should be removed together + // with the broadcasting loggingNotification. + this.minLoggingLevel = newMinLoggingLevel.level(); + + return Mono.just(Map.of()); + }); }; } diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java index 65862844..889dc66d 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java @@ -1,9 +1,16 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + package io.modelcontextprotocol.server; import com.fasterxml.jackson.core.type.TypeReference; import io.modelcontextprotocol.spec.McpError; import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpSchema.LoggingLevel; +import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification; import io.modelcontextprotocol.spec.McpServerSession; +import io.modelcontextprotocol.util.Assert; import reactor.core.publisher.Mono; /** @@ -11,6 +18,7 @@ * exchange provides methods to interact with the client and query its capabilities. * * @author Dariusz Jędrzejczyk + * @author Christian Tzolov */ public class McpAsyncServerExchange { @@ -20,6 +28,8 @@ public class McpAsyncServerExchange { private final McpSchema.Implementation clientInfo; + private volatile LoggingLevel minLoggingLevel = LoggingLevel.INFO; + private static final TypeReference CREATE_MESSAGE_RESULT_TYPE_REF = new TypeReference<>() { }; @@ -101,4 +111,38 @@ public Mono listRoots(String cursor) { LIST_ROOTS_RESULT_TYPE_REF); } + /** + * Send a logging message notification to all connected clients. Messages below the + * current minimum logging level will be filtered out. + * @param loggingMessageNotification The logging message to send + * @return A Mono that completes when the notification has been sent + */ + public Mono loggingNotification(LoggingMessageNotification loggingMessageNotification) { + + if (loggingMessageNotification == null) { + return Mono.error(new McpError("Logging message must not be null")); + } + + return Mono.defer(() -> { + if (this.isNotificationForLevelAllowed(loggingMessageNotification.level())) { + return this.session.sendNotification(McpSchema.METHOD_NOTIFICATION_MESSAGE, loggingMessageNotification); + } + return Mono.empty(); + }); + } + + /** + * Set the minimum logging level for the client. Messages below this level will be + * filtered out. + * @param minLoggingLevel The minimum logging level + */ + void setMinLoggingLevel(LoggingLevel minLoggingLevel) { + Assert.notNull(minLoggingLevel, "minLoggingLevel must not be null"); + this.minLoggingLevel = minLoggingLevel; + } + + private boolean isNotificationForLevelAllowed(LoggingLevel loggingLevel) { + return loggingLevel.level() >= this.minLoggingLevel.level(); + } + } diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java index 72eba8b8..bf310450 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java @@ -4,9 +4,7 @@ package io.modelcontextprotocol.server; -import io.modelcontextprotocol.spec.McpError; import io.modelcontextprotocol.spec.McpSchema; -import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities; import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification; import io.modelcontextprotocol.util.Assert; @@ -151,9 +149,16 @@ public void notifyPromptsListChanged() { } /** - * Send a logging message notification to all clients. - * @param loggingMessageNotification The logging message notification to send + * This implementation would, incorrectly, broadcast the logging message to all + * connected clients, using a single minLoggingLevel for all of them. Similar to the + * sampling and roots, the logging level should be set per client session and use the + * ServerExchange to send the logging message to the right client. + * @param loggingMessageNotification The logging message to send + * @deprecated Use + * {@link McpSyncServerExchange#loggingNotification(LoggingMessageNotification)} + * instead. */ + @Deprecated public void loggingNotification(LoggingMessageNotification loggingMessageNotification) { this.asyncServer.loggingNotification(loggingMessageNotification).block(); } diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServerExchange.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServerExchange.java index f121db55..52360e54 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServerExchange.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServerExchange.java @@ -1,13 +1,19 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + package io.modelcontextprotocol.server; -import com.fasterxml.jackson.core.type.TypeReference; import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpSchema.LoggingLevel; +import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification; /** * Represents a synchronous exchange with a Model Context Protocol (MCP) client. The * exchange provides methods to interact with the client and query its capabilities. * * @author Dariusz Jędrzejczyk + * @author Christian Tzolov */ public class McpSyncServerExchange { @@ -75,4 +81,13 @@ public McpSchema.ListRootsResult listRoots(String cursor) { return this.exchange.listRoots(cursor).block(); } + /** + * Send a logging message notification to all connected clients. Messages below the + * current minimum logging level will be filtered out. + * @param loggingMessageNotification The logging message to send + */ + public void loggingNotification(LoggingMessageNotification loggingMessageNotification) { + this.exchange.loggingNotification(loggingMessageNotification).block(); + } + } diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java index 4c596b62..e621ac19 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java @@ -1165,6 +1165,11 @@ public int level() { } // @formatter:on + @JsonInclude(JsonInclude.Include.NON_ABSENT) + @JsonIgnoreProperties(ignoreUnknown = true) + public record SetLevelRequest(@JsonProperty("level") LoggingLevel level) { + } + // --------------------------- // Autocomplete // --------------------------- diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java index ac7b9e5e..72b409af 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -454,15 +455,10 @@ void testLoggingLevelsWithoutInitialization() { @Test void testLoggingLevels() { withClient(createMcpTransport(), mcpAsyncClient -> { - Mono testAllLevels = mcpAsyncClient.initialize().then(Mono.defer(() -> { - Mono chain = Mono.empty(); - for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) { - chain = chain.then(mcpAsyncClient.setLoggingLevel(level)); - } - return chain; - })); - - StepVerifier.create(testAllLevels).verifyComplete(); + StepVerifier + .create(mcpAsyncClient.initialize() + .thenMany(Flux.fromArray(McpSchema.LoggingLevel.values()).flatMap(mcpAsyncClient::setLoggingLevel))) + .verifyComplete(); }); } diff --git a/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java b/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java index 4b4fc434..c7c69b52 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java @@ -415,53 +415,4 @@ void testRootsChangeHandlers() { .doesNotThrowAnyException(); } - // --------------------------------------- - // Logging Tests - // --------------------------------------- - - @Test - void testLoggingLevels() { - var mcpAsyncServer = McpServer.async(createMcpTransportProvider()) - .serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().logging().build()) - .build(); - - // Test all logging levels - for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) { - var notification = McpSchema.LoggingMessageNotification.builder() - .level(level) - .logger("test-logger") - .data("Test message with level " + level) - .build(); - - StepVerifier.create(mcpAsyncServer.loggingNotification(notification)).verifyComplete(); - } - } - - @Test - void testLoggingWithoutCapability() { - var mcpAsyncServer = McpServer.async(createMcpTransportProvider()) - .serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().build()) // No logging capability - .build(); - - var notification = McpSchema.LoggingMessageNotification.builder() - .level(McpSchema.LoggingLevel.INFO) - .logger("test-logger") - .data("Test log message") - .build(); - - StepVerifier.create(mcpAsyncServer.loggingNotification(notification)).verifyComplete(); - } - - @Test - void testLoggingWithNullNotification() { - var mcpAsyncServer = McpServer.async(createMcpTransportProvider()) - .serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().logging().build()) - .build(); - - StepVerifier.create(mcpAsyncServer.loggingNotification(null)).verifyError(McpError.class); - } - } diff --git a/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java b/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java index 17feb36e..8c9328cc 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java @@ -387,53 +387,4 @@ void testRootsChangeHandlers() { assertThatCode(() -> noConsumersServer.closeGracefully()).doesNotThrowAnyException(); } - // --------------------------------------- - // Logging Tests - // --------------------------------------- - - @Test - void testLoggingLevels() { - var mcpSyncServer = McpServer.sync(createMcpTransportProvider()) - .serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().logging().build()) - .build(); - - // Test all logging levels - for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) { - var notification = McpSchema.LoggingMessageNotification.builder() - .level(level) - .logger("test-logger") - .data("Test message with level " + level) - .build(); - - assertThatCode(() -> mcpSyncServer.loggingNotification(notification)).doesNotThrowAnyException(); - } - } - - @Test - void testLoggingWithoutCapability() { - var mcpSyncServer = McpServer.sync(createMcpTransportProvider()) - .serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().build()) // No logging capability - .build(); - - var notification = McpSchema.LoggingMessageNotification.builder() - .level(McpSchema.LoggingLevel.INFO) - .logger("test-logger") - .data("Test log message") - .build(); - - assertThatCode(() -> mcpSyncServer.loggingNotification(notification)).doesNotThrowAnyException(); - } - - @Test - void testLoggingWithNullNotification() { - var mcpSyncServer = McpServer.sync(createMcpTransportProvider()) - .serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().logging().build()) - .build(); - - assertThatThrownBy(() -> mcpSyncServer.loggingNotification(null)).isInstanceOf(McpError.class); - } - } diff --git a/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerCustomContextPathTests.java b/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerCustomContextPathTests.java index 1254e2ad..212a3c95 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerCustomContextPathTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerCustomContextPathTests.java @@ -8,7 +8,6 @@ import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport; import io.modelcontextprotocol.server.McpServer; import io.modelcontextprotocol.spec.McpSchema; -import org.apache.catalina.Context; import org.apache.catalina.LifecycleException; import org.apache.catalina.LifecycleState; import org.apache.catalina.startup.Tomcat; @@ -78,9 +77,13 @@ public void after() { @Test void testCustomContextPath() { - McpServer.async(mcpServerTransportProvider).serverInfo("test-server", "1.0.0").build(); - var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample " + "client", "0.0.0")).build(); - assertThat(client.initialize()).isNotNull(); + var server = McpServer.async(mcpServerTransportProvider).serverInfo("test-server", "1.0.0").build(); + try (//@formatter:off + var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample " + "client", "0.0.0")) .build()) { //@formatter:on + + assertThat(client.initialize()).isNotNull(); + } + server.close(); } } diff --git a/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java b/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java index e34baf9d..a7b61064 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java @@ -4,6 +4,7 @@ package io.modelcontextprotocol.server.transport; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -33,6 +34,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.org.checkerframework.checker.units.qual.m; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -44,7 +46,7 @@ public class HttpServletSseServerTransportProviderIntegrationTests { - private static final int PORT = 8185; + private static final int PORT = 8189; private static final String CUSTOM_SSE_ENDPOINT = "/somePath/sse"; @@ -110,27 +112,29 @@ void testCreateMessageWithoutSamplingCapabilities() { return Mono.just(mock(CallToolResult.class)); }); - McpServer.async(mcpServerTransportProvider).serverInfo("test-server", "1.0.0").tools(tool).build(); + var server = McpServer.async(mcpServerTransportProvider).serverInfo("test-server", "1.0.0").tools(tool).build(); - // Create client without sampling capabilities - var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample " + "client", "0.0.0")).build(); + try ( + // Create client without sampling capabilities + var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample " + "client", "0.0.0")) + .build()) { - assertThat(client.initialize()).isNotNull(); + assertThat(client.initialize()).isNotNull(); - try { - client.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); - } - catch (McpError e) { - assertThat(e).isInstanceOf(McpError.class) - .hasMessage("Client must be configured with sampling capabilities"); + try { + client.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); + } + catch (McpError e) { + assertThat(e).isInstanceOf(McpError.class) + .hasMessage("Client must be configured with sampling capabilities"); + } } + server.close(); } @Test void testCreateMessageSuccess() throws InterruptedException { - // Client - Function samplingHandler = request -> { assertThat(request.messages()).hasSize(1); assertThat(request.messages().get(0).content()).isInstanceOf(McpSchema.TextContent.class); @@ -139,13 +143,6 @@ void testCreateMessageSuccess() throws InterruptedException { CreateMessageResult.StopReason.STOP_SEQUENCE); }; - var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")) - .capabilities(ClientCapabilities.builder().sampling().build()) - .sampling(samplingHandler) - .build(); - - // Server - CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null); @@ -180,15 +177,19 @@ void testCreateMessageSuccess() throws InterruptedException { .tools(tool) .build(); - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + try (var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")) + .capabilities(ClientCapabilities.builder().sampling().build()) + .sampling(samplingHandler) + .build()) { - CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - assertThat(response).isNotNull(); - assertThat(response).isEqualTo(callResponse); + CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); - mcpClient.close(); + assertThat(response).isNotNull(); + assertThat(response).isEqualTo(callResponse); + } mcpServer.close(); } @@ -200,42 +201,43 @@ void testRootsSuccess() { List roots = List.of(new Root("uri1://", "root1"), new Root("uri2://", "root2")); AtomicReference> rootsRef = new AtomicReference<>(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) .build(); - var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) .roots(roots) - .build(); + .build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - assertThat(rootsRef.get()).isNull(); + assertThat(rootsRef.get()).isNull(); - mcpClient.rootsListChangedNotification(); + mcpClient.rootsListChangedNotification(); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(roots); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(roots); + }); - // Remove a root - mcpClient.removeRoot(roots.get(0).uri()); + // Remove a root + mcpClient.removeRoot(roots.get(0).uri()); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(List.of(roots.get(1))); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(roots.get(1))); + }); - // Add a new root - var root3 = new Root("uri3://", "root3"); - mcpClient.addRoot(root3); + // Add a new root + var root3 = new Root("uri3://", "root3"); + mcpClient.addRoot(root3); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(List.of(roots.get(1), root3)); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(roots.get(1), root3)); + }); - mcpClient.close(); - mcpServer.close(); + mcpServer.close(); + } } @Test @@ -252,21 +254,19 @@ void testRootsWithoutCapability() { var mcpServer = McpServer.sync(mcpServerTransportProvider).rootsChangeHandler((exchange, rootsUpdate) -> { }).tools(tool).build(); - // Create client without roots capability - // No roots capability - var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().build()).build(); + try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().build()).build()) { - assertThat(mcpClient.initialize()).isNotNull(); + assertThat(mcpClient.initialize()).isNotNull(); - // Attempt to list roots should fail - try { - mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); - } - catch (McpError e) { - assertThat(e).isInstanceOf(McpError.class).hasMessage("Roots not supported"); + // Attempt to list roots should fail + try { + mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); + } + catch (McpError e) { + assertThat(e).isInstanceOf(McpError.class).hasMessage("Roots not supported"); + } } - mcpClient.close(); mcpServer.close(); } @@ -278,20 +278,20 @@ void testRootsNotifciationWithEmptyRootsList() { .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) .build(); - var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) .roots(List.of()) // Empty roots list - .build(); + .build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - mcpClient.rootsListChangedNotification(); + mcpClient.rootsListChangedNotification(); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).isEmpty(); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).isEmpty(); + }); + } - mcpClient.close(); mcpServer.close(); } @@ -307,20 +307,20 @@ void testRootsWithMultipleHandlers() { .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef2.set(rootsUpdate)) .build(); - var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) .roots(roots) - .build(); + .build()) { - assertThat(mcpClient.initialize()).isNotNull(); + assertThat(mcpClient.initialize()).isNotNull(); - mcpClient.rootsListChangedNotification(); + mcpClient.rootsListChangedNotification(); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef1.get()).containsAll(roots); - assertThat(rootsRef2.get()).containsAll(roots); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef1.get()).containsAll(roots); + assertThat(rootsRef2.get()).containsAll(roots); + }); + } - mcpClient.close(); mcpServer.close(); } @@ -329,28 +329,26 @@ void testRootsServerCloseWithActiveSubscription() { List roots = List.of(new Root("uri1://", "root1")); AtomicReference> rootsRef = new AtomicReference<>(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) .rootsChangeHandler((exchange, rootsUpdate) -> rootsRef.set(rootsUpdate)) .build(); - var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) + try (var mcpClient = clientBuilder.capabilities(ClientCapabilities.builder().roots(true).build()) .roots(roots) - .build(); + .build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - mcpClient.rootsListChangedNotification(); + mcpClient.rootsListChangedNotification(); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(roots); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(roots); + }); + } - // Close server while subscription is active mcpServer.close(); - - // Verify client can handle server closure gracefully - mcpClient.close(); } // --------------------------------------- @@ -386,19 +384,18 @@ void testToolCallSuccess() { .tools(tool1) .build(); - var mcpClient = clientBuilder.build(); + try (var mcpClient = clientBuilder.build()) { + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + assertThat(mcpClient.listTools().tools()).contains(tool1.tool()); - assertThat(mcpClient.listTools().tools()).contains(tool1.tool()); + CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); - CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of())); - - assertThat(response).isNotNull(); - assertThat(response).isEqualTo(callResponse); + assertThat(response).isNotNull(); + assertThat(response).isEqualTo(callResponse); + } - mcpClient.close(); mcpServer.close(); } @@ -418,13 +415,14 @@ void testToolListChangeHandlingSuccess() { return callResponse; }); + AtomicReference> rootsRef = new AtomicReference<>(); + var mcpServer = McpServer.sync(mcpServerTransportProvider) .capabilities(ServerCapabilities.builder().tools(true).build()) .tools(tool1) .build(); - AtomicReference> rootsRef = new AtomicReference<>(); - var mcpClient = clientBuilder.toolsChangeConsumer(toolsUpdate -> { + try (var mcpClient = clientBuilder.toolsChangeConsumer(toolsUpdate -> { // perform a blocking call to a remote service String response = RestClient.create() .get() @@ -433,53 +431,167 @@ void testToolListChangeHandlingSuccess() { .body(String.class); assertThat(response).isNotBlank(); rootsRef.set(toolsUpdate); - }).build(); + }).build()) { - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); - assertThat(rootsRef.get()).isNull(); + assertThat(rootsRef.get()).isNull(); - assertThat(mcpClient.listTools().tools()).contains(tool1.tool()); + assertThat(mcpClient.listTools().tools()).contains(tool1.tool()); - mcpServer.notifyToolsListChanged(); + mcpServer.notifyToolsListChanged(); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(List.of(tool1.tool())); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(tool1.tool())); + }); - // Remove a tool - mcpServer.removeTool("tool1"); + // Remove a tool + mcpServer.removeTool("tool1"); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).isEmpty(); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).isEmpty(); + }); - // Add a new tool - McpServerFeatures.SyncToolSpecification tool2 = new McpServerFeatures.SyncToolSpecification( - new McpSchema.Tool("tool2", "tool2 description", emptyJsonSchema), (exchange, request) -> callResponse); + // Add a new tool + McpServerFeatures.SyncToolSpecification tool2 = new McpServerFeatures.SyncToolSpecification( + new McpSchema.Tool("tool2", "tool2 description", emptyJsonSchema), + (exchange, request) -> callResponse); - mcpServer.addTool(tool2); + mcpServer.addTool(tool2); - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(rootsRef.get()).containsAll(List.of(tool2.tool())); - }); + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(rootsRef.get()).containsAll(List.of(tool2.tool())); + }); + } - mcpClient.close(); mcpServer.close(); } @Test void testInitialize() { - var mcpServer = McpServer.sync(mcpServerTransportProvider).build(); - var mcpClient = clientBuilder.build(); + try (var mcpClient = clientBuilder.build()) { + + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + } + + mcpServer.close(); + } - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); + // --------------------------------------- + // Logging Tests + // --------------------------------------- + @Test + void testLoggingNotification() { + // Create a list to store received logging notifications + List receivedNotifications = new ArrayList<>(); - mcpClient.close(); + // Create server with a tool that sends logging notifications + McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification( + new McpSchema.Tool("logging-test", "Test logging notifications", emptyJsonSchema), + (exchange, request) -> { + + // Create and send notifications with different levels + + // This should be filtered out (DEBUG < NOTICE) + exchange + .loggingNotification(McpSchema.LoggingMessageNotification.builder() + .level(McpSchema.LoggingLevel.DEBUG) + .logger("test-logger") + .data("Debug message") + .build()) + .block(); + + // This should be sent (NOTICE >= NOTICE) + exchange + .loggingNotification(McpSchema.LoggingMessageNotification.builder() + .level(McpSchema.LoggingLevel.NOTICE) + .logger("test-logger") + .data("Notice message") + .build()) + .block(); + + // This should be sent (ERROR > NOTICE) + exchange + .loggingNotification(McpSchema.LoggingMessageNotification.builder() + .level(McpSchema.LoggingLevel.ERROR) + .logger("test-logger") + .data("Error message") + .build()) + .block(); + + // This should be filtered out (INFO < NOTICE) + exchange + .loggingNotification(McpSchema.LoggingMessageNotification.builder() + .level(McpSchema.LoggingLevel.INFO) + .logger("test-logger") + .data("Another info message") + .build()) + .block(); + + // This should be sent (ERROR >= NOTICE) + exchange + .loggingNotification(McpSchema.LoggingMessageNotification.builder() + .level(McpSchema.LoggingLevel.ERROR) + .logger("test-logger") + .data("Another error message") + .build()) + .block(); + + return Mono.just(new CallToolResult("Logging test completed", false)); + }); + + var mcpServer = McpServer.async(mcpServerTransportProvider) + .serverInfo("test-server", "1.0.0") + .capabilities(ServerCapabilities.builder().logging().tools(true).build()) + .tools(tool) + .build(); + try ( + // Create client with logging notification handler + var mcpClient = clientBuilder.loggingConsumer(notification -> { + receivedNotifications.add(notification); + }).build()) { + + // Initialize client + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + // Set minimum logging level to NOTICE + mcpClient.setLoggingLevel(McpSchema.LoggingLevel.NOTICE); + + // Call the tool that sends logging notifications + CallToolResult result = mcpClient.callTool(new McpSchema.CallToolRequest("logging-test", Map.of())); + assertThat(result).isNotNull(); + assertThat(result.content().get(0)).isInstanceOf(McpSchema.TextContent.class); + assertThat(((McpSchema.TextContent) result.content().get(0)).text()).isEqualTo("Logging test completed"); + + // Wait for notifications to be processed + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + + System.out.println("Received notifications: " + receivedNotifications); + + // Should have received 3 notifications (1 NOTICE and 2 ERROR) + assertThat(receivedNotifications).hasSize(3); + + // First notification should be NOTICE level + assertThat(receivedNotifications.get(0).level()).isEqualTo(McpSchema.LoggingLevel.NOTICE); + assertThat(receivedNotifications.get(0).logger()).isEqualTo("test-logger"); + assertThat(receivedNotifications.get(0).data()).isEqualTo("Notice message"); + + // Second notification should be ERROR level + assertThat(receivedNotifications.get(1).level()).isEqualTo(McpSchema.LoggingLevel.ERROR); + assertThat(receivedNotifications.get(1).logger()).isEqualTo("test-logger"); + assertThat(receivedNotifications.get(1).data()).isEqualTo("Error message"); + + // Third notification should be ERROR level + assertThat(receivedNotifications.get(2).level()).isEqualTo(McpSchema.LoggingLevel.ERROR); + assertThat(receivedNotifications.get(2).logger()).isEqualTo("test-logger"); + assertThat(receivedNotifications.get(2).data()).isEqualTo("Another error message"); + }); + } mcpServer.close(); }