Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/backend/base/langflow/api/utils/mcp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
"""MCP utilities for Langflow."""

from langflow.api.utils.mcp.config_utils import auto_configure_starter_projects_mcp, get_project_sse_url, get_url_by_os
from langflow.api.utils.mcp.config_utils import (
auto_configure_starter_projects_mcp,
get_composer_streamable_http_url,
get_project_sse_url,
get_project_streamable_http_url,
get_url_by_os,
)

__all__ = [
"auto_configure_starter_projects_mcp",
"get_composer_streamable_http_url",
"get_project_sse_url",
"get_project_streamable_http_url",
"get_url_by_os",
]
73 changes: 63 additions & 10 deletions src/backend/base/langflow/api/utils/mcp/config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from sqlmodel import select

from langflow.api.v2.mcp import get_server_list, update_server
from langflow.services.auth.mcp_encryption import encrypt_auth_settings
from langflow.services.auth.mcp_encryption import decrypt_auth_settings, encrypt_auth_settings
from langflow.services.database.models import Flow, Folder
from langflow.services.database.models.api_key.crud import create_api_key
from langflow.services.database.models.api_key.model import ApiKeyCreate
Expand Down Expand Up @@ -204,8 +204,8 @@ async def get_url_by_os(host: str, port: int, url: str) -> str:
return url


async def get_project_sse_url(project_id: UUID) -> str:
"""Generate the SSE URL for a project, including WSL handling."""
async def _get_project_base_url_components() -> tuple[str, int]:
"""Return normalized host and port for building MCP URLs."""
# Get settings service to build the SSE URL
settings_service = get_settings_service()
server_host = getattr(settings_service.settings, "host", "localhost")
Expand All @@ -219,14 +219,52 @@ async def get_project_sse_url(project_id: UUID) -> str:
# For MCP clients, always use localhost instead of 0.0.0.0
# 0.0.0.0 is a bind address, not a connect address
host = "localhost" if server_host == ALL_INTERFACES_HOST else server_host
port = server_port
return host, server_port


async def get_project_streamable_http_url(project_id: UUID) -> str:
"""Generate the Streamable HTTP endpoint for a project (no /sse suffix)."""
host, port = await _get_project_base_url_components()
base_url = f"http://{host}:{port}".rstrip("/")
project_sse_url = f"{base_url}/api/v1/mcp/project/{project_id}/sse"
project_url = f"{base_url}/api/v1/mcp/project/{project_id}/streamable"
return await get_url_by_os(host, port, project_url)


async def get_project_sse_url(project_id: UUID) -> str:
"""Generate the legacy SSE URL for a project, including WSL handling."""
host, port = await _get_project_base_url_components()
base_url = f"http://{host}:{port}".rstrip("/")
project_sse_url = f"{base_url}/api/v1/mcp/project/{project_id}/sse"
return await get_url_by_os(host, port, project_sse_url)


async def _get_mcp_composer_auth_config(project: Folder) -> dict:
"""Decrypt and return MCP Composer auth configuration for a project."""
auth_config = None
if project.auth_settings:
decrypted_settings = decrypt_auth_settings(project.auth_settings)
if decrypted_settings:
auth_config = decrypted_settings

if not auth_config:
error_message = "Auth config is missing. Please check your settings and try again."
raise ValueError(error_message)

return auth_config


async def get_composer_streamable_http_url(project: Folder) -> str:
"""Generate Streamable HTTP URL for the MCP Composer instance."""
auth_config = await _get_mcp_composer_auth_config(project)
composer_host = auth_config.get("oauth_host")
composer_port = auth_config.get("oauth_port")
if not composer_host or not composer_port:
error_msg = "OAuth host and port are required to get the MCP Composer URL"
raise ValueError(error_msg)
composer_url = f"http://{composer_host}:{composer_port}"
return await get_url_by_os(composer_host, int(composer_port), composer_url) # type: ignore[arg-type]


