Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ This document contains critical information about working with this codebase. Fo
- Coverage: test edge cases and errors
- New features require tests
- Bug fixes require regression tests
- Avoid `anyio.sleep()` with a fixed duration to wait for async operations. Instead:
- Use `anyio.Event` — set it in the callback/handler, `await event.wait()` in the test
- For stream messages, use `await stream.receive()` instead of `sleep()` + `receive_nowait()`
- Exception: `sleep()` is appropriate when testing time-based features (e.g., timeouts)
- Wrap indefinite waits (`event.wait()`, `stream.receive()`) in `anyio.fail_after(5)` to prevent hangs

- For commits fixing bugs or adding features based on user reports add:

Expand Down
6 changes: 6 additions & 0 deletions src/mcp/server/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ def __init__(
] = {}
self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {}
self._terminated = False
# Idle timeout cancel scope; managed by the session manager.
self.idle_scope: anyio.CancelScope | None = None

@property
def is_terminated(self) -> bool:
Expand Down Expand Up @@ -773,8 +775,12 @@ async def terminate(self) -> None:
"""Terminate the current session, closing all streams.

Once terminated, all requests with this session ID will receive 404 Not Found.
Calling this method multiple times is safe (idempotent).
"""

if self._terminated: # pragma: no cover
return

self._terminated = True
logger.info(f"Terminating session: {self.mcp_session_id}")

Expand Down
67 changes: 47 additions & 20 deletions src/mcp/server/streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,28 @@ class StreamableHTTPSessionManager:
2. Resumability via an optional event store
3. Connection management and lifecycle
4. Request handling and transport setup
5. Idle session cleanup via optional timeout

Important: Only one StreamableHTTPSessionManager instance should be created
per application. The instance cannot be reused after its run() context has
completed. If you need to restart the manager, create a new instance.

