Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
155 commits
Select commit Hold shift + click to select a range
386fa99
migrate mcp transport to http streamable
HzaRashid Nov 26, 2025
945d3ff
clean up v2/mcp
HzaRashid Nov 27, 2025
9afc891
ruff (mcp)
HzaRashid Nov 27, 2025
98235f4
add back ensure_session_mgr_running handler
HzaRashid Nov 27, 2025
5d4a5f3
refactor mcp server and set default mcp settings on project creation
HzaRashid Nov 28, 2025
07909c4
fix mcp auto install tests
HzaRashid Nov 28, 2025
9295f06
[autofix.ci] apply automated fixes
autofix-ci[bot] Nov 28, 2025
9583ee7
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Nov 28, 2025
c3e0f0a
migrate mcp transport to http streamable
HzaRashid Nov 26, 2025
6a7c5e9
clean up v2/mcp
HzaRashid Nov 27, 2025
c73c033
ruff (mcp)
HzaRashid Nov 27, 2025
d25e2fb
add back ensure_session_mgr_running handler
HzaRashid Nov 27, 2025
d9c5a2b
refactor mcp server and set default mcp settings on project creation
HzaRashid Nov 28, 2025
dcc4a23
fix mcp auto install tests
HzaRashid Nov 28, 2025
1a515c2
misc
HzaRashid Nov 28, 2025
592923c
refactor mcp and mcp_projects
HzaRashid Nov 28, 2025
a6ac93a
fix mcp ctx mgmt in main.py
HzaRashid Nov 29, 2025
023b007
use global exit stack mcp project servers
HzaRashid Nov 29, 2025
e44bf85
use global exit stack mcp project servers
HzaRashid Nov 29, 2025
953e260
return noop response in streamable dispatcher
HzaRashid Nov 30, 2025
dca3f83
docs
HzaRashid Nov 30, 2025
fb77727
docs (main.py)
HzaRashid Nov 30, 2025
4228eb1
use classes for http mgrs
HzaRashid Dec 1, 2025
cf8cd4c
use asyncio tg for mcp projects session mgrs mgmt
HzaRashid Dec 1, 2025
e3b717f
docs
HzaRashid Dec 1, 2025
edbdf19
clean up project tasks class
HzaRashid Dec 1, 2025
bc5df6a
recover original behaviour for /{project_id} endpoint
HzaRashid Dec 1, 2025
1554c3a
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 1, 2025
6f873b6
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Dec 1, 2025
49db8c5
[autofix.ci] apply automated fixes (attempt 3/3)
autofix-ci[bot] Dec 1, 2025
e44d9b9
Merge branch 'feat/http-stream-mcp' of https://github.com/langflow-ai…
HimavarshaVS Dec 1, 2025
e69f395
fix for backwards compatilibility for sse requests
HimavarshaVS Dec 1, 2025
dbe619b
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 1, 2025
8b599f2
migrate mcp transport to http streamable
HzaRashid Nov 26, 2025
625499d
clean up v2/mcp
HzaRashid Nov 27, 2025
3edd952
ruff (mcp)
HzaRashid Nov 27, 2025
716207f
add back ensure_session_mgr_running handler
HzaRashid Nov 27, 2025
1b7146b
refactor mcp server and set default mcp settings on project creation
HzaRashid Nov 28, 2025
88d1f2a
fix mcp auto install tests
HzaRashid Nov 28, 2025
cb3d1a6
misc
HzaRashid Nov 28, 2025
78faaf7
refactor mcp and mcp_projects
HzaRashid Nov 28, 2025
6baa7dd
fix mcp ctx mgmt in main.py
HzaRashid Nov 29, 2025
fc2e11b
use global exit stack mcp project servers
HzaRashid Nov 29, 2025
d93ccf7
use global exit stack mcp project servers
HzaRashid Nov 29, 2025
c261a29
return noop response in streamable dispatcher
HzaRashid Nov 30, 2025
66e449e
docs
HzaRashid Nov 30, 2025
a9ecfc3
docs (main.py)
HzaRashid Nov 30, 2025
6624dc0
use classes for http mgrs
HzaRashid Dec 1, 2025
c1a7618
use asyncio tg for mcp projects session mgrs mgmt
HzaRashid Dec 1, 2025
e96266a
docs
HzaRashid Dec 1, 2025
719b7e6
clean up project tasks class
HzaRashid Dec 1, 2025
84e48e9
recover original behaviour for /{project_id} endpoint
HzaRashid Dec 1, 2025
565ae37
implement asyncio.TaskGroup backport for python 3.10
HzaRashid Dec 1, 2025
c8a9352
remove unused const
HzaRashid Dec 1, 2025
9cfeca4
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 1, 2025
981f85e
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Dec 1, 2025
6fb668f
[autofix.ci] apply automated fixes (attempt 3/3)
autofix-ci[bot] Dec 1, 2025
e310825
Merge branch 'feat/http-stream-mcp' of https://github.com/langflow-ai…
HimavarshaVS Dec 2, 2025
2941572
replace asyncio with anyio
HzaRashid Dec 2, 2025
f7e574f
touch ups
HzaRashid Dec 2, 2025
7402173
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 2, 2025
f892748
Merge branch 'main' into feat/http-stream-mcp
HimavarshaVS Dec 3, 2025
c0ebc4f
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 3, 2025
d0f0489
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Dec 3, 2025
f2146eb
Merge branch 'feat/http-stream-mcp' of https://github.com/langflow-ai…
HimavarshaVS Dec 3, 2025
4c2d301
Merge branch 'feat/http-stream-mcp' of https://github.com/langflow-ai…
HimavarshaVS Dec 3, 2025
bbc3584
handle shutdown properly
HimavarshaVS Dec 3, 2025
71513e4
Merge branch 'main' into feat/http-stream-mcp
HimavarshaVS Dec 3, 2025
40df8e2
Update test_mcp_projects.py
erichare Dec 3, 2025
d606968
Fix mypy errors
erichare Dec 3, 2025
f06a9ed
rollback start manager changes
HimavarshaVS Dec 3, 2025
04914c5
Merge branch 'feat/http-stream-mcp' of https://github.com/langflow-ai…
HimavarshaVS Dec 3, 2025
75ade96
use global exit stack mcp project servers
HzaRashid Nov 29, 2025
ae02720
return noop response in streamable dispatcher
HzaRashid Nov 30, 2025
b91ace6
docs
HzaRashid Nov 30, 2025
06d40b5
docs (main.py)
HzaRashid Nov 30, 2025
4a4920e
use classes for http mgrs
HzaRashid Dec 1, 2025
379a487
use asyncio tg for mcp projects session mgrs mgmt
HzaRashid Dec 1, 2025
ca4fe98
docs
HzaRashid Dec 1, 2025
c05de69
clean up project tasks class
HzaRashid Dec 1, 2025
d1c39e4
recover original behaviour for /{project_id} endpoint
HzaRashid Dec 1, 2025
f5441c0
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 1, 2025
1a1b2ce
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Dec 1, 2025
60da51d
[autofix.ci] apply automated fixes (attempt 3/3)
autofix-ci[bot] Dec 1, 2025
64b35d8
fix for backwards compatilibility for sse requests
HimavarshaVS Dec 1, 2025
8117663
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 1, 2025
27f8ab5
migrate mcp transport to http streamable
HzaRashid Nov 26, 2025
2cc0fec
clean up v2/mcp
HzaRashid Nov 27, 2025
23c6046
ruff (mcp)
HzaRashid Nov 27, 2025
b37b6fe
add back ensure_session_mgr_running handler
HzaRashid Nov 27, 2025
e49e5fe
refactor mcp server and set default mcp settings on project creation
HzaRashid Nov 28, 2025
144bd01
migrate mcp transport to http streamable
HzaRashid Nov 26, 2025
7a83a76
clean up v2/mcp
HzaRashid Nov 27, 2025
3a493b1
ruff (mcp)
HzaRashid Nov 27, 2025
c2ee46b
add back ensure_session_mgr_running handler
HzaRashid Nov 27, 2025
04bd598
refactor mcp server and set default mcp settings on project creation
HzaRashid Nov 28, 2025
3966682
fix mcp auto install tests
HzaRashid Nov 28, 2025
3b59b7e
[autofix.ci] apply automated fixes
autofix-ci[bot] Nov 28, 2025
dfd52ff
refactor mcp and mcp_projects
HzaRashid Nov 28, 2025
bbc5daf
fix mcp ctx mgmt in main.py
HzaRashid Nov 29, 2025
77d0804
use global exit stack mcp project servers
HzaRashid Nov 29, 2025
2a7edeb
fix mcp auto install tests
HzaRashid Nov 28, 2025
d47c259
misc
HzaRashid Nov 28, 2025
873916f
refactor mcp and mcp_projects
HzaRashid Nov 28, 2025
e4059fc
fix mcp ctx mgmt in main.py
HzaRashid Nov 29, 2025
07a8666
use global exit stack mcp project servers
HzaRashid Nov 29, 2025
6a9a3b2
use global exit stack mcp project servers
HzaRashid Nov 29, 2025
aca1ece
return noop response in streamable dispatcher
HzaRashid Nov 30, 2025
166d974
docs
HzaRashid Nov 30, 2025
c76cfb1
docs (main.py)
HzaRashid Nov 30, 2025
0f0257d
use classes for http mgrs
HzaRashid Dec 1, 2025
3d7ff92
use asyncio tg for mcp projects session mgrs mgmt
HzaRashid Dec 1, 2025
323258f
docs
HzaRashid Dec 1, 2025
6add199
clean up project tasks class
HzaRashid Dec 1, 2025
58d621f
recover original behaviour for /{project_id} endpoint
HzaRashid Dec 1, 2025
c52fbd5
implement asyncio.TaskGroup backport for python 3.10
HzaRashid Dec 1, 2025
c5c8fcd
remove unused const
HzaRashid Dec 1, 2025
13deb16
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 1, 2025
2321075
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Dec 1, 2025
eefbc02
[autofix.ci] apply automated fixes (attempt 3/3)
autofix-ci[bot] Dec 1, 2025
78885b6
replace asyncio with anyio
HzaRashid Dec 2, 2025
52142dd
touch ups
HzaRashid Dec 2, 2025
81c847d
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 3, 2025
924f59a
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Dec 3, 2025
a0663e5
handle shutdown properly
HimavarshaVS Dec 3, 2025
040ca64
rollback start manager changes
HimavarshaVS Dec 3, 2025
c206c0a
Update test_mcp_projects.py
erichare Dec 3, 2025
8935e53
Fix mypy errors
erichare Dec 3, 2025
1694e3b
graceful termination of mcp projects /streamable endpoint
HzaRashid Dec 3, 2025
5654ceb
delete removed file
HzaRashid Dec 3, 2025
9f7014d
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 3, 2025
6d75e3b
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Nov 28, 2025
fad1b43
migrate mcp transport to http streamable
HzaRashid Nov 26, 2025
ca2941f
clean up v2/mcp
HzaRashid Nov 27, 2025
44ed389
ruff (mcp)
HzaRashid Nov 27, 2025
8b191e0
add back ensure_session_mgr_running handler
HzaRashid Nov 27, 2025
245e3c9
refactor mcp server and set default mcp settings on project creation
HzaRashid Nov 28, 2025
bb5faba
misc
HzaRashid Nov 28, 2025
02d5319
Merge branch 'main' into feat/http-stream-mcp
HimavarshaVS Dec 4, 2025
44b185f
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 4, 2025
2aa111e
Merge branch 'feat/http-stream-mcp' of https://github.com/langflow-ai…
HimavarshaVS Dec 4, 2025
3b4c4bd
build component index
HimavarshaVS Dec 4, 2025
173a2d1
Merge remote-tracking branch 'origin' into feat/http-stream-mcp
HimavarshaVS Dec 4, 2025
3d3def6
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 4, 2025
aea5f78
[autofix.ci] apply automated fixes (attempt 3/3)
autofix-ci[bot] Dec 1, 2025
510cc65
add health check endpoint for streamable
HzaRashid Dec 4, 2025
99705e0
sync starter projects with main
HzaRashid Dec 4, 2025
1c2f734
remove unused delete method from sse transport
HzaRashid Dec 4, 2025
eea4849
Merge remote-tracking branch 'upstream/main' into feat/http-stream-mcp
HzaRashid Dec 5, 2025
269dd1e
get user-level files in handle_list_resources
HzaRashid Dec 7, 2025
0c709b3
Merge remote-tracking branch 'upstream/main' into feat/http-stream-mcp
HzaRashid Dec 8, 2025
b943b6e
add backwards compat tests for sse transport
HzaRashid Dec 8, 2025
d0925df
merge with main
HzaRashid Dec 8, 2025
d458fc2
fix frontend tests
HzaRashid Dec 8, 2025
b973b41
fix lfx test
HzaRashid Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
use global exit stack mcp project servers
  • Loading branch information
