Skip to content

Commit 3020b4a

Browse files
committed
feat(mcp): refactor logging to use exchange for targeted client notifications
Refactors the MCP logging system to use the exchange mechanism for sending logging notifications only to specific client sessions rather than broadcasting to all clients. - Move logging notification delivery from server-wide broadcast to per-session exchange - Implement per-session minimum logging level tracking and filtering - Change setLoggingLevel from notification to request/response pattern - Deprecate global server.loggingNotification in favor of exchange.loggingNotification - Add integration test demonstrating filtered logging notifications Resolves #131 Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
1 parent 8fc72ae commit 3020b4a

File tree

17 files changed

+362
-245
lines changed

17 files changed

+362
-245
lines changed

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java

Lines changed: 120 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package io.modelcontextprotocol;
55

66
import java.time.Duration;
7+
import java.util.ArrayList;
78
import java.util.List;
89
import java.util.Map;
910
import java.util.concurrent.ConcurrentHashMap;
@@ -12,6 +13,7 @@
1213

1314
import com.fasterxml.jackson.databind.ObjectMapper;
1415
import io.modelcontextprotocol.client.McpClient;
16+
import io.modelcontextprotocol.client.McpSyncClient;
1517
import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
1618
import io.modelcontextprotocol.client.transport.WebFluxSseClientTransport;
1719
import io.modelcontextprotocol.server.McpServer;
@@ -111,7 +113,7 @@ void testCreateMessageWithoutSamplingCapabilities(String clientType) {
111113
return Mono.just(mock(CallToolResult.class));
112114
});
113115

114-
McpServer.async(mcpServerTransportProvider).serverInfo("test-server", "1.0.0").tools(tool).build();
116+
var server = McpServer.async(mcpServerTransportProvider).serverInfo("test-server", "1.0.0").tools(tool).build();
115117

116118
// Create client without sampling capabilities
117119
var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample " + "client", "0.0.0")).build();
@@ -125,6 +127,9 @@ void testCreateMessageWithoutSamplingCapabilities(String clientType) {
125127
assertThat(e).isInstanceOf(McpError.class)
126128
.hasMessage("Client must be configured with sampling capabilities");
127129
}
130+
131+
server.close();
132+
client.close();
128133
}
129134

