Skip to content

Commit dfe0fe5

Browse files
committed
feat: extracts McpClient/McpServer abstraction into mcp-spi
1 parent ac6c155 commit dfe0fe5

28 files changed

+893
-348
lines changed

mcp-reactor/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,19 +72,19 @@
7272
* @author Dariusz Jędrzejczyk
7373
* @author Christian Tzolov
7474
* @author Jihoon Kim
75-
* @see McpClient
75+
* @see McpClientFactory
7676
* @see McpSchema
7777
* @see McpClientSession
7878
*/
79-
public class McpAsyncClient {
79+
public class McpAsyncClient implements McpClient {
8080

8181
private static final Logger logger = LoggerFactory.getLogger(McpAsyncClient.class);
8282

8383
private static final McpType<Void> VOID_TYPE_REFERENCE = McpType.of(Void.class);
8484

8585
protected final Sinks.One<McpSchema.InitializeResult> initializedSink = Sinks.one();
8686

87-
private AtomicBoolean initialized = new AtomicBoolean(false);
87+
private final AtomicBoolean initialized = new AtomicBoolean(false);
8888

8989
/**
9090
* The max timeout to await for the client-server connection to be initialized.
@@ -808,7 +808,7 @@ public Mono<Void> setLoggingLevel(LoggingLevel loggingLevel) {
808808
* code.
809809
* @param protocolVersions the Client supported protocol versions.
810810
*/
811-
void setProtocolVersions(List<String> protocolVersions) {
811+
public void setProtocolVersions(List<String> protocolVersions) {
812812
this.protocolVersions = protocolVersions;
813813

814814
}

mcp-reactor/src/main/java/io/modelcontextprotocol/client/McpClient.java renamed to mcp-reactor/src/main/java/io/modelcontextprotocol/client/McpClientFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@
9999
* @see McpSyncClient
100100
* @see McpTransport
101101
*/
102-
public interface McpClient {
102+
public interface McpClientFactory {
103103

104104
/**
105105
* Start building a synchronous MCP client with the specified transport layer. The

mcp-reactor/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
* operations in non-blocking contexts by scheduling them on a bounded elastic scheduler.
4343
*
4444
* @author Dariusz Jędrzejczyk
45-
* @see McpClient
45+
* @see McpClientFactory
4646
* @see McpSchema.Implementation
4747
* @see McpSchema.ClientCapabilities
4848
*/

mcp-reactor/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
* @author Dariusz Jędrzejczyk
4848
* @author Christian Tzolov
4949
* @author Jihoon Kim
50-
* @see McpClient
50+
* @see McpClientFactory
5151
* @see McpAsyncClient
5252
* @see McpSchema
5353
*/

mcp-reactor/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java

Lines changed: 36 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,11 @@
7777
* @author Christian Tzolov
7878
* @author Dariusz Jędrzejczyk
7979
* @author Jihoon Kim
80-
* @see McpServer
80+
* @see McpServerFactory
8181
* @see McpSchema
8282
* @see McpClientSession
8383
*/
84-
public class McpAsyncServer {
84+
public class McpAsyncServer implements McpServer {
8585

8686
private static final Logger logger = LoggerFactory.getLogger(McpAsyncServer.class);
8787

@@ -95,13 +95,13 @@ public class McpAsyncServer {
9595

9696
private final String instructions;
9797

98-
private final CopyOnWriteArrayList<McpServerFeatures.AsyncToolSpecification> tools = new CopyOnWriteArrayList<>();
98+
private final CopyOnWriteArrayList<McpServer.AsyncToolSpecification> tools = new CopyOnWriteArrayList<>();
9999

100100
private final CopyOnWriteArrayList<McpSchema.ResourceTemplate> resourceTemplates = new CopyOnWriteArrayList<>();
101101

102-
private final ConcurrentHashMap<String, McpServerFeatures.AsyncResourceSpecification> resources = new ConcurrentHashMap<>();
102+
private final ConcurrentHashMap<String, McpServer.AsyncResourceSpecification> resources = new ConcurrentHashMap<>();
103103

104-
private final ConcurrentHashMap<String, McpServerFeatures.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>();
104+
private final ConcurrentHashMap<String, McpServer.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>();
105105

106106
// FIXME: this field is deprecated and should be remvoed together with the
107107
// broadcasting loggingNotification.
@@ -111,7 +111,7 @@ public class McpAsyncServer {
111111

112112
private List<String> protocolVersions = List.of(McpSchema.LATEST_PROTOCOL_VERSION);
113113

114-
private McpUriTemplateManagerFactory uriTemplateManagerFactory = new DeafaultMcpUriTemplateManagerFactory();
114+
private final McpUriTemplateManagerFactory uriTemplateManagerFactory;
115115

116116
/**
117117
* Create a new McpAsyncServer with the given transport provider and capabilities.
@@ -282,7 +282,7 @@ private McpServerSession.NotificationHandler asyncRootsListChangedNotificationHa
282282
* @param toolSpecification The tool specification to add
283283
* @return Mono that completes when clients have been notified of the change
284284
*/
285-
public Mono<Void> addTool(McpServerFeatures.AsyncToolSpecification toolSpecification) {
285+
public Mono<Void> addTool(McpServer.AsyncToolSpecification toolSpecification) {
286286
if (toolSpecification == null) {
287287
return Mono.error(new McpError("Tool specification must not be null"));
288288
}
@@ -351,7 +351,7 @@ public Mono<Void> notifyToolsListChanged() {
351351

352352
private McpServerSession.RequestHandler<McpSchema.ListToolsResult> toolsListRequestHandler() {
353353
return (exchange, params) -> {
354-
List<Tool> tools = this.tools.stream().map(McpServerFeatures.AsyncToolSpecification::tool).toList();
354+
List<Tool> tools = this.tools.stream().map(McpServer.AsyncToolSpecification::tool).toList();
355355

356356
return Mono.just(new McpSchema.ListToolsResult(tools, null));
357357
};
@@ -362,15 +362,16 @@ private McpServerSession.RequestHandler<CallToolResult> toolsCallRequestHandler(
362362
McpSchema.CallToolRequest callToolRequest = schemaCodec.decodeResult(params,
363363
McpType.of(McpSchema.CallToolRequest.class));
364364

365-
Optional<McpServerFeatures.AsyncToolSpecification> toolSpecification = this.tools.stream()
365+
Optional<McpServer.AsyncToolSpecification> toolSpecification = this.tools.stream()
366366
.filter(tr -> callToolRequest.name().equals(tr.tool().name()))
367367
.findAny();
368368

369369
if (toolSpecification.isEmpty()) {
370370
return Mono.error(new McpError("Tool not found: " + callToolRequest.name()));
371371
}
372372

373-
return toolSpecification.map(tool -> tool.call().apply(exchange, callToolRequest.arguments()))
373+
return toolSpecification
374+
.map(tool -> Mono.from(tool.call().apply(exchange, callToolRequest.arguments())))
374375
.orElse(Mono.error(new McpError("Tool not found: " + callToolRequest.name())));
375376
};
376377
}
@@ -384,7 +385,7 @@ private McpServerSession.RequestHandler<CallToolResult> toolsCallRequestHandler(
384385
* @param resourceSpecification The resource handler to add
385386
* @return Mono that completes when clients have been notified of the change
386387
*/
387-
public Mono<Void> addResource(McpServerFeatures.AsyncResourceSpecification resourceSpecification) {
388+
public Mono<Void> addResource(McpServer.AsyncResourceSpecification resourceSpecification) {
388389
if (resourceSpecification == null || resourceSpecification.resource() == null) {
389390
return Mono.error(new McpError("Resource must not be null"));
390391
}
@@ -420,7 +421,7 @@ public Mono<Void> removeResource(String resourceUri) {
420421
}
421422

422423
return Mono.defer(() -> {
423-
McpServerFeatures.AsyncResourceSpecification removed = this.resources.remove(resourceUri);
424+
McpServer.AsyncResourceSpecification removed = this.resources.remove(resourceUri);
424425
if (removed != null) {
425426
logger.debug("Removed resource handler: {}", resourceUri);
426427
if (this.serverCapabilities.resources().listChanged()) {
@@ -445,7 +446,7 @@ private McpServerSession.RequestHandler<McpSchema.ListResourcesResult> resources
445446
return (exchange, params) -> {
446447
var resourceList = this.resources.values()
447448
.stream()
448-
.map(McpServerFeatures.AsyncResourceSpecification::resource)
449+
.map(McpServer.AsyncResourceSpecification::resource)
449450
.toList();
450451
return Mono.just(new McpSchema.ListResourcesResult(resourceList, null));
451452
};
@@ -481,15 +482,15 @@ private McpServerSession.RequestHandler<McpSchema.ReadResourceResult> resourcesR
481482
McpType.of(McpSchema.ReadResourceRequest.class));
482483
var resourceUri = resourceRequest.uri();
483484

484-
McpServerFeatures.AsyncResourceSpecification specification = this.resources.values()
485+
McpServer.AsyncResourceSpecification specification = this.resources.values()
485486
.stream()
486487
.filter(resourceSpecification -> this.uriTemplateManagerFactory
487488
.create(resourceSpecification.resource().uri())
488489
.matches(resourceUri))
489490
.findFirst()
490491
.orElseThrow(() -> new McpError("Resource not found: " + resourceUri));
491492

492-
return specification.readHandler().apply(exchange, resourceRequest);
493+
return Mono.from(specification.readHandler().apply(exchange, resourceRequest));
493494
};
494495
}
495496

@@ -502,7 +503,7 @@ private McpServerSession.RequestHandler<McpSchema.ReadResourceResult> resourcesR
502503
* @param promptSpecification The prompt handler to add
503504
* @return Mono that completes when clients have been notified of the change
504505
*/
505-
public Mono<Void> addPrompt(McpServerFeatures.AsyncPromptSpecification promptSpecification) {
506+
public Mono<Void> addPrompt(McpServer.AsyncPromptSpecification promptSpecification) {
506507
if (promptSpecification == null) {
507508
return Mono.error(new McpError("Prompt specification must not be null"));
508509
}
@@ -511,7 +512,7 @@ public Mono<Void> addPrompt(McpServerFeatures.AsyncPromptSpecification promptSpe
511512
}
512513

513514
return Mono.defer(() -> {
514-
McpServerFeatures.AsyncPromptSpecification specification = this.prompts
515+
McpServer.AsyncPromptSpecification specification = this.prompts
515516
.putIfAbsent(promptSpecification.prompt().name(), promptSpecification);
516517
if (specification != null) {
517518
return Mono.error(
@@ -544,7 +545,7 @@ public Mono<Void> removePrompt(String promptName) {
544545
}
545546

546547
return Mono.defer(() -> {
547-
McpServerFeatures.AsyncPromptSpecification removed = this.prompts.remove(promptName);
548+
McpServer.AsyncPromptSpecification removed = this.prompts.remove(promptName);
548549

549550
if (removed != null) {
550551
logger.debug("Removed prompt handler: {}", promptName);
@@ -577,7 +578,7 @@ private McpServerSession.RequestHandler<McpSchema.ListPromptsResult> promptsList
577578

578579
var promptList = this.prompts.values()
579580
.stream()
580-
.map(McpServerFeatures.AsyncPromptSpecification::prompt)
581+
.map(McpServer.AsyncPromptSpecification::prompt)
581582
.toList();
582583

583584
return Mono.just(new McpSchema.ListPromptsResult(promptList, null));
@@ -590,12 +591,12 @@ private McpServerSession.RequestHandler<McpSchema.GetPromptResult> promptsGetReq
590591
McpType.of(McpSchema.GetPromptRequest.class));
591592

592593
// Implement prompt retrieval logic here
593-
McpServerFeatures.AsyncPromptSpecification specification = this.prompts.get(promptRequest.name());
594+
McpServer.AsyncPromptSpecification specification = this.prompts.get(promptRequest.name());
594595
if (specification == null) {
595596
return Mono.error(new McpError("Prompt not found: " + promptRequest.name()));
596597
}
597598

598-
return specification.promptHandler().apply(exchange, promptRequest);
599+
return Mono.from(specification.promptHandler().apply(exchange, promptRequest));
599600
};
600601
}
601602

@@ -665,31 +666,26 @@ private McpServerSession.RequestHandler<McpSchema.CompleteResult> completionComp
665666

666667
// check if the referenced resource exists
667668
if (type.equals("ref/prompt") && request.ref() instanceof McpSchema.PromptReference promptReference) {
668-
McpServerFeatures.AsyncPromptSpecification promptSpec = this.prompts.get(promptReference.name());
669+
McpServer.AsyncPromptSpecification promptSpec = this.prompts.get(promptReference.name());
669670
if (promptSpec == null) {
670671
return Mono.error(new McpError("Prompt not found: " + promptReference.name()));
671672
}
672-
if (!promptSpec.prompt()
673-
.arguments()
674-
.stream()
675-
.filter(arg -> arg.name().equals(argumentName))
676-
.findFirst()
677-
.isPresent()) {
673+
if (promptSpec.prompt().arguments().stream().noneMatch(arg -> arg.name().equals(argumentName))) {
678674

679675
return Mono.error(new McpError("Argument not found: " + argumentName));
680676
}
681677
}
682678

683679
if (type.equals("ref/resource") && request.ref() instanceof McpSchema.ResourceReference resourceReference) {
684-
McpServerFeatures.AsyncResourceSpecification resourceSpec = this.resources.get(resourceReference.uri());
685-
if (resourceSpec == null) {
686-
return Mono.error(new McpError("Resource not found: " + resourceReference.uri()));
687-
}
688-
if (!uriTemplateManagerFactory.create(resourceSpec.resource().uri())
689-
.getVariableNames()
690-
.contains(argumentName)) {
691-
return Mono.error(new McpError("Argument not found: " + argumentName));
692-
}
680+
McpServer.AsyncResourceSpecification resourceSpec = this.resources.get(resourceReference.uri());
681+
if (resourceSpec == null) {
682+
return Mono.error(new McpError("Resource not found: " + resourceReference.uri()));
683+
}
684+
if (!uriTemplateManagerFactory.create(resourceSpec.resource().uri())
685+
.getVariableNames()
686+
.contains(argumentName)) {
687+
return Mono.error(new McpError("Argument not found: " + argumentName));
688+
}
693689

694690
}
695691

@@ -743,8 +739,8 @@ private McpSchema.CompleteRequest parseCompletionParams(Object object) {
743739
* code.
744740
* @param protocolVersions the Client supported protocol versions.
745741
*/
746-
void setProtocolVersions(List<String> protocolVersions) {
747-
this.protocolVersions = protocolVersions;
748-
}
742+
public void setProtocolVersions(List<String> protocolVersions) {
743+
this.protocolVersions = protocolVersions;
744+
}
749745

750746
}

mcp-reactor/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
* @author Dariusz Jędrzejczyk
2121
* @author Christian Tzolov
2222
*/
23-
public class McpAsyncServerExchange {
23+
public class McpAsyncServerExchange implements McpServerExchange {
2424

2525
private final McpServerSession session;
2626

@@ -136,7 +136,7 @@ public Mono<Void> loggingNotification(LoggingMessageNotification loggingMessageN
136136
* filtered out.
137137
* @param minLoggingLevel The minimum logging level
138138
*/
139-
void setMinLoggingLevel(LoggingLevel minLoggingLevel) {
139+
public void setMinLoggingLevel(LoggingLevel minLoggingLevel) {
140140
Assert.notNull(minLoggingLevel, "minLoggingLevel must not be null");
141141
this.minLoggingLevel = minLoggingLevel;
142142
}

0 commit comments

Comments
 (0)