diff --git a/.env.example b/.env.example index 725a5775a4d9..d24f2faad5bd 100644 --- a/.env.example +++ b/.env.example @@ -104,7 +104,7 @@ LANGFLOW_STORE_ENVIRONMENT_VARIABLES= # Should enable the MCP composer feature in MCP projects # Values: true, false # Default: false -LANGFLOW_FEATURE_MCP_COMPOSER= +LANGFLOW_MCP_COMPOSER_ENABLED= # STORE_URL # Example: LANGFLOW_STORE_URL=https://api.langflow.store diff --git a/.gitignore b/.gitignore index 5896b9850389..5bc813acad58 100644 --- a/.gitignore +++ b/.gitignore @@ -277,4 +277,6 @@ src/frontend/temp .dspy_cache/ *.db -*.mcp.json \ No newline at end of file +*.mcp.json + +member_servers.json \ No newline at end of file diff --git a/src/backend/base/langflow/api/v1/auth_helpers.py b/src/backend/base/langflow/api/v1/auth_helpers.py new file mode 100644 index 000000000000..1c7450f31e68 --- /dev/null +++ b/src/backend/base/langflow/api/v1/auth_helpers.py @@ -0,0 +1,75 @@ +from typing import Any + +from pydantic import SecretStr + +from langflow.services.auth.mcp_encryption import decrypt_auth_settings, encrypt_auth_settings +from langflow.services.database.models.folder.model import Folder + + +def handle_auth_settings_update( + existing_project: Folder, + new_auth_settings: dict | Any | None, +) -> dict[str, bool]: + """Handle auth settings update including encryption/decryption and MCP Composer logic. + + Args: + existing_project: The project being updated (modified in-place) + new_auth_settings: New auth settings (could be dict, Pydantic model, or None) + + Returns: + Dict containing: + - should_start_composer: bool + - should_stop_composer: bool + """ + # Get current auth type before update + current_auth_type = None + decrypted_current = None + if existing_project.auth_settings: + current_auth_type = existing_project.auth_settings.get("auth_type") + # Only decrypt if we need access to sensitive fields (for preserving masked values) + if current_auth_type in ["oauth", "apikey"]: + decrypted_current = decrypt_auth_settings(existing_project.auth_settings) + + if new_auth_settings is None: + # Explicitly set to None - clear auth settings + existing_project.auth_settings = None + # If we were using OAuth, stop the composer + return {"should_start_composer": False, "should_stop_composer": current_auth_type == "oauth"} + + # Handle different input types (dict vs Pydantic model) + if isinstance(new_auth_settings, dict): + auth_dict = new_auth_settings.copy() + else: + # Pydantic model - use python mode to get raw values without SecretStr masking + auth_dict = new_auth_settings.model_dump(mode="python", exclude_none=True) + + # Handle SecretStr fields + secret_fields = ["api_key", "oauth_client_secret"] + for field in secret_fields: + field_val = getattr(new_auth_settings, field, None) + if isinstance(field_val, SecretStr): + auth_dict[field] = field_val.get_secret_value() + + new_auth_type = auth_dict.get("auth_type") + + # Handle masked secret fields from frontend + # If frontend sends back "*******" for a secret field, preserve the existing value + if decrypted_current: + secret_fields = ["oauth_client_secret", "api_key"] + for field in secret_fields: + if field in auth_dict and auth_dict[field] == "*******" and field in decrypted_current: + auth_dict[field] = decrypted_current[field] + + # Encrypt and store the auth settings + existing_project.auth_settings = encrypt_auth_settings(auth_dict) + + # Determine MCP Composer actions + should_start_composer = new_auth_type == "oauth" + should_stop_composer = current_auth_type == "oauth" and new_auth_type != "oauth" + should_handle_composer = current_auth_type == "oauth" or new_auth_type == "oauth" + + return { + "should_start_composer": should_start_composer, + "should_stop_composer": should_stop_composer, + "should_handle_composer": should_handle_composer, + } diff --git a/src/backend/base/langflow/api/v1/mcp_projects.py b/src/backend/base/langflow/api/v1/mcp_projects.py index af716733c96d..97ef6103aa66 100644 --- a/src/backend/base/langflow/api/v1/mcp_projects.py +++ b/src/backend/base/langflow/api/v1/mcp_projects.py @@ -8,19 +8,21 @@ from ipaddress import ip_address from pathlib import Path from subprocess import CalledProcessError -from typing import Annotated, Any +from typing import Annotated, Any, cast from uuid import UUID from anyio import BrokenResourceError -from fastapi import APIRouter, Depends, HTTPException, Request, Response +from fastapi import APIRouter, Depends, HTTPException, Request, Response, status from fastapi.responses import HTMLResponse from mcp import types from mcp.server import NotificationOptions, Server from mcp.server.sse import SseServerTransport from sqlalchemy.orm import selectinload from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession from langflow.api.utils import CurrentActiveMCPUser +from langflow.api.v1.auth_helpers import handle_auth_settings_update from langflow.api.v1.mcp_utils import ( current_user_ctx, handle_call_tool, @@ -38,34 +40,48 @@ ) from langflow.base.mcp.constants import MAX_MCP_SERVER_NAME_LENGTH from langflow.base.mcp.util import sanitize_mcp_name -from langflow.logging import logger +from langflow.logging.logger import logger from langflow.services.auth.mcp_encryption import decrypt_auth_settings, encrypt_auth_settings +from langflow.services.auth.utils import AUTO_LOGIN_WARNING from langflow.services.database.models import Flow, Folder from langflow.services.database.models.api_key.crud import check_key, create_api_key -from langflow.services.database.models.api_key.model import ApiKeyCreate +from langflow.services.database.models.api_key.model import ApiKey, ApiKeyCreate +from langflow.services.database.models.user.crud import get_user_by_username from langflow.services.database.models.user.model import User -from langflow.services.deps import get_settings_service, session_scope -from langflow.services.settings.feature_flags import FEATURE_FLAGS +from langflow.services.deps import get_service, get_settings_service, session_scope +from langflow.services.mcp_composer.service import MCPComposerError, MCPComposerService +from langflow.services.schema import ServiceType router = APIRouter(prefix="/mcp/project", tags=["mcp_projects"]) async def verify_project_auth( + db: AsyncSession, project_id: UUID, - query_param: str | None = None, - header_param: str | None = None, + query_param: str, + header_param: str, ) -> User: - """Custom authentication for MCP project endpoints when API key is required. + """MCP-specific user authentication that allows fallback to username lookup when not using API key auth. - This is only used when MCP composer is enabled and project requires API key auth. + This function provides authentication for MCP endpoints when using MCP Composer and no API key is provided, + or checks if the API key is valid. """ - async with session_scope() as session: - # First, get the project to check its auth settings - project = (await session.exec(select(Folder).where(Folder.id == project_id))).first() + settings_service = get_settings_service() + result: ApiKey | User | None - if not project: - raise HTTPException(status_code=404, detail="Project not found") + project = (await db.exec(select(Folder).where(Folder.id == project_id))).first() + if not project: + raise HTTPException(status_code=404, detail="Project not found") + + auth_settings: AuthSettings | None = None + # Check if this project requires API key only authentication + if project.auth_settings: + auth_settings = AuthSettings(**project.auth_settings) + + if (not auth_settings and not settings_service.auth_settings.AUTO_LOGIN) or ( + auth_settings and auth_settings.auth_type == "apikey" + ): # For MCP composer enabled, only use API key api_key = query_param or header_param if not api_key: @@ -75,20 +91,36 @@ async def verify_project_auth( ) # Validate the API key - user = await check_key(session, api_key) + user = await check_key(db, api_key) if not user: raise HTTPException(status_code=401, detail="Invalid API key") # Verify user has access to the project project_access = ( - await session.exec(select(Folder).where(Folder.id == project_id, Folder.user_id == user.id)) + await db.exec(select(Folder).where(Folder.id == project_id, Folder.user_id == user.id)) ).first() if not project_access: - raise HTTPException(status_code=403, detail="Access denied to this project") + raise HTTPException(status_code=404, detail="Project not found") return user + # Get the first user + if not settings_service.auth_settings.SUPERUSER: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Missing first superuser credentials", + ) + # For MCP endpoints, always fall back to username lookup when no API key is provided + result = await get_user_by_username(db, settings_service.auth_settings.SUPERUSER) + if result: + await logger.awarning(AUTO_LOGIN_WARNING) + return result + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Invalid user", + ) + # Smart authentication dependency that chooses method based on project settings async def verify_project_auth_conditional( @@ -107,16 +139,6 @@ async def verify_project_auth_conditional( if not project: raise HTTPException(status_code=404, detail="Project not found") - # Check if this project requires API key only authentication - if FEATURE_FLAGS.mcp_composer and project.auth_settings: - auth_settings = AuthSettings(**project.auth_settings) - if auth_settings.auth_type == "apikey": - # For MCP composer projects with API key auth, use custom API key validation - api_key_header_value = request.headers.get("x-api-key") - api_key_query_value = request.query_params.get("x-api-key") - return await verify_project_auth(project_id, api_key_query_value, api_key_header_value) - - # For all other cases, use standard MCP authentication (allows JWT + API keys) # Extract token token: str | None = None auth_header = request.headers.get("authorization") @@ -127,6 +149,11 @@ async def verify_project_auth_conditional( api_key_query_value = request.query_params.get("x-api-key") api_key_header_value = request.headers.get("x-api-key") + # Check if this project requires API key only authentication + if get_settings_service().settings.mcp_composer_enabled: + return await verify_project_auth(session, project_id, api_key_query_value, api_key_header_value) + + # For all other cases, use standard MCP authentication (allows JWT + API keys) # Call the MCP auth function directly from langflow.services.auth.utils import get_current_user_mcp @@ -152,8 +179,11 @@ async def verify_project_auth_conditional( project_sse_transports = {} -def get_project_sse(project_id: UUID) -> SseServerTransport: +def get_project_sse(project_id: UUID | None) -> SseServerTransport: """Get or create an SSE transport for a specific project.""" + if not project_id: + raise HTTPException(status_code=400, detail="Project ID is required to start MCP server") + project_id_str = str(project_id) if project_id_str not in project_sse_transports: project_sse_transports[project_id_str] = SseServerTransport(f"/api/v1/mcp/project/{project_id_str}/") @@ -166,7 +196,7 @@ async def list_project_tools( current_user: CurrentActiveMCPUser, *, mcp_enabled: bool = True, -) -> MCPProjectResponse: +) -> MCPProjectResponse | None: """List all tools in a project that are enabled for MCP.""" tools: list[MCPSettings] = [] try: @@ -220,14 +250,19 @@ async def list_project_tools( await logger.awarning(msg) continue - # Get project-level auth settings and decrypt sensitive fields + # Get project-level auth settings but mask sensitive fields for security auth_settings = None if project.auth_settings: - from langflow.api.v1.schemas import AuthSettings - - # Decrypt sensitive fields before returning + # Decrypt to get the settings structure decrypted_settings = decrypt_auth_settings(project.auth_settings) - auth_settings = AuthSettings(**decrypted_settings) if decrypted_settings else None + if decrypted_settings: + # Mask sensitive fields before sending to frontend + masked_settings = decrypted_settings.copy() + if masked_settings.get("oauth_client_secret"): + masked_settings["oauth_client_secret"] = "*******" # noqa: S105 + if masked_settings.get("api_key"): + masked_settings["api_key"] = "*******" + auth_settings = AuthSettings(**masked_settings) except Exception as e: msg = f"Error listing project tools: {e!s}" @@ -326,7 +361,11 @@ async def update_project_mcp_settings( request: MCPProjectUpdateRequest, current_user: CurrentActiveMCPUser, ): - """Update the MCP settings of all flows in a project and project-level auth settings.""" + """Update the MCP settings of all flows in a project and project-level auth settings. + + On MCP Composer failure, this endpoint should return with a 200 status code and an error message in + the body of the response to display to the user. + """ try: async with session_scope() as session: # Fetch the project first to verify it exists and belongs to the current user @@ -341,32 +380,21 @@ async def update_project_mcp_settings( if not project: raise HTTPException(status_code=404, detail="Project not found") + # Track if MCP Composer needs to be started or stopped + should_handle_mcp_composer = False + should_start_composer = False + should_stop_composer = False + # Update project-level auth settings with encryption if "auth_settings" in request.model_fields_set: - if request.auth_settings is None: - # Explicitly set to None - clear auth settings - project.auth_settings = None - else: - # Use python mode to get raw values without SecretStr masking - auth_model = request.auth_settings - auth_dict = auth_model.model_dump(mode="python", exclude_none=True) - - # Extract actual secret values before encryption - from pydantic import SecretStr - - # Handle api_key if it's a SecretStr - api_key_val = getattr(auth_model, "api_key", None) - if isinstance(api_key_val, SecretStr): - auth_dict["api_key"] = api_key_val.get_secret_value() - - # Handle oauth_client_secret if it's a SecretStr - client_secret_val = getattr(auth_model, "oauth_client_secret", None) - if isinstance(client_secret_val, SecretStr): - auth_dict["oauth_client_secret"] = client_secret_val.get_secret_value() + auth_result = handle_auth_settings_update( + existing_project=project, + new_auth_settings=request.auth_settings, + ) - # Encrypt and store - encrypted_settings = encrypt_auth_settings(auth_dict) - project.auth_settings = encrypted_settings + should_handle_mcp_composer = auth_result["should_handle_composer"] + should_start_composer = auth_result["should_start_composer"] + should_stop_composer = auth_result["should_stop_composer"] session.add(project) @@ -390,7 +418,69 @@ async def update_project_mcp_settings( await session.commit() - return {"message": f"Updated MCP settings for {len(updated_flows)} flows and project auth settings"} + response: dict[str, Any] = { + "message": f"Updated MCP settings for {len(updated_flows)} flows and project auth settings" + } + + if should_handle_mcp_composer: + if should_start_composer: + await logger.adebug( + f"Auth settings changed to OAuth for project {project.name} ({project_id}), " + "starting MCP Composer" + ) + + if should_use_mcp_composer(project): + try: + auth_config = await _get_mcp_composer_auth_config(project) + await get_or_start_mcp_composer(auth_config, project.name, project_id) + composer_sse_url = await get_composer_sse_url(project) + response["result"] = { + "project_id": str(project_id), + "sse_url": composer_sse_url, + "uses_composer": True, + } + except MCPComposerError as e: + response["result"] = { + "project_id": str(project_id), + "uses_composer": True, + "error_message": e.message, + } + except Exception as e: + # Unexpected errors + await logger.aerror(f"Failed to get mcp composer URL for project {project_id}: {e}") + raise HTTPException(status_code=500, detail=str(e)) from e + else: + # This shouldn't happen - we determined we should start composer but now we can't use it + await logger.aerror( + f"PATCH: OAuth set but MCP Composer is disabled in settings for project {project_id}" + ) + response["result"] = { + "project_id": str(project_id), + "uses_composer": False, + "error_message": "OAuth authentication is set but MCP Composer is disabled in settings", + } + elif should_stop_composer: + await logger.adebug( + f"Auth settings changed from OAuth for project {project.name} ({project_id}), " + "stopping MCP Composer" + ) + mcp_composer_service: MCPComposerService = cast( + MCPComposerService, get_service(ServiceType.MCP_COMPOSER_SERVICE) + ) + await mcp_composer_service.stop_project_composer(str(project_id)) + + # Provide the direct SSE URL since we're no longer using composer + sse_url = await get_project_sse_url(project_id) + if not sse_url: + raise HTTPException(status_code=500, detail="Failed to get direct SSE URL") + + response["result"] = { + "project_id": str(project_id), + "sse_url": sse_url, + "uses_composer": False, + } + + return response except Exception as e: msg = f"Error updating project MCP settings: {e!s}" @@ -464,110 +554,98 @@ async def install_mcp_config( removed_servers: list[str] = [] # Track removed servers for reinstallation try: - # Verify project exists and user has access - async with session_scope() as session: - project = ( - await session.exec(select(Folder).where(Folder.id == project_id, Folder.user_id == current_user.id)) - ).first() + project = await verify_project_access(project_id, current_user) - if not project: - raise HTTPException(status_code=404, detail="Project not found") + # Check if project requires API key authentication and generate if needed + generated_api_key = None - # Check if project requires API key authentication and generate if needed - generated_api_key = None - - # Determine if we need to generate an API key based on feature flag - should_generate_api_key = False - if not FEATURE_FLAGS.mcp_composer: - # When MCP_COMPOSER is disabled, only generate API key if autologin is disabled - # (matches frontend !isAutoLogin check) - settings_service = get_settings_service() - should_generate_api_key = not settings_service.auth_settings.AUTO_LOGIN - elif project.auth_settings: - # When MCP_COMPOSER is enabled, only generate if auth_type is "apikey" - auth_settings = AuthSettings(**project.auth_settings) if project.auth_settings else AuthSettings() - should_generate_api_key = auth_settings.auth_type == "apikey" - - if should_generate_api_key: - # Generate API key with specific name format - api_key_name = f"MCP Project {project.name} - {body.client}" - api_key_create = ApiKeyCreate(name=api_key_name) - unmasked_api_key = await create_api_key(session, api_key_create, current_user.id) - generated_api_key = unmasked_api_key.api_key + # Determine if we need to generate an API key + should_generate_api_key = False + if not get_settings_service().settings.mcp_composer_enabled: + # When MCP_COMPOSER is disabled, check auth settings or fallback to auto_login setting + settings_service = get_settings_service() + if project.auth_settings: + # Project has auth settings - check if it requires API key + if project.auth_settings.get("auth_type") == "apikey": + should_generate_api_key = True + elif not settings_service.auth_settings.AUTO_LOGIN: + # No project auth settings but auto_login is disabled - generate API key + should_generate_api_key = True + elif project.auth_settings: + # When MCP_COMPOSER is enabled, only generate if auth_type is "apikey" + if project.auth_settings.get("auth_type") == "apikey": + should_generate_api_key = True # Get settings service to build the SSE URL settings_service = get_settings_service() - host = getattr(settings_service.settings, "host", "localhost") - port = getattr(settings_service.settings, "port", 3000) - base_url = f"http://{host}:{port}".rstrip("/") - sse_url = f"{base_url}/api/v1/mcp/project/{project_id}/sse" + settings = settings_service.settings + host = settings.host or None + port = settings.port or None + if not host or not port: + raise HTTPException(status_code=500, detail="Host and port are not set in settings") # Determine command and args based on operating system os_type = platform.system() - command = "uvx" - # Check if running on WSL (will appear as Linux but with Microsoft in release info) - is_wsl = os_type == "Linux" and "microsoft" in platform.uname().release.lower() + use_mcp_composer = should_use_mcp_composer(project) - if is_wsl: - await logger.adebug("WSL detected, using Windows-specific configuration") + if use_mcp_composer: + try: + auth_config = await _get_mcp_composer_auth_config(project) + await get_or_start_mcp_composer(auth_config, project.name, project_id) + sse_url = await get_composer_sse_url(project) + except MCPComposerError as e: + await logger.aerror( + f"Failed to start MCP Composer for project '{project.name}' ({project_id}): {e.message}" + ) + raise HTTPException(status_code=500, detail=e.message) from e + except Exception as e: + error_msg = f"Failed to start MCP Composer for project '{project.name}' ({project_id}): {e!s}" + await logger.aerror(error_msg) + error_detail = "Failed to start MCP Composer. See logs for details." + raise HTTPException(status_code=500, detail=error_detail) from e + + # For OAuth/MCP Composer, use the special format + command = "uvx" + args = [ + "mcp-composer", + "--mode", + "stdio", + "--sse-url", + sse_url, + "--disable-composer-tools", + "--client_auth_type", + "oauth", + ] + else: + # For non-OAuth (API key or no auth), use mcp-proxy + sse_url = await get_project_sse_url(project_id) + command = "uvx" + args = ["mcp-proxy"] + # Check if we need to add Langflow API key headers + # Necessary only when Project API Key Authentication is enabled - # If we're in WSL and the host is localhost, we might need to adjust the URL - # so Windows applications can reach the WSL service - if host in {"localhost", "127.0.0.1"}: - try: - # Try to get the WSL IP address for host.docker.internal or similar access + # Generate a Langflow API key for auto-install if needed + # Only add API key headers for projects with "apikey" auth type (not "none" or OAuth) - # This might vary depending on WSL version and configuration - proc = await create_subprocess_exec( - "/usr/bin/hostname", - "-I", - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - stdout, _ = await proc.communicate() + if should_generate_api_key: + async with session_scope() as api_key_session: + api_key_create = ApiKeyCreate(name=f"MCP Server {project.name}") + api_key_response = await create_api_key(api_key_session, api_key_create, current_user.id) + langflow_api_key = api_key_response.api_key + args.extend(["--headers", "x-api-key", langflow_api_key]) - if proc.returncode == 0 and stdout.strip(): - wsl_ip = stdout.decode().strip().split()[0] # Get first IP address - await logger.adebug("Using WSL IP for external access: %s", wsl_ip) - # Replace the localhost with the WSL IP in the URL - sse_url = sse_url.replace(f"http://{host}:{port}", f"http://{wsl_ip}:{port}") - except OSError as e: - await logger.awarning("Failed to get WSL IP address: %s. Using default URL.", str(e)) - - # Base args - args = ["mcp-composer"] if FEATURE_FLAGS.mcp_composer else ["mcp-proxy"] - - # Add authentication args based on MCP_COMPOSER feature flag and auth settings - if not FEATURE_FLAGS.mcp_composer: - # When MCP_COMPOSER is disabled, only use headers format if API key was generated - # (when autologin is disabled) - if generated_api_key: - args.extend(["--headers", "x-api-key", generated_api_key]) - elif project.auth_settings: - # Decrypt sensitive fields before using them - decrypted_settings = decrypt_auth_settings(project.auth_settings) - auth_settings = AuthSettings(**decrypted_settings) if decrypted_settings else AuthSettings() - args.extend(["--auth_type", auth_settings.auth_type]) - - # When MCP_COMPOSER is enabled, only add headers if auth_type is "apikey" - auth_settings = AuthSettings(**project.auth_settings) - if auth_settings.auth_type == "apikey" and generated_api_key: - args.extend(["--headers", "x-api-key", generated_api_key]) - # If no auth_settings or auth_type is "none", don't add any auth headers - - # Add the SSE URL - if FEATURE_FLAGS.mcp_composer: - args.extend(["--sse-url", sse_url]) - else: + # Add the SSE URL for mcp-proxy args.append(sse_url) - if os_type == "Windows": + if os_type == "Windows" and not use_mcp_composer: + # Only wrap in cmd for Windows when using mcp-proxy command = "cmd" args = ["/c", "uvx", *args] await logger.adebug("Windows detected, using cmd command") name = project.name + server_name = f"lf-{sanitize_mcp_name(name)[: (MAX_MCP_SERVER_NAME_LENGTH - 4)]}" # Create the MCP configuration server_config: dict[str, Any] = { @@ -575,11 +653,8 @@ async def install_mcp_config( "args": args, } - mcp_config = { - "mcpServers": {f"lf-{sanitize_mcp_name(name)[: (MAX_MCP_SERVER_NAME_LENGTH - 4)]}": server_config} - } + mcp_config = {"mcpServers": {server_name: server_config}} - server_name = f"lf-{sanitize_mcp_name(name)[: (MAX_MCP_SERVER_NAME_LENGTH - 4)]}" await logger.adebug("Installing MCP config for project: %s (server name: %s)", project.name, server_name) # Get the config file path and check if client is available @@ -613,12 +688,10 @@ async def install_mcp_config( if "mcpServers" not in existing_config: existing_config["mcpServers"] = {} - # Remove any existing servers with the same SSE URL (for reinstalling) - project_sse_url = await get_project_sse_url(project_id) - existing_config, removed_servers = remove_server_by_sse_url(existing_config, project_sse_url) + existing_config, removed_servers = remove_server_by_sse_url(existing_config, sse_url) if removed_servers: - logger.info("Removed existing MCP servers with same SSE URL for reinstall: %s", removed_servers) + await logger.adebug("Removed existing MCP servers with same SSE URL for reinstall: %s", removed_servers) # Merge new config with existing config existing_config["mcpServers"].update(mcp_config["mcpServers"]) @@ -629,7 +702,6 @@ async def install_mcp_config( except HTTPException: raise - except Exception as e: msg = f"Error installing MCP configuration: {e!s}" await logger.aexception(msg) @@ -640,12 +712,59 @@ async def install_mcp_config( if removed_servers: message += f" (replaced existing servers: {', '.join(removed_servers)})" if generated_api_key: - auth_type = "API key" if FEATURE_FLAGS.mcp_composer else "legacy API key" + auth_type = "API key" if get_settings_service().settings.mcp_composer_enabled else "legacy API key" message += f" with {auth_type} authentication (key name: 'MCP Project {project.name} - {body.client}')" - await logger.ainfo(message) + await logger.adebug(message) return {"message": message} +@router.get("/{project_id}/composer-url") +async def get_project_composer_url( + project_id: UUID, + current_user: CurrentActiveMCPUser, +): + """Get the MCP Composer URL for a specific project. + + On failure, this endpoint should return with a 200 status code and an error message in + the body of the response to display to the user. + """ + try: + project = await verify_project_access(project_id, current_user) + if not should_use_mcp_composer(project): + return { + "project_id": str(project_id), + "uses_composer": False, + "error_message": ( + "MCP Composer is only available for projects with MCP Composer enabled and OAuth authentication" + ), + } + + auth_config = await _get_mcp_composer_auth_config(project) + + try: + await get_or_start_mcp_composer(auth_config, project.name, project_id) + composer_sse_url = await get_composer_sse_url(project) + return {"project_id": str(project_id), "sse_url": composer_sse_url, "uses_composer": True} + except MCPComposerError as e: + return {"project_id": str(project_id), "uses_composer": True, "error_message": e.message} + except Exception as e: # noqa: BLE001 + await logger.aerror(f"Unexpected error getting composer URL: {e}") + return { + "project_id": str(project_id), + "uses_composer": True, + "error_message": "Failed to start MCP Composer. See logs for details.", + } + + except Exception as e: # noqa: BLE001 + msg = f"Error getting composer URL for project {project_id}: {e!s}" + await logger.aerror(msg) + return { + "project_id": str(project_id), + "uses_composer": True, + "error_message": "Failed to get MCP Composer URL. See logs for details.", + } + + @router.get("/{project_id}/installed") async def check_installed_mcp_servers( project_id: UUID, @@ -662,8 +781,11 @@ async def check_installed_mcp_servers( if not project: raise HTTPException(status_code=404, detail="Project not found") - # Generate the SSE URL for this project - project_sse_url = await get_project_sse_url(project_id) + project = await verify_project_access(project_id, current_user) + if should_use_mcp_composer(project): + project_sse_url = await get_composer_sse_url(project) + else: + project_sse_url = await get_project_sse_url(project_id) await logger.adebug( "Checking for installed MCP servers for project: %s (SSE URL: %s)", project.name, project_sse_url @@ -727,7 +849,7 @@ def config_contains_sse_url(config_data: dict, sse_url: str) -> bool: for server_name, server_config in mcp_servers.items(): args = server_config.get("args", []) # The SSE URL is typically the last argument in mcp-proxy configurations - if args and args[-1] == sse_url: + if args and sse_url in args: logger.debug("Found matching SSE URL in server: %s", server_name) return True return False @@ -737,12 +859,29 @@ async def get_project_sse_url(project_id: UUID) -> str: """Generate the SSE URL for a project, including WSL handling.""" # Get settings service to build the SSE URL settings_service = get_settings_service() - host = getattr(settings_service.settings, "host", "localhost") - port = getattr(settings_service.settings, "port", 3000) - base_url = f"http://{host}:{port}".rstrip("/") - project_sse_url = f"{base_url}/api/v1/mcp/project/{project_id}/sse" - # Handle WSL case - must match the logic in install function + host = settings_service.settings.host or "localhost" + port = settings_service.settings.port or 7860 + project_sse_url = f"http://{host}:{port}/api/v1/mcp/project/{project_id}/sse" + + return await get_url_by_os(host, port, project_sse_url) + + +async def get_composer_sse_url(project: Folder) -> str: + """Get the SSE URL for a project using MCP Composer.""" + auth_config = await _get_mcp_composer_auth_config(project) + composer_host = auth_config.get("oauth_host") + composer_port = auth_config.get("oauth_port") + if not composer_host or not composer_port: + error_msg = "OAuth host and port are required to get the SSE URL for MCP Composer" + raise ValueError(error_msg) + + composer_sse_url = f"http://{composer_host}:{composer_port}/sse" + return await get_url_by_os(composer_host, composer_port, composer_sse_url) + + +async def get_url_by_os(host: str, port: int, url: str) -> str: + """Get the URL by operating system.""" os_type = platform.system() is_wsl = os_type == "Linux" and "microsoft" in platform.uname().release.lower() @@ -758,13 +897,13 @@ async def get_project_sse_url(project_id: UUID) -> str: if proc.returncode == 0 and stdout.strip(): wsl_ip = stdout.decode().strip().split()[0] # Get first IP address - logger.debug("Using WSL IP for external access: %s", wsl_ip) + await logger.adebug("Using WSL IP for external access: %s", wsl_ip) # Replace the localhost with the WSL IP in the URL - project_sse_url = project_sse_url.replace(f"http://{host}:{port}", f"http://{wsl_ip}:{port}") + url = url.replace(f"http://{host}:{port}", f"http://{wsl_ip}:{port}") except OSError as e: - logger.warning("Failed to get WSL IP address: %s. Using default URL.", str(e)) + await logger.awarning("Failed to get WSL IP address: %s. Using default URL.", str(e)) - return project_sse_url + return url async def get_config_path(client: str) -> Path: @@ -818,7 +957,7 @@ async def get_config_path(client: str) -> Path: msg = "Could not find valid Windows user directory in WSL" raise ValueError(msg) except (OSError, CalledProcessError) as e: - logger.warning("Failed to determine Windows user path in WSL: %s", str(e)) + await logger.awarning("Failed to determine Windows user path in WSL: %s", str(e)) msg = f"Could not determine Windows Claude config path in WSL: {e!s}" raise ValueError(msg) from e # Regular Windows @@ -858,6 +997,31 @@ def remove_server_by_sse_url(config_data: dict, sse_url: str) -> tuple[dict, lis return config_data, removed_servers +async def _get_mcp_composer_auth_config(project) -> dict: + """Get MCP Composer authentication configuration from project settings. + + Args: + project: The project object containing auth_settings + + Returns: + dict: The decrypted authentication configuration + + Raises: + HTTPException: If MCP Composer is not enabled or auth config is missing + """ + auth_config = None + if project.auth_settings: + decrypted_settings = decrypt_auth_settings(project.auth_settings) + if decrypted_settings: + auth_config = decrypted_settings + + if not auth_config: + error_message = "Auth config is missing. Please check your settings and try again." + raise ValueError(error_message) + + return auth_config + + # Project-specific MCP server instance for handling project-specific tools class ProjectMCPServer: def __init__(self, project_id: UUID): @@ -902,29 +1066,165 @@ async def handle_call_project_tool(name: str, arguments: dict) -> list[types.Tex project_mcp_servers = {} -def get_project_mcp_server(project_id: UUID) -> ProjectMCPServer: +def get_project_mcp_server(project_id: UUID | None) -> ProjectMCPServer: """Get or create an MCP server for a specific project.""" + if project_id is None: + error_message = "Project ID cannot be None when getting project MCP server" + raise ValueError(error_message) + project_id_str = str(project_id) if project_id_str not in project_mcp_servers: project_mcp_servers[project_id_str] = ProjectMCPServer(project_id) return project_mcp_servers[project_id_str] +async def register_project_with_composer(project: Folder): + """Register a project with MCP Composer by starting a dedicated composer instance.""" + try: + mcp_composer_service: MCPComposerService = cast( + MCPComposerService, get_service(ServiceType.MCP_COMPOSER_SERVICE) + ) + + settings = get_settings_service().settings + if not settings.host or not settings.port: + error_msg = "Langflow host and port must be set in settings to register project with MCP Composer" + raise ValueError(error_msg) + + if not project.id: + error_msg = "Project must have an ID to register with MCP Composer" + raise ValueError(error_msg) + + sse_url = await get_project_sse_url(project.id) + auth_config = await _get_mcp_composer_auth_config(project) + + error_message = await mcp_composer_service.start_project_composer( + project_id=str(project.id), + sse_url=sse_url, + auth_config=auth_config, + ) + if error_message is not None: + raise RuntimeError(error_message) + + await logger.adebug(f"Registered project {project.name} ({project.id}) with MCP Composer") + + except Exception as e: # noqa: BLE001 + await logger.awarning(f"Failed to register project {project.id} with MCP Composer: {e}") + + async def init_mcp_servers(): """Initialize MCP servers for all projects.""" try: + settings_service = get_settings_service() + async with session_scope() as session: projects = (await session.exec(select(Folder))).all() for project in projects: try: + # Auto-enable API key auth for projects without auth settings or with "none" auth + # when AUTO_LOGIN is false + if not settings_service.auth_settings.AUTO_LOGIN: + should_update_to_apikey = False + + if not project.auth_settings: + # No auth settings at all + should_update_to_apikey = True + # Check if existing auth settings have auth_type "none" + elif project.auth_settings.get("auth_type") == "none": + should_update_to_apikey = True + + if should_update_to_apikey: + default_auth = {"auth_type": "apikey"} + project.auth_settings = encrypt_auth_settings(default_auth) + session.add(project) + await logger.ainfo( + f"Auto-enabled API key authentication for existing project {project.name} " + f"({project.id}) due to AUTO_LOGIN=false" + ) + + # WARN: If oauth projects exist in the database and the MCP Composer is disabled, + # these projects will be reset to "apikey" or "none" authentication, erasing all oauth settings. + if ( + not settings_service.settings.mcp_composer_enabled + and project.auth_settings + and project.auth_settings.get("auth_type") == "oauth" + ): + # Reset OAuth projects to appropriate auth type based on AUTO_LOGIN setting + fallback_auth_type = "apikey" if not settings_service.auth_settings.AUTO_LOGIN else "none" + clean_auth = AuthSettings(auth_type=fallback_auth_type) + project.auth_settings = clean_auth.model_dump(exclude_none=True) + session.add(project) + await logger.adebug( + f"Updated OAuth project {project.name} ({project.id}) to use {fallback_auth_type} " + f"authentication because MCP Composer is disabled" + ) + get_project_sse(project.id) get_project_mcp_server(project.id) + + # Only register with MCP Composer if OAuth authentication is configured + if get_settings_service().settings.mcp_composer_enabled and project.auth_settings: + auth_type = project.auth_settings.get("auth_type") + if auth_type == "oauth": + await logger.adebug( + f"Starting MCP Composer for OAuth project {project.name} ({project.id}) on startup" + ) + await register_project_with_composer(project) + except Exception as e: # noqa: BLE001 msg = f"Failed to initialize MCP server for project {project.id}: {e}" await logger.aexception(msg) # Continue to next project even if this one fails + # Commit any auth settings updates + await session.commit() + except Exception as e: # noqa: BLE001 msg = f"Failed to initialize MCP servers: {e}" await logger.aexception(msg) + + +async def verify_project_access(project_id: UUID, current_user: CurrentActiveMCPUser) -> Folder: + """Verify project exists and user has access.""" + async with session_scope() as session: + project = ( + await session.exec(select(Folder).where(Folder.id == project_id, Folder.user_id == current_user.id)) + ).first() + + if not project: + raise HTTPException(status_code=404, detail="Project not found") + + return project + + +def should_use_mcp_composer(project: Folder) -> bool: + """Check if project uses OAuth authentication and MCP Composer is enabled.""" + # If MCP Composer is disabled globally, never use it regardless of project settings + if not get_settings_service().settings.mcp_composer_enabled: + return False + + return project.auth_settings is not None and project.auth_settings.get("auth_type", "") == "oauth" + + +async def get_or_start_mcp_composer(auth_config: dict, project_name: str, project_id: UUID) -> None: + """Get MCP Composer or start it if not running. + + Raises: + MCPComposerError: If MCP Composer fails to start + """ + from langflow.services.mcp_composer.service import MCPComposerConfigError + + mcp_composer_service: MCPComposerService = cast(MCPComposerService, get_service(ServiceType.MCP_COMPOSER_SERVICE)) + + # Prepare current auth config for comparison + settings = get_settings_service().settings + if not settings.host or not settings.port: + error_msg = "Langflow host and port must be set in settings to register project with MCP Composer" + raise ValueError(error_msg) + + sse_url = await get_project_sse_url(project_id) + if not auth_config: + error_msg = f"Auth config is required to start MCP Composer for project {project_name}" + raise MCPComposerConfigError(error_msg, str(project_id)) + + await mcp_composer_service.start_project_composer(str(project_id), sse_url, auth_config) diff --git a/src/backend/base/langflow/api/v1/projects.py b/src/backend/base/langflow/api/v1/projects.py index 9cf5600ed240..159467d9c184 100644 --- a/src/backend/base/langflow/api/v1/projects.py +++ b/src/backend/base/langflow/api/v1/projects.py @@ -2,12 +2,12 @@ import json import zipfile from datetime import datetime, timezone -from typing import Annotated +from typing import Annotated, cast from urllib.parse import quote from uuid import UUID import orjson -from fastapi import APIRouter, Depends, File, HTTPException, Response, UploadFile, status +from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, Response, UploadFile, status from fastapi.encoders import jsonable_encoder from fastapi.responses import StreamingResponse from fastapi_pagination import Params @@ -17,11 +17,15 @@ from sqlmodel import select from langflow.api.utils import CurrentActiveUser, DbSession, cascade_delete_flow, custom_params, remove_api_keys +from langflow.api.v1.auth_helpers import handle_auth_settings_update from langflow.api.v1.flows import create_flows +from langflow.api.v1.mcp_projects import register_project_with_composer from langflow.api.v1.schemas import FlowListCreate from langflow.helpers.flow import generate_unique_flow_name from langflow.helpers.folders import generate_unique_folder_name from langflow.initial_setup.constants import STARTER_FOLDER_NAME +from langflow.logging import logger +from langflow.services.auth.mcp_encryption import encrypt_auth_settings from langflow.services.database.models.flow.model import Flow, FlowCreate, FlowRead from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME from langflow.services.database.models.folder.model import ( @@ -32,6 +36,9 @@ FolderUpdate, ) from langflow.services.database.models.folder.pagination_model import FolderWithPaginatedFlows +from langflow.services.deps import get_service, get_settings_service +from langflow.services.mcp_composer.service import MCPComposerService +from langflow.services.schema import ServiceType router = APIRouter(prefix="/projects", tags=["Projects"]) @@ -70,6 +77,17 @@ async def create_project( else: new_project.name = f"{new_project.name} (1)" + settings_service = get_settings_service() + + # If AUTO_LOGIN is false, automatically enable API key authentication + if not settings_service.auth_settings.AUTO_LOGIN and not new_project.auth_settings: + default_auth = {"auth_type": "apikey"} + new_project.auth_settings = encrypt_auth_settings(default_auth) + await logger.adebug( + f"Auto-enabled API key authentication for project {new_project.name} " + f"({new_project.id}) due to AUTO_LOGIN=false" + ) + session.add(new_project) await session.commit() await session.refresh(new_project) @@ -87,7 +105,6 @@ async def create_project( ) await session.exec(update_statement_flows) await session.commit() - except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e @@ -178,6 +195,7 @@ async def update_project( project_id: UUID, project: FolderUpdate, # Assuming FolderUpdate is a Pydantic model defining updatable fields current_user: CurrentActiveUser, + background_tasks: BackgroundTasks, ): try: existing_project = ( @@ -190,21 +208,55 @@ async def update_project( raise HTTPException(status_code=404, detail="Project not found") try: + # Track if MCP Composer needs to be started or stopped + should_start_mcp_composer = False + should_stop_mcp_composer = False + + # Check if auth_settings is being updated + if "auth_settings" in project.model_fields_set: # Check if auth_settings was explicitly provided + auth_result = handle_auth_settings_update( + existing_project=existing_project, + new_auth_settings=project.auth_settings, + ) + + should_start_mcp_composer = auth_result["should_start_composer"] + should_stop_mcp_composer = auth_result["should_stop_composer"] + + # Handle other updates if project.name and project.name != existing_project.name: existing_project.name = project.name - session.add(existing_project) - await session.commit() - await session.refresh(existing_project) - return existing_project - project_data = existing_project.model_dump(exclude_unset=True) - for key, value in project_data.items(): - if key not in {"components", "flows"}: - setattr(existing_project, key, value) + if project.description is not None: + existing_project.description = project.description + + if project.parent_id is not None: + existing_project.parent_id = project.parent_id + session.add(existing_project) await session.commit() await session.refresh(existing_project) + # Start MCP Composer if auth changed to OAuth + if should_start_mcp_composer: + await logger.adebug( + f"Auth settings changed to OAuth for project {existing_project.name} ({existing_project.id}), " + "starting MCP Composer" + ) + background_tasks.add_task(register_project_with_composer, existing_project) + + # Stop MCP Composer if auth changed FROM OAuth to something else + elif should_stop_mcp_composer: + await logger.ainfo( + f"Auth settings changed from OAuth for project {existing_project.name} ({existing_project.id}), " + "stopping MCP Composer" + ) + + # Get the MCP Composer service and stop the project's composer + mcp_composer_service: MCPComposerService = cast( + MCPComposerService, get_service(ServiceType.MCP_COMPOSER_SERVICE) + ) + await mcp_composer_service.stop_project_composer(str(existing_project.id)) + concat_project_components = project.components + project.flows flows_ids = (await session.exec(select(Flow.id).where(Flow.folder_id == existing_project.id))).all() @@ -256,6 +308,18 @@ async def delete_project( if not project: raise HTTPException(status_code=404, detail="Project not found") + # Check if project has OAuth authentication and stop MCP Composer if needed + if project.auth_settings and project.auth_settings.get("auth_type") == "oauth": + try: + mcp_composer_service: MCPComposerService = cast( + MCPComposerService, get_service(ServiceType.MCP_COMPOSER_SERVICE) + ) + await mcp_composer_service.stop_project_composer(str(project_id)) + await logger.adebug(f"Stopped MCP Composer for deleted OAuth project {project.name} ({project_id})") + except Exception as e: # noqa: BLE001 + # Log but don't fail the deletion if MCP Composer cleanup fails + await logger.aerror(f"Failed to stop MCP Composer for deleted project {project_id}: {e}") + try: await session.delete(project) await session.commit() @@ -338,10 +402,21 @@ async def upload_file( new_project = Folder.model_validate(project, from_attributes=True) new_project.id = None new_project.user_id = current_user.id + + settings_service = get_settings_service() + + # If AUTO_LOGIN is false, automatically enable API key authentication + if not settings_service.auth_settings.AUTO_LOGIN and not new_project.auth_settings: + default_auth = {"auth_type": "apikey"} + new_project.auth_settings = encrypt_auth_settings(default_auth) + await logger.adebug( + f"Auto-enabled API key authentication for uploaded project {new_project.name} " + f"({new_project.id}) due to AUTO_LOGIN=false" + ) + session.add(new_project) await session.commit() await session.refresh(new_project) - del data["folder_name"] del data["folder_description"] diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Hybrid Search RAG.json b/src/backend/base/langflow/initial_setup/starter_projects/Hybrid Search RAG.json index 629035aee30d..cdacc637e48c 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Hybrid Search RAG.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Hybrid Search RAG.json @@ -1109,7 +1109,7 @@ }, { "name": "langflow", - "version": null + "version": "1.5" } ], "total_dependencies": 4 diff --git a/src/backend/base/langflow/main.py b/src/backend/base/langflow/main.py index 0f36c330322e..b929d37802db 100644 --- a/src/backend/base/langflow/main.py +++ b/src/backend/base/langflow/main.py @@ -6,7 +6,7 @@ from contextlib import asynccontextmanager from http import HTTPStatus from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, cast from urllib.parse import urlencode import anyio @@ -35,12 +35,15 @@ from langflow.interface.utils import setup_llm_caching from langflow.logging.logger import configure, logger from langflow.middleware import ContentSizeLimitMiddleware -from langflow.services.deps import get_queue_service, get_settings_service, get_telemetry_service +from langflow.services.deps import get_queue_service, get_service, get_settings_service, get_telemetry_service +from langflow.services.schema import ServiceType from langflow.services.utils import initialize_services, teardown_services if TYPE_CHECKING: from tempfile import TemporaryDirectory + from langflow.services.mcp_composer.service import MCPComposerService + # Ignore Pydantic deprecation warnings from Langchain warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20) @@ -188,6 +191,14 @@ async def lifespan(_app: FastAPI): telemetry_service.start() await logger.adebug(f"started telemetry service in {asyncio.get_event_loop().time() - current_time:.2f}s") + current_time = asyncio.get_event_loop().time() + await logger.adebug("Starting MCP Composer service") + mcp_composer_service = cast("MCPComposerService", get_service(ServiceType.MCP_COMPOSER_SERVICE)) + await mcp_composer_service.start() + await logger.adebug( + f"started MCP Composer service in {asyncio.get_event_loop().time() - current_time:.2f}s" + ) + current_time = asyncio.get_event_loop().time() await logger.adebug("Loading flows") await load_flows_from_directory() @@ -197,13 +208,33 @@ async def lifespan(_app: FastAPI): queue_service.start() await logger.adebug(f"Flows loaded in {asyncio.get_event_loop().time() - current_time:.2f}s") - current_time = asyncio.get_event_loop().time() - await logger.adebug("Loading mcp servers for projects") - await init_mcp_servers() - await logger.adebug(f"mcp servers loaded in {asyncio.get_event_loop().time() - current_time:.2f}s") - total_time = asyncio.get_event_loop().time() - start_time await logger.adebug(f"Total initialization time: {total_time:.2f}s") + + async def delayed_init_mcp_servers(): + await asyncio.sleep(3.0) + current_time = asyncio.get_event_loop().time() + await logger.adebug("Loading mcp servers for projects") + try: + await init_mcp_servers() + await logger.adebug(f"mcp servers loaded in {asyncio.get_event_loop().time() - current_time:.2f}s") + except Exception as e: # noqa: BLE001 + await logger.awarning(f"First MCP server initialization attempt failed: {e}") + await asyncio.sleep(3.0) + current_time = asyncio.get_event_loop().time() + await logger.adebug("Retrying mcp servers initialization") + try: + await init_mcp_servers() + await logger.adebug( + f"mcp servers loaded on retry in {asyncio.get_event_loop().time() - current_time:.2f}s" + ) + except Exception as e2: # noqa: BLE001 + await logger.aexception(f"Failed to initialize MCP servers after retry: {e2}") + + # Start the delayed initialization as a background task + # Allows the server to start first to avoid race conditions with MCP Server startup + _mcp_init_task = asyncio.create_task(delayed_init_mcp_servers()) # noqa: RUF006 + yield except asyncio.CancelledError: diff --git a/src/backend/base/langflow/services/auth/mcp_encryption.py b/src/backend/base/langflow/services/auth/mcp_encryption.py index ef84f99680c2..bc11afdc4ef1 100644 --- a/src/backend/base/langflow/services/auth/mcp_encryption.py +++ b/src/backend/base/langflow/services/auth/mcp_encryption.py @@ -3,8 +3,8 @@ from typing import Any from cryptography.fernet import InvalidToken -from loguru import logger +from langflow.logging.logger import logger from langflow.services.auth import utils as auth_utils from langflow.services.deps import get_settings_service @@ -33,17 +33,21 @@ def encrypt_auth_settings(auth_settings: dict[str, Any] | None) -> dict[str, Any for field in SENSITIVE_FIELDS: if encrypted_settings.get(field): try: + field_to_encrypt = encrypted_settings[field] # Only encrypt if the value is not already encrypted # Try to decrypt first - if it fails, it's not encrypted try: - auth_utils.decrypt_api_key(encrypted_settings[field], settings_service) + result = auth_utils.decrypt_api_key(field_to_encrypt, settings_service) + if not result: + msg = f"Failed to decrypt field {field}" + raise ValueError(msg) + # If decrypt succeeds, it's already encrypted logger.debug(f"Field {field} is already encrypted") except (ValueError, TypeError, KeyError, InvalidToken): # If decrypt fails, the value is plaintext and needs encryption - encrypted_value = auth_utils.encrypt_api_key(encrypted_settings[field], settings_service) + encrypted_value = auth_utils.encrypt_api_key(field_to_encrypt, settings_service) encrypted_settings[field] = encrypted_value - logger.debug(f"Encrypted field {field}") except (ValueError, TypeError, KeyError) as e: logger.error(f"Failed to encrypt field {field}: {e}") raise @@ -69,14 +73,26 @@ def decrypt_auth_settings(auth_settings: dict[str, Any] | None) -> dict[str, Any for field in SENSITIVE_FIELDS: if decrypted_settings.get(field): try: - decrypted_value = auth_utils.decrypt_api_key(decrypted_settings[field], settings_service) + field_to_decrypt = decrypted_settings[field] + + decrypted_value = auth_utils.decrypt_api_key(field_to_decrypt, settings_service) + if not decrypted_value: + msg = f"Failed to decrypt field {field}" + raise ValueError(msg) + decrypted_settings[field] = decrypted_value - logger.debug(f"Decrypted field {field}") except (ValueError, TypeError, KeyError, InvalidToken) as e: - # If decryption fails, assume the value is already plaintext - # This handles backward compatibility with existing unencrypted data - logger.debug(f"Field {field} appears to be plaintext or decryption failed: {e}") - # Keep the original value + # If decryption fails, check if the value appears encrypted + field_value = field_to_decrypt + if isinstance(field_value, str) and field_value.startswith("gAAAAAB"): + # Value appears to be encrypted but decryption failed + logger.error(f"Failed to decrypt encrypted field {field}: {e}") + # For OAuth flows, we need the decrypted value, so raise the error + msg = f"Unable to decrypt {field}. Check encryption key configuration." + raise ValueError(msg) from e + + # Value doesn't appear encrypted, assume it's plaintext (backward compatibility) + logger.debug(f"Field {field} appears to be plaintext, keeping original value") return decrypted_settings diff --git a/src/backend/base/langflow/services/mcp_composer/__init__.py b/src/backend/base/langflow/services/mcp_composer/__init__.py new file mode 100644 index 000000000000..068e310c005d --- /dev/null +++ b/src/backend/base/langflow/services/mcp_composer/__init__.py @@ -0,0 +1,6 @@ +"""MCP Composer service for Langflow.""" + +from langflow.services.mcp_composer.factory import MCPComposerServiceFactory +from langflow.services.mcp_composer.service import MCPComposerService + +__all__ = ["MCPComposerService", "MCPComposerServiceFactory"] diff --git a/src/backend/base/langflow/services/mcp_composer/factory.py b/src/backend/base/langflow/services/mcp_composer/factory.py new file mode 100644 index 000000000000..9352df102718 --- /dev/null +++ b/src/backend/base/langflow/services/mcp_composer/factory.py @@ -0,0 +1,15 @@ +"""Factory for creating MCP Composer service instances.""" + +from langflow.services.factory import ServiceFactory +from langflow.services.mcp_composer.service import MCPComposerService + + +class MCPComposerServiceFactory(ServiceFactory): + """Factory for creating MCP Composer service instances.""" + + def __init__(self): + super().__init__(MCPComposerService) + + def create(self): + """Create a new MCP Composer service instance.""" + return MCPComposerService() diff --git a/src/backend/base/langflow/services/mcp_composer/service.py b/src/backend/base/langflow/services/mcp_composer/service.py new file mode 100644 index 000000000000..85592728336d --- /dev/null +++ b/src/backend/base/langflow/services/mcp_composer/service.py @@ -0,0 +1,591 @@ +"""MCP Composer service for proxying and orchestrating MCP servers.""" + +import asyncio +import os +import re +import select +import socket +import subprocess +from collections.abc import Callable +from functools import wraps +from typing import Any + +from langflow.logging.logger import logger +from langflow.services.base import Service +from langflow.services.deps import get_settings_service + +GENERIC_STARTUP_ERROR_MSG = ( + "MCP Composer startup failed. Check OAuth configuration and check logs for more information." +) + + +class MCPComposerError(Exception): + """Base exception for MCP Composer errors.""" + + def __init__(self, message: str | None, project_id: str | None = None): + if not message: + message = GENERIC_STARTUP_ERROR_MSG + self.message = message + self.project_id = project_id + super().__init__(message) + + +class MCPComposerPortError(MCPComposerError): + """Port is already in use or unavailable.""" + + +class MCPComposerConfigError(MCPComposerError): + """Invalid configuration provided.""" + + +class MCPComposerDisabledError(MCPComposerError): + """MCP Composer is disabled in settings.""" + + +class MCPComposerStartupError(MCPComposerError): + """Failed to start MCP Composer process.""" + + +def require_composer_enabled(func: Callable) -> Callable: + """Decorator that checks if MCP Composer is enabled before executing the method.""" + + @wraps(func) + def wrapper(self, *args, **kwargs): + if not get_settings_service().settings.mcp_composer_enabled: + project_id = kwargs.get("project_id") + error_msg = "MCP Composer is disabled in settings" + raise MCPComposerDisabledError(error_msg, project_id) + + return func(self, *args, **kwargs) + + return wrapper + + +class MCPComposerService(Service): + """Service for managing per-project MCP Composer instances.""" + + name = "mcp_composer_service" + + def __init__(self): + super().__init__() + self.project_composers: dict[str, dict] = {} # project_id -> {process, host, port, sse_url, auth_config} + self._start_locks: dict[ + str, asyncio.Lock + ] = {} # Lock to prevent concurrent start operations for the same project + + def _is_port_available(self, port: int) -> bool: + """Check if a port is available (not in use).""" + try: + with socket.create_connection(("localhost", port), timeout=1.0): + return False # Port is in use + except (OSError, ConnectionRefusedError): + return True # Port is available + + async def start(self): + """Check if the MCP Composer service is enabled.""" + settings = get_settings_service().settings + if not settings.mcp_composer_enabled: + await logger.adebug( + "MCP Composer is disabled in settings. OAuth authentication will not be enabled for MCP Servers." + ) + else: + await logger.adebug( + "MCP Composer is enabled in settings. OAuth authentication will be enabled for MCP Servers." + ) + + async def stop(self): + """Stop all MCP Composer instances.""" + for project_id in list(self.project_composers.keys()): + await self.stop_project_composer(project_id) + await logger.adebug("All MCP Composer instances stopped") + + @require_composer_enabled + async def stop_project_composer(self, project_id: str): + """Stop the MCP Composer instance for a specific project.""" + if project_id not in self.project_composers: + return + + # Use the same lock to ensure consistency + if project_id in self._start_locks: + async with self._start_locks[project_id]: + await self._do_stop_project_composer(project_id) + # Clean up the lock as well + del self._start_locks[project_id] + else: + # Fallback if no lock exists + await self._do_stop_project_composer(project_id) + + async def _do_stop_project_composer(self, project_id: str): + """Internal method to stop a project composer.""" + if project_id not in self.project_composers: + return + + composer_info = self.project_composers[project_id] + process = composer_info.get("process") + + if process: + try: + # Check if process is still running before trying to terminate + if process.poll() is None: + await logger.adebug(f"Terminating MCP Composer process {process.pid} for project {project_id}") + process.terminate() + + # Wait longer for graceful shutdown + try: + await asyncio.wait_for(self._wait_for_process_exit(process), timeout=3.0) + await logger.adebug(f"MCP Composer for project {project_id} terminated gracefully") + except asyncio.TimeoutError: + await logger.aerror( + f"MCP Composer for project {project_id} did not terminate gracefully, force killing" + ) + process.kill() + # Wait a bit more for force kill to complete + try: + await asyncio.wait_for(self._wait_for_process_exit(process), timeout=2.0) + except asyncio.TimeoutError: + await logger.aerror( + f"Failed to kill MCP Composer process {process.pid} for project {project_id}" + ) + else: + await logger.adebug(f"MCP Composer process for project {project_id} was already terminated") + + await logger.adebug(f"MCP Composer stopped for project {project_id}") + + except ProcessLookupError: + # Process already terminated + await logger.adebug(f"MCP Composer process for project {project_id} was already terminated") + except Exception as e: # noqa: BLE001 + await logger.aerror(f"Error stopping MCP Composer for project {project_id}: {e}") + + # Remove from tracking + del self.project_composers[project_id] + + async def _wait_for_process_exit(self, process): + """Wait for a process to exit.""" + await asyncio.to_thread(process.wait) + + def _validate_oauth_settings(self, auth_config: dict[str, Any]) -> None: + """Validate that all required OAuth settings are present and non-empty. + + Raises: + MCPComposerConfigError: If any required OAuth field is missing or empty + """ + if auth_config.get("auth_type") != "oauth": + return + + required_fields = [ + "oauth_host", + "oauth_port", + "oauth_server_url", + "oauth_auth_url", + "oauth_token_url", + "oauth_client_id", + "oauth_client_secret", + ] + + missing_fields = [] + empty_fields = [] + + for field in required_fields: + value = auth_config.get(field) + if value is None: + missing_fields.append(field) + elif not str(value).strip(): + empty_fields.append(field) + + error_parts = [] + if missing_fields: + error_parts.append(f"Missing required fields: {', '.join(missing_fields)}") + if empty_fields: + error_parts.append(f"Empty required fields: {', '.join(empty_fields)}") + + if error_parts: + config_error_msg = f"Invalid OAuth configuration: {'; '.join(error_parts)}" + raise MCPComposerConfigError(config_error_msg) + + def _has_auth_config_changed(self, existing_auth: dict[str, Any] | None, new_auth: dict[str, Any] | None) -> bool: + """Check if auth configuration has changed in a way that requires restart.""" + if not existing_auth and not new_auth: + return False + + if not existing_auth or not new_auth: + return True + + auth_type = new_auth.get("auth_type", "") + + # Auth type changed? + if existing_auth.get("auth_type") != auth_type: + return True + + # Define which fields to check for each auth type + fields_to_check = [] + if auth_type == "oauth": + # Get all oauth_* fields plus host/port from both configs + all_keys = set(existing_auth.keys()) | set(new_auth.keys()) + fields_to_check = [k for k in all_keys if k.startswith("oauth_") or k in ["host", "port"]] + elif auth_type == "apikey": + fields_to_check = ["api_key"] + + # Compare relevant fields + for field in fields_to_check: + old_val = existing_auth.get(field) + new_val = new_auth.get(field) + + # Convert None and empty string to None for comparison + old_normalized = None if (old_val is None or old_val == "") else old_val + new_normalized = None if (new_val is None or new_val == "") else new_val + + if old_normalized != new_normalized: + return True + + return False + + def _obfuscate_command_secrets(self, cmd: list[str]) -> list[str]: + """Obfuscate secrets in command arguments for safe logging. + + Args: + cmd: List of command arguments + + Returns: + List of command arguments with secrets replaced with ***REDACTED*** + """ + safe_cmd = [] + skip_next = False + + for i, arg in enumerate(cmd): + if skip_next: + skip_next = False + safe_cmd.append("***REDACTED***") + continue + + if arg == "--env" and i + 2 < len(cmd): + # Check if next env var is a secret + env_key = cmd[i + 1] + if any(secret in env_key.lower() for secret in ["secret", "key", "token"]): + safe_cmd.extend([arg, env_key]) # Keep env key, redact value + skip_next = True + continue + + safe_cmd.append(arg) + + return safe_cmd + + def _extract_error_message( + self, stdout_content: str, stderr_content: str, oauth_server_url: str | None = None + ) -> str: + """Attempts to extract a user-friendly error message from subprocess output. + + Args: + stdout_content: Standard output from the subprocess + stderr_content: Standard error from the subprocess + oauth_server_url: OAuth server URL + + Returns: + User-friendly error message or a generic message if no specific pattern is found + """ + # Combine both outputs and clean them up + combined_output = (stderr_content + "\n" + stdout_content).strip() + if not oauth_server_url: + oauth_server_url = "OAuth server URL" + + # Common error patterns with user-friendly messages + error_patterns = [ + (r"address already in use", f"Address {oauth_server_url} is already in use."), + (r"permission denied", f"Permission denied starting MCP Composer on address {oauth_server_url}."), + ( + r"connection refused", + f"Connection refused on address {oauth_server_url}. The address may be blocked or unavailable.", + ), + ( + r"bind.*failed", + f"Failed to bind to address {oauth_server_url}. The address may be in use or unavailable.", + ), + (r"timeout", "MCP Composer startup timed out. Please try again."), + (r"invalid.*configuration", "Invalid MCP Composer configuration. Please check your settings."), + (r"oauth.*error", "OAuth configuration error. Please check your OAuth settings."), + (r"authentication.*failed", "Authentication failed. Please check your credentials."), + ] + + # Check for specific error patterns first + for pattern, friendly_msg in error_patterns: + if re.search(pattern, combined_output, re.IGNORECASE): + return friendly_msg + + return GENERIC_STARTUP_ERROR_MSG + + @require_composer_enabled + async def start_project_composer( + self, + project_id: str, + sse_url: str, + auth_config: dict[str, Any] | None, + max_startup_checks: int = 3, + startup_delay: float = 2.0, + ) -> None: + """Start an MCP Composer instance for a specific project. + + Raises: + MCPComposerError: Various specific errors if startup fails + """ + if not auth_config: + no_auth_error_msg = "No auth settings provided" + raise MCPComposerConfigError(no_auth_error_msg, project_id) + + # Validate OAuth settings early to provide clear error messages + self._validate_oauth_settings(auth_config) + + project_host = auth_config.get("oauth_host") if auth_config else "unknown" + project_port = auth_config.get("oauth_port") if auth_config else "unknown" + await logger.adebug(f"Starting MCP Composer for project {project_id} on {project_host}:{project_port}") + + # Use a per-project lock to prevent race conditions + if project_id not in self._start_locks: + self._start_locks[project_id] = asyncio.Lock() + + async with self._start_locks[project_id]: + # Check if already running (double-check after acquiring lock) + project_port = auth_config.get("oauth_port") + if not project_port: + no_port_error_msg = "No OAuth port provided" + raise MCPComposerConfigError(no_port_error_msg, project_id) + + project_host = auth_config.get("oauth_host") + if not project_host: + no_host_error_msg = "No OAuth host provided" + raise MCPComposerConfigError(no_host_error_msg, project_id) + + if project_id in self.project_composers: + composer_info = self.project_composers[project_id] + process = composer_info.get("process") + existing_auth = composer_info.get("auth_config", {}) + + # Check if process is still running + if process and process.poll() is None: + # Process is running - only restart if config changed + auth_changed = self._has_auth_config_changed(existing_auth, auth_config) + if auth_changed: + await logger.adebug(f"Config changed for project {project_id}, restarting MCP Composer") + await self._do_stop_project_composer(project_id) + else: + await logger.adebug( + f"MCP Composer already running for project {project_id} with current config" + ) + return # Already running with correct config + else: + # Process died or never started properly, restart it + await logger.adebug(f"MCP Composer process died for project {project_id}, restarting") + await self._do_stop_project_composer(project_id) + + is_port_available = self._is_port_available(project_port) + if not is_port_available: + await logger.awarning(f"Port {project_port} is already in use.") + port_error_msg = f"Port {project_port} is already in use" + raise MCPComposerPortError(port_error_msg) + + # Start the MCP Composer process (single attempt, no outer retry loop) + process = await self._start_project_composer_process( + project_id, project_host, project_port, sse_url, auth_config, max_startup_checks, startup_delay + ) + self.project_composers[project_id] = { + "process": process, + "host": project_host, + "port": project_port, + "sse_url": sse_url, + "auth_config": auth_config, + } + + await logger.adebug( + f"MCP Composer started for project {project_id} on port {project_port} (PID: {process.pid})" + ) + + async def _start_project_composer_process( + self, + project_id: str, + host: str, + port: int, + sse_url: str, + auth_config: dict[str, Any] | None = None, + max_startup_checks: int = 3, + startup_delay: float = 1.0, + ) -> subprocess.Popen: + """Start the MCP Composer subprocess for a specific project.""" + cmd = [ + "uvx", + "mcp-composer", + "--mode", + "sse", + "--sse-url", + sse_url, + "--disable-composer-tools", + ] + + # Set environment variables + env = os.environ.copy() + + oauth_server_url = auth_config.get("oauth_server_url") if auth_config else None + if auth_config: + auth_type = auth_config.get("auth_type") + + if auth_type == "oauth": + cmd.extend(["--auth_type", "oauth"]) + + # Add OAuth environment variables as command line arguments + cmd.extend(["--env", "ENABLE_OAUTH", "True"]) + + # Map auth config to environment variables for OAuth + oauth_env_mapping = { + "oauth_host": "OAUTH_HOST", + "oauth_port": "OAUTH_PORT", + "oauth_server_url": "OAUTH_SERVER_URL", + "oauth_callback_path": "OAUTH_CALLBACK_PATH", + "oauth_client_id": "OAUTH_CLIENT_ID", + "oauth_client_secret": "OAUTH_CLIENT_SECRET", + "oauth_auth_url": "OAUTH_AUTH_URL", + "oauth_token_url": "OAUTH_TOKEN_URL", + "oauth_mcp_scope": "OAUTH_MCP_SCOPE", + "oauth_provider_scope": "OAUTH_PROVIDER_SCOPE", + } + + # Add environment variables as command line arguments + # Only set non-empty values to avoid Pydantic validation errors + for config_key, env_key in oauth_env_mapping.items(): + value = auth_config.get(config_key) + if value is not None and str(value).strip(): + cmd.extend(["--env", env_key, str(value)]) + + # Start the subprocess with both stdout and stderr captured + process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) # noqa: ASYNC220, S603 + + # Monitor the process startup with multiple checks + process_running = False + port_bound = False + + await logger.adebug(f"Monitoring MCP Composer startup for project {project_id} (PID: {process.pid})") + + for check in range(max_startup_checks): + await asyncio.sleep(startup_delay) + + # Check if process is still running + poll_result = process.poll() + + startup_error_msg = None + if poll_result is not None: + # Process terminated, get the error output + await logger.aerror(f"MCP Composer process {process.pid} terminated with exit code: {poll_result}") + try: + stdout_content, stderr_content = process.communicate(timeout=2) + # Log the full error details for debugging + await logger.aerror(f"MCP Composer startup failed for project {project_id}") + await logger.aerror(f"MCP Composer stdout:\n{stdout_content}") + await logger.aerror(f"MCP Composer stderr:\n{stderr_content}") + safe_cmd = self._obfuscate_command_secrets(cmd) + await logger.aerror(f"Command that failed: {' '.join(safe_cmd)}") + + # Extract meaningful error message + startup_error_msg = self._extract_error_message(stdout_content, stderr_content, oauth_server_url) + raise MCPComposerStartupError(startup_error_msg, project_id) + except subprocess.TimeoutExpired: + process.kill() + await logger.aerror( + f"MCP Composer process {process.pid} terminated unexpectedly for project {project_id}" + ) + startup_error_msg = self._extract_error_message("", "", oauth_server_url) + raise MCPComposerStartupError(startup_error_msg, project_id) from None + + # Process is still running, check if port is bound + port_bound = not self._is_port_available(port) + + if port_bound: + await logger.adebug( + f"MCP Composer for project {project_id} bound to port {port} " + f"(check {check + 1}/{max_startup_checks})" + ) + process_running = True + break + await logger.adebug( + f"MCP Composer for project {project_id} not yet bound to port {port} " + f"(check {check + 1}/{max_startup_checks})" + ) + + # Try to read any available stderr without blocking (only log if there's an error) + if process.stderr and select.select([process.stderr], [], [], 0)[0]: + try: + stderr_line = process.stderr.readline() + if stderr_line and "ERROR" in stderr_line: + await logger.aerror(f"MCP Composer error: {stderr_line.strip()}") + except Exception: # noqa: S110, BLE001 + pass + + # After all checks + if not process_running or not port_bound: + # Get comprehensive error information + poll_result = process.poll() + + if poll_result is not None: + # Process died + startup_error_msg = None + try: + stdout_content, stderr_content = process.communicate(timeout=2) + # Extract meaningful error message + startup_error_msg = self._extract_error_message(stdout_content, stderr_content, oauth_server_url) + await logger.aerror(f"MCP Composer startup failed for project {project_id}:") + await logger.aerror(f" - Process died with exit code: {poll_result}") + await logger.aerror(f" - Target: {host}:{port}") + # Obfuscate secrets in command before logging + safe_cmd = self._obfuscate_command_secrets(cmd) + await logger.aerror(f" - Command: {' '.join(safe_cmd)}") + if stderr_content.strip(): + await logger.aerror(f" - Error output: {stderr_content.strip()}") + if stdout_content.strip(): + await logger.aerror(f" - Standard output: {stdout_content.strip()}") + await logger.aerror(f" - Error message: {startup_error_msg}") + except subprocess.TimeoutExpired: + await logger.aerror(f"MCP Composer for project {project_id} died but couldn't read output") + process.kill() + + raise MCPComposerStartupError(startup_error_msg, project_id) + # Process running but port not bound + await logger.aerror(f"MCP Composer startup failed for project {project_id}:") + await logger.aerror(f" - Process is running (PID: {process.pid}) but failed to bind to port {port}") + await logger.aerror( + f" - Checked {max_startup_checks} times over {max_startup_checks * startup_delay} seconds" + ) + await logger.aerror(f" - Target: {host}:{port}") + + # Get any available output before terminating + startup_error_msg = None + try: + process.terminate() + stdout_content, stderr_content = process.communicate(timeout=2) + startup_error_msg = self._extract_error_message(stdout_content, stderr_content, oauth_server_url) + if stderr_content.strip(): + await logger.aerror(f" - Process stderr: {stderr_content.strip()}") + if stdout_content.strip(): + await logger.aerror(f" - Process stdout: {stdout_content.strip()}") + except Exception: # noqa: BLE001 + process.kill() + await logger.aerror(" - Could not retrieve process output before termination") + + raise MCPComposerStartupError(startup_error_msg, project_id) + + # Close the pipes if everything is successful + if process.stdout: + process.stdout.close() + if process.stderr: + process.stderr.close() + + return process + + @require_composer_enabled + def get_project_composer_port(self, project_id: str) -> int | None: + """Get the port number for a specific project's composer.""" + if project_id not in self.project_composers: + return None + return self.project_composers[project_id]["port"] + + @require_composer_enabled + async def teardown(self) -> None: + """Clean up resources when the service is torn down.""" + await logger.adebug("Tearing down MCP Composer service...") + await self.stop() + await logger.adebug("MCP Composer service teardown complete") diff --git a/src/backend/base/langflow/services/schema.py b/src/backend/base/langflow/services/schema.py index c8282d12238f..c0bf73687304 100644 --- a/src/backend/base/langflow/services/schema.py +++ b/src/backend/base/langflow/services/schema.py @@ -20,3 +20,4 @@ class ServiceType(str, Enum): TRACING_SERVICE = "tracing_service" TELEMETRY_SERVICE = "telemetry_service" JOB_QUEUE_SERVICE = "job_queue_service" + MCP_COMPOSER_SERVICE = "mcp_composer_service" diff --git a/src/backend/base/langflow/services/settings/base.py b/src/backend/base/langflow/services/settings/base.py index b10de52ddd34..b0e723b56d7d 100644 --- a/src/backend/base/langflow/services/settings/base.py +++ b/src/backend/base/langflow/services/settings/base.py @@ -265,6 +265,10 @@ class Settings(BaseSettings): mcp_server_enable_progress_notifications: bool = False """If set to False, Langflow will not send progress notifications in the MCP server.""" + # MCP Composer + mcp_composer_enabled: bool = True + """If set to False, Langflow will not start the MCP Composer service.""" + # Public Flow Settings public_flow_cleanup_interval: int = Field(default=3600, gt=600) """The interval in seconds at which public temporary flows will be cleaned up. diff --git a/src/backend/base/langflow/services/settings/feature_flags.py b/src/backend/base/langflow/services/settings/feature_flags.py index 12e7c0276c1c..a234710d41a0 100644 --- a/src/backend/base/langflow/services/settings/feature_flags.py +++ b/src/backend/base/langflow/services/settings/feature_flags.py @@ -3,7 +3,6 @@ class FeatureFlags(BaseSettings): mvp_components: bool = False - mcp_composer: bool = False class Config: env_prefix = "LANGFLOW_FEATURE_" diff --git a/src/backend/tests/unit/api/v1/test_mcp_projects.py b/src/backend/tests/unit/api/v1/test_mcp_projects.py index 3c69dab9a0b4..7ffcab0359b7 100644 --- a/src/backend/tests/unit/api/v1/test_mcp_projects.py +++ b/src/backend/tests/unit/api/v1/test_mcp_projects.py @@ -137,6 +137,45 @@ async def other_test_project(other_test_user): await session.commit() +@pytest.fixture(autouse=True) +def disable_mcp_composer_by_default(): + """Auto-fixture to disable MCP Composer for all tests by default.""" + with patch("langflow.api.v1.mcp_projects.get_settings_service") as mock_get_settings: + from langflow.services.deps import get_settings_service + + real_service = get_settings_service() + + # Create a mock that returns False for mcp_composer_enabled but delegates everything else + mock_service = MagicMock() + mock_service.settings = MagicMock() + mock_service.settings.mcp_composer_enabled = False + + # Copy any other settings that might be needed + mock_service.auth_settings = real_service.auth_settings + + mock_get_settings.return_value = mock_service + yield + + +@pytest.fixture +def enable_mcp_composer(): + """Fixture to explicitly enable MCP Composer for specific tests.""" + with patch("langflow.api.v1.mcp_projects.get_settings_service") as mock_get_settings: + from langflow.services.deps import get_settings_service + + real_service = get_settings_service() + + mock_service = MagicMock() + mock_service.settings = MagicMock() + mock_service.settings.mcp_composer_enabled = True + + # Copy any other settings that might be needed + mock_service.auth_settings = real_service.auth_settings + + mock_get_settings.return_value = mock_service + yield True + + async def test_handle_project_messages_success( client: AsyncClient, user_test_project, mock_sse_transport, logged_in_headers ): @@ -254,6 +293,7 @@ async def test_update_project_mcp_settings_other_user_project( ): """Test accessing a project belonging to another user.""" # We're using the GET endpoint since it works correctly and tests the same security constraints + # This test disables MCP Composer to test JWT token-based access control # Try to access the other user's project using active_user's credentials response = await client.get(f"api/v1/mcp/project/{other_test_project.id}/sse", headers=logged_in_headers) @@ -263,6 +303,21 @@ async def test_update_project_mcp_settings_other_user_project( assert response.json()["detail"] == "Project not found" +async def test_update_project_mcp_settings_other_user_project_with_composer( + client: AsyncClient, other_test_project, logged_in_headers, enable_mcp_composer +): + """Test accessing a project belonging to another user when MCP Composer is enabled.""" + # When MCP Composer is enabled, JWT tokens are not accepted for MCP endpoints + assert enable_mcp_composer # Fixture ensures MCP Composer is enabled + + # Try to access the other user's project using active_user's JWT credentials + response = await client.get(f"api/v1/mcp/project/{other_test_project.id}/sse", headers=logged_in_headers) + + # Verify the response - should get 401 because JWT tokens aren't accepted + assert response.status_code == 401 + assert "API key required" in response.json()["detail"] + + async def test_update_project_mcp_settings_empty_settings(client: AsyncClient, user_test_project, logged_in_headers): """Test updating MCP settings with empty settings list.""" # Use real database objects instead of mocks to avoid the coroutine issue diff --git a/src/frontend/src/controllers/API/queries/mcp/use-get-composer-url.ts b/src/frontend/src/controllers/API/queries/mcp/use-get-composer-url.ts new file mode 100644 index 000000000000..ca2f15d3c7f4 --- /dev/null +++ b/src/frontend/src/controllers/API/queries/mcp/use-get-composer-url.ts @@ -0,0 +1,41 @@ +import type { useQueryFunctionType } from "@/types/api"; +import { api } from "../../api"; +import { getURL } from "../../helpers/constants"; +import { UseRequestProcessor } from "../../services/request-processor"; + +interface ComposerUrlResponse { + project_id: string; + sse_url: string; + uses_composer: boolean; + error_message?: string; +} + +type UseGetProjectComposerUrlParams = { + projectId: string; +}; + +export const useGetProjectComposerUrl: useQueryFunctionType< + UseGetProjectComposerUrlParams, + ComposerUrlResponse +> = ({ projectId }, options) => { + const { query } = UseRequestProcessor(); + + const responseFn = async (): Promise => { + try { + const response = await api.get( + `${getURL("MCP")}/${projectId}/composer-url`, + ); + return response.data; + } catch (error) { + console.error(error); + throw error; + } + }; + + return query(["project-composer-url", projectId], responseFn, { + staleTime: 30000, // 30 seconds + retry: 1, + // Backend returns 200 responses with error_message field instead of HTTP errors + ...options, + }); +}; diff --git a/src/frontend/src/controllers/API/queries/mcp/use-patch-flows-mcp.ts b/src/frontend/src/controllers/API/queries/mcp/use-patch-flows-mcp.ts index ed9c2df756cf..def36e79681f 100644 --- a/src/frontend/src/controllers/API/queries/mcp/use-patch-flows-mcp.ts +++ b/src/frontend/src/controllers/API/queries/mcp/use-patch-flows-mcp.ts @@ -16,6 +16,12 @@ interface PatchFlowMCPRequest { interface PatchFlowMCPResponse { message: string; + result?: { + project_id: string; + sse_url?: string; + uses_composer: boolean; + error_message?: string; + }; } export const usePatchFlowsMCP: useMutationFunctionType< @@ -25,12 +31,14 @@ export const usePatchFlowsMCP: useMutationFunctionType< > = (params, options?) => { const { mutate, queryClient } = UseRequestProcessor(); - async function patchFlowMCP(requestData: PatchFlowMCPRequest): Promise { + async function patchFlowMCP( + requestData: PatchFlowMCPRequest, + ): Promise { const res = await api.patch( `${getURL("MCP")}/${params.project_id}`, requestData, ); - return res.data.message; + return res.data; } const mutation: UseMutationResult< @@ -38,8 +46,36 @@ export const usePatchFlowsMCP: useMutationFunctionType< any, PatchFlowMCPRequest > = mutate(["usePatchFlowsMCP"], patchFlowMCP, { + onSuccess: (data, variables, context) => { + const authSettings = (variables as unknown as PatchFlowMCPRequest) + .auth_settings; + // Update the auth settings cache immediately to prevent race conditions + const currentMCPData = queryClient.getQueryData([ + "useGetFlowsMCP", + params.project_id, + ]); + if (currentMCPData && authSettings !== undefined) { + queryClient.setQueryData(["useGetFlowsMCP", params.project_id], { + ...currentMCPData, + auth_settings: authSettings, + }); + } + + // Always invalidate the composer URL cache when auth settings change + // This ensures the query re-runs with the new auth state + queryClient.invalidateQueries({ + queryKey: ["project-composer-url", params.project_id], + }); + + // Call the original onSuccess if provided + if (options?.onSuccess) { + options.onSuccess(data, variables, context); + } + }, onSettled: () => { - queryClient.refetchQueries({ queryKey: ["useGetFlowsMCP"] }); + // Use invalidateQueries instead of refetchQueries to avoid race conditions + // This marks the queries as stale but doesn't immediately refetch them + queryClient.invalidateQueries({ queryKey: ["useGetFlowsMCP"] }); }, ...options, }); diff --git a/src/frontend/src/customization/feature-flags.ts b/src/frontend/src/customization/feature-flags.ts index b42d12b2a8dd..4bfb400aab46 100644 --- a/src/frontend/src/customization/feature-flags.ts +++ b/src/frontend/src/customization/feature-flags.ts @@ -18,5 +18,5 @@ export const ENABLE_MCP_NOTICE = false; export const ENABLE_KNOWLEDGE_BASES = false; export const ENABLE_MCP_COMPOSER = - process.env.LANGFLOW_FEATURE_MCP_COMPOSER === "true"; + process.env.LANGFLOW_MCP_COMPOSER_ENABLED === "true"; export const ENABLE_NEW_SIDEBAR = true; diff --git a/src/frontend/src/customization/utils/custom-mcp-url.ts b/src/frontend/src/customization/utils/custom-mcp-url.ts index e268cb041e73..ffbb6b135bcc 100644 --- a/src/frontend/src/customization/utils/custom-mcp-url.ts +++ b/src/frontend/src/customization/utils/custom-mcp-url.ts @@ -1,6 +1,16 @@ import { api } from "@/controllers/API/api"; -export const customGetMCPUrl = (projectId: string) => { +export const customGetMCPUrl = ( + projectId: string, + useComposer = false, + composerUrl?: string, +) => { + if (useComposer && composerUrl) { + // Use the per-project MCP Composer SSE URL + return composerUrl; + } + + // Fallback to direct Langflow SSE endpoint const apiHost = api.defaults.baseURL || window.location.origin; const apiUrl = `${apiHost}/api/v1/mcp/project/${projectId}/sse`; return apiUrl; diff --git a/src/frontend/src/pages/MainPage/pages/homePage/components/McpServerTab.tsx b/src/frontend/src/pages/MainPage/pages/homePage/components/McpServerTab.tsx index 3a808955cc36..242f8a5a2f57 100644 --- a/src/frontend/src/pages/MainPage/pages/homePage/components/McpServerTab.tsx +++ b/src/frontend/src/pages/MainPage/pages/homePage/components/McpServerTab.tsx @@ -1,3 +1,4 @@ +import { useQueryClient } from "@tanstack/react-query"; import { memo, type ReactNode, useCallback, useState } from "react"; import { useParams } from "react-router-dom"; import { Light as SyntaxHighlighter } from "react-syntax-highlighter"; @@ -12,6 +13,7 @@ import { useGetFlowsMCP, usePatchFlowsMCP, } from "@/controllers/API/queries/mcp"; +import { useGetProjectComposerUrl } from "@/controllers/API/queries/mcp/use-get-composer-url"; import { useGetInstalledMCP } from "@/controllers/API/queries/mcp/use-get-installed-mcp"; import { usePatchInstallMCP } from "@/controllers/API/queries/mcp/use-patch-install-mcp"; import { ENABLE_MCP_COMPOSER } from "@/customization/feature-flags"; @@ -157,13 +159,28 @@ const McpServerTab = ({ folderName }: { folderName: string }) => { const setSuccessData = useAlertStore((state) => state.setSuccessData); const setErrorData = useAlertStore((state) => state.setErrorData); - const { data: mcpProjectData } = useGetFlowsMCP({ projectId }); - const { mutate: patchFlowsMCP } = usePatchFlowsMCP({ project_id: projectId }); + const { data: mcpProjectData, isLoading: isLoadingMCPProjectData } = + useGetFlowsMCP({ projectId }); + const { mutate: patchFlowsMCP, isPending: isPatchingFlowsMCP } = + usePatchFlowsMCP({ project_id: projectId }); // Extract tools and auth_settings from the response const flowsMCP = mcpProjectData?.tools || []; const currentAuthSettings = mcpProjectData?.auth_settings; + // Only get composer URL for OAuth projects + // Disable the query during mutations to prevent stale auth state issues + const isOAuthProject = + currentAuthSettings?.auth_type === "oauth" && ENABLE_MCP_COMPOSER; + const shouldQueryComposerUrl = isOAuthProject && !isPatchingFlowsMCP; + + const { data: composerUrlData } = useGetProjectComposerUrl( + { + projectId, + }, + { enabled: !!projectId && shouldQueryComposerUrl }, + ); + const { mutate: patchInstallMCP } = usePatchInstallMCP({ project_id: projectId, }); @@ -252,9 +269,19 @@ const McpServerTab = ({ folderName }: { folderName: string }) => { }, }; - const apiUrl = customGetMCPUrl(projectId); + // Check if OAuth project has MCP Composer errors + const hasOAuthError = isOAuthProject && composerUrlData?.error_message; + + // Use the per-project MCP Composer SSE URL only if project uses composer, otherwise fallback to direct SSE + const apiUrl = customGetMCPUrl( + projectId, + isOAuthProject && + !!composerUrlData?.sse_url && + composerUrlData?.uses_composer, + composerUrlData?.sse_url, + ); - // Generate auth headers based on the authentication type + // Generate auth headers based on authentication type const getAuthHeaders = () => { // If MCP auth is disabled, use the previous API key behavior if (!ENABLE_MCP_COMPOSER) { @@ -303,8 +330,24 @@ const McpServerTab = ({ folderName }: { folderName: string }) => { ? `"uvx", ` : "" - }"mcp-proxy",${getAuthHeaders()} - "${apiUrl}" + }${ + isOAuthProject ? '"mcp-composer",' : '"mcp-proxy",' + }${getAuthHeaders()}${ + isOAuthProject + ? ` + "--mode", + "stdio", + "--sse-url",` + : "" + } + "${apiUrl}"${ + isOAuthProject + ? `, + "--disable-composer-tools", + "--client_auth_type", + "oauth"` + : "" + } ] } } @@ -346,6 +389,8 @@ const McpServerTab = ({ folderName }: { folderName: string }) => { const hasAuthentication = currentAuthSettings?.auth_type && currentAuthSettings.auth_type !== "none"; + const isLoadingMCPProjectAuth = isLoadingMCPProjectData || isPatchingFlowsMCP; + return (
@@ -375,7 +420,7 @@ const McpServerTab = ({ folderName }: { folderName: string }) => { content="Flows in this project can be exposed as callable MCP tools." side="right" > -
+
Flows/Tools {
{ENABLE_MCP_COMPOSER && (
- - Auth: + + Auth: {!hasAuthentication ? ( { None (public) ) : ( - - - {AUTH_METHODS[ - currentAuthSettings.auth_type as keyof typeof AUTH_METHODS - ]?.label || currentAuthSettings.auth_type} - + + + + {isLoadingMCPProjectAuth + ? "Loading..." + : AUTH_METHODS[ + currentAuthSettings.auth_type as keyof typeof AUTH_METHODS + ]?.label || currentAuthSettings.auth_type} + + )}
diff --git a/src/frontend/vite.config.mts b/src/frontend/vite.config.mts index 662a3270bec9..a3011adbc681 100644 --- a/src/frontend/vite.config.mts +++ b/src/frontend/vite.config.mts @@ -53,8 +53,8 @@ export default defineConfig(({ mode }) => { "process.env.LANGFLOW_AUTO_LOGIN": JSON.stringify( envLangflow.LANGFLOW_AUTO_LOGIN ?? true, ), - "process.env.LANGFLOW_FEATURE_MCP_COMPOSER": JSON.stringify( - envLangflow.LANGFLOW_FEATURE_MCP_COMPOSER ?? "false", + "process.env.LANGFLOW_MCP_COMPOSER_ENABLED": JSON.stringify( + envLangflow.LANGFLOW_MCP_COMPOSER_ENABLED ?? "false", ), }, plugins: [react(), svgr(), tsconfigPaths()], diff --git a/uv.lock b/uv.lock index f00cda8ba3f7..f72a29acb055 100644 --- a/uv.lock +++ b/uv.lock @@ -13094,4 +13094,4 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/02/90/2633473864f67a15526324b007a9f96c96f56d5f32ef2a56cc12f9548723/zstandard-0.23.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:fa6ce8b52c5987b3e34d5674b0ab529a4602b632ebab0a93b07bfb4dfc8f8a33", size = 5191299, upload-time = "2024-07-15T00:16:49.053Z" }, { url = "https://files.pythonhosted.org/packages/b0/4c/315ca5c32da7e2dc3455f3b2caee5c8c2246074a61aac6ec3378a97b7136/zstandard-0.23.0-cp313-cp313-win32.whl", hash = "sha256:a9b07268d0c3ca5c170a385a0ab9fb7fdd9f5fd866be004c4ea39e44edce47dd", size = 430862, upload-time = "2024-07-15T00:16:51.003Z" }, { url = "https://files.pythonhosted.org/packages/a2/bf/c6aaba098e2d04781e8f4f7c0ba3c7aa73d00e4c436bcc0cf059a66691d1/zstandard-0.23.0-cp313-cp313-win_amd64.whl", hash = "sha256:f3513916e8c645d0610815c257cbfd3242adfd5c4cfa78be514e5a3ebb42a41b", size = 495578, upload-time = "2024-07-15T00:16:53.135Z" }, -] +] \ No newline at end of file