Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 122 additions & 15 deletions src/backend/base/langflow/api/v1/endpoints.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import json
import time
from collections.abc import AsyncGenerator
from http import HTTPStatus
Expand Down Expand Up @@ -50,6 +51,7 @@
from langflow.services.database.models.flow.utils import get_all_webhook_components_in_flow
from langflow.services.database.models.user.model import User, UserRead
from langflow.services.deps import get_session_service, get_settings_service, get_telemetry_service
from langflow.services.event_manager import webhook_event_manager
from langflow.services.telemetry.schema import RunPayload
from langflow.utils.compression import compress_response
from langflow.utils.version import get_version_info
Expand All @@ -59,6 +61,9 @@

router = APIRouter(tags=["Base"])

# SSE Constants
SSE_HEARTBEAT_TIMEOUT_SECONDS = 30.0


async def parse_input_request_from_body(http_request: Request) -> SimplifiedAPIRequest:
"""Parse SimplifiedAPIRequest from HTTP request body.
Expand Down Expand Up @@ -205,9 +210,43 @@ async def simple_run_flow_task(
telemetry_service=None,
start_time: float | None = None,
run_id: str | None = None,
emit_events: bool = False,
flow_id: str | None = None,
):
"""Run a flow task as a BackgroundTask, therefore it should not throw exceptions."""
"""Run a flow task as a BackgroundTask, therefore it should not throw exceptions.

Args:
flow: The flow to execute
input_request: The simplified API request
stream: Whether to stream results
api_key_user: The user executing the flow
event_manager: Event manager for streaming
telemetry_service: Service for logging telemetry
start_time: Start time for duration calculation
run_id: Unique ID for this run
emit_events: Whether to emit events to webhook_event_manager (for UI feedback)
flow_id: Flow ID for event emission (required if emit_events=True)
"""
webhook_event_mgr = webhook_event_manager if emit_events and flow_id else None

try:
# Emit vertices_sorted event before starting execution
if emit_events and webhook_event_mgr and flow_id:
# Get all vertex IDs from the flow
vertex_ids = []
if flow.data and flow.data.get("nodes"):
vertex_ids = [node.get("id") for node in flow.data.get("nodes", []) if node.get("id")]

await webhook_event_mgr.emit(
flow_id,
"vertices_sorted",
{
"ids": vertex_ids,
"to_run": vertex_ids,
"run_id": run_id,
},
)

result = await simple_run_flow(
flow=flow,
input_request=input_request,
Expand All @@ -216,6 +255,11 @@ async def simple_run_flow_task(
event_manager=event_manager,
run_id=run_id,
)

# Emit end event if UI is listening
if emit_events and webhook_event_mgr and flow_id:
await webhook_event_mgr.emit(flow_id, "end", {"run_id": run_id, "success": True})

if telemetry_service and start_time is not None:
await telemetry_service.log_package_run(
RunPayload(
Expand All @@ -230,6 +274,11 @@ async def simple_run_flow_task(

except Exception as exc: # noqa: BLE001
await logger.aexception(f"Error running flow {flow.id} task")

# Emit error event if UI is listening
if emit_events and webhook_event_mgr and flow_id:
await webhook_event_mgr.emit(flow_id, "end", {"run_id": run_id, "success": False, "error": str(exc)})

if telemetry_service and start_time is not None:
await telemetry_service.log_package_run(
RunPayload(
Expand Down Expand Up @@ -608,23 +657,70 @@ async def simplified_run_flow_session(
)


@router.get("/webhook-events/{flow_id_or_name}")
async def webhook_events_stream(
flow_id_or_name: str, # noqa: ARG001
flow: Annotated[Flow, Depends(get_flow_by_id_or_endpoint_name)],
request: Request,
):
"""Server-Sent Events (SSE) endpoint for real-time webhook build updates.

