diff --git a/src/backend/base/langflow/alembic/versions/0882f9657f22_encrypt_existing_mcp_auth_settings_.py b/src/backend/base/langflow/alembic/versions/0882f9657f22_encrypt_existing_mcp_auth_settings_.py new file mode 100644 index 000000000000..1c0e0989686e --- /dev/null +++ b/src/backend/base/langflow/alembic/versions/0882f9657f22_encrypt_existing_mcp_auth_settings_.py @@ -0,0 +1,122 @@ +"""Encrypt existing MCP auth_settings credentials + +Revision ID: 0882f9657f22 +Revises: 1cb603706752 +Create Date: 2025-08-21 20:11:26.504681 + +""" +import json +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +import sqlmodel +from sqlalchemy.engine.reflection import Inspector +from langflow.utils import migration + + +# revision identifiers, used by Alembic. +revision: str = '0882f9657f22' +down_revision: Union[str, None] = '1cb603706752' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Encrypt sensitive fields in existing auth_settings data.""" + conn = op.get_bind() + + # Import encryption utilities + try: + from langflow.services.auth.mcp_encryption import encrypt_auth_settings + from langflow.services.deps import get_settings_service + + # Check if the folder table exists + inspector = sa.inspect(conn) + if 'folder' not in inspector.get_table_names(): + return + + # Query all folders with auth_settings + result = conn.execute( + sa.text("SELECT id, auth_settings FROM folder WHERE auth_settings IS NOT NULL") + ) + + # Encrypt auth_settings for each folder + for row in result: + folder_id = row.id + auth_settings = row.auth_settings + + if auth_settings: + try: + # Parse JSON if it's a string + if isinstance(auth_settings, str): + auth_settings_dict = json.loads(auth_settings) + else: + auth_settings_dict = auth_settings + + # Encrypt sensitive fields + encrypted_settings = encrypt_auth_settings(auth_settings_dict) + + # Update the record with encrypted data + if encrypted_settings: + conn.execute( + sa.text("UPDATE folder SET auth_settings = :auth_settings WHERE id = :id"), + {"auth_settings": json.dumps(encrypted_settings), "id": folder_id} + ) + except Exception as e: + # Log the error but continue with other records + print(f"Warning: Failed to encrypt auth_settings for folder {folder_id}: {e}") + + except ImportError as e: + # If encryption utilities are not available, skip the migration + print(f"Warning: Encryption utilities not available, skipping encryption migration: {e}") + + +def downgrade() -> None: + """Decrypt sensitive fields in auth_settings data (for rollback).""" + conn = op.get_bind() + + # Import decryption utilities + try: + from langflow.services.auth.mcp_encryption import decrypt_auth_settings + from langflow.services.deps import get_settings_service + + # Check if the folder table exists + inspector = sa.inspect(conn) + if 'folder' not in inspector.get_table_names(): + return + + # Query all folders with auth_settings + result = conn.execute( + sa.text("SELECT id, auth_settings FROM folder WHERE auth_settings IS NOT NULL") + ) + + # Decrypt auth_settings for each folder + for row in result: + folder_id = row.id + auth_settings = row.auth_settings + + if auth_settings: + try: + # Parse JSON if it's a string + if isinstance(auth_settings, str): + auth_settings_dict = json.loads(auth_settings) + else: + auth_settings_dict = auth_settings + + # Decrypt sensitive fields + decrypted_settings = decrypt_auth_settings(auth_settings_dict) + + # Update the record with decrypted data + if decrypted_settings: + conn.execute( + sa.text("UPDATE folder SET auth_settings = :auth_settings WHERE id = :id"), + {"auth_settings": json.dumps(decrypted_settings), "id": folder_id} + ) + except Exception as e: + # Log the error but continue with other records + print(f"Warning: Failed to decrypt auth_settings for folder {folder_id}: {e}") + + except ImportError as e: + # If decryption utilities are not available, skip the migration + print(f"Warning: Decryption utilities not available, skipping decryption migration: {e}") diff --git a/src/backend/base/langflow/api/v1/mcp_projects.py b/src/backend/base/langflow/api/v1/mcp_projects.py index 00ada2431a51..2a8150ccbeb0 100644 --- a/src/backend/base/langflow/api/v1/mcp_projects.py +++ b/src/backend/base/langflow/api/v1/mcp_projects.py @@ -8,7 +8,7 @@ from ipaddress import ip_address from pathlib import Path from subprocess import CalledProcessError -from typing import Annotated +from typing import Annotated, Any from uuid import UUID from anyio import BrokenResourceError @@ -39,6 +39,7 @@ from langflow.base.mcp.constants import MAX_MCP_SERVER_NAME_LENGTH from langflow.base.mcp.util import sanitize_mcp_name from langflow.logging import logger +from langflow.services.auth.mcp_encryption import decrypt_auth_settings, encrypt_auth_settings from langflow.services.database.models import Flow, Folder from langflow.services.database.models.api_key.crud import check_key, create_api_key from langflow.services.database.models.api_key.model import ApiKeyCreate @@ -205,7 +206,7 @@ async def list_project_tools( ) try: tool = MCPSettings( - id=str(flow.id), + id=flow.id, action_name=name, action_description=description, mcp_enabled=flow.mcp_enabled, @@ -219,10 +220,14 @@ async def list_project_tools( await logger.awarning(msg) continue - # Get project-level auth settings + # Get project-level auth settings and decrypt sensitive fields auth_settings = None if project.auth_settings: - auth_settings = AuthSettings(**project.auth_settings) + from langflow.api.v1.schemas import AuthSettings + + # Decrypt sensitive fields before returning + decrypted_settings = decrypt_auth_settings(project.auth_settings) + auth_settings = AuthSettings(**decrypted_settings) if decrypted_settings else None except Exception as e: msg = f"Error listing project tools: {e!s}" @@ -336,11 +341,33 @@ async def update_project_mcp_settings( if not project: raise HTTPException(status_code=404, detail="Project not found") - # Update project-level auth settings - if request.auth_settings: - project.auth_settings = request.auth_settings.model_dump(mode="json") - else: - project.auth_settings = None + # Update project-level auth settings with encryption + if "auth_settings" in request.model_fields_set: + if request.auth_settings is None: + # Explicitly set to None - clear auth settings + project.auth_settings = None + else: + # Use python mode to get raw values without SecretStr masking + auth_model = request.auth_settings + auth_dict = auth_model.model_dump(mode="python", exclude_none=True) + + # Extract actual secret values before encryption + from pydantic import SecretStr + + # Handle api_key if it's a SecretStr + api_key_val = getattr(auth_model, "api_key", None) + if isinstance(api_key_val, SecretStr): + auth_dict["api_key"] = api_key_val.get_secret_value() + + # Handle oauth_client_secret if it's a SecretStr + client_secret_val = getattr(auth_model, "oauth_client_secret", None) + if isinstance(client_secret_val, SecretStr): + auth_dict["oauth_client_secret"] = client_secret_val.get_secret_value() + + # Encrypt and store + encrypted_settings = encrypt_auth_settings(auth_dict) + project.auth_settings = encrypted_settings + session.add(project) # Query flows in the project @@ -458,7 +485,7 @@ async def install_mcp_config( should_generate_api_key = not settings_service.auth_settings.AUTO_LOGIN elif project.auth_settings: # When MCP_COMPOSER is enabled, only generate if auth_type is "apikey" - auth_settings = AuthSettings(**project.auth_settings) + auth_settings = AuthSettings(**project.auth_settings) if project.auth_settings else AuthSettings() should_generate_api_key = auth_settings.auth_type == "apikey" if should_generate_api_key: @@ -498,7 +525,7 @@ async def install_mcp_config( stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - stdout, stderr = await proc.communicate() + stdout, _ = await proc.communicate() if proc.returncode == 0 and stdout.strip(): wsl_ip = stdout.decode().strip().split()[0] # Get first IP address @@ -508,8 +535,8 @@ async def install_mcp_config( except OSError as e: await logger.awarning("Failed to get WSL IP address: %s. Using default URL.", str(e)) - # Build the base args for mcp-proxy - args = ["mcp-proxy"] + # Base args + args = ["mcp-composer"] if FEATURE_FLAGS.mcp_composer else ["mcp-proxy"] # Add authentication args based on MCP_COMPOSER feature flag and auth settings if not FEATURE_FLAGS.mcp_composer: @@ -518,6 +545,11 @@ async def install_mcp_config( if generated_api_key: args.extend(["--headers", "x-api-key", generated_api_key]) elif project.auth_settings: + # Decrypt sensitive fields before using them + decrypted_settings = decrypt_auth_settings(project.auth_settings) + auth_settings = AuthSettings(**decrypted_settings) if decrypted_settings else AuthSettings() + args.extend(["--auth_type", auth_settings.auth_type]) + # When MCP_COMPOSER is enabled, only add headers if auth_type is "apikey" auth_settings = AuthSettings(**project.auth_settings) if auth_settings.auth_type == "apikey" and generated_api_key: @@ -525,7 +557,10 @@ async def install_mcp_config( # If no auth_settings or auth_type is "none", don't add any auth headers # Add the SSE URL - args.append(sse_url) + if FEATURE_FLAGS.mcp_composer: + args.extend(["--sse-url", sse_url]) + else: + args.append(sse_url) if os_type == "Windows": command = "cmd" @@ -535,7 +570,7 @@ async def install_mcp_config( name = project.name # Create the MCP configuration - server_config = { + server_config: dict[str, Any] = { "command": command, "args": args, } diff --git a/src/backend/base/langflow/api/v1/schemas.py b/src/backend/base/langflow/api/v1/schemas.py index 6fcb8fd118dc..619f572febfd 100644 --- a/src/backend/base/langflow/api/v1/schemas.py +++ b/src/backend/base/langflow/api/v1/schemas.py @@ -8,6 +8,7 @@ BaseModel, ConfigDict, Field, + SecretStr, field_serializer, field_validator, model_serializer, @@ -449,7 +450,7 @@ class AuthSettings(BaseModel): oauth_server_url: str | None = None oauth_callback_path: str | None = None oauth_client_id: str | None = None - oauth_client_secret: str | None = None + oauth_client_secret: SecretStr | None = None oauth_auth_url: str | None = None oauth_token_url: str | None = None oauth_mcp_scope: str | None = None diff --git a/src/backend/base/langflow/services/auth/mcp_encryption.py b/src/backend/base/langflow/services/auth/mcp_encryption.py new file mode 100644 index 000000000000..ef84f99680c2 --- /dev/null +++ b/src/backend/base/langflow/services/auth/mcp_encryption.py @@ -0,0 +1,104 @@ +"""MCP Authentication encryption utilities for secure credential storage.""" + +from typing import Any + +from cryptography.fernet import InvalidToken +from loguru import logger + +from langflow.services.auth import utils as auth_utils +from langflow.services.deps import get_settings_service + +# Fields that should be encrypted when stored +SENSITIVE_FIELDS = [ + "oauth_client_secret", + "api_key", +] + + +def encrypt_auth_settings(auth_settings: dict[str, Any] | None) -> dict[str, Any] | None: + """Encrypt sensitive fields in auth_settings dictionary. + + Args: + auth_settings: Dictionary containing authentication settings + + Returns: + Dictionary with sensitive fields encrypted, or None if input is None + """ + if auth_settings is None: + return None + + settings_service = get_settings_service() + encrypted_settings = auth_settings.copy() + + for field in SENSITIVE_FIELDS: + if encrypted_settings.get(field): + try: + # Only encrypt if the value is not already encrypted + # Try to decrypt first - if it fails, it's not encrypted + try: + auth_utils.decrypt_api_key(encrypted_settings[field], settings_service) + # If decrypt succeeds, it's already encrypted + logger.debug(f"Field {field} is already encrypted") + except (ValueError, TypeError, KeyError, InvalidToken): + # If decrypt fails, the value is plaintext and needs encryption + encrypted_value = auth_utils.encrypt_api_key(encrypted_settings[field], settings_service) + encrypted_settings[field] = encrypted_value + logger.debug(f"Encrypted field {field}") + except (ValueError, TypeError, KeyError) as e: + logger.error(f"Failed to encrypt field {field}: {e}") + raise + + return encrypted_settings + + +def decrypt_auth_settings(auth_settings: dict[str, Any] | None) -> dict[str, Any] | None: + """Decrypt sensitive fields in auth_settings dictionary. + + Args: + auth_settings: Dictionary containing encrypted authentication settings + + Returns: + Dictionary with sensitive fields decrypted, or None if input is None + """ + if auth_settings is None: + return None + + settings_service = get_settings_service() + decrypted_settings = auth_settings.copy() + + for field in SENSITIVE_FIELDS: + if decrypted_settings.get(field): + try: + decrypted_value = auth_utils.decrypt_api_key(decrypted_settings[field], settings_service) + decrypted_settings[field] = decrypted_value + logger.debug(f"Decrypted field {field}") + except (ValueError, TypeError, KeyError, InvalidToken) as e: + # If decryption fails, assume the value is already plaintext + # This handles backward compatibility with existing unencrypted data + logger.debug(f"Field {field} appears to be plaintext or decryption failed: {e}") + # Keep the original value + + return decrypted_settings + + +def is_encrypted(value: str) -> bool: + """Check if a value appears to be encrypted. + + Args: + value: String value to check + + Returns: + True if the value appears to be encrypted (base64 Fernet token) + """ + if not value: + return False + + settings_service = get_settings_service() + try: + # Try to decrypt - if it succeeds, it's encrypted + auth_utils.decrypt_api_key(value, settings_service) + except (ValueError, TypeError, KeyError, InvalidToken): + # If decryption fails, it's not encrypted + return False + else: + return True diff --git a/src/backend/tests/unit/api/v1/test_mcp_projects.py b/src/backend/tests/unit/api/v1/test_mcp_projects.py index b30527c02680..3c69dab9a0b4 100644 --- a/src/backend/tests/unit/api/v1/test_mcp_projects.py +++ b/src/backend/tests/unit/api/v1/test_mcp_projects.py @@ -434,6 +434,81 @@ async def test_user_can_update_own_flow_mcp_settings( assert updated_flow.mcp_enabled is False +async def test_update_project_auth_settings_encryption( + client: AsyncClient, user_test_project, test_flow_for_update, logged_in_headers +): + """Test that sensitive auth_settings fields are encrypted when stored.""" + # Create settings with sensitive data + json_payload = { + "settings": [ + { + "id": str(test_flow_for_update.id), + "action_name": "test_action", + "action_description": "Test description", + "mcp_enabled": True, + "name": test_flow_for_update.name, + "description": test_flow_for_update.description, + } + ], + "auth_settings": { + "auth_type": "oauth", + "oauth_host": "localhost", + "oauth_port": "3000", + "oauth_server_url": "http://localhost:3000", + "oauth_callback_path": "/callback", + "oauth_client_id": "test-client-id", + "oauth_client_secret": "test-oauth-secret-value-456", + "oauth_auth_url": "https://oauth.example.com/auth", + "oauth_token_url": "https://oauth.example.com/token", + "oauth_mcp_scope": "read write", + "oauth_provider_scope": "user:email", + }, + } + + # Send the update request + response = await client.patch( + f"/api/v1/mcp/project/{user_test_project.id}", + json=json_payload, + headers=logged_in_headers, + ) + assert response.status_code == 200 + + # Verify the sensitive data is encrypted in the database + async with session_scope() as session: + updated_project = await session.get(Folder, user_test_project.id) + assert updated_project is not None + assert updated_project.auth_settings is not None + + # Check that sensitive field is encrypted (not plaintext) + stored_value = updated_project.auth_settings.get("oauth_client_secret") + assert stored_value is not None + assert stored_value != "test-oauth-secret-value-456" # Should be encrypted + + # The encrypted value should be a base64-like string (Fernet token) + assert len(stored_value) > 50 # Encrypted values are longer + + # Now test that the GET endpoint returns the data (SecretStr will be masked) + response = await client.get( + f"/api/v1/mcp/project/{user_test_project.id}", + headers=logged_in_headers, + ) + assert response.status_code == 200 + data = response.json() + + # SecretStr fields are masked in the response for security + assert data["auth_settings"]["oauth_client_secret"] == "**********" # noqa: S105 + assert data["auth_settings"]["oauth_client_id"] == "test-client-id" + assert data["auth_settings"]["auth_type"] == "oauth" + + # Verify that decryption is working by checking the actual decrypted value in the backend + from langflow.services.auth.mcp_encryption import decrypt_auth_settings + + async with session_scope() as session: + project = await session.get(Folder, user_test_project.id) + decrypted_settings = decrypt_auth_settings(project.auth_settings) + assert decrypted_settings["oauth_client_secret"] == "test-oauth-secret-value-456" # noqa: S105 + + async def test_project_sse_creation(user_test_project): """Test that SSE transport and MCP server are correctly created for a project.""" # Test getting an SSE transport for the first time diff --git a/src/backend/tests/unit/services/auth/__init__.py b/src/backend/tests/unit/services/auth/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/backend/tests/unit/services/auth/test_mcp_encryption.py b/src/backend/tests/unit/services/auth/test_mcp_encryption.py new file mode 100644 index 000000000000..2c609f7dde57 --- /dev/null +++ b/src/backend/tests/unit/services/auth/test_mcp_encryption.py @@ -0,0 +1,167 @@ +"""Test MCP authentication encryption functionality.""" + +from unittest.mock import Mock, patch + +import pytest +from cryptography.fernet import Fernet +from langflow.services.auth.mcp_encryption import ( + decrypt_auth_settings, + encrypt_auth_settings, + is_encrypted, +) +from pydantic import SecretStr + + +@pytest.fixture +def mock_settings_service(): + """Mock settings service for testing.""" + mock_service = Mock() + # Generate a valid Fernet key that's already properly formatted + # Fernet.generate_key() returns a URL-safe base64-encoded 32-byte key + valid_key = Fernet.generate_key() + # Decode it to string for storage + valid_key_str = valid_key.decode("utf-8") + + # Create a proper SecretStr object + secret_key_obj = SecretStr(valid_key_str) + mock_service.auth_settings.SECRET_KEY = secret_key_obj + return mock_service + + +@pytest.fixture +def sample_auth_settings(): + """Sample auth settings with sensitive data.""" + return { + "auth_type": "oauth", + "oauth_host": "localhost", + "oauth_port": "3000", + "oauth_server_url": "http://localhost:3000", + "oauth_callback_path": "/callback", + "oauth_client_id": "my-client-id", + "oauth_client_secret": "super-secret-password-123", + "oauth_auth_url": "https://oauth.example.com/auth", + "oauth_token_url": "https://oauth.example.com/token", + "oauth_mcp_scope": "read write", + "oauth_provider_scope": "user:email", + } + + +class TestMCPEncryption: + """Test MCP encryption functionality.""" + + @patch("langflow.services.auth.mcp_encryption.get_settings_service") + def test_encrypt_auth_settings(self, mock_get_settings, mock_settings_service, sample_auth_settings): + """Test that sensitive fields are encrypted.""" + mock_get_settings.return_value = mock_settings_service + + # Encrypt the settings + encrypted = encrypt_auth_settings(sample_auth_settings) + + # Check that sensitive fields are encrypted + assert encrypted is not None + assert encrypted["oauth_client_secret"] != sample_auth_settings["oauth_client_secret"] + + # Check that non-sensitive fields remain unchanged + assert encrypted["auth_type"] == sample_auth_settings["auth_type"] + assert encrypted["oauth_host"] == sample_auth_settings["oauth_host"] + assert encrypted["oauth_client_id"] == sample_auth_settings["oauth_client_id"] + + @patch("langflow.services.auth.mcp_encryption.get_settings_service") + def test_decrypt_auth_settings(self, mock_get_settings, mock_settings_service, sample_auth_settings): + """Test that encrypted fields can be decrypted.""" + mock_get_settings.return_value = mock_settings_service + + # First encrypt the settings + encrypted = encrypt_auth_settings(sample_auth_settings) + + # Then decrypt them + decrypted = decrypt_auth_settings(encrypted) + + # Verify all fields match the original + assert decrypted == sample_auth_settings + + @patch("langflow.services.auth.mcp_encryption.get_settings_service") + def test_encrypt_none_returns_none(self, mock_get_settings): # noqa: ARG002 + """Test that encrypting None returns None.""" + result = encrypt_auth_settings(None) + assert result is None + + @patch("langflow.services.auth.mcp_encryption.get_settings_service") + def test_decrypt_none_returns_none(self, mock_get_settings): # noqa: ARG002 + """Test that decrypting None returns None.""" + result = decrypt_auth_settings(None) + assert result is None + + @patch("langflow.services.auth.mcp_encryption.get_settings_service") + def test_encrypt_empty_dict(self, mock_get_settings): # noqa: ARG002 + """Test that encrypting empty dict returns empty dict.""" + result = encrypt_auth_settings({}) + assert result == {} + + @patch("langflow.services.auth.mcp_encryption.get_settings_service") + def test_idempotent_encryption(self, mock_get_settings, mock_settings_service, sample_auth_settings): + """Test that encrypting already encrypted data doesn't double-encrypt.""" + mock_get_settings.return_value = mock_settings_service + + # First encryption + encrypted_once = encrypt_auth_settings(sample_auth_settings) + + # Second encryption should detect already encrypted fields + encrypted_twice = encrypt_auth_settings(encrypted_once) + + # Should be the same + assert encrypted_once == encrypted_twice + + @patch("langflow.services.auth.mcp_encryption.get_settings_service") + def test_partial_auth_settings(self, mock_get_settings, mock_settings_service): + """Test encryption with only some sensitive fields present.""" + mock_get_settings.return_value = mock_settings_service + + partial_settings = { + "auth_type": "api", + "api_key": "sk-test-api-key-123", + "username": "admin", + } + + encrypted = encrypt_auth_settings(partial_settings) + + # API key should be encrypted + assert encrypted["api_key"] != partial_settings["api_key"] + + # Other fields unchanged + assert encrypted["auth_type"] == partial_settings["auth_type"] + assert encrypted["username"] == partial_settings["username"] + + @patch("langflow.services.auth.mcp_encryption.get_settings_service") + def test_backward_compatibility(self, mock_get_settings, mock_settings_service): + """Test that plaintext data is handled gracefully during decryption.""" + mock_get_settings.return_value = mock_settings_service + + # Simulate legacy plaintext data + plaintext_settings = { + "auth_type": "oauth", + "oauth_client_secret": "plaintext-secret", + "oauth_client_id": "client-123", + } + + # Decryption should handle plaintext gracefully + decrypted = decrypt_auth_settings(plaintext_settings) + + # Should return the same data + assert decrypted == plaintext_settings + + @patch("langflow.services.auth.mcp_encryption.get_settings_service") + def test_is_encrypted(self, mock_get_settings, mock_settings_service): + """Test the is_encrypted helper function.""" + mock_get_settings.return_value = mock_settings_service + + # Test with plaintext + assert not is_encrypted("plaintext-value") + assert not is_encrypted("") + assert not is_encrypted(None) + + # Test with encrypted value + from langflow.services.auth import utils as auth_utils + + encrypted_value = auth_utils.encrypt_api_key("secret-value", mock_settings_service) + assert is_encrypted(encrypted_value)