-
Notifications
You must be signed in to change notification settings - Fork 7.6k
refactor: Simplify MCP session management and consolidate utility fun… #9893
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…ctions Remove complex session reuse logic, cleanup tasks, and reference counting from MCPSessionManager. Move create_input_schema_from_json_schema function into util.py
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughRefactors MCP utilities and session/client APIs within src/lfx/src/lfx/base/mcp/util.py: introduces dynamic JSON-schema→Pydantic model creation, adds Flow lookup by sanitized name, converts session management to async per-context sessions with health checks and cleanup, simplifies header handling, and updates client URL validation/redirect pre-check signatures. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Caller
participant Manager as MCPSessionManager
participant Client as MCP Client (Stdio/SSE)
participant Server
Caller->>Manager: get_session(context_id, params, transport_type)
alt existing session for context_id
Manager->>Manager: check health (task state, write-stream, connectivity)
alt unhealthy or server switched
Manager->>Manager: _cleanup_session(context_id)
Manager->>Client: create (async) via _create_stdio/_create_sse
Client->>Server: connect
Server-->>Client: connected/ready
Client-->>Manager: session handle
else healthy
Manager-->>Caller: existing session handle
end
else no session
Manager->>Client: create (async) via _create_stdio/_create_sse
Client->>Server: connect
Server-->>Client: connected/ready
Client-->>Manager: session handle
Manager-->>Caller: new session handle
end
sequenceDiagram
autonumber
participant Tooling as Tooling (update_tools)
participant Util as util.create_input_schema_from_json_schema
Note over Tooling,Util: Build Pydantic model from tool inputSchema
Tooling->>Util: schema dict ($defs, $ref, anyOf, defaults)
Util-->>Tooling: Pydantic BaseModel subclass
Tooling->>Tooling: assemble StructuredTool with arg schema
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/lfx/src/lfx/base/mcp/util.py (2)
838-839
: Fix invalid isinstance() unions (runtime TypeError).Using PEP 604 unions inside isinstance causes TypeError at runtime. Replace with tuples.
- is_timeout_error = isinstance(e, asyncio.TimeoutError | TimeoutError) + is_timeout_error = isinstance(e, (asyncio.TimeoutError, TimeoutError)) - if ( - isinstance(e, ConnectionError | TimeoutError | OSError | ValueError) + if ( + isinstance(e, (ConnectionError, TimeoutError, OSError, ValueError)) or is_closed_resource_error or is_mcp_connection_error or is_timeout_error ):Also applies to: 869-873
1125-1125
: Fix invalid isinstance() unions in SSE client too.Same issue as above; switch to tuples.
- is_timeout_error = isinstance(e, asyncio.TimeoutError | TimeoutError) + is_timeout_error = isinstance(e, (asyncio.TimeoutError, TimeoutError)) - if ( - isinstance(e, ConnectionError | TimeoutError | OSError | ValueError) + if ( + isinstance(e, (ConnectionError, TimeoutError, OSError, ValueError)) or is_closed_resource_error or is_mcp_connection_error or is_timeout_error ):Also applies to: 1156-1160
🧹 Nitpick comments (9)
src/lfx/src/lfx/base/mcp/util.py (9)
206-217
: Guard against cyclic $ref chains.resolve_ref can loop indefinitely on cyclic refs. Track seen refs and break on cycles.
- def resolve_ref(s: dict[str, Any] | None) -> dict[str, Any]: + def resolve_ref(s: dict[str, Any] | None) -> dict[str, Any]: """Follow a $ref chain until you land on a real subschema.""" if s is None: return {} - while "$ref" in s: - ref_name = s["$ref"].split("/")[-1] - s = defs.get(ref_name) + seen: set[str] = set() + while "$ref" in s: + ref_name = s["$ref"].split("/")[-1] + if ref_name in seen: + logger.warning(f"Cyclic $ref detected at '{ref_name}'") + return {"type": "string"} + seen.add(ref_name) + s = defs.get(ref_name) if s is None: logger.warning(f"Parsing input schema: Definition '{ref_name}' not found") return {"type": "string"} return s
936-943
: Handle None URL early in validate_url.urlparse(None) raises TypeError; return a friendly error instead.
- async def validate_url(self, url: str | None) -> tuple[bool, str]: + async def validate_url(self, url: str | None) -> tuple[bool, str]: """Validate the SSE URL before attempting connection.""" - try: + if not url: + return False, "URL is required." + try: parsed = urlparse(url)
979-987
: Broaden redirect handling for SSE pre-check.Also follow 301/302/308 to surface the final URL.
- if response.status_code == httpx.codes.TEMPORARY_REDIRECT: + if response.status_code in ( + httpx.codes.MOVED_PERMANENTLY, # 301 + httpx.codes.FOUND, # 302 + httpx.codes.TEMPORARY_REDIRECT, # 307 + httpx.codes.PERMANENT_REDIRECT, # 308 + ): return response.headers.get("Location", url)
152-159
: Avoid run_until_complete in potentially running loops.This can raise "This event loop is already running" in async environments. Use asyncio.run when no loop; otherwise offload to a worker thread.
- try: - loop = asyncio.get_event_loop() - return loop.run_until_complete(client.run_tool(tool_name, arguments=validated.model_dump())) + try: + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop + return asyncio.run(client.run_tool(tool_name, arguments=validated.model_dump())) + else: + # Running loop: execute in a separate thread with its own loop + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex: + fut = ex.submit(asyncio.run, client.run_tool(tool_name, arguments=validated.model_dump())) + return fut.result()
55-56
: Deduplicate emoji range in regex.The block U+2702–U+27B0 is listed twice; drop the duplicate.
- "\U00002702-\U000027b0" - "\U00002702-\U000027b0" + "\U00002702-\U000027b0"
178-191
: Sanitize the input name too for symmetry.You sanitize stored flow names but not the input; sanitize both to avoid mismatches.
async def get_flow_snake_case(flow_name: str, user_id: str, session, is_action: bool | None = None) -> Flow | None: - uuid_user_id = UUID(user_id) if isinstance(user_id, str) else user_id + uuid_user_id = UUID(user_id) if isinstance(user_id, str) else user_id + flow_name = sanitize_mcp_name(flow_name)
578-581
: Fix shutdown log message."Message is shutting down" → "Session is shutting down".
- # Session is being shut down - msg = "Message is shutting down" + # Session is being shut down + msg = "Session is shutting down"Also applies to: 641-644
25-26
: Remove unused constant.HTTP_ERROR_STATUS_CODE is declared but unused.
-HTTP_ERROR_STATUS_CODE = httpx_codes.BAD_REQUEST # HTTP status code for client errors
511-526
: Consider relying solely on connectivity test; private _write_stream is brittle.Accessing session._write_stream is an internal detail and may break across MCP/anyio versions. The connectivity probe already gates reuse.
Would you like me to gate reuse only on task.done() + connectivity check and drop the private stream inspection?
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/lfx/src/lfx/base/mcp/util.py
(33 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/lfx/src/lfx/base/mcp/util.py (3)
src/backend/tests/conftest.py (1)
flow
(544-560)src/backend/base/langflow/services/database/models/flow/model.py (1)
Flow
(186-212)src/backend/base/langflow/services/deps.py (1)
get_settings_service
(111-124)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Update Starter Projects
- GitHub Check: Run Ruff Check and Format
- GitHub Check: Ruff Style Check (3.13)
🔇 Additional comments (2)
src/lfx/src/lfx/base/mcp/util.py (2)
783-786
: Validate call sites after async signature change — get_session is now async with (context_id, params, type).
Ensure all callers were updated (no remaining sync calls or old-signature usages).
Locations: src/lfx/src/lfx/base/mcp/util.py:783–786 and 1052–1055.
Verification attempt failed: ripgrep errored with "unrecognized file type: python". Re-run searches:rg -nP '\.get_session\(' -C2 rg -nP 'validate_url\(' -C2 rg -nP 'pre_check_redirect\(' -C2 # fallback if rg type handling is broken: grep -RIn --include='*.py' '\.get_session(' .
180-181
: No action needed –Flow.is_component
is present on the Flow model.
Theis_component
field is declared in src/backend/base/langflow/services/database/models/flow/model.py (line 42 on FlowBase and line 237 on Flow).
if "anyOf" in s: | ||
# Handle common pattern for nullable types (anyOf with string and null) | ||
subtypes = [sub.get("type") for sub in s["anyOf"] if isinstance(sub, dict) and "type" in sub] | ||
|
||
# Check if this is a simple nullable type (e.g., str | None) | ||
if len(subtypes) == NULLABLE_TYPE_LENGTH and "null" in subtypes: | ||
# Get the non-null type | ||
non_null_type = next(t for t in subtypes if t != "null") | ||
# Map it to Python type | ||
if isinstance(non_null_type, str): | ||
return { | ||
"string": str, | ||
"integer": int, | ||
"number": float, | ||
"boolean": bool, | ||
"object": dict, | ||
"array": list, | ||
}.get(non_null_type, Any) | ||
return Any | ||
|
||
# For other anyOf cases, use the first non-null type | ||
subtypes = [parse_type(sub) for sub in s["anyOf"]] | ||
non_null_types = [t for t in subtypes if t is not None and t is not type(None)] | ||
if non_null_types: | ||
return non_null_types[0] | ||
return str |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Honor nullable fields even when required (JSON Schema anyOf with null).
Required-but-nullable properties (anyOf including "null") currently become non-nullable, causing validation failures for None. Make parse_type return T|None for simple nullable unions; this preserves nullability independent of "required".
- if "anyOf" in s:
- # Handle common pattern for nullable types (anyOf with string and null)
- subtypes = [sub.get("type") for sub in s["anyOf"] if isinstance(sub, dict) and "type" in sub]
-
- # Check if this is a simple nullable type (e.g., str | None)
- if len(subtypes) == NULLABLE_TYPE_LENGTH and "null" in subtypes:
- # Get the non-null type
- non_null_type = next(t for t in subtypes if t != "null")
- # Map it to Python type
- if isinstance(non_null_type, str):
- return {
- "string": str,
- "integer": int,
- "number": float,
- "boolean": bool,
- "object": dict,
- "array": list,
- }.get(non_null_type, Any)
- return Any
-
- # For other anyOf cases, use the first non-null type
- subtypes = [parse_type(sub) for sub in s["anyOf"]]
- non_null_types = [t for t in subtypes if t is not None and t is not type(None)]
- if non_null_types:
- return non_null_types[0]
- return str
+ if "anyOf" in s:
+ # Resolve refs first
+ variants = [resolve_ref(sub) for sub in s["anyOf"] if isinstance(sub, dict)]
+ # Simple nullable: exactly one "null" + one non-null schema
+ if len(variants) == NULLABLE_TYPE_LENGTH and any(v.get("type") == "null" for v in variants):
+ non_null_schema = next((v for v in variants if v.get("type") != "null"), {})
+ base = parse_type(non_null_schema)
+ return base | None
+ # Fallback: prefer first non-null parsed type
+ parsed = [parse_type(v) for v in variants]
+ for t in parsed:
+ if t is not None and t is not type(None):
+ return t
+ return str
Also applies to: 320-328
|
…ctions
Remove complex session reuse logic, cleanup tasks, and reference counting from MCPSessionManager. Move
create_input_schema_from_json_schema function into util.py
Summary by CodeRabbit
New Features
Refactor
Style