diff --git a/src/backend/base/langflow/api/v1/monitor.py b/src/backend/base/langflow/api/v1/monitor.py index 10292ec91045..a5dacb7bef75 100644 --- a/src/backend/base/langflow/api/v1/monitor.py +++ b/src/backend/base/langflow/api/v1/monitor.py @@ -11,8 +11,8 @@ from langflow.schema.message import MessageResponse from langflow.services.auth.utils import get_current_active_user from langflow.services.database.models.message.model import MessageRead, MessageTable, MessageUpdate -from langflow.services.database.models.transactions.crud import transform_transaction_table -from langflow.services.database.models.transactions.model import TransactionTable +from langflow.services.database.models.transactions.crud import transform_transaction_table_for_logs +from langflow.services.database.models.transactions.model import TransactionLogsResponse, TransactionTable from langflow.services.database.models.vertex_builds.crud import ( delete_vertex_builds_by_flow_id, get_vertex_builds_by_flow_id, @@ -182,12 +182,12 @@ async def get_transactions( flow_id: Annotated[UUID, Query()], session: DbSession, params: Annotated[Params | None, Depends(custom_params)], -) -> Page[TransactionTable]: +) -> Page[TransactionLogsResponse]: try: stmt = ( select(TransactionTable) .where(TransactionTable.flow_id == flow_id) - .order_by(col(TransactionTable.timestamp)) + .order_by(col(TransactionTable.timestamp).desc()) ) import warnings @@ -195,6 +195,6 @@ async def get_transactions( warnings.filterwarnings( "ignore", category=DeprecationWarning, module=r"fastapi_pagination\.ext\.sqlalchemy" ) - return await apaginate(session, stmt, params=params, transformer=transform_transaction_table) + return await apaginate(session, stmt, params=params, transformer=transform_transaction_table_for_logs) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e diff --git a/src/backend/base/langflow/services/database/models/transactions/crud.py b/src/backend/base/langflow/services/database/models/transactions/crud.py index cdf1fd79f8e3..561b77447f2a 100644 --- a/src/backend/base/langflow/services/database/models/transactions/crud.py +++ b/src/backend/base/langflow/services/database/models/transactions/crud.py @@ -6,6 +6,7 @@ from langflow.services.database.models.transactions.model import ( TransactionBase, + TransactionLogsResponse, TransactionReadResponse, TransactionTable, ) @@ -76,7 +77,16 @@ async def log_transaction(db: AsyncSession, transaction: TransactionBase) -> Tra def transform_transaction_table( transaction: list[TransactionTable] | TransactionTable, -) -> list[TransactionReadResponse]: +) -> list[TransactionReadResponse] | TransactionReadResponse: if isinstance(transaction, list): return [TransactionReadResponse.model_validate(t, from_attributes=True) for t in transaction] return TransactionReadResponse.model_validate(transaction, from_attributes=True) + + +def transform_transaction_table_for_logs( + transaction: list[TransactionTable] | TransactionTable, +) -> list[TransactionLogsResponse] | TransactionLogsResponse: + """Transform transaction data for logs view, excluding error and flow_id.""" + if isinstance(transaction, list): + return [TransactionLogsResponse.model_validate(t, from_attributes=True) for t in transaction] + return TransactionLogsResponse.model_validate(transaction, from_attributes=True) diff --git a/src/backend/base/langflow/services/database/models/transactions/model.py b/src/backend/base/langflow/services/database/models/transactions/model.py index b42321ee6161..4b5c4f16df26 100644 --- a/src/backend/base/langflow/services/database/models/transactions/model.py +++ b/src/backend/base/langflow/services/database/models/transactions/model.py @@ -81,3 +81,27 @@ class TransactionTable(TransactionBase, table=True): # type: ignore[call-arg] class TransactionReadResponse(TransactionBase): id: UUID = Field(alias="transaction_id") flow_id: UUID + + +class TransactionLogsResponse(SQLModel): + """Transaction response model for logs view - excludes error and flow_id fields.""" + + model_config = {"populate_by_name": True, "from_attributes": True} + + id: UUID + timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + vertex_id: str = Field(nullable=False) + target_id: str | None = Field(default=None) + inputs: dict | None = Field(default=None) + outputs: dict | None = Field(default=None) + status: str = Field(nullable=False) + + @field_serializer("inputs") + def serialize_inputs(self, data) -> dict: + """Serialize with enforced text length and item count limits.""" + return serialize(data, max_length=get_max_text_length(), max_items=get_max_items_length()) + + @field_serializer("outputs") + def serialize_outputs(self, data) -> dict: + """Serialize outputs with enforced limits.""" + return serialize(data, max_length=get_max_text_length(), max_items=get_max_items_length()) diff --git a/src/backend/tests/unit/api/v1/test_transactions.py b/src/backend/tests/unit/api/v1/test_transactions.py new file mode 100644 index 000000000000..5e84b27215f6 --- /dev/null +++ b/src/backend/tests/unit/api/v1/test_transactions.py @@ -0,0 +1,226 @@ +"""Tests for transactions API endpoints and models.""" + +from datetime import datetime, timezone +from uuid import uuid4 + +import pytest +from fastapi import status +from httpx import AsyncClient +from langflow.services.database.models.transactions.crud import ( + transform_transaction_table, + transform_transaction_table_for_logs, +) +from langflow.services.database.models.transactions.model import ( + TransactionBase, + TransactionLogsResponse, + TransactionReadResponse, + TransactionTable, +) + + +class TestTransactionModels: + """Tests for transaction model classes.""" + + def test_transaction_base_creation(self): + """Test creating a TransactionBase instance.""" + flow_id = uuid4() + transaction = TransactionBase( + vertex_id="test-vertex-123", + target_id="target-vertex-456", + inputs={"key": "value"}, + outputs={"result": "success"}, + status="success", + flow_id=flow_id, + ) + + assert transaction.vertex_id == "test-vertex-123" + assert transaction.target_id == "target-vertex-456" + assert transaction.inputs == {"key": "value"} + assert transaction.outputs == {"result": "success"} + assert transaction.status == "success" + assert transaction.flow_id == flow_id + assert transaction.error is None + + def test_transaction_base_with_error(self): + """Test creating a TransactionBase with error status.""" + flow_id = uuid4() + transaction = TransactionBase( + vertex_id="test-vertex-123", + status="error", + error="Something went wrong", + flow_id=flow_id, + ) + + assert transaction.status == "error" + assert transaction.error == "Something went wrong" + + def test_transaction_base_filters_code_from_inputs(self): + """Test that 'code' key is filtered from inputs.""" + flow_id = uuid4() + inputs_with_code = {"key": "value", "code": "def foo(): pass"} + transaction = TransactionBase( + vertex_id="test-vertex", + inputs=inputs_with_code, + status="success", + flow_id=flow_id, + ) + + # The original dict should not be modified + assert "code" in inputs_with_code + # But the transaction inputs should not have 'code' + assert "code" not in transaction.inputs + assert transaction.inputs["key"] == "value" + + def test_transaction_base_flow_id_string_conversion(self): + """Test that string flow_id is converted to UUID.""" + flow_id_str = "12345678-1234-5678-1234-567812345678" + transaction = TransactionBase( + vertex_id="test-vertex", + status="success", + flow_id=flow_id_str, + ) + + from uuid import UUID + + assert isinstance(transaction.flow_id, UUID) + assert str(transaction.flow_id) == flow_id_str + + def test_transaction_logs_response_from_table(self): + """Test creating TransactionLogsResponse from TransactionTable.""" + table = TransactionTable( + id=uuid4(), + vertex_id="test-vertex", + target_id="target-vertex", + inputs={"input": "data"}, + outputs={"output": "result"}, + status="success", + error=None, + flow_id=uuid4(), + timestamp=datetime.now(timezone.utc), + ) + + response = TransactionLogsResponse.model_validate(table, from_attributes=True) + + assert response.id == table.id + assert response.vertex_id == table.vertex_id + assert response.target_id == table.target_id + assert response.status == table.status + # TransactionLogsResponse should not have error and flow_id fields + assert not hasattr(response, "error") or "error" not in response.model_fields + assert not hasattr(response, "flow_id") or "flow_id" not in response.model_fields + + +class TestTransactionTransformers: + """Tests for transaction transformer functions.""" + + def test_transform_transaction_table_single(self): + """Test transforming a single TransactionTable.""" + table = TransactionTable( + id=uuid4(), + vertex_id="test-vertex", + status="success", + flow_id=uuid4(), + ) + + result = transform_transaction_table(table) + assert isinstance(result, TransactionReadResponse) + + def test_transform_transaction_table_list(self): + """Test transforming a list of TransactionTable.""" + tables = [ + TransactionTable(id=uuid4(), vertex_id=f"vertex-{i}", status="success", flow_id=uuid4()) for i in range(3) + ] + + result = transform_transaction_table(tables) + assert isinstance(result, list) + assert len(result) == 3 + assert all(isinstance(r, TransactionReadResponse) for r in result) + + def test_transform_transaction_table_for_logs_single(self): + """Test transforming a single TransactionTable for logs view.""" + table = TransactionTable( + id=uuid4(), + vertex_id="test-vertex", + status="success", + flow_id=uuid4(), + ) + + result = transform_transaction_table_for_logs(table) + assert isinstance(result, TransactionLogsResponse) + + def test_transform_transaction_table_for_logs_list(self): + """Test transforming a list of TransactionTable for logs view.""" + tables = [ + TransactionTable(id=uuid4(), vertex_id=f"vertex-{i}", status="success", flow_id=uuid4()) for i in range(3) + ] + + result = transform_transaction_table_for_logs(tables) + assert isinstance(result, list) + assert len(result) == 3 + assert all(isinstance(r, TransactionLogsResponse) for r in result) + + +class TestTransactionsEndpoint: + """Tests for the /monitor/transactions endpoint.""" + + async def test_get_transactions_requires_auth(self, client: AsyncClient): + """Test that GET /monitor/transactions requires authentication.""" + response = await client.get("api/v1/monitor/transactions?flow_id=00000000-0000-0000-0000-000000000000") + assert response.status_code == status.HTTP_403_FORBIDDEN + + @pytest.mark.usefixtures("active_user") + async def test_get_transactions_returns_paginated_response(self, client: AsyncClient, logged_in_headers): + """Test that GET /monitor/transactions returns paginated response.""" + flow_id = "00000000-0000-0000-0000-000000000000" + response = await client.get(f"api/v1/monitor/transactions?flow_id={flow_id}", headers=logged_in_headers) + + assert response.status_code == status.HTTP_200_OK + result = response.json() + assert "items" in result + assert "total" in result + assert "page" in result + assert "size" in result + assert "pages" in result + assert isinstance(result["items"], list) + + @pytest.mark.usefixtures("active_user") + async def test_get_transactions_with_pagination_params(self, client: AsyncClient, logged_in_headers): + """Test GET /monitor/transactions with custom pagination parameters.""" + flow_id = "00000000-0000-0000-0000-000000000000" + response = await client.get( + f"api/v1/monitor/transactions?flow_id={flow_id}&page=1&size=10", headers=logged_in_headers + ) + + assert response.status_code == status.HTTP_200_OK + result = response.json() + assert result["page"] == 1 + assert result["size"] == 10 + + @pytest.mark.usefixtures("active_user") + async def test_get_transactions_requires_flow_id(self, client: AsyncClient, logged_in_headers): + """Test that GET /monitor/transactions requires flow_id parameter.""" + response = await client.get("api/v1/monitor/transactions", headers=logged_in_headers) + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + @pytest.mark.usefixtures("active_user") + async def test_get_transactions_invalid_flow_id_format(self, client: AsyncClient, logged_in_headers): + """Test GET /monitor/transactions with invalid flow_id format.""" + response = await client.get("api/v1/monitor/transactions?flow_id=invalid-uuid", headers=logged_in_headers) + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + @pytest.mark.usefixtures("active_user") + async def test_get_transactions_response_structure(self, client: AsyncClient, logged_in_headers): + """Test that transaction response items have the expected structure.""" + flow_id = uuid4() + response = await client.get(f"api/v1/monitor/transactions?flow_id={flow_id}", headers=logged_in_headers) + + assert response.status_code == status.HTTP_200_OK + result = response.json() + + # Verify pagination structure + assert "items" in result + assert "total" in result + assert "page" in result + assert "size" in result + assert "pages" in result + assert isinstance(result["items"], list) diff --git a/src/frontend/src/modals/flowLogsModal/config/flowLogsColumns.tsx b/src/frontend/src/modals/flowLogsModal/config/flowLogsColumns.tsx new file mode 100644 index 000000000000..8b6f0f32fed8 --- /dev/null +++ b/src/frontend/src/modals/flowLogsModal/config/flowLogsColumns.tsx @@ -0,0 +1,108 @@ +import type { ColDef } from "ag-grid-community"; +import { Badge } from "@/components/ui/badge"; + +const baseCellClass = + "flex items-center truncate cursor-default leading-normal"; + +const formatObjectValue = (value: unknown): string => { + if (value === null || value === undefined) { + return ""; + } + if (typeof value === "object") { + try { + return JSON.stringify(value); + } catch { + return String(value); + } + } + return String(value); +}; + +export function createFlowLogsColumns(): ColDef[] { + return [ + { + headerName: "Timestamp", + field: "timestamp", + flex: 1, + minWidth: 160, + filter: false, + sortable: false, + editable: false, + cellClass: baseCellClass, + }, + { + headerName: "Component", + field: "vertex_id", + flex: 1, + minWidth: 180, + filter: false, + sortable: false, + editable: false, + cellClass: baseCellClass, + }, + { + headerName: "Target", + field: "target_id", + flex: 1, + minWidth: 150, + filter: false, + sortable: false, + editable: false, + cellClass: baseCellClass, + }, + { + headerName: "Inputs", + field: "inputs", + flex: 1.2, + minWidth: 150, + filter: false, + sortable: false, + editable: false, + cellClass: baseCellClass, + valueGetter: (params) => formatObjectValue(params.data?.inputs), + }, + { + headerName: "Outputs", + field: "outputs", + flex: 1.2, + minWidth: 150, + filter: false, + sortable: false, + editable: false, + cellClass: baseCellClass, + valueGetter: (params) => formatObjectValue(params.data?.outputs), + }, + { + headerName: "Status", + field: "status", + flex: 0.6, + minWidth: 100, + filter: false, + sortable: false, + editable: false, + cellClass: baseCellClass, + cellRenderer: (params: { value: string | null | undefined }) => { + const status = params.value ?? "unknown"; + const isSuccess = status === "success"; + const isError = status === "error"; + + return ( +
+ + {status} + +
+ ); + }, + }, + ]; +} diff --git a/src/frontend/src/modals/flowLogsModal/index.tsx b/src/frontend/src/modals/flowLogsModal/index.tsx index 9026d2b54465..05c4eb44be6e 100644 --- a/src/frontend/src/modals/flowLogsModal/index.tsx +++ b/src/frontend/src/modals/flowLogsModal/index.tsx @@ -1,4 +1,3 @@ -import type { ColDef, ColGroupDef } from "ag-grid-community"; import { useCallback, useEffect, useState } from "react"; import { useSearchParams } from "react-router-dom"; import IconComponent from "@/components/common/genericIconComponent"; @@ -8,6 +7,7 @@ import { useGetTransactionsQuery } from "@/controllers/API/queries/transactions" import useFlowsManagerStore from "@/stores/flowsManagerStore"; import { convertUTCToLocalTimezone } from "@/utils/utils"; import BaseModal from "../baseModal"; +import { createFlowLogsColumns } from "./config/flowLogsColumns"; export default function FlowLogsModal({ children, @@ -19,9 +19,9 @@ export default function FlowLogsModal({ const [pageIndex, setPageIndex] = useState(1); const [pageSize, setPageSize] = useState(20); - const [columns, setColumns] = useState>([]); const [rows, setRows] = useState([]); const [searchParams] = useSearchParams(); + const columns = createFlowLogsColumns(); const flowIdFromUrl = searchParams.get("id"); const { data, isLoading, refetch } = useGetTransactionsQuery({ @@ -35,7 +35,7 @@ export default function FlowLogsModal({ useEffect(() => { if (data) { - const { columns, rows } = data; + const { rows } = data; if (data?.rows?.length > 0) { data.rows.map((row: any) => { @@ -43,7 +43,6 @@ export default function FlowLogsModal({ }); } - setColumns(columns.map((col) => ({ ...col, editable: true }))); setRows(rows); } }, [data]); diff --git a/src/frontend/tests/core/features/logs.spec.ts b/src/frontend/tests/core/features/logs.spec.ts index f6b5bbb4f903..030a01cdabbe 100644 --- a/src/frontend/tests/core/features/logs.spec.ts +++ b/src/frontend/tests/core/features/logs.spec.ts @@ -60,12 +60,20 @@ test( await page.getByText("Logs").click(); - await page.getByText("timestamp").first().isVisible(); - await page.getByText("flow_id").first().isVisible(); - await page.getByText("source").first().isVisible(); - await page.getByText("target", { exact: true }).first().isVisible(); - await page.getByText("target_args", { exact: true }).first().isVisible(); - await page.getByRole("gridcell").first().isVisible(); + // Verify the new column headers are present (inside the dialog) + const dialog = page.getByLabel("Dialog"); + await expect(dialog.getByText("Timestamp", { exact: true })).toBeVisible(); + await expect(dialog.getByText("Component", { exact: true })).toBeVisible(); + await expect(dialog.getByText("Target", { exact: true })).toBeVisible(); + await expect(dialog.getByText("Inputs", { exact: true })).toBeVisible(); + await expect(dialog.getByText("Outputs", { exact: true })).toBeVisible(); + await expect(dialog.getByText("Status", { exact: true })).toBeVisible(); + + // Verify there are log entries (grid cells) + await expect(dialog.getByRole("gridcell").first()).toBeVisible(); + + // Verify success status badge is displayed (scoped to dialog) + await expect(dialog.locator("text=success").first()).toBeVisible(); await page.keyboard.press("Escape"); @@ -73,17 +81,17 @@ test( await page.getByText("Settings", { exact: true }).click(); await page.getByText("Messages", { exact: true }).click(); - await page.getByText("index", { exact: true }).last().isVisible(); - await page.getByText("timestamp", { exact: true }).isVisible(); - await page.getByText("flow_id", { exact: true }).isVisible(); - await page.getByText("source", { exact: true }).isVisible(); - await page.getByText("target", { exact: true }).isVisible(); - await page.getByText("vertex_id", { exact: true }).isVisible(); - await page.getByText("status", { exact: true }).isVisible(); - await page.getByText("error", { exact: true }).isVisible(); - await page.getByText("outputs", { exact: true }).isVisible(); - await page.getByText("inputs", { exact: true }).isVisible(); - - await page.getByRole("gridcell").first().isVisible(); + await expect(page.getByText("index", { exact: true }).last()).toBeVisible(); + await expect(page.getByText("timestamp", { exact: true })).toBeVisible(); + await expect(page.getByText("flow_id", { exact: true })).toBeVisible(); + await expect(page.getByText("source", { exact: true })).toBeVisible(); + await expect(page.getByText("target", { exact: true })).toBeVisible(); + await expect(page.getByText("vertex_id", { exact: true })).toBeVisible(); + await expect(page.getByText("status", { exact: true })).toBeVisible(); + await expect(page.getByText("error", { exact: true })).toBeVisible(); + await expect(page.getByText("outputs", { exact: true })).toBeVisible(); + await expect(page.getByText("inputs", { exact: true })).toBeVisible(); + + await expect(page.getByRole("gridcell").first()).toBeVisible(); }, ); diff --git a/src/frontend/tests/extended/features/flow-logs-modal.spec.ts b/src/frontend/tests/extended/features/flow-logs-modal.spec.ts new file mode 100644 index 000000000000..dd8d6c908f77 --- /dev/null +++ b/src/frontend/tests/extended/features/flow-logs-modal.spec.ts @@ -0,0 +1,214 @@ +import { expect, test } from "../../fixtures"; +import { addCustomComponent } from "../../utils/add-custom-component"; +import { awaitBootstrapTest } from "../../utils/await-bootstrap-test"; + +test.describe("Flow Logs Modal", () => { + test( + "should open logs modal and show description", + { tag: ["@release", "@logs"] }, + async ({ page }) => { + await awaitBootstrapTest(page); + + await page.getByTestId("blank-flow").click(); + + await page.waitForSelector( + '[data-testid="sidebar-custom-component-button"]', + { + timeout: 3000, + }, + ); + + // Open the logs modal + await page.getByText("Logs").click(); + + // Verify modal is open by checking the description + await expect( + page.getByText("Inspect component executions."), + ).toBeVisible(); + + // Close modal + await page.keyboard.press("Escape"); + }, + ); + + test( + "should show 'No Data Available' when no logs exist", + { tag: ["@release", "@logs"] }, + async ({ page }) => { + await awaitBootstrapTest(page); + + await page.getByTestId("blank-flow").click(); + + await page.waitForSelector( + '[data-testid="sidebar-custom-component-button"]', + { + timeout: 3000, + }, + ); + + // Open logs modal without running any component + await page.getByText("Logs").click(); + + // Verify "No Data Available" message is shown + await expect(page.getByText("No Data Available")).toBeVisible(); + + // Close modal + await page.keyboard.press("Escape"); + }, + ); + + test( + "should close modal with escape key", + { tag: ["@release", "@logs"] }, + async ({ page }) => { + await awaitBootstrapTest(page); + + await page.getByTestId("blank-flow").click(); + + await page.waitForSelector( + '[data-testid="sidebar-custom-component-button"]', + { + timeout: 3000, + }, + ); + + // Open logs modal + await page.getByText("Logs").click(); + + // Verify modal is open + await expect( + page.getByText("Inspect component executions."), + ).toBeVisible(); + + // Close with Escape key + await page.keyboard.press("Escape"); + + // Verify modal is closed + await expect( + page.getByText("Inspect component executions."), + ).not.toBeVisible(); + }, + ); + + test( + "should display success status after successful component execution", + { tag: ["@release", "@logs"] }, + async ({ page }) => { + await awaitBootstrapTest(page); + + await page.getByTestId("blank-flow").click(); + + await page.waitForSelector( + '[data-testid="sidebar-custom-component-button"]', + { + timeout: 3000, + }, + ); + + // Add a custom component + await addCustomComponent(page); + + await page.waitForSelector('[data-testid="title-Custom Component"]', { + timeout: 3000, + }); + + // Run the component + await page.getByTestId("button_run_custom component").click(); + + await page.waitForSelector("text=built successfully", { timeout: 30000 }); + + // Open logs modal + await page.getByText("Logs").click(); + + // Verify success status badge is displayed (scoped to dialog) + const dialog = page.getByLabel("Dialog"); + await expect(dialog.locator("text=success").first()).toBeVisible(); + + // Close modal + await page.keyboard.press("Escape"); + }, + ); + + test( + "should display error status after failed component execution", + { tag: ["@release", "@logs"] }, + async ({ page }) => { + const customComponentCodeWithError = ` +from langflow.custom import Component +from langflow.io import MessageTextInput, Output +from langflow.schema import Data + + +class CustomComponent(Component): + display_name = "Custom Component" + description = "Use as a template to create your own component." + icon = "code" + name = "CustomComponent" + + inputs = [ + MessageTextInput( + name="input_value", + display_name="Input Value", + value="Hello, World!", + ), + ] + + outputs = [ + Output(display_name="Output", name="output", method="build_output"), + ] + + def build_output(self) -> Data: + msg = "THIS IS A TEST ERROR MESSAGE" + raise ValueError(msg) +`; + + await awaitBootstrapTest(page); + + await page.getByTestId("blank-flow").click(); + + await page.waitForSelector( + '[data-testid="sidebar-custom-component-button"]', + { + timeout: 3000, + }, + ); + + // Add a custom component + await addCustomComponent(page); + + await page.waitForTimeout(1000); + + await page.waitForSelector('[data-testid="title-Custom Component"]', { + timeout: 3000, + }); + await page.getByTestId("title-Custom Component").click(); + + // Open code editor and add error code + await page.getByTestId("code-button-modal").click(); + + await page.locator(".ace_content").click(); + await page.keyboard.press("ControlOrMeta+A"); + await page.locator("textarea").fill(customComponentCodeWithError); + + await page.getByText("Check & Save").last().click(); + + // Run the component (it will fail) + await page.getByTestId("button_run_custom component").click(); + + // Wait for error message to appear + await page.waitForSelector("text=THIS IS A TEST ERROR MESSAGE", { + timeout: 30000, + }); + + // Open logs modal + await page.getByText("Logs").click(); + + // Verify error status badge is displayed (scoped to dialog) + const dialog = page.getByLabel("Dialog"); + await expect(dialog.locator("text=error").first()).toBeVisible(); + + // Close modal + await page.keyboard.press("Escape"); + }, + ); +}); diff --git a/src/lfx/src/lfx/graph/utils.py b/src/lfx/src/lfx/graph/utils.py index 8a5dbb04828a..1a606abe96ce 100644 --- a/src/lfx/src/lfx/graph/utils.py +++ b/src/lfx/src/lfx/graph/utils.py @@ -11,7 +11,7 @@ from lfx.schema.message import Message # Database imports removed - lfx should be lightweight -from lfx.services.deps import get_db_service, get_settings_service +from lfx.services.deps import get_settings_service if TYPE_CHECKING: from lfx.graph.vertex.base import Vertex @@ -109,21 +109,25 @@ async def log_transaction( flow_id: str | UUID, source: Vertex, status, - target: Vertex | None = None, # noqa: ARG001 - error=None, # noqa: ARG001 + target: Vertex | None = None, + error=None, ) -> None: """Asynchronously logs a transaction record for a vertex in a flow if transaction storage is enabled. This is a lightweight implementation that only logs if database service is available. """ try: - settings_service = get_settings_service() - if not settings_service or not getattr(settings_service.settings, "transactions_storage_enabled", False): + # Guard against null source + if source is None: return - db_service = get_db_service() - if db_service is None: - logger.debug("Database service not available, skipping transaction logging") + # Use langflow's settings service to ensure transactions_storage_enabled is checked correctly + from langflow.services.deps import get_settings_service as langflow_get_settings_service + + settings_service = langflow_get_settings_service() + if not settings_service: + return + if not getattr(settings_service.settings, "transactions_storage_enabled", False): return if not flow_id: @@ -132,8 +136,32 @@ async def log_transaction( else: return - # Log basic transaction info - concrete implementation should be in langflow - logger.debug(f"Transaction logged: vertex={source.id}, flow={flow_id}, status={status}") + # Import here to avoid circular imports + from langflow.services.database.models.transactions.crud import ( + log_transaction as crud_log_transaction, + ) + from langflow.services.database.models.transactions.model import TransactionBase + from langflow.services.deps import session_scope + + if isinstance(flow_id, str): + flow_id = UUID(flow_id) + + inputs = _vertex_to_primitive_dict(source) if source else None + outputs = _vertex_to_primitive_dict(target) if target else None + + transaction = TransactionBase( + vertex_id=source.id, + target_id=target.id if target else None, + inputs=inputs, + outputs=outputs, + status=status, + error=str(error) if error else None, + flow_id=flow_id, + ) + + async with session_scope() as session: + await crud_log_transaction(session, transaction) + except Exception as exc: # noqa: BLE001 logger.debug(f"Error logging transaction: {exc!s}") diff --git a/src/lfx/src/lfx/graph/vertex/base.py b/src/lfx/src/lfx/graph/vertex/base.py index 1d8cdbb595ce..9259be508b2c 100644 --- a/src/lfx/src/lfx/graph/vertex/base.py +++ b/src/lfx/src/lfx/graph/vertex/base.py @@ -105,7 +105,6 @@ def __init__( self.use_result = False self.build_times: list[float] = [] self.state = VertexStates.ACTIVE - self.log_transaction_tasks: set[asyncio.Task] = set() self.output_names: list[str] = [ output["name"] for output in self.outputs if isinstance(output, dict) and "name" in output ] @@ -535,7 +534,7 @@ async def _log_transaction_async( target: Vertex | None = None, error=None, ) -> None: - """Log a transaction asynchronously with proper task handling and cancellation. + """Log a transaction asynchronously. Args: flow_id: The ID of the flow @@ -544,15 +543,10 @@ async def _log_transaction_async( target: Optional target vertex error: Optional error information """ - if self.log_transaction_tasks: - # Safely await and remove completed tasks - task = self.log_transaction_tasks.pop() - await task - - # Create and track new task - task = asyncio.create_task(log_transaction(flow_id, source, status, target, error)) - self.log_transaction_tasks.add(task) - task.add_done_callback(self.log_transaction_tasks.discard) + try: + await log_transaction(flow_id, source, status, target, error) + except Exception as exc: # noqa: BLE001 + logger.debug(f"Error logging transaction: {exc!s}") async def _get_result( self, @@ -657,6 +651,12 @@ async def _build_results( except Exception as exc: tb = traceback.format_exc() await logger.aexception(exc) + # Log transaction error + flow_id = self.graph.flow_id + if flow_id: + await self._log_transaction_async( + str(flow_id), source=self, target=None, status="error", error=str(exc) + ) msg = f"Error building Component {self.display_name}: \n\n{exc}" raise ComponentBuildError(msg, tb) from exc @@ -772,6 +772,11 @@ async def build( self.finalize_build() + # Log transaction after successful build + flow_id = self.graph.flow_id + if flow_id: + await self._log_transaction_async(str(flow_id), source=self, target=None, status="success") + return await self.get_requester_result(requester) async def get_requester_result(self, requester: Vertex | None):