130135
@ParameterizedTest(name = "{0} : {displayName} ")
@@ -293,8 +298,7 @@ void testRootsNotifciationWithEmptyRootsList(String clientType) {
293298
.roots(List.of()) // Empty roots list
294299
.build();
295300

296-
InitializeResult initResult = mcpClient.initialize();
297-
assertThat(initResult).isNotNull();
301+
assertThat(mcpClient.initialize()).isNotNull();
298302

299303
mcpClient.rootsListChangedNotification();
300304

@@ -309,6 +313,7 @@ void testRootsNotifciationWithEmptyRootsList(String clientType) {
309313
@ParameterizedTest(name = "{0} : {displayName} ")
310314
@ValueSource(strings = { "httpclient", "webflux" })
311315
void testRootsWithMultipleHandlers(String clientType) {
316+
312317
var clientBuilder = clientBuilders.get(clientType);
313318

314319
List<Root> roots = List.of(new Root("uri1://", "root1"));
@@ -339,8 +344,8 @@ void testRootsWithMultipleHandlers(String clientType) {
339344
mcpServer.close();
340345
}
341346

342-
@ParameterizedTest(name = "{0} : {displayName} ")
343-
@ValueSource(strings = { "httpclient", "webflux" })
347+
// @ParameterizedTest(name = "{0} : {displayName} ")
348+
// @ValueSource(strings = { "httpclient", "webflux" })
344349
void testRootsServerCloseWithActiveSubscription(String clientType) {
345350

346351
var clientBuilder = clientBuilders.get(clientType);
@@ -365,10 +370,7 @@ void testRootsServerCloseWithActiveSubscription(String clientType) {
365370
assertThat(rootsRef.get()).containsAll(roots);
366371
});
367372

368-
// Close server while subscription is active
369373
mcpServer.close();
370-
371-
// Verify client can handle server closure gracefully
372374
mcpClient.close();
373375
}
374376

@@ -378,9 +380,9 @@ void testRootsServerCloseWithActiveSubscription(String clientType) {
378380

379381
String emptyJsonSchema = """
380382
{
381-
"$schema": "http://json-schema.org/draft-07/schema#",
382-
"type": "object",
383-
"properties": {}
383+
"": "http://json-schema.org/draft-07/schema#",
384+
"type": "object",
385+
"properties": {}
384386
}
385387
""";
386388

@@ -396,7 +398,7 @@ void testToolCallSuccess(String clientType) {
396398
// perform a blocking call to a remote service
397399
String response = RestClient.create()
398400
.get()
399-
.uri("https://github.com/modelcontextprotocol/specification/blob/main/README.md")
401+
.uri("https://raw.githubusercontent.com/modelcontextprotocol/modelcontextprotocol/refs/heads/main/README.md")
400402
.retrieve()
401403
.body(String.class);
402404
assertThat(response).isNotBlank();
@@ -436,7 +438,7 @@ void testToolListChangeHandlingSuccess(String clientType) {
436438
// perform a blocking call to a remote service
437439
String response = RestClient.create()
438440
.get()
439-
.uri("https://github.com/modelcontextprotocol/specification/blob/main/README.md")
441+
.uri("https://raw.githubusercontent.com/modelcontextprotocol/modelcontextprotocol/refs/heads/main/README.md")
440442
.retrieve()
441443
.body(String.class);
442444
assertThat(response).isNotBlank();
@@ -453,7 +455,7 @@ void testToolListChangeHandlingSuccess(String clientType) {
453455
// perform a blocking call to a remote service
454456
String response = RestClient.create()
455457
.get()
456-
.uri("https://github.com/modelcontextprotocol/specification/blob/main/README.md")
458+
.uri("https://raw.githubusercontent.com/modelcontextprotocol/modelcontextprotocol/refs/heads/main/README.md")
457459
.retrieve()
458460
.body(String.class);
459461
assertThat(response).isNotBlank();
@@ -511,4 +513,108 @@ void testInitialize(String clientType) {
511513
mcpServer.close();
512514
}
513515

516+
// ---------------------------------------
517+
// Logging Tests
518+
// ---------------------------------------
519+
520+
@ParameterizedTest(name = "{0} : {displayName} ")
521+
@ValueSource(strings = { "httpclient", "webflux" })
522+
void testLoggingNotification(String clientType) {
523+
// Create a list to store received logging notifications
524+
List<McpSchema.LoggingMessageNotification> receivedNotifications = new ArrayList<>();
525+
526+
var clientBuilder = clientBuilders.get(clientType);
527+
528+
// Create server with a tool that sends logging notifications
529+
McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
530+
new McpSchema.Tool("logging-test", "Test logging notifications", emptyJsonSchema),
531+
(exchange, request) -> {
532+
533+
// Create and send notifications with different levels
534+
535+
//@formatter:off
536+
return exchange // This should be filtered out (DEBUG < NOTICE)
537+
.loggingNotification(McpSchema.LoggingMessageNotification.builder()
538+
.level(McpSchema.LoggingLevel.DEBUG)
539+
.logger("test-logger")
540+
.data("Debug message")
541+
.build())
542+
.then(exchange // This should be sent (NOTICE >= NOTICE)
543+
.loggingNotification(McpSchema.LoggingMessageNotification.builder()
544+
.level(McpSchema.LoggingLevel.NOTICE)
545+
.logger("test-logger")
546+
.data("Notice message")
547+
.build()))
548+
.then(exchange // This should be sent (ERROR > NOTICE)
549+
.loggingNotification(McpSchema.LoggingMessageNotification.builder()
550+
.level(McpSchema.LoggingLevel.ERROR)
551+
.logger("test-logger")
552+
.data("Error message")
553+
.build()))
554+
.then(exchange // This should be filtered out (INFO < NOTICE)
555+
.loggingNotification(McpSchema.LoggingMessageNotification.builder()
556+
.level(McpSchema.LoggingLevel.INFO)
557+
.logger("test-logger")
558+
.data("Another info message")
559+
.build()))
560+
.then(exchange // This should be sent (ERROR >= NOTICE)
561+
.loggingNotification(McpSchema.LoggingMessageNotification.builder()
562+
.level(McpSchema.LoggingLevel.ERROR)
563+
.logger("test-logger")
564+
.data("Another error message")
565+
.build()))
566+
.thenReturn(new CallToolResult("Logging test completed", false));
567+
//@formatter:on
568+
});
569+
570+
var mcpServer = McpServer.async(mcpServerTransportProvider)
571+
.serverInfo("test-server", "1.0.0")
572+
.capabilities(ServerCapabilities.builder().logging().tools(true).build())
573+
.tools(tool)
574+
.build();
575+
576+
// Create client with logging notification handler
577+
McpSyncClient mcpClient = clientBuilder.loggingConsumer(notification -> {
578+
receivedNotifications.add(notification);
579+
}).build();
580+
581+
// Initialize client
582+
InitializeResult initResult = mcpClient.initialize();
583+
assertThat(initResult).isNotNull();
584+
585+
// Set minimum logging level to NOTICE
586+
mcpClient.setLoggingLevel(McpSchema.LoggingLevel.NOTICE);
587+
588+
// Call the tool that sends logging notifications
589+
CallToolResult result = mcpClient.callTool(new McpSchema.CallToolRequest("logging-test", Map.of()));
590+
assertThat(result).isNotNull();
591+
assertThat(result.content().get(0)).isInstanceOf(McpSchema.TextContent.class);
592+
assertThat(((McpSchema.TextContent) result.content().get(0)).text()).isEqualTo("Logging test completed");
593+
594+
// Wait for notifications to be processed
595+
await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
596+
597+
// Should have received 3 notifications (1 NOTICE and 2 ERROR)
598+
assertThat(receivedNotifications).hasSize(3);
599+
600+
// First notification should be NOTICE level
601+
assertThat(receivedNotifications.get(0).level()).isEqualTo(McpSchema.LoggingLevel.NOTICE);
602+
assertThat(receivedNotifications.get(0).logger()).isEqualTo("test-logger");
603+
assertThat(receivedNotifications.get(0).data()).isEqualTo("Notice message");
604+
605+
// Second notification should be ERROR level
606+
assertThat(receivedNotifications.get(1).level()).isEqualTo(McpSchema.LoggingLevel.ERROR);
607+
assertThat(receivedNotifications.get(1).logger()).isEqualTo("test-logger");
608+
assertThat(receivedNotifications.get(1).data()).isEqualTo("Error message");
609+
610+
// Third notification should be ERROR level
611+
assertThat(receivedNotifications.get(2).level()).isEqualTo(McpSchema.LoggingLevel.ERROR);
612+
assertThat(receivedNotifications.get(2).logger()).isEqualTo("test-logger");
613+
assertThat(receivedNotifications.get(2).data()).isEqualTo("Another error message");
614+
});
615+
616+
mcpClient.close();
617+
mcpServer.close();
618+
}
619+
514620
}

mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ void testToolCallSuccess() {
389389
// perform a blocking call to a remote service
390390
String response = RestClient.create()
391391
.get()
392-
.uri("https://github.com/modelcontextprotocol/specification/blob/main/README.md")
392+
.uri("https://raw.githubusercontent.com/modelcontextprotocol/modelcontextprotocol/refs/heads/main/README.md")
393393
.retrieve()
394394
.body(String.class);
395395
assertThat(response).isNotBlank();
@@ -426,7 +426,7 @@ void testToolListChangeHandlingSuccess() {
426426
// perform a blocking call to a remote service
427427
String response = RestClient.create()
428428
.get()
429-
.uri("https://github.com/modelcontextprotocol/specification/blob/main/README.md")
429+
.uri("https://raw.githubusercontent.com/modelcontextprotocol/modelcontextprotocol/refs/heads/main/README.md")
430430
.retrieve()
431431
.body(String.class);
432432
assertThat(response).isNotBlank();
@@ -443,7 +443,7 @@ void testToolListChangeHandlingSuccess() {
443443
// perform a blocking call to a remote service
444444
String response = RestClient.create()
445445
.get()
446-
.uri("https://github.com/modelcontextprotocol/specification/blob/main/README.md")
446+
.uri("https://raw.githubusercontent.com/modelcontextprotocol/modelcontextprotocol/refs/heads/main/README.md")
447447
.retrieve()
448448
.body(String.class);
449449
assertThat(response).isNotBlank();

mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -453,15 +453,15 @@ void testLoggingLevelsWithoutInitialization() {
453453
@Test
454454
void testLoggingLevels() {
455455
withClient(createMcpTransport(), mcpAsyncClient -> {
456-
Mono<Void> testAllLevels = mcpAsyncClient.initialize().then(Mono.defer(() -> {
457-
Mono<Void> chain = Mono.empty();
456+
Mono<Object> testAllLevels = mcpAsyncClient.initialize().then(Mono.defer(() -> {
457+
Mono<Object> chain = Mono.empty();
458458
for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) {
459459
chain = chain.then(mcpAsyncClient.setLoggingLevel(level));
460460
}
461461
return chain;
462462
}));
463463

464-
StepVerifier.create(testAllLevels).verifyComplete();
464+
StepVerifier.create(testAllLevels).expectNextCount(1).verifyComplete();
465465
});
466466
}
467467

mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -416,53 +416,4 @@ void testRootsChangeHandlers() {
416416
.doesNotThrowAnyException();
417417
}
418418

419-
// ---------------------------------------
420-
// Logging Tests
421-
// ---------------------------------------
422-
423-
@Test
424-
void testLoggingLevels() {
425-
var mcpAsyncServer = McpServer.async(createMcpTransportProvider())
426-
.serverInfo("test-server", "1.0.0")
427-
.capabilities(ServerCapabilities.builder().logging().build())
428-
.build();
429-
430-
// Test all logging levels
431-
for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) {
432-
var notification = McpSchema.LoggingMessageNotification.builder()
433-
.level(level)
434-
.logger("test-logger")
435-
.data("Test message with level " + level)
436-
.build();
437-
438-
StepVerifier.create(mcpAsyncServer.loggingNotification(notification)).verifyComplete();
439-
}
440-
}
441-
442-
@Test
443-
void testLoggingWithoutCapability() {
444-
var mcpAsyncServer = McpServer.async(createMcpTransportProvider())
445-
.serverInfo("test-server", "1.0.0")
446-
.capabilities(ServerCapabilities.builder().build()) // No logging capability
447-
.build();
448-
449-
var notification = McpSchema.LoggingMessageNotification.builder()
450-
.level(McpSchema.LoggingLevel.INFO)
451-
.logger("test-logger")
452-
.data("Test log message")
453-
.build();
454-
455-
StepVerifier.create(mcpAsyncServer.loggingNotification(notification)).verifyComplete();
456-
}
457-
458-
@Test
459-
void testLoggingWithNullNotification() {
460-
var mcpAsyncServer = McpServer.async(createMcpTransportProvider())
461-
.serverInfo("test-server", "1.0.0")
462-
.capabilities(ServerCapabilities.builder().logging().build())
463-
.build();
464-
465-
StepVerifier.create(mcpAsyncServer.loggingNotification(null)).verifyError(McpError.class);
466-
}
467-
468419
}

mcp-test/src/main/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -388,53 +388,4 @@ void testRootsChangeHandlers() {
388388
assertThatCode(() -> noConsumersServer.closeGracefully()).doesNotThrowAnyException();
389389
}
390390

391-
// ---------------------------------------
392-
// Logging Tests
393-
// ---------------------------------------
394-
395-
@Test
396-
void testLoggingLevels() {
397-
var mcpSyncServer = McpServer.sync(createMcpTransportProvider())
398-
.serverInfo("test-server", "1.0.0")
399-
.capabilities(ServerCapabilities.builder().logging().build())
400-
.build();
401-
402-
// Test all logging levels
403-
for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) {
404-
var notification = McpSchema.LoggingMessageNotification.builder()
405-
.level(level)
406-
.logger("test-logger")
407-
.data("Test message with level " + level)
408-
.build();
409-
410-
assertThatCode(() -> mcpSyncServer.loggingNotification(notification)).doesNotThrowAnyException();
411-
}
412-
}
413-
414-
@Test
415-
void testLoggingWithoutCapability() {
416-
var mcpSyncServer = McpServer.sync(createMcpTransportProvider())
417-
.serverInfo("test-server", "1.0.0")
418-
.capabilities(ServerCapabilities.builder().build()) // No logging capability
419-
.build();
420-
421-
var notification = McpSchema.LoggingMessageNotification.builder()
422-
.level(McpSchema.LoggingLevel.INFO)
423-
.logger("test-logger")
424-
.data("Test log message")
425-
.build();
426-
427-
assertThatCode(() -> mcpSyncServer.loggingNotification(notification)).doesNotThrowAnyException();
428-
}
429-
430-
@Test
431-
void testLoggingWithNullNotification() {
432-
var mcpSyncServer = McpServer.sync(createMcpTransportProvider())
433-
.serverInfo("test-server", "1.0.0")
434-
.capabilities(ServerCapabilities.builder().logging().build())
435-
.build();
436-
437-
assertThatThrownBy(() -> mcpSyncServer.loggingNotification(null)).isInstanceOf(McpError.class);
438-
}
439-
440391
}

mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -780,16 +780,15 @@ private NotificationHandler asyncLoggingNotificationHandler(
780780
* @return A Mono that completes when the logging level is set.
781781
* @see McpSchema.LoggingLevel
782782
*/
783-
public Mono<Void> setLoggingLevel(LoggingLevel loggingLevel) {
783+
public Mono<Object> setLoggingLevel(LoggingLevel loggingLevel) {
784784
if (loggingLevel == null) {
785785
return Mono.error(new McpError("Logging level must not be null"));
786786
}
787787

788788
return this.withInitializationCheck("setting logging level", initializedResult -> {
789-
String levelName = this.transport.unmarshalFrom(loggingLevel, new TypeReference<String>() {
789+
var params = new McpSchema.SetLevelRequest(loggingLevel);
790+
return this.mcpSession.sendRequest(McpSchema.METHOD_LOGGING_SET_LEVEL, params, new TypeReference<Object>() {
790791
});
791-
Map<String, Object> params = Map.of("level", levelName);
792-
return this.mcpSession.sendNotification(McpSchema.METHOD_LOGGING_SET_LEVEL, params);
793792
});
794793
}
795794

0 commit comments

Comments
 (0)