Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 174 additions & 10 deletions src/backend/base/langflow/api/v1/flow_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
FlowVersionCreate,
FlowVersionRead,
FlowVersionRejectRequest,
FlowVersionPaginatedResponse,
)
from langflow.services.database.models.version_flow_input_sample.model import (
VersionFlowInputSample,
Expand Down Expand Up @@ -251,22 +252,37 @@ async def submit_for_approval(
)


@router.get("/pending-reviews", response_model=list[FlowVersionRead])
@router.get("/pending-reviews", response_model=FlowVersionPaginatedResponse)
async def get_pending_reviews(
session: DbSession,
current_user: CurrentActiveUser,
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=100),
page: int = Query(1, ge=1),
limit: int = Query(12, ge=1, le=100),
):
"""
Get all flow versions pending review (status = Submitted).

This endpoint is intended for admin users to see all submissions awaiting approval.
Returns paginated results.
"""
try:
# Get "Submitted" status ID
submitted_status_id = await _get_status_id_by_name(session, FlowStatusEnum.SUBMITTED.value)

# Count total items
count_stmt = (
select(func.count())
.select_from(FlowVersion)
.where(FlowVersion.status_id == submitted_status_id)
)
count_result = await session.exec(count_stmt)
total = count_result.one()

# Calculate pagination
pages = (total + limit - 1) // limit # Ceiling division
offset = (page - 1) * limit

# Get paginated results
stmt = (
select(FlowVersion)
.where(FlowVersion.status_id == submitted_status_id)
Expand All @@ -275,13 +291,13 @@ async def get_pending_reviews(
joinedload(FlowVersion.status),
)
.order_by(FlowVersion.submitted_at.desc())
.offset(skip)
.offset(offset)
.limit(limit)
)
result = await session.exec(stmt)
versions = result.unique().all()

