Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions src/backend/base/langflow/api/v1/published_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,30 @@ async def list_all_published_flows(
Public endpoint - no authentication required.
Returns paginated list with denormalized flow and publisher information.
"""
return await _list_all_published_flows(
session=session,
page=page,
limit=limit,
search=search,
category=category,
tags=tags, # This is now a string, not a Query object
status_filter=status_filter,
sort_by=sort_by,
order=order,
)


async def _list_all_published_flows(
session: AsyncSession,
page: int = 1,
limit: int = 10,
search: str | None = None,
category: str | None = None,
tags: str | None = None,
status_filter: str | None = None,
sort_by: str = "published_at",
order: str = "desc",
):
# Base query - apply status filter
if status_filter == "published":
query = (
Expand All @@ -740,7 +764,12 @@ async def list_all_published_flows(
# Apply search filter on denormalized flow_name and description
if search:
search_pattern = f"%{search}%"
query = query.where(or_(PublishedFlow.flow_name.ilike(search_pattern), PublishedFlow.description.ilike(search_pattern)))
query = query.where(
or_(
PublishedFlow.flow_name.ilike(search_pattern),
PublishedFlow.description.ilike(search_pattern),
)
)

# Apply category filter
if category:
Expand All @@ -756,25 +785,34 @@ async def list_all_published_flows(

# Count total with same status filter
if status_filter == "published":
count_query = select(func.count(PublishedFlow.id)).where(PublishedFlow.status == PublishStatusEnum.PUBLISHED)
count_query = select(func.count(PublishedFlow.id)).where(
PublishedFlow.status == PublishStatusEnum.PUBLISHED
)
elif status_filter == "unpublished":
count_query = select(func.count(PublishedFlow.id)).where(PublishedFlow.status == PublishStatusEnum.UNPUBLISHED)
count_query = select(func.count(PublishedFlow.id)).where(
PublishedFlow.status == PublishStatusEnum.UNPUBLISHED
)
else: # "all" or None - count all flows (default)
count_query = select(func.count(PublishedFlow.id))

# Apply same filters to count query
if search:
search_pattern = f"%{search}%"
count_query = count_query.where(
or_(PublishedFlow.flow_name.ilike(search_pattern), PublishedFlow.description.ilike(search_pattern))
or_(
PublishedFlow.flow_name.ilike(search_pattern),
PublishedFlow.description.ilike(search_pattern),
)
)
if category:
count_query = count_query.where(PublishedFlow.category == category)
if tags:
tag_list = [tag.strip() for tag in tags.split(",")]
for tag in tag_list:
# Cast JSON to text and check if tag is in the stringified array
count_query = count_query.where(cast(PublishedFlow.tags, Text).contains(f'"{tag}"'))
count_query = count_query.where(
cast(PublishedFlow.tags, Text).contains(f'"{tag}"')
)

total_result = await session.exec(count_query)
total = total_result.one()
Expand Down Expand Up @@ -804,7 +842,10 @@ async def list_all_published_flows(
rows = results.all()

# Format response - all fields already denormalized in PublishedFlow
items = [PublishedFlowRead.model_validate(published_flow, from_attributes=True) for published_flow in rows]
items = [
PublishedFlowRead.model_validate(published_flow, from_attributes=True)
for published_flow in rows
]

pages = (total + limit - 1) // limit if limit > 0 else 0

Expand Down
11 changes: 6 additions & 5 deletions src/backend/base/langflow/base/tools/run_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from langflow.field_typing import Tool
from langflow.graph.graph.base import Graph
from langflow.graph.vertex.base import Vertex
from langflow.helpers.flow import get_flow_inputs
from langflow.helpers.flow import get_flow_inputs, load_flow
from langflow.inputs.inputs import DropdownInput, InputTypes, MessageInput
from langflow.logging.logger import logger
from langflow.schema.data import Data
Expand Down Expand Up @@ -110,21 +110,22 @@ async def message_output(self) -> Message:
async def get_flow_names(self) -> list[str]:
    """Return the names of all flows available to this component.

    Fetches the paginated flow listing via ``alist_flows`` and extracts the
    denormalized ``flow_name`` from each item in the ``items`` payload.
    """
    # TODO: look up flow ID by flow name instead of listing all flows
    flows_response = await self.alist_flows()
    # Use a distinct loop variable so it does not shadow the response object.
    return [flow.flow_name for flow in flows_response["items"]]

async def get_flow(self, flow_name_selected: str) -> Data | None:
    """Return the flow entry whose ``flow_name`` matches, or ``None``.

    Performs a linear scan over the listing returned by ``alist_flows``.
    NOTE: the match is by flow *name*, not by flow id.
    """
    flows_response = await self.alist_flows()
    for flow_entry in flows_response["items"]:
        if flow_entry.flow_name == flow_name_selected:
            return flow_entry
    # No flow with that name was found.
    return None

async def get_graph(self, flow_name_selected: str | None = None) -> Graph:
if flow_name_selected:
flow_data = await self.get_flow(flow_name_selected)
if flow_data:
return Graph.from_payload(flow_data.data["data"])
flow = await load_flow(user_id=self.user_id,flow_id=str(flow_data.flow_id))
return flow
msg = "Flow not found"
raise ValueError(msg)
# Ensure a Graph is always returned or an exception is raised
Expand Down
Loading