Skip to content

Feature/resource progress #800

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions src/mcp/client/session.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
from datetime import timedelta
from typing import Any, Protocol
from typing import Annotated, Any, Protocol

import anyio.lowlevel
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from pydantic import AnyUrl, TypeAdapter
from pydantic import TypeAdapter
from pydantic.networks import AnyUrl, UrlConstraints

import mcp.types as types
from mcp.shared.context import RequestContext
from mcp.shared.message import SessionMessage
from mcp.shared.session import BaseSession, ProgressFnT, RequestResponder
from mcp.shared.session import (
BaseSession,
ProgressFnT,
RequestResponder,
ResourceProgressFnT,
)
from mcp.shared.version import SUPPORTED_PROTOCOL_VERSIONS

DEFAULT_CLIENT_INFO = types.Implementation(name="mcp", version="0.1.0")
Expand Down Expand Up @@ -173,6 +179,9 @@ async def send_progress_notification(
progress: float,
total: float | None = None,
message: str | None = None,
# TODO check whether MCP spec allows clients to create resources
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On my interpretation of the spec, resources are owned and exposed by the "server". How the resource is created (say it's a file, media, API response) is not relevant. It may be that the same application can act as a client and a server (e.g. Claude Code can be a server but it is also a client, maybe the Claude Code application produces log files that the same application then exposes as a resource), but conceptually I don't think this fits the protocol, as currently framed. Nevertheless, it clearly states that both parties can request progress notifications about long-running processes (https://modelcontextprotocol.io/specification/2025-03-26/basic/utilities/progress#progress-flow), but IMO conceptually it is the server (or "the application when it's acting as a server") that creates resources. This is splitting hairs, but if Claude Code the server exposes a resource that Claude Code the application itself is producing, it's still inaccurate to say CC the client is creating the resource.

In any case, I'm curious what is the importance of this TODO? Is it to determine whether a client can send a resource/updated notification? If so, I would say no, IMO. But if the answer is "yes", like how would that affect the code/this PR?

# for server and therefore whether resource notifications
# would be required here too
) -> None:
"""Send a progress notification."""
await self.send_notification(
Expand Down Expand Up @@ -202,7 +211,10 @@ async def set_logging_level(self, level: types.LoggingLevel) -> types.EmptyResul
)

async def list_resources(
self, cursor: str | None = None
self,
cursor: str | None = None,
# TODO suggest in progress resources should be excluded by default?
# possibly add an optional flag to include?
) -> types.ListResourcesResult:
"""Send a resources/list request."""
return await self.send_request(
Expand Down Expand Up @@ -233,7 +245,9 @@ async def list_resource_templates(
types.ListResourceTemplatesResult,
)

async def read_resource(self, uri: AnyUrl) -> types.ReadResourceResult:
async def read_resource(
self, uri: Annotated[AnyUrl, UrlConstraints(host_required=False)]
) -> types.ReadResourceResult:
"""Send a resources/read request."""
return await self.send_request(
types.ClientRequest(
Expand All @@ -245,7 +259,9 @@ async def read_resource(self, uri: AnyUrl) -> types.ReadResourceResult:
types.ReadResourceResult,
)

async def subscribe_resource(self, uri: AnyUrl) -> types.EmptyResult:
async def subscribe_resource(
self, uri: Annotated[AnyUrl, UrlConstraints(host_required=False)]
) -> types.EmptyResult:
"""Send a resources/subscribe request."""
return await self.send_request(
types.ClientRequest(
Expand All @@ -257,7 +273,9 @@ async def subscribe_resource(self, uri: AnyUrl) -> types.EmptyResult:
types.EmptyResult,
)

async def unsubscribe_resource(self, uri: AnyUrl) -> types.EmptyResult:
async def unsubscribe_resource(
self, uri: Annotated[AnyUrl, UrlConstraints(host_required=False)]
) -> types.EmptyResult:
"""Send a resources/unsubscribe request."""
return await self.send_request(
types.ClientRequest(
Expand All @@ -274,7 +292,7 @@ async def call_tool(
name: str,
arguments: dict[str, Any] | None = None,
read_timeout_seconds: timedelta | None = None,
progress_callback: ProgressFnT | None = None,
progress_callback: ProgressFnT | ResourceProgressFnT | None = None,
) -> types.CallToolResult:
"""Send a tools/call request with optional progress callback support."""

Expand Down
1 change: 0 additions & 1 deletion src/mcp/client/session_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ async def __aexit__(
for exit_stack in self._session_exit_stacks.values():
tg.start_soon(exit_stack.aclose)


@property
def sessions(self) -> list[mcp.ClientSession]:
"""Returns the list of sessions being managed."""
Expand Down
12 changes: 9 additions & 3 deletions src/mcp/server/fastmcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
asynccontextmanager,
)
from itertools import chain
from typing import Any, Generic, Literal
from typing import Annotated, Any, Generic, Literal

import anyio
import pydantic_core
from pydantic import BaseModel, Field
from pydantic.networks import AnyUrl
from pydantic.networks import AnyUrl, UrlConstraints
from pydantic_settings import BaseSettings, SettingsConfigDict
from starlette.applications import Starlette
from starlette.middleware import Middleware
Expand Down Expand Up @@ -956,7 +956,12 @@ def request_context(self) -> RequestContext[ServerSessionT, LifespanContextT]:
return self._request_context

async def report_progress(
self, progress: float, total: float | None = None, message: str | None = None
self,
progress: float,
total: float | None = None,
message: str | None = None,
resource_uri: Annotated[AnyUrl, UrlConstraints(host_required=False)]
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Local testing suggests it might be better to allow several resources to be reported as part of a single progress notification.

e.g.

context.report_progress(
  progress=3,
  total=10,
  message='Reticulating splines',
  resource_uris=['resource://spline1', 'resource://spline2', 'resource://spline3']
)

I might refactor along these lines if this patch has a chance of making it into the protocol/sdk and it makes sense to others.

Copy link
Contributor

@hesreallyhim hesreallyhim May 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry i need to review what you're trying to do more carefully, but ProgressNotificationParams, like most of the things in the Request/Response models, have model_config = ConfigDict(extra="allow"), which afaik means you can just add more data to the progress notification, without changing the protocol.

EDIT: Well, strictly-er speaking, without changing the SDK. It's a little unclear to me if the protocol itself allows this.

Copy link
Author

@davemssavage davemssavage May 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did wonder about this also, the motivation for this patch is as an alternative to modelcontextprotocol/modelcontextprotocol#549 which appears to be trying to allow tasks to return references to resources that will eventually be created on the server. The thread of conversation there points out that by allowing resources to be in a pending state complicates the protocol as clients then need to always check if a resource is ready. This is an alternative approach that rather than making resources pending they are always 'ready' but the server can report intermediary temporary resources as state along the way.

Take an example of a tool that allows training of a model. This patch allows the tool to report several different types of info to the client via one interface whilst the training is happening, e.g. metrics such as training loss as a json report, training artifacts and model checkpoints. The client can then decide to display and/or take action based on this progress resources as it sees fit.

I guess I could just add arbitary data to the low level protocol request params and achieve the same result, however to make this easy for tool implementers this needs to be exposed via the FastMCP interface, hence adding the resource_uri (s) to the context interface. Also if going with the low level protocol approach the client would also need to implement quite a lot of low level code and there would likely end up quite a lot of different approaches among tool providers.

However, as an argument against all this, I'm also debating whether long running tasks such as training are a good fit for this approach, as this involves a very long connection to the server for a single task. An alternative to all of this might be to have a tool that uses the structuredContent approach to start a server task and then the client can then poll via another tool to check on the status e.g.

@mcp.tool("Start a long running task")
async def start_task() -> Ticket:
  ...
  return Ticket(id=1234)

@mcp.tool("Check if a task has completed")
async def check_task(ticket: Ticket) -> Status:
  ...
  return Status(pending=True)

It's not obvious to me whether a LLM Agent would be able to reason about either behaviour so hence this branch https://github.com/davemssavage/python-sdk/tree/integration_test_1 puts together a few things I've been tinkering with to try out various approaches.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Futher thoughts, the downside of the manual start/check approach is it also requires tools to create a cancel method which is then duplicating behaviour of the MCP protocol itself at a higher level.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok I didn't catch this was aimed at improving FastMCP mostly. During my initial foray into this, I found that this was something kinda neglected in FastMCP which is why I went to lowlevel. Resource-update subscription notification I think is not really built into FastMCP API anyway, they just have something for ProgressNotification I believe (so maybe this is the missing piece really). I just got up so I have to read this a few more times, but a few thoughts (a) a long-running task could be treated as a resource (like "job://") and then the client could make a read request for the job, or subscribe to change notifications for the job (that's like your tool approach but you don't need any new tools like "check_task", it's just a read_resource where the resource is a "task://"... (b) something else lol I gotta chew on it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I think I get where you're going with the task itself as a job resource, however I think in that scenario it would be quite complex to support the type of progress updates seen in training, e.g. in the case of training a model there are quite a few different elements that can be notifiied as progress and I'm slightly struggling to see now a single resource could satisfy all of these.

I guess I could pass back several resources one for task metrics, one for task validation artefacts and one for checkpoints. However that only allows for the latest version of this information rather than being able to plot metrics over time as is possible if the resource is associated with a progress state. This also leaves open the question of how to cancel the task itself which would require a new tool to cancel a previous task.

I took a pass at simplifying cancelling a tool call initiated via the client session with this pull request #628. I'll see if I can get this training example working and and aim to add it as an example so there's something concrete to point to.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in that scenario it would be quite complex to support the type of progress updates seen in training, e.g. in the case of training a model there are quite a few different elements that can be notifiied as progress and I'm slightly struggling to see now a single resource could satisfy all of these.

I think the idea is you're not asking for "ProgressNotifications" in the sense of like 50% complete, etc., the task/job is a resource, which can be whatever kind of data structure you like, with all kinds of different statuses/properties, etc., and so the server can update the resource with all the relevant data as things go along, and then client reads it and the server can give it whatever data structure supports your needs.

However that only allows for the latest version of this information rather than being able to plot metrics over time as is possible if the resource is associated with a progress state.

So then you could solve this by subscribing to get "resources/updated" notifications for the "job"/"task" resource, and then when anything is updated (new checkpoint, whatever), the server tells the client "there's a change" and the client reads the resource again, where the resource is the job itself. No polling involved either.

| None = None,
) -> None:
"""Report progress for the current operation.

Expand All @@ -979,6 +984,7 @@ async def report_progress(
progress=progress,
total=total,
message=message,
resource_uri=resource_uri,
)

async def read_resource(self, uri: str | AnyUrl) -> Iterable[ReadResourceContents]:
Expand Down
7 changes: 5 additions & 2 deletions src/mcp/server/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ async def handle_list_prompts(ctx: RequestContext) -> list[types.Prompt]:
"""

from enum import Enum
from typing import Any, TypeVar
from typing import Annotated, Any, TypeVar

import anyio
import anyio.lowlevel
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from pydantic import AnyUrl
from pydantic.networks import AnyUrl, UrlConstraints

import mcp.types as types
from mcp.server.models import InitializationOptions
Expand Down Expand Up @@ -288,6 +288,8 @@ async def send_progress_notification(
total: float | None = None,
message: str | None = None,
related_request_id: str | None = None,
resource_uri: Annotated[AnyUrl, UrlConstraints(host_required=False)]
| None = None,
) -> None:
"""Send a progress notification."""
await self.send_notification(
Expand All @@ -299,6 +301,7 @@ async def send_progress_notification(
progress=progress,
total=total,
message=message,
resource_uri=resource_uri,
),
)
),
Expand Down
43 changes: 39 additions & 4 deletions src/mcp/shared/session.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import inspect
import logging
from collections.abc import Callable
from contextlib import AsyncExitStack
from datetime import timedelta
from types import TracebackType
from typing import Any, Generic, Protocol, TypeVar
from typing import Annotated, Any, Generic, Protocol, TypeVar, runtime_checkable

import anyio
import httpx
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from pydantic import BaseModel
from pydantic.networks import AnyUrl, UrlConstraints
from typing_extensions import Self

from mcp.shared.exceptions import McpError
Expand Down Expand Up @@ -43,6 +45,7 @@
RequestId = str | int


@runtime_checkable
class ProgressFnT(Protocol):
"""Protocol for progress notification callbacks."""

Expand All @@ -51,6 +54,20 @@ async def __call__(
) -> None: ...


@runtime_checkable
class ResourceProgressFnT(Protocol):
"""Protocol for progress notification callbacks with resources."""

async def __call__(
self,
progress: float,
total: float | None,
message: str | None,
resource_uri: Annotated[AnyUrl, UrlConstraints(host_required=False)]
| None = None,
) -> None: ...


class RequestResponder(Generic[ReceiveRequestT, SendResultT]):
"""Handles responding to MCP requests and manages request lifecycle.

Expand Down Expand Up @@ -179,6 +196,7 @@ class BaseSession(
_request_id: int
_in_flight: dict[RequestId, RequestResponder[ReceiveRequestT, SendResultT]]
_progress_callbacks: dict[RequestId, ProgressFnT]
_resource_callbacks: dict[RequestId, ResourceProgressFnT]

def __init__(
self,
Expand All @@ -198,6 +216,7 @@ def __init__(
self._session_read_timeout_seconds = read_timeout_seconds
self._in_flight = {}
self._progress_callbacks = {}
self._resource_callbacks = {}
self._exit_stack = AsyncExitStack()

async def __aenter__(self) -> Self:
Expand Down Expand Up @@ -225,7 +244,7 @@ async def send_request(
result_type: type[ReceiveResultT],
request_read_timeout_seconds: timedelta | None = None,
metadata: MessageMetadata = None,
progress_callback: ProgressFnT | None = None,
progress_callback: ProgressFnT | ResourceProgressFnT | None = None,
) -> ReceiveResultT:
"""
Sends a request and wait for a response. Raises an McpError if the
Expand All @@ -252,8 +271,15 @@ async def send_request(
if "_meta" not in request_data["params"]:
request_data["params"]["_meta"] = {}
request_data["params"]["_meta"]["progressToken"] = request_id
# Store the callback for this request
self._progress_callbacks[request_id] = progress_callback
# note this is required to ensure backwards compatibility
# for previous clients
signature = inspect.signature(progress_callback.__call__)
if len(signature.parameters) == 3:
# Store the callback for this request
self._resource_callbacks[request_id] = progress_callback # type: ignore
else:
# Store the callback for this request
self._progress_callbacks[request_id] = progress_callback

try:
jsonrpc_request = JSONRPCRequest(
Expand Down Expand Up @@ -397,6 +423,15 @@ async def _receive_loop(self) -> None:
notification.root.params.total,
notification.root.params.message,
)
elif progress_token in self._resource_callbacks:
callback = self._resource_callbacks[progress_token]
await callback(
notification.root.params.progress,
notification.root.params.total,
notification.root.params.message,
notification.root.params.resource_uri,
)

await self._received_notification(notification)
await self._handle_incoming(notification)
except Exception as e:
Expand Down
11 changes: 9 additions & 2 deletions src/mcp/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,19 @@ class ProgressNotificationParams(NotificationParams):
total is unknown.
"""
total: float | None = None
"""Total number of items to process (or total progress required), if known."""
message: str | None = None
"""
Message related to progress. This should provide relevant human readable
progress information.
"""
message: str | None = None
"""Total number of items to process (or total progress required), if known."""
resource_uri: Annotated[AnyUrl, UrlConstraints(host_required=False)] | None = None
"""
An optional reference to an ephemeral resource associated with this
progress, servers may delete these at their descretion, but are encouraged
to make them available for a reasonable time period to allow clients to
retrieve and cache the resources locally
"""
model_config = ConfigDict(extra="allow")


Expand Down
6 changes: 3 additions & 3 deletions tests/issues/test_176_progress_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ async def test_progress_token_zero_first_call():
mock_session.send_progress_notification.call_count == 3
), "All progress notifications should be sent"
mock_session.send_progress_notification.assert_any_call(
progress_token=0, progress=0.0, total=10.0, message=None
progress_token=0, progress=0.0, total=10.0, message=None, resource_uri=None
)
mock_session.send_progress_notification.assert_any_call(
progress_token=0, progress=5.0, total=10.0, message=None
progress_token=0, progress=5.0, total=10.0, message=None, resource_uri=None
)
mock_session.send_progress_notification.assert_any_call(
progress_token=0, progress=10.0, total=10.0, message=None
progress_token=0, progress=10.0, total=10.0, message=None, resource_uri=None
)
Loading