async def auto_configure_starter_projects_mcp(session):
"""Auto-configure MCP servers for starter projects for all users at startup."""
# Check if auto-configure is enabled
Expand Down Expand Up @@ -333,7 +371,18 @@ async def auto_configure_starter_projects_mcp(session):
default_auth = {"auth_type": "none"}
await logger.adebug(f"Settings service auth settings: {settings_service.auth_settings}")
await logger.adebug(f"User starter folder auth settings: {user_starter_folder.auth_settings}")
if not settings_service.auth_settings.AUTO_LOGIN and not user_starter_folder.auth_settings:
if (
not user_starter_folder.auth_settings
and settings_service.auth_settings.AUTO_LOGIN
and not settings_service.auth_settings.SUPERUSER
):
default_auth = {"auth_type": "apikey"}
user_starter_folder.auth_settings = encrypt_auth_settings(default_auth)
await logger.adebug(
"AUTO_LOGIN enabled without SUPERUSER; forcing API key auth for starter folder %s",
user.username,
)
elif not settings_service.auth_settings.AUTO_LOGIN and not user_starter_folder.auth_settings:
default_auth = {"auth_type": "apikey"}
user_starter_folder.auth_settings = encrypt_auth_settings(default_auth)
await logger.adebug(f"Set up auth settings for user {user.username}'s starter folder")
Expand All @@ -344,18 +393,20 @@ async def auto_configure_starter_projects_mcp(session):
api_key_name = f"MCP Project {DEFAULT_FOLDER_NAME} - {user.username}"
unmasked_api_key = await create_api_key(session, ApiKeyCreate(name=api_key_name), user.id)

# Build SSE URL for THIS USER'S starter folder (unique ID per user)
sse_url = await get_project_sse_url(user_starter_folder.id)
# Build connection URLs for THIS USER'S starter folder (unique ID per user)
streamable_http_url = await get_project_streamable_http_url(user_starter_folder.id)

# Prepare server config (similar to new project creation)
if default_auth.get("auth_type", "none") == "apikey":
command = "uvx"
args = [
"mcp-proxy",
"--transport",
"streamablehttp",
"--headers",
"x-api-key",
unmasked_api_key.api_key,
sse_url,
streamable_http_url,
]
elif default_auth.get("auth_type", "none") == "oauth":
msg = "OAuth authentication is not yet implemented for MCP server creation during project creation."
Expand All @@ -366,7 +417,9 @@ async def auto_configure_starter_projects_mcp(session):
command = "uvx"
args = [
"mcp-proxy",
sse_url,
"--transport",
"streamablehttp",
streamable_http_url,
]
server_config = {"command": command, "args": args}

Expand Down
48 changes: 39 additions & 9 deletions src/backend/base/langflow/api/v1/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from mcp import types
from mcp.server import NotificationOptions, Server
from mcp.server.sse import SseServerTransport
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager

from langflow.api.utils import CurrentActiveMCPUser
from langflow.api.v1.mcp_utils import (
Expand All @@ -18,21 +19,12 @@
handle_mcp_errors,
handle_read_resource,
)
from langflow.services.deps import get_settings_service

router = APIRouter(prefix="/mcp", tags=["mcp"])

server = Server("langflow-mcp-server")


# Define constants
MAX_RETRIES = 2


def get_enable_progress_notifications() -> bool:
return get_settings_service().settings.mcp_server_enable_progress_notifications


@server.list_prompts()
async def handle_list_prompts():
return []
Expand Down Expand Up @@ -64,6 +56,8 @@ async def handle_global_call_tool(name: str, arguments: dict) -> list[types.Text


sse = SseServerTransport("/api/v1/mcp/")
# TODO: create environment variable for stateless flag
streamable_http_manager = StreamableHTTPSessionManager(server, stateless=True)


def find_validation_error(exc):
Expand Down Expand Up @@ -135,3 +129,39 @@ async def handle_messages(request: Request):
except Exception as e:
await logger.aerror(f"Internal server error: {e}")
raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e


async def _dispatch_streamable_http(
request: Request,
current_user: CurrentActiveMCPUser,
) -> Response:
"""Common handler for Streamable HTTP requests with user context propagation."""
await logger.adebug(
"Handling %s %s via Streamable HTTP for user %s",
request.method,
request.url.path,
current_user.id,
)

context_token = current_user_ctx.set(current_user)
try:
await streamable_http_manager.handle_request(request.scope, request.receive, request._send) # noqa: SLF001
except HTTPException:
raise
except Exception as exc:
await logger.aexception(f"Error handling Streamable HTTP request: {exc!s}")
raise HTTPException(status_code=500, detail="Internal server error in Streamable HTTP transport") from exc
finally:
current_user_ctx.reset(context_token)

return Response()


streamable_http_methods = ["GET", "POST", "DELETE"]


@router.api_route("/streamable", methods=streamable_http_methods)
@router.api_route("/streamable/", methods=streamable_http_methods)
async def handle_streamable_http(request: Request, current_user: CurrentActiveMCPUser):
"""Streamable HTTP endpoint for MCP clients that support the new transport."""
return await _dispatch_streamable_http(request, current_user)
Loading
Loading