diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java index 0895e02b..6163c007 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java @@ -225,18 +225,19 @@ private String generateRequestId() { public Mono sendRequest(String method, Object requestParams, TypeReference typeRef) { String requestId = this.generateRequestId(); - return Mono.create(sink -> { + return Mono.deferContextual(ctx -> Mono.create(sink -> { this.pendingResponses.put(requestId, sink); McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, method, requestId, requestParams); this.transport.sendMessage(jsonrpcRequest) + .contextWrite(ctx) // TODO: It's most efficient to create a dedicated Subscriber here .subscribe(v -> { }, error -> { this.pendingResponses.remove(requestId); sink.error(error); }); - }).timeout(this.requestTimeout).handle((jsonRpcResponse, sink) -> { + })).timeout(this.requestTimeout).handle((jsonRpcResponse, sink) -> { if (jsonRpcResponse.error() != null) { sink.error(new McpError(jsonRpcResponse.error())); }