When a flow is open in the UI, this endpoint provides live feedback
of webhook execution progress, similar to clicking "Play" in the UI.
"""

async def event_generator() -> AsyncGenerator[str, None]:
"""Generate SSE events from the webhook event manager."""
flow_id_str = str(flow.id)
queue = await webhook_event_manager.subscribe(flow_id_str)

try:
# Send initial connection event
yield f"event: connected\ndata: {json.dumps({'flow_id': flow_id_str, 'flow_name': flow.name})}\n\n"

while True:
if await request.is_disconnected():
break

try:
event = await asyncio.wait_for(queue.get(), timeout=SSE_HEARTBEAT_TIMEOUT_SECONDS)
event_type = event["event"]
event_data = json.dumps(event["data"])
yield f"event: {event_type}\ndata: {event_data}\n\n"
except asyncio.TimeoutError:
yield f"event: heartbeat\ndata: {json.dumps({'timestamp': time.time()})}\n\n"

except asyncio.CancelledError:
pass
finally:
await webhook_event_manager.unsubscribe(flow_id_str, queue)

return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)


@router.post("/webhook/{flow_id_or_name}", response_model=dict, status_code=HTTPStatus.ACCEPTED) # noqa: RUF100, FAST003
async def webhook_run_flow(
flow_id_or_name: str,
flow: Annotated[Flow, Depends(get_flow_by_id_or_endpoint_name)],
request: Request,
background_tasks: BackgroundTasks,
):
"""Run a flow using a webhook request.

Args:
flow_id_or_name (str): The flow ID or endpoint name.
flow (Flow): The flow to be executed.
request (Request): The incoming HTTP request.
background_tasks (BackgroundTasks): The background tasks manager.
flow_id_or_name: The flow ID or endpoint name (used by dependency).
flow: The flow to be executed.
request: The incoming HTTP request.

Returns:
dict: A dictionary containing the status of the task.
A dictionary containing the status of the task.

Raises:
HTTPException: If the flow is not found or if there is an error processing the request.
Expand Down Expand Up @@ -662,17 +758,28 @@ async def webhook_run_flow(
session_id=None,
)

# Check if there are UI listeners connected via SSE
flow_id_str = str(flow.id)
has_ui_listeners = webhook_event_manager.has_listeners(flow_id_str)

