diff --git a/src/backend/base/langflow/api/router.py b/src/backend/base/langflow/api/router.py index 55fb02cf2774..a5b48a4c7329 100644 --- a/src/backend/base/langflow/api/router.py +++ b/src/backend/base/langflow/api/router.py @@ -28,6 +28,7 @@ users_router, validate_router, variables_router, + spec_flow_builder_router, ) from langflow.api.v1.voice_mode import router as voice_mode_router from langflow.api.v2 import files_router as files_router_v2 @@ -49,6 +50,7 @@ router_v1.include_router(chat_router) router_v1.include_router(endpoints_router) router_v1.include_router(files_router) +router_v1.include_router(spec_flow_builder_router) router_v1.include_router(flexstore_router) router_v1.include_router(flows_router) router_v1.include_router(folders_router) diff --git a/src/backend/base/langflow/api/v1/__init__.py b/src/backend/base/langflow/api/v1/__init__.py index 46cd441a4ade..3628f6e0a2cb 100644 --- a/src/backend/base/langflow/api/v1/__init__.py +++ b/src/backend/base/langflow/api/v1/__init__.py @@ -25,6 +25,7 @@ from langflow.api.v1.validate import router as validate_router from langflow.api.v1.variable import router as variables_router from langflow.api.v1.voice_mode import router as voice_mode_router +from langflow.spec_flow_builder.api import router as spec_flow_builder_router __all__ = [ "agent_builder_router", @@ -54,4 +55,5 @@ "validate_router", "variables_router", "voice_mode_router", + "spec_flow_builder_router", ] diff --git a/src/backend/base/langflow/api/v1/spec_flow_builder.py b/src/backend/base/langflow/api/v1/spec_flow_builder.py new file mode 100644 index 000000000000..f18aa3b11408 --- /dev/null +++ b/src/backend/base/langflow/api/v1/spec_flow_builder.py @@ -0,0 +1,341 @@ +"""Spec Flow Builder API - Endpoints for YAML spec validation and flow creation.""" + +import logging +from typing import Any, Dict, List, Optional +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, status +from sqlmodel import select + +from 
langflow.api.utils import CurrentActiveUser, DbSession +from langflow.services.database.models.flow import Flow, FlowCreate, FlowRead +from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME +from langflow.services.database.models.folder.model import Folder +from langflow.services.spec_flow_builder import ( + AnalyzeComponentsRequest, + ComponentResolver, + ComponentStatus, + CreateFlowRequest, + EdgeBuilder, + FlowPreview, + PreviewFlowRequest, + SpecFlowConverter, + SpecFlowValidator, + ValidateSpecRequest, + ValidationReport, +) +from langflow.services.spec_flow_builder.utils import sanitize_flow_name + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/spec-builder", tags=["Spec Flow Builder"]) + + +# ============================================================================ +# Helper Functions +# ============================================================================ + + +async def _get_or_create_folder(session: DbSession, user_id: UUID, folder_name: str = DEFAULT_FOLDER_NAME) -> Folder: + """Get or create a folder for the user.""" + # Check if folder exists + statement = select(Folder).where(Folder.name == folder_name, Folder.user_id == user_id) + result = await session.exec(statement) + folder = result.first() + + if not folder: + # Create folder + folder = Folder(name=folder_name, user_id=user_id) + session.add(folder) + await session.commit() + await session.refresh(folder) + + return folder + + +async def _create_flow_from_json( + session: DbSession, user_id: UUID, flow_json: Dict[str, Any], flow_name: Optional[str], folder_id: Optional[UUID] +) -> Flow: + """Create a flow in the database from flow JSON.""" + # Get or create folder + if folder_id: + folder = await session.get(Folder, folder_id) + if not folder: + raise ValueError(f"Folder with ID {folder_id} not found") + else: + folder = await _get_or_create_folder(session, user_id) + + # Extract name from flow_json or use provided name + name = 
flow_name or flow_json.get("name", "Untitled Flow") + name = sanitize_flow_name(name) + + # Create flow + flow_create = FlowCreate( + name=name, + description=flow_json.get("description", ""), + data=flow_json.get("data", {}), + folder_id=folder.id, + user_id=user_id, + ) + + flow = Flow.model_validate(flow_create, from_attributes=True) + session.add(flow) + await session.commit() + await session.refresh(flow) + + return flow + + +# ============================================================================ +# API Endpoints +# ============================================================================ + + +@router.post("/validate", response_model=ValidationReport, status_code=status.HTTP_200_OK) +async def validate_spec(request: ValidateSpecRequest, session: DbSession) -> ValidationReport: + """ + Validate YAML specification. + + Performs comprehensive validation: + - Component existence check + - Config field validation + - Provides relationship validation + + Returns detailed validation report with errors and warnings. + """ + try: + # Initialize resolver and validator + resolver = ComponentResolver() + validator = SpecFlowValidator(resolver) + + # Perform validation + report = await validator.validate(request.yaml_content) + + return report + + except Exception as e: + logger.error(f"Validation error: {e}", exc_info=True) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Validation failed: {str(e)}") + + +@router.post("/analyze-components", response_model=Dict[str, List[ComponentStatus]], status_code=status.HTTP_200_OK) +async def analyze_components(request: AnalyzeComponentsRequest, session: DbSession) -> Dict[str, List[ComponentStatus]]: + """ + Analyze components in the specification. 
+ + Returns detailed information about each component: + - Whether it exists in the catalog + - What it maps to + - Available fields + - Issues found + """ + try: + # Initialize resolver and validator + resolver = ComponentResolver() + validator = SpecFlowValidator(resolver) + + # Perform validation + report = await validator.validate(request.yaml_content) + + # Return component statuses grouped by validity + valid_components = [comp for comp in report.components if comp.exists] + invalid_components = [comp for comp in report.components if not comp.exists] + + return {"valid": valid_components, "invalid": invalid_components} + + except Exception as e: + logger.error(f"Component analysis error: {e}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Component analysis failed: {str(e)}" + ) + + +@router.post("/preview", response_model=FlowPreview, status_code=status.HTTP_200_OK) +async def preview_flow(request: PreviewFlowRequest, session: DbSession) -> FlowPreview: + """ + Preview flow without creating it (dry-run). 
+ + Converts YAML to flow JSON and returns: + - Complete flow structure + - Node and edge counts + - Validation summary + """ + try: + # Initialize components + resolver = ComponentResolver() + validator = SpecFlowValidator(resolver) + edge_builder = EdgeBuilder(resolver) + converter = SpecFlowConverter(resolver, edge_builder) + + # Validate first + validation_report = await validator.validate(request.yaml_content) + + # Convert to flow JSON + flow_json = await converter.convert(request.yaml_content) + + # Build preview + nodes = flow_json.get("data", {}).get("nodes", []) + edges = flow_json.get("data", {}).get("edges", []) + + preview = FlowPreview( + flow_json=flow_json, + nodes_count=len(nodes), + edges_count=len(edges), + validation_summary={ + "is_valid": validation_report.is_valid, + "error_count": len(validation_report.errors), + "warning_count": len(validation_report.warnings), + "components_valid": validation_report.summary.valid_components if validation_report.summary else 0, + }, + ) + + return preview + + except ValueError as e: + logger.error(f"Preview error: {e}") + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) + except Exception as e: + logger.error(f"Preview error: {e}", exc_info=True) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Preview failed: {str(e)}") + + +@router.post("/create-flow", response_model=FlowRead, status_code=status.HTTP_201_CREATED) +async def create_flow( + request: CreateFlowRequest, session: DbSession, current_user: CurrentActiveUser +) -> FlowRead: + """ + Create a flow from YAML specification. + + Full pipeline: + 1. Validate YAML (fails if invalid) + 2. Convert to flow JSON + 3. Create flow in database + 4. Return created flow + + Requires authentication. 
+ """ + try: + # Initialize components + resolver = ComponentResolver() + validator = SpecFlowValidator(resolver) + edge_builder = EdgeBuilder(resolver) + converter = SpecFlowConverter(resolver, edge_builder) + + # Step 1: Validate + logger.info("Validating specification...") + validation_report = await validator.validate(request.yaml_content) + + if not validation_report.is_valid: + error_msg = "; ".join(validation_report.errors[:5]) # Limit to first 5 errors + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Specification validation failed: {error_msg}", + ) + + # Step 2: Convert + logger.info("Converting specification to flow JSON...") + flow_json = await converter.convert(request.yaml_content) + + # Step 3: Create flow + logger.info("Creating flow in database...") + folder_id = UUID(request.folder_id) if request.folder_id else None + flow = await _create_flow_from_json( + session=session, + user_id=current_user.id, + flow_json=flow_json, + flow_name=request.flow_name, + folder_id=folder_id, + ) + + logger.info(f"Flow created successfully: {flow.id}") + + # Return as FlowRead + return FlowRead.model_validate(flow, from_attributes=True) + + except HTTPException: + raise + except ValueError as e: + logger.error(f"Flow creation error: {e}") + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) + except Exception as e: + logger.error(f"Flow creation error: {e}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Flow creation failed: {str(e)}" + ) + + +@router.get("/component-catalog", response_model=Dict[str, Any], status_code=status.HTTP_200_OK) +async def get_component_catalog(session: DbSession) -> Dict[str, Any]: + """ + Get all available components. + + Returns the component catalog with all available components + organized by category. + + This is a cached wrapper around /api/v1/all. 
+ """ + try: + resolver = ComponentResolver() + components = await resolver.fetch_all_components() + + # Return summary info + return { + "categories": list(components.keys()), + "total_categories": len(components), + "total_components": sum(len(comps) for comps in components.values()), + "components": components, + } + + except Exception as e: + logger.error(f"Failed to fetch component catalog: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to fetch component catalog: {str(e)}" + ) + + +@router.get( + "/component/{category}/{component_name}", response_model=Dict[str, Any], status_code=status.HTTP_200_OK +) +async def get_component_details(category: str, component_name: str, session: DbSession) -> Dict[str, Any]: + """ + Get detailed information for a specific component. + + Returns: + - Component template with all fields + - Input fields list + - Output types + - Display name and description + """ + try: + resolver = ComponentResolver() + await resolver.fetch_all_components() + + # Get template + template = resolver.get_component_template(category, component_name) + if not template: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Component '{component_name}' not found in category '{category}'", + ) + + # Get additional info + input_fields = resolver.get_input_fields(category, component_name) + output_types = resolver.get_output_types(category, component_name) + display_name = resolver.get_component_display_name(category, component_name) + + return { + "category": category, + "component_name": component_name, + "display_name": display_name, + "template": template, + "input_fields": input_fields, + "output_types": output_types, + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to get component details: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to get component details: {str(e)}" + ) diff --git 
a/src/backend/base/langflow/services/spec/component_schema_inspector.py b/src/backend/base/langflow/services/spec/component_schema_inspector.py index 1941337f2999..4b592b6792e4 100644 --- a/src/backend/base/langflow/services/spec/component_schema_inspector.py +++ b/src/backend/base/langflow/services/spec/component_schema_inspector.py @@ -54,6 +54,8 @@ def __init__(self, components_root: Optional[str] = None): """ self.components_root = components_root or "langflow.components" self._schema_cache: Dict[str, ComponentSchema] = {} + # Additional cache keyed by Python class name for robust lookup + self._schema_cache_by_class: Dict[str, ComponentSchema] = {} self._last_scan_time = 0 self._cache_duration = 300 # 5 minutes @@ -68,7 +70,23 @@ def get_component_schema(self, component_name: str) -> Optional[ComponentSchema] ComponentSchema or None if not found """ self._ensure_fresh_cache() - return self._schema_cache.get(component_name) + # Try by display/name key first + schema = self._schema_cache.get(component_name) + if schema: + return schema + + # Fallback: try by class name key + schema = self._schema_cache_by_class.get(component_name) + if schema: + return schema + + # Final fallback: case-insensitive search across both caches + lowered = component_name.lower() + for s in self._schema_cache.values(): + if s.name.lower() == lowered or s.class_name.lower() == lowered: + return s + + return None def get_all_schemas(self) -> Dict[str, ComponentSchema]: """ @@ -89,7 +107,7 @@ def get_component_io_mapping(self) -> Dict[str, Dict[str, Any]]: """ self._ensure_fresh_cache() - mapping = {} + mapping: Dict[str, Dict[str, Any]] = {} for name, schema in self._schema_cache.items(): # Determine primary input and output fields input_field = None @@ -115,7 +133,7 @@ def get_component_io_mapping(self) -> Dict[str, Dict[str, Any]]: if not output_field and schema.outputs: output_field = schema.outputs[0].get("name") - mapping[name] = { + entry = { "input_field": input_field, 
"output_field": output_field, "output_types": schema.output_types, @@ -125,6 +143,10 @@ def get_component_io_mapping(self) -> Dict[str, Dict[str, Any]]: "description": schema.description } + # Map under both display/name and class name for flexible lookups + mapping[name] = entry + mapping[schema.class_name] = entry + return mapping def _ensure_fresh_cache(self) -> None: @@ -138,6 +160,7 @@ def _scan_components(self) -> None: """Scan all components and build schema cache.""" logger.info(f"Scanning components in {self.components_root}") self._schema_cache.clear() + self._schema_cache_by_class.clear() try: # Import the components package @@ -182,7 +205,9 @@ def _analyze_module(self, module_name: str) -> None: if self._is_component_class(obj) and obj.__module__ == module_name: schema = self._extract_component_schema(obj, module_name) if schema: + # Cache by display/name and by class name self._schema_cache[schema.name] = schema + self._schema_cache_by_class[schema.class_name] = schema logger.debug(f"Extracted schema for {schema.name}") except Exception as e: @@ -495,8 +520,24 @@ def validate_component_connection(self, source_comp: str, target_comp: str, } # Check type compatibility - compatible = any(otype in target_schema.input_types - for otype in source_schema.output_types) + # Special-case: tool connections targeting the 'tools' input should be considered compatible. + # Tool semantics imply registration rather than direct data type matching. 
+ if isinstance(target_input, str) and target_input.lower() == "tools": + return { + 'valid': True, + 'source_types': source_schema.output_types, + 'target_types': target_schema.input_types, + 'error': None + } + + # Treat 'any'/'Any'/'object' on target as wildcard accepting any source type + source_types = set(source_schema.output_types or []) + target_types = set(target_schema.input_types or []) + + if any(t in target_types for t in ("any", "Any", "object")): + compatible = True + else: + compatible = bool(source_types & target_types) return { 'valid': compatible, diff --git a/src/backend/base/langflow/spec_flow_builder/__init__.py b/src/backend/base/langflow/spec_flow_builder/__init__.py new file mode 100644 index 000000000000..46bd4d4c30a5 --- /dev/null +++ b/src/backend/base/langflow/spec_flow_builder/__init__.py @@ -0,0 +1,21 @@ +"""Spec Flow Builder Module.""" + +from .component_resolver import ComponentResolver +from .models import ComponentStatus, ValidateSpecRequest, ValidationReport, CreateFlowRequest, CreateFlowResponse +from .validator import SpecValidator +from .node_builder import NodeBuilder +from .config_builder import ConfigBuilder +from .edge_builder import EdgeBuilder + +__all__ = [ + "ComponentResolver", + "ValidateSpecRequest", + "ComponentStatus", + "ValidationReport", + "SpecValidator", + "CreateFlowRequest", + "CreateFlowResponse", + "NodeBuilder", + "ConfigBuilder", + "EdgeBuilder", +] \ No newline at end of file diff --git a/src/backend/base/langflow/spec_flow_builder/api.py b/src/backend/base/langflow/spec_flow_builder/api.py new file mode 100644 index 000000000000..f60f01c2e0ff --- /dev/null +++ b/src/backend/base/langflow/spec_flow_builder/api.py @@ -0,0 +1,318 @@ +"""API endpoints for spec_flow_builder.""" + +import logging +from uuid import UUID + +import yaml +from fastapi import APIRouter, HTTPException +from sqlmodel import select + +from langflow.api.utils import CurrentActiveUser, DbSession +from langflow.interface.components 
import get_and_cache_all_types_dict +from langflow.services.database.models.flow import Flow, FlowCreate +from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME +from langflow.services.database.models.folder.model import Folder +from langflow.services.deps import get_settings_service +from .component_resolver import ComponentResolver +from .models import ValidateSpecRequest, ValidationReport, CreateFlowRequest, CreateFlowResponse +from .validator import SpecValidator +from .provides_validator import ProvidesConnectionValidator +from .config_validator import ConfigValidator +from .node_builder import NodeBuilder +from .config_builder import ConfigBuilder +from .edge_builder import EdgeBuilder + +logger = logging.getLogger(__name__) + +# Create router with prefix and tags +router = APIRouter(prefix="/spec-builder", tags=["Spec Flow Builder"]) + + +# ============================================================================ +# Helper Functions +# ============================================================================ + + +async def _get_or_create_folder(session: DbSession, user_id: UUID, folder_name: str = DEFAULT_FOLDER_NAME) -> Folder: + """ + Get or create a folder for the user. 
+ + Args: + session: Database session + user_id: User ID + folder_name: Name of the folder (default: DEFAULT_FOLDER_NAME) + + Returns: + Folder instance + """ + # Check if folder exists + statement = select(Folder).where(Folder.name == folder_name, Folder.user_id == user_id) + result = await session.exec(statement) + folder = result.first() + + if not folder: + # Create folder + folder = Folder(name=folder_name, user_id=user_id) + session.add(folder) + await session.commit() + await session.refresh(folder) + + return folder + + +# ============================================================================ +# API Endpoints +# ============================================================================ + + +@router.post("/validate", response_model=ValidationReport) +async def validate_spec(request: ValidateSpecRequest) -> ValidationReport: + """ + Validate a YAML specification. + + This endpoint checks if all components defined in the YAML spec + exist in the Langflow component catalog. + + Request body: + { + "yaml_content": "id: urn:agent:...\ncomponents:\n- type: PromptComponent\n ..." + } + + Response: + { + "valid": true, + "total_components": 6, + "found_components": 6, + "missing_components": 0, + "components": [ + { + "id": "eoc-prompt", + "name": "Agent Instructions", + "yaml_type": "PromptComponent", + "found": true, + "catalog_name": "Prompt Template", + "category": "processing" + }, + ... + ], + "errors": [] + } + + Args: + request: ValidateSpecRequest with yaml_content + + Returns: + ValidationReport with detailed validation results + + Raises: + HTTPException: If validation process fails unexpectedly + """ + try: + logger.info("Received validation request") + + # Create resolver + resolver = ComponentResolver() + # Run main validation (component existence, counts, etc.) 
+ validator = SpecValidator(resolver) + report = await validator.validate(request.yaml_content) + + provides_validator = ProvidesConnectionValidator(resolver) + provides_errors = await provides_validator.validate(request.yaml_content) + if provides_errors: + raise HTTPException(status_code=400, detail={"errors": provides_errors}) + + # Validate component config keys and types against catalog templates + config_validator = ConfigValidator(resolver) + config_errors = await config_validator.validate(request.yaml_content) + if config_errors: + raise HTTPException(status_code=400, detail={"errors": config_errors}) + + # Run validation (this checks component existence; 'provides' is checked by dependency) + + logger.info(f"Validation complete: valid={report.valid}, found={report.found_components}/{report.total_components}") + + return report + + except HTTPException as e: + # Propagate intended HTTP errors (e.g., 400 for validation) + raise e + except Exception as e: + logger.error(f"Validation endpoint error: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Validation failed: {str(e)}") + + +@router.post("/create-flow", response_model=CreateFlowResponse) +async def create_flow( + request: CreateFlowRequest, session: DbSession, current_user: CurrentActiveUser +) -> CreateFlowResponse: + """ + Create a flow from YAML specification. + + This endpoint takes a YAML specification and creates a complete flow by: + 1. Building nodes from components (NodeBuilder) + 2. Applying configuration to nodes (ConfigBuilder) + 3. Creating edges based on 'provides' relationships (EdgeBuilder) + 4. 
Saving the flow to the database + + Request body: + { + "yaml_content": "id: urn:agent:...\ncomponents:\n- type: PromptComponent\n ...", + "flow_name": "My Custom Flow", # Optional + "folder_id": "folder-uuid" # Optional + } + + Response: + { + "success": true, + "message": "Flow created successfully", + "flow_id": "flow-uuid", + "flow_name": "My Custom Flow" + } + + Args: + request: CreateFlowRequest with yaml_content and optional flow_name/folder_id + session: Database session (injected) + current_user: Current authenticated user (injected) + + Returns: + CreateFlowResponse with creation status and flow details + + Raises: + HTTPException: If flow creation fails or user is not authenticated + """ + try: + logger.info(f"Received create-flow request from user {current_user.id}") + + # Step 1: Parse YAML to extract metadata + try: + spec = yaml.safe_load(request.yaml_content) + except yaml.YAMLError as e: + logger.error(f"Failed to parse YAML: {e}") + return CreateFlowResponse( + success=False, + message=f"Invalid YAML format: {str(e)}", + flow_id=None, + flow_name=None, + ) + + # Step 2: Validate the YAML + resolver = ComponentResolver() + validator = SpecValidator(resolver) + report = await validator.validate(request.yaml_content) + + if not report.valid: + logger.warning(f"YAML validation failed: {report.errors}") + return CreateFlowResponse( + success=False, + message=f"YAML validation failed: {', '.join(report.errors[:3])}", + flow_id=None, + flow_name=None, + ) + + # Step 2: Fetch all components once (will be used by all builders) + logger.info("Fetching component catalog") + settings_service = get_settings_service() + all_components = await get_and_cache_all_types_dict(settings_service) + total_components = sum(len(comps) for comps in all_components.values()) + logger.info(f"Fetched {total_components} components across {len(all_components)} categories") + + # Step 3: Build nodes + logger.info("Building nodes from YAML specification") + node_builder = 
NodeBuilder(all_components) + nodes = await node_builder.build_nodes(request.yaml_content) + + # Step 4: Apply configuration to nodes + logger.info("Applying configuration to nodes") + config_builder = ConfigBuilder(all_components) + configured_nodes = await config_builder.apply_config(nodes, request.yaml_content) + + # Step 5: Build edges + logger.info("Building edges between nodes") + logger.info(f"Passing {len(configured_nodes)} nodes to EdgeBuilder") + + # Debug: Log node yaml_component_ids before passing to EdgeBuilder + node_yaml_ids = [node.get("data", {}).get("yaml_component_id") for node in configured_nodes] + logger.info(f"Node yaml_component_ids being passed to EdgeBuilder: {node_yaml_ids}") + + edge_builder = EdgeBuilder(all_components) + edges = await edge_builder.build_edges(configured_nodes, request.yaml_content) + + logger.info(f"EdgeBuilder returned {len(edges)} edges") + if len(edges) == 0: + logger.warning("⚠️ No edges were created! This might indicate a problem.") + logger.warning(f"Nodes have yaml_component_ids: {node_yaml_ids}") + # Parse YAML to show provides relationships + try: + yaml_provides_count = 0 + for comp in spec.get("components", []): + provides = comp.get("provides", []) + if provides: + yaml_provides_count += len(provides) + logger.warning(f"Component '{comp.get('id')}' has {len(provides)} provides entries") + logger.warning(f"YAML has {yaml_provides_count} total provides relationships but 0 edges were created!") + except Exception as e: + logger.error(f"Error checking provides relationships: {e}") + + # Step 6: Build complete flow JSON structure + logger.info("Building complete flow JSON structure") + flow_name = request.flow_name or spec.get("name", "Untitled Flow") + flow_description = spec.get("description", "") + + flow_json = { + "name": flow_name, + "description": flow_description, + "data": { + "nodes": configured_nodes, + "edges": edges, + "viewport": {"x": 0, "y": 0, "zoom": 1}, + }, + } + + logger.info(f"✓ Flow JSON 
built with {len(configured_nodes)} nodes and {len(edges)} edges") + + # Step 7: Save flow to database + logger.info("Saving flow to database") + + # Get or create folder + folder_id_uuid = None + if request.folder_id: + try: + folder_id_uuid = UUID(request.folder_id) + folder = await session.get(Folder, folder_id_uuid) + if not folder: + logger.warning(f"Folder with ID {folder_id_uuid} not found, using default folder") + folder = await _get_or_create_folder(session, current_user.id) + except ValueError: + logger.warning(f"Invalid folder_id format: {request.folder_id}, using default folder") + folder = await _get_or_create_folder(session, current_user.id) + else: + folder = await _get_or_create_folder(session, current_user.id) + + # Create flow record + flow_create = FlowCreate( + name=flow_name, + description=flow_description, + data=flow_json.get("data", {}), + folder_id=folder.id, + user_id=current_user.id, + ) + + flow = Flow.model_validate(flow_create, from_attributes=True) + session.add(flow) + await session.commit() + await session.refresh(flow) + + logger.info(f"Flow created successfully with ID: {flow.id}") + + return CreateFlowResponse( + success=True, + message="Flow created successfully", + flow_id=str(flow.id), + flow_name=flow.name, + ) + + except HTTPException as e: + raise e + except Exception as e: + logger.error(f"Create-flow endpoint error: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Flow creation failed: {str(e)}") \ No newline at end of file diff --git a/src/backend/base/langflow/spec_flow_builder/component_resolver.py b/src/backend/base/langflow/spec_flow_builder/component_resolver.py new file mode 100644 index 000000000000..3a3346171378 --- /dev/null +++ b/src/backend/base/langflow/spec_flow_builder/component_resolver.py @@ -0,0 +1,172 @@ +"""Component Resolver for discovering and matching components.""" + +import logging +import re +from typing import Any, Dict, Optional, Tuple + +logger = logging.getLogger(__name__) 
+ + +class ComponentResolver: + """Resolves YAML component types to actual Langflow components.""" + + def __init__(self): + """Initialize the component resolver.""" + self._cache: Optional[Dict[str, Any]] = None + + async def fetch_all_components(self) -> Dict[str, Any]: + """ + Fetch all components from the component catalog. + + This uses the same method as the /api/v1/all endpoint to get + all available components in Langflow. + + Returns: + Dict of all components organized by category. + Example structure: + { + "processing": { + "Prompt Template": { + "template": {"code": {"value": "class PromptComponent..."}}, + ... + } + }, + "agents": { + "Agent": { + "template": {"code": {"value": "class AgentComponent..."}}, + ... + } + } + } + """ + try: + from langflow.interface.components import get_and_cache_all_types_dict + from langflow.services.deps import get_settings_service + + logger.info("Fetching all components from catalog") + all_types = await get_and_cache_all_types_dict(settings_service=get_settings_service()) + + self._cache = all_types + logger.info(f"Cached {len(all_types)} component categories") + return all_types + + except Exception as e: + logger.error(f"Failed to fetch components: {e}") + return {} + + def get_cached_components(self) -> Dict[str, Any]: + """Return the cached component catalog if available, else empty dict. + + This allows downstream validators to reuse the already-fetched catalog + without re-reading JSON from disk. + + Returns: + Dict[str, Any]: Cached components by category. + """ + return self._cache or {} + + def _extract_class_name_from_code(self, comp_data: Dict[str, Any]) -> Optional[str]: + """ + Extract the class name from component's code.value field. + + The component catalog stores the actual Python class code in: + component_data["template"]["code"]["value"] + + We use regex to find the class name. + + Example: + code.value = "class PromptComponent(Component):\\n def build()..." 
+ Returns: "PromptComponent" + + Example: + code.value = "class ChatInput(Component):\\n pass" + Returns: "ChatInput" + + Args: + comp_data: Component data dict from catalog + + Returns: + Class name if found (e.g., "PromptComponent"), None otherwise + """ + try: + template = comp_data.get("template", {}) + code_field = template.get("code", {}) + code_value = code_field.get("value", "") + + if not code_value: + return None + + # Extract first class name using regex + # Pattern: "class ClassName" or "class ClassName(BaseClass)" + class_match = re.search(r'class\s+(\w+)', code_value) + + if class_match: + class_name = class_match.group(1) + return class_name + + return None + + except Exception as e: + logger.debug(f"Failed to extract class name: {e}") + return None + + def find_component(self, yaml_type: str) -> Optional[Tuple[str, str, Dict[str, Any]]]: + """ + Find component by YAML type (which is the class name). + + Process: + 1. Take the YAML type (e.g., "PromptComponent") + 2. Search all components in the catalog + 3. For each component, extract class name from template.code.value + 4. 
If class name matches (case-insensitive), return the component + + Example 1: + Input: yaml_type = "PromptComponent" + Search catalog → Find "class PromptComponent" in code.value + Found in: category="processing", component_name="Prompt Template" + Return: ("processing", "Prompt Template", {...component_data...}) + + Example 2: + Input: yaml_type = "ChatInput" + Search catalog → Find "class ChatInput" in code.value + Found in: category="input_output", component_name="ChatInput" + Return: ("input_output", "ChatInput", {...component_data...}) + + Args: + yaml_type: Type from YAML - this is the CLASS NAME + (e.g., "PromptComponent", "AgentComponent", "ChatInput") + + Returns: + Tuple of (category, catalog_component_name, component_data) if found + None if not found + + Where: + - category: The category folder (e.g., "processing", "agents") + - catalog_component_name: The display name in catalog (e.g., "Prompt Template") + - component_data: Full component data dict + """ + if not self._cache: + logger.warning("Component cache not initialized. 
Call fetch_all_components() first.") + return None + + logger.info(f"Searching for component with class name: {yaml_type}") + + # Search all categories + for category, components in self._cache.items(): + # Search all components in this category + for comp_name, comp_data in components.items(): + # Extract class name from code.value + class_name = self._extract_class_name_from_code(comp_data) + + if not class_name: + # This component doesn't have a class name in code.value + continue + + # Compare: YAML type should match class name (case-insensitive) + if class_name.lower() == yaml_type.lower(): + logger.info(f"✓ Found: {yaml_type} → {category}.{comp_name} (class: {class_name})") + return (category, comp_name, comp_data) + + # Not found in any category + logger.warning(f"✗ Component not found: {yaml_type}") + return None \ No newline at end of file diff --git a/src/backend/base/langflow/spec_flow_builder/config_builder.py b/src/backend/base/langflow/spec_flow_builder/config_builder.py new file mode 100644 index 000000000000..24e898cdae89 --- /dev/null +++ b/src/backend/base/langflow/spec_flow_builder/config_builder.py @@ -0,0 +1,353 @@ +"""Config Builder - Adds configuration to flow nodes.""" + +import logging +from typing import Any, Dict, List, Tuple + +import yaml + +logger = logging.getLogger(__name__) + + +class ConfigBuilder: + """ + Builds and applies configuration to flow nodes. + + This class is responsible for: + - Extracting config from YAML components + - Mapping config values to node templates + - Validating config against component schemas + - Type conversion and validation + """ + + def __init__(self, all_components: Dict[str, Any]): + """ + Initialize the ConfigBuilder. 
+ + Args: + all_components: Component catalog from get_and_cache_all_types_dict() + Structure: {category: {component_name: component_data}} + """ + self.all_components = all_components + logger.info("ConfigBuilder initialized") + + def _validate_and_convert_value(self, input_type: str, yaml_value: Any, existing_value: Any = None) -> Tuple[bool, Any]: + """ + Validate and convert YAML value based on template field _input_type. + + Args: + input_type: The _input_type from template field (e.g., "IntInput", "BoolInput") + yaml_value: The value from YAML config + existing_value: The existing value from the template field (for merging with defaults) + + Returns: + Tuple of (is_valid, converted_value) + - is_valid: True if value is valid for the input type + - converted_value: The value converted to the appropriate type, or None if invalid + """ + try: + # Integer inputs + if input_type == "IntInput": + if isinstance(yaml_value, int) and not isinstance(yaml_value, bool): + return True, yaml_value + try: + return True, int(yaml_value) + except (ValueError, TypeError): + return False, None + + # Float inputs + elif input_type == "FloatInput": + if isinstance(yaml_value, float): + return True, yaml_value + if isinstance(yaml_value, int) and not isinstance(yaml_value, bool): + return True, float(yaml_value) + try: + return True, float(yaml_value) + except (ValueError, TypeError): + return False, None + + # Boolean inputs + elif input_type == "BoolInput": + if isinstance(yaml_value, bool): + return True, yaml_value + if isinstance(yaml_value, str): + if yaml_value.lower() in ["true", "1", "yes"]: + return True, True + if yaml_value.lower() in ["false", "0", "no"]: + return True, False + if isinstance(yaml_value, int): + return True, bool(yaml_value) + return False, None + + # Slider inputs (number/float) + elif input_type == "SliderInput": + if isinstance(yaml_value, (int, float)) and not isinstance(yaml_value, bool): + return True, float(yaml_value) + try: + return True, 
float(yaml_value) + except (ValueError, TypeError): + return False, None + + # String-based inputs + elif input_type in [ + "StrInput", + "MessageInput", + "MessageTextInput", + "MultilineInput", + "SecretStrInput", + "MultilineSecretInput", + "DropdownInput", + "TabInput", + "QueryInput", + "FileInput", + "PromptInput", + "HandleInput", + "ConnectionInput", + "AuthInput", + "DataFrameInput", + "SortableListInput", + ]: + if isinstance(yaml_value, str): + return True, yaml_value + # Convert to string + return True, str(yaml_value) + + # Dictionary inputs + elif input_type in ["DictInput", "NestedDictInput", "McpInput"]: + if isinstance(yaml_value, dict): + return True, yaml_value + return False, None + + # Table/Array inputs + elif input_type in ["TableInput", "DataInput"]: + if isinstance(yaml_value, list): + # For TableInput, merge with existing defaults if available + if input_type == "TableInput" and existing_value and isinstance(existing_value, list): + # Create dict from existing values + merged_dict = {} + for item in existing_value: + if isinstance(item, dict) and "key" in item and "value" in item: + merged_dict[item["key"]] = item["value"] + + # Merge with YAML values (YAML overrides defaults) + for item in yaml_value: + if isinstance(item, dict) and "key" in item and "value" in item: + merged_dict[item["key"]] = item["value"] + + # Convert back to list format + logger.debug(f"Merged TableInput: {len(existing_value)} existing + {len(yaml_value)} YAML = {len(merged_dict)} total items") + return True, [{"key": k, "value": v} for k, v in merged_dict.items()] + + # For DataInput or TableInput without defaults, just return the list + return True, yaml_value + return False, None + + # Multiselect (array of strings) + elif input_type == "MultiselectInput": + if isinstance(yaml_value, list): + # Ensure all items are strings + return True, [str(item) for item in yaml_value] + if isinstance(yaml_value, str): + # Single value, wrap in array + return True, 
[yaml_value] + return False, None + + # Unknown input type - fallback to string + else: + logger.warning(f"Unknown _input_type '{input_type}', treating as string") + if isinstance(yaml_value, str): + return True, yaml_value + return True, str(yaml_value) + + except Exception as e: + logger.error(f"Error validating/converting value for input_type {input_type}: {e}") + return False, None + + def _find_yaml_component_by_id( + self, node: Dict[str, Any], yaml_components: List[Dict[str, Any]] + ) -> Dict[str, Any] | None: + """ + Find YAML component by the ID stored in node data. + + Args: + node: Node dictionary with yaml_component_id in data + yaml_components: List of YAML components + + Returns: + Matching YAML component dictionary or None if not found + """ + # Get the stored YAML component ID from node + yaml_comp_id = node.get("data", {}).get("yaml_component_id") + + if not yaml_comp_id: + logger.warning(f"Node {node.get('id')} has no yaml_component_id") + return None + + # Find matching YAML component + for yaml_comp in yaml_components: + if yaml_comp.get("id") == yaml_comp_id: + return yaml_comp + + logger.warning(f"Could not find YAML component with id: {yaml_comp_id}") + return None + + def _apply_config_to_node(self, node: Dict[str, Any], yaml_component: Dict[str, Any]) -> Dict[str, Any]: + """ + Apply config from YAML component to node template. 
+ + Args: + node: Node dictionary to modify + yaml_component: YAML component with config section + + Returns: + Modified node dictionary + """ + config = yaml_component.get("config", {}) + if not config: + logger.debug(f"No config found for component {yaml_component.get('id')}") + return node + + # Get template from node + template = node.get("data", {}).get("node", {}).get("template", {}) + if not template: + logger.warning(f"No template found in node {node.get('id')}") + return node + + component_id = yaml_component.get("id", "unknown") + config_applied_count = 0 + + # Apply each config key-value pair + for config_key, config_value in config.items(): + # Check if field exists in template + if config_key not in template: + logger.warning( + f"Config key '{config_key}' not found in template for component '{component_id}'. " + f"Available keys: {list(template.keys())[:10]}..." + ) + continue + + # Get field metadata + field = template[config_key] + input_type = field.get("_input_type", "StrInput") # Default to StrInput if not found + existing_value = field.get("value") # Get existing value for merging + + # Validate and convert value + is_valid, converted_value = self._validate_and_convert_value(input_type, config_value, existing_value) + + if not is_valid: + logger.warning( + f"Invalid value type for '{config_key}' in component '{component_id}': " + f"expected {input_type}, got {type(config_value).__name__}. Value: {config_value}" + ) + continue + + # Set the value in the template + field["value"] = converted_value + + # Mark as user-configured to prevent component updates from overwriting + field["user_configured"] = True + field["configured_from_spec"] = True + + # Special handling for DropdownInput to preserve dropdown properties + if input_type == "DropdownInput": + # The template already contains options, options_metadata, combobox, etc. 
+ # We only update the value, keeping all other dropdown properties intact + + # Optional: Validate that the configured value is valid for this dropdown + options = field.get("options", []) + is_combobox = field.get("combobox", False) + + # Only validate if options exist and combobox is disabled (strict dropdown) + if options and not is_combobox and converted_value not in options: + logger.warning( + f"Value '{converted_value}' for '{config_key}' in component '{component_id}' " + f"is not in the available options: {options}. " + f"This may cause issues unless combobox mode is enabled." + ) + + logger.debug( + f"Preserved dropdown properties for '{config_key}': " + f"options={len(options)} items, combobox={is_combobox}" + ) + + # Set advanced to false for any field configured in YAML + if "advanced" in field: + field["advanced"] = False + logger.debug(f"Set advanced=false for configured field '{config_key}' in component '{component_id}'") + + config_applied_count += 1 + logger.debug( + f"Applied config to '{component_id}.{config_key}': " + f"{repr(converted_value)[:100]} (input_type: {input_type})" + ) + + logger.info(f"Applied {config_applied_count}/{len(config)} config values to component '{component_id}'") + + return node + + async def apply_config(self, nodes: List[Dict[str, Any]], yaml_content: str) -> List[Dict[str, Any]]: + """ + Apply configuration to nodes from YAML specification. + + This method: + 1. Parses the YAML to extract components with configs + 2. Matches each node to its YAML component by ID + 3. Applies config values to node template fields + 4. 
Validates types and converts values as needed + + Args: + nodes: List of node dictionaries from NodeBuilder + yaml_content: YAML specification content + + Returns: + List of nodes with configuration applied + + Raises: + ValueError: If YAML parsing fails or configuration is invalid + """ + logger.info(f"Applying configuration to {len(nodes)} nodes") + + try: + # Parse YAML to get components + spec = yaml.safe_load(yaml_content) + yaml_components = spec.get("components", []) + + if not yaml_components: + logger.warning("No components found in YAML specification") + return nodes + + logger.info(f"Found {len(yaml_components)} components in YAML") + + # Apply config to each node + configured_nodes = [] + nodes_with_config = 0 + + for node in nodes: + # Find matching YAML component by ID + yaml_comp = self._find_yaml_component_by_id(node, yaml_components) + + if not yaml_comp: + logger.warning(f"Could not find YAML component for node {node.get('id')}") + configured_nodes.append(node) + continue + + # Check if component has config + if not yaml_comp.get("config"): + logger.debug(f"No config to apply for component {yaml_comp.get('id')}") + configured_nodes.append(node) + continue + + # Apply config to node + configured_node = self._apply_config_to_node(node, yaml_comp) + configured_nodes.append(configured_node) + nodes_with_config += 1 + + logger.info(f"Configuration applied to {nodes_with_config}/{len(nodes)} nodes") + + return configured_nodes + + except yaml.YAMLError as e: + logger.error(f"Failed to parse YAML: {e}") + raise ValueError(f"Invalid YAML format: {str(e)}") + except Exception as e: + logger.error(f"Error applying config: {e}", exc_info=True) + raise ValueError(f"Failed to apply configuration: {str(e)}") diff --git a/src/backend/base/langflow/spec_flow_builder/config_validator.py b/src/backend/base/langflow/spec_flow_builder/config_validator.py new file mode 100644 index 000000000000..6095e44640c5 --- /dev/null +++ 
"""Configuration validator for YAML specifications.

Validates that each component's `config` keys exist in the component's
catalog template and that provided values match expected types and list
shapes. Uses the shared ComponentResolver cache (no direct JSON reads).
"""

import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional

if TYPE_CHECKING:
    # Import only for type checking: keeps the module importable on its own.
    from .component_resolver import ComponentResolver

logger = logging.getLogger(__name__)


class ConfigValidator:
    """Validates per-component config keys and types against catalog templates."""

    def __init__(self, resolver: "ComponentResolver") -> None:
        # Resolver provides the cached component catalog used for lookups.
        self.resolver = resolver

    @staticmethod
    def _is_type_ok(value: Any, expected: Optional[str]) -> bool:
        """Return True when *value* is acceptable for the normalized *expected* type.

        Booleans are deliberately rejected for 'int'/'float' fields: bool is a
        subclass of int in Python, so a YAML ``true`` would otherwise slip
        through an integer/float type check unnoticed.
        """
        if expected is None or expected == "any":
            return True
        if expected == "str":
            return isinstance(value, str)
        if expected == "int":
            return isinstance(value, int) and not isinstance(value, bool)
        if expected == "float":
            return isinstance(value, (float, int)) and not isinstance(value, bool)
        if expected == "bool":
            return isinstance(value, bool)
        if expected == "dict":
            return isinstance(value, dict)
        # Types like 'code', 'query', etc. are carried as strings.
        if expected in {"code", "query", "password", "secret", "file"}:
            return isinstance(value, str)
        # Unknown normalized types: be permissive.
        return True

    async def validate(self, yaml_content: str) -> List[str]:
        """Validate configuration blocks in the YAML spec.

        Checks (only for provided keys, no missing-key enforcement):
        - ``config`` must be an object when present
        - each provided ``config`` key must exist in the component's template
        - provided values must match the template ``type``
        - fields with ``list: true`` require a list value; ``list: false`` forbids one

        Notes:
            Special-case: the keys ``headers`` and ``body`` are exempt from
            type/list validation — they often accept flexible structures.

        Returns:
            List of error messages (empty if valid).
        """
        # Imported lazily so the validator module can be imported without PyYAML.
        import yaml

        errors: List[str] = []
        ignored_type_keys = {"headers", "body"}

        # Parse YAML
        try:
            spec_dict = yaml.safe_load(yaml_content)
        except yaml.YAMLError as e:
            logger.error(f"YAML parsing error: {e}")
            return [f"YAML parsing error: {str(e)}"]

        if not isinstance(spec_dict, dict) or not spec_dict:
            return ["Empty or invalid YAML content"]

        # Ensure the catalog is loaded (prefer cache, fetch if empty).
        try:
            if not self.resolver.get_cached_components():
                await self.resolver.fetch_all_components()
        except Exception as e:
            logger.error(f"Failed to load component catalog: {e}")
            return [f"Failed to load component catalog: {str(e)}"]

        components = spec_dict.get("components", [])
        if not isinstance(components, list):
            return ["'components' must be a list"]

        for comp in components:
            if not isinstance(comp, dict):
                errors.append("Each item in 'components' must be an object")
                continue

            comp_id = comp.get("id", "unknown")
            comp_type = comp.get("type")
            comp_cfg = comp.get("config", {})

            if not comp_type:
                errors.append(f"Component '{comp_id}' missing required field 'type'")
                continue

            # Lookup component in catalog.
            lookup = self.resolver.find_component(comp_type)
            if not lookup:
                # Component existence is validated elsewhere; skip config checks.
                logger.debug(f"Skipping config validation for unknown component type: {comp_type}")
                continue

            _category, _catalog_name, comp_data = lookup
            template = comp_data.get("template")
            if not isinstance(template, dict):
                # No template means no config validation context.
                logger.debug(f"No template found for component type: {comp_type}")
                continue

            # config must be a dict when present.
            if comp_cfg is None:
                comp_cfg = {}
            if not isinstance(comp_cfg, dict):
                errors.append(
                    f"Component '{comp_id}' (type: '{comp_type}') has invalid 'config': expected object"
                )
                continue

            # Allowed keys are template field names minus internal marker keys.
            internal_keys = {"_type"}
            allowed_keys = {k for k in template.keys() if k not in internal_keys}

            # Unknown keys.
            for key in comp_cfg.keys():
                if key not in allowed_keys:
                    errors.append(
                        f"Unknown config key '{key}' for component '{comp_id}' (type: '{comp_type}')."
                    )

            # Type checks for provided keys.
            for key, value in comp_cfg.items():
                # Skip type/list validation for flexible keys.
                if key in ignored_type_keys:
                    continue
                t_cfg = template.get(key)
                if not isinstance(t_cfg, dict):
                    # If template lacks details, skip strict type checking.
                    continue

                list_expected = bool(t_cfg.get("list", False))
                field_type = self._normalize_field_type(t_cfg.get("type"))

                # List shape validation.
                if list_expected and not isinstance(value, list):
                    errors.append(
                        f"Config key '{key}' on component '{comp_id}' expects a list, got {type(value).__name__}."
                    )
                    continue
                if not list_expected and isinstance(value, list):
                    errors.append(
                        f"Config key '{key}' on component '{comp_id}' expects a non-list value, got list."
                    )
                    continue

                # Base type validation (per item for list fields).
                if list_expected:
                    for idx, item in enumerate(value):
                        if not self._is_type_ok(item, field_type):
                            errors.append(
                                f"Config key '{key}[{idx}]' on component '{comp_id}' has wrong type: "
                                f"expected {field_type}, got {type(item).__name__}."
                            )
                elif not self._is_type_ok(value, field_type):
                    errors.append(
                        f"Config key '{key}' on component '{comp_id}' has wrong type: "
                        f"expected {field_type}, got {type(value).__name__}."
                    )

        return errors

    @staticmethod
    def _normalize_field_type(t: Any) -> Optional[str]:
        """Normalize template field 'type' values to basic Python type names.

        Returns one of: 'str', 'int', 'float', 'bool', 'dict', 'code', 'query',
        'file', 'password', 'secret', 'any' — or None for empty/unknown types
        (unknown types are treated permissively by the caller).
        """
        if not isinstance(t, str) or not t:
            return None
        t_lower = t.strip().lower()
        if t_lower in {"str", "string"}:
            return "str"
        if t_lower in {"int", "integer"}:
            return "int"
        if t_lower in {"float", "double"}:
            return "float"
        if t_lower in {"bool", "boolean"}:
            return "bool"
        if t_lower in {"dict", "object"}:
            return "dict"
        if t_lower in {"code", "query", "file", "password", "secret"}:
            return t_lower
        if t_lower in {"other", "any"}:
            return "any"
        # Unknown types: be permissive.
        return None
+ + This class is responsible for: + - Parsing 'provides' relationships from YAML + - Creating edges between nodes + - Mapping output/input connections + """ + + def __init__(self, all_components: Dict[str, Any]): + """ + Initialize the EdgeBuilder. + + Args: + all_components: Component catalog from get_and_cache_all_types_dict() + Structure: {category: {component_name: component_data}} + """ + self.all_components = all_components + logger.info("EdgeBuilder initialized") + + def _find_node_by_yaml_id(self, nodes: List[Dict[str, Any]], yaml_component_id: str) -> Optional[Dict[str, Any]]: + """ + Find node by yaml_component_id. + + Args: + nodes: List of node dictionaries + yaml_component_id: The YAML component ID to search for + + Returns: + Matching node dictionary or None if not found + """ + for node in nodes: + if node.get("data", {}).get("yaml_component_id") == yaml_component_id: + return node + return None + + def _encode_handle_string(self, handle_dict: Dict[str, Any]) -> str: + """ + Encode handle dictionary as JSON string with \\u0153 (œ) instead of quotes. + + Args: + handle_dict: Dictionary to encode + + Returns: + JSON string with quotes replaced by \\u0153 + """ + # Convert dict to JSON string + json_string = json.dumps(handle_dict, separators=(",", ":"), ensure_ascii=False) + + # Replace double quotes with \u0153 + encoded_string = json_string.replace('"', "\u0153") + + return encoded_string + + def _build_source_handle(self, source_node: Dict[str, Any]) -> tuple[Dict[str, Any], str]: + """ + Build sourceHandle structure from source node. + + For tool components (asTools: true), uses the tool-specific output structure. + For regular components, uses the component's first output. 
+ + Args: + source_node: The source node dictionary + + Returns: + Tuple of (handle_dict, encoded_handle_string) + """ + node_data = source_node.get("data", {}).get("node", {}) + node_id = source_node.get("id") + # Get the component type from data.type (e.g., "Prompt Template", "KnowledgeHubSearch") + data_type = source_node.get("data", {}).get("type", node_data.get("display_name", "")) + + # Check if this is a tool component + is_tool = source_node.get("data", {}).get("asTools", False) + + if is_tool: + # Tool components always use these fixed values + output_name = "component_as_tool" + output_types = ["Tool"] + logger.debug(f"Building source handle for tool component {node_id}: name={output_name}, types={output_types}") + else: + # Regular components - get from outputs[0] + outputs = node_data.get("outputs", []) + if not outputs: + logger.warning(f"Node {node_id} has no outputs, using defaults") + output_name = "output" + output_types = ["Message"] + else: + first_output = outputs[0] + output_name = first_output.get("name", "output") + output_types = first_output.get("types", ["Message"]) + + # Build handle dict + handle_dict = {"dataType": data_type, "id": node_id, "name": output_name, "output_types": output_types} + + # Encode with \u0153 + encoded_string = self._encode_handle_string(handle_dict) + + return handle_dict, encoded_string + + def _build_target_handle( + self, target_node: Dict[str, Any], use_as: str + ) -> tuple[Dict[str, Any], str]: + """ + Build targetHandle structure from target node and useAs field. 
+ + Args: + target_node: The target node dictionary + use_as: The 'useAs' field name from provides relationship + + Returns: + Tuple of (handle_dict, encoded_handle_string) + """ + node_data = target_node.get("data", {}).get("node", {}) + node_id = target_node.get("id") + + # Get template field for the useAs parameter + template = node_data.get("template", {}) + field = template.get(use_as, {}) + + # Extract field metadata + field_name = use_as + field_type = field.get("type", "str") + input_types = field.get("input_types", ["Message"]) + + # Build handle dict + handle_dict = {"fieldName": field_name, "id": node_id, "inputTypes": input_types, "type": field_type} + + # Encode with \u0153 + encoded_string = self._encode_handle_string(handle_dict) + + return handle_dict, encoded_string + + def _generate_edge_id(self, source_id: str, source_handle: str, target_id: str, target_handle: str) -> str: + """ + Generate edge ID in the format: xy-edge__{source}{sourceHandle}-{target}{targetHandle}. + + Args: + source_id: Source node ID + source_handle: Encoded source handle string + target_id: Target node ID + target_handle: Encoded target handle string + + Returns: + Edge ID string + """ + edge_id = f"xy-edge__{source_id}{source_handle}-{target_id}{target_handle}" + return edge_id + + def _build_single_edge( + self, source_node: Dict[str, Any], target_node: Dict[str, Any], use_as: str + ) -> Dict[str, Any]: + """ + Build a single edge structure. 
+ + Args: + source_node: Source node dictionary + target_node: Target node dictionary + use_as: The 'useAs' field name from provides relationship + + Returns: + Complete edge structure + """ + # Build source and target handles + source_handle_dict, source_handle_encoded = self._build_source_handle(source_node) + target_handle_dict, target_handle_encoded = self._build_target_handle(target_node, use_as) + + # Get node IDs + source_id = source_node.get("id") + target_id = target_node.get("id") + + # Generate edge ID + edge_id = self._generate_edge_id(source_id, source_handle_encoded, target_id, target_handle_encoded) + + # Build complete edge structure + edge = { + "source": source_id, + "sourceHandle": source_handle_encoded, + "target": target_id, + "targetHandle": target_handle_encoded, + "data": {"targetHandle": target_handle_dict, "sourceHandle": source_handle_dict}, + "id": edge_id, + } + + logger.debug(f"Built edge: {source_id} -> {target_id} (useAs: {use_as})") + return edge + + async def build_edges(self, nodes: List[Dict[str, Any]], yaml_content: str) -> List[Dict[str, Any]]: + """ + Build edges from YAML specification 'provides' relationships. + + This method: + 1. Parses YAML to find all components with 'provides' sections + 2. 
For each provides entry: + - Finds source node (the component with provides) + - Finds target node (the component named in provides.in) + - Builds sourceHandle from source node outputs + - Builds targetHandle from target node template field (provides.useAs) + - Encodes handles with \\u0153 instead of quotes + - Creates edge with proper ID format + + Args: + nodes: List of node dictionaries + yaml_content: YAML specification content + + Returns: + List of edge dictionaries connecting the nodes + + Raises: + ValueError: If edge relationships are invalid + """ + logger.info(f"Building edges for {len(nodes)} nodes") + + # Debug: Log all node yaml_component_ids + node_yaml_ids = [node.get("data", {}).get("yaml_component_id") for node in nodes] + logger.info(f"Available node yaml_component_ids: {node_yaml_ids}") + + try: + # Parse YAML to get components + spec = yaml.safe_load(yaml_content) + yaml_components = spec.get("components", []) + + if not yaml_components: + logger.warning("No components found in YAML specification") + return [] + + logger.info(f"Found {len(yaml_components)} components in YAML") + + edges = [] + + # Process each component looking for 'provides' relationships + for yaml_component in yaml_components: + component_id = yaml_component.get("id") + provides_list = yaml_component.get("provides", []) + + logger.debug(f"Processing component {component_id}: provides={provides_list}") + + if not provides_list: + logger.debug(f"Component {component_id} has no provides relationships") + continue + + # Find source node for this component + source_node = self._find_node_by_yaml_id(nodes, component_id) + if not source_node: + logger.warning(f"Could not find source node for component {component_id}") + logger.warning(f"Searched for yaml_component_id='{component_id}' in nodes with ids: {node_yaml_ids}") + continue + + logger.info(f"Found source node {source_node.get('id')} for component {component_id}") + logger.info(f"Processing {len(provides_list)} provides 
entries for component {component_id}") + + # Process each provides entry + for provides_entry in provides_list: + use_as = provides_entry.get("useAs") + target_yaml_id = provides_entry.get("in") + description = provides_entry.get("description", "") + + logger.debug(f"Provides entry: useAs={use_as}, in={target_yaml_id}, description={description}") + + if not use_as or not target_yaml_id: + logger.warning( + f"Invalid provides entry in component {component_id}: missing 'useAs' or 'in'" + ) + continue + + # Find target node + target_node = self._find_node_by_yaml_id(nodes, target_yaml_id) + if not target_node: + logger.warning( + f"Could not find target node for component {target_yaml_id} " + f"(referenced by {component_id})" + ) + logger.warning(f"Searched for yaml_component_id='{target_yaml_id}' in nodes with ids: {node_yaml_ids}") + continue + + logger.info(f"Found target node {target_node.get('id')} for component {target_yaml_id}") + + # Build edge + try: + edge = self._build_single_edge(source_node, target_node, use_as) + edges.append(edge) + + logger.info( + f"✓ Created edge: {source_node.get('id')} -> {target_node.get('id')} " + f"(useAs: {use_as}, description: {description})" + ) + except Exception as e: + logger.error(f"Failed to build edge from {component_id} to {target_yaml_id}: {e}", exc_info=True) + continue + + logger.info(f"✓ Successfully created {len(edges)} edges total") + + return edges + + except yaml.YAMLError as e: + logger.error(f"Failed to parse YAML: {e}") + raise ValueError(f"Invalid YAML format: {str(e)}") + except Exception as e: + logger.error(f"Error building edges: {e}", exc_info=True) + raise ValueError(f"Failed to build edges: {str(e)}") diff --git a/src/backend/base/langflow/spec_flow_builder/models.py b/src/backend/base/langflow/spec_flow_builder/models.py new file mode 100644 index 000000000000..e7aa43a496c9 --- /dev/null +++ b/src/backend/base/langflow/spec_flow_builder/models.py @@ -0,0 +1,51 @@ +"""Pydantic models for 
"""Pydantic models for spec_flow_builder API."""

from typing import List, Optional

from pydantic import BaseModel, Field


class ValidateSpecRequest(BaseModel):
    """Payload accepted by the spec-validation endpoint."""

    yaml_content: str = Field(..., description="YAML specification content to validate")


class ComponentStatus(BaseModel):
    """Per-component resolution result against the component catalog."""

    id: str = Field(..., description="Component ID from YAML")
    name: str = Field(..., description="Component name from YAML")
    yaml_type: str = Field(..., description="Component type from YAML (e.g., PromptComponent)")
    found: bool = Field(..., description="Whether component exists in catalog")
    catalog_name: Optional[str] = Field(default=None, description="Catalog component name if found (e.g., 'Prompt Template')")
    category: Optional[str] = Field(default=None, description="Component category if found (e.g., 'processing')")
    error: Optional[str] = Field(default=None, description="Error message if not found")


class ValidationReport(BaseModel):
    """Aggregate validation outcome for a whole YAML specification."""

    valid: bool = Field(..., description="Overall validation status - True if all components found")
    total_components: int = Field(..., description="Total number of components in YAML")
    found_components: int = Field(..., description="Number of components found in catalog")
    missing_components: int = Field(..., description="Number of missing components")
    components: List[ComponentStatus] = Field(..., description="Detailed status for each component")
    errors: List[str] = Field(default_factory=list, description="List of validation errors")


class CreateFlowRequest(BaseModel):
    """Payload accepted by the flow-creation endpoint."""

    yaml_content: str = Field(..., description="YAML specification content to convert to flow")
    flow_name: Optional[str] = Field(default=None, description="Optional custom name for the flow")
    folder_id: Optional[str] = Field(default=None, description="Optional folder ID to save the flow in")


class CreateFlowResponse(BaseModel):
    """Outcome reported after attempting to create a flow."""

    success: bool = Field(..., description="Whether the flow was created successfully")
    message: str = Field(..., description="Success or error message")
    flow_id: Optional[str] = Field(default=None, description="ID of the created flow if successful")
    flow_name: Optional[str] = Field(default=None, description="Name of the created flow if successful")
logger = logging.getLogger(__name__)


class NodeBuilder:
    """Builds flow nodes from YAML specification components.

    This class is responsible for:
    - Matching YAML component types with catalog components
    - Creating node structures with proper IDs and positions
    - Generating node IDs and positions
    """

    def __init__(self, all_components: Dict[str, Any]):
        """Initialize the NodeBuilder.

        Args:
            all_components: Component catalog from get_and_cache_all_types_dict()
                Structure: {category: {component_name: component_data}}
        """
        self.all_components = all_components
        total_count = sum(len(comps) for comps in all_components.values())
        logger.info(f"NodeBuilder initialized with {total_count} components")

    def _find_component_by_type(self, yaml_type: str) -> Tuple[str, str, Dict[str, Any]]:
        """Find component template by searching for the class name in the code.

        Args:
            yaml_type: The component type from YAML (e.g., "PromptComponent")

        Returns:
            Tuple of (category, component_name, component_data)

        Raises:
            ValueError: If component type is not found in catalog
        """
        logger.debug(f"Looking for component with type: {yaml_type}")

        # Catalog entries don't index by class name, so match the literal
        # "class <YamlType>" declaration inside each template's source code.
        components = self.all_components
        search_pattern = f"class {yaml_type}"

        for category, category_components in components.items():
            for component_name, component_data in category_components.items():
                # Check if template.code.value contains the class definition
                if "template" in component_data and "code" in component_data["template"]:
                    code_value = component_data["template"]["code"].get("value", "")
                    if search_pattern in code_value:
                        logger.info(f"Found component: {component_name} in category: {category}")
                        return category, component_name, component_data

        # Component not found anywhere in the catalog
        error_msg = f"Component type '{yaml_type}' not found in catalog. Searched for: {search_pattern}"
        logger.error(error_msg)
        raise ValueError(error_msg)

    def _generate_node_id(self, display_name: str) -> str:
        """Generate unique node ID in the format: {display_name}-{random}.

        Args:
            display_name: The display name of the component

        Returns:
            Unique node ID (e.g., "Prompt Template-a3f2b")
        """
        # token_hex(3) yields 6 hex chars; the slice keeps the first 5.
        random_suffix = secrets.token_hex(3)[:5]
        node_id = f"{display_name}-{random_suffix}"
        logger.debug(f"Generated node ID: {node_id}")
        return node_id

    def _calculate_position(self, index: int) -> Dict[str, float]:
        """Calculate node position for auto-layout.

        Uses a simple vertical stacking layout with a fixed horizontal offset.

        Args:
            index: The index of the node (0-based)

        Returns:
            Position dictionary with x and y coordinates
        """
        start_x = 220.0
        start_y = 166.0
        vertical_spacing = 300.0

        position = {"x": start_x, "y": start_y + (index * vertical_spacing)}

        logger.debug(f"Calculated position for node {index}: {position}")
        return position

    def _build_single_node(
        self, yaml_component: Dict[str, Any], index: int, component_template: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build a single node structure from YAML component and component template.

        Args:
            yaml_component: Component definition from YAML
            index: Index of the component (for position calculation)
            component_template: Full component template from /api/v1/all

        Returns:
            Complete node structure ready for flow JSON
        """
        # Local import: copy is only needed here and keeps module import time lean.
        import copy

        template_display_name = component_template.get("display_name", "Component")
        node_id = self._generate_node_id(template_display_name)
        position = self._calculate_position(index)

        # Deep copy so per-node overrides never mutate the shared catalog entry.
        node_template = copy.deepcopy(component_template)

        # Override display_name and description with the YAML-provided values.
        node_template["display_name"] = yaml_component.get("name", template_display_name)
        node_template["description"] = yaml_component.get("description", node_template.get("description", ""))

        # Components flagged asTools expose a single Toolset output instead of
        # their normal outputs, so agents can consume them as tools.
        as_tools = yaml_component.get("asTools", False)
        if as_tools:
            node_template["tool_mode"] = True
            logger.debug(f"Component {yaml_component.get('id')} has asTools=true, replacing outputs with tool structure")
            node_template["outputs"] = [
                {
                    "types": ["Tool"],
                    "selected": "Tool",
                    "name": "component_as_tool",
                    "display_name": "Toolset",
                    "method": "to_toolkit",
                    "value": "__UNDEFINED__",
                    "cache": True,
                    "required_inputs": None,
                    "allows_loop": False,
                    "group_outputs": False,
                    "options": None,
                    "tool_mode": True,
                }
            ]

        node = {
            "id": node_id,
            "type": "genericNode",
            "position": position,
            "data": {
                "node": node_template,
                "showNode": True,
                "type": template_display_name,  # Original template display name
                "id": node_id,
                "yaml_component_id": yaml_component.get("id"),  # Store YAML component ID for config matching
            },
            "selected": True,
            "measured": {"width": 320, "height": 254},
            "dragging": False,
        }

        logger.debug(
            f"Built node: {node_id} (type: {template_display_name}, yaml_id: {yaml_component.get('id')}, asTools: {as_tools})"
        )
        return node

    async def build_nodes(self, yaml_content: str) -> List[Dict[str, Any]]:
        """Build all nodes from YAML specification.

        This is the main method that:
        1. Parses the YAML content
        2. For each YAML component, finds the matching catalog template
        3. Builds the node structure with proper overrides

        Args:
            yaml_content: YAML specification content

        Returns:
            List of node dictionaries ready for flow creation

        Raises:
            ValueError: If YAML is invalid or components are missing
        """
        logger.info("Building nodes from YAML specification")

        try:
            # Step 1: Parse YAML
            spec = yaml.safe_load(yaml_content)
            yaml_components = spec.get("components", [])

            if not yaml_components:
                logger.warning("No components found in YAML specification")
                return []

            logger.info(f"Found {len(yaml_components)} components in YAML")

            # Step 2: Build nodes for each YAML component
            nodes = []
            for index, yaml_component in enumerate(yaml_components):
                try:
                    yaml_type = yaml_component.get("type")
                    if not yaml_type:
                        logger.warning(f"Component at index {index} has no 'type' field, skipping")
                        continue

                    yaml_id = yaml_component.get("id", f"component-{index}")
                    logger.info(f"Processing component {index + 1}/{len(yaml_components)}: {yaml_id} (type: {yaml_type})")

                    # Only the template itself is needed to build the node.
                    _, _, component_template = self._find_component_by_type(yaml_type)

                    node = self._build_single_node(yaml_component, index, component_template)
                    nodes.append(node)

                except ValueError:
                    # Component not found or other validation error; propagate as-is.
                    logger.error(f"Failed to build node for component {yaml_component.get('id', index)}")
                    raise
                except Exception as e:
                    logger.error(
                        f"Unexpected error building node for component {yaml_component.get('id', index)}: {e}",
                        exc_info=True,
                    )
                    # Chain the original exception so the root cause is preserved.
                    raise ValueError(f"Failed to build node: {e}") from e

            logger.info(f"Successfully built {len(nodes)} nodes")
            return nodes

        except yaml.YAMLError as e:
            logger.error(f"Failed to parse YAML: {e}")
            raise ValueError(f"Invalid YAML format: {e}") from e
        except Exception as e:
            logger.error(f"Error building nodes: {e}", exc_info=True)
            raise
class ProvidesConnectionValidator:
    """Validates 'provides' connections using catalog and component schemas."""

    def __init__(self, resolver: ComponentResolver):
        # Shared resolver gives access to the (possibly cached) component catalog.
        self.resolver = resolver
        self._components_catalog: Optional[Dict[str, Any]] = None

    def _load_components_catalog(self) -> Dict[str, Any]:
        """Load component catalog from resolver cache to access tool_mode flags and I/O details.

        Returns an empty dict if cache is unavailable.
        """
        # Memoized: subsequent calls reuse the first successful (or empty) load.
        if self._components_catalog is not None:
            return self._components_catalog

        try:
            cached = self.resolver.get_cached_components()
            self._components_catalog = cached or {}
        except Exception:
            self._components_catalog = {}
        return self._components_catalog

    def _find_comp_data_by_class(self, class_name: str) -> Optional[Dict[str, Any]]:
        """Find and return component data in the catalog by Python class name.

        This scans the catalog categories, reads `template.code.value`, and matches
        the first `class ` occurrence against `class_name` (case-insensitive).
        Returns the matching component's data dict if found, else None.
        """
        catalog = self._load_components_catalog()
        try:
            for _category, comps in catalog.items():
                if not isinstance(comps, dict):
                    continue
                for _comp_name, comp_data in comps.items():
                    try:
                        template = comp_data.get("template", {})
                        code_field = template.get("code", {})
                        code_value = code_field.get("value", "")
                        if not isinstance(code_value, str) or not code_value:
                            continue
                        # Extract the first class name declaration
                        match = re.search(r"class\s+(\w+)", code_value)
                        if not match:
                            continue
                        found_class = match.group(1)
                        if isinstance(found_class, str) and found_class.lower() == class_name.lower():
                            return comp_data
                    except Exception:
                        # Ignore malformed entries and keep scanning
                        continue
        except Exception:
            return None
        return None

    def _find_tool_mode_output(self, component_class: str) -> Optional[str]:
        """Find an output field marked with tool_mode=true for the given component.

        Returns the output name if found, else None.
        """
        try:
            comp_data = self._find_comp_data_by_class(component_class)
            if not isinstance(comp_data, dict):
                return None
            outputs = comp_data.get("outputs", [])
            for out in outputs:
                if out.get("tool_mode") is True:
                    name = out.get("name")
                    if isinstance(name, str) and name:
                        return name
        except Exception:
            return None
        return None

    def _template_supports_tool_mode(self, component_class: str) -> bool:
        """Return True if any item inside the component's template has tool_mode=true.

        This checks the catalog's `template` object for fields whose config includes
        `tool_mode: true`. If none are present (or template missing), returns False.
        """
        try:
            comp_data = self._find_comp_data_by_class(component_class)
            if not isinstance(comp_data, dict):
                return False
            template = comp_data.get("template")
            if not isinstance(template, dict):
                return False
            for _field_name, field_cfg in template.items():
                if isinstance(field_cfg, dict) and field_cfg.get("tool_mode") is True:
                    return True
        except Exception:
            return False
        return False

    def _preferred_system_prompt_input(self, component_class: str) -> Optional[str]:
        """Return preferred system prompt input name if present on target.

        Prefers 'system_message', falls back to 'system_prompt'. Returns None if neither.
        """
        catalog = self._load_components_catalog()
        try:
            # NOTE(review): lookup keys each category by the *class* name, while the
            # catalog appears keyed by display name elsewhere — confirm key scheme.
            for category, comps in catalog.items():
                comp_data = comps.get(component_class)
                if not isinstance(comp_data, dict):
                    continue
                inputs = comp_data.get("template", {})
                # Inputs in all_components.json are under the 'template' key
                if isinstance(inputs, dict):
                    if "system_message" in inputs:
                        return "system_message"
                    if "system_prompt" in inputs:
                        return "system_prompt"
        except Exception:
            return None
        return None

    async def _get_class_name_for_yaml_type(self, yaml_type: str) -> Tuple[str, str, Dict[str, Any]] | None:
        """Find component info in catalog for a given YAML type (class name).

        Returns tuple (category, catalog_name, comp_data) or None.
        """
        return self.resolver.find_component(yaml_type)

    async def _validate_yaml_structure(self, spec: Dict[str, Any]) -> List[str]:
        """Basic structural validation for components and provides blocks."""
        errors: List[str] = []

        components = spec.get("components")
        if components is None:
            errors.append("Missing 'components' in YAML specification")
            return errors

        if not isinstance(components, list):
            errors.append("'components' must be a list")
            return errors

        for idx, comp in enumerate(components):
            if not isinstance(comp, dict):
                errors.append(f"Component at index {idx} must be a dictionary")
                continue

            # Required fields for component identification
            if "id" not in comp or not comp.get("id"):
                errors.append(f"Component at index {idx} missing required field 'id'")
            if "type" not in comp or not comp.get("type"):
                errors.append(f"Component '{comp.get('id', idx)}' missing required field 'type'")

            # Validate provides structure if present
            provides = comp.get("provides", [])
            if provides is None:
                # Allows explicit null but warns
                errors.append(f"Component '{comp.get('id', idx)}' has null 'provides'; expected list")
                continue
            if not isinstance(provides, list):
                errors.append(f"Component '{comp.get('id', idx)}' 'provides' must be a list")
                continue

            for p_idx, pr in enumerate(provides):
                if not isinstance(pr, dict):
                    errors.append(
                        f"Component '{comp.get('id', idx)}' provides[{p_idx}] must be a dictionary"
                    )
                    continue
                if "useAs" not in pr or not pr.get("useAs"):
                    errors.append(
                        f"Component '{comp.get('id', idx)}' provides[{p_idx}] missing required field 'useAs'"
                    )
                if "in" not in pr or not pr.get("in"):
                    errors.append(
                        f"Component '{comp.get('id', idx)}' provides[{p_idx}] missing required field 'in'"
                    )

        return errors

    async def validate(self, yaml_content: str) -> List[str]:
        """Run full provides validation. Returns list of error messages (empty if valid)."""
        errors: List[str] = []

        # Parse YAML
        try:
            spec_dict = yaml.safe_load(yaml_content)
        except yaml.YAMLError as e:
            logger.error(f"YAML parsing error: {e}")
            return [f"YAML parsing error: {str(e)}"]

        if not isinstance(spec_dict, dict) or not spec_dict:
            return ["Empty or invalid YAML content"]

        # Structural validation
        struct_errors = await self._validate_yaml_structure(spec_dict)
        if struct_errors:
            errors.extend(struct_errors)

        # If structure invalid, no need to proceed further
        if errors:
            return errors

        # Load component catalog, preferring existing cache from the shared resolver
        try:
            if not self.resolver.get_cached_components():
                await self.resolver.fetch_all_components()
        except Exception as e:
            logger.error(f"Failed to load component catalog: {e}")
            return [f"Failed to load component catalog: {str(e)}"]

        components = spec_dict.get("components", [])
        comp_by_id: Dict[str, Dict[str, Any]] = {c.get("id"): c for c in components if isinstance(c, dict)}

        # Validate that all components exist in catalog by type
        for comp in components:
            comp_type = comp.get("type")
            if not self.resolver.find_component(comp_type):
                errors.append(
                    f"Component '{comp.get('id', 'unknown')}' (type: '{comp_type}') not found in catalog"
                )

        # Early return if we already have missing components
        if errors:
            return errors

        # Import the inspector lazily to avoid heavy import on app start
        try:
            from langflow.services.spec.component_schema_inspector import ComponentSchemaInspector
            inspector = ComponentSchemaInspector()
        except Exception as e:
            logger.error(f"Failed to initialize ComponentSchemaInspector: {e}")
            return ["Internal validator setup error"]

        # Build helper: class name per component (based on YAML type)
        def get_class_name_from_yaml_type(yaml_type: str) -> str:
            # The YAML 'type' is already the class name for most components
            return yaml_type

        # Preliminary check: disallow asTools when template does not support tool-mode
        for comp in components:
            comp_id = comp.get("id", "unknown")
            comp_type = comp.get("type", "")
            comp_class = get_class_name_from_yaml_type(comp_type)
            try:
                supports_tools = self._template_supports_tool_mode(comp_class)
            except Exception:
                supports_tools = False
            if bool(comp.get("asTools", False)) and not supports_tools:
                errors.append(
                    (
                        f"Component '{comp_id}' declares asTools: true but its catalog template has no items "
                        f"with tool_mode: true (component class: '{comp_class}')."
                    )
                )

        # Additional rule: if a component is declared as a tool (asTools: true),
        # all its provides connections must use useAs: 'tools'. Mixing non-tool useAs
        # is disallowed to avoid ambiguous semantics.
        for comp in components:
            comp_id = comp.get("id", "unknown")
            if bool(comp.get("asTools", False)):
                provides_list = comp.get("provides", []) or []
                for pr in provides_list:
                    if isinstance(pr, dict):
                        pr_use_as = pr.get("useAs")
                        pr_target = pr.get("in")
                        if pr_use_as and pr_use_as != "tools":
                            errors.append(
                                (
                                    f"Component '{comp_id}' is declared as a tool (asTools: true) but declares a "
                                    f"connection to '{pr_target}' using useAs: '{pr_use_as}'. Components declared as tools "
                                    f"must only use useAs: 'tools'."
                                )
                            )

        # Validate provides connections
        for comp in components:
            source_id = comp.get("id", "unknown")
            source_type = comp.get("type", "")
            source_class = get_class_name_from_yaml_type(source_type)

            provides_list = comp.get("provides", [])
            for pr in provides_list:
                target_id = pr.get("in")
                use_as = pr.get("useAs")

                # If connection declares tools usage, enforce template-based support and declaration
                if use_as == "tools":
                    # First: component must support tool-mode via template
                    if not self._template_supports_tool_mode(source_class):
                        errors.append(
                            (
                                f"Component '{source_id}' ({source_class}) does not support tool mode via template: "
                                f"no template items with tool_mode: true; cannot use 'useAs: tools' to '{target_id}'."
                            )
                        )
                        continue

                    # Second: require explicit asTools: true declaration
                    source_is_tools = bool(comp.get("asTools", False))
                    if not source_is_tools:
                        errors.append(
                            (
                                f"Component '{source_id}' supports tool mode via template but is not declared as a tool "
                                f"(missing asTools: true); cannot use 'useAs: tools' to '{target_id}'."
                            )
                        )
                        continue

                # Target existence by id
                if target_id not in comp_by_id:
                    errors.append(
                        f"Component '{source_id}' provides to unknown target id '{target_id}'"
                    )
                    continue

                target_comp = comp_by_id[target_id]
                target_type = target_comp.get("type", "")
                target_class = get_class_name_from_yaml_type(target_type)

                # Check that inspector knows both classes
                if inspector.get_component_schema(source_class) is None:
                    errors.append(
                        f"Source component class '{source_class}' not recognized in Langflow"
                    )
                    continue
                if inspector.get_component_schema(target_class) is None:
                    errors.append(
                        f"Target component class '{target_class}' not recognized in Langflow"
                    )
                    continue

                # Determine default fields via IO mapping
                io_map = inspector.get_component_io_mapping()
                src_map = io_map.get(source_class)
                tgt_map = io_map.get(target_class)

                # Fallbacks
                source_output = (src_map or {}).get("output_field") or "output"
                target_input = (tgt_map or {}).get("input_field") or "input_value"

                # Adjust fields based on useAs semantics and tool_mode flags
                if use_as == "tools":
                    # Target agents should receive tools on the 'tools' input
                    target_input = "tools"
                    # Do not rely on outputs for tool-mode validation; keep default source output
                elif use_as == "system_prompt":
                    # Prefer explicit system prompt inputs if available
                    preferred_input = self._preferred_system_prompt_input(target_class)
                    if isinstance(preferred_input, str) and preferred_input:
                        target_input = preferred_input

                # Verify chosen fields exist on source/target; if not, keep fallbacks
                source_schema = inspector.get_component_schema(source_class)
                target_schema = inspector.get_component_schema(target_class)
                try:
                    src_outputs = {out.get("name") for out in (source_schema.outputs if source_schema else [])}
                    tgt_inputs = {inp.get("name") for inp in (target_schema.inputs if target_schema else [])}
                    if source_output not in src_outputs:
                        source_output = (src_map or {}).get("output_field") or "output"
                    # For tool connections, require explicit 'tools' input on target; do not fallback
                    if use_as == "tools":
                        if "tools" not in tgt_inputs:
                            errors.append(
                                f"Target '{target_id}' ({target_class}) does not expose a 'tools' input to receive tools"
                            )
                            continue
                        # Keep target_input as 'tools' if present
                    else:
                        # For other useAs types, if chosen input is missing, fallback to default input
                        if target_input not in tgt_inputs:
                            target_input = (tgt_map or {}).get("input_field") or "input_value"
                except Exception:
                    # On any introspection error, retain original fallbacks
                    source_output = (src_map or {}).get("output_field") or source_output
                    target_input = (tgt_map or {}).get("input_field") or target_input

                # Additional semantic checks based on useAs (prioritize specific messages)
                if use_as == "system_prompt":
                    # Expect str input or a dedicated 'system_message' input field on target
                    target_schema = inspector.get_component_schema(target_class)
                    target_inputs = {i.get("name") for i in (target_schema.inputs if target_schema else [])}
                    target_types = set((target_schema.input_types if target_schema else []))
                    # Accept both common field names: 'system_message' (models) and 'system_prompt' (agents)
                    if (
                        "str" not in target_types
                        and "system_message" not in target_inputs
                        and "system_prompt" not in target_inputs
                    ):
                        errors.append(
                            f"Target '{target_id}' ({target_class}) does not accept system prompts"
                        )
                        continue
                elif use_as == "tools":
                    # Tools should generally connect into agents; enforce agent-like target
                    target_schema = inspector.get_component_schema(target_class)
                    class_name = (target_schema.class_name if target_schema else "").lower()
                    module_path = (target_schema.module_path if target_schema else "").lower()
                    if "agent" not in class_name and "agents" not in module_path:
                        errors.append(
                            f"Target '{target_id}' ({target_class}) is not an agent and cannot receive tools"
                        )
                        continue
                elif use_as == "input":
                    # Generic input should be accepted; minimal check is that target has any inputs
                    target_schema = inspector.get_component_schema(target_class)
                    if not target_schema or not target_schema.inputs:
                        errors.append(
                            f"Target '{target_id}' ({target_class}) cannot accept inputs"
                        )
                        continue

                # Validate I/O field existence and type compatibility
                result = inspector.validate_component_connection(
                    source_comp=source_class,
                    target_comp=target_class,
                    source_output=source_output,
                    target_input=target_input,
                )

                if not result.get("valid"):
                    # Enhance error based on useAs for better messaging
                    base_error = result.get("error") or "Invalid connection"
                    errors.append(
                        f"Invalid provides connection: {source_id} ({source_class}) → {target_id} ({target_class})"
                        f" as '{use_as}': {base_error}"
                    )
                    continue

        return errors
async def validate_provides_validator(request_model: ValidateSpecRequest = Body(...)) -> ValidateSpecRequest:
    """FastAPI dependency to validate 'provides' connections before handler.

    Args:
        request_model: Pydantic model with `yaml_content` field (e.g., ValidateSpecRequest)

    Returns:
        The unchanged request model, so the downstream handler can reuse it.

    Raises:
        HTTPException(400): If any validation error is found
    """
    try:
        yaml_content = getattr(request_model, "yaml_content", None)
        if yaml_content is None:
            raise HTTPException(status_code=400, detail={"errors": ["Missing 'yaml_content' in request body"]})

        # Fresh resolver/validator per request; the resolver may reuse its cache internally.
        resolver = ComponentResolver()
        validator = ProvidesConnectionValidator(resolver)
        errors = await validator.validate(yaml_content)

        if errors:
            raise HTTPException(status_code=400, detail={"errors": errors})

        return request_model

    except HTTPException:
        # Propagate HTTP 400 raised above
        raise
    except Exception as e:
        # NOTE(review): unexpected internal failures are surfaced as 400, not 500 — confirm intended.
        logger.error(f"Provides validation error: {e}", exc_info=True)
        raise HTTPException(status_code=400, detail={"errors": [f"Validation error: {str(e)}"]})
class SpecValidator:
    """Validates YAML specifications - checks if components exist."""

    def __init__(self, component_resolver: ComponentResolver):
        """
        Initialize validator with component resolver.

        Args:
            component_resolver: ComponentResolver instance for finding components
        """
        self.resolver = component_resolver

    async def validate(self, yaml_content: str) -> ValidationReport:
        """
        Validate YAML specification - check component existence only.

        This is Step 1 of validation. We ONLY check if components exist in the catalog.
        Later steps will validate configs, connections, etc.

        Steps:
        1. Parse the YAML content
        2. Load the component catalog
        3. For each component in YAML:
           - Get the type field (class name)
           - Search catalog using component_resolver
           - Record whether it was found or not
        4. Build and return a ValidationReport

        Example:
            Input YAML has 6 components with types:
            - PromptComponent
            - ChatInput
            - AgentComponent
            - KnowledgeHubSearchComponent
            - APIRequestComponent
            - ChatOutput

            Validator checks each one:
            ✓ PromptComponent → Found as "Prompt Template" in "processing"
            ✓ ChatInput → Found as "ChatInput" in "input_output"
            ✓ AgentComponent → Found as "Agent" in "agents"
            ... etc

            Returns: ValidationReport with valid=True, all 6 found

        Args:
            yaml_content: YAML specification string

        Returns:
            ValidationReport with validation results
        """
        errors: List[str] = []
        component_statuses: List[ComponentStatus] = []

        try:
            # Step 1: Parse YAML
            spec_dict = yaml.safe_load(yaml_content)
            if not spec_dict:
                return ValidationReport(
                    valid=False,
                    total_components=0,
                    found_components=0,
                    missing_components=0,
                    components=[],
                    errors=["Empty or invalid YAML content"],
                )

            # Step 2: Load component catalog
            await self.resolver.fetch_all_components()

            # Step 3: Get components list from YAML
            components = spec_dict.get("components", [])
            if not components:
                errors.append("No components defined in specification")

            # Step 4: Check each component
            found_count = 0

            for component in components:
                # Extract component info from YAML
                comp_id = component.get("id", "unknown")
                comp_name = component.get("name", "unknown")
                comp_type = component.get("type", "unknown")

                logger.info(f"Validating component: id={comp_id}, name={comp_name}, type={comp_type}")

                # Try to find component in catalog
                result = self.resolver.find_component(comp_type)

                if result:
                    # Component found!
                    category, catalog_name, comp_data = result

                    component_statuses.append(
                        ComponentStatus(
                            id=comp_id,
                            name=comp_name,
                            yaml_type=comp_type,
                            found=True,
                            catalog_name=catalog_name,
                            category=category,
                            error=None,
                        )
                    )
                    found_count += 1
                    logger.info(f"  ✓ Found: {comp_type} → {category}.{catalog_name}")

                else:
                    # Component NOT found
                    error_msg = f"Component type '{comp_type}' not found in catalog"

                    component_statuses.append(
                        ComponentStatus(
                            id=comp_id,
                            name=comp_name,
                            yaml_type=comp_type,
                            found=False,
                            catalog_name=None,
                            category=None,
                            error=error_msg,
                        )
                    )
                    errors.append(f"Component '{comp_id}' (type: '{comp_type}') not found in catalog")
                    logger.warning(f"  ✗ Not found: {comp_type}")

            # Build final report
            total = len(components)
            missing = total - found_count
            is_valid = missing == 0  # Valid only if ALL components found

            logger.info(f"Validation complete: {found_count}/{total} components found")

            return ValidationReport(
                valid=is_valid,
                total_components=total,
                found_components=found_count,
                missing_components=missing,
                components=component_statuses,
                errors=errors,
            )

        except yaml.YAMLError as e:
            # YAML parsing failed
            logger.error(f"YAML parsing error: {e}")
            return ValidationReport(
                valid=False,
                total_components=0,
                found_components=0,
                missing_components=0,
                components=[],
                errors=[f"YAML parsing error: {str(e)}"],
            )

        except Exception as e:
            # Unexpected error
            logger.error(f"Validation error: {e}", exc_info=True)
            return ValidationReport(
                valid=False,
                total_components=0,
                found_components=0,
                missing_components=0,
                components=[],
                errors=[f"Validation failed: {str(e)}"],
            )
import sys
from typing import Any, Dict

import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient


def build_app(monkeypatch, fake_catalog: Dict[str, Any]) -> TestClient:
    """Build a TestClient for the spec-builder router with a stubbed catalog and inspector."""
    from langflow.spec_flow_builder import api as spec_api

    # Patch fetch_all_components to return provided fake catalog
    from langflow.spec_flow_builder.component_resolver import ComponentResolver

    async def _fake_fetch_all_components(self):
        self._cache = fake_catalog
        return fake_catalog

    monkeypatch.setattr(ComponentResolver, "fetch_all_components", _fake_fetch_all_components)

    # Lightweight inspector stub sufficient for cases without provides
    class StubSchema:
        def __init__(self, class_name: str):
            self.class_name = class_name
            self.module_path = f"langflow.components.{class_name.lower()}"
            self.inputs = []
            self.input_types = ["any"]
            self.outputs = []
            self.output_types = ["any"]

        @property
        def name(self):
            return self.class_name

    class StubInspector:
        def get_component_schema(self, name: str):
            return StubSchema(name)

        def get_component_io_mapping(self):
            return {}

        def validate_component_connection(self, source_comp, target_comp, source_output, target_input):
            return {"valid": True, "error": None}

    # Inject the stub module so the validator's lazy import picks it up.
    monkeypatch.setitem(
        sys.modules,
        "langflow.services.spec.component_schema_inspector",
        type("mod", (), {"ComponentSchemaInspector": StubInspector}),
    )

    app = FastAPI()
    app.include_router(spec_api.router)
    return TestClient(app)


def test_config_unknown_key(monkeypatch):
    """A config key absent from the component template is rejected with a 400."""
    fake_catalog: Dict[str, Any] = {
        "tools": {
            "APIRequest": {
                "template": {
                    "code": {"value": "class APIRequestComponent(Component):\n    pass"},
                    # Deliberately exclude 'url' to trigger unknown key
                    "headers": {"type": "dict"},
                    "body": {"type": "dict"},
                }
            }
        }
    }

    client = build_app(monkeypatch, fake_catalog)

    yaml_spec = """
components:
  - id: svc
    type: APIRequestComponent
    config:
      url: "http://example.com"
"""

    resp = client.post("/spec-builder/validate", json={"yaml_content": yaml_spec})
    assert resp.status_code == 400
    body = resp.json()
    assert any("Unknown config key 'url'" in e for e in body.get("detail", {}).get("errors", []))


def test_config_headers_body_type_ignored(monkeypatch):
    """'headers' and 'body' values pass even when their YAML shape is a list, not a dict."""
    fake_catalog: Dict[str, Any] = {
        "tools": {
            "APIRequest": {
                "template": {
                    "code": {"value": "class APIRequestComponent(Component):\n    pass"},
                    # Include keys so they are recognized, but type/list is ignored
                    "headers": {"type": "dict"},
                    "body": {"type": "dict"},
                }
            }
        }
    }

    client = build_app(monkeypatch, fake_catalog)

    yaml_spec = """
components:
  - id: svc
    type: APIRequestComponent
    config:
      headers:
        - name: "Content-Type"
          value: "application/json"
      body:
        - key: "a"
          value: 1
"""

    resp = client.post("/spec-builder/validate", json={"yaml_content": yaml_spec})
    assert resp.status_code == 200, resp.text
    body = resp.json()
    assert "errors" in body and len(body["errors"]) == 0


def test_config_missing_required_not_enforced(monkeypatch):
    """Template fields that the config omits (e.g. 'code') are not treated as required."""
    fake_catalog: Dict[str, Any] = {
        "input_output": {
            "ChatInput": {
                "template": {
                    # Provide a code value so resolver can map class name
                    "code": {"value": "class ChatInput(Component):\n    pass"},
                    # Include 'message' so it's a known key and not flagged
                    "message": {"type": "str"},
                }
            }
        }
    }

    client = build_app(monkeypatch, fake_catalog)

    yaml_spec = """
components:
  - id: req
    type: ChatInput
    config:
      message: "hello"
"""

    # No error even though 'code' is defined in template and missing
    resp = client.post("/spec-builder/validate", json={"yaml_content": yaml_spec})
    assert resp.status_code == 200
    body = resp.json()
    assert "errors" in body and len(body["errors"]) == 0


def test_config_type_enforced_for_known_key(monkeypatch):
    """A known config key with a mismatched value type produces a 400 with a type error."""
    fake_catalog: Dict[str, Any] = {
        "input_output": {
            "ChatInput": {
                "template": {
                    # Provide a code value so resolver can map class name
                    "code": {"value": "class ChatInput(Component):\n    pass"},
                    "temperature": {"type": "float"},
                }
            }
        }
    }

    client = build_app(monkeypatch, fake_catalog)

    yaml_spec = """
components:
  - id: req
    type: ChatInput
    config:
      temperature: "hot"
"""

    resp = client.post("/spec-builder/validate", json={"yaml_content": yaml_spec})
    assert resp.status_code == 400
    body = resp.json()
    assert any("expected float, got str" in e for e in body.get("detail", {}).get("errors", []))
"tool_config": {"tool_mode": True} + } + }, + "APIRequest": { + "template": { + "code": {"value": "class APIRequestComponent(Component):\n pass"} + } + }, + }, + } + + # Patch fetch_all_components to return fake catalog + from langflow.spec_flow_builder.component_resolver import ComponentResolver + + async def _fake_fetch_all_components(self): + self._cache = fake_catalog + return fake_catalog + + monkeypatch.setattr(ComponentResolver, "fetch_all_components", _fake_fetch_all_components) + + # Patch inspector with a lightweight stub + class StubSchema: + def __init__(self, class_name: str): + self.class_name = class_name + self.module_path = f"langflow.components.{class_name.lower()}" + # Inputs: agent accepts str/message; chatoutput accepts message; others produce outputs + if class_name == "AgentComponent": + self.inputs = [ + {"name": "input_value", "field_type": "MessageInput"}, + {"name": "system_message", "field_type": "StrInput"}, + {"name": "tools", "field_type": "DataInput"}, + ] + self.input_types = ["Message", "str", "Data"] + elif class_name == "ChatOutput": + self.inputs = [{"name": "input_value", "field_type": "MessageInput"}] + self.input_types = ["Message"] + else: + self.inputs = [] + self.input_types = ["any"] + + # Outputs + if class_name in {"ChatInput", "AgentComponent", "PromptComponent", "KnowledgeHubSearchComponent", "APIRequestComponent"}: + self.outputs = [{"name": "output", "field_type": "Output"}] + # Simplify type mapping + if class_name in {"ChatInput", "AgentComponent"}: + self.output_types = ["Message"] + elif class_name == "PromptComponent": + self.output_types = ["str"] + else: + self.output_types = ["Data"] + else: + self.outputs = [] + self.output_types = ["any"] + + @property + def name(self): + return self.class_name + + class StubInspector: + def get_component_schema(self, name: str): + known = { + "PromptComponent", + "ChatInput", + "ChatOutput", + "AgentComponent", + "KnowledgeHubSearchComponent", + "APIRequestComponent", + } 
+ return StubSchema(name) if name in known else None + + def get_component_io_mapping(self): + return { + "PromptComponent": {"input_field": "template", "output_field": "output", "output_types": ["str"], "input_types": ["str"]}, + "ChatInput": {"input_field": "message", "output_field": "output", "output_types": ["Message"], "input_types": ["Message"]}, + "AgentComponent": {"input_field": "input_value", "output_field": "output", "output_types": ["Message"], "input_types": ["Message", "str", "Data"]}, + "KnowledgeHubSearchComponent": {"input_field": "search_query", "output_field": "output", "output_types": ["Data"], "input_types": ["str"]}, + "APIRequestComponent": {"input_field": "parameters", "output_field": "output", "output_types": ["Data"], "input_types": ["Data"]}, + "ChatOutput": {"input_field": "input_value", "output_field": "output", "output_types": ["Message"], "input_types": ["Message"]}, + } + + def validate_component_connection(self, source_comp, target_comp, source_output, target_input): + # Accept if types intersect + src = self.get_component_schema(source_comp) + tgt = self.get_component_schema(target_comp) + if not src or not tgt: + return {"valid": False, "error": "Component schema not found"} + compatible = any(t in tgt.input_types for t in src.output_types) + return { + "valid": compatible, + "source_types": src.output_types, + "target_types": tgt.input_types, + "error": None if compatible else "Type mismatch between components", + } + + # Patch the inspector in the middleware module import path + import langflow.spec_flow_builder.provides_validator as pm + + monkeypatch.setitem( + sys.modules, + "langflow.services.spec.component_schema_inspector", + type("mod", (), {"ComponentSchemaInspector": StubInspector}), + ) + + app = FastAPI() + app.include_router(spec_api.router) + return TestClient(app) + + +def test_valid_connections(monkeypatch): + client = build_app(monkeypatch) + + yaml_spec = """ +components: + - id: eoc-prompt + name: Agent 
Instructions + type: PromptComponent + provides: + - useAs: system_prompt + in: eoc-agent + - id: eoc-request + name: EOC Check Request + type: ChatInput + provides: + - useAs: input + in: eoc-agent + - id: eoc-agent + name: EOC Validation Agent + type: AgentComponent + provides: + - useAs: input + in: eoc-formatter + - id: eoc-search + name: EOC Document Search + type: KnowledgeHubSearchComponent + asTools: true + provides: + - useAs: tools + in: eoc-agent + - id: eoc-formatter + name: EOC Validation Results + type: ChatOutput +""" + + resp = client.post("/spec-builder/validate", json={"yaml_content": yaml_spec}) + assert resp.status_code == 200, resp.text + body = resp.json() + assert "errors" in body + assert len(body["errors"]) == 0 + + +def test_invalid_target_id(monkeypatch): + client = build_app(monkeypatch) + + yaml_spec = """ +components: + - id: prompt + type: PromptComponent + provides: + - useAs: system_prompt + in: missing-agent + - id: agent + type: AgentComponent +""" + + resp = client.post("/spec-builder/validate", json={"yaml_content": yaml_spec}) + assert resp.status_code == 400 + body = resp.json() + assert any("unknown target id" in e.lower() for e in body.get("detail", {}).get("errors", [])) + + +def test_component_cannot_accept_input(monkeypatch): + client = build_app(monkeypatch) + + # Patch inspector to make ChatOutput not accept inputs + import langflow.spec_flow_builder.provides_validator as pm + StubInspector = sys.modules["langflow.services.spec.component_schema_inspector"].ComponentSchemaInspector + + class BadOutputStubInspector(StubInspector): + def get_component_schema(self, name: str): + schema = super().get_component_schema(name) + if schema and name == "ChatOutput": + schema.inputs = [] + schema.input_types = [] + return schema + + # Repatch + sys.modules["langflow.services.spec.component_schema_inspector"].ComponentSchemaInspector = BadOutputStubInspector + + yaml_spec = """ +components: + - id: agent + type: AgentComponent + 
provides: + - useAs: input + in: sink + - id: sink + type: ChatOutput +""" + + resp = client.post("/spec-builder/validate", json={"yaml_content": yaml_spec}) + assert resp.status_code == 400 + body = resp.json() + assert any("cannot accept inputs" in e.lower() for e in body.get("detail", {}).get("errors", [])) + + +def test_tools_target_without_tools_input(monkeypatch): + client = build_app(monkeypatch) + + yaml_spec = """ +components: + - id: search + type: KnowledgeHubSearchComponent + asTools: true + provides: + - useAs: tools + in: sink + - id: sink + type: ChatOutput +""" + + resp = client.post("/spec-builder/validate", json={"yaml_content": yaml_spec}) + assert resp.status_code == 400 + body = resp.json() + assert any("does not expose a 'tools' input" in e for e in body.get("detail", {}).get("errors", [])) + + +def test_as_tools_must_use_tools(monkeypatch): + client = build_app(monkeypatch) + + yaml_spec = """ +components: + - id: search + type: KnowledgeHubSearchComponent + asTools: true + provides: + - useAs: input + in: agent + - id: agent + type: AgentComponent +""" + + resp = client.post("/spec-builder/validate", json={"yaml_content": yaml_spec}) + assert resp.status_code == 400 + body = resp.json() + assert any("must only use useAs: 'tools'" in e for e in body.get("detail", {}).get("errors", [])) + + +def test_malformed_yaml(monkeypatch): + client = build_app(monkeypatch) + + yaml_spec = """ +components: {} +""" + resp = client.post("/spec-builder/validate", json={"yaml_content": yaml_spec}) + assert resp.status_code == 400 + + +def test_yaml_parsing_error(monkeypatch): + client = build_app(monkeypatch) + + yaml_spec = """ +components: + - id: a + type: AgentComponent + provides: [ + - useAs: input + in: b +""" # missing closing + + resp = client.post("/spec-builder/validate", json={"yaml_content": yaml_spec}) + assert resp.status_code == 400 + assert "yaml parsing error" in resp.json().get("detail", {}).get("errors", [""])[0].lower() + + +def 
test_performance_large_yaml(monkeypatch): + client = build_app(monkeypatch) + + # Generate large spec with chained provides + parts = ["components:"] + num = 500 + for i in range(num): + comp_type = "ChatInput" if i == 0 else ("AgentComponent" if i % 3 == 0 else "PromptComponent") + target = f"comp-{i+1}" if i < num - 1 else "sink" + use_as = "input" if comp_type != "PromptComponent" else "system_prompt" + parts.append( + f" - id: comp-{i}\n type: {comp_type}\n provides:\n - useAs: {use_as}\n in: {target}" + ) + parts.append(" - id: sink\n type: ChatOutput") + yaml_spec = "\n".join(parts) + + start = time.time() + resp = client.post("/spec-builder/validate", json={"yaml_content": yaml_spec}) + duration = time.time() - start + + # Should complete within a reasonable time + assert resp.status_code in (200, 400) + assert duration < 2.5 \ No newline at end of file