HzaRashid committed Dec 3, 2025
commit 07a8666c50224e84f61994842e0184a31be71f11
122 changes: 20 additions & 102 deletions src/backend/base/langflow/api/v1/mcp_projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import platform
from asyncio.subprocess import create_subprocess_exec
from collections.abc import Sequence
from contextlib import AsyncExitStack
from contextlib import AsyncExitStack, asynccontextmanager
from contextvars import ContextVar
from datetime import datetime, timezone
from ipaddress import ip_address
Expand Down Expand Up @@ -1202,7 +1202,6 @@ def __init__(self, project_id: UUID):
# TODO: implement an environment variable to enable/disable stateless mode
self.session_manager = StreamableHTTPSessionManager(self.server, stateless=True)
self._manager_lock = asyncio.Lock()
self._manager_stack: AsyncExitStack | None = None
self._manager_started = False

# Register handlers that filter by project
Expand Down Expand Up @@ -1247,105 +1246,34 @@ async def ensure_session_manager_running(self) -> None:
if self._manager_started:
return

self._manager_stack = AsyncExitStack()
await self._manager_stack.enter_async_context(self.session_manager.run())
stack = _get_project_session_manager_stack()
await stack.enter_async_context(self.session_manager.run())
self._manager_started = True
await logger.adebug("Streamable HTTP manager started for project %s", self.project_id)

