-
Notifications
You must be signed in to change notification settings - Fork 8.2k
feat: Graph Execution Debugging and Event System #10545
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 16 commits
f080f0b
8bce7cc
c61d063
121d940
0ab3248
b2063cf
52c711c
4246754
6e2f430
a2c2e65
44a3b4b
a0f4e56
b948f1b
866ea49
9d73199
25aa1f2
b93eeca
0a7c61c
3294dfd
66aa46e
d136c33
dbf8d8b
46e138a
8d117f7
a9a0195
18739ff
6904ea2
7584b3c
892c742
e879125
103cec3
3106ac6
c128c6f
2268e97
5d75fe5
f0cf5f5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,146 @@ | ||
| # Graph Execution Debugging and Event System | ||
|
|
||
| ## Overview | ||
|
|
||
| This PR introduces a comprehensive event-based debugging system for Langflow graph execution, enabling detailed tracking and analysis of graph state mutations during execution. The implementation uses a pure observer pattern that provides zero overhead when not in use, making it production-safe. | ||
|
|
||
| ## Key Features | ||
|
|
||
| ### 🎯 Graph Mutation Event System | ||
|
|
||
| - **Event Infrastructure**: New `GraphMutationEvent` system that tracks all graph state changes with before/after snapshots | ||
| - **Observer Pattern**: Pure observer pattern implementation with `register_observer()` and `unregister_observer()` methods | ||
| - **Zero Overhead**: Fast path when no observers are registered, ensuring no performance impact in production | ||
| - **Serializable Events**: Events can be serialized to dictionaries for replay and storage | ||
|
|
||
| ### 📊 Event-Based Recording | ||
|
|
||
| - **EventRecorder**: Observer that captures all graph mutations during execution | ||
| - **EventBasedRecording**: Rich recording object with analysis methods: | ||
| - `get_events_by_type()` - Filter events by type | ||
| - `get_events_for_vertex()` - Get all events for a specific vertex | ||
| - `get_queue_evolution()` - Track how the execution queue changes over time | ||
| - `get_dependency_changes()` - Monitor dependency modifications | ||
| - `show_summary()`, `show_timeline()`, `show_events_for_component()` - Visualization methods | ||
| - **Save/Load**: Recordings can be saved to and loaded from files for later analysis | ||
|
|
||
| ### 🔧 Graph Execution Improvements | ||
|
|
||
| #### Loop Component Enhancements | ||
| - **Synchronized Dependencies**: Loop component now properly updates both `run_predecessors` and `run_map` to keep dependency structures synchronized | ||
| - **State Reset**: New `reset_loop_state()` method for clean loop state management between executions | ||
| - **Better Documentation**: Added critical comments explaining the relationship between dependency structures | ||
|
|
||
| #### Graph Manager Refactoring | ||
| - **Async Methods**: Made `remove_from_predecessors()` and `remove_vertex_from_runnables()` async for consistency | ||
| - **Sync Variants**: Added `mark_branch_sync()` for synchronous contexts (used by custom components) | ||
| - **Centralized Mutations**: All graph mutations now go through centralized methods that emit events | ||
|
|
||
| ### 🧪 Testing Infrastructure | ||
|
|
||
| #### Execution Path Validation | ||
| - **Path Equivalence Testing**: New test suite that validates both `async_start()` and `arun()` execution paths produce identical results | ||
| - **Test Data Flows**: Uses test flows that don't require API keys for reliable CI testing | ||
| - **Comprehensive Tracing**: `ExecutionTracer` captures complete execution traces for comparison | ||
|
|
||
| #### Event System Tests | ||
| - **Mutation Event Tests**: Tests for queue operations, dependency updates, and event emission | ||
| - **Event Recorder Tests**: Tests for event capture, queue evolution tracking, and recording analysis | ||
| - **Graph Mutation Tests**: Tests ensuring both dependency structures stay synchronized | ||
|
|
||
| ### 🛠️ Component Validation Improvements | ||
|
|
||
| - **TYPE_CHECKING Block Support**: Component validation now properly handles `TYPE_CHECKING` blocks, extracting imports needed for `get_type_hints()` to work correctly | ||
| - **Better Error Handling**: Improved error handling for components defined in notebooks or REPL environments | ||
| - **Source Code Extraction**: More robust source code extraction with graceful fallbacks | ||
|
|
||
| ## Technical Details | ||
|
|
||
| ### Event Types Tracked | ||
|
|
||
| - `queue_extended` - When vertices are added to the execution queue | ||
| - `queue_dequeued` - When vertices are removed from the queue | ||
| - `dependency_added` - When dynamic dependencies are added | ||
| - `vertex_marked` - When vertex states change (ACTIVE/INACTIVE) | ||
|
|
||
| ### Architecture | ||
|
|
||
| ``` | ||
| Graph | ||
| ├── register_observer() / unregister_observer() | ||
| ├── _emit_event() - Emits events to all observers | ||
| └── All mutations → emit before/after events | ||
| ├── extend_run_queue() | ||
| ├── add_dynamic_dependency() | ||
| ├── mark_branch_sync() | ||
| └── remove_from_predecessors() | ||
| ``` | ||
|
|
||
| ### Usage Example | ||
|
|
||
| ```python | ||
| from lfx.graph.graph.base import Graph | ||
| from lfx.debug.event_recorder import record_graph_with_events | ||
|
|
||
| # Record graph execution | ||
| graph = Graph.from_payload(flow_data) | ||
| recording = await record_graph_with_events(graph, "My Flow") | ||
|
|
||
| # Analyze the recording | ||
| recording.show_summary() | ||
| recording.show_timeline() | ||
|
|
||
| # Get specific insights | ||
| queue_evolution = recording.get_queue_evolution() | ||
| dependency_changes = recording.get_dependency_changes() | ||
|
|
||
| # Save for later analysis | ||
| recording.save("flow_recording.pkl") | ||
| ``` | ||
|
|
||
| ## Files Changed | ||
|
|
||
| ### New Files | ||
| - `src/lfx/src/lfx/debug/__init__.py` - Debug module initialization | ||
| - `src/lfx/src/lfx/debug/events.py` - GraphMutationEvent and observer types | ||
| - `src/lfx/src/lfx/debug/event_recorder.py` - EventRecorder and EventBasedRecording | ||
| - `src/backend/tests/unit/graph/test_execution_path_validation.py` - Execution path equivalence tests | ||
| - `src/backend/tests/unit/graph/test_execution_path_equivalence.py` - Execution tracing utilities | ||
| - `src/backend/tests/unit/graph/test_event_recorder.py` - Event recorder tests | ||
| - `src/backend/tests/unit/graph/test_graph_mutation_events.py` - Mutation event tests | ||
|
|
||
| ### Modified Files | ||
| - `src/lfx/src/lfx/graph/graph/base.py` - Added observer pattern, event emission | ||
| - `src/lfx/src/lfx/graph/graph/runnable_vertices_manager.py` - Made methods async | ||
| - `src/lfx/src/lfx/components/logic/loop.py` - Improved dependency synchronization | ||
| - `src/lfx/src/lfx/custom/custom_component/component.py` - Better error handling | ||
| - `src/lfx/src/lfx/custom/custom_component/custom_component.py` - Use mark_branch_sync | ||
| - `src/lfx/src/lfx/custom/validate.py` - TYPE_CHECKING block support | ||
| - `pyproject.toml` - Added marimo dependency for debugging notebooks | ||
|
|
||
| ## Benefits | ||
|
|
||
| 1. **Debugging**: Comprehensive visibility into graph execution state changes | ||
| 2. **Testing**: Better test coverage with execution path validation | ||
| 3. **Reliability**: Synchronized dependency structures prevent bugs | ||
| 4. **Performance**: Zero overhead when debugging is not active | ||
| 5. **Extensibility**: Easy to add new event types and observers | ||
|
|
||
| ## Testing | ||
|
|
||
| - ✅ All existing tests pass | ||
| - ✅ New execution path validation tests pass | ||
| - ✅ Event system tests pass | ||
| - ✅ Loop component tests pass with improved dependency handling | ||
|
|
||
| ## Breaking Changes | ||
|
|
||
| None - This is a purely additive change. The event system is opt-in and has zero overhead when not used. | ||
|
|
||
| ## Future Work | ||
|
|
||
| - [ ] Add more event types (vertex execution start/end, memory updates, etc.) | ||
| - [ ] Create visualization tools for event recordings | ||
| - [ ] Add event filtering and querying capabilities | ||
| - [ ] Integrate with Langflow UI for real-time debugging | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| # Graph Execution Debugging and Event System | ||
|
|
||
| ## Summary | ||
|
|
||
| Introduces a comprehensive event-based debugging system for Langflow graph execution with zero overhead when not in use. Uses a pure observer pattern to track all graph state mutations during execution. | ||
|
|
||
| ## Key Changes | ||
|
|
||
| ### 🎯 Event System | ||
| - New `GraphMutationEvent` system tracking all graph state changes | ||
| - Observer pattern with `register_observer()` / `unregister_observer()` | ||
| - Zero overhead fast path when no observers registered | ||
| - Serializable events for replay and analysis | ||
|
|
||
| ### 📊 Event Recording | ||
| - `EventRecorder` captures all graph mutations | ||
| - `EventBasedRecording` with analysis methods (queue evolution, dependency changes, etc.) | ||
| - Save/load recordings for later analysis | ||
|
|
||
| ### 🔧 Graph Improvements | ||
| - **Loop Component**: Synchronized `run_predecessors` and `run_map` dependencies | ||
| - **Graph Manager**: Made `remove_from_predecessors()` async, added `mark_branch_sync()` for sync contexts | ||
| - **Component Validation**: Better `TYPE_CHECKING` block handling | ||
|
|
||
| ### 🧪 Testing | ||
| - Execution path validation tests (`async_start()` vs `arun()` equivalence) | ||
| - Event system tests | ||
| - Comprehensive test coverage | ||
|
|
||
| ## Usage | ||
|
|
||
| ```python | ||
| from lfx.debug.event_recorder import record_graph_with_events | ||
|
|
||
| recording = await record_graph_with_events(graph, "My Flow") | ||
| recording.show_summary() | ||
| recording.get_queue_evolution() | ||
| recording.save("recording.pkl") | ||
| ``` | ||
|
|
||
| ## Breaking Changes | ||
|
|
||
| None - purely additive, opt-in feature. | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| """Tests for event-based graph recorder (pure observer pattern).""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import pytest | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_event_recorder_captures_all_mutations(): | ||
| """Test that event recorder captures all graph mutations.""" | ||
| import json | ||
| from pathlib import Path | ||
|
|
||
| from lfx.debug.event_recorder import record_graph_with_events | ||
| from lfx.graph.graph.base import Graph | ||
|
|
||
| test_file = Path("src/backend/tests/data/LoopTest.json") | ||
| data = json.loads(test_file.read_text()) | ||
|
||
|
|
||
| graph = Graph.from_payload(data["data"]) | ||
|
|
||
| recording = await record_graph_with_events(graph, "Test") | ||
|
|
||
| # Should capture many events | ||
| assert len(recording.events) > 100 | ||
|
|
||
| # Should have queue and vertex events | ||
| queue_events = recording.get_events_by_type("queue_extended") | ||
| assert len(queue_events) > 0 | ||
|
|
||
| vertex_events = recording.get_events_by_type("vertex_marked") | ||
| assert len(vertex_events) > 0 | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_queue_evolution_tracking(): | ||
| """Test queue evolution analysis.""" | ||
| import json | ||
| from pathlib import Path | ||
|
|
||
| from lfx.debug.event_recorder import record_graph_with_events | ||
| from lfx.graph.graph.base import Graph | ||
|
|
||
| test_file = Path("src/backend/tests/data/LoopTest.json") | ||
| data = json.loads(test_file.read_text()) | ||
|
|
||
| graph = Graph.from_payload(data["data"]) | ||
| recording = await record_graph_with_events(graph, "Test") | ||
|
|
||
| queue_evo = recording.get_queue_evolution() | ||
|
|
||
| # Should have queue evolution entries | ||
| assert len(queue_evo) > 0 | ||
|
|
||
| # Each entry should have required fields | ||
| first = queue_evo[0] | ||
| assert "step" in first | ||
| assert "queue" in first | ||
| assert "changes" in first | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| pytest.main([__file__, "-v", "-s"]) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| """Tests for graph mutation event system.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from typing import TYPE_CHECKING | ||
|
|
||
| import pytest | ||
|
|
||
| if TYPE_CHECKING: | ||
| from lfx.debug.events import GraphMutationEvent | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_queue_operations_emit_events(): | ||
| """Test queue operations emit before/after events.""" | ||
| from lfx.graph.graph.base import Graph | ||
|
|
||
| events = [] | ||
|
|
||
| async def capture(event: GraphMutationEvent): | ||
| events.append(event) | ||
|
|
||
| graph = Graph() | ||
| graph.register_observer(capture) | ||
| await graph.extend_run_queue(["v1", "v2"]) | ||
|
|
||
| assert len(events) == 2 | ||
| assert events[0].timing == "before" | ||
| assert events[1].timing == "after" | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_dependency_updates_both_structures(): | ||
| """Test add_dynamic_dependency updates both structures.""" | ||
| from lfx.graph.graph.base import Graph | ||
|
|
||
| graph = Graph() | ||
| await graph.add_dynamic_dependency("v1", "v2") | ||
|
|
||
| assert "v2" in graph.run_manager.run_predecessors["v1"] | ||
| assert "v1" in graph.run_manager.run_map["v2"] | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_fast_path_no_overhead(): | ||
| """Test zero overhead without observers.""" | ||
| from lfx.graph.graph.base import Graph | ||
|
|
||
| graph = Graph() | ||
| await graph.extend_run_queue(["v1"]) | ||
| assert graph._mutation_step == 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Specify a language on this fence to satisfy markdownlint.
markdownlint(MD040) is flagging this block because the fence lacks a language hint. Adding something liketextkeeps the ASCII diagram readable and unblocks the lint job.In PR_DESCRIPTION.md around lines 66 to 77, the fenced code block containing the
ASCII architecture diagram lacks a language hint which triggers markdownlint
MD040; update the opening fence to include a language (for example change
totext) so the block becomes a ```text fenced block, keep the diagram contentunchanged, save the file and re-run the linter to confirm the warning is
resolved.