Skip to content

Commit 8f83407

Browse files
authored
Add support for MCP's Streamable HTTP transport (#1716)
1 parent 2392791 commit 8f83407

File tree

5 files changed

+101
-61
lines changed

5 files changed

+101
-61
lines changed

docs/mcp/client.md

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,23 @@ pip/uv-add "pydantic-ai-slim[mcp]"
1818

1919
PydanticAI comes with two ways to connect to MCP servers:
2020

21-
- [`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] which connects to an MCP server using the [HTTP SSE](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse) transport
21+
- [`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] which connects to an MCP server using the [Streamable HTTP transport](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http) transport
2222
- [`MCPServerStdio`][pydantic_ai.mcp.MCPServerStdio] which runs the server as a subprocess and connects to it using the [stdio](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio) transport
2323

2424
Examples of both are shown below; [mcp-run-python](run-python.md) is used as the MCP server in both examples.
2525

26-
### SSE Client
26+
### HTTP Client
2727

28-
[`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] connects over HTTP using the [HTTP + Server Sent Events transport](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse) to a server.
28+
[`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] connects over HTTP using the [Streamable HTTP transport](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http) to a server.
2929

3030
!!! note
3131
[`MCPServerHTTP`][pydantic_ai.mcp.MCPServerHTTP] requires an MCP server to be running and accepting HTTP connections before calling [`agent.run_mcp_servers()`][pydantic_ai.Agent.run_mcp_servers]. Running the server is not managed by PydanticAI.
3232