async def stop_session_manager(self) -> None:
"""Stop the project's Streamable HTTP manager if it is running."""
async with self._manager_lock:
if not self._manager_started or self._manager_stack is None:
return

await self._manager_stack.aclose()
self._manager_stack = None
self._manager_started = False
await logger.adebug("Streamable HTTP manager stopped for project %s", self.project_id)


# Cache of project MCP servers
project_mcp_servers: dict[str, ProjectMCPServer] = {}
project_mcp_servers = {}
_project_session_manager_stack: AsyncExitStack | None = None


# Due to the lazy initialization of the project MCP servers
# We implement a global task group (asyncio) manager for
# the project MCP servers' streamable-http session managers.
# This ensures that each session manager's .run() context manager is
# entered and exited from the same coroutine, otherwise asyncio will raise a RuntimeError.
class ProjectTaskGroup:
"""Manage the dynamically created MCP project servers' streamable-http session managers.

Utilizes a asyncio.TaskGroup to manage
the lifecycle of the streamable-http session managers.
This ensures that each session manager's .run()
context manager is entered and exited from the same coroutine,
otherwise asyncio will raise a RuntimeError.

"""

def __init__(self):
self._task_group: asyncio.TaskGroup | None = None
self._started = False
self._start_stop_lock = asyncio.Lock()

