Skip to content

Commit 4f5ae36

Browse files
FH-30francishodianto30
authored and
francishodianto30
committed
fix: propagate reactor context up till transport
1 parent f348a83 commit 4f5ae36

File tree

1 file changed

+9
-8
lines changed

1 file changed

+9
-8
lines changed

mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -225,18 +225,19 @@ private String generateRequestId() {
225225
public <T> Mono<T> sendRequest(String method, Object requestParams, TypeReference<T> typeRef) {
226226
String requestId = this.generateRequestId();
227227

228-
return Mono.<McpSchema.JSONRPCResponse>create(sink -> {
228+
return Mono.deferContextual(ctx -> Mono.<McpSchema.JSONRPCResponse>create(sink -> {
229229
this.pendingResponses.put(requestId, sink);
230230
McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, method,
231231
requestId, requestParams);
232232
this.transport.sendMessage(jsonrpcRequest)
233-
// TODO: It's most efficient to create a dedicated Subscriber here
234-
.subscribe(v -> {
235-
}, error -> {
236-
this.pendingResponses.remove(requestId);
237-
sink.error(error);
238-
});
239-
}).timeout(this.requestTimeout).handle((jsonRpcResponse, sink) -> {
233+
.contextWrite(ctx)
234+
// TODO: It's most efficient to create a dedicated Subscriber here
235+
.subscribe(v -> {
236+
}, error -> {
237+
this.pendingResponses.remove(requestId);
238+
sink.error(error);
239+
});
240+
})).timeout(this.requestTimeout).handle((jsonRpcResponse, sink) -> {
240241
if (jsonRpcResponse.error() != null) {
241242
sink.error(new McpError(jsonRpcResponse.error()));
242243
}

0 commit comments

Comments
 (0)