Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
f080f0b
feat: add core graph execution debugging infrastructure
ogabrielluiz Oct 18, 2025
8bce7cc
test: add comprehensive test suite for execution debugger
ogabrielluiz Oct 18, 2025
c61d063
feat: add interactive debugging examples
ogabrielluiz Oct 18, 2025
121d940
feat: centralize graph mutations with async event system
ogabrielluiz Oct 20, 2025
0ab3248
refactor: remove deprecated debugging utilities
ogabrielluiz Oct 20, 2025
b2063cf
feat: add pure observer pattern event recorder
ogabrielluiz Oct 20, 2025
52c711c
docs: add comprehensive event debugging notebook
ogabrielluiz Oct 20, 2025
4246754
refactor: enhance event debugging notebook with structured outputs
ogabrielluiz Oct 20, 2025
6e2f430
chore: update ruff exclusion list to include Jupyter notebook examples
ogabrielluiz Oct 20, 2025
a2c2e65
refactor: replace direct access to _run_queue with getter method
ogabrielluiz Oct 20, 2025
44a3b4b
chore: update ruff exclusion list and add new init file
ogabrielluiz Oct 20, 2025
a0f4e56
refactor: remove marimo notebook
ogabrielluiz Oct 20, 2025
b948f1b
refactor: remove obsolete test files and debugging utilities
ogabrielluiz Oct 20, 2025
866ea49
feat: add event saving and loading functionality to EventBasedRecording
ogabrielluiz Oct 20, 2025
9d73199
update
HimavarshaVS Nov 10, 2025
25aa1f2
add md files
HimavarshaVS Nov 10, 2025
b93eeca
[autofix.ci] apply automated fixes
autofix-ci[bot] Nov 10, 2025
0a7c61c
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Nov 10, 2025
3294dfd
Merge branch main into graph-debugger
ogabrielluiz Nov 26, 2025
66aa46e
[autofix.ci] apply automated fixes
autofix-ci[bot] Nov 26, 2025
d136c33
chore: remove PR description files from repo root
ogabrielluiz Nov 26, 2025
dbf8d8b
[autofix.ci] apply automated fixes
autofix-ci[bot] Nov 26, 2025
46e138a
feat: update save and load methods to use JSON serialization for even…
ogabrielluiz Nov 26, 2025
8d117f7
fix: update event list types to use GraphMutationEvent for better typ…
ogabrielluiz Nov 26, 2025
a9a0195
feat: convert item_output and update_dependency methods to async for …
ogabrielluiz Nov 26, 2025
18739ff
feat: add tests for event-based graph recorder and mutation event system
ogabrielluiz Nov 26, 2025
6904ea2
fix: remove marimo dependency from development requirements
ogabrielluiz Nov 26, 2025
7584b3c
[autofix.ci] apply automated fixes
autofix-ci[bot] Nov 26, 2025
892c742
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Nov 26, 2025
e879125
export debug classes in dunder init
ogabrielluiz Nov 26, 2025
103cec3
feat: refactor summary and timeline display methods to return strings…
ogabrielluiz Nov 26, 2025
3106ac6
feat: make remove_from_predecessors and remove_vertex_from_runnables …
ogabrielluiz Nov 26, 2025
c128c6f
Merge branch 'main' into graph-debugger
ogabrielluiz Dec 18, 2025
2268e97
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 18, 2025
5d75fe5
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Dec 18, 2025
f0cf5f5
[autofix.ci] apply automated fixes (attempt 3/3)
autofix-ci[bot] Dec 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,7 @@
"legacy": false,
"lf_version": "1.7.0",
"metadata": {
"code_hash": "e179036a232d",
"code_hash": "03a00393fbab",
"dependencies": {
"dependencies": [
{
Expand Down Expand Up @@ -1205,7 +1205,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from lfx.components.processing.converter import convert_to_data\nfrom lfx.custom.custom_component.component import Component\nfrom lfx.inputs.inputs import HandleInput\nfrom lfx.schema.data import Data\nfrom lfx.schema.dataframe import DataFrame\nfrom lfx.schema.message import Message\nfrom lfx.template.field.base import Output\n\n\nclass LoopComponent(Component):\n display_name = \"Loop\"\n description = (\n \"Iterates over a list of Data or Message objects, outputting one item at a time and \"\n \"aggregating results from loop inputs. Message objects are automatically converted to \"\n \"Data objects for consistent processing.\"\n )\n documentation: str = \"https://docs.langflow.org/loop\"\n icon = \"infinity\"\n\n inputs = [\n HandleInput(\n name=\"data\",\n display_name=\"Inputs\",\n info=\"The initial DataFrame to iterate over.\",\n input_types=[\"DataFrame\"],\n ),\n ]\n\n outputs = [\n Output(\n display_name=\"Item\",\n name=\"item\",\n method=\"item_output\",\n allows_loop=True,\n loop_types=[\"Message\"],\n group_outputs=True,\n ),\n Output(display_name=\"Done\", name=\"done\", method=\"done_output\", group_outputs=True),\n ]\n\n def initialize_data(self) -> None:\n \"\"\"Initialize the data list, context index, and aggregated list.\"\"\"\n if self.ctx.get(f\"{self._id}_initialized\", False):\n return\n\n # Ensure data is a list of Data objects\n data_list = self._validate_data(self.data)\n\n # Store the initial data and context variables\n self.update_ctx(\n {\n f\"{self._id}_data\": data_list,\n f\"{self._id}_index\": 0,\n f\"{self._id}_aggregated\": [],\n f\"{self._id}_initialized\": True,\n }\n )\n\n def _convert_message_to_data(self, message: Message) -> Data:\n \"\"\"Convert a Message object to a Data object using Type Convert logic.\"\"\"\n return convert_to_data(message, auto_parse=False)\n\n def _validate_data(self, data):\n \"\"\"Validate and return a list of Data objects. 
Message objects are auto-converted to Data.\"\"\"\n if isinstance(data, DataFrame):\n return data.to_data_list()\n if isinstance(data, Data):\n return [data]\n if isinstance(data, Message):\n # Auto-convert Message to Data\n converted_data = self._convert_message_to_data(data)\n return [converted_data]\n if isinstance(data, list) and all(isinstance(item, (Data, Message)) for item in data):\n # Convert any Message objects in the list to Data objects\n converted_list = []\n for item in data:\n if isinstance(item, Message):\n converted_list.append(self._convert_message_to_data(item))\n else:\n converted_list.append(item)\n return converted_list\n msg = \"The 'data' input must be a DataFrame, a list of Data/Message objects, or a single Data/Message object.\"\n raise TypeError(msg)\n\n def evaluate_stop_loop(self) -> bool:\n \"\"\"Evaluate whether to stop item or done output.\"\"\"\n current_index = self.ctx.get(f\"{self._id}_index\", 0)\n data_length = len(self.ctx.get(f\"{self._id}_data\", []))\n return current_index > data_length\n\n def item_output(self) -> Data:\n \"\"\"Output the next item in the list or stop if done.\"\"\"\n self.initialize_data()\n current_item = Data(text=\"\")\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n else:\n # Get data list and current index\n data_list, current_index = self.loop_variables()\n if current_index < len(data_list):\n # Output current item and increment index\n try:\n current_item = data_list[current_index]\n except IndexError:\n current_item = Data(text=\"\")\n self.aggregated_output()\n self.update_ctx({f\"{self._id}_index\": current_index + 1})\n\n # Now we need to update the dependencies for the next run\n self.update_dependency()\n return current_item\n\n def update_dependency(self):\n item_dependency_id = self.get_incoming_edge_by_target_param(\"item\")\n if item_dependency_id not in self.graph.run_manager.run_predecessors[self._id]:\n 
self.graph.run_manager.run_predecessors[self._id].append(item_dependency_id)\n # CRITICAL: Also update run_map so remove_from_predecessors() works correctly\n # run_map[predecessor] = list of vertices that depend on predecessor\n if self._id not in self.graph.run_manager.run_map[item_dependency_id]:\n self.graph.run_manager.run_map[item_dependency_id].append(self._id)\n\n def done_output(self) -> DataFrame:\n \"\"\"Trigger the done output when iteration is complete.\"\"\"\n self.initialize_data()\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n self.start(\"done\")\n\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n\n return DataFrame(aggregated)\n self.stop(\"done\")\n return DataFrame([])\n\n def loop_variables(self):\n \"\"\"Retrieve loop variables from context.\"\"\"\n return (\n self.ctx.get(f\"{self._id}_data\", []),\n self.ctx.get(f\"{self._id}_index\", 0),\n )\n\n def aggregated_output(self) -> list[Data]:\n \"\"\"Return the aggregated list once all items are processed.\n\n Returns Data or Message objects depending on loop input types.\n \"\"\"\n self.initialize_data()\n\n # Get data list and aggregated list\n data_list = self.ctx.get(f\"{self._id}_data\", [])\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n loop_input = self.item\n\n # Append the current loop input to aggregated if it's not already included\n if loop_input is not None and not isinstance(loop_input, str) and len(aggregated) <= len(data_list):\n # If the loop input is a Message, convert it to Data for consistency\n if isinstance(loop_input, Message):\n loop_input = self._convert_message_to_data(loop_input)\n aggregated.append(loop_input)\n self.update_ctx({f\"{self._id}_aggregated\": aggregated})\n return aggregated\n"
"value": "from lfx.components.processing.converter import convert_to_data\nfrom lfx.custom.custom_component.component import Component\nfrom lfx.inputs.inputs import HandleInput\nfrom lfx.schema.data import Data\nfrom lfx.schema.dataframe import DataFrame\nfrom lfx.schema.message import Message\nfrom lfx.template.field.base import Output\n\n\nclass LoopComponent(Component):\n display_name = \"Loop\"\n description = (\n \"Iterates over a list of Data or Message objects, outputting one item at a time and \"\n \"aggregating results from loop inputs. Message objects are automatically converted to \"\n \"Data objects for consistent processing.\"\n )\n documentation: str = \"https://docs.langflow.org/loop\"\n icon = \"infinity\"\n\n inputs = [\n HandleInput(\n name=\"data\",\n display_name=\"Inputs\",\n info=\"The initial DataFrame to iterate over.\",\n input_types=[\"DataFrame\"],\n ),\n ]\n\n outputs = [\n Output(\n display_name=\"Item\",\n name=\"item\",\n method=\"item_output\",\n allows_loop=True,\n loop_types=[\"Message\"],\n group_outputs=True,\n ),\n Output(display_name=\"Done\", name=\"done\", method=\"done_output\", group_outputs=True),\n ]\n\n def initialize_data(self) -> None:\n \"\"\"Initialize the data list, context index, and aggregated list.\"\"\"\n if self.ctx.get(f\"{self._id}_initialized\", False):\n return\n\n # Ensure data is a list of Data objects\n data_list = self._validate_data(self.data)\n\n # Store the initial data and context variables\n self.update_ctx(\n {\n f\"{self._id}_data\": data_list,\n f\"{self._id}_index\": 0,\n f\"{self._id}_aggregated\": [],\n f\"{self._id}_initialized\": True,\n }\n )\n\n def _convert_message_to_data(self, message: Message) -> Data:\n \"\"\"Convert a Message object to a Data object using Type Convert logic.\"\"\"\n return convert_to_data(message, auto_parse=False)\n\n def _validate_data(self, data):\n \"\"\"Validate and return a list of Data objects. 
Message objects are auto-converted to Data.\"\"\"\n if isinstance(data, DataFrame):\n return data.to_data_list()\n if isinstance(data, Data):\n return [data]\n if isinstance(data, Message):\n # Auto-convert Message to Data\n converted_data = self._convert_message_to_data(data)\n return [converted_data]\n if isinstance(data, list) and all(isinstance(item, (Data, Message)) for item in data):\n # Convert any Message objects in the list to Data objects\n converted_list = []\n for item in data:\n if isinstance(item, Message):\n converted_list.append(self._convert_message_to_data(item))\n else:\n converted_list.append(item)\n return converted_list\n msg = \"The 'data' input must be a DataFrame, a list of Data/Message objects, or a single Data/Message object.\"\n raise TypeError(msg)\n\n def evaluate_stop_loop(self) -> bool:\n \"\"\"Evaluate whether to stop item or done output.\"\"\"\n current_index = self.ctx.get(f\"{self._id}_index\", 0)\n data_length = len(self.ctx.get(f\"{self._id}_data\", []))\n return current_index > data_length\n\n async def item_output(self) -> Data:\n \"\"\"Output the next item in the list or stop if done.\"\"\"\n self.initialize_data()\n current_item = Data(text=\"\")\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n else:\n # Get data list and current index\n data_list, current_index = self.loop_variables()\n if current_index < len(data_list):\n # Output current item and increment index\n try:\n current_item = data_list[current_index]\n except IndexError:\n current_item = Data(text=\"\")\n self.aggregated_output()\n self.update_ctx({f\"{self._id}_index\": current_index + 1})\n\n # Update dependencies using centralized graph API\n await self.update_dependency()\n\n return current_item\n\n async def update_dependency(self):\n \"\"\"Update loop dependencies using centralized graph API.\n\n This ensures run_predecessors and run_map stay synchronized.\n \"\"\"\n item_dependency_id = self.get_incoming_edge_by_target_param(\"item\")\n if 
item_dependency_id and item_dependency_id not in self.graph.run_manager.run_predecessors[self._id]:\n # CRITICAL: Both run_predecessors and run_map must be updated together.\n # run_map[predecessor] = list of vertices that depend on predecessor.\n # This is required for remove_from_predecessors() to work correctly.\n await self.graph.add_dynamic_dependency(self._id, item_dependency_id)\n\n def done_output(self) -> DataFrame:\n \"\"\"Trigger the done output when iteration is complete.\"\"\"\n self.initialize_data()\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n self.start(\"done\")\n\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n\n return DataFrame(aggregated)\n self.stop(\"done\")\n return DataFrame([])\n\n def loop_variables(self):\n \"\"\"Retrieve loop variables from context.\"\"\"\n return (\n self.ctx.get(f\"{self._id}_data\", []),\n self.ctx.get(f\"{self._id}_index\", 0),\n )\n\n def aggregated_output(self) -> list[Data]:\n \"\"\"Return the aggregated list once all items are processed.\n\n Returns Data or Message objects depending on loop input types.\n \"\"\"\n self.initialize_data()\n\n # Get data list and aggregated list\n data_list = self.ctx.get(f\"{self._id}_data\", [])\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n loop_input = self.item\n\n # Append the current loop input to aggregated if it's not already included\n if loop_input is not None and not isinstance(loop_input, str) and len(aggregated) <= len(data_list):\n # If the loop input is a Message, convert it to Data for consistency\n if isinstance(loop_input, Message):\n loop_input = self._convert_message_to_data(loop_input)\n aggregated.append(loop_input)\n self.update_ctx({f\"{self._id}_aggregated\": aggregated})\n return aggregated\n\n def reset_loop_state(self) -> None:\n \"\"\"Reset loop internal state for fresh execution.\n\n This should be called before starting a new independent iteration\n of the graph to ensure the loop starts from a clean 
state.\n\n This method clears all loop-specific context variables including:\n - initialization flag\n - current index\n - aggregated results\n - stored data\n \"\"\"\n loop_id = self._id\n self.ctx.pop(f\"{loop_id}_initialized\", None)\n self.ctx.pop(f\"{loop_id}_index\", None)\n self.ctx.pop(f\"{loop_id}_aggregated\", None)\n self.ctx.pop(f\"{loop_id}_data\", None)\n"
},
"data": {
"_input_type": "HandleInput",
Expand Down
2 changes: 1 addition & 1 deletion src/lfx/src/lfx/_assets/component_index.json

Large diffs are not rendered by default.

42 changes: 32 additions & 10 deletions src/lfx/src/lfx/components/flow_controls/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def evaluate_stop_loop(self) -> bool:
data_length = len(self.ctx.get(f"{self._id}_data", []))
return current_index > data_length

def item_output(self) -> Data:
async def item_output(self) -> Data:
"""Output the next item in the list or stop if done."""
self.initialize_data()
current_item = Data(text="")
Expand All @@ -107,18 +107,22 @@ def item_output(self) -> Data:
self.aggregated_output()
self.update_ctx({f"{self._id}_index": current_index + 1})

# Now we need to update the dependencies for the next run
self.update_dependency()
# Update dependencies using centralized graph API
await self.update_dependency()

return current_item

def update_dependency(self):
async def update_dependency(self):
"""Update loop dependencies using centralized graph API.

This ensures run_predecessors and run_map stay synchronized.
"""
item_dependency_id = self.get_incoming_edge_by_target_param("item")
if item_dependency_id not in self.graph.run_manager.run_predecessors[self._id]:
self.graph.run_manager.run_predecessors[self._id].append(item_dependency_id)
# CRITICAL: Also update run_map so remove_from_predecessors() works correctly
# run_map[predecessor] = list of vertices that depend on predecessor
if self._id not in self.graph.run_manager.run_map[item_dependency_id]:
self.graph.run_manager.run_map[item_dependency_id].append(self._id)
if item_dependency_id and item_dependency_id not in self.graph.run_manager.run_predecessors[self._id]:
# CRITICAL: Both run_predecessors and run_map must be updated together.
# run_map[predecessor] = list of vertices that depend on predecessor.
# This is required for remove_from_predecessors() to work correctly.
await self.graph.add_dynamic_dependency(self._id, item_dependency_id)

def done_output(self) -> DataFrame:
"""Trigger the done output when iteration is complete."""
Expand Down Expand Up @@ -161,3 +165,21 @@ def aggregated_output(self) -> list[Data]:
aggregated.append(loop_input)
self.update_ctx({f"{self._id}_aggregated": aggregated})
return aggregated

def reset_loop_state(self) -> None:
    """Clear all per-loop context entries so the loop can run fresh.

    Call this before launching a new, independent execution of the
    graph: it drops every context variable this component stored,
    namely the initialization flag, the current index, the aggregated
    results, and the cached input data. Missing keys are ignored, so
    the method is safe to call on an already-clean context.
    """
    # Each state entry is namespaced by the component id; pop with a
    # default so absent keys never raise.
    for suffix in ("initialized", "index", "aggregated", "data"):
        self.ctx.pop(f"{self._id}_{suffix}", None)
12 changes: 7 additions & 5 deletions src/lfx/src/lfx/custom/custom_component/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,14 +400,16 @@ def set_class_code(self) -> None:
try:
module = inspect.getmodule(self.__class__)
if module is None:
msg = "Could not find module for class"
raise ValueError(msg)
# Module not found - likely defined in notebook or REPL
self._code = f"# Component defined in {self.__class__.__name__}"
return

class_code = inspect.getsource(module)
self._code = class_code
except (OSError, TypeError) as e:
msg = f"Could not find source code for {self.__class__.__name__}"
raise ValueError(msg) from e
except (OSError, TypeError):
# Source code not available (e.g., defined in notebook, REPL, or dynamically)
# This is fine - just use a placeholder
self._code = f"# Component {self.__class__.__name__} (source code not available)"

def set(self, **kwargs):
"""Connects the component to other components or sets parameters and attributes.
Expand Down
4 changes: 2 additions & 2 deletions src/lfx/src/lfx/custom/custom_component/custom_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def stop(self, output_name: str | None = None) -> None:
msg = "Vertex is not set"
raise ValueError(msg)
try:
self.graph.mark_branch(vertex_id=self._vertex.id, output_name=output_name, state="INACTIVE")
self.graph.mark_branch_sync(vertex_id=self._vertex.id, output_name=output_name, state="INACTIVE")
except Exception as e:
msg = f"Error stopping {self.display_name}: {e}"
raise ValueError(msg) from e
Expand All @@ -163,7 +163,7 @@ def start(self, output_name: str | None = None) -> None:
msg = "Vertex is not set"
raise ValueError(msg)
try:
self.graph.mark_branch(vertex_id=self._vertex.id, output_name=output_name, state="ACTIVE")
self.graph.mark_branch_sync(vertex_id=self._vertex.id, output_name=output_name, state="ACTIVE")
except Exception as e:
msg = f"Error starting {self.display_name}: {e}"
raise ValueError(msg) from e
Expand Down
Loading
Loading