async def start(self) -> None:
"""Create the project task group."""
async with self._start_stop_lock:
if self._started:
return
self._task_group = asyncio.TaskGroup()
await self._task_group.__aenter__()
self._started = True

async def stop(self) -> None:
"""Close the shared project task group and signal all servers to shut down."""
async with self._start_stop_lock:
if not self._started:
return
# TODO: Need a mechanism to prevent new servers from being created while/after stopping
try:
for server in list(project_mcp_servers.values()):
try:
server.request_shutdown()
except Exception as e: # noqa: BLE001
await logger.aerror(f"Error shutting down project MCP server {server.project_id}: {e}")
if self._task_group is not None:
await self._task_group.__aexit__(None, None, None)
finally:
self._task_group = None
self._started = False
project_mcp_servers.clear()
await logger.adebug("Project MCP session task group stopped")

def create_task(self, coro: Awaitable[Any]) -> asyncio.Task[Any]:
"""Create a task bound to the shared task group."""
if not self._started:
error_message = "Project MCP session task group not initialized. Start the session stack before use."
raise RuntimeError(error_message)
return self._task_group.create_task(coro)

def get_task_group(self) -> asyncio.TaskGroup:
"""Get the shared project task group."""
return self._task_group


_project_task_group = ProjectTaskGroup()


async def start_project_task_group() -> None:
"""Initialize the shared project task group."""
await _project_task_group.start()


def get_project_task_group_tg() -> asyncio.TaskGroup:
"""Get the shared project task group."""
return _project_task_group.get_task_group()
@asynccontextmanager
async def project_session_manager_lifespan():
"""Provide a shared AsyncExitStack so each project manager can rely on async with cleanup."""
global _project_session_manager_stack # noqa: PLW0603
async with AsyncExitStack() as stack:
_project_session_manager_stack = stack
try:
yield
finally:
_project_session_manager_stack = None


async def stop_project_task_group() -> None:
"""Close the shared project task group."""
await _project_task_group.stop()
def _get_project_session_manager_stack() -> AsyncExitStack:
if _project_session_manager_stack is None:
error_message = "Project MCP session stack not initialized. Start the project session manager lifespan first."
raise RuntimeError(error_message)
return _project_session_manager_stack


def get_project_mcp_server(project_id: UUID | None) -> ProjectMCPServer:
Expand All @@ -1360,16 +1288,6 @@ def get_project_mcp_server(project_id: UUID | None) -> ProjectMCPServer:
return project_mcp_servers[project_id_str]


