Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jerome/fix/188 #206

Merged
merged 5 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added .git-blame-ignore-revs
Empty file.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.DS_Store
scratch/
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a comfort thing - i use folders like this to hack on random scripts before writing proper tests


# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
69 changes: 38 additions & 31 deletions src/mcp/server/lowlevel/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ async def main():
import logging
import warnings
from collections.abc import Awaitable, Callable
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from contextlib import AbstractAsyncContextManager, AsyncExitStack, asynccontextmanager
from typing import Any, AsyncIterator, Generic, Sequence, TypeVar

import anyio
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from pydantic import AnyUrl

Expand Down Expand Up @@ -469,41 +470,47 @@ async def run(
# in-process servers.
raise_exceptions: bool = False,
):
with warnings.catch_warnings(record=True) as w:
from contextlib import AsyncExitStack

async with AsyncExitStack() as stack:
lifespan_context = await stack.enter_async_context(self.lifespan(self))
session = await stack.enter_async_context(
ServerSession(read_stream, write_stream, initialization_options)
)
async with AsyncExitStack() as stack:
lifespan_context = await stack.enter_async_context(self.lifespan(self))
session = await stack.enter_async_context(
ServerSession(read_stream, write_stream, initialization_options)
)

async with anyio.create_task_group() as tg:
async for message in session.incoming_messages:
logger.debug(f"Received message: {message}")

match message:
case (
RequestResponder(
request=types.ClientRequest(root=req)
) as responder
):
with responder:
await self._handle_request(
message,
req,
session,
lifespan_context,
raise_exceptions,
)
case types.ClientNotification(root=notify):
await self._handle_notification(notify)

for warning in w:
logger.info(
"Warning: %s: %s",
warning.category.__name__,
warning.message,
tg.start_soon(
self._handle_message,
message,
session,
lifespan_context,
raise_exceptions,
)

async def _handle_message(
self,
message: RequestResponder[types.ClientRequest, types.ServerResult]
| types.ClientNotification
| Exception,
session: ServerSession,
lifespan_context: LifespanResultT,
raise_exceptions: bool = False,
):
with warnings.catch_warnings(record=True) as w:
match message:
case (
RequestResponder(request=types.ClientRequest(root=req)) as responder
):
with responder:
await self._handle_request(
message, req, session, lifespan_context, raise_exceptions
)
case types.ClientNotification(root=notify):
await self._handle_notification(notify)

for warning in w:
logger.info(f"Warning: {warning.category.__name__}: {warning.message}")

async def _handle_request(
self,
Expand Down
49 changes: 49 additions & 0 deletions tests/issues/test_188_concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import anyio
from pydantic import AnyUrl

from mcp.server.fastmcp import FastMCP
from mcp.shared.memory import (
create_connected_server_and_client_session as create_session,
)

_sleep_time_seconds = 0.01
_resource_name = "slow://slow_resource"


async def test_messages_are_executed_concurrently():
server = FastMCP("test")

@server.tool("sleep")
async def sleep_tool():
await anyio.sleep(_sleep_time_seconds)
return "done"

@server.resource(_resource_name)
async def slow_resource():
await anyio.sleep(_sleep_time_seconds)
return "slow"

async with create_session(server._mcp_server) as client_session:
start_time = anyio.current_time()
async with anyio.create_task_group() as tg:
for _ in range(10):
tg.start_soon(client_session.call_tool, "sleep")
tg.start_soon(client_session.read_resource, AnyUrl(_resource_name))

end_time = anyio.current_time()

duration = end_time - start_time
assert duration < 3 * _sleep_time_seconds
print(duration)


def main():
anyio.run(test_messages_are_executed_concurrently)


if __name__ == "__main__":
import logging

logging.basicConfig(level=logging.DEBUG)

main()