refactor(loop): implement isolated subgraph execution for LoopComponent #11034
Conversation
- Refactor LoopComponent to execute loop body as isolated subgraph
- Add create_subgraph method to Graph class for creating isolated subgraphs
- Add loop_utils with get_loop_body_vertices helper function
- Add on_end_vertex event emission in async_start for progress tracking
- Add comprehensive tests for subgraph execution and event emission

This change enables proper vertex event streaming during loop execution by running each iteration as an isolated subgraph with its own context.
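The commit adds a `create_subgraph` method that filters the parent graph's node and edge dictionaries down to a chosen vertex set. The following is a minimal sketch of that idea only, using a simplified stand-in class rather than the real `Graph` API (whose constructor and `add_nodes_and_edges` helper are not reproduced here):

```python
import copy
from typing import Any


class MiniGraph:
    """Simplified stand-in for the real Graph class (illustration only)."""

    def __init__(self, nodes: list[dict[str, Any]], edges: list[dict[str, Any]]) -> None:
        self._vertices = nodes
        self._edges = edges

    def create_subgraph(self, vertex_ids: set[str]) -> "MiniGraph":
        # Keep only nodes whose IDs belong to the requested subgraph.
        nodes = [copy.deepcopy(n) for n in self._vertices if n["id"] in vertex_ids]
        # Keep only edges whose both endpoints stay inside the subgraph.
        edges = [
            copy.deepcopy(e)
            for e in self._edges
            if e["source"] in vertex_ids and e["target"] in vertex_ids
        ]
        return MiniGraph(nodes, edges)


graph = MiniGraph(
    nodes=[{"id": "loop"}, {"id": "llm"}, {"id": "parser"}],
    edges=[{"source": "loop", "target": "llm"}, {"source": "llm", "target": "parser"}],
)
sub = graph.create_subgraph({"llm", "parser"})
print([n["id"] for n in sub._vertices])  # ['llm', 'parser']
print(sub._edges)  # only the llm -> parser edge survives
```

The deep copies mirror the isolation intent discussed in the review below; the real method presumably builds a proper `Graph` instance from the filtered nodes and edges rather than this toy class.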
Walkthrough

This pull request introduces a subgraph-based execution model for the LoopComponent, replacing traditional incremental loop patterns. It adds loop utility functions to extract and execute loop bodies as isolated subgraphs, enhances the Graph API with subgraph creation and vertex completion event emission, improves cycle detection in graph dependency sorting, and includes comprehensive unit tests for the new functionality.
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant Caller
participant LoopComponent
participant LoopUtils
participant Graph as Subgraph API
participant EventMgr as Event Manager
Caller->>LoopComponent: done_output(event_manager)
LoopComponent->>LoopComponent: initialize_data()
LoopComponent->>LoopComponent: _validate_data(data)
LoopUtils->>LoopUtils: validate_data_input(data)
LoopComponent->>LoopComponent: get_loop_body_vertices()
LoopUtils->>LoopUtils: get_loop_body_vertices(...)
LoopComponent->>LoopComponent: execute_loop_body(data_list, event_manager)
par Per Data Item
LoopComponent->>Graph: create_subgraph(vertex_ids)
Graph-->>LoopComponent: isolated_subgraph
LoopComponent->>LoopComponent: deep_copy(subgraph)
LoopComponent->>LoopComponent: inject item via start_edge
LoopComponent->>Graph: async_start(subgraph, event_manager)
Graph->>EventMgr: on_end_vertex(vertex_id, build_data)
EventMgr-->>Graph: event emitted
Graph-->>LoopComponent: execution_results
LoopUtils->>LoopUtils: extract_loop_output(results)
end
LoopComponent->>LoopComponent: aggregate results → DataFrame
LoopComponent-->>Caller: DataFrame
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes

Areas requiring extra attention:
Pre-merge checks and finishing touches

❌ Failed checks (4 warnings)
✅ Passed checks (3 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 3
🧹 Nitpick comments (6)
src/lfx/src/lfx/graph/graph/base.py (2)
380-393: Verify exception types in fallback and consider logging failures.

The fallback from `model_dump()` catches `AttributeError` and `TypeError`, but if `model_dump()` exists but raises a different exception (e.g., `ValidationError` from Pydantic), the event emission will fail silently. Consider catching a broader exception or logging when fallback is used.

```diff
         try:
             result_data_dict = result.result_dict.model_dump()
-        except (AttributeError, TypeError):
+        except Exception:  # noqa: BLE001
             result_data_dict = result.result_dict
```
2294-2324: Add explicit deep copying to document isolation intent.

While `add_nodes_and_edges()` internally deep copies the data via `process_flow()`, the subgraph currently creates references to the parent graph's nodes and edges. To make the isolation intent explicit and avoid relying on internal implementation details, add deep copies in `create_subgraph()`:

```diff
     # Filter nodes to only include specified vertex IDs
-    subgraph_nodes = [n for n in self._vertices if n["id"] in vertex_ids]
+    subgraph_nodes = [copy.deepcopy(n) for n in self._vertices if n["id"] in vertex_ids]

     # Filter edges to only include those connecting vertices in the subgraph
-    subgraph_edges = [e for e in self._edges if e["source"] in vertex_ids and e["target"] in vertex_ids]
+    subgraph_edges = [copy.deepcopy(e) for e in self._edges if e["source"] in vertex_ids and e["target"] in vertex_ids]
```

Note: The `copy` module is already imported at the top of the file.

src/backend/tests/unit/components/flow_controls/test_loop.py (1)
391-394: Consider moving assertion outside the mock for clearer test failures.

The assertion inside `mock_async_start` (Line 393) will raise `AssertionError` during async iteration, which could produce confusing error messages. Consider capturing the value and asserting after execution:

```diff
+        received_event_manager = None
+
         async def mock_async_start(event_manager=None):
-            # Verify event_manager was passed
-            assert event_manager is mock_event_manager, "event_manager should be passed to async_start"
+            nonlocal received_event_manager
+            received_event_manager = event_manager
             yield MagicMock(valid=True, result_dict={"outputs": {}})
+
+        # After execution:
+        # assert received_event_manager is mock_event_manager
```

src/lfx/src/lfx/components/flow_controls/loop.py (1)
149-156: Document backward compatibility behavior more clearly.

The `item_output` method is retained for compatibility but now calls `self.stop("item")` and returns empty Data. Consider adding a deprecation warning to inform users that this method no longer drives loop execution:

```diff
     def item_output(self) -> Data:
         """Output is no longer used - loop executes internally now.

         This method is kept for backward compatibility but does nothing.
         The actual loop execution happens in done_output().
         """
+        # Consider adding: warnings.warn("item_output is deprecated", DeprecationWarning)
         self.stop("item")
         return Data(text="")
```

src/lfx/src/lfx/base/flow_controls/loop_utils.py (2)
76-88: Consider building a predecessor map for efficiency.

The recursive `add_all_predecessors` iterates over `graph.successor_map.items()` on each call, resulting in O(V²) complexity for predecessor lookups. For large graphs, consider pre-computing a predecessor map once before the loop.

```diff
+    # Build predecessor map once for efficient lookups
+    predecessor_map: dict[str, list[str]] = {}
+    for pred_id, successors in graph.successor_map.items():
+        for succ_id in successors:
+            predecessor_map.setdefault(succ_id, []).append(pred_id)
+
     def add_all_predecessors(vertex_id: str, visited_predecessors: set[str]) -> None:
         """Recursively add all predecessors of a vertex."""
-        # Find predecessors by checking which vertices have this vertex as a successor
-        for potential_pred_id, successors in graph.successor_map.items():
-            if (
-                vertex_id in successors
-                and potential_pred_id != vertex.id
-                and potential_pred_id not in visited_predecessors
-            ):
+        for potential_pred_id in predecessor_map.get(vertex_id, []):
+            if potential_pred_id != vertex.id and potential_pred_id not in visited_predecessors:
                 visited_predecessors.add(potential_pred_id)
                 loop_body.add(potential_pred_id)
-                # Recursively add predecessors of this predecessor
                 add_all_predecessors(potential_pred_id, visited_predecessors)
```
242-245: Include error details in exception message.

The error message includes the result object, but extracting specific error information would aid debugging.

```diff
     # Stop all on error (as per design decision)
     if hasattr(result, "valid") and not result.valid:
-        msg = f"Error in loop iteration: {result}"
+        error_detail = getattr(result, "error", result)
+        msg = f"Error in loop iteration: {error_detail}"
         raise RuntimeError(msg)
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- src/backend/tests/unit/components/flow_controls/test_loop.py (2 hunks)
- src/lfx/src/lfx/base/flow_controls/loop_utils.py (1 hunks)
- src/lfx/src/lfx/components/flow_controls/loop.py (2 hunks)
- src/lfx/src/lfx/graph/graph/base.py (2 hunks)
- src/lfx/src/lfx/graph/graph/utils.py (1 hunks)
- src/lfx/tests/unit/components/flow_controls/test_loop_events.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/test_*.py
📄 CodeRabbit inference engine (Custom checks)
**/test_*.py: Review test files for excessive use of mocks that may indicate poor test design - check if tests have too many mock objects that obscure what's actually being tested
Warn when mocks are used instead of testing real behavior and interactions, and suggest using real objects or test doubles when mocks become excessive
Ensure mocks are used appropriately for external dependencies only, not for core logic
Backend test files should follow the naming convention test_*.py with proper pytest structure
Test files should have descriptive test function names that explain what is being tested
Tests should be organized logically with proper setup and teardown
Consider including edge cases and error conditions for comprehensive test coverage
Verify tests cover both positive and negative scenarios where appropriate
For async functions in backend tests, ensure proper async testing patterns are used with pytest
For API endpoints, verify both success and error response testing
Files:
- src/lfx/tests/unit/components/flow_controls/test_loop_events.py
- src/backend/tests/unit/components/flow_controls/test_loop.py
src/backend/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/backend_development.mdc)
src/backend/**/*.py: Use FastAPI async patterns with `await` for async operations in component execution methods
Use `asyncio.create_task()` for background tasks and implement proper cleanup with try/except for `asyncio.CancelledError`
Use `queue.put_nowait()` for non-blocking queue operations and `asyncio.wait_for()` with timeouts for controlled get operations
Files:
src/backend/tests/unit/components/flow_controls/test_loop.py
src/backend/tests/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/testing.mdc)
src/backend/tests/**/*.py: Place backend unit tests in `src/backend/tests/` directory, component tests in `src/backend/tests/unit/components/` organized by component subdirectory, and integration tests accessible via `make integration_tests`
Use same filename as component with appropriate test prefix/suffix (e.g., `my_component.py` → `test_my_component.py`)
Use the `client` fixture (FastAPI Test Client) defined in `src/backend/tests/conftest.py` for API tests; it provides an async `httpx.AsyncClient` with automatic in-memory SQLite database and mocked environment variables. Skip client creation by marking test with `@pytest.mark.noclient`
Inherit from the correct `ComponentTestBase` family class located in `src/backend/tests/base.py` based on API access needs: `ComponentTestBase` (no API), `ComponentTestBaseWithClient` (needs API), or `ComponentTestBaseWithoutClient` (pure logic). Provide three required fixtures: `component_class`, `default_kwargs`, and `file_names_mapping`
Create comprehensive unit tests for all new backend components. If unit tests are incomplete, create a corresponding Markdown file documenting manual testing steps and expected outcomes
Test both sync and async code paths, mock external dependencies appropriately, test error handling and edge cases, validate input/output behavior, and test component initialization and configuration
Use `@pytest.mark.asyncio` decorator for async component tests and ensure async methods are properly awaited
Test background tasks using `asyncio.create_task()` and verify completion with `asyncio.wait_for()` with appropriate timeout constraints
Test queue operations using non-blocking `queue.put_nowait()` and `asyncio.wait_for(queue.get(), timeout=...)` to verify queue processing without blocking
Use `@pytest.mark.no_blockbuster` marker to skip the blockbuster plugin in specific tests
For database tests that may fail in batch runs, run them sequentially using `uv run pytest src/backend/tests/unit/test_database.py` …
Files:
src/backend/tests/unit/components/flow_controls/test_loop.py
🧠 Learnings (10)
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Be aware of ContextVar propagation in async tests; test both direct event loop execution and `asyncio.to_thread` scenarios; ensure proper context isolation between test cases
Applied to files:
- src/lfx/tests/unit/components/flow_controls/test_loop_events.py
- src/backend/tests/unit/components/flow_controls/test_loop.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Test both sync and async code paths, mock external dependencies appropriately, test error handling and edge cases, validate input/output behavior, and test component initialization and configuration
Applied to files:
- src/lfx/tests/unit/components/flow_controls/test_loop_events.py
- src/backend/tests/unit/components/flow_controls/test_loop.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Use predefined JSON flows and utility functions from `tests.unit.build_utils` (create_flow, build_flow, get_build_events, consume_and_assert_stream) for flow execution testing
Applied to files:
src/lfx/tests/unit/components/flow_controls/test_loop_events.py
📚 Learning: 2025-11-24T19:46:09.104Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/backend_development.mdc:0-0
Timestamp: 2025-11-24T19:46:09.104Z
Learning: Applies to src/backend/base/langflow/components/**/__init__.py : Update `__init__.py` with alphabetically sorted imports when adding new components
Applied to files:
src/lfx/src/lfx/components/flow_controls/loop.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Test component versioning and backward compatibility using `file_names_mapping` fixture with `VersionComponentMapping` objects mapping component files across Langflow versions
Applied to files:
src/backend/tests/unit/components/flow_controls/test_loop.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Test component build config updates by calling `to_frontend_node()` to get the node template, then calling `update_build_config()` to apply configuration changes
Applied to files:
src/backend/tests/unit/components/flow_controls/test_loop.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Inherit from the correct `ComponentTestBase` family class located in `src/backend/tests/base.py` based on API access needs: `ComponentTestBase` (no API), `ComponentTestBaseWithClient` (needs API), or `ComponentTestBaseWithoutClient` (pure logic). Provide three required fixtures: `component_class`, `default_kwargs`, and `file_names_mapping`
Applied to files:
src/backend/tests/unit/components/flow_controls/test_loop.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Create comprehensive unit tests for all new backend components. If unit tests are incomplete, create a corresponding Markdown file documenting manual testing steps and expected outcomes
Applied to files:
src/backend/tests/unit/components/flow_controls/test_loop.py
📚 Learning: 2025-11-24T19:46:09.104Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/backend_development.mdc:0-0
Timestamp: 2025-11-24T19:46:09.104Z
Learning: Backend components should be structured with clear separation of concerns: agents, data processing, embeddings, input/output, models, text processing, prompts, tools, and vector stores
Applied to files:
src/backend/tests/unit/components/flow_controls/test_loop.py
📚 Learning: 2025-11-24T19:46:09.104Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/backend_development.mdc:0-0
Timestamp: 2025-11-24T19:46:09.104Z
Learning: Applies to src/backend/base/langflow/components/**/*.py : Add new components to the appropriate subdirectory under `src/backend/base/langflow/components/` (agents/, data/, embeddings/, input_output/, models/, processing/, prompts/, tools/, or vectorstores/)
Applied to files:
src/backend/tests/unit/components/flow_controls/test_loop.py
🧬 Code graph analysis (3)
src/lfx/tests/unit/components/flow_controls/test_loop_events.py (3)
- src/lfx/src/lfx/components/flow_controls/loop.py (2): execute_loop_body (120-147), get_loop_body_vertices (75-93)
- src/lfx/src/lfx/graph/graph/base.py (5): prepare (2076-2099), async_start (352-396), create_subgraph (2294-2324), context (153-156), context (159-165)
- src/lfx/src/lfx/graph/vertex/base.py (1): outgoing_edges (193-196)

src/lfx/src/lfx/components/flow_controls/loop.py (2)
- src/lfx/src/lfx/base/flow_controls/loop_utils.py (6): execute_loop_body (187-251), extract_loop_output (134-160), get_loop_body_start_edge (117-131), get_loop_body_start_vertex (100-114), get_loop_body_vertices (14-97), validate_data_input (163-184)
- src/lfx/src/lfx/schema/data.py (1): Data (26-288)

src/backend/tests/unit/components/flow_controls/test_loop.py (3)
- src/lfx/src/lfx/schema/data.py (1): Data (26-288)
- src/lfx/src/lfx/components/flow_controls/loop.py (6): LoopComponent (18-186), get_loop_body_vertices (75-93), _get_loop_body_start_vertex (95-105), _extract_loop_output (107-118), execute_loop_body (120-147), done_output (158-186)
- src/lfx/src/lfx/base/flow_controls/loop_utils.py (2): get_loop_body_vertices (14-97), execute_loop_body (187-251)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
- GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 5
- GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 3
- GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 2
- GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 1
- GitHub Check: Lint Backend / Run Mypy (3.11)
- GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 4
- GitHub Check: Test Docker Images / Test docker images
- GitHub Check: Run Backend Tests / Integration Tests - Python 3.10
- GitHub Check: Run Backend Tests / LFX Tests - Python 3.10
- GitHub Check: Test Starter Templates
- GitHub Check: Update Component Index
🔇 Additional comments (17)
src/lfx/src/lfx/graph/graph/utils.py (1)

691-711: LGTM! Cycle detection prevents infinite recursion during dependency sorting.

The `processing` set correctly tracks vertices currently in the DFS call stack, and returning `index_map[vertex]` when a cycle is detected is a reasonable fallback that preserves the vertex's original position while breaking the recursion.
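For readers unfamiliar with the pattern being praised here, a generic sketch of DFS with an in-progress ("processing") set is shown below. This is illustrative only and not the actual `utils.py` code, which also falls back to an `index_map` position when a cycle is found:

```python
def dfs_depth(
    vertex: str,
    successors: dict[str, list[str]],
    depth_cache: dict[str, int],
    processing: set[str],
) -> int:
    """Longest-chain depth of a vertex, with cycle protection via a 'processing' set."""
    if vertex in processing:
        # Vertex is already on the current DFS stack: we hit a cycle.
        # Return a neutral fallback instead of recursing forever.
        return 0
    if vertex in depth_cache:
        return depth_cache[vertex]
    processing.add(vertex)
    depth = 1 + max(
        (dfs_depth(s, successors, depth_cache, processing) for s in successors.get(vertex, [])),
        default=0,
    )
    processing.discard(vertex)
    depth_cache[vertex] = depth
    return depth


cyclic = {"a": ["b"], "b": ["c"], "c": ["a"]}  # a -> b -> c -> a
print(dfs_depth("a", cyclic, {}, set()))  # 3, and the recursion terminates
```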
src/backend/tests/unit/components/flow_controls/test_loop.py (4)

250-263: LGTM! Good test coverage for component initialization.

The test correctly validates that `LoopComponent` initializes with expected attributes and methods when provided with data.
455-528: LGTM! Comprehensive tests for loop body vertex identification.

Good coverage of simple and complex loop structures, including branching topologies. The mock graph structures accurately represent different scenarios.
571-609: Good test for cycle handling within loop body.

The test correctly validates that internal cycles within the loop body are handled without infinite loops, and all vertices are still included in the result set.
300-308: No change needed - test assertion is correct.

The error message in `validate_data_input` contains "must be a DataFrame" as a substring within the full message "Data input must be a DataFrame, Data object, or list of Data objects, got {type(data)}". The test's use of `pytest.raises(TypeError, match="must be a DataFrame")` will correctly match this error via partial regex matching. The test is valid and the error handling appropriately provides helpful context about expected input types.

Likely an incorrect or invalid review comment.
src/lfx/tests/unit/components/flow_controls/test_loop_events.py (3)
1-17: Good test file structure with clear documentation.

The module docstring clearly explains the key requirements being tested: event emission per vertex, original vertex ID preservation, and per-iteration events.
268-299: Test assumes predecessor inclusion but mock lacks predecessor_map.

The test `test_includes_all_predecessors` expects `llm_model` to be included as a predecessor of `processing_vertex`, but the mock only sets up `successor_map`. Looking at `get_loop_body_vertices` in `loop_utils.py`, it finds predecessors by iterating `successor_map` in reverse (checking which vertices have the target as a successor). The test setup is correct since `llm_model` has `processing_vertex` as a successor.
305-339: Good isolation test verifying deep copy per iteration.

The test correctly validates that `deepcopy` is called once per iteration (3 times for 3 items), ensuring state isolation between loop iterations.

src/lfx/src/lfx/components/flow_controls/loop.py (5)
1-8: LGTM! Clean import organization from new loop_utils module.

The imports are well-organized and alphabetically sorted, following the coding guidelines.
71-73: Good delegation to shared utility function.

The `_validate_data` method now properly delegates to `validate_data_input`, centralizing validation logic.
75-93: LGTM! Proper graph context validation before traversal.

The method correctly checks for `_vertex` presence before attempting graph traversal, returning an empty set when no context is available.
120-147: LGTM! Clean orchestration of subgraph-based loop execution.

The method properly extracts all loop body configuration upfront, then delegates to the shared `execute_loop_body` utility, passing through the `event_manager` for UI event propagation.
178-186: Good error handling pattern - log then re-raise.

The try/except correctly logs the error asynchronously and re-raises to propagate the failure. The inline import of `logger` is acceptable for error paths to avoid circular imports.

src/lfx/src/lfx/base/flow_controls/loop_utils.py (4)
100-114: LGTM! Clean implementation that correctly identifies the first vertex in the loop body.
117-131: LGTM! Implementation correctly returns the edge object needed for parameter injection in `execute_loop_body`.
163-184: LGTM! Clean validation with appropriate lazy import. Note that an empty list passes validation, which is likely the intended behavior for edge cases.
154-158: Handle empty `outputs` dict to avoid `StopIteration`.

If `result_dict.outputs` is truthy but empty (e.g., `{}`), `next(iter(result_dict.outputs.values()))` will raise `StopIteration`. Add a check or use a default.

```diff
     if result_dict.outputs:
         # Get first output value
-        first_output = next(iter(result_dict.outputs.values()))
+        first_output = next(iter(result_dict.outputs.values()), None)
+        if first_output is None:
+            continue
         if hasattr(first_output, "message"):
             return first_output.message
```

Likely an incorrect or invalid review comment.
```python
if hasattr(start_vertex, "custom_component") and start_vertex.custom_component:
    start_vertex.custom_component.set(**{target_param: item})
```
Log or warn when item injection is skipped.
If start_vertex.custom_component is None or doesn't have set(), the loop iteration silently proceeds without the input item, likely producing incorrect results. Consider logging a warning or raising an error.
```diff
 # Use set() with the target parameter as a keyword argument
 if hasattr(start_vertex, "custom_component") and start_vertex.custom_component:
     start_vertex.custom_component.set(**{target_param: item})
+else:
+    msg = f"Cannot inject loop item: start_vertex '{start_vertex_id}' has no custom_component"
+    raise RuntimeError(msg)
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/lfx/src/lfx/base/flow_controls/loop_utils.py around lines 234-235, the
code silently skips injecting the loop item when start_vertex.custom_component
is None or lacks a set() method; update the logic to detect these cases and emit
a warning log (or raise an explicit exception if that fits the flow) indicating
the iteration index and which parameter was skipped, and ensure the logger used
is the module/class logger so callers can see the skipped injection; do not
change the existing happy-path behavior other than adding the warning/exception
when injection cannot occur.
```python
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch

from lfx.base.flow_controls.loop_utils import execute_loop_body, get_loop_body_vertices
from lfx.components.flow_controls.loop import LoopComponent
from lfx.schema.data import Data
from lfx.schema.dataframe import DataFrame
```
Add missing pytest import for async test markers.
```diff
 from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
+import pytest
+
 from lfx.base.flow_controls.loop_utils import execute_loop_body, get_loop_body_vertices
```

🤖 Prompt for AI Agents
In src/lfx/tests/unit/components/flow_controls/test_loop_events.py around lines
12 to 17, the module uses pytest async test markers but does not import pytest;
add a top-level import statement "import pytest" among the other imports (e.g.,
after unittest.mock imports) so pytest.mark.asyncio and other pytest markers are
available for the async tests.
```python
    async def test_event_manager_passed_to_subgraph_async_start(self):
        """Test that event_manager is passed to subgraph's async_start method."""
        mock_event_manager = MagicMock()
        received_event_manager = None

        # Create a mock subgraph that captures the event_manager
        async def mock_async_start(event_manager=None):
            nonlocal received_event_manager
            received_event_manager = event_manager
            yield MagicMock(valid=True, result_dict=MagicMock(outputs={}))

        mock_subgraph = MagicMock()
        mock_subgraph.prepare = MagicMock()
        mock_subgraph.async_start = mock_async_start
        mock_subgraph.get_vertex = MagicMock(return_value=MagicMock(custom_component=MagicMock()))

        mock_graph = MagicMock()
        mock_graph.create_subgraph = MagicMock(return_value=mock_subgraph)

        data_list = [Data(text="item1")]

        with patch("copy.deepcopy", return_value=mock_subgraph):
            await execute_loop_body(
                graph=mock_graph,
                data_list=data_list,
                loop_body_vertex_ids={"vertex1"},
                start_vertex_id="vertex1",
                start_edge=MagicMock(target_handle=MagicMock(fieldName="data")),
                end_vertex_id="vertex1",
                event_manager=mock_event_manager,
            )

        # Verify event_manager was passed to async_start
        assert received_event_manager is mock_event_manager
```
Missing @pytest.mark.asyncio decorator on async tests.
Per coding guidelines, async test methods require the @pytest.mark.asyncio decorator. This applies to all async test methods in this file.
```diff
+    @pytest.mark.asyncio
     async def test_event_manager_passed_to_subgraph_async_start(self):
```

Apply the same decorator to: test_event_manager_passed_for_each_iteration, test_done_output_passes_event_manager, test_execute_loop_body_called_with_event_manager, test_each_iteration_uses_fresh_subgraph_copy, and test_start_vertex_receives_correct_item.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
    @pytest.mark.asyncio
    async def test_event_manager_passed_to_subgraph_async_start(self):
        """Test that event_manager is passed to subgraph's async_start method."""
        mock_event_manager = MagicMock()
        received_event_manager = None

        # Create a mock subgraph that captures the event_manager
        async def mock_async_start(event_manager=None):
            nonlocal received_event_manager
            received_event_manager = event_manager
            yield MagicMock(valid=True, result_dict=MagicMock(outputs={}))

        mock_subgraph = MagicMock()
        mock_subgraph.prepare = MagicMock()
        mock_subgraph.async_start = mock_async_start
        mock_subgraph.get_vertex = MagicMock(return_value=MagicMock(custom_component=MagicMock()))

        mock_graph = MagicMock()
        mock_graph.create_subgraph = MagicMock(return_value=mock_subgraph)

        data_list = [Data(text="item1")]

        with patch("copy.deepcopy", return_value=mock_subgraph):
            await execute_loop_body(
                graph=mock_graph,
                data_list=data_list,
                loop_body_vertex_ids={"vertex1"},
                start_vertex_id="vertex1",
                start_edge=MagicMock(target_handle=MagicMock(fieldName="data")),
                end_vertex_id="vertex1",
                event_manager=mock_event_manager,
            )

        # Verify event_manager was passed to async_start
        assert received_event_manager is mock_event_manager
```
🤖 Prompt for AI Agents
In src/lfx/tests/unit/components/flow_controls/test_loop_events.py around lines
23 to 56, several async test functions (including
test_event_manager_passed_to_subgraph_async_start) are missing the required
@pytest.mark.asyncio decorator; add @pytest.mark.asyncio above each async test
function in this file — specifically for
test_event_manager_passed_for_each_iteration,
test_done_output_passes_event_manager,
test_execute_loop_body_called_with_event_manager,
test_each_iteration_uses_fresh_subgraph_copy,
test_start_vertex_receives_correct_item (and the shown test) — to ensure pytest
runs these coroutine tests correctly.
Codecov Report

❌ Patch coverage is …

Additional details and impacted files

@@ Coverage Diff @@
## main #11034 +/- ##
==========================================
+ Coverage 33.09% 33.13% +0.04%
==========================================
Files 1389 1390 +1
Lines 65699 65811 +112
Branches 9720 9751 +31
==========================================
+ Hits 21743 21809 +66
- Misses 42842 42870 +28
- Partials 1114 1132 +18
Flags with carried forward coverage won't be shown.
Summary
This PR refactors the LoopComponent to execute its loop body as an isolated subgraph rather than relying on the main graph's dependency resolution.
The Problem
Previously, the LoopComponent needed to coordinate with the graph's execution engine to handle iteration. This required modifications to graph run dependencies and created tight coupling between the loop logic and graph execution.
The Solution
The loop body is now identified and extracted as a subgraph that runs independently:
- Loop body identification: `get_loop_body_vertices()` traverses from the loop's "item" output to the vertex that feeds back to the loop input, collecting all vertices in between. This naturally handles nested loops by stopping at each loop's feedback edge.
- Subgraph execution: For each iteration, the loop creates an isolated copy of the loop body subgraph via `Graph.create_subgraph()`. This subgraph has its own context, so state from one iteration doesn't leak into another.
- Internal execution: The `done_output()` method now orchestrates the entire loop - it iterates over the data list, executes the subgraph for each item, and aggregates the results. `item_output()` is kept for compatibility but no longer drives the loop. A sketch of this per-iteration flow follows right after this list.
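To make the control flow above concrete, here is a hedged sketch of the per-iteration orchestration. The names `create_subgraph`, `prepare`, `async_start`, and `on_end_vertex` follow the PR description and sequence diagram; everything else (the `Fake*` stand-in classes, the `injected_item` attribute, the simplified `execute_loop_body_sketch` signature) is invented for illustration and does not match the real Langflow API:

```python
import asyncio
import copy
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeResult:
    vertex_id: str
    valid: bool = True
    outputs: dict[str, Any] = field(default_factory=dict)


@dataclass
class FakeSubgraph:
    """Stand-in for the isolated subgraph returned by create_subgraph()."""

    vertex_ids: set[str]
    injected_item: Any = None

    def prepare(self) -> None:
        """Real graphs sort and validate here; nothing to do for the fake."""

    async def async_start(self, event_manager=None):
        # Yield one result per vertex; a real graph builds vertices in dependency order.
        for vertex_id in sorted(self.vertex_ids):
            result = FakeResult(vertex_id, outputs={"text": f"{self.injected_item}:{vertex_id}"})
            if event_manager is not None:
                event_manager.on_end_vertex(vertex_id, result.outputs)
            yield result


@dataclass
class FakeGraph:
    vertex_ids: set[str]

    def create_subgraph(self, vertex_ids: set[str]) -> FakeSubgraph:
        return FakeSubgraph(vertex_ids=set(vertex_ids) & self.vertex_ids)


class PrintingEventManager:
    def on_end_vertex(self, vertex_id: str, data: dict[str, Any]) -> None:
        print(f"on_end_vertex: {vertex_id} -> {data}")


async def execute_loop_body_sketch(graph, data_list, loop_body_vertex_ids, end_vertex_id, event_manager=None):
    aggregated = []
    for item in data_list:
        # Fresh, isolated copy per iteration so state never leaks across items.
        subgraph = copy.deepcopy(graph.create_subgraph(loop_body_vertex_ids))
        subgraph.injected_item = item  # stands in for injecting the item via the start edge
        subgraph.prepare()
        last_output = None
        async for result in subgraph.async_start(event_manager=event_manager):
            if not result.valid:
                raise RuntimeError(f"Error in loop iteration: {result}")
            if result.vertex_id == end_vertex_id:
                last_output = result.outputs
        aggregated.append(last_output)
    return aggregated  # the real component wraps results in a DataFrame


print(asyncio.run(execute_loop_body_sketch(
    FakeGraph({"llm", "parser"}),
    data_list=["a", "b"],
    loop_body_vertex_ids={"llm", "parser"},
    end_vertex_id="parser",
    event_manager=PrintingEventManager(),
)))
```

Each iteration gets a fresh deep copy of the subgraph, so component state set while processing one item never leaks into the next.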
Benefits

- Emits `on_end_vertex` events for each vertex in the loop body, enabling UI progress updates
Changes

- LoopComponent: Refactored to use subgraph execution
- `Graph.create_subgraph()`: New method to create isolated subgraphs
- `loop_utils.py`: Helper functions for loop body identification and execution
- `on_end_vertex` event emission in `async_start()` for progress tracking

A rough sketch of the loop-body identification idea appears below.
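This is a hypothetical illustration only: the real `get_loop_body_vertices` operates on Langflow vertex and edge objects and also pulls in predecessors of body vertices, which this toy version on a plain successor map omits.

```python
from collections import deque


def loop_body_vertices(successor_map: dict[str, list[str]], loop_id: str, feedback_source: str) -> set[str]:
    """Collect vertices reachable from the loop's item output, stopping at the
    vertex that feeds the aggregated result back into the loop input."""
    body: set[str] = set()
    queue = deque(successor_map.get(loop_id, []))
    while queue:
        vertex = queue.popleft()
        if vertex == loop_id or vertex in body:
            continue
        body.add(vertex)
        if vertex == feedback_source:
            # The feedback vertex closes the loop; don't walk past it back into the loop.
            continue
        queue.extend(successor_map.get(vertex, []))
    return body


successor_map = {
    "loop": ["llm"],
    "llm": ["parser"],
    "parser": ["loop"],  # feeds back into the loop input
    "unrelated": ["other"],
}
print(loop_body_vertices(successor_map, loop_id="loop", feedback_source="parser"))
# {'llm', 'parser'}; 'unrelated' and 'other' are never reached
```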
Release Notes
✏️ Tip: You can customize this high-level summary in your review settings.