diff --git a/README.md b/README.md index 4c8bb90dc..1b913b491 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,7 @@ The Model Context Protocol allows applications to provide context for LLMs in a ### Adding MCP to your python project -We recommend using [uv](https://docs.astral.sh/uv/) to manage your Python projects. +We recommend using [uv](https://docs.astral.sh/uv/) to manage your Python projects. If you haven't created a uv-managed project yet, create one: @@ -89,6 +89,7 @@ If you haven't created a uv-managed project yet, create one: ``` Alternatively, for projects using pip for dependencies: + ```bash pip install "mcp[cli]" ``` @@ -128,11 +129,13 @@ def get_greeting(name: str) -> str: ``` You can install this server in [Claude Desktop](https://claude.ai/download) and interact with it right away by running: + ```bash mcp install server.py ``` Alternatively, you can test it with the MCP Inspector: + ```bash mcp dev server.py ``` @@ -246,6 +249,144 @@ async def fetch_weather(city: str) -> str: return response.text ``` +#### Output Schemas + +Tools automatically generate JSON Schema definitions for their return types, helping LLMs understand the structure of the data they'll receive. FastMCP also enhances these schemas with semantic metadata that enables intelligent UI rendering and data formatting. + +##### Basic Schema Generation + +```python +from pydantic import BaseModel +from mcp.server.fastmcp import FastMCP + +# Create server +mcp = FastMCP("Output Schema Demo") + + +# Tools with primitive return types +@mcp.tool() +def get_temperature(city: str) -> float: + """Get the current temperature for a city""" + # In a real implementation, this would fetch actual weather data + return 72.5 + + +# Tools with dictionary return types +@mcp.tool() +def get_user(user_id: int) -> dict: + """Get user information by ID""" + return {"id": user_id, "name": "John Doe", "email": "john@example.com"} + + +# Using Pydantic models for structured output +class WeatherData(BaseModel): + temperature: float + humidity: float + conditions: str + + +@mcp.tool() +def get_weather_data(city: str) -> WeatherData: + """Get structured weather data for a city""" + # In a real implementation, this would fetch actual weather data + return WeatherData( + temperature=72.5, + humidity=65.0, + conditions="Partly cloudy", + ) + + +# Complex nested models +class Location(BaseModel): + city: str + country: str + coordinates: tuple[float, float] + + +class WeatherForecast(BaseModel): + current: WeatherData + location: Location + forecast: list[WeatherData] + + +@mcp.tool() +def get_weather_forecast(city: str) -> WeatherForecast: + """Get detailed weather forecast for a city""" + # In a real implementation, this would fetch actual weather data + return WeatherForecast( + current=WeatherData( + temperature=72.5, + humidity=65.0, + conditions="Partly cloudy", + ), + location=Location(city=city, country="USA", coordinates=(37.7749, -122.4194)), + forecast=[ + WeatherData(temperature=75.0, humidity=62.0, conditions="Sunny"), + WeatherData(temperature=68.0, humidity=80.0, conditions="Rainy"), + ], + ) +``` + +##### Semantic Metadata Enhancement + +FastMCP automatically enhances output schemas with semantic metadata by analyzing field names and types. This helps client applications provide intelligent UI rendering and formatting: + +```python +from pydantic import BaseModel +from mcp.server.fastmcp import FastMCP + +mcp = FastMCP("Enhanced Schema Demo") + + +class UserProfile(BaseModel): + email: str # Automatically detected as semantic_type: "email" + profile_url: str # Automatically detected as semantic_type: "url" + avatar_image: str # Automatically detected as semantic_type: "image_url" + created_date: str # Automatically detected as semantic_type: "datetime" + account_balance: float # Automatically detected as semantic_type: "currency" + completion_percentage: ( + float # Automatically detected as semantic_type: "percentage" + ) + primary_color: str # Automatically detected as semantic_type: "color" + + +@mcp.tool() +def get_user_profile(user_id: str) -> UserProfile: + """Get user profile with semantic field types""" + return UserProfile( + email="user@example.com", + profile_url="https://example.com/users/12345", + avatar_image="https://example.com/avatars/user.jpg", + created_date="2023-06-15T10:30:00Z", + account_balance=150.75, + completion_percentage=85.5, + primary_color="#3498db", + ) +``` + +**Supported Semantic Types:** + +- `email` - Email addresses +- `url`, `link` - Web URLs and links +- `image_url`, `audio_url`, `video_url` - Media URLs +- `datetime` - Date and time fields (with subtypes like `date`, `time`, `timestamp`) +- `currency`, `money` - Monetary values +- `percentage` - Percentage values +- `color` - Color codes and values +- `phone` - Phone numbers +- `status` - Status indicators +- `media_format` - File format detection for audio/video/image files + +**Benefits for Client Applications:** + +- Email fields can display mail icons and validation +- URLs become clickable links with preview capabilities +- Date fields get appropriate date pickers and formatting +- Currency fields show proper monetary formatting +- Media URLs can display thumbnails or players +- Status fields can show colored indicators +- Percentage fields can render as progress bars + ### Prompts Prompts are reusable templates that help LLMs interact with your server effectively: @@ -382,6 +523,7 @@ if __name__ == "__main__": ``` Run it with: + ```bash python server.py # or @@ -459,18 +601,17 @@ app.mount("/math", math.mcp.streamable_http_app()) ``` For low level server with Streamable HTTP implementations, see: + - Stateful server: [`examples/servers/simple-streamablehttp/`](examples/servers/simple-streamablehttp/) - Stateless server: [`examples/servers/simple-streamablehttp-stateless/`](examples/servers/simple-streamablehttp-stateless/) - - The streamable HTTP transport supports: + - Stateful and stateless operation modes - Resumability with event stores -- JSON or SSE response formats +- JSON or SSE response formats - Better scalability for multi-node deployments - ### Mounting to an Existing ASGI Server > **Note**: SSE transport is being superseded by [Streamable HTTP transport](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http). @@ -638,6 +779,7 @@ async def query_db(name: str, arguments: dict) -> list: ``` The lifespan API provides: + - A way to initialize resources when the server starts and clean them up when it stops - Access to initialized resources through the request context in handlers - Type-safe context passing between lifespan and request handlers @@ -850,7 +992,6 @@ async def main(): For a complete working example, see [`examples/clients/simple-auth-client/`](examples/clients/simple-auth-client/). - ### MCP Primitives The MCP protocol defines three core primitives that servers can implement: diff --git a/examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/server.py b/examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/server.py index bbf3dc64c..acfd092e0 100644 --- a/examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/server.py +++ b/examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/server.py @@ -99,6 +99,14 @@ async def list_tools() -> list[types.Tool]: }, }, }, + outputSchema={ + "type": "array", + "items": { + "type": "object", + "description": "TextContent with notification information", + }, + "description": "List of text content with notification results", + }, ) ] diff --git a/examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/server.py b/examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/server.py index bf6f51e5c..d4587a8c4 100644 --- a/examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/server.py +++ b/examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/server.py @@ -117,6 +117,14 @@ async def list_tools() -> list[types.Tool]: }, }, }, + outputSchema={ + "type": "array", + "items": { + "type": "object", + "description": "TextContent with notification information", + }, + "description": "List of text content with notification results", + }, ) ] diff --git a/examples/servers/simple-tool/mcp_simple_tool/server.py b/examples/servers/simple-tool/mcp_simple_tool/server.py index cd574ad5e..20ef2c521 100644 --- a/examples/servers/simple-tool/mcp_simple_tool/server.py +++ b/examples/servers/simple-tool/mcp_simple_tool/server.py @@ -54,6 +54,17 @@ async def list_tools() -> list[types.Tool]: } }, }, + outputSchema={ + "type": "array", + "items": { + "anyOf": [ + {"type": "object", "description": "TextContent"}, + {"type": "object", "description": "ImageContent"}, + {"type": "object", "description": "EmbeddedResource"}, + ] + }, + "description": "List of content from the fetched website", + }, ) ] diff --git a/src/mcp/client/session_group.py b/src/mcp/client/session_group.py index a430533b3..a77dc7a1e 100644 --- a/src/mcp/client/session_group.py +++ b/src/mcp/client/session_group.py @@ -154,7 +154,6 @@ async def __aexit__( for exit_stack in self._session_exit_stacks.values(): tg.start_soon(exit_stack.aclose) - @property def sessions(self) -> list[mcp.ClientSession]: """Returns the list of sessions being managed.""" diff --git a/src/mcp/server/fastmcp/server.py b/src/mcp/server/fastmcp/server.py index 3282baae6..c25d288cc 100644 --- a/src/mcp/server/fastmcp/server.py +++ b/src/mcp/server/fastmcp/server.py @@ -255,6 +255,7 @@ async def list_tools(self) -> list[MCPTool]: name=info.name, description=info.description, inputSchema=info.parameters, + outputSchema=info.output_schema, annotations=info.annotations, ) for info in tools diff --git a/src/mcp/server/fastmcp/tools/base.py b/src/mcp/server/fastmcp/tools/base.py index 01fedcdc9..00a9897d2 100644 --- a/src/mcp/server/fastmcp/tools/base.py +++ b/src/mcp/server/fastmcp/tools/base.py @@ -9,6 +9,7 @@ from mcp.server.fastmcp.exceptions import ToolError from mcp.server.fastmcp.utilities.func_metadata import FuncMetadata, func_metadata +from mcp.server.fastmcp.utilities.schema import enhance_output_schema from mcp.types import ToolAnnotations if TYPE_CHECKING: @@ -24,6 +25,9 @@ class Tool(BaseModel): name: str = Field(description="Name of the tool") description: str = Field(description="Description of what the tool does") parameters: dict[str, Any] = Field(description="JSON schema for tool parameters") + output_schema: dict[str, Any] | None = Field( + None, description="JSON schema for tool output format" + ) fn_metadata: FuncMetadata = Field( description="Metadata about the function including a pydantic model for tool" " arguments" @@ -46,6 +50,8 @@ def from_function( annotations: ToolAnnotations | None = None, ) -> Tool: """Create a Tool from a function.""" + from pydantic import TypeAdapter + from mcp.server.fastmcp.server import Context func_name = name or fn.__name__ @@ -71,6 +77,38 @@ def from_function( ) parameters = func_arg_metadata.arg_model.model_json_schema() + # Generate output schema from return type annotation if possible + output_schema = None + sig = inspect.signature(fn) + if sig.return_annotation != inspect.Signature.empty: + try: + # Handle common return types that don't need schema + if sig.return_annotation is str: + output_schema = {"type": "string"} + elif sig.return_annotation is int: + output_schema = {"type": "integer"} + elif sig.return_annotation is float: + output_schema = {"type": "number"} + elif sig.return_annotation is bool: + output_schema = {"type": "boolean"} + elif sig.return_annotation is dict: + output_schema = {"type": "object"} + elif sig.return_annotation is list: + output_schema = {"type": "array"} + else: + # Try to generate schema using TypeAdapter + return_type_adapter = TypeAdapter(sig.return_annotation) + output_schema = return_type_adapter.json_schema() + + # Enhance the schema with detailed field information + if output_schema: + output_schema = enhance_output_schema( + output_schema, sig.return_annotation + ) + except Exception: + # If we can't generate a schema, we'll leave it as None + pass + return cls( fn=fn, name=func_name, @@ -80,6 +118,7 @@ def from_function( is_async=is_async, context_kwarg=context_kwarg, annotations=annotations, + output_schema=output_schema, ) async def run( @@ -93,9 +132,11 @@ async def run( self.fn, self.is_async, arguments, - {self.context_kwarg: context} - if self.context_kwarg is not None - else None, + ( + {self.context_kwarg: context} + if self.context_kwarg is not None + else None + ), ) except Exception as e: raise ToolError(f"Error executing tool {self.name}: {e}") from e diff --git a/src/mcp/server/fastmcp/utilities/schema.py b/src/mcp/server/fastmcp/utilities/schema.py new file mode 100644 index 000000000..209b4ec72 --- /dev/null +++ b/src/mcp/server/fastmcp/utilities/schema.py @@ -0,0 +1,239 @@ +"""Schema enhancement utilities for FastMCP tools. + +This module provides utilities for enhancing JSON Schema definitions with semantic +metadata that helps client applications render and display tool outputs intelligently. +The enhancement process detects semantic meaning from field names and types, adding +metadata like semantic_type, datetime_type, and media_format to JSON Schema properties. +""" + +from typing import Any + + +def detect_semantic_format( + field_name: str, field_schema: dict[str, Any] +) -> dict[str, Any]: + """Detect semantic format information for a field based on its name and schema. + + Analyzes field names and JSON Schema types to determine semantic meaning, + enabling client applications to provide appropriate UI rendering and formatting. + + Args: + field_name: The name of the field to analyze + field_schema: JSON Schema definition for the field + + Returns: + Dictionary containing detected semantic information: + - semantic_type: The detected semantic type (url, email, datetime, etc.) + - datetime_type: For datetime fields, specifies date_only, time_only, or + datetime + - media_format: For media fields, specifies the format type (audio_file, + video_file, etc.) + + Examples: + >>> detect_semantic_format("email", {"type": "string"}) + {"semantic_type": "email"} + + >>> detect_semantic_format("created_date", {"type": "string"}) + {"semantic_type": "datetime", "datetime_type": "date_only"} + + >>> detect_semantic_format("profile_image", {"type": "string"}) + {"semantic_type": "image"} + """ + format_info: dict[str, Any] = {} + + # Convert field name to lowercase for pattern matching + name_lower = field_name.lower() + field_type = field_schema.get("type", "") + + # URL detection + if any(keyword in name_lower for keyword in ["url", "uri", "link", "href"]): + format_info["semantic_type"] = "url" + + # Email detection + elif "email" in name_lower: + format_info["semantic_type"] = "email" + + # Date/time detection + elif any( + keyword in name_lower + for keyword in ["date", "time", "timestamp", "created", "updated", "modified"] + ): + format_info["semantic_type"] = "datetime" + if "date" in name_lower and "time" not in name_lower: + format_info["datetime_type"] = "date_only" + elif "time" in name_lower and "date" not in name_lower: + format_info["datetime_type"] = "time_only" + else: + format_info["datetime_type"] = "datetime" + + # Audio format detection + elif any( + keyword in name_lower + for keyword in ["audio", "sound", "music", "voice", "recording"] + ): + format_info["semantic_type"] = "audio" + if any(ext in name_lower for ext in ["mp3", "wav", "ogg", "m4a", "flac"]): + format_info["media_format"] = "audio_file" + + # Video format detection + elif any( + keyword in name_lower for keyword in ["video", "movie", "clip", "recording"] + ): + format_info["semantic_type"] = "video" + if any(ext in name_lower for ext in ["mp4", "avi", "mov", "mkv", "webm"]): + format_info["media_format"] = "video_file" + + # Image format detection + elif any( + keyword in name_lower + for keyword in ["image", "photo", "picture", "img", "thumbnail", "avatar"] + ): + format_info["semantic_type"] = "image" + if any( + ext in name_lower for ext in ["jpg", "jpeg", "png", "gif", "svg", "webp"] + ): + format_info["media_format"] = "image_file" + + # File path detection + elif any( + keyword in name_lower for keyword in ["path", "file", "filename", "filepath"] + ): + format_info["semantic_type"] = "file_path" + + # Color detection + elif any(keyword in name_lower for keyword in ["color", "colour"]): + format_info["semantic_type"] = "color" + + # Currency/money detection + elif any( + keyword in name_lower + for keyword in ["price", "cost", "amount", "money", "currency", "fee"] + ): + if field_type in ["number", "integer"]: + format_info["semantic_type"] = "currency" + + # Percentage detection + elif any(keyword in name_lower for keyword in ["percent", "percentage", "rate"]): + if field_type in ["number", "integer"]: + format_info["semantic_type"] = "percentage" + + # ID/identifier detection + elif any(keyword in name_lower for keyword in ["id", "identifier", "uuid", "guid"]): + format_info["semantic_type"] = "identifier" + + # Status/state detection + elif any(keyword in name_lower for keyword in ["status", "state", "condition"]): + format_info["semantic_type"] = "status" + + return format_info + + +def enhance_output_schema(schema: dict[str, Any], return_type: Any) -> dict[str, Any]: + """Enhance output schema with semantic metadata embedded within JSON Schema + structure. + + Takes a standard JSON Schema and enhances it with semantic information that helps + client applications understand how to render and display the data. The enhancement + preserves JSON Schema compliance while adding optional semantic metadata. + + Args: + schema: Standard JSON Schema definition to enhance + return_type: Python type annotation for the return type (for future use) + + Returns: + Enhanced JSON Schema with embedded semantic metadata + + Examples: + >>> schema = { + ... "type": "object", + ... "properties": { + ... "email": {"type": "string", "title": "Email"}, + ... "created_date": {"type": "string", "title": "Created Date"} + ... } + ... } + >>> enhanced = enhance_output_schema(schema, None) + >>> enhanced["properties"]["email"]["semantic_type"] + 'email' + >>> enhanced["properties"]["created_date"]["semantic_type"] + 'datetime' + """ + enhanced_schema = schema.copy() + + # Add enhanced field information for object types + if schema.get("type") == "object" and "properties" in schema: + enhanced_properties = {} + + for field_name, field_schema in schema["properties"].items(): + # Start with the original field schema + enhanced_field = field_schema.copy() + + # Determine the primary data type + primary_type = field_schema.get("type", "unknown") + + # Handle complex nested types (anyOf, etc.) + if "anyOf" in field_schema: + # Extract the primary type from anyOf (excluding null) + non_null_types = [ + t for t in field_schema["anyOf"] if t.get("type") != "null" + ] + if non_null_types: + primary_type = non_null_types[0].get("type", "unknown") + + # Get format information + format_info = detect_semantic_format(field_name, {"type": primary_type}) + + # Add semantic information only if detected + if format_info.get("semantic_type"): + enhanced_field["semantic_type"] = format_info["semantic_type"] + + # Add additional format metadata if present + for key, value in format_info.items(): + if key not in ["semantic_type"] and value: + enhanced_field[key] = value + + enhanced_properties[field_name] = enhanced_field + + enhanced_schema["properties"] = enhanced_properties + + # Remove 'required' field from output schemas - it's not needed for outputs + # Tools always return complete objects as defined, so all fields are guaranteed + if "required" in enhanced_schema: + del enhanced_schema["required"] + + # Handle array types - enhance the items schema + elif schema.get("type") == "array" and "items" in schema: + enhanced_schema = schema.copy() + item_schema = schema["items"] + + # If items have a type, we can enhance them + if isinstance(item_schema, dict) and "type" in item_schema: + enhanced_item: dict[str, Any] = item_schema.copy() + # Type-cast item_schema to ensure proper typing for detect_semantic_format + typed_item_schema: dict[str, Any] = item_schema + + # For arrays, we can't use field names for detection, so minimal enhancement + format_info = detect_semantic_format("array_item", typed_item_schema) + if ( + format_info.get("semantic_type") + and format_info["semantic_type"] != "primitive" + ): + enhanced_item["semantic_type"] = format_info["semantic_type"] + + # Add additional format metadata if present + enhanced_item.update( + { + key: value + for key, value in format_info.items() + if key not in ["semantic_type"] and value + } + ) + + enhanced_schema["items"] = enhanced_item + + # Handle simple types - minimal enhancement since no field names available + elif schema.get("type") in ["string", "integer", "number", "boolean"]: + # For primitive return types, no enhancement needed - JSON Schema type is + # sufficient + pass + + return enhanced_schema diff --git a/src/mcp/types.py b/src/mcp/types.py index 465fc6ee6..0a8429533 100644 --- a/src/mcp/types.py +++ b/src/mcp/types.py @@ -347,7 +347,7 @@ class ProgressNotificationParams(NotificationParams): """ total: float | None = None """ - Message related to progress. This should provide relevant human readable + Message related to progress. This should provide relevant human readable progress information. """ message: str | None = None @@ -768,6 +768,8 @@ class Tool(BaseModel): """A human-readable description of the tool.""" inputSchema: dict[str, Any] """A JSON Schema object defining the expected parameters for the tool.""" + outputSchema: dict[str, Any] | None = None + """A JSON Schema object defining the expected output format of the tool.""" annotations: ToolAnnotations | None = None """Optional additional tool information.""" model_config = ConfigDict(extra="allow") diff --git a/tests/server/fastmcp/test_schema_utilities.py b/tests/server/fastmcp/test_schema_utilities.py new file mode 100644 index 000000000..647430381 --- /dev/null +++ b/tests/server/fastmcp/test_schema_utilities.py @@ -0,0 +1,495 @@ +"""Tests for schema enhancement utilities. + +This module tests the schema enhancement functionality that adds semantic metadata +to JSON Schema definitions, helping client applications understand how to render +and display tool outputs intelligently. +""" + +from mcp.server.fastmcp.utilities.schema import ( + detect_semantic_format, + enhance_output_schema, +) + + +class TestDetectSemanticFormat: + """Test the detect_semantic_format function.""" + + def test_url_detection(self): + """Test URL field detection.""" + test_cases = [ + ("url", {"type": "string"}), + ("website_url", {"type": "string"}), + ("api_uri", {"type": "string"}), + ("profile_link", {"type": "string"}), + ("redirect_href", {"type": "string"}), + ] + + for field_name, schema in test_cases: + result = detect_semantic_format(field_name, schema) + assert result["semantic_type"] == "url", f"Failed for field: {field_name}" + + def test_email_detection(self): + """Test email field detection.""" + test_cases = [ + ("email", {"type": "string"}), + ("user_email", {"type": "string"}), + ("contact_email", {"type": "string"}), + ("notification_email", {"type": "string"}), + ] + + for field_name, schema in test_cases: + result = detect_semantic_format(field_name, schema) + assert result["semantic_type"] == "email", f"Failed for field: {field_name}" + + def test_datetime_detection(self): + """Test datetime field detection with different types.""" + # Date only fields + date_fields = [ + ("created_date", "date_only"), + ("birth_date", "date_only"), + ("start_date", "date_only"), + ("expiry_date", "date_only"), + ] + + for field_name, expected_type in date_fields: + result = detect_semantic_format(field_name, {"type": "string"}) + assert result["semantic_type"] == "datetime" + assert result["datetime_type"] == expected_type + + # Time only fields + time_fields = [ + ("start_time", "time_only"), + ("end_time", "time_only"), + ("lunch_time", "time_only"), + ] + + for field_name, expected_type in time_fields: + result = detect_semantic_format(field_name, {"type": "string"}) + assert result["semantic_type"] == "datetime" + assert result["datetime_type"] == expected_type + + # Mixed datetime fields (testing the actual logic) + mixed_datetime_fields = [ + ("created_timestamp", "time_only"), # Contains "time" but not "date" + ("updated", "date_only"), # Contains "date" (from "updated") but not "time" + ("modified", "datetime"), # Contains neither "date" nor "time" explicitly + ( + "last_modified", + "datetime", + ), # Contains neither "date" nor "time" explicitly + ] + + for field_name, expected_type in mixed_datetime_fields: + result = detect_semantic_format(field_name, {"type": "string"}) + assert result["semantic_type"] == "datetime" + assert result["datetime_type"] == expected_type + + def test_media_detection(self): + """Test media field detection.""" + # Audio fields + audio_fields = [ + ("audio_file", None), + ("background_music", None), + ("voice_recording", None), + ("sound_effect", None), + ("audio_mp3", "audio_file"), + ("music_wav", "audio_file"), + ] + + for field_name, expected_format in audio_fields: + result = detect_semantic_format(field_name, {"type": "string"}) + assert result["semantic_type"] == "audio" + if expected_format: + assert result["media_format"] == expected_format + + # Video fields + video_fields = [ + ("video_file", None), + ("movie_clip", None), + ("video_content", None), # Changed to avoid "recording" keyword + ("video_mp4", "video_file"), + ("movie_avi", "video_file"), + ] + + for field_name, expected_format in video_fields: + result = detect_semantic_format(field_name, {"type": "string"}) + assert result["semantic_type"] == "video" + if expected_format: + assert result["media_format"] == expected_format + + # Image fields + image_fields = [ + ("profile_image", None), + ("photo", None), + ("picture", None), + ("thumbnail", None), + ("avatar", None), + ("image_jpg", "image_file"), + ("photo_png", "image_file"), + ] + + for field_name, expected_format in image_fields: + result = detect_semantic_format(field_name, {"type": "string"}) + assert result["semantic_type"] == "image" + if expected_format: + assert result["media_format"] == expected_format + + def test_file_path_detection(self): + """Test file path detection.""" + test_cases = [ + ("file_path", {"type": "string"}), + ("filename", {"type": "string"}), + ("filepath", {"type": "string"}), + ("document_path", {"type": "string"}), + ] + + for field_name, schema in test_cases: + result = detect_semantic_format(field_name, schema) + assert result["semantic_type"] == "file_path" + + def test_color_detection(self): + """Test color field detection.""" + test_cases = [ + ("color", {"type": "string"}), + ("background_color", {"type": "string"}), + ("theme_colour", {"type": "string"}), + ("primary_color", {"type": "string"}), + ] + + for field_name, schema in test_cases: + result = detect_semantic_format(field_name, schema) + assert result["semantic_type"] == "color" + + def test_currency_detection(self): + """Test currency field detection.""" + test_cases = [ + ("price", {"type": "number"}), + ("cost", {"type": "integer"}), + ("amount", {"type": "number"}), + ("fee", {"type": "integer"}), + ("currency", {"type": "number"}), + ] + + for field_name, schema in test_cases: + result = detect_semantic_format(field_name, schema) + assert result["semantic_type"] == "currency" + + # Should not detect currency for non-numeric types + result = detect_semantic_format("price", {"type": "string"}) + assert "semantic_type" not in result + + def test_percentage_detection(self): + """Test percentage field detection.""" + test_cases = [ + ("percentage", {"type": "number"}), + ("completion_percent", {"type": "integer"}), + ("success_rate", {"type": "number"}), + ] + + for field_name, schema in test_cases: + result = detect_semantic_format(field_name, schema) + assert result["semantic_type"] == "percentage" + + # Should not detect percentage for non-numeric types + result = detect_semantic_format("percentage", {"type": "string"}) + assert "semantic_type" not in result + + def test_identifier_detection(self): + """Test identifier field detection.""" + test_cases = [ + ("user_id", {"type": "string"}), + ("identifier", {"type": "string"}), + ("uuid", {"type": "string"}), + ("guid", {"type": "string"}), + ] + + for field_name, schema in test_cases: + result = detect_semantic_format(field_name, schema) + assert result["semantic_type"] == "identifier" + + def test_status_detection(self): + """Test status field detection.""" + test_cases = [ + ("status", {"type": "string"}), + ("state", {"type": "string"}), + ("condition", {"type": "string"}), + ("user_status", {"type": "string"}), + ] + + for field_name, schema in test_cases: + result = detect_semantic_format(field_name, schema) + assert result["semantic_type"] == "status" + + def test_no_detection(self): + """Test fields that should not be detected as having semantic types.""" + test_cases = [ + ("name", {"type": "string"}), + ("description", {"type": "string"}), + ("content", {"type": "string"}), + ("value", {"type": "number"}), + ("count", {"type": "integer"}), + ] + + for field_name, schema in test_cases: + result = detect_semantic_format(field_name, schema) + assert "semantic_type" not in result + + def test_case_insensitive_detection(self): + """Test that detection is case insensitive.""" + test_cases = [ + ("EMAIL", "email"), + ("User_URL", "url"), + ("CREATED_DATE", "datetime"), + ("Profile_Image", "image"), + ] + + for field_name, expected_type in test_cases: + result = detect_semantic_format(field_name, {"type": "string"}) + assert result["semantic_type"] == expected_type + + +class TestEnhanceOutputSchema: + """Test the enhance_output_schema function.""" + + def test_enhance_object_schema(self): + """Test enhancing object schemas with semantic information.""" + schema = { + "type": "object", + "properties": { + "email": {"type": "string", "title": "Email"}, + "created_date": {"type": "string", "title": "Created Date"}, + "profile_url": {"type": "string", "title": "Profile URL"}, + "age": {"type": "integer", "title": "Age"}, + }, + "required": ["email", "age"], + } + + enhanced = enhance_output_schema(schema, None) + + # Check that original structure is preserved + assert enhanced["type"] == "object" + assert "properties" in enhanced + + # Check semantic enhancements + assert enhanced["properties"]["email"]["semantic_type"] == "email" + assert enhanced["properties"]["created_date"]["semantic_type"] == "datetime" + assert enhanced["properties"]["created_date"]["datetime_type"] == "date_only" + assert enhanced["properties"]["profile_url"]["semantic_type"] == "url" + + # Check that non-semantic fields are unchanged + assert "semantic_type" not in enhanced["properties"]["age"] + + # Check that required field is removed from output schemas + assert "required" not in enhanced + + # Check that original titles are preserved + assert enhanced["properties"]["email"]["title"] == "Email" + assert enhanced["properties"]["created_date"]["title"] == "Created Date" + + def test_enhance_schema_with_anyof(self): + """Test enhancing schemas with anyOf (nullable fields).""" + schema = { + "type": "object", + "properties": { + "email": { + "anyOf": [{"type": "string"}, {"type": "null"}], + "title": "Email", + }, + "profile_url": { + "anyOf": [{"type": "string"}, {"type": "null"}], + "title": "Profile URL", + }, + }, + } + + enhanced = enhance_output_schema(schema, None) + + # Check that semantic types are detected even with anyOf + assert enhanced["properties"]["email"]["semantic_type"] == "email" + assert enhanced["properties"]["profile_url"]["semantic_type"] == "url" + + # Check that anyOf structure is preserved + assert "anyOf" in enhanced["properties"]["email"] + assert "anyOf" in enhanced["properties"]["profile_url"] + + def test_enhance_array_schema(self): + """Test enhancing array schemas.""" + schema = {"type": "array", "items": {"type": "string"}} + + enhanced = enhance_output_schema(schema, None) + + # Check that array structure is preserved + assert enhanced["type"] == "array" + assert "items" in enhanced + assert enhanced["items"]["type"] == "string" + + # Array items don't get semantic enhancement without field names + assert "semantic_type" not in enhanced["items"] + + def test_enhance_primitive_schema(self): + """Test enhancing primitive type schemas.""" + primitive_schemas = [ + {"type": "string"}, + {"type": "integer"}, + {"type": "number"}, + {"type": "boolean"}, + ] + + for schema in primitive_schemas: + enhanced = enhance_output_schema(schema, None) + # Primitive schemas should remain unchanged + assert enhanced == schema + + def test_enhance_complex_nested_schema(self): + """Test enhancing complex nested schemas.""" + schema = { + "type": "object", + "properties": { + "user_data": { # Changed from "user_profile" which contains "file" + "type": "object", + "properties": { + "email": {"type": "string"}, + "avatar_image": {"type": "string"}, + "last_login": {"type": "string"}, + }, + }, + "media_files": { + "type": "array", + "items": { + "type": "object", + "properties": { + "file_path": {"type": "string"}, + "audio_url": {"type": "string"}, + }, + }, + }, + }, + } + + enhanced = enhance_output_schema(schema, None) + + # Check top-level structure is preserved + assert enhanced["type"] == "object" + assert "properties" in enhanced + + # Nested objects should not be enhanced (current limitation) + # Only top-level properties get semantic enhancement + user_data = enhanced["properties"]["user_data"] + assert user_data["type"] == "object" + assert "semantic_type" not in user_data + + # Array structure should be preserved + media_files = enhanced["properties"]["media_files"] + assert media_files["type"] == "array" + assert "items" in media_files + + def test_enhance_schema_preserves_original(self): + """Test that enhancement doesn't modify the original schema.""" + original_schema = { + "type": "object", + "properties": { + "email": {"type": "string", "title": "Email"}, + "name": {"type": "string", "title": "Name"}, + }, + "required": ["email"], + } + + # Make a copy to compare against + original_copy = original_schema.copy() + + enhanced = enhance_output_schema(original_schema, None) + + # Original schema should be unchanged + assert original_schema == original_copy + + # Enhanced schema should be different + assert enhanced != original_schema + assert "semantic_type" in enhanced["properties"]["email"] + assert "required" not in enhanced + + def test_enhance_schema_with_media_formats(self): + """Test enhancement of schemas with media format detection.""" + schema = { + "type": "object", + "properties": { + "audio_mp3": {"type": "string"}, + "video_mp4": {"type": "string"}, + "image_jpg": {"type": "string"}, + "generic_audio": {"type": "string"}, + "generic_video": {"type": "string"}, + "generic_image": {"type": "string"}, + }, + } + + enhanced = enhance_output_schema(schema, None) + + # Check media format detection + assert enhanced["properties"]["audio_mp3"]["semantic_type"] == "audio" + assert enhanced["properties"]["audio_mp3"]["media_format"] == "audio_file" + + assert enhanced["properties"]["video_mp4"]["semantic_type"] == "video" + assert enhanced["properties"]["video_mp4"]["media_format"] == "video_file" + + assert enhanced["properties"]["image_jpg"]["semantic_type"] == "image" + assert enhanced["properties"]["image_jpg"]["media_format"] == "image_file" + + # Generic media fields should have semantic_type but no media_format + assert enhanced["properties"]["generic_audio"]["semantic_type"] == "audio" + assert "media_format" not in enhanced["properties"]["generic_audio"] + + assert enhanced["properties"]["generic_video"]["semantic_type"] == "video" + assert "media_format" not in enhanced["properties"]["generic_video"] + + assert enhanced["properties"]["generic_image"]["semantic_type"] == "image" + assert "media_format" not in enhanced["properties"]["generic_image"] + + def test_enhance_schema_with_numeric_semantics(self): + """Test enhancement of schemas with numeric semantic types.""" + schema = { + "type": "object", + "properties": { + "price": {"type": "number"}, + "completion_percentage": {"type": "integer"}, + "price_string": { + "type": "string" + }, # Should not be detected as currency + "percentage_string": { + "type": "string" + }, # Should not be detected as percentage + "regular_number": {"type": "number"}, + }, + } + + enhanced = enhance_output_schema(schema, None) + + # Check numeric semantic detection + assert enhanced["properties"]["price"]["semantic_type"] == "currency" + assert ( + enhanced["properties"]["completion_percentage"]["semantic_type"] + == "percentage" + ) + + # String fields with numeric semantic names should not be detected + assert "semantic_type" not in enhanced["properties"]["price_string"] + assert "semantic_type" not in enhanced["properties"]["percentage_string"] + + # Regular numbers should not have semantic types + assert "semantic_type" not in enhanced["properties"]["regular_number"] + + def test_enhance_empty_schema(self): + """Test enhancement of empty or minimal schemas.""" + # Empty object schema + empty_schema = {"type": "object", "properties": {}} + enhanced = enhance_output_schema(empty_schema, None) + assert enhanced == {"type": "object", "properties": {}} + + # Schema without properties + no_props_schema = {"type": "object"} + enhanced = enhance_output_schema(no_props_schema, None) + assert enhanced == {"type": "object"} + + # Schema without type + no_type_schema = {"properties": {"name": {"type": "string"}}} + enhanced = enhance_output_schema(no_type_schema, None) + # Should not be enhanced since type is not "object" + assert enhanced == no_type_schema diff --git a/tests/server/fastmcp/test_tool_manager.py b/tests/server/fastmcp/test_tool_manager.py index 203a7172b..cc115ecd8 100644 --- a/tests/server/fastmcp/test_tool_manager.py +++ b/tests/server/fastmcp/test_tool_manager.py @@ -49,6 +49,7 @@ class AddArguments(ArgModelBase): fn_metadata=fn_metadata, is_async=False, parameters=AddArguments.model_json_schema(), + output_schema={"type": "integer"}, context_kwarg=None, annotations=None, ) @@ -453,3 +454,361 @@ def echo(message: str) -> str: assert tools[0].annotations is not None assert tools[0].annotations.title == "Echo Tool" assert tools[0].annotations.readOnlyHint is True + + +class TestOutputSchema: + """Test the output schema generation for tools.""" + + def test_primitive_type_output_schemas(self): + """Test output schema generation for primitive return types.""" + manager = ToolManager() + + # String return type + def string_tool(text: str) -> str: + return text + + tool = manager.add_tool(string_tool) + assert tool.output_schema == {"type": "string"} + + # Integer return type + def int_tool(number: int) -> int: + return number + + tool = manager.add_tool(int_tool) + assert tool.output_schema == {"type": "integer"} + + # Float return type + def float_tool(number: float) -> float: + return number + + tool = manager.add_tool(float_tool) + assert tool.output_schema == {"type": "number"} + + # Boolean return type + def bool_tool(value: bool) -> bool: + return value + + tool = manager.add_tool(bool_tool) + assert tool.output_schema == {"type": "boolean"} + + # Dictionary return type + def dict_tool(data: dict) -> dict: + return data + + tool = manager.add_tool(dict_tool) + assert tool.output_schema == {"type": "object"} + + # List return type + def list_tool(items: list) -> list: + return items + + tool = manager.add_tool(list_tool) + assert tool.output_schema == {"type": "array"} + + def test_pydantic_model_output_schema(self): + """Test output schema generation for Pydantic model return types.""" + manager = ToolManager() + + class Person(BaseModel): + name: str + age: int + email: str | None = None + + def create_person(name: str, age: int) -> Person: + return Person(name=name, age=age) + + tool = manager.add_tool(create_person) + assert tool.output_schema is not None + assert tool.output_schema["type"] == "object" + assert "properties" in tool.output_schema + assert "name" in tool.output_schema["properties"] + assert "age" in tool.output_schema["properties"] + assert "email" in tool.output_schema["properties"] + assert tool.output_schema["properties"]["name"]["type"] == "string" + assert tool.output_schema["properties"]["age"]["type"] == "integer" + assert "anyOf" in tool.output_schema["properties"]["email"] + assert "string" in [ + t["type"] + for t in tool.output_schema["properties"]["email"]["anyOf"] + if "type" in t + ] + assert "null" in [ + t["type"] + for t in tool.output_schema["properties"]["email"]["anyOf"] + if "type" in t + ] + + # Check semantic enhancements + assert tool.output_schema["properties"]["email"]["semantic_type"] == "email" + assert ( + "semantic_type" not in tool.output_schema["properties"]["name"] + ) # Non-semantic field + assert ( + "semantic_type" not in tool.output_schema["properties"]["age"] + ) # Non-semantic field + + # Check that required field is removed from output schema + assert "required" not in tool.output_schema + + def test_complex_output_schema(self): + """Test output schema generation for complex return types.""" + manager = ToolManager() + + class Person(BaseModel): + name: str + age: int + + class ApiResponse(BaseModel): + status: str + code: int + data: list[Person] | Person | None = None + + def complex_response(success: bool) -> ApiResponse: + return ApiResponse( + status="success" if success else "error", + code=200 if success else 400, + data=None, + ) + + tool = manager.add_tool(complex_response) + assert tool.output_schema is not None + assert tool.output_schema["type"] == "object" + assert "properties" in tool.output_schema + assert "status" in tool.output_schema["properties"] + assert "code" in tool.output_schema["properties"] + assert "data" in tool.output_schema["properties"] + assert "anyOf" in tool.output_schema["properties"]["data"] + + # Check semantic enhancements + assert tool.output_schema["properties"]["status"]["semantic_type"] == "status" + assert ( + "semantic_type" not in tool.output_schema["properties"]["code"] + ) # Non-semantic field + assert ( + "semantic_type" not in tool.output_schema["properties"]["data"] + ) # Non-semantic field + + # Check that required field is removed from output schema + assert "required" not in tool.output_schema + + def test_generic_list_output_schema(self): + """Test output schema generation for generic list return types.""" + manager = ToolManager() + + def list_of_strings() -> list[str]: + return ["a", "b", "c"] + + tool = manager.add_tool(list_of_strings) + assert tool.output_schema is not None + assert "items" in tool.output_schema + assert tool.output_schema["items"]["type"] == "string" + + @pytest.mark.anyio + async def test_output_schema_in_fastmcp(self): + """Test that output schemas are included in FastMCP tool listing.""" + app = FastMCP() + + @app.tool() + def string_tool(text: str) -> str: + """Returns the input text""" + return text + + @app.tool() + def int_tool(number: int) -> int: + """Returns the input number""" + return number + + class Person(BaseModel): + name: str + age: int + email: str | None = None # Add email field to test semantic enhancement + + @app.tool() + def create_person(name: str, age: int) -> Person: + """Creates a person object""" + return Person(name=name, age=age) + + tools = await app.list_tools() + assert len(tools) == 3 + + # Check string tool + string_tool_info = next(t for t in tools if t.name == "string_tool") + assert string_tool_info.outputSchema == {"type": "string"} + + # Check int tool + int_tool_info = next(t for t in tools if t.name == "int_tool") + assert int_tool_info.outputSchema == {"type": "integer"} + + # Check complex tool + person_tool_info = next(t for t in tools if t.name == "create_person") + assert person_tool_info.outputSchema is not None + assert person_tool_info.outputSchema["type"] == "object" + assert "properties" in person_tool_info.outputSchema + assert "name" in person_tool_info.outputSchema["properties"] + assert "age" in person_tool_info.outputSchema["properties"] + assert "email" in person_tool_info.outputSchema["properties"] + + # Check semantic enhancements in FastMCP listing + properties = person_tool_info.outputSchema["properties"] + assert properties["email"]["semantic_type"] == "email" + assert "semantic_type" not in properties["name"] # Non-semantic field + assert "semantic_type" not in properties["age"] # Non-semantic field + + # Check that required field is removed from output schema + assert "required" not in person_tool_info.outputSchema + + def test_enhanced_output_schema_with_semantic_fields(self): + """Test that output schemas are enhanced with semantic information.""" + manager = ToolManager() + + class UserProfile(BaseModel): + user_id: str + email: str + profile_url: str + avatar_image: str + created_date: str + last_login_time: str + account_amount: float # Changed to trigger currency detection + completion_percentage: int + primary_color: str + status: str + name: str # Should not get semantic enhancement + + def get_user_profile(user_id: str) -> UserProfile: + """Get user profile with semantic fields""" + return UserProfile( + user_id="usr_123", + email="user@example.com", + profile_url="https://example.com/user/123", + avatar_image="https://example.com/avatar.jpg", + created_date="2023-06-15", + last_login_time="2024-01-15T14:22:00Z", + account_amount=150.75, + completion_percentage=85, + primary_color="#3498db", + status="active", + name="John Doe", + ) + + tool = manager.add_tool(get_user_profile) + assert tool.output_schema is not None + + properties = tool.output_schema["properties"] + + # Check semantic enhancements + assert properties["user_id"]["semantic_type"] == "identifier" + assert properties["email"]["semantic_type"] == "email" + assert properties["profile_url"]["semantic_type"] == "url" + assert properties["avatar_image"]["semantic_type"] == "image" + assert properties["created_date"]["semantic_type"] == "datetime" + assert properties["created_date"]["datetime_type"] == "date_only" + assert properties["last_login_time"]["semantic_type"] == "datetime" + assert ( + properties["last_login_time"]["datetime_type"] == "time_only" + ) # Contains "time" but not "date" + assert properties["account_amount"]["semantic_type"] == "currency" + assert properties["completion_percentage"]["semantic_type"] == "percentage" + assert properties["primary_color"]["semantic_type"] == "color" + assert properties["status"]["semantic_type"] == "status" + + # Check that non-semantic fields don't get enhancement + assert "semantic_type" not in properties["name"] + + # Check that required field is removed from output schema + assert "required" not in tool.output_schema + + @pytest.mark.anyio + async def test_enhanced_schemas_in_fastmcp_listing(self): + """Test that enhanced schemas are included in FastMCP tool listing.""" + app = FastMCP() + + class MediaFile(BaseModel): + name: str # Changed from "filename" to avoid file_path detection + file_path: str + audio_url: str + video_url: str + image_url: str + created_timestamp: str + size: int # Changed from "file_size" to avoid file_path detection + + @app.tool() + def get_media_file(file_id: str) -> MediaFile: + """Get media file information with semantic fields""" + return MediaFile( + name="example.mp3", + file_path="/media/audio/example.mp3", + audio_url="https://example.com/audio/example.mp3", + video_url="https://example.com/video/example.mp4", + image_url="https://example.com/images/thumbnail.jpg", + created_timestamp="2024-01-15T10:30:00Z", + size=1024000, + ) + + tools = await app.list_tools() + assert len(tools) == 1 + + media_tool = tools[0] + assert media_tool.outputSchema is not None + + properties = media_tool.outputSchema["properties"] + + # Verify semantic enhancements are present in the listing + assert properties["file_path"]["semantic_type"] == "file_path" + assert properties["audio_url"]["semantic_type"] == "url" + assert properties["video_url"]["semantic_type"] == "url" + assert properties["image_url"]["semantic_type"] == "url" + assert properties["created_timestamp"]["semantic_type"] == "datetime" + assert ( + properties["created_timestamp"]["datetime_type"] == "time_only" + ) # Contains "time" but not "date" + + # Non-semantic fields should not have enhancements + assert "semantic_type" not in properties["name"] + assert "semantic_type" not in properties["size"] + + def test_enhanced_schema_with_media_formats(self): + """Test schema enhancement with specific media format detection.""" + manager = ToolManager() + + class MediaCollection(BaseModel): + audio_mp3: str + video_mp4: str + image_jpg: str + generic_audio: str + generic_video: str + generic_image: str + + def get_media_collection() -> MediaCollection: + """Get media collection with format-specific fields""" + return MediaCollection( + audio_mp3="song.mp3", + video_mp4="movie.mp4", + image_jpg="photo.jpg", + generic_audio="sound", + generic_video="clip", + generic_image="picture", + ) + + tool = manager.add_tool(get_media_collection) + assert tool.output_schema is not None + properties = tool.output_schema["properties"] + + # Check media format detection + assert properties["audio_mp3"]["semantic_type"] == "audio" + assert properties["audio_mp3"]["media_format"] == "audio_file" + + assert properties["video_mp4"]["semantic_type"] == "video" + assert properties["video_mp4"]["media_format"] == "video_file" + + assert properties["image_jpg"]["semantic_type"] == "image" + assert properties["image_jpg"]["media_format"] == "image_file" + + # Generic media fields should have semantic_type but no media_format + assert properties["generic_audio"]["semantic_type"] == "audio" + assert "media_format" not in properties["generic_audio"] + + assert properties["generic_video"]["semantic_type"] == "video" + assert "media_format" not in properties["generic_video"] + + assert properties["generic_image"]["semantic_type"] == "image" + assert "media_format" not in properties["generic_image"] diff --git a/tests/server/test_lowlevel_tool_annotations.py b/tests/server/test_lowlevel_tool_annotations.py index e9eff9ed0..2ced304b5 100644 --- a/tests/server/test_lowlevel_tool_annotations.py +++ b/tests/server/test_lowlevel_tool_annotations.py @@ -38,6 +38,7 @@ async def list_tools(): }, "required": ["message"], }, + outputSchema={"type": "string", "description": "The echoed message"}, annotations=ToolAnnotations( title="Echo Tool", readOnlyHint=True, @@ -54,9 +55,11 @@ async def list_tools(): # Message handler for client async def message_handler( - message: RequestResponder[ServerRequest, ClientResult] - | ServerNotification - | Exception, + message: ( + RequestResponder[ServerRequest, ClientResult] + | ServerNotification + | Exception + ), ) -> None: if isinstance(message, Exception): raise message