-
Notifications
You must be signed in to change notification settings - Fork 8.2k
feat(api): Add real-time SSE support for webhook flow execution #11028
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughThis pull request adds real-time webhook event streaming support across the platform. It introduces a new WebhookEventManager for event broadcasting, an SSE endpoint for streaming webhook progress, a React hook to consume events, and integration points throughout the build pipeline (backend API, LFX) to emit events during vertex execution. Changes
Sequence DiagramsequenceDiagram
participant Frontend as Frontend (React)
participant SSE as SSE Endpoint
participant EventMgr as WebhookEventManager
participant Backend as Backend Task
participant LFX as LFX Vertex Build
Frontend->>SSE: GET /webhook-events/{flow_id}
SSE->>EventMgr: subscribe(flow_id)
EventMgr-->>SSE: return asyncio.Queue
SSE-->>Frontend: connected event
Note over Backend,LFX: Webhook execution triggered
Backend->>Backend: webhook_run_flow()
Backend->>Backend: check has_listeners(flow_id)
alt UI Listeners Detected
Backend->>Backend: asyncio.create_task(simple_run_flow_task)
Backend->>EventMgr: emit(flow_id, vertices_sorted)
EventMgr-->>SSE: queue.put(event)
SSE-->>Frontend: vertices_sorted event
Backend->>LFX: execute vertex.build()
LFX->>EventMgr: record_build_start(flow_id, vertex_id)
EventMgr-->>SSE: (internal tracking)
LFX->>EventMgr: emit(flow_id, end_vertex, data)
EventMgr-->>SSE: queue.put(event)
SSE-->>Frontend: end_vertex event
Frontend->>Frontend: update flow state
LFX->>Backend: build complete
Backend->>EventMgr: emit(flow_id, end, result)
EventMgr-->>SSE: queue.put(event)
SSE-->>Frontend: end event
Frontend->>Frontend: finalize build state
else No UI Listeners
Backend->>Backend: fire-and-forget task
end
Frontend->>SSE: client disconnect
SSE->>EventMgr: unsubscribe(flow_id, queue)
EventMgr->>EventMgr: cleanup
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touchesImportant Pre-merge checks failedPlease resolve all errors before merging. Addressing warnings is optional. ❌ Failed checks (1 error, 2 warnings, 1 inconclusive)
✅ Passed checks (3 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #11028 +/- ##
========================================
Coverage 33.11% 33.11%
========================================
Files 1389 1391 +2
Lines 65685 65936 +251
Branches 9720 9746 +26
========================================
+ Hits 21750 21835 +85
- Misses 42821 42987 +166
Partials 1114 1114
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (7)
src/frontend/src/pages/FlowPage/index.tsx (1)
134-134: Remove or downgrade debug console.warn.The
console.warn("unmounting")should be removed for production code or converted to a debug-level log that can be conditionally disabled.- console.warn("unmounting");src/backend/tests/unit/test_webhook.py (1)
62-62: Redundant imports inside test functions.The
from unittest.mock import patchis already imported at line 2. The imports inside test functions are redundant.async def test_webhook_endpoint_requires_api_key_when_auto_login_false(client, added_webhook_test): """Test that webhook endpoint requires API key when WEBHOOK_AUTH_ENABLE=true.""" - # Mock the settings service to enable webhook authentication - from unittest.mock import patch - with patch("langflow.services.auth.utils.get_settings_service") as mock_settings:Also applies to: 102-102, 137-137, 157-157
src/lfx/src/lfx/graph/utils.py (1)
219-237: Inline serialization function works but could be extracted.The
serialize_for_jsonfunction correctly handles various types (primitives, dicts, lists, pydantic models). Consider extracting it to a module-level utility if this pattern is needed elsewhere.src/frontend/src/hooks/use-webhook-events.ts (2)
36-39: Remove or conditionally disable console.log statements.The console.log statements are useful for development but should be removed or wrapped in a debug flag for production builds.
- console.log("[useWebhookEvents] Connecting to SSE:", sseUrl); + // Uncomment for debugging: + // console.log("[useWebhookEvents] Connecting to SSE:", sseUrl);Also applies to: 44-46, 74-76, 91-93, 134-136
36-39: Add JSON.parse error handling for resilience.The
JSON.parse(event.data)calls in each event handler can throw if the server sends malformed data. Consider wrapping in try/catch.eventSource.addEventListener("connected", (event) => { - const data = JSON.parse(event.data); - console.log("[useWebhookEvents] Connected to flow:", data); + try { + const data = JSON.parse(event.data); + console.log("[useWebhookEvents] Connected to flow:", data); + } catch (e) { + console.error("[useWebhookEvents] Failed to parse connected event:", e); + } });Apply similar pattern to other event handlers (vertices_sorted, build_start, end_vertex, end).
Also applies to: 44-46, 74-76, 91-93, 134-136
src/backend/base/langflow/services/event_manager.py (1)
36-38: Potential memory leak in _vertex_start_times if vertices fail before get_build_duration is called.If a vertex build fails or is cancelled before
get_build_durationis called, the start time entry will remain in_vertex_start_timesindefinitely. Consider adding a TTL-based cleanup or explicit cleanup method for failed builds.For long-running servers, you may want to add periodic cleanup:
def cleanup_stale_start_times(self, max_age_seconds: float = 3600) -> None: """Remove start times older than max_age_seconds.""" current_time = time.time() for flow_id in list(self._vertex_start_times.keys()): for vertex_id, start_time in list(self._vertex_start_times[flow_id].items()): if current_time - start_time > max_age_seconds: self._vertex_start_times[flow_id].pop(vertex_id, None) if not self._vertex_start_times[flow_id]: del self._vertex_start_times[flow_id]src/backend/base/langflow/api/v1/endpoints.py (1)
671-695: Consider usingorjsoninstead ofjsonfor consistency.The file already imports
orjsonat line 10. Using it here would provide consistency and slightly better performance for JSON serialization in the SSE event stream.- import json - from langflow.services.event_manager import webhook_event_manager async def event_generator() -> AsyncGenerator[str, None]: """Generate SSE events from the webhook event manager.""" flow_id_str = str(flow.id) queue = await webhook_event_manager.subscribe(flow_id_str) try: # Send initial connection event - yield f"event: connected\ndata: {json.dumps({'flow_id': flow_id_str, 'flow_name': flow.name})}\n\n" + yield f"event: connected\ndata: {orjson.dumps({'flow_id': flow_id_str, 'flow_name': flow.name}).decode()}\n\n" while True: if await request.is_disconnected(): break try: event = await asyncio.wait_for(queue.get(), timeout=30.0) event_type = event["event"] - event_data = json.dumps(event["data"]) + event_data = orjson.dumps(event["data"]).decode() yield f"event: {event_type}\ndata: {event_data}\n\n" except asyncio.TimeoutError: # Heartbeat - yield f"event: heartbeat\ndata: {json.dumps({'timestamp': time.time()})}\n\n" + yield f"event: heartbeat\ndata: {orjson.dumps({'timestamp': time.time()}).decode()}\n\n"
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
src/frontend/package-lock.jsonis excluded by!**/package-lock.json
📒 Files selected for processing (8)
src/backend/base/langflow/api/v1/endpoints.py(5 hunks)src/backend/base/langflow/services/event_manager.py(1 hunks)src/backend/tests/unit/test_webhook.py(1 hunks)src/frontend/src/controllers/API/queries/_builds/use-get-builds-polling-mutation.ts(1 hunks)src/frontend/src/hooks/use-webhook-events.ts(1 hunks)src/frontend/src/pages/FlowPage/index.tsx(1 hunks)src/lfx/src/lfx/graph/utils.py(1 hunks)src/lfx/src/lfx/graph/vertex/base.py(1 hunks)
🧰 Additional context used
📓 Path-based instructions (10)
src/frontend/src/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/frontend_development.mdc)
src/frontend/src/**/*.{ts,tsx}: Use React 18 with TypeScript for frontend development
Use Zustand for state management
Files:
src/frontend/src/controllers/API/queries/_builds/use-get-builds-polling-mutation.tssrc/frontend/src/hooks/use-webhook-events.tssrc/frontend/src/pages/FlowPage/index.tsx
src/frontend/src/**/{hooks,services}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/frontend_development.mdc)
Use the useApi hook for API calls with proper error handling, including loading state, error state, and exception throwing
Files:
src/frontend/src/hooks/use-webhook-events.ts
src/frontend/src/**/{services,hooks}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/frontend_development.mdc)
Use async/await pattern for API calls instead of .then() chains
Files:
src/frontend/src/hooks/use-webhook-events.ts
src/frontend/src/hooks/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/frontend_development.mdc)
Export hooks as custom hooks (useHookName) from the hooks/ directory
Files:
src/frontend/src/hooks/use-webhook-events.ts
src/backend/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/backend_development.mdc)
src/backend/**/*.py: Use FastAPI async patterns withawaitfor async operations in component execution methods
Useasyncio.create_task()for background tasks and implement proper cleanup with try/except forasyncio.CancelledError
Usequeue.put_nowait()for non-blocking queue operations andasyncio.wait_for()with timeouts for controlled get operations
Files:
src/backend/base/langflow/services/event_manager.pysrc/backend/base/langflow/api/v1/endpoints.pysrc/backend/tests/unit/test_webhook.py
src/backend/base/langflow/api/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/backend_development.mdc)
Backend API endpoints should be organized by version (v1/, v2/) under
src/backend/base/langflow/api/with specific modules for features (chat.py, flows.py, users.py, etc.)
Files:
src/backend/base/langflow/api/v1/endpoints.py
src/backend/tests/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/testing.mdc)
src/backend/tests/**/*.py: Place backend unit tests insrc/backend/tests/directory, component tests insrc/backend/tests/unit/components/organized by component subdirectory, and integration tests accessible viamake integration_tests
Use same filename as component with appropriate test prefix/suffix (e.g.,my_component.py→test_my_component.py)
Use theclientfixture (FastAPI Test Client) defined insrc/backend/tests/conftest.pyfor API tests; it provides an asynchttpx.AsyncClientwith automatic in-memory SQLite database and mocked environment variables. Skip client creation by marking test with@pytest.mark.noclient
Inherit from the correctComponentTestBasefamily class located insrc/backend/tests/base.pybased on API access needs:ComponentTestBase(no API),ComponentTestBaseWithClient(needs API), orComponentTestBaseWithoutClient(pure logic). Provide three required fixtures:component_class,default_kwargs, andfile_names_mapping
Create comprehensive unit tests for all new backend components. If unit tests are incomplete, create a corresponding Markdown file documenting manual testing steps and expected outcomes
Test both sync and async code paths, mock external dependencies appropriately, test error handling and edge cases, validate input/output behavior, and test component initialization and configuration
Use@pytest.mark.asynciodecorator for async component tests and ensure async methods are properly awaited
Test background tasks usingasyncio.create_task()and verify completion withasyncio.wait_for()with appropriate timeout constraints
Test queue operations using non-blockingqueue.put_nowait()andasyncio.wait_for(queue.get(), timeout=...)to verify queue processing without blocking
Use@pytest.mark.no_blockbustermarker to skip the blockbuster plugin in specific tests
For database tests that may fail in batch runs, run them sequentially usinguv run pytest src/backend/tests/unit/test_database.pyr...
Files:
src/backend/tests/unit/test_webhook.py
**/test_*.py
📄 CodeRabbit inference engine (Custom checks)
**/test_*.py: Review test files for excessive use of mocks that may indicate poor test design - check if tests have too many mock objects that obscure what's actually being tested
Warn when mocks are used instead of testing real behavior and interactions, and suggest using real objects or test doubles when mocks become excessive
Ensure mocks are used appropriately for external dependencies only, not for core logic
Backend test files should follow the naming convention test_*.py with proper pytest structure
Test files should have descriptive test function names that explain what is being tested
Tests should be organized logically with proper setup and teardown
Consider including edge cases and error conditions for comprehensive test coverage
Verify tests cover both positive and negative scenarios where appropriate
For async functions in backend tests, ensure proper async testing patterns are used with pytest
For API endpoints, verify both success and error response testing
Files:
src/backend/tests/unit/test_webhook.py
src/frontend/src/**/*.{tsx,jsx,css,scss}
📄 CodeRabbit inference engine (.cursor/rules/frontend_development.mdc)
Use Tailwind CSS for styling
Files:
src/frontend/src/pages/FlowPage/index.tsx
src/frontend/src/**/*.{tsx,jsx}
📄 CodeRabbit inference engine (.cursor/rules/frontend_development.mdc)
src/frontend/src/**/*.{tsx,jsx}: Implement dark mode support using the useDarkMode hook and dark store
Use Lucide React for icon components in the application
Files:
src/frontend/src/pages/FlowPage/index.tsx
🧠 Learnings (11)
📚 Learning: 2025-11-24T19:46:45.790Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/frontend_development.mdc:0-0
Timestamp: 2025-11-24T19:46:45.790Z
Learning: Applies to src/frontend/src/components/**/*.{tsx,jsx} : Use React Flow for flow graph visualization with Node, Edge, Controls, and Background components
Applied to files:
src/frontend/src/hooks/use-webhook-events.tssrc/frontend/src/pages/FlowPage/index.tsx
📚 Learning: 2025-11-24T19:46:45.790Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/frontend_development.mdc:0-0
Timestamp: 2025-11-24T19:46:45.790Z
Learning: Applies to src/frontend/src/hooks/**/*.{ts,tsx} : Export hooks as custom hooks (useHookName) from the hooks/ directory
Applied to files:
src/frontend/src/hooks/use-webhook-events.ts
📚 Learning: 2025-11-24T19:46:45.790Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/frontend_development.mdc:0-0
Timestamp: 2025-11-24T19:46:45.790Z
Learning: Applies to src/frontend/src/**/{hooks,services}/**/*.{ts,tsx} : Use the useApi hook for API calls with proper error handling, including loading state, error state, and exception throwing
Applied to files:
src/frontend/src/hooks/use-webhook-events.ts
📚 Learning: 2025-06-23T12:46:42.048Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/frontend_development.mdc:0-0
Timestamp: 2025-06-23T12:46:42.048Z
Learning: React Flow should be used for flow graph visualization, with nodes and edges passed as props, and changes handled via onNodesChange and onEdgesChange callbacks.
Applied to files:
src/frontend/src/hooks/use-webhook-events.tssrc/frontend/src/pages/FlowPage/index.tsx
📚 Learning: 2025-11-24T19:46:09.104Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/backend_development.mdc:0-0
Timestamp: 2025-11-24T19:46:09.104Z
Learning: Applies to src/backend/base/langflow/api/**/*.py : Backend API endpoints should be organized by version (v1/, v2/) under `src/backend/base/langflow/api/` with specific modules for features (chat.py, flows.py, users.py, etc.)
Applied to files:
src/backend/base/langflow/api/v1/endpoints.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Test webhook endpoints by posting to `api/v1/webhook/{endpoint_name}` with appropriate payloads and validating response status codes
Applied to files:
src/backend/tests/unit/test_webhook.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Test real-time event streaming endpoints by consuming NDJSON lines using `response.aiter_lines()`, parsing JSON, and validating event structure and job_id consistency
Applied to files:
src/backend/tests/unit/test_webhook.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Test Langflow REST API endpoints using the `client` fixture with appropriate HTTP methods (GET, POST, etc.), headers (logged_in_headers), and payload validation
Applied to files:
src/backend/tests/unit/test_webhook.py
📚 Learning: 2025-11-24T19:47:28.997Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/testing.mdc:0-0
Timestamp: 2025-11-24T19:47:28.997Z
Learning: Applies to src/backend/tests/**/*.py : Use predefined JSON flows and utility functions from `tests.unit.build_utils` (create_flow, build_flow, get_build_events, consume_and_assert_stream) for flow execution testing
Applied to files:
src/backend/tests/unit/test_webhook.py
📚 Learning: 2025-06-23T12:46:42.048Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/frontend_development.mdc:0-0
Timestamp: 2025-06-23T12:46:42.048Z
Learning: Custom React Flow node types should be implemented as memoized components, using Handle components for connection points and supporting optional icons and labels.
Applied to files:
src/frontend/src/pages/FlowPage/index.tsx
📚 Learning: 2025-11-24T19:46:45.790Z
Learnt from: CR
Repo: langflow-ai/langflow PR: 0
File: .cursor/rules/frontend_development.mdc:0-0
Timestamp: 2025-11-24T19:46:45.790Z
Learning: Applies to src/frontend/src/components/nodes/**/*.{tsx,jsx} : Memoize custom React Flow node components using memo() to prevent unnecessary re-renders
Applied to files:
src/frontend/src/pages/FlowPage/index.tsx
🧬 Code graph analysis (7)
src/lfx/src/lfx/graph/vertex/base.py (2)
src/lfx/src/lfx/custom/custom_component/custom_component.py (1)
flow_id(201-202)src/backend/base/langflow/services/event_manager.py (3)
has_listeners(147-149)record_build_start(36-38)emit(100-145)
src/frontend/src/hooks/use-webhook-events.ts (2)
src/frontend/src/types/zustand/flow/index.ts (1)
VertexLayerElementType(46-49)src/frontend/src/types/api/index.ts (1)
VertexBuildTypeAPI(207-219)
src/backend/base/langflow/services/event_manager.py (2)
src/lfx/src/lfx/custom/custom_component/custom_component.py (1)
flow_id(201-202)src/backend/base/langflow/server.py (1)
error(53-61)
src/backend/base/langflow/api/v1/endpoints.py (1)
src/backend/base/langflow/services/event_manager.py (4)
emit(100-145)subscribe(64-79)unsubscribe(81-98)has_listeners(147-149)
src/backend/tests/unit/test_webhook.py (1)
src/backend/base/langflow/services/event_manager.py (8)
WebhookEventManager(17-149)has_listeners(147-149)subscribe(64-79)unsubscribe(81-98)emit(100-145)record_build_start(36-38)get_build_duration(40-48)_format_duration(54-62)
src/frontend/src/pages/FlowPage/index.tsx (4)
src/frontend/src/utils/reactflowUtils.ts (1)
customStringify(1142-1164)src/frontend/src/hooks/use-refresh-model-inputs.ts (1)
refreshAllModelInputs(68-121)src/frontend/src/hooks/use-webhook-events.ts (1)
useWebhookEvents(15-189)src/frontend/src/customization/feature-flags.ts (1)
ENABLE_NEW_SIDEBAR(23-23)
src/lfx/src/lfx/graph/utils.py (1)
src/backend/base/langflow/services/event_manager.py (3)
has_listeners(147-149)get_build_duration(40-48)emit(100-145)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (17)
- GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 3
- GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 4
- GitHub Check: Lint Backend / Run Mypy (3.12)
- GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 1
- GitHub Check: Lint Backend / Run Mypy (3.13)
- GitHub Check: Run Backend Tests / LFX Tests - Python 3.10
- GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 2
- GitHub Check: Run Backend Tests / Unit Tests - Python 3.10 - Group 5
- GitHub Check: Test Docker Images / Test docker images
- GitHub Check: Lint Backend / Run Mypy (3.11)
- GitHub Check: Lint Backend / Run Mypy (3.10)
- GitHub Check: Run Backend Tests / Integration Tests - Python 3.10
- GitHub Check: Run Frontend Unit Tests / Frontend Jest Unit Tests
- GitHub Check: Test Starter Templates
- GitHub Check: Optimize new Python code in this PR
- GitHub Check: Update Starter Projects
- GitHub Check: Update Component Index
🔇 Additional comments (26)
src/frontend/src/controllers/API/queries/_builds/use-get-builds-polling-mutation.ts (2)
122-148: Duration preservation logic looks good.The merge strategy correctly preserves duration data from SSE events when polling results lack it. This ensures real-time duration information from the SSE stream isn't lost during polling updates.
One minor note: the direct mutation of
lastEntry.data.durationat line 141 modifies the response object in place. This is acceptable here since the data is immediately set in state, but be aware this pattern could cause issues if the response object is used elsewhere.
152-154: Correct variable usage for error detection.Using
newFlowPoolfor error detection is appropriate since you want to check the freshly fetched build data for errors, not the merged result.src/lfx/src/lfx/graph/vertex/base.py (1)
748-764: Build start event emission is well-guarded.The implementation correctly:
- Uses lazy import to handle the case when langflow isn't available
- Checks for listeners before emitting (performance optimization)
- Records build start time for duration calculation
- Silently catches exceptions to avoid breaking the build process
The
# noqa: BLE001, S110comments appropriately acknowledge the intentional broad exception handling and the pass statement.src/frontend/src/pages/FlowPage/index.tsx (1)
59-60: Good integration of webhook events hook.The
useWebhookEvents()hook is correctly placed at the component's top level, following React hooks rules. The hook establishes SSE connection for real-time webhook feedback when a flow is open in the UI.src/backend/tests/unit/test_webhook.py (6)
416-433: Good test coverage for subscribe/unsubscribe lifecycle.The test correctly verifies the listener management lifecycle: initially no listeners, then has listeners after subscribe, then no listeners after unsubscribe.
435-456: Emit test properly validates event structure.The test verifies that emitted events are received with correct structure (event type, data, timestamp) and uses appropriate timeout with
asyncio.wait_for.
458-467: No-listener emit test is important for robustness.Good edge case coverage - verifying that emitting when no listeners exist doesn't raise errors.
469-491: Duration tracking test validates the full cycle.The test covers recording start, waiting, retrieving formatted duration, and verifying cleanup (subsequent get returns None). The assertion for "ms" format is appropriate given the 0.1s sleep.
493-508: Duration formatting test covers threshold boundaries.Good coverage of the formatting logic across milliseconds (< 1s), seconds (1s-60s), and minutes (≥ 60s) ranges.
510-533: Multiple subscribers test validates broadcast behavior.The test correctly verifies that both subscribers receive the same event, which is essential for the fan-out broadcasting pattern.
src/lfx/src/lfx/graph/utils.py (2)
256-273: End vertex event payload matches frontend expectations.The build_data structure aligns with the
VertexBuildTypeAPItype expected by the frontend. The hardcoded empty arrays fornext_vertices_idsandtop_level_verticesare acceptable here since these fields are typically populated by the build orchestration layer, not at the individual vertex build level.
274-275: Silent exception handling is appropriate for non-critical SSE emission.The broad exception handling with
passis intentional and appropriate here since SSE emission should never break the primary build flow.src/frontend/src/hooks/use-webhook-events.ts (3)
1-14: Well-documented hook with clear purpose.The JSDoc comment clearly explains the hook's purpose and its alignment with the normal Play button flow. Good use of TypeScript imports for type safety.
158-169: Error event handler correctly distinguishes error types.Good pattern to check for
eventOrError.datato differentiate between SSE errors with data payloads and generic connection errors. The catch block at line 165 handles JSON parse failures gracefully.
178-186: Proper cleanup with EventSource.close().The cleanup function correctly closes the EventSource and clears the ref, preventing memory leaks and orphaned connections.
src/backend/base/langflow/services/event_manager.py (6)
17-35: Well-designed event manager with proper initialization.Good use of:
defaultdictfor automatic collection initializationasyncio.Lockfor concurrent access protection- Clear documentation explaining the purpose
36-48: Duration tracking with proper cleanup.The
get_build_durationmethod correctly cleans up the start time after retrieval, preventing memory leaks for completed vertices.
50-62: Clean duration formatting implementation.The
_format_durationmethod handles edge cases well with clear thresholds for milliseconds, seconds, and minutes formatting.
64-79: Subscribe with bounded queue is a good pattern.The
maxsize=100prevents unbounded memory growth from slow consumers. Logging the listener count aids debugging.
100-145: Robust emit with timeout and dead queue cleanup.The implementation correctly:
- Copies listeners under lock to avoid iteration issues
- Uses
wait_forwith timeout to prevent blocking- Handles both timeout (skip event) and exceptions (remove queue)
- Cleans up dead queues to prevent memory leaks
152-153: Global singleton is appropriate for this use case.The singleton pattern is suitable here since webhook events need to be broadcast across the application from a single source of truth.
src/backend/base/langflow/api/v1/endpoints.py (5)
208-224: LGTM!The new parameters
emit_eventsandflow_idare well-documented and have sensible defaults. The docstring clearly explains thatflow_idis required whenemit_events=True.
225-248: LGTM!The lazy import avoids potential circular dependencies, and the event emission logic correctly extracts vertex IDs from the flow data before execution begins.
258-261: LGTM!The success event is correctly emitted after flow execution completes, providing the
run_idandsuccess: Truestatus for UI feedback.
277-280: LGTM!The error event correctly uses the same "end" event type with
success: Falseand includes the error message, allowing consistent event handling on the UI side.
776-789: LGTM - Correct use ofasyncio.create_task()for background execution.The fire-and-forget pattern is correctly implemented:
- Uses
asyncio.create_task()as recommended by coding guidelines for SSE event loop compatibility- The done callback prevents "Task exception was never retrieved" warnings by consuming any uncaught exceptions
Note that
simple_run_flow_taskalready handles all exceptions internally (lines 275-292), so the callback is primarily defensive.
… valid API key request to ensure accurate file existence assertion
…n and remove unnecessary whitespace style(test_webhook.py): remove extra blank line to adhere to PEP 8 style guidelines
This pull request introduces real-time Server-Sent Events (SSE) support for webhook-based flow execution, allowing the UI to receive live feedback about flow progress and results. It also updates the webhook execution logic to emit relevant events and improves the handling of session IDs in the
ChatOutputcomponent of the "Basic Prompt Chaining" starter project.Webhook SSE and Event Emission Enhancements:
/webhook-events/{flow_id_or_name}SSE endpoint to stream real-time webhook execution events to the UI, including connection, progress, and heartbeat events. (src/backend/base/langflow/api/v1/endpoints.py)vertices_sorted,end, and error events) before, after, and upon failure of a flow run. These events are sent via the new SSE mechanism if UI listeners are present. (src/backend/base/langflow/api/v1/endpoints.py) [1] [2] [3] [4]asyncio.create_taskfor background execution, ensuring compatibility with the SSE event loop and proper task lifecycle management. (src/backend/base/langflow/api/v1/endpoints.py)Starter Project and Component Improvements:
ChatOutputcomponent in the "Basic Prompt Chaining" starter project to better preserve and propagate thesession_idfrom incoming messages, ensuring correct message history tracking. (src/backend/base/langflow/initial_setup/starter_projects/Basic Prompt Chaining.json)code_hashin the starter project metadata to reflect the new component logic. (src/backend/base/langflow/initial_setup/starter_projects/Basic Prompt Chaining.json)Summary by CodeRabbit
New Features
Documentation
✏️ Tip: You can customize this high-level summary in your review settings.