33-
The name "HTTP" is used since this implemented will be adapted in future to use the new
34-
[Streamable HTTP](https://github.com/modelcontextprotocol/specification/pull/206) currently in development.
33+
The StreamableHTTP Transport is able to connect to both stateless HTTP and older Server Sent Events (SSE) servers.
3534

36-
Before creating the SSE client, we need to run the server (docs [here](run-python.md)):
35+
Before creating the HTTP client, we need to run the server (docs [here](run-python.md)):
3736

38-
```bash {title="terminal (run sse server)"}
37+
```bash {title="terminal (run http server)"}
3938
deno run \
4039
-N -R=node_modules -W=node_modules --node-modules-dir=auto \
4140
jsr:@pydantic/mcp-run-python sse
@@ -56,7 +55,7 @@ async def main():
5655
#> There are 9,208 days between January 1, 2000, and March 18, 2025.
5756
```
5857

59-
1. Define the MCP server with the URL used to connect.
58+
1. Define the MCP server with the URL used to connect. This will typically end in `/mcp` for HTTP servers and `/sse` for SSE.
6059
2. Create an agent with the MCP server attached.
6160
3. Create a client session to connect to the server.
6261

pydantic_ai_slim/pydantic_ai/mcp.py

Lines changed: 57 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,22 @@
22

33
import base64
44
import json
5+
import warnings
56
from abc import ABC, abstractmethod
67
from collections.abc import AsyncIterator, Sequence
78
from contextlib import AsyncExitStack, asynccontextmanager
89
from dataclasses import dataclass
10+
from datetime import timedelta
911
from pathlib import Path
1012
from types import TracebackType
1113
from typing import Any
1214

1315
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
16+
from mcp.shared.message import SessionMessage
1417
from mcp.types import (
1518
BlobResourceContents,
1619
EmbeddedResource,
1720
ImageContent,
18-
JSONRPCMessage,
1921
LoggingLevel,
2022
TextContent,
2123
TextResourceContents,
@@ -28,8 +30,8 @@
2830

2931
try:
3032
from mcp.client.session import ClientSession
31-
from mcp.client.sse import sse_client
3233
from mcp.client.stdio import StdioServerParameters, stdio_client
34+
from mcp.client.streamable_http import streamablehttp_client
3335
except ImportError as _import_error:
3436
raise ImportError(
3537
'Please install the `mcp` package to use the MCP server, '
@@ -55,19 +57,16 @@ class MCPServer(ABC):
5557
"""
5658

5759
_client: ClientSession
58-
_read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception]
59-
_write_stream: MemoryObjectSendStream[JSONRPCMessage]
60+
_read_stream: MemoryObjectReceiveStream[SessionMessage | Exception]
61+
_write_stream: MemoryObjectSendStream[SessionMessage]
6062
_exit_stack: AsyncExitStack
6163

6264
@abstractmethod
6365
@asynccontextmanager
6466
async def client_streams(
6567
self,
6668
) -> AsyncIterator[
67-
tuple[
68-
MemoryObjectReceiveStream[JSONRPCMessage | Exception],
69-
MemoryObjectSendStream[JSONRPCMessage],
70-
]
69+
tuple[MemoryObjectReceiveStream[SessionMessage | Exception], MemoryObjectSendStream[SessionMessage]]
7170
]:
7271
"""Create the streams for the MCP server."""
7372
raise NotImplementedError('MCP Server subclasses must implement this method.')
@@ -256,10 +255,7 @@ async def main():
256255
async def client_streams(
257256
self,
258257
) -> AsyncIterator[
259-
tuple[
260-
MemoryObjectReceiveStream[JSONRPCMessage | Exception],
261-
MemoryObjectSendStream[JSONRPCMessage],
262-
]
258+
tuple[MemoryObjectReceiveStream[SessionMessage | Exception], MemoryObjectSendStream[SessionMessage]]
263259
]:
264260
server = StdioServerParameters(command=self.command, args=list(self.args), env=self.env, cwd=self.cwd)
265261
async with stdio_client(server=server) as (read_stream, write_stream):
@@ -276,11 +272,11 @@ def __repr__(self) -> str:
276272
class MCPServerHTTP(MCPServer):
277273
"""An MCP server that connects over streamable HTTP connections.
278274
279-
This class implements the SSE transport from the MCP specification.
280-
See <https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse> for more information.
275+
This class implements the Streamable HTTP transport from the MCP specification.
276+
See <https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http> for more information.
281277
282-
The name "HTTP" is used since this implemented will be adapted in future to use the new
283-
[Streamable HTTP](https://github.com/modelcontextprotocol/specification/pull/206) currently in development.
278+
The Streamable HTTP transport is intended to replace the SSE transport from the previous protocol, but it is fully
279+
backwards compatible with SSE-based servers.
284280
285281
!!! note
286282
Using this class as an async context manager will create a new pool of HTTP connections to connect
@@ -291,7 +287,7 @@ class MCPServerHTTP(MCPServer):
291287
from pydantic_ai import Agent
292288
from pydantic_ai.mcp import MCPServerHTTP
293289
294-
server = MCPServerHTTP('http://localhost:3001/sse') # (1)!
290+
server = MCPServerHTTP('http://localhost:3001/mcp') # (1)!
295291
agent = Agent('openai:gpt-4o', mcp_servers=[server])
296292
297293
async def main():
@@ -304,27 +300,27 @@ async def main():
304300
"""
305301

306302
url: str
307-
"""The URL of the SSE endpoint on the MCP server.
303+
"""The URL of the SSE or MCP endpoint on the MCP server.
308304
309-
For example for a server running locally, this might be `http://localhost:3001/sse`.
305+
For example for a server running locally, this might be `http://localhost:3001/mcp`.
310306
"""
311307

312308
headers: dict[str, Any] | None = None
313-
"""Optional HTTP headers to be sent with each request to the SSE endpoint.
309+
"""Optional HTTP headers to be sent with each request to the endpoint.
314310
315311
These headers will be passed directly to the underlying `httpx.AsyncClient`.
316312
Useful for authentication, custom headers, or other HTTP-specific configurations.
317313
"""
318314

319-
timeout: float = 5
320-
"""Initial connection timeout in seconds for establishing the SSE connection.
315+
timeout: timedelta | float = timedelta(seconds=5)
316+
"""Initial connection timeout as a timedelta for establishing the connection.
321317
322318
This timeout applies to the initial connection setup and handshake.
323319
If the connection cannot be established within this time, the operation will fail.
324320
"""
325321

326-
sse_read_timeout: float = 60 * 5
327-
"""Maximum time in seconds to wait for new SSE messages before timing out.
322+
sse_read_timeout: timedelta | float = timedelta(minutes=5)
323+
"""Maximum time as a timedelta to wait for new SSE messages before timing out.
328324
329325
This timeout applies to the long-lived SSE connection after it's established.
330326
If no new messages are received within this time, the connection will be considered stale
@@ -346,21 +342,48 @@ async def main():
346342
For example, if `tool_prefix='foo'`, then a tool named `bar` will be registered as `foo_bar`
347343
"""
348344

345+
def __post_init__(self):
346+
if not isinstance(self.timeout, timedelta):
347+
warnings.warn(
348+
'Passing timeout as a float has been deprecated, please use a timedelta instead.',
349+
DeprecationWarning,
350+
stacklevel=2,
351+
)
352+
self.timeout = timedelta(seconds=self.timeout)
353+
354+
if not isinstance(self.sse_read_timeout, timedelta):
355+
warnings.warn(
356+
'Passing sse_read_timeout as a float has been deprecated, please use a timedelta instead.',
357+
DeprecationWarning,
358+
stacklevel=2,
359+
)
360+
self.sse_read_timeout = timedelta(seconds=self.sse_read_timeout)
361+
349362
@asynccontextmanager
350363
async def client_streams(
351364
self,
352365
) -> AsyncIterator[
353-
tuple[
354-
MemoryObjectReceiveStream[JSONRPCMessage | Exception],
355-
MemoryObjectSendStream[JSONRPCMessage],
356-
]
366+
tuple[MemoryObjectReceiveStream[SessionMessage | Exception], MemoryObjectSendStream[SessionMessage]]
357367
]: # pragma: no cover
358-
async with sse_client(
359-
url=self.url,
360-
headers=self.headers,
361-
timeout=self.timeout,
362-
sse_read_timeout=self.sse_read_timeout,
363-
) as (read_stream, write_stream):
368+
if not isinstance(self.timeout, timedelta):
369+
warnings.warn(
370+
'Passing timeout as a float has been deprecated, please use a timedelta instead.',
371+
DeprecationWarning,
372+
stacklevel=2,
373+
)
374+
self.timeout = timedelta(seconds=self.timeout)
375+
376+
if not isinstance(self.sse_read_timeout, timedelta):
377+
warnings.warn(
378+
'Passing sse_read_timeout as a float has been deprecated, please use a timedelta instead.',
379+
DeprecationWarning,
380+
stacklevel=2,
381+
)
382+
self.sse_read_timeout = timedelta(seconds=self.sse_read_timeout)
383+
384+
async with streamablehttp_client(
385+
url=self.url, headers=self.headers, timeout=self.timeout, sse_read_timeout=self.sse_read_timeout
386+
) as (read_stream, write_stream, _):
364387
yield read_stream, write_stream
365388

366389
def _get_log_level(self) -> LoggingLevel | None:

pydantic_ai_slim/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ tavily = ["tavily-python>=0.5.0"]
7575
# CLI
7676
cli = ["rich>=13", "prompt-toolkit>=3", "argcomplete>=3.5.0"]
7777
# MCP
78-
mcp = ["mcp>=1.6.0; python_version >= '3.10'"]
78+
mcp = ["mcp>=1.8.0; python_version >= '3.10'"]
7979
# Evals
8080
evals = ["pydantic-evals=={{ version }}"]
8181
# A2A

tests/test_mcp.py

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Tests for the MCP (Model Context Protocol) server implementation."""
22

33
import re
4+
from datetime import timedelta
45
from pathlib import Path
56

67
import pytest
@@ -70,25 +71,41 @@ async def test_stdio_server_with_cwd():
7071
assert len(tools) == 10
7172

7273

73-
def test_sse_server():
74-
sse_server = MCPServerHTTP(url='http://localhost:8000/sse')
75-
assert sse_server.url == 'http://localhost:8000/sse'
76-
assert sse_server._get_log_level() is None # pyright: ignore[reportPrivateUsage]
74+
def test_http_server():
75+
http_server = MCPServerHTTP(url='http://localhost:8000/sse')
76+
assert http_server.url == 'http://localhost:8000/sse'
77+
assert http_server._get_log_level() is None # pyright: ignore[reportPrivateUsage]
7778

7879

79-
def test_sse_server_with_header_and_timeout():
80-
sse_server = MCPServerHTTP(
80+
def test_http_server_with_header_and_timeout():
81+
http_server = MCPServerHTTP(
8182
url='http://localhost:8000/sse',
8283
headers={'my-custom-header': 'my-header-value'},
83-
timeout=10,
84-
sse_read_timeout=100,
84+
timeout=timedelta(seconds=10),
85+
sse_read_timeout=timedelta(seconds=100),
8586
log_level='info',
8687
)
87-
assert sse_server.url == 'http://localhost:8000/sse'
88-
assert sse_server.headers is not None and sse_server.headers['my-custom-header'] == 'my-header-value'
89-
assert sse_server.timeout == 10
90-
assert sse_server.sse_read_timeout == 100
91-
assert sse_server._get_log_level() == 'info' # pyright: ignore[reportPrivateUsage]
88+
assert http_server.url == 'http://localhost:8000/sse'
89+
assert http_server.headers is not None and http_server.headers['my-custom-header'] == 'my-header-value'
90+
assert http_server.timeout == timedelta(seconds=10)
91+
assert http_server.sse_read_timeout == timedelta(seconds=100)
92+
assert http_server._get_log_level() == 'info' # pyright: ignore[reportPrivateUsage]
93+
94+
95+
def test_http_server_with_deprecated_arguments():
96+
with pytest.warns(DeprecationWarning):
97+
http_server = MCPServerHTTP(
98+
url='http://localhost:8000/sse',
99+
headers={'my-custom-header': 'my-header-value'},
100+
timeout=10,
101+
sse_read_timeout=100,
102+
log_level='info',
103+
)
104+
assert http_server.url == 'http://localhost:8000/sse'
105+
assert http_server.headers is not None and http_server.headers['my-custom-header'] == 'my-header-value'
106+
assert http_server.timeout == timedelta(seconds=10)
107+
assert http_server.sse_read_timeout == timedelta(seconds=100)
108+
assert http_server._get_log_level() == 'info' # pyright: ignore[reportPrivateUsage]
92109

93110

94111
@pytest.mark.vcr()

uv.lock

Lines changed: 6 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)