Args:
app: The MCP server instance
event_store: Optional event store for resumability support.
If provided, enables resumable connections where clients
can reconnect and receive missed events.
If None, sessions are still tracked but not resumable.
event_store: Optional event store for resumability support. If provided, enables resumable connections
where clients can reconnect and receive missed events. If None, sessions are still tracked but not
resumable.
json_response: Whether to use JSON responses instead of SSE streams
stateless: If True, creates a completely fresh transport for each request
with no session tracking or state persistence between requests.
stateless: If True, creates a completely fresh transport for each request with no session tracking or
state persistence between requests.
security_settings: Optional transport security settings.
retry_interval: Retry interval in milliseconds to suggest to clients in SSE
retry field. Used for SSE polling behavior.
retry_interval: Retry interval in milliseconds to suggest to clients in SSE retry field. Used for SSE
polling behavior.
session_idle_timeout: Optional idle timeout in seconds for stateful sessions. If set, sessions that
receive no HTTP requests for this duration will be automatically terminated and removed. When
retry_interval is also configured, ensure the idle timeout comfortably exceeds the retry interval to
avoid reaping sessions during normal SSE polling gaps. Default is None (no timeout). A value of 1800
(30 minutes) is recommended for most deployments.
"""

def __init__(
Expand All @@ -65,13 +70,20 @@ def __init__(
stateless: bool = False,
security_settings: TransportSecuritySettings | None = None,
retry_interval: int | None = None,
session_idle_timeout: float | None = None,
):
if session_idle_timeout is not None and session_idle_timeout <= 0:
raise ValueError("session_idle_timeout must be a positive number of seconds")
if stateless and session_idle_timeout is not None:
raise RuntimeError("session_idle_timeout is not supported in stateless mode")

self.app = app
self.event_store = event_store
self.json_response = json_response
self.stateless = stateless
self.security_settings = security_settings
self.retry_interval = retry_interval
self.session_idle_timeout = session_idle_timeout

# Session tracking (only used if not stateless)
self._session_creation_lock = anyio.Lock()
Expand Down Expand Up @@ -219,6 +231,9 @@ async def _handle_stateful_request(
if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: # pragma: no cover
transport = self._server_instances[request_mcp_session_id]
logger.debug("Session already exists, handling request directly")
# Push back idle deadline on activity
if transport.idle_scope is not None and self.session_idle_timeout is not None:
transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout
Comment on lines +234 to +236
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the response takes longer than the idle deadline?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, good question — there's no logic to prevent the cleanup from happening, so the response wouldn't make it back before the transport gets closed. The in-flight request would get a ClosedResourceError and the response would be lost. I'd argue that's something for the server developer to calibrate for their use case if they have very long-running requests. The default stays at no timeout anyway.

await transport.handle_request(scope, receive, send)
return

Expand All @@ -245,19 +260,31 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
read_stream, write_stream = streams
task_status.started()
try:
await self.app.run(
read_stream,
write_stream,
self.app.create_initialization_options(),
stateless=False, # Stateful mode
)
except Exception as e:
logger.error(
f"Session {http_transport.mcp_session_id} crashed: {e}",
exc_info=True,
)
# Use a cancel scope for idle timeout — when the
# deadline passes the scope cancels app.run() and
# execution continues after the ``with`` block.
# Incoming requests push the deadline forward.
idle_scope = anyio.CancelScope()
if self.session_idle_timeout is not None:
idle_scope.deadline = anyio.current_time() + self.session_idle_timeout
http_transport.idle_scope = idle_scope

with idle_scope:
await self.app.run(
read_stream,
write_stream,
self.app.create_initialization_options(),
stateless=False,
)

if idle_scope.cancelled_caught:
assert http_transport.mcp_session_id is not None
logger.info(f"Session {http_transport.mcp_session_id} idle timeout")
self._server_instances.pop(http_transport.mcp_session_id, None)
await http_transport.terminate()
except Exception:
logger.exception(f"Session {http_transport.mcp_session_id} crashed")
finally:
# Only remove from instances if not terminated
if ( # pragma: no branch
http_transport.mcp_session_id
and http_transport.mcp_session_id in self._server_instances
Expand Down
77 changes: 77 additions & 0 deletions tests/server/test_streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,80 @@ async def mock_receive():
assert error_data["id"] == "server-error"
assert error_data["error"]["code"] == INVALID_REQUEST
assert error_data["error"]["message"] == "Session not found"


@pytest.mark.anyio
async def test_idle_session_is_reaped():
"""After idle timeout fires, the session returns 404."""
app = Server("test-idle-reap")
manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.05)

async with manager.run():
sent_messages: list[Message] = []

async def mock_send(message: Message):
sent_messages.append(message)

scope = {
"type": "http",
"method": "POST",
"path": "/mcp",
"headers": [(b"content-type", b"application/json")],
}

async def mock_receive(): # pragma: no cover
return {"type": "http.request", "body": b"", "more_body": False}

await manager.handle_request(scope, mock_receive, mock_send)

session_id = None
for msg in sent_messages: # pragma: no branch
if msg["type"] == "http.response.start": # pragma: no branch
for header_name, header_value in msg.get("headers", []): # pragma: no branch
if header_name.decode().lower() == MCP_SESSION_ID_HEADER.lower():
session_id = header_value.decode()
break
if session_id: # pragma: no branch
break

assert session_id is not None, "Session ID not found in response headers"

# Wait for the 50ms idle timeout to fire and cleanup to complete
await anyio.sleep(0.1)
Copy link
Contributor Author

@felixweinberger felixweinberger Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some back and forth trying out event-based approaches, and given that we're implementing timeout behavior here, this actually seems like the right way to test it — a case where an `anyio.sleep(0.1)` is justified?


# Verify via public API: old session ID now returns 404
response_messages: list[Message] = []

async def capture_send(message: Message):
response_messages.append(message)

scope_with_session = {
"type": "http",
"method": "POST",
"path": "/mcp",
"headers": [
(b"content-type", b"application/json"),
(b"mcp-session-id", session_id.encode()),
],
}

await manager.handle_request(scope_with_session, mock_receive, capture_send)

response_start = next(
(msg for msg in response_messages if msg["type"] == "http.response.start"),
None,
)
assert response_start is not None
assert response_start["status"] == 404


def test_session_idle_timeout_rejects_non_positive():
    """Negative and zero idle timeouts must be rejected by the constructor."""
    # Same order as before: the negative value first, then the zero boundary.
    for bad_timeout in (-1, 0):
        with pytest.raises(ValueError, match="positive number"):
            StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=bad_timeout)


def test_session_idle_timeout_rejects_stateless():
    """An idle timeout combined with stateless mode must be rejected by the constructor."""
    server = Server("test")
    with pytest.raises(RuntimeError, match="not supported in stateless"):
        StreamableHTTPSessionManager(app=server, stateless=True, session_idle_timeout=30)