diff --git a/src/backend/base/langflow/api/v1/endpoints.py b/src/backend/base/langflow/api/v1/endpoints.py index 60a9e09551da..386ed44a8179 100644 --- a/src/backend/base/langflow/api/v1/endpoints.py +++ b/src/backend/base/langflow/api/v1/endpoints.py @@ -46,7 +46,7 @@ from langflow.schema.graph import Tweaks from langflow.services.auth.utils import api_key_security, get_current_active_user, get_webhook_user from langflow.services.cache.utils import save_uploaded_file -from langflow.services.database.models.flow.model import Flow, FlowRead +from langflow.services.database.models.flow.model import Flow from langflow.services.database.models.flow.utils import get_all_webhook_components_in_flow from langflow.services.database.models.user.model import User, UserRead from langflow.services.deps import get_session_service, get_settings_service, get_telemetry_service @@ -303,7 +303,7 @@ async def run_flow_generator( async def simplified_run_flow( *, background_tasks: BackgroundTasks, - flow: Annotated[FlowRead | None, Depends(get_flow_by_id_or_endpoint_name)], + flow_id_or_name: str, input_request: SimplifiedAPIRequest | None = None, stream: bool = False, api_key_user: Annotated[UserRead, Depends(api_key_security)], @@ -317,7 +317,7 @@ async def simplified_run_flow( Args: background_tasks (BackgroundTasks): FastAPI background task manager - flow (FlowRead | None): The flow to execute, loaded via dependency + flow_id_or_name (str): The flow ID or endpoint name to execute input_request (SimplifiedAPIRequest | None): Input parameters for the flow stream (bool): Whether to stream the response api_key_user (UserRead): Authenticated user from API key @@ -351,6 +351,9 @@ async def simplified_run_flow( if input_request is None: input_request = await parse_input_request_from_body(http_request) + # SECURITY FIX: Retrieve flow with user ownership validation + flow = await get_flow_by_id_or_endpoint_name(flow_id_or_name, str(api_key_user.id)) + if flow is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found") @@ -443,7 +446,6 @@ async def on_disconnect() -> None: @router.post("/webhook/{flow_id_or_name}", response_model=dict, status_code=HTTPStatus.ACCEPTED) # noqa: RUF100, FAST003 async def webhook_run_flow( flow_id_or_name: str, - flow: Annotated[Flow, Depends(get_flow_by_id_or_endpoint_name)], request: Request, background_tasks: BackgroundTasks, ): @@ -451,7 +453,6 @@ async def webhook_run_flow( Args: flow_id_or_name (str): The flow ID or endpoint name. - flow (Flow): The flow to be executed. request (Request): The incoming HTTP request. background_tasks (BackgroundTasks): The background tasks manager. @@ -467,8 +468,14 @@ async def webhook_run_flow( error_msg = "" # Get the appropriate user for webhook execution based on auth settings + # This will also validate flow ownership when auth is enabled webhook_user = await get_webhook_user(flow_id_or_name, request) + # SECURITY FIX: Retrieve flow with user ownership validation + flow = await get_flow_by_id_or_endpoint_name(flow_id_or_name, str(webhook_user.id)) + if not flow: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found") + try: try: data = await request.body() diff --git a/src/backend/base/langflow/helpers/flow.py b/src/backend/base/langflow/helpers/flow.py index 46f4b3810f33..fddb9c554d54 100644 --- a/src/backend/base/langflow/helpers/flow.py +++ b/src/backend/base/langflow/helpers/flow.py @@ -282,7 +282,13 @@ async def get_flow_by_id_or_endpoint_name(flow_id_or_name: str, user_id: str | U endpoint_name = None try: flow_id = UUID(flow_id_or_name) - flow = await session.get(Flow, flow_id) + # SECURITY FIX: Check user_id for UUID-based lookups to prevent cross-account access + if user_id: + uuid_user_id = UUID(user_id) if isinstance(user_id, str) else user_id + stmt = select(Flow).where(Flow.id == flow_id, Flow.user_id == uuid_user_id) + flow = (await session.exec(stmt)).first() + else: + flow = await session.get(Flow, flow_id) except ValueError: endpoint_name = flow_id_or_name stmt = select(Flow).where(Flow.endpoint_name == endpoint_name) diff --git a/src/backend/tests/unit/test_api_key_cross_account_security.py b/src/backend/tests/unit/test_api_key_cross_account_security.py new file mode 100644 index 000000000000..6ce642e9dbcd --- /dev/null +++ b/src/backend/tests/unit/test_api_key_cross_account_security.py @@ -0,0 +1,176 @@ +"""Test for API Key Cross-Account Security Issue #10202. + +This test reproduces the security vulnerability where an API key from one account +can be used to execute flows from another account. +""" + +import pytest +from httpx import AsyncClient +from langflow.services.auth.utils import get_password_hash +from langflow.services.database.models.api_key import ApiKeyCreate +from langflow.services.database.models.user.model import User, UserRead +from langflow.services.deps import get_db_service +from sqlmodel import select + + +@pytest.fixture +async def second_user(client): # noqa: ARG001 + """Create a second user for cross-account testing.""" + db_manager = get_db_service() + async with db_manager.with_session() as session: + user = User( + username="seconduser", + password=get_password_hash("testpassword2"), + is_active=True, + is_superuser=False, + ) + stmt = select(User).where(User.username == user.username) + if existing_user := (await session.exec(stmt)).first(): + user = existing_user + else: + session.add(user) + await session.commit() + await session.refresh(user) + user = UserRead.model_validate(user, from_attributes=True) + yield user + # Clean up + try: + async with db_manager.with_session() as session: + user_to_delete = await session.get(User, user.id) + if user_to_delete: + await session.delete(user_to_delete) + await session.commit() + except Exception: # noqa: S110 + pass # Cleanup failures are not critical for tests + + +@pytest.fixture +async def second_user_logged_in_headers(client, second_user): + """Get authentication headers for second user.""" + login_data = {"username": second_user.username, "password": "testpassword2"} # pragma: allowlist secret + response = await client.post("api/v1/login", data=login_data) + assert response.status_code == 200 + tokens = response.json() + a_token = tokens["access_token"] + return {"Authorization": f"Bearer {a_token}"} + + +@pytest.fixture +async def first_user_api_key(client: AsyncClient, logged_in_headers, active_user): # noqa: ARG001 + """Create an API key for the first user.""" + api_key_data = ApiKeyCreate(name="first-user-api-key") + response = await client.post( + "api/v1/api_key/", + json=api_key_data.model_dump(mode="json"), + headers=logged_in_headers, + ) + assert response.status_code == 200, response.text + data = response.json() + return data["api_key"] # Return the unmasked API key # pragma: allowlist secret + + +@pytest.fixture +async def second_user_flow(client: AsyncClient, second_user_logged_in_headers, second_user): # noqa: ARG001 + """Create a flow owned by the second user.""" + # Create a simple flow + flow_data = { + "name": "Second User Flow", + "description": "A flow belonging to the second user", + "data": {"nodes": [], "edges": []}, + } + + response = await client.post("api/v1/flows/", json=flow_data, headers=second_user_logged_in_headers) + assert response.status_code == 201, response.text + return response.json() + + +@pytest.mark.api_key_required +async def test_cross_account_api_key_should_not_run_flow( + client: AsyncClient, + first_user_api_key: str, + second_user_flow: dict, + active_user, + second_user, +): + """Test that reproduces the security vulnerability. + + - User 1 creates an API key + - User 2 creates a flow + - User 1's API key should NOT be able to execute User 2's flow. + + EXPECTED BEHAVIOR: This should fail with a 403 or 404 error + CURRENT BEHAVIOR: This succeeds (security vulnerability) + """ + # Get the flow ID from second user + flow_id = second_user_flow["id"] + + # Try to run second user's flow with first user's API key + headers = {"x-api-key": first_user_api_key} + payload = { + "input_value": "test message", + "input_type": "chat", + "output_type": "chat", + "tweaks": {}, + "stream": False, + } + + response = await client.post(f"/api/v1/run/{flow_id}", json=payload, headers=headers) + + # This SHOULD fail with 403 (Forbidden) or 404 (Not Found) + # But currently it will succeed (status 200), which is the security issue + assert response.status_code in [403, 404], ( + f"Security Issue: User 1's API key was able to execute User 2's flow! " + f"Expected 403 or 404, got {response.status_code}. " + f"User 1 ID: {active_user.id}, User 2 ID: {second_user.id}, Flow ID: {flow_id}" + ) + + +@pytest.mark.api_key_required +async def test_same_account_api_key_should_run_own_flow( + client: AsyncClient, + first_user_api_key: str, + starter_project: dict, +): + """Test that a user's API key CAN execute their own flows (legitimate use case). + + This should continue to work after the security fix. + """ + # Get the flow ID from the first user's starter project + flow_id = starter_project["id"] + + # Try to run first user's flow with first user's API key + headers = {"x-api-key": first_user_api_key} + payload = { + "input_value": "test message", + "input_type": "chat", + "output_type": "chat", + "tweaks": {}, + "stream": False, + } + + response = await client.post(f"/api/v1/run/{flow_id}", json=payload, headers=headers) + + # This SHOULD succeed + assert response.status_code == 200, ( + f"Legitimate use case failed: User should be able to run their own flow with their API key. " + f"Got status {response.status_code}" + ) + + +@pytest.mark.api_key_required +async def test_cross_account_get_flow_should_not_work( + client: AsyncClient, + first_user_api_key: str, + second_user_flow: dict, +): + """Test that a user cannot retrieve another user's flow details using their API key.""" + flow_id = second_user_flow["id"] + headers = {"x-api-key": first_user_api_key} + + response = await client.get(f"/api/v1/flows/{flow_id}", headers=headers) + + # This should fail with 403 or 404 + assert response.status_code in [403, 404], ( + f"Security Issue: User 1's API key was able to retrieve User 2's flow details! " + f"Expected 403 or 404, got {response.status_code}" + ) diff --git a/src/lfx/src/lfx/cli/commands.py b/src/lfx/src/lfx/cli/commands.py index 1c7e93c3954c..00777e829311 100644 --- a/src/lfx/src/lfx/cli/commands.py +++ b/src/lfx/src/lfx/cli/commands.py @@ -85,11 +85,6 @@ def serve_command( cat my_flow.json | lfx serve --stdin echo '{"nodes": [...]}' | lfx serve --stdin """ - # Configure logging with the specified level and import logger - from lfx.log.logger import configure, logger - - configure(log_level=log_level) - verbose_print = create_verbose_printer(verbose=verbose) # Validate input sources - exactly one must be provided @@ -139,11 +134,11 @@ def serve_command( temp_file_to_cleanup = None if flow_json is not None: - logger.info("Processing inline JSON content...") + verbose_print("Processing inline JSON content...") try: # Validate JSON syntax json_data = json.loads(flow_json) - logger.info("JSON content is valid") + verbose_print("✓ JSON content is valid") # Create a temporary file with the JSON content with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as temp_file: @@ -151,7 +146,7 @@ def serve_command( temp_file_to_cleanup = temp_file.name script_path = temp_file_to_cleanup - logger.info(f"Created temporary file: {script_path}") + verbose_print(f"✓ Created temporary file: {script_path}") except json.JSONDecodeError as e: typer.echo(f"Error: Invalid JSON content: {e}", err=True) @@ -161,17 +156,17 @@ def serve_command( raise typer.Exit(1) from e elif stdin: - logger.info("Reading JSON content from stdin...") + verbose_print("Reading JSON content from stdin...") try: # Read all content from stdin stdin_content = sys.stdin.read().strip() if not stdin_content: - logger.error("No content received from stdin") + verbose_print("Error: No content received from stdin") raise typer.Exit(1) # Validate JSON syntax json_data = json.loads(stdin_content) - logger.info("JSON content from stdin is valid") + verbose_print("✓ JSON content from stdin is valid") # Create a temporary file with the JSON content with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as temp_file: @@ -179,7 +174,7 @@ def serve_command( temp_file_to_cleanup = temp_file.name script_path = temp_file_to_cleanup - logger.info(f"Created temporary file from stdin: {script_path}") + verbose_print(f"✓ Created temporary file from stdin: {script_path}") except json.JSONDecodeError as e: verbose_print(f"Error: Invalid JSON content from stdin: {e}") @@ -215,10 +210,10 @@ def serve_command( raise typer.Exit(1) # Prepare the graph - logger.info("Preparing graph for serving...") + verbose_print("Preparing graph for serving...") try: graph.prepare() - logger.info("Graph prepared successfully") + verbose_print("✓ Graph prepared successfully") # Validate global variables for environment compatibility if check_variables: @@ -226,12 +221,12 @@ def serve_command( validation_errors = validate_global_variables_for_env(graph) if validation_errors: - logger.error("Global variable validation failed:") + verbose_print("✗ Global variable validation failed:") for error in validation_errors: - logger.error(f" - {error}") + verbose_print(f" - {error}") raise typer.Exit(1) else: - logger.info("Global variable validation skipped") + verbose_print("✓ Global variable validation skipped") except Exception as e: verbose_print(f"✗ Failed to prepare graph: {e}") raise typer.Exit(1) from e