await logger.adebug("Starting background task")
run_id = str(uuid4())
background_tasks.add_task(
simple_run_flow_task,
flow=flow,
input_request=input_request,
api_key_user=webhook_user,
telemetry_service=telemetry_service,
start_time=start_time,
run_id=run_id,

# Use asyncio.create_task to run in same event loop (needed for SSE)
background_task = asyncio.create_task(
simple_run_flow_task(
flow=flow,
input_request=input_request,
api_key_user=webhook_user,
telemetry_service=telemetry_service,
start_time=start_time,
run_id=run_id,
emit_events=has_ui_listeners,
flow_id=flow_id_str,
)
)
# Fire-and-forget: log exceptions but don't block
background_task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
except Exception as exc:
error_msg = str(exc)
raise HTTPException(status_code=500, detail=error_msg) from exc
Expand Down
171 changes: 171 additions & 0 deletions src/backend/base/langflow/services/event_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
"""Event Manager for Webhook Real-Time Updates.

This module provides an in-memory event broadcasting system for webhook builds.
When a UI is connected via SSE, it receives real-time build events.
"""

from __future__ import annotations

import asyncio
import time
from collections import defaultdict
from typing import Any

from loguru import logger

# Constants
SSE_QUEUE_MAX_SIZE = 100
SSE_EMIT_TIMEOUT_SECONDS = 1.0
SECONDS_PER_MINUTE = 60


class WebhookEventManager:
"""Manages SSE connections and broadcasts build events for webhooks.

When a flow is open in the UI, it subscribes to webhook events.
When a webhook is triggered, events are emitted to all subscribers.

This provides the same visual experience as clicking "Play" in the UI,
but triggered by external webhook calls.
"""

def __init__(self):
"""Initialize the event manager with empty listeners."""
# flow_id → set of queues (one per SSE connection)
self._listeners: dict[str, set[asyncio.Queue]] = defaultdict(set)
# flow_id → {vertex_id → start_time} for duration calculation
self._vertex_start_times: dict[str, dict[str, float]] = defaultdict(dict)
self._lock = asyncio.Lock()
logger.debug("WebhookEventManager initialized")

def record_build_start(self, flow_id: str, vertex_id: str) -> None:
"""Record when a vertex build starts for duration calculation."""
self._vertex_start_times[flow_id][vertex_id] = time.time()

def get_build_duration(self, flow_id: str, vertex_id: str) -> str | None:
"""Get the formatted build duration for a vertex."""
start_time = self._vertex_start_times.get(flow_id, {}).get(vertex_id)
if start_time is None:
return None
elapsed = time.time() - start_time
# Clean up
self._vertex_start_times[flow_id].pop(vertex_id, None)
return self._format_duration(elapsed)

@staticmethod
def _format_duration(seconds: float) -> str:
"""Format duration in a human-readable way."""
if seconds < 1:
return f"{int(seconds * 1000)} ms"
if seconds < SECONDS_PER_MINUTE:
return f"{seconds:.1f} s"
minutes = int(seconds // SECONDS_PER_MINUTE)
secs = seconds % SECONDS_PER_MINUTE
return f"{minutes}m {secs:.1f}s"

async def subscribe(self, flow_id: str) -> asyncio.Queue:
"""Subscribe to receive events for a specific flow.

Args:
flow_id: The flow ID to subscribe to

Returns:
Queue that will receive events for this flow
"""
queue: asyncio.Queue = asyncio.Queue(maxsize=SSE_QUEUE_MAX_SIZE)
async with self._lock:
self._listeners[flow_id].add(queue)
listener_count = len(self._listeners[flow_id])

logger.info(f"New subscriber for flow {flow_id}. Total listeners: {listener_count}")
return queue

async def unsubscribe(self, flow_id: str, queue: asyncio.Queue) -> None:
"""Unsubscribe from flow events.

Args:
flow_id: The flow ID to unsubscribe from
queue: The queue to remove
"""
async with self._lock:
if flow_id in self._listeners:
self._listeners[flow_id].discard(queue)
listener_count = len(self._listeners[flow_id])

# Clean up empty sets
if not self._listeners[flow_id]:
del self._listeners[flow_id]
logger.info(f"All subscribers disconnected for flow {flow_id}")
else:
logger.info(f"Subscriber disconnected from flow {flow_id}. Remaining: {listener_count}")

async def emit(self, flow_id: str, event_type: str, data: Any) -> None:
"""Emit an event to all subscribers of a flow.

Args:
flow_id: The flow ID to emit to
event_type: Type of event (build_start, end_vertex, etc.)
data: Event data (will be JSON serialized)
"""
async with self._lock:
listeners = self._listeners.get(flow_id, set()).copy()

if not listeners:
# No one listening, skip emission (performance optimization)
return

logger.debug(f"Emitting {event_type} to {len(listeners)} listeners for flow {flow_id}")

# Prepare event
event = {
"event": event_type,
"data": data,
"timestamp": time.time(),
}

# Send to all queues
dead_queues: set[asyncio.Queue] = set()

for queue in listeners:
try:
await asyncio.wait_for(queue.put(event), timeout=SSE_EMIT_TIMEOUT_SECONDS)
except asyncio.TimeoutError:
# Queue is full (slow consumer), skip this event
logger.warning(f"Queue full for flow {flow_id}, dropping event {event_type}")
except Exception as e: # noqa: BLE001
# Queue is closed or broken, mark for removal
logger.error(f"Error putting event in queue for flow {flow_id}: {e}")
dead_queues.add(queue)

# Clean up dead queues
if dead_queues:
async with self._lock:
if flow_id in self._listeners:
self._listeners[flow_id] -= dead_queues
if not self._listeners[flow_id]:
del self._listeners[flow_id]

def has_listeners(self, flow_id: str) -> bool:
"""Check if there are any active listeners for a flow."""
return flow_id in self._listeners and len(self._listeners[flow_id]) > 0


# Module-level instance (can be replaced in tests via dependency injection)
# TODO: Consider migrating to langflow's service manager pattern for better DI
_webhook_event_manager: WebhookEventManager | None = None


def get_webhook_event_manager() -> WebhookEventManager:
"""Get the webhook event manager instance.

Returns:
The WebhookEventManager singleton instance.
"""
global _webhook_event_manager # noqa: PLW0603
if _webhook_event_manager is None:
_webhook_event_manager = WebhookEventManager()
return _webhook_event_manager


# Backwards compatibility alias
webhook_event_manager = get_webhook_event_manager()
Loading
Loading