Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions src/backend/base/langflow/api/v1/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from langflow.api.v1.schemas import FlowListCreate
from langflow.helpers.user import get_user_by_flow_id_or_endpoint_name
from langflow.initial_setup.constants import STARTER_FOLDER_NAME
from langflow.services.auth.utils import get_current_active_user
from langflow.services.database.models.flow.model import (
AccessTypeEnum,
Flow,
Expand Down Expand Up @@ -586,3 +587,51 @@ async def read_basic_examples(

except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e


@router.post("/expand/", status_code=200, dependencies=[Depends(get_current_active_user)])
async def expand_compact_flow_endpoint(
    compact_data: dict,
):
    """Expand a compact flow representation into the full Langflow format.

    Accepts the minimal flow shape produced by AI agents:

    - ``nodes``: list of ``{id, type, values?}``
    - ``edges``: list of ``{source, source_output, target, target_input}``

    and returns flow data with the complete component templates filled in,
    ready for the Langflow UI.

    Example input:
    ```json
    {
        "nodes": [
            {"id": "1", "type": "ChatInput"},
            {"id": "2", "type": "OpenAIModel", "values": {"model_name": "gpt-4"}}
        ],
        "edges": [
            {"source": "1", "source_output": "message", "target": "2", "target_input": "input_value"}
        ]
    }
    ```
    """
    from lfx.interface.components import component_cache, get_and_cache_all_types_dict

    from langflow.processing.expand_flow import expand_compact_flow

    # Populate the component cache lazily on the first request that needs it.
    if component_cache.all_types_dict is None:
        await get_and_cache_all_types_dict(get_settings_service())

    types_dict = component_cache.all_types_dict
    if types_dict is None:
        raise HTTPException(status_code=500, detail="Component cache not initialized")

    try:
        return expand_compact_flow(compact_data, types_dict)
    except ValueError as e:
        # Bad compact payloads (unknown component, dangling edge refs) -> 400.
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
283 changes: 283 additions & 0 deletions src/backend/base/langflow/processing/expand_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
"""Expand compact flow format to full flow format.

This module provides functionality to expand a minimal/compact flow format
(used by AI agents) into the full flow format expected by Langflow.
"""

from __future__ import annotations

from copy import deepcopy
from typing import Any

from pydantic import BaseModel, Field


class CompactNode(BaseModel):
    """A compact node representation for AI-generated flows.

    Carries just enough information for ``_expand_node`` to rebuild a full
    Langflow node from the cached component templates.
    """

    # Unique identifier of the node within the flow; referenced by edges.
    id: str
    # Component type name (e.g. "ChatInput"); key into the flattened component index.
    type: str
    # Field values merged into the component template's matching fields.
    values: dict[str, Any] = Field(default_factory=dict)
    # If edited is True, the node field must contain the full node data
    edited: bool = False
    # Full node payload; only consulted when ``edited`` is True.
    node: dict[str, Any] | None = None


class CompactEdge(BaseModel):
    """A compact edge representation for AI-generated flows."""

    # id of the source node.
    source: str
    # Name of the output on the source node (matched against its "outputs" list).
    source_output: str
    # id of the target node.
    target: str
    # Name of the input field on the target node's template.
    target_input: str


class CompactFlowData(BaseModel):
    """The compact flow data structure.

    Top-level payload accepted by ``expand_compact_flow``: compact nodes plus
    the compact edges connecting them.
    """

    nodes: list[CompactNode]
    edges: list[CompactEdge]


def _get_flat_components(all_types_dict: dict[str, Any]) -> dict[str, Any]:
"""Flatten the component types dict for easy lookup by component name."""
return {
comp_name: comp_data
for components in all_types_dict.values()
if isinstance(components, dict)
for comp_name, comp_data in components.items()
}


def _expand_node(
compact_node: CompactNode,
flat_components: dict[str, Any],
) -> dict[str, Any]:
"""Expand a compact node to full node format.

Args:
compact_node: The compact node to expand
flat_components: Flattened component templates dict

Returns:
Full node data structure

Raises:
ValueError: If component type is not found and node is not edited
"""
# If the node is edited, it should have full node data
if compact_node.edited:
if not compact_node.node:
msg = f"Node {compact_node.id} is marked as edited but has no node data"
raise ValueError(msg)
return {
"id": compact_node.id,
"type": "genericNode",
"data": {
"type": compact_node.type,
"node": compact_node.node,
"id": compact_node.id,
},
}

# Look up component template
if compact_node.type not in flat_components:
msg = f"Component type '{compact_node.type}' not found in component index"
raise ValueError(msg)

# Deep copy the template to avoid mutation
template_data = deepcopy(flat_components[compact_node.type])

# Merge user values into template
template = template_data.get("template", {})
for field_name, field_value in compact_node.values.items():
if field_name in template:
if isinstance(template[field_name], dict):
template[field_name]["value"] = field_value
else:
template[field_name] = field_value
else:
# Add as new field if not in template
template[field_name] = {"value": field_value}

return {
"id": compact_node.id,
"type": "genericNode",
"data": {
"type": compact_node.type,
"node": template_data,
"id": compact_node.id,
},
}


def _encode_handle(data: dict[str, Any]) -> str:
    """Serialize a handle dict into ReactFlow's escaped-JSON string format.

    The encoding substitutes œ for the double-quote character (delegated to
    ``escape_json_dump``).
    """
    from lfx.utils.util import escape_json_dump

    encoded = escape_json_dump(data)
    return encoded


def _build_source_handle_data(
node_id: str,
component_type: str,
output_name: str,
output_types: list[str],
) -> dict[str, Any]:
"""Build the sourceHandle data dict for an edge."""
return {
"dataType": component_type,
"id": node_id,
"name": output_name,
"output_types": output_types,
}


def _build_target_handle_data(
node_id: str,
field_name: str,
input_types: list[str],
field_type: str,
) -> dict[str, Any]:
"""Build the targetHandle data dict for an edge."""
return {
"fieldName": field_name,
"id": node_id,
"inputTypes": input_types,
"type": field_type,
}


def _expand_edge(
    compact_edge: CompactEdge,
    expanded_nodes: dict[str, dict[str, Any]],
) -> dict[str, Any]:
    """Expand a compact edge to full edge format.

    Args:
        compact_edge: The compact edge to expand
        expanded_nodes: Dict of node_id -> expanded node data

    Returns:
        Full edge data structure with encoded ReactFlow handle strings

    Raises:
        ValueError: If the edge references a node id that is not present
            in ``expanded_nodes``.
    """
    source_node = expanded_nodes.get(compact_edge.source)
    target_node = expanded_nodes.get(compact_edge.target)

    if not source_node:
        msg = f"Source node '{compact_edge.source}' not found"
        raise ValueError(msg)
    if not target_node:
        msg = f"Target node '{compact_edge.target}' not found"
        raise ValueError(msg)

    source_node_data = source_node["data"]["node"]
    target_node_data = target_node["data"]["node"]

    # Find output types from the source node's named output.
    source_outputs = source_node_data.get("outputs", [])
    source_output = next(
        (o for o in source_outputs if o.get("name") == compact_edge.source_output),
        None,
    )
    output_types = source_output.get("types", []) if source_output else []

    # If no outputs defined, fall back to the component's base_classes.
    if not output_types:
        output_types = source_node_data.get("base_classes", [])

    # Find input types and field type from the target node's template.
    # Guard every dict access: template entries may be plain (non-dict)
    # values — _expand_node stores raw values for non-dict fields — and an
    # unguarded .get() would raise AttributeError on them.
    target_template = target_node_data.get("template", {})
    target_field = target_template.get(compact_edge.target_input, {})
    if isinstance(target_field, dict):
        input_types = target_field.get("input_types", [])
        field_type = target_field.get("type", "str")
        if not input_types:
            input_types = [field_type]
    else:
        # No type metadata available for a plain value entry.
        input_types = []
        field_type = "str"

    source_type = source_node["data"]["type"]

    # Build handle data objects
    source_handle_data = _build_source_handle_data(
        compact_edge.source,
        source_type,
        compact_edge.source_output,
        output_types,
    )
    target_handle_data = _build_target_handle_data(
        compact_edge.target,
        compact_edge.target_input,
        input_types,
        field_type,
    )

    # Encode handles to the œ-escaped string format used by ReactFlow.
    source_handle_str = _encode_handle(source_handle_data)
    target_handle_str = _encode_handle(target_handle_data)

    edge_id = f"reactflow__edge-{compact_edge.source}{source_handle_str}-{compact_edge.target}{target_handle_str}"

    return {
        "source": compact_edge.source,
        "sourceHandle": source_handle_str,
        "target": compact_edge.target,
        "targetHandle": target_handle_str,
        "id": edge_id,
        "data": {
            "sourceHandle": source_handle_data,
            "targetHandle": target_handle_data,
        },
        "className": "",
        "selected": False,
        "animated": False,
    }


def expand_compact_flow(
    compact_data: dict[str, Any],
    all_types_dict: dict[str, Any],
) -> dict[str, Any]:
    """Expand a compact flow format to full flow format.

    Args:
        compact_data: The compact flow data with nodes and edges
        all_types_dict: The component types dictionary from component_cache

    Returns:
        Full flow data structure ready for Langflow UI

    Example compact input:
        {
            "nodes": [
                {"id": "1", "type": "ChatInput"},
                {"id": "2", "type": "OpenAIModel", "values": {"model_name": "gpt-4"}}
            ],
            "edges": [
                {"source": "1", "source_output": "message", "target": "2", "target_input": "input_value"}
            ]
        }
    """
    # Validate the payload shape up front via the pydantic models.
    parsed = CompactFlowData(**compact_data)

    # Build a flat name -> template index for node expansion.
    components = _get_flat_components(all_types_dict)

    # Expand nodes first; edges need the expanded node data for their handles.
    nodes_by_id: dict[str, dict[str, Any]] = {
        node.id: _expand_node(node, components) for node in parsed.nodes
    }
    full_edges = [_expand_edge(edge, nodes_by_id) for edge in parsed.edges]

    return {
        "nodes": list(nodes_by_id.values()),
        "edges": full_edges,
    }
Loading
Loading