return [
items = [
FlowVersionRead(
id=v.id,
original_flow_id=v.original_flow_id,
Expand Down Expand Up @@ -312,6 +328,13 @@ async def get_pending_reviews(
for v in versions
]

return FlowVersionPaginatedResponse(
items=items,
total=total,
page=page,
pages=pages,
)

except HTTPException:
raise
except Exception as e:
Expand All @@ -322,6 +345,125 @@ async def get_pending_reviews(
)


@router.get("/all", response_model=FlowVersionPaginatedResponse)
async def get_all_flow_versions(
session: DbSession,
current_user: CurrentActiveUser,
page: int = Query(1, ge=1),
limit: int = Query(12, ge=1, le=100),
status: str | None = Query(None),
):
"""
Get all flow versions with optional status filtering.

This endpoint returns all flow versions (Submitted, Approved, Rejected) in a single list.
Optionally filter by status using the status parameter.
Returns paginated results.

Valid status values: "Submitted", "Approved", "Rejected"
If status is not provided, returns all flow versions.
"""
try:
# Build base query
query_conditions = []

if status:
# Validate status name
valid_statuses = ["Submitted", "Approved", "Rejected"]
if status not in valid_statuses:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid status. Must be one of: {', '.join(valid_statuses)}",
)
status_id = await _get_status_id_by_name(session, status)
query_conditions.append(FlowVersion.status_id == status_id)
else:
# Get all relevant statuses (Submitted, Approved, Rejected)
submitted_id = await _get_status_id_by_name(session, FlowStatusEnum.SUBMITTED.value)
approved_id = await _get_status_id_by_name(session, FlowStatusEnum.APPROVED.value)
rejected_id = await _get_status_id_by_name(session, FlowStatusEnum.REJECTED.value)
query_conditions.append(
FlowVersion.status_id.in_([submitted_id, approved_id, rejected_id])
)

# Count total items
count_stmt = (
select(func.count())
.select_from(FlowVersion)
.where(*query_conditions)
)
count_result = await session.exec(count_stmt)
total = count_result.one()

# Calculate pagination
pages = (total + limit - 1) // limit # Ceiling division
offset = (page - 1) * limit

# Get paginated results
stmt = (
select(FlowVersion)
.where(*query_conditions)
.options(
joinedload(FlowVersion.submitter),
joinedload(FlowVersion.reviewer),
joinedload(FlowVersion.status),
)
.order_by(FlowVersion.submitted_at.desc())
.offset(offset)
.limit(limit)
)
result = await session.exec(stmt)
versions = result.unique().all()

items = [
FlowVersionRead(
id=v.id,
original_flow_id=v.original_flow_id,
version_flow_id=v.version_flow_id,
status_id=v.status_id,
version=v.version,
title=v.title,
description=v.description,
tags=v.tags,
agent_logo=v.agent_logo,
sample_id=v.sample_id,
submitted_by=v.submitted_by,
submitted_by_name=v.submitted_by_name,
submitted_by_email=v.submitted_by_email,
submitted_at=v.submitted_at,
reviewed_by=v.reviewed_by,
reviewed_by_name=v.reviewed_by_name,
reviewed_by_email=v.reviewed_by_email,
reviewed_at=v.reviewed_at,
rejection_reason=v.rejection_reason,
created_at=v.created_at,
updated_at=v.updated_at,
status_name=v.status.status_name if v.status else None,
# Use stored name/email, fallback to User relationship for backward compatibility
submitter_name=v.submitted_by_name or (v.submitter.username if v.submitter else None),
submitter_email=v.submitted_by_email,
reviewer_name=v.reviewed_by_name or (v.reviewer.username if v.reviewer else None),
)
for v in versions
]

return FlowVersionPaginatedResponse(
items=items,
total=total,
page=page,
pages=pages,
)

except HTTPException:
raise
except Exception as e:
logger.error(f"Error fetching all flow versions: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to fetch flow versions: {str(e)}",
)


@router.get("/my-submissions", response_model=list[FlowVersionRead])
async def get_my_submissions(
session: DbSession,
Expand Down Expand Up @@ -385,18 +527,19 @@ async def get_my_submissions(
)


@router.get("/by-status/{status_name}", response_model=list[FlowVersionRead])
@router.get("/by-status/{status_name}", response_model=FlowVersionPaginatedResponse)
async def get_versions_by_status(
status_name: str,
session: DbSession,
current_user: CurrentActiveUser,
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=100),
page: int = Query(1, ge=1),
limit: int = Query(12, ge=1, le=100),
):
"""
Get flow versions filtered by status name.

Valid status names: Draft, Submitted, Approved, Rejected, Published, Unpublished, Deleted
Returns paginated results.
"""
try:
# Validate status name
Expand All @@ -409,6 +552,20 @@ async def get_versions_by_status(

status_id = await _get_status_id_by_name(session, status_name)

# Count total items
count_stmt = (
select(func.count())
.select_from(FlowVersion)
.where(FlowVersion.status_id == status_id)
)
count_result = await session.exec(count_stmt)
total = count_result.one()

# Calculate pagination
pages = (total + limit - 1) // limit # Ceiling division
offset = (page - 1) * limit

# Get paginated results
stmt = (
select(FlowVersion)
.where(FlowVersion.status_id == status_id)
Expand All @@ -418,13 +575,13 @@ async def get_versions_by_status(
joinedload(FlowVersion.status),
)
.order_by(FlowVersion.submitted_at.desc())
.offset(skip)
.offset(offset)
.limit(limit)
)
result = await session.exec(stmt)
versions = result.unique().all()

return [
items = [
FlowVersionRead(
id=v.id,
original_flow_id=v.original_flow_id,
Expand Down Expand Up @@ -456,6 +613,13 @@ async def get_versions_by_status(
for v in versions
]

return FlowVersionPaginatedResponse(
items=items,
total=total,
page=page,
pages=pages,
)

except HTTPException:
raise
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,5 +236,14 @@ class FlowVersionRejectRequest(SQLModel):
rejection_reason: str | None = None


class FlowVersionPaginatedResponse(SQLModel):
"""Schema for paginated flow version responses."""

items: list[FlowVersionRead]
total: int
page: int
pages: int


# Rebuild Pydantic models to resolve forward references
FlowVersionRead.model_rebuild()
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { useState } from "react";
import { useNavigate } from "react-router-dom";
import { Button } from "@/components/ui/button";
import {
Dialog,
Expand Down Expand Up @@ -31,6 +32,7 @@ import SubmitForApprovalModal from "@/modals/submitForApprovalModal";
import PublishFlowModal from "@/modals/publishFlowModal";

export default function FlowToolbarOptions() {
const navigate = useNavigate();
const [open, setOpen] = useState<boolean>(false);
const [openSubmitModal, setOpenSubmitModal] = useState<boolean>(false);
const [openPublishModal, setOpenPublishModal] = useState<boolean>(false);
Expand Down Expand Up @@ -104,6 +106,7 @@ export default function FlowToolbarOptions() {
approveVersion(latestVersionId, {
onSuccess: () => {
setSuccessData({ title: `"${currentFlowName}" has been approved` });
navigate("/all-requests");
},
onError: (error: any) => {
setErrorData({
Expand Down Expand Up @@ -136,6 +139,7 @@ export default function FlowToolbarOptions() {
setSuccessData({ title: `"${currentFlowName}" has been rejected` });
setOpenRejectModal(false);
setRejectionReason("");
navigate("/all-requests");
},
onError: (error: any) => {
setErrorData({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export * from "./use-get-flow-latest-status";
export * from "./use-submit-flow-for-approval";
export * from "./use-get-pending-reviews";
export * from "./use-get-versions-by-status";
export * from "./use-get-all-flow-versions";
export * from "./use-approve-version";
export * from "./use-reject-version";
export * from "./use-cancel-submission";
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { useQuery } from "@tanstack/react-query";
import { api } from "../../api";
import { getURL } from "../../helpers/constants";
import type { FlowVersionPaginatedResponse } from "./use-get-pending-reviews";

export const useGetAllFlowVersions = (
page: number = 1,
limit: number = 12,
status?: string
) => {
return useQuery<FlowVersionPaginatedResponse>({
queryKey: ["all-flow-versions", page, limit, status],
queryFn: async () => {
const response = await api.get<FlowVersionPaginatedResponse>(
`${getURL("FLOW_VERSIONS")}/all`,
{
params: {
page,
limit,
...(status && status !== "all" ? { status } : {}),
},
}
);
return response.data;
},
staleTime: 0, // Always fetch fresh data
refetchOnMount: "always", // Refetch when component mounts
refetchOnWindowFocus: true,
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,22 @@ export interface FlowVersionRead {
updated_at: string;
}

export const useGetPendingReviews = () => {
return useQuery<FlowVersionRead[]>({
queryKey: ["pending-reviews"],
export interface FlowVersionPaginatedResponse {
items: FlowVersionRead[];
total: number;
page: number;
pages: number;
}

export const useGetPendingReviews = (page: number = 1, limit: number = 12) => {
return useQuery<FlowVersionPaginatedResponse>({
queryKey: ["pending-reviews", page, limit],
queryFn: async () => {
const response = await api.get<FlowVersionRead[]>(
`${getURL("FLOW_VERSIONS")}/pending-reviews`
const response = await api.get<FlowVersionPaginatedResponse>(
`${getURL("FLOW_VERSIONS")}/pending-reviews`,
{
params: { page, limit },
}
);
return response.data;
},
Expand Down
Loading