@router.on_event("shutdown")
async def _shutdown_project_session_managers() -> None:
"""Ensure all per-project session managers are cleanly stopped."""
for server in project_mcp_servers.values():
try:
await server.stop_session_manager()
except Exception as exc: # noqa: BLE001
await logger.awarning(f"Error stopping Streamable HTTP manager for project {server.project_id}: {exc!s}")


async def register_project_with_composer(project: Folder):
"""Register a project with MCP Composer by starting a dedicated composer instance."""
try:
Expand Down
19 changes: 15 additions & 4 deletions src/backend/base/langflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint

from langflow.api import health_check_router, log_router, router
from langflow.api.v1.mcp_projects import init_mcp_servers
from langflow.api.v1.mcp_projects import init_mcp_servers, project_session_manager_lifespan
from langflow.initial_setup.setup import (
copy_profile_pictures,
create_or_update_starter_projects,
Expand Down Expand Up @@ -313,12 +313,23 @@ async def delayed_init_mcp_servers():
# Allows the server to start first to avoid race conditions with MCP Server startup
mcp_init_task = asyncio.create_task(delayed_init_mcp_servers())

# Start streamable-http transport session manager for MCP server
from contextlib import AsyncExitStack # noqa: I001
# Create AsyncExitStack for context managers that need
# to be kept alive for the duration of lf main's lifespan.
# Right now, this includes the streamable-http
# session manager lifecycle for the v1/mcp server,
# and the project MCP global exit stack that manages
# each project's Streamable HTTP session manager.
from contextlib import AsyncExitStack

from langflow.api.v1.mcp import init_streamable_http_manager
await logger.adebug("Starting MCP Server Streamable HTTP session manager")

async with AsyncExitStack() as stack:
# Start streamable-http session manager for MCP server
await logger.adebug("Starting MCP Server Streamable HTTP session manager")
await stack.enter_async_context(init_streamable_http_manager().run())
# Start streamable-http lifespan manager and global exit stack for project mcp servers.
await logger.adebug("Starting project MCP Streamable HTTP session manager lifespan")
await stack.enter_async_context(project_session_manager_lifespan())

yield

Expand Down
34 changes: 31 additions & 3 deletions src/backend/tests/unit/api/v1/test_mcp_projects.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import json
from contextlib import asynccontextmanager
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
from uuid import uuid4
Expand All @@ -12,6 +13,7 @@
get_project_mcp_server,
init_mcp_servers,
project_mcp_servers,
project_session_manager_lifespan,
)
from langflow.services.auth.utils import create_user_longterm_token, get_password_hash
from langflow.services.database.models.flow import Flow
Expand Down Expand Up @@ -599,17 +601,43 @@ async def test_project_sse_creation(user_test_project):
await asyncio.sleep(0)


async def test_project_session_manager_lifespan_handles_cleanup(user_test_project, monkeypatch):
"""Session manager contexts should be cleaned up automatically via shared lifespan stack."""
project_mcp_servers.clear()
lifecycle_events: list[str] = []

@asynccontextmanager
async def fake_run():
lifecycle_events.append("enter")
try:
yield
finally:
lifecycle_events.append("exit")

monkeypatch.setattr(
"langflow.api.v1.mcp_projects.StreamableHTTPSessionManager.run",
lambda self: fake_run(), # noqa: ARG005
)

async with project_session_manager_lifespan():
server = get_project_mcp_server(user_test_project.id)
await server.ensure_session_manager_running()
assert lifecycle_events == ["enter"]

assert lifecycle_events == ["enter", "exit"]


def _prepare_install_test_env(monkeypatch, tmp_path, filename="cursor.json"):
config_path = tmp_path / filename
config_path.parent.mkdir(parents=True, exist_ok=True)

monkeypatch.setattr("langflow.api.v1.mcp_projects.get_client_ip", lambda request: "127.0.0.1")
async def fake_get_config_path(client_name):
monkeypatch.setattr("langflow.api.v1.mcp_projects.get_client_ip", lambda request: "127.0.0.1") # noqa: ARG005
async def fake_get_config_path(client_name): # noqa: ARG001
return config_path

monkeypatch.setattr("langflow.api.v1.mcp_projects.get_config_path", fake_get_config_path)
monkeypatch.setattr("langflow.api.v1.mcp_projects.platform.system", lambda: "Linux")
monkeypatch.setattr("langflow.api.v1.mcp_projects.should_use_mcp_composer", lambda project: False)
monkeypatch.setattr("langflow.api.v1.mcp_projects.should_use_mcp_composer", lambda project: False) # noqa: ARG005

async def fake_streamable(project_id):
return f"https://langflow.local/api/v1/mcp/project/{project_id}/streamable"
Expand Down