Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
8dd2736
fix: Add additional blocking functions for Alembic and dotenv in tests
ogabrielluiz Jun 3, 2025
fd6e667
fix: Implement database availability checks in Component and Database…
ogabrielluiz Jun 3, 2025
5ee198a
test: Add comprehensive tests for AgentComponent with OpenAI and Anth…
ogabrielluiz Jun 3, 2025
7d66e73
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Jul 1, 2025
68d7a6f
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Jul 1, 2025
580a53f
[autofix.ci] apply automated fixes
autofix-ci[bot] Jul 9, 2025
cfd126e
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Jul 9, 2025
a22ef24
refactor: Improve database availability handling in Component class
ogabrielluiz Jul 10, 2025
fb5866b
test: Add async test for message sending without database in Componen…
ogabrielluiz Jul 10, 2025
008c31e
[autofix.ci] apply automated fixes
autofix-ci[bot] Jul 10, 2025
0dd0d47
chore: Add new linting rule PLC0415 to pyproject.toml
ogabrielluiz Jul 10, 2025
8c92df9
test: Add async test for AgentComponent message event handling
ogabrielluiz Jul 10, 2025
b4caafd
Merge branch 'main' into deactivate-db-in-agent
ogabrielluiz Jul 14, 2025
3466414
[autofix.ci] apply automated fixes
autofix-ci[bot] Jul 14, 2025
7bb9c9c
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Jul 14, 2025
f43d68e
[autofix.ci] apply automated fixes (attempt 3/3)
autofix-ci[bot] Jul 14, 2025
4c81a6a
feat(graph): add component_config parameter to Graph initialization a…
ogabrielluiz Jul 14, 2025
b873f0a
fix(component): enhance database availability check to respect persis…
ogabrielluiz Jul 14, 2025
477b011
fix(tests): update model names in test_agent_component to use OPENAI_…
ogabrielluiz Jul 14, 2025
ad9007a
feat(api): add optional component_config parameter to build_flow and …
ogabrielluiz Jul 14, 2025
2ff2e04
chore(pre-commit): update ruff configuration to use local repository …
ogabrielluiz Jul 14, 2025
92a4bab
chore(pre-commit): update ruff hook entry to include 'check' for impr…
ogabrielluiz Jul 14, 2025
0df5e94
feat(api): introduce ComponentConfig model and integrate optional com…
ogabrielluiz Jul 14, 2025
2cb40e3
refactor(component): simplify database availability check for clarity…
ogabrielluiz Jul 14, 2025
1aceeb5
fix(api): ensure component_config is properly handled in build_graph_…
ogabrielluiz Jul 14, 2025
7dd90df
feat(tests): add tests for ComponentConfig behavior in flow building …
ogabrielluiz Jul 14, 2025
8e8bdfc
[autofix.ci] apply automated fixes
autofix-ci[bot] Jul 14, 2025
49cc70f
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Jul 14, 2025
7379597
refactor(component): remove unnecessary call to _set_output_required_…
ogabrielluiz Jul 14, 2025
39c5662
[autofix.ci] apply automated fixes
autofix-ci[bot] Jul 14, 2025
3aa24cc
Merge branch 'main' into deactivate-db-in-agent
ogabrielluiz Jul 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ ignore = [
"TD002", # Missing author in TODO
"TD003", # Missing issue link in TODO
"TRY301", # A bit too harsh (Abstract `raise` to an inner function)

"PLC0415",
# Rules that are TODOs
"ANN",
]
Expand Down
5 changes: 5 additions & 0 deletions src/backend/base/langflow/api/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from langflow.api.disconnect import DisconnectHandlerStreamingResponse
from langflow.api.utils import (
ComponentConfig,
CurrentActiveUser,
EventDeliveryType,
build_graph_from_data,
Expand Down Expand Up @@ -51,6 +52,7 @@ async def start_flow_build(
current_user: CurrentActiveUser,
queue_service: JobQueueService,
flow_name: str | None = None,
component_config: ComponentConfig | None = None,
) -> str:
"""Start the flow build process by setting up the queue and starting the build task.

Expand All @@ -72,6 +74,7 @@ async def start_flow_build(
log_builds=log_builds,
current_user=current_user,
flow_name=flow_name,
component_config=component_config,
)
queue_service.start_job(job_id, task_coro)
except Exception as e:
Expand Down Expand Up @@ -191,6 +194,7 @@ async def generate_flow_events(
log_builds: bool,
current_user: CurrentActiveUser,
flow_name: str | None = None,
component_config: ComponentConfig | None = None,
) -> None:
"""Generate events for flow building process.

Expand Down Expand Up @@ -263,6 +267,7 @@ async def create_graph(fresh_session, flow_id_str: str, flow_name: str | None) -
chat_service=chat_service,
user_id=str(current_user.id),
session_id=effective_session_id,
component_config=component_config,
)

if not flow_name:
Expand Down
19 changes: 18 additions & 1 deletion src/backend/base/langflow/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from fastapi import Depends, HTTPException, Query
from fastapi_pagination import Params
from loguru import logger
from pydantic import BaseModel
from sqlalchemy import delete
from sqlmodel.ext.asyncio.session import AsyncSession

Expand Down Expand Up @@ -43,6 +44,10 @@ class EventDeliveryType(str, Enum):
POLLING = "polling"


class ComponentConfig(BaseModel):
persist_messages: bool = True


def has_api_terms(word: str):
return "api" in word and ("key" in word or ("token" in word and "tokens" not in word))

Expand Down Expand Up @@ -168,7 +173,19 @@ async def build_graph_from_data(flow_id: uuid.UUID | str, payload: dict, **kwarg
str_flow_id = str(flow_id)
session_id = kwargs.get("session_id") or str_flow_id

graph = Graph.from_payload(payload, str_flow_id, flow_name, kwargs.get("user_id"))
component_config = kwargs.get("component_config")
if component_config is None:
component_config = {}
if isinstance(component_config, ComponentConfig):
component_config = component_config.model_dump()

graph = Graph.from_payload(
payload,
str_flow_id,
flow_name,
user_id=kwargs.get("user_id"),
component_config=component_config,
)
for vertex_id in graph.has_session_id_vertices:
vertex = graph.get_vertex(vertex_id)
if vertex is None:
Expand Down
7 changes: 7 additions & 0 deletions src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
)
from langflow.api.limited_background_tasks import LimitVertexBuildBackgroundTasks
from langflow.api.utils import (
ComponentConfig,
CurrentActiveUser,
DbSession,
EventDeliveryType,
Expand Down Expand Up @@ -154,6 +155,7 @@ async def build_flow(
queue_service: Annotated[JobQueueService, Depends(get_queue_service)],
flow_name: str | None = None,
event_delivery: EventDeliveryType = EventDeliveryType.POLLING,
component_config: Annotated[ComponentConfig | None, Body(embed=True)] = None,
):
"""Build and process a flow, returning a job ID for event polling.

Expand All @@ -173,6 +175,7 @@ async def build_flow(
queue_service: Queue service for job management
flow_name: Optional name for the flow
event_delivery: Optional event delivery type - default is streaming
component_config: Optional component configuration

Returns:
Dict with job_id that can be used to poll for build status
Expand All @@ -195,6 +198,7 @@ async def build_flow(
current_user=current_user,
queue_service=queue_service,
flow_name=flow_name,
component_config=component_config,
)

# This is required to support FE tests - we need to be able to set the event delivery to direct
Expand Down Expand Up @@ -573,6 +577,7 @@ async def build_public_tmp(
request: Request,
queue_service: Annotated[JobQueueService, Depends(get_queue_service)],
event_delivery: EventDeliveryType = EventDeliveryType.POLLING,
component_config: Annotated[ComponentConfig | None, Body(embed=True)] = None,
):
"""Build a public flow without requiring authentication.

Expand Down Expand Up @@ -601,6 +606,7 @@ async def build_public_tmp(
request: FastAPI request object (needed for cookie access)
queue_service: Queue service for job management
event_delivery: Optional event delivery type - default is streaming
component_config: Optional component configuration

Returns:
Dict with job_id that can be used to poll for build status
Expand All @@ -623,6 +629,7 @@ async def build_public_tmp(
current_user=owner_user,
queue_service=queue_service,
flow_name=flow_name or f"{client_id}_{flow_id}",
component_config=component_config,
)
except Exception as exc:
logger.exception("Error building public flow")
Expand Down
4 changes: 3 additions & 1 deletion src/backend/base/langflow/api/v1/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,9 @@ async def custom_component_update(
):
"""Update an existing custom component with new code and configuration.

Processes the provided code and template updates, applies parameter changes (including those loaded from the database), updates the component's build configuration, and validates outputs. Returns the updated component node as a JSON-serializable dictionary.
Processes the provided code and template updates, applies parameter changes (including those loaded from the
database), updates the component's build configuration, and validates outputs. Returns the updated component node
as a JSON-serializable dictionary.

Raises:
HTTPException: If an error occurs during component building or updating.
Expand Down
36 changes: 26 additions & 10 deletions src/backend/base/langflow/custom/custom_component/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from langflow.schema.data import Data
from langflow.schema.message import ErrorMessage, Message
from langflow.schema.properties import Source
from langflow.services.deps import get_db_service
from langflow.services.tracing.schema import Log
from langflow.template.field.base import UNDEFINED, Input, Output
from langflow.template.frontend_node.custom_components import ComponentFrontendNode
Expand Down Expand Up @@ -162,6 +163,7 @@ def __init__(self, **kwargs) -> None:
# Final setup
self._set_output_types(list(self._outputs_map.values()))
self.set_class_code()
self._database_available: bool | None = None

def _build_source(self, id_: str | None, display_name: str | None, source: str | None) -> Source:
source_dict = {}
Expand Down Expand Up @@ -1460,6 +1462,16 @@ def is_connected_to_chat_output(self) -> bool:

return has_chat_output(self.graph.get_vertex_neighbors(self._vertex))

def _is_database_available(self) -> bool:
"""Check if the database is available."""
if (
hasattr(self, "graph")
and hasattr(self.graph, "component_config")
and not self.graph.component_config.get("persist_messages", True)
):
return False
return get_db_service().database_available

def _should_skip_message(self, message: Message) -> bool:
"""Check if the message should be skipped based on vertex configuration and message type."""
return (
Expand All @@ -1479,25 +1491,32 @@ async def send_message(self, message: Message, id_: str | None = None):
message.session_id = session_id
if hasattr(message, "flow_id") and isinstance(message.flow_id, str):
message.flow_id = UUID(message.flow_id)
stored_message = await self._store_message(message)
if self._is_database_available():
stored_message = await self._store_message(message)
self._stored_message_id = stored_message.id
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe this is being used anymore

else:
stored_message = message

self._stored_message_id = stored_message.id
try:
complete_message = ""
if (
self._should_stream_message(stored_message, message)
self._should_stream_message(message)
and message is not None
and isinstance(message.text, AsyncIterator | Iterator)
):
complete_message = await self._stream_message(message.text, stored_message)
stored_message.text = complete_message
stored_message = await self._update_stored_message(stored_message)
if self._is_database_available():
stored_message = await self._update_stored_message(stored_message)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mutate the stored_message variable? I assume it does based on the context, but what consequences does that have downstream?

First time looking at this area, so I'm a bit confused to the message assignment -- we've got message, stored_message, complete_message.

else:
stored_message = message
else:
# Only send message event for non-streaming messages
await self._send_message_event(stored_message, id_=id_)
except Exception:
# remove the message from the database
await delete_message(stored_message.id)
if self._is_database_available():
await delete_message(stored_message.id)
raise
self.status = stored_message
return stored_message
Expand Down Expand Up @@ -1539,12 +1558,9 @@ def _send_event():

await asyncio.to_thread(_send_event)

def _should_stream_message(self, stored_message: Message, original_message: Message) -> bool:
def _should_stream_message(self, original_message: Message) -> bool:
return bool(
hasattr(self, "_event_manager")
and self._event_manager
and stored_message.id
and not isinstance(original_message.text, str)
hasattr(self, "_event_manager") and self._event_manager and not isinstance(original_message.text, str)
)

async def _update_stored_message(self, message: Message) -> Message:
Expand Down
6 changes: 5 additions & 1 deletion src/backend/base/langflow/graph/graph/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def __init__(
user_id: str | None = None,
log_config: LogConfig | None = None,
context: dict[str, Any] | None = None,
component_config: dict[str, Any] | None = None,
) -> None:
"""Initializes a new Graph instance.

Expand Down Expand Up @@ -125,6 +126,7 @@ def __init__(
self._call_order: list[str] = []
self._snapshots: list[dict[str, Any]] = []
self._end_trace_tasks: set[asyncio.Task] = set()
self.component_config: dict[str, Any] = component_config or {}

if context and not isinstance(context, dict):
msg = "Context must be a dictionary"
Expand Down Expand Up @@ -1049,6 +1051,7 @@ def from_payload(
flow_id: str | None = None,
flow_name: str | None = None,
user_id: str | None = None,
component_config: dict[str, Any] | None = None,
) -> Graph:
"""Creates a graph from a payload.

Expand All @@ -1057,6 +1060,7 @@ def from_payload(
flow_id: The ID of the flow.
flow_name: The flow name.
user_id: The user ID.
component_config: Component configuration dictionary.

Returns:
Graph: The created graph.
Expand All @@ -1066,7 +1070,7 @@ def from_payload(
try:
vertices = payload["nodes"]
edges = payload["edges"]
graph = cls(flow_id=flow_id, flow_name=flow_name, user_id=user_id)
graph = cls(flow_id=flow_id, flow_name=flow_name, user_id=user_id, component_config=component_config)
graph.add_nodes_and_edges(vertices, edges)
except KeyError as exc:
logger.exception(exc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4786,4 +4786,4 @@
"rag",
"q-a"
]
}
}
1 change: 1 addition & 0 deletions src/backend/base/langflow/interface/initialize/loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def instantiate_class(
_vertex=vertex,
_tracing_service=get_tracing_service(),
_id=vertex.id,
**vertex.graph.component_config,
)
if hasattr(custom_component, "set_event_manager"):
custom_component.set_event_manager(event_manager)
Expand Down
24 changes: 23 additions & 1 deletion src/backend/base/langflow/services/database/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class DatabaseService(Service):
name = "database_service"

def __init__(self, settings_service: SettingsService):
self.engine: AsyncEngine | None = None
self._logged_pragma = False
self.settings_service = settings_service
if settings_service.settings.database_url is None:
Expand All @@ -56,6 +57,7 @@ def __init__(self, settings_service: SettingsService):
# register the event listener for sqlite as part of this class.
# Using decorator will make the method not able to use self
event.listen(Engine, "connect", self.on_connection)

if self.settings_service.settings.database_connection_retry:
self.engine = self._create_engine_with_retry()
else:
Expand All @@ -68,6 +70,23 @@ def __init__(self, settings_service: SettingsService):
else:
self.alembic_log_path = Path(langflow_dir) / alembic_log_file

self._database_available = False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Perhaps this is in a follow up PR)

Do you anticipate adding a new config to indicate "No database", or does not setting DATABASE_URL imply no database? Either way, we'll need some changes in the init here to account for that, especially around the create_ methods.

Copy link
Contributor Author

@ogabrielluiz ogabrielluiz Jul 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be easier for sure. My solution is a hacky one. Perhaps checking for the var first, then checking for the message table would be better.

I'll try to refactor the settings soon and then we can have settings specifically for the initialization, allowing us to skip steps if we so desire


# Check if there's a message table in the database
async def _is_database_available(self) -> bool:
async with self.with_session() as session, session.bind.connect() as conn:
# Use run_sync to inspect the connection
def check_tables(conn):
inspector = inspect(conn)
return "message" in inspector.get_table_names()

self._database_available = await conn.run_sync(check_tables)
return self._database_available

@property
def database_available(self) -> bool:
return self._database_available

async def initialize_alembic_log_file(self):
# Ensure the directory and file for the alembic log file exists
await anyio.Path(self.alembic_log_path.parent).mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -369,6 +388,7 @@ async def run_migrations(self, *, fix=False) -> None:
logger.debug("Alembic not initialized")
should_initialize_alembic = True
await asyncio.to_thread(self._run_migrations, should_initialize_alembic, fix)
self._database_available = await self._is_database_available()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this is checked here rather than in init?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function checks if the message table exists, so in init it would not exist the first time Langflow runs.


@staticmethod
def try_downgrade_upgrade_until_success(alembic_cfg, retries=5) -> None:
Expand Down Expand Up @@ -476,4 +496,6 @@ async def teardown(self) -> None:
await teardown_superuser(settings_service, session)
except Exception: # noqa: BLE001
logger.exception("Error tearing down database")
await self.engine.dispose()
if self.engine is not None:
await self.engine.dispose()
self.engine = None
4 changes: 4 additions & 0 deletions src/backend/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ def blockbuster(request):
.can_block_in("rich/traceback.py", "_render_stack")
.can_block_in("langchain_core/_api/internal.py", "is_caller_internal")
.can_block_in("langchain_core/runnables/utils.py", "get_function_nonlocals")
.can_block_in("alembic/versions", "_load_revisions")
.can_block_in("dotenv/main.py", "find_dotenv")
.can_block_in("alembic/script/base.py", "_load_revisions")
.can_block_in("alembic/env.py", "_do_run_migrations")
)

for func in ["os.stat", "os.path.abspath", "os.scandir", "os.listdir"]:
Expand Down
Loading
Loading