
Commit 8b67a37

Authored by jordanrfrazier, autofix-ci[bot], ogabrielluiz, and erichare
feat: add s3 file storage implementation (#10526)
* Beginning cherry-pick of changes
* more updates
* Add local and s3 storage services, modified base storage service, delegate from langflow to lfx
* more change
* remove all instances of with session
* test s3
* add tests and fix fe
* [autofix.ci] apply automated fixes
* [autofix.ci] apply automated fixes (attempt 2/3)
* fix diamond inheritance and some flush / reset db patterns
* [autofix.ci] apply automated fixes
* Add methods for injectable session scopes
* use run_until_complete for some async calls
* [autofix.ci] apply automated fixes
* Fix csv and json to data components
* Fix some imports and add s3_comp and local-storage_serv testing
* Fix test paths
* more test fixes
* Import and test fixes
* fix remaining s3 compo tests
* Fix langchain compatibility versions
* Fix json/csv agent test mocks
* Fix base file reads with structured output path
* re-add lock
* Add safety around file streaming gen
* [autofix.ci] apply automated fixes
* comment
* improve error handling for file deletion
* abstract reads to storage utils
* [autofix.ci] apply automated fixes
* remove unnecessary deps
* fix lint issues
* fix byte stream generator call in download_file function
* Add import error handling for langchain dependencies in CSV and JSON agent components
* Add pypdf as a dependency in both uv.lock and pyproject.toml files
* ruff
* [autofix.ci] apply automated fixes
* [autofix.ci] apply automated fixes (attempt 2/3)
* fix test patches
* [autofix.ci] apply automated fixes
* [autofix.ci] apply automated fixes (attempt 2/3)
* [autofix.ci] apply automated fixes (attempt 3/3)
* Add noop database service and fix tests to utilize that
* comp index
* mock the imports more but not sure if that's the best soln
* ruff
* [autofix.ci] apply automated fixes
* uses commit in test since it yields
* [autofix.ci] apply automated fixes
* Fix prof pic test
* update prof pic size to fix test?
* ruff
* update lockfile
* update comp index
* [autofix.ci] apply automated fixes
* [autofix.ci] apply automated fixes (attempt 2/3)
* use full path to altk test import
* update lockfile. again?
* Marks json test to allow a task leak, allowing since deprecated component
* [autofix.ci] apply automated fixes
* use lockfile from main to temporarily bypass issues with altk
* Update component build index
* [autofix.ci] apply automated fixes
* Fix conftest session scope usage
* Fix conftest yields
* lockfile
* [autofix.ci] apply automated fixes
* Fix another test session patterns
* Fix test type
* ruff
* [autofix.ci] apply automated fixes
* [autofix.ci] apply automated fixes
* Update s3.py
* [autofix.ci] apply automated fixes
* Document the append param
* [autofix.ci] apply automated fixes
* Update test_s3_endpoints.py
* [autofix.ci] apply automated fixes
* add back dep import script
* mock fixes
* fix save file tests
* [autofix.ci] apply automated fixes
* Flush after delete in upload_user_file
* use current path in loop csv test
* ruff
* ruff
* Updates the sqlite pragma test to use sqlite3 directly
* ruff
* ensure pragma test db is in wal mode
* Adds back some relevant tests
* ruff
* Add more integration s3 tests
* ruff
* component index

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <[email protected]>
Co-authored-by: Eric Hare <[email protected]>
1 parent 348b1b8 commit 8b67a37

File tree

87 files changed: +6375 additions, −1225 deletions


.secrets.baseline

Lines changed: 2 additions & 2 deletions
@@ -883,7 +883,7 @@
       "filename": "src/backend/tests/unit/api/v2/test_files.py",
       "hashed_secret": "61fbb5a12cd7b1f1fe1624120089efc0cd299e43",
       "is_verified": false,
-      "line_number": 29,
+      "line_number": 40,
       "is_secret": false
     }
   ],
@@ -1412,5 +1412,5 @@
     }
   ]
 },
-  "generated_at": "2025-11-10T17:33:14Z"
+  "generated_at": "2025-11-19T18:36:04Z"
 }

docs/docs/Develop/memory.mdx

Lines changed: 2 additions & 0 deletions
@@ -80,6 +80,8 @@ To fine-tune your database connection pool and timeout settings, you can set the

 * `LANGFLOW_DB_CONNECT_TIMEOUT`: The number of seconds to wait before giving up on a lock to be released or establishing a connection to the database. This may be separate from the `pool_timeout` in `LANGFLOW_DB_CONNECTION_SETTINGS`. Default: 30.

+* `LANGFLOW_MIGRATION_LOCK_NAMESPACE`: Optional namespace identifier for the PostgreSQL advisory lock during migrations. If not provided, a hash of the database URL will be used. Useful when multiple Langflow instances share the same database and need coordinated migration locking.
+
 * `LANGFLOW_DB_CONNECTION_SETTINGS`: A JSON dictionary containing the following database connection pool settings:

   - `pool_size`: The base number of connections to keep open in the connection pool. Default: 20.

pyproject.toml

Lines changed: 5 additions & 4 deletions
@@ -75,7 +75,7 @@ dependencies = [
     "duckduckgo_search==7.2.1",
     "opensearch-py==2.8.0",
     "langchain-google-genai==2.0.6",
-    "langchain-cohere==0.3.3",
+    "langchain-cohere>=0.3.3,<1.0.0",
     "langchain-huggingface==0.3.1",
     "langchain-anthropic==0.3.14",
     "langchain-astradb>=0.6.1,<1.0.0",
@@ -85,14 +85,14 @@ dependencies = [
     "langchain-pinecone>=0.2.8,<1.0.0",
     "langchain-mistralai==0.2.3",
     "langchain-chroma>=0.2.6,<1.0.0",
-    "langchain-aws==0.2.33",
+    "langchain-aws>=0.2.33,<1.0.0",
     "langchain-unstructured==0.1.5",
     "langchain-milvus==0.1.7",
     "langchain-mongodb==0.7.0",
     "langchain-nvidia-ai-endpoints==0.3.8",
     "langchain-google-calendar-tools==0.0.1",
-    "langchain-google-community==2.0.3",
-    "langchain-elasticsearch==0.3.0",
+    "langchain-google-community>=2.0.3,<3.0.0",
+    "langchain-elasticsearch>=0.3.0,<1.0.0",
     "langchain-ollama==0.3.10",
     "langchain-sambanova==0.1.0",
     "langchain-community>=0.3.21,<1.0.0",
@@ -138,6 +138,7 @@ dependencies = [
     "cuga==0.1.10",
     "agent-lifecycle-toolkit~=0.4.1",
     "astrapy>=2.1.0,<3.0.0",
+    "aioboto3>=15.2.0,<16.0.0"
 ]

src/backend/base/langflow/__main__.py

Lines changed: 1 addition & 3 deletions
@@ -891,9 +891,7 @@ async def aapi_key():
         await delete_api_key(session, api_key.id)

         api_key_create = ApiKeyCreate(name="CLI")
-        unmasked_api_key = await create_api_key(session, api_key_create, user_id=superuser.id)
-        await session.commit()
-        return unmasked_api_key
+        return await create_api_key(session, api_key_create, user_id=superuser.id)

     unmasked_api_key = asyncio.run(aapi_key())
     # Create a banner to display the API key and tell the user it won't be shown again

src/backend/base/langflow/alembic/env.py

Lines changed: 24 additions & 10 deletions
@@ -1,15 +1,21 @@
 # noqa: INP001
 import asyncio
+import hashlib
+import os
 from logging.config import fileConfig
 from typing import Any

+
 from alembic import context
 from sqlalchemy import pool, text
 from sqlalchemy.event import listen
 from sqlalchemy.ext.asyncio import async_engine_from_config

+from lfx.log.logger import logger
+
 from langflow.services.database.service import SQLModel

+
 # this is the Alembic Config object, which provides
 # access to the values within the .ini file in use.
 config = context.config
@@ -96,25 +102,33 @@ def _do_run_migrations(connection):
         configure_kwargs["prepare_threshold"] = None

     context.configure(**configure_kwargs)
-
     with context.begin_transaction():
         if connection.dialect.name == "postgresql":
-            connection.execute(text("SET LOCAL lock_timeout = '60s';"))
-            connection.execute(text("SELECT pg_advisory_xact_lock(112233);"))
+            # Use namespace from environment variable if provided, otherwise use default static key
+            namespace = os.getenv("LANGFLOW_MIGRATION_LOCK_NAMESPACE")
+            if namespace:
+                lock_key = int(hashlib.sha256(namespace.encode()).hexdigest()[:16], 16) % (2**63 - 1)
+                logger.info(f"Using migration lock namespace: {namespace}, lock_key: {lock_key}")
+            else:
+                lock_key = 11223344
+                logger.info(f"Using default migration lock_key: {lock_key}")
+
+            connection.execute(text("SET LOCAL lock_timeout = '180s';"))
+            connection.execute(text(f"SELECT pg_advisory_xact_lock({lock_key});"))
         context.run_migrations()

-
 async def _run_async_migrations() -> None:
-    # Get database URL to determine dialect
-    url = config.get_main_option("sqlalchemy.url")
-    connect_args: dict[str, Any] = {}
+    # Disable prepared statements for PostgreSQL (required for PgBouncer compatibility)
+    # SQLite doesn't support this parameter, so only add it for PostgreSQL
+    config_section = config.get_section(config.config_ini_section, {})
+    db_url = config_section.get("sqlalchemy.url", "")

-    # Only add prepare_threshold for PostgreSQL
-    if url and "postgresql" in url:
+    connect_args: dict[str, Any] = {}
+    if db_url and "postgresql" in db_url:
         connect_args["prepare_threshold"] = None

     connectable = async_engine_from_config(
-        config.get_section(config.config_ini_section, {}),
+        config_section,
         prefix="sqlalchemy.",
         poolclass=pool.NullPool,
         connect_args=connect_args,

src/backend/base/langflow/api/utils/core.py

Lines changed: 5 additions & 2 deletions
@@ -10,6 +10,7 @@
 from fastapi_pagination import Params
 from lfx.graph.graph.base import Graph
 from lfx.log.logger import logger
+from lfx.services.deps import injectable_session_scope, injectable_session_scope_readonly, session_scope
 from sqlalchemy import delete
 from sqlmodel.ext.asyncio.session import AsyncSession

@@ -19,7 +20,6 @@
 from langflow.services.database.models.transactions.model import TransactionTable
 from langflow.services.database.models.user.model import User
 from langflow.services.database.models.vertex_builds.model import VertexBuildTable
-from langflow.services.deps import get_session, session_scope
 from langflow.services.store.utils import get_lf_version_from_pypi
 from langflow.utils.constants import LANGFLOW_GLOBAL_VAR_HEADER_PREFIX

@@ -35,7 +35,10 @@

 CurrentActiveUser = Annotated[User, Depends(get_current_active_user)]
 CurrentActiveMCPUser = Annotated[User, Depends(get_current_active_user_mcp)]
-DbSession = Annotated[AsyncSession, Depends(get_session)]
+# DbSession with auto-commit for write operations
+DbSession = Annotated[AsyncSession, Depends(injectable_session_scope)]
+# DbSessionReadOnly for read-only operations (no auto-commit, reduces lock contention)
+DbSessionReadOnly = Annotated[AsyncSession, Depends(injectable_session_scope_readonly)]


 class EventDeliveryType(str, Enum):
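The two dependency aliases above wrap session scopes with different commit behavior: the write scope commits on success, the read-only scope never does. A framework-free sketch of that pattern (the `FakeSession` class and scope names are hypothetical stand-ins, not Langflow's actual implementation):

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Stand-in for an async DB session (hypothetical, illustration only)."""

    def __init__(self):
        self.committed = False
        self.rolled_back = False

    async def commit(self):
        self.committed = True

    async def rollback(self):
        self.rolled_back = True


@asynccontextmanager
async def write_scope(session: FakeSession):
    """Write scope: commit on success, roll back on error."""
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise


@asynccontextmanager
async def readonly_scope(session: FakeSession):
    """Read-only scope: never commits, so no write locks are held open."""
    yield session


async def main():
    write_session, read_session = FakeSession(), FakeSession()
    async with write_scope(write_session):
        pass  # writes here would be committed on exit
    async with readonly_scope(read_session):
        pass  # reads only; no commit on exit
    return write_session.committed, read_session.committed


results = asyncio.run(main())
print(results)  # (True, False)
```

Splitting the two scopes lets read-heavy endpoints skip the commit entirely, which is the lock-contention reduction the `DbSessionReadOnly` comment refers to.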

src/backend/base/langflow/api/v1/chat.py

Lines changed: 1 addition & 0 deletions
@@ -289,6 +289,7 @@ async def build_vertex(
     if isinstance(cache, CacheMiss):
         # If there's no cache
         await logger.awarning(f"No cache found for {flow_id_str}. Building graph starting at {vertex_id}")
+
         async with session_scope() as session:
             graph = await build_graph_from_db(
                 flow_id=flow_id,

src/backend/base/langflow/api/v1/files.py

Lines changed: 1 addition & 2 deletions
@@ -173,10 +173,9 @@ async def list_profile_pictures(
         people_path = config_path / "profile_pictures" / "People"
         space_path = config_path / "profile_pictures" / "Space"

-        # List files directly from local filesystem
+        # List files directly from local filesystem - bundled with the container
         people = [f.name for f in people_path.iterdir() if f.is_file()] if people_path.exists() else []
         space = [f.name for f in space_path.iterdir() if f.is_file()] if space_path.exists() else []
-
     except Exception as e:
         raise HTTPException(status_code=500, detail=str(e)) from e

src/backend/base/langflow/api/v1/flows.py

Lines changed: 24 additions & 15 deletions
@@ -160,11 +160,12 @@ async def create_flow(
 ):
     try:
         db_flow = await _new_flow(session=session, flow=flow, user_id=current_user.id)
-        await session.commit()
+        await session.flush()
         await session.refresh(db_flow)
-
         await _save_flow_to_fs(db_flow)

+        # Convert to FlowRead while session is still active to avoid detached instance errors
+        flow_read = FlowRead.model_validate(db_flow, from_attributes=True)
     except Exception as e:
         if "UNIQUE constraint failed" in str(e):
             # Get the name of the column that failed
@@ -180,7 +181,7 @@
         if isinstance(e, HTTPException):
             raise
         raise HTTPException(status_code=500, detail=str(e)) from e
-    return db_flow
+    return flow_read


 @router.get("/", response_model=list[FlowRead] | Page[FlowRead] | list[FlowHeader], status_code=200)
@@ -258,8 +259,9 @@ async def read_flows(
             flow_headers = [FlowHeader.model_validate(flow, from_attributes=True) for flow in flows]
             return compress_response(flow_headers)

-        # Compress the full flows response
-        return compress_response(flows)
+        # Convert to FlowRead while session is still active to avoid detached instance errors
+        flow_reads = [FlowRead.model_validate(flow, from_attributes=True) for flow in flows]
+        return compress_response(flow_reads)

     stmt = stmt.where(Flow.folder_id == folder_id)

@@ -295,7 +297,8 @@ async def read_flow(
 ):
     """Read a flow."""
     if user_flow := await _read_flow(session, flow_id, current_user.id):
-        return user_flow
+        # Convert to FlowRead while session is still active to avoid detached instance errors
+        return FlowRead.model_validate(user_flow, from_attributes=True)
     raise HTTPException(status_code=404, detail="Flow not found")


@@ -358,11 +361,13 @@ async def update_flow(
             db_flow.folder_id = default_folder.id

         session.add(db_flow)
-        await session.commit()
+        await session.flush()
         await session.refresh(db_flow)
-
         await _save_flow_to_fs(db_flow)

+        # Convert to FlowRead while session is still active to avoid detached instance errors
+        flow_read = FlowRead.model_validate(db_flow, from_attributes=True)
+
     except Exception as e:
         if "UNIQUE constraint failed" in str(e):
             # Get the name of the column that failed
@@ -379,7 +384,7 @@
             raise HTTPException(status_code=e.status_code, detail=str(e)) from e
         raise HTTPException(status_code=500, detail=str(e)) from e

-    return db_flow
+    return flow_read


 @router.delete("/{flow_id}", status_code=200)
@@ -398,7 +403,6 @@ async def delete_flow(
     if not flow:
         raise HTTPException(status_code=404, detail="Flow not found")
     await cascade_delete_flow(session, flow.id)
-    await session.commit()
     return {"message": "Flow deleted successfully"}


@@ -416,10 +420,12 @@ async def create_flows(
         db_flow = Flow.model_validate(flow, from_attributes=True)
         session.add(db_flow)
         db_flows.append(db_flow)
-    await session.commit()
+
+    await session.flush()
     for db_flow in db_flows:
         await session.refresh(db_flow)
-    return db_flows
+
+    return [FlowRead.model_validate(db_flow, from_attributes=True) for db_flow in db_flows]


 @router.post("/upload/", response_model=list[FlowRead], status_code=201)
@@ -444,10 +450,13 @@ async def upload_file(
         response_list.append(response)

     try:
-        await session.commit()
+        await session.flush()
         for db_flow in response_list:
             await session.refresh(db_flow)
             await _save_flow_to_fs(db_flow)
+
+        # Convert to FlowRead while session is still active to avoid detached instance errors
+        flow_reads = [FlowRead.model_validate(db_flow, from_attributes=True) for db_flow in response_list]
     except Exception as e:
         if "UNIQUE constraint failed" in str(e):
             # Get the name of the column that failed
@@ -464,7 +473,7 @@
             raise
         raise HTTPException(status_code=500, detail=str(e)) from e

-    return response_list
+    return flow_reads


 @router.delete("/")
@@ -491,7 +500,7 @@ async def delete_multiple_flows(
         for flow in flows_to_delete:
             await cascade_delete_flow(db, flow.id)

-        await db.commit()
+        await db.flush()
         return {"deleted": len(flows_to_delete)}
     except Exception as exc:
         raise HTTPException(status_code=500, detail=str(exc)) from exc
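The recurring change in flows.py is converting ORM rows into plain `FlowRead` models before the session closes, since a lazy attribute on a detached ORM instance raises once its session is gone. A framework-free sketch of why the conversion must happen inside the session scope (the `Session`, `FlowRow`, and `FlowRead` classes here are hypothetical stand-ins, not SQLAlchemy):

```python
from dataclasses import dataclass


class DetachedInstanceError(Exception):
    """Raised when a lazy attribute is read after its session has closed."""


class Session:
    def __init__(self):
        self.active = True

    def close(self):
        self.active = False


class FlowRow:
    """ORM-style row: attribute access requires a live session."""

    def __init__(self, session, name):
        self._session = session
        self._name = name

    @property
    def name(self):
        if not self._session.active:
            raise DetachedInstanceError("session is closed")
        return self._name


@dataclass
class FlowRead:
    """Plain read model: safe to return after the session closes."""

    name: str


session = Session()
row = FlowRow(session, "my-flow")

# Convert while the session is still active -- mirrors the flows.py pattern
flow_read = FlowRead(name=row.name)

session.close()
print(flow_read.name)  # the DTO no longer needs the session
try:
    row.name  # the detached ORM row still does
except DetachedInstanceError as e:
    print("detached:", e)
```

This also explains the `commit()` → `flush()` swap: with an auto-committing session scope (the new `DbSession` dependency), the handler only needs to flush so refreshed IDs are available, and the scope commits once on exit.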

src/backend/base/langflow/api/v1/mcp_projects.py

Lines changed: 2 additions & 2 deletions
@@ -440,6 +440,8 @@ async def update_project_mcp_settings(
         session.add(flow)
         updated_flows.append(flow)

+    await session.flush()
+
     response: dict[str, Any] = {
         "message": f"Updated MCP settings for {len(updated_flows)} flows and project auth settings"
     }
@@ -1201,8 +1203,6 @@ async def init_mcp_servers():

         # Auto-configure starter projects with MCP server settings if enabled
         await auto_configure_starter_projects_mcp(session)
-        # Commit any auth settings updates
-        await session.commit()

     except Exception as e:  # noqa: BLE001
         msg = f"Failed to initialize MCP servers: {e}"
