Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 121 additions & 2 deletions src/backend/base/langflow/helpers/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from fastapi import HTTPException
from lfx.log.logger import logger
from pydantic.v1 import BaseModel, Field, create_model
from sqlmodel import select
from sqlalchemy.orm import aliased
from sqlmodel import asc, desc, select

from langflow.schema.schema import INPUT_FIELD_NAME
from langflow.services.database.models.flow.model import Flow, FlowRead
Expand All @@ -19,13 +20,17 @@
from lfx.graph.schema import RunOutputs
from lfx.graph.vertex.base import Vertex

from langflow.schema.data import Data
from langflow.schema.data import Data

INPUT_TYPE_MAP = {
"ChatInput": {"type_hint": "Optional[str]", "default": '""'},
"TextInput": {"type_hint": "Optional[str]", "default": '""'},
"JSONInput": {"type_hint": "Optional[dict]", "default": "{}"},
}
SORT_DISPATCHER = {
"asc": asc,
"desc": desc,
}


async def list_flows(*, user_id: str | None = None) -> list[Data]:
Expand All @@ -44,6 +49,120 @@ async def list_flows(*, user_id: str | None = None) -> list[Data]:
raise ValueError(msg) from e


async def list_flows_by_flow_folder(
*,
user_id: str | None = None,
flow_id: str | None = None,
order_params: dict | None = {"column": "updated_at", "direction": "desc"}, # noqa: B006
) -> list[Data]:
if not user_id:
msg = "Session is invalid"
raise ValueError(msg)
if not flow_id:
msg = "Flow ID is required"
raise ValueError(msg)
try:
async with session_scope() as session:
uuid_user_id = UUID(user_id) if isinstance(user_id, str) else user_id
uuid_flow_id = UUID(flow_id) if isinstance(flow_id, str) else flow_id
# get all flows belonging to the specified user
# and inside the same folder as the specified flow
flow_ = aliased(Flow) # flow table alias, used to retrieve the folder
stmt = (
select(Flow.id, Flow.name, Flow.updated_at)
.join(flow_, Flow.folder_id == flow_.folder_id)
.where(flow_.id == uuid_flow_id)
.where(flow_.user_id == uuid_user_id)
.where(Flow.user_id == uuid_user_id)
.where(Flow.id != uuid_flow_id)
)
# sort flows by the specified column and direction
if order_params is not None:
sort_col = getattr(Flow, order_params.get("column", "updated_at"), Flow.updated_at)
sort_dir = SORT_DISPATCHER.get(order_params.get("direction", "desc"), desc)
stmt = stmt.order_by(sort_dir(sort_col))

flows = (await session.exec(stmt)).all()
return [Data(data=dict(flow._mapping)) for flow in flows] # noqa: SLF001
except Exception as e:
msg = f"Error listing flows: {e}"
raise ValueError(msg) from e


async def list_flows_by_folder_id(
*, user_id: str | None = None, folder_id: str | None = None, order_params: dict | None = None
) -> list[Data]:
if not user_id:
msg = "Session is invalid"
raise ValueError(msg)
if not folder_id:
msg = "Folder ID is required"
raise ValueError(msg)

if order_params is None:
order_params = {"column": "updated_at", "direction": "desc"}

try:
async with session_scope() as session:
uuid_user_id = UUID(user_id) if isinstance(user_id, str) else user_id
uuid_folder_id = UUID(folder_id) if isinstance(folder_id, str) else folder_id
stmt = (
select(Flow.id, Flow.name, Flow.updated_at)
.where(Flow.user_id == uuid_user_id)
.where(Flow.folder_id == uuid_folder_id)
)
if order_params is not None:
sort_col = getattr(Flow, order_params.get("column", "updated_at"), Flow.updated_at)
sort_dir = SORT_DISPATCHER.get(order_params.get("direction", "desc"), desc)
stmt = stmt.order_by(sort_dir(sort_col))

flows = (await session.exec(stmt)).all()
return [Data(data=dict(flow._mapping)) for flow in flows] # noqa: SLF001
except Exception as e:
msg = f"Error listing flows: {e}"
raise ValueError(msg) from e


async def get_flow_by_id_or_name(
*,
user_id: str | None = None,
flow_id: str | None = None,
flow_name: str | None = None,
) -> Data | None:
if not user_id:
msg = "Session is invalid"
raise ValueError(msg)
if not (flow_id or flow_name):
msg = "Flow ID or Flow Name is required"
raise ValueError(msg)

# set user provided flow id or flow name.
# if both are provided, flow_id is used.
attr, val = None, None
if flow_name:
attr = "name"
val = flow_name
if flow_id:
attr = "id"
val = flow_id
if not (attr and val):
msg = "Flow id or Name is required"
raise ValueError(msg)
try:
async with session_scope() as session:
uuid_user_id = UUID(user_id) if isinstance(user_id, str) else user_id # type: ignore[assignment]
uuid_flow_id_or_name = val # type: ignore[assignment]
if isinstance(val, str) and attr == "id":
uuid_flow_id_or_name = UUID(val) # type: ignore[assignment]
stmt = select(Flow).where(Flow.user_id == uuid_user_id).where(getattr(Flow, attr) == uuid_flow_id_or_name)
flow = (await session.exec(stmt)).first()
return flow.to_data() if flow else None

except Exception as e:
msg = f"Error getting flow by id: {e}"
raise ValueError(msg) from e


async def load_flow(
user_id: str, flow_id: str | None = None, flow_name: str | None = None, tweaks: dict | None = None
) -> Graph:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ def to_data(self):
"name": serialized.pop("name"),
"description": serialized.pop("description"),
"updated_at": serialized.pop("updated_at"),
"folder_id": serialized.pop("folder_id"),
}
return Data(data=data)

Expand Down
Empty file.
Empty file.
Empty file.
Loading
Loading