Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/backend/base/langflow/api/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,14 @@ async def _build_vertex(vertex_id: str, graph: Graph, event_manager: EventManage
result_data_response.duration = duration
result_data_response.timedelta = timedelta
vertex.add_build_time(timedelta)
inactivated_vertices = list(graph.inactivated_vertices)
# Capture both inactivated and conditionally excluded vertices
inactivated_vertices = list(graph.inactivated_vertices.union(graph.conditionally_excluded_vertices))
graph.reset_inactivated_vertices()
graph.reset_activated_vertices()

# Note: Do not reset conditionally_excluded_vertices each iteration
# This is handled by the ConditionalRouter component

# graph.stop_vertex tells us if the user asked
# to stop the build of the graph at a certain vertex
# if it is in next_vertices_ids, we need to remove other
Expand Down
43 changes: 40 additions & 3 deletions src/backend/base/langflow/components/logic/conditional_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,55 @@ def evaluate_condition(self, input_text: str, match_text: str, operator: str, *,
return False

def iterate_and_stop_once(self, route_to_stop: str):
"""Handles cycle iteration counting and branch exclusion.

Uses two complementary mechanisms:
1. stop() - ACTIVE/INACTIVE state for cycle management (gets reset each iteration)
2. exclude_branch_conditionally() - Persistent exclusion for conditional routing

When max_iterations is reached, breaks the cycle by allowing the default_route to execute.
"""
if not self.__iteration_updated:
self.update_ctx({f"{self._id}_iteration": self.ctx.get(f"{self._id}_iteration", 0) + 1})
self.__iteration_updated = True
if self.ctx.get(f"{self._id}_iteration", 0) >= self.max_iterations and route_to_stop == self.default_route:
current_iteration = self.ctx.get(f"{self._id}_iteration", 0)

# Check if max iterations reached and we're trying to stop the default route
if current_iteration >= self.max_iterations and route_to_stop == self.default_route:
# Clear ALL conditional exclusions to allow default route to execute
if self._id in self.graph.conditional_exclusion_sources:
previous_exclusions = self.graph.conditional_exclusion_sources[self._id]
self.graph.conditionally_excluded_vertices -= previous_exclusions
del self.graph.conditional_exclusion_sources[self._id]

# Switch which route to stop - stop the NON-default route to break the cycle
route_to_stop = "true_result" if route_to_stop == "false_result" else "false_result"

# Call stop to break the cycle
self.stop(route_to_stop)
# Don't apply conditional exclusion when breaking cycle
return

# Normal case: Use BOTH mechanisms
# 1. stop() for cycle management (marks INACTIVE, updates run manager, gets reset)
self.stop(route_to_stop)

# 2. Conditional exclusion for persistent routing (doesn't get reset except by this router)
self.graph.exclude_branch_conditionally(self._id, output_name=route_to_stop)

def true_response(self) -> Message:
result = self.evaluate_condition(
self.input_text, self.match_text, self.operator, case_sensitive=self.case_sensitive
)
if result:

# Check if we should force output due to max_iterations on default route
current_iteration = self.ctx.get(f"{self._id}_iteration", 0)
force_output = current_iteration >= self.max_iterations and self.default_route == "true_result"

if result or force_output:
self.status = self.true_case_message
self.iterate_and_stop_once("false_result")
if not force_output: # Only stop the other branch if not forcing due to max iterations
self.iterate_and_stop_once("false_result")
return self.true_case_message
self.iterate_and_stop_once("true_result")
return Message(content="")
Expand All @@ -151,10 +186,12 @@ def false_response(self) -> Message:
result = self.evaluate_condition(
self.input_text, self.match_text, self.operator, case_sensitive=self.case_sensitive
)

if not result:
self.status = self.false_case_message
self.iterate_and_stop_once("true_result")
return self.false_case_message

self.iterate_and_stop_once("false_result")
return Message(content="")

Expand Down
65 changes: 61 additions & 4 deletions src/backend/base/langflow/graph/graph/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ def __init__(
self.vertices_to_run: set[str] = set()
self.stop_vertex: str | None = None
self.inactive_vertices: set = set()
# Conditional routing system (separate from ACTIVE/INACTIVE cycle management)
self.conditionally_excluded_vertices: set = set() # Vertices excluded by conditional routing
self.conditional_exclusion_sources: dict[str, set[str]] = {} # Maps source vertex -> excluded vertices
self.edges: list[CycleEdge] = []
self.vertices: list[Vertex] = []
self.run_manager = RunnableVerticesManager()
Expand Down Expand Up @@ -943,6 +946,59 @@ def mark_branch(self, vertex_id: str, state: str, output_name: str | None = None
vertices_to_run=self.vertices_to_run,
)

def exclude_branch_conditionally(self, vertex_id: str, output_name: str | None = None) -> None:
"""Marks a branch as conditionally excluded (for conditional routing).

This system is separate from the ACTIVE/INACTIVE state used for cycle management:
- ACTIVE/INACTIVE: Reset after each cycle iteration to allow cycles to continue
- Conditional exclusion: Persists until explicitly cleared by the same source vertex

Used by ConditionalRouter to ensure only one branch executes per condition evaluation.
If this vertex has previously excluded branches, they are cleared first to allow
re-evaluation on subsequent iterations (e.g., in cycles where condition may change).

Args:
vertex_id: The source vertex making the exclusion decision
output_name: The output name to follow when excluding downstream vertices
"""
# Clear any previous exclusions from this source vertex
if vertex_id in self.conditional_exclusion_sources:
previous_exclusions = self.conditional_exclusion_sources[vertex_id]
self.conditionally_excluded_vertices -= previous_exclusions
del self.conditional_exclusion_sources[vertex_id]

# Now exclude the new branch
visited = set()
excluded = set()
self._exclude_branch_conditionally(vertex_id, visited, excluded, output_name, skip_first=True)

# Track which vertices this source excluded
if excluded:
self.conditional_exclusion_sources[vertex_id] = excluded

def _exclude_branch_conditionally(
self, vertex_id: str, visited: set, excluded: set, output_name: str | None = None, *, skip_first: bool = False
) -> None:
"""Recursively excludes vertices in a branch for conditional routing."""
if vertex_id in visited:
return
visited.add(vertex_id)

# Don't exclude the first vertex (the router itself)
if not skip_first:
self.conditionally_excluded_vertices.add(vertex_id)
excluded.add(vertex_id)

for child_id in self.parent_child_map[vertex_id]:
# If we're at the router (skip_first=True) and have an output_name,
# only follow edges from that specific output
if skip_first and output_name:
edge = self.get_edge(vertex_id, child_id)
if edge and edge.source_handle.name != output_name:
continue
# After the first level, exclude all descendants
self._exclude_branch_conditionally(child_id, visited, excluded, output_name=None, skip_first=False)

def get_edge(self, source_id: str, target_id: str) -> CycleEdge | None:
"""Returns the edge between two vertices."""
for edge in self.edges:
Expand Down Expand Up @@ -2082,6 +2138,9 @@ def sort_layer_by_avg_build_time(vertices_ids: list[str]) -> list[str]:

def is_vertex_runnable(self, vertex_id: str) -> bool:
"""Returns whether a vertex is runnable."""
# Check if vertex is conditionally excluded (for conditional routing)
if vertex_id in self.conditionally_excluded_vertices:
return False
is_active = self.get_vertex(vertex_id).is_active()
is_loop = self.get_vertex(vertex_id).is_loop
return self.run_manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop)
Expand Down Expand Up @@ -2114,10 +2173,8 @@ def find_runnable_predecessors(predecessor_id: str) -> None:
if predecessor_id in visited:
return
visited.add(predecessor_id)
predecessor_vertex = self.get_vertex(predecessor_id)
is_active = predecessor_vertex.is_active()
is_loop = predecessor_vertex.is_loop
if self.run_manager.is_vertex_runnable(predecessor_id, is_active=is_active, is_loop=is_loop):

if self.is_vertex_runnable(predecessor_id):
runnable_vertices.append(predecessor_id)
else:
for pred_pred_id in self.run_manager.run_predecessors.get(predecessor_id, []):
Expand Down