Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
93868eb
Changed SSE to Streamable_HTTP implementation on backend
lucaseduoli Sep 15, 2025
f869703
Changed SSE to Streamable HTTP on frontend
lucaseduoli Sep 15, 2025
b336bbc
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 7, 2025
cfa2958
Added support for SSE as a fallback
lucaseduoli Oct 7, 2025
0ceb91b
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 7, 2025
fbb46a2
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Oct 7, 2025
3f2189d
added /sse
lucaseduoli Oct 7, 2025
6e05c86
updated code
lucaseduoli Oct 7, 2025
1da771b
created test to test streamable http
lucaseduoli Oct 7, 2025
53230ca
Test SSE fallback on HTTP/SSE tab
lucaseduoli Oct 7, 2025
ee7dc90
Changed nvidia remix starter project
lucaseduoli Oct 7, 2025
9942282
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 7, 2025
f76fa05
Added changes for backwards compatibility
lucaseduoli Oct 7, 2025
f335063
fixed issues on mcp util
lucaseduoli Oct 7, 2025
eb3b2bf
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 7, 2025
610a936
Fixed tests to have a helper to initialize server, with health check
lucaseduoli Oct 7, 2025
8fc478c
updated package lock
lucaseduoli Oct 7, 2025
35f1d47
fixed backend tests
lucaseduoli Oct 7, 2025
433ca14
fixed backend test
lucaseduoli Oct 7, 2025
96d8066
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 7, 2025
d241380
format imports
lucaseduoli Oct 7, 2025
0f30857
fixed backend test
lucaseduoli Oct 7, 2025
cd1bc63
fixed frontend tests
lucaseduoli Oct 7, 2025
075de85
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 7, 2025
70c38f5
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Oct 7, 2025
321fab5
changed http to streamable http
lucaseduoli Oct 7, 2025
6db08a5
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 7, 2025
e22fc7b
fixed mcp util test
lucaseduoli Oct 7, 2025
987777f
fixed test
lucaseduoli Oct 7, 2025
3675b31
ruff fix
lucaseduoli Oct 7, 2025
bf778cc
wait timeout on mcp server test
lucaseduoli Oct 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"devDependencies": {
"@types/node": "^24.7.0"
}
}

Large diffs are not rendered by default.

110 changes: 39 additions & 71 deletions src/backend/tests/unit/base/mcp/test_mcp_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@

import pytest
from lfx.base.mcp import util
from lfx.base.mcp.util import MCPSessionManager, MCPSseClient, MCPStdioClient, _process_headers, validate_headers
from lfx.base.mcp.util import (
MCPSessionManager,
MCPSseClient,
MCPStdioClient,
MCPStreamableHttpClient,
_process_headers,
validate_headers,
)


class TestMCPSessionManager:
Expand Down Expand Up @@ -186,11 +193,11 @@ def test_validate_headers_injection_attempts(self):
assert result == {"safe-header": "safe-value"}


class TestSSEHeaderIntegration:
"""Integration test to verify headers are properly passed through the entire SSE flow."""
class TestStreamableHTTPHeaderIntegration:
"""Integration test to verify headers are properly passed through the entire StreamableHTTP flow."""

async def test_headers_processing(self):
"""Test that headers flow properly from server config through to SSE client connection."""
"""Test that headers flow properly from server config through to StreamableHTTP client connection."""
# Test the header processing function directly
headers_input = [
{"key": "Authorization", "value": "Bearer test-token"},
Expand Down Expand Up @@ -231,24 +238,24 @@ async def test_headers_processing(self):
result = _process_headers(invalid_headers)
assert result == {"valid-header": "good"}

async def test_sse_client_header_storage(self):
async def test_streamable_http_client_header_storage(self):
"""Test that SSE client properly stores headers in connection params."""
sse_client = MCPSseClient()
streamable_http_client = MCPStreamableHttpClient()
test_url = "http://test.url"
test_headers = {"Authorization": "Bearer test123", "Custom": "value"}

# Test that headers are properly stored in connection params
# Set connection params as a dict like the implementation expects
sse_client._connection_params = {
streamable_http_client._connection_params = {
"url": test_url,
"headers": test_headers,
"timeout_seconds": 30,
"sse_read_timeout_seconds": 30,
}

# Verify headers are stored
assert sse_client._connection_params["url"] == test_url
assert sse_client._connection_params["headers"] == test_headers
assert streamable_http_client._connection_params["url"] == test_url
assert streamable_http_client._connection_params["headers"] == test_headers


class TestFieldNameConversion:
Expand Down Expand Up @@ -930,22 +937,22 @@ async def test_session_reuse(self, stdio_client):
await stdio_client.disconnect()


class TestMCPSseClientWithDeepWikiServer:
class TestMCPStreamableHttpClientWithDeepWikiServer:
"""Test MCPSseClient with the DeepWiki MCP server."""

@pytest.fixture
def sse_client(self):
def streamable_http_client(self):
"""Create an SSE client for testing."""
return MCPSseClient()
return MCPStreamableHttpClient()

@pytest.mark.asyncio
async def test_connect_to_deepwiki_server(self, sse_client):
async def test_connect_to_deepwiki_server(self, streamable_http_client):
"""Test connecting to the DeepWiki MCP server."""
url = "https://mcp.deepwiki.com/sse"

try:
# Connect to the server
tools = await sse_client.connect_to_server(url)
tools = await streamable_http_client.connect_to_server(url)

# Verify tools were returned
assert len(tools) > 0
Expand All @@ -961,16 +968,16 @@ async def test_connect_to_deepwiki_server(self, sse_client):
# If the server is not accessible, skip the test
pytest.skip(f"DeepWiki server not accessible: {e}")
finally:
await sse_client.disconnect()
await streamable_http_client.disconnect()

@pytest.mark.asyncio
async def test_run_wiki_structure_tool(self, sse_client):
async def test_run_wiki_structure_tool(self, streamable_http_client):
"""Test running the read_wiki_structure tool."""
url = "https://mcp.deepwiki.com/sse"

try:
# Connect to the server
tools = await sse_client.connect_to_server(url)
tools = await streamable_http_client.connect_to_server(url)

# Find the read_wiki_structure tool
wiki_tool = None
Expand All @@ -982,7 +989,7 @@ async def test_run_wiki_structure_tool(self, sse_client):
assert wiki_tool is not None, "read_wiki_structure tool not found"

# Run the tool with a test repository (use repoName as expected by the API)
result = await sse_client.run_tool("read_wiki_structure", {"repoName": "microsoft/vscode"})
result = await streamable_http_client.run_tool("read_wiki_structure", {"repoName": "microsoft/vscode"})

# Verify the result
assert result is not None
Expand All @@ -993,16 +1000,16 @@ async def test_run_wiki_structure_tool(self, sse_client):
# If the server is not accessible or the tool fails, skip the test
pytest.skip(f"DeepWiki server test failed: {e}")
finally:
await sse_client.disconnect()
await streamable_http_client.disconnect()

@pytest.mark.asyncio
async def test_ask_question_tool(self, sse_client):
async def test_ask_question_tool(self, streamable_http_client):
"""Test running the ask_question tool."""
url = "https://mcp.deepwiki.com/sse"

try:
# Connect to the server
tools = await sse_client.connect_to_server(url)
tools = await streamable_http_client.connect_to_server(url)

# Find the ask_question tool
ask_tool = None
Expand All @@ -1014,7 +1021,7 @@ async def test_ask_question_tool(self, sse_client):
assert ask_tool is not None, "ask_question tool not found"

# Run the tool with a test question (use repoName as expected by the API)
result = await sse_client.run_tool(
result = await streamable_http_client.run_tool(
"ask_question", {"repoName": "microsoft/vscode", "question": "What is VS Code?"}
)

Expand All @@ -1027,14 +1034,14 @@ async def test_ask_question_tool(self, sse_client):
# If the server is not accessible or the tool fails, skip the test
pytest.skip(f"DeepWiki server test failed: {e}")
finally:
await sse_client.disconnect()
await streamable_http_client.disconnect()

@pytest.mark.asyncio
async def test_url_validation(self, sse_client):
async def test_url_validation(self, streamable_http_client):
"""Test URL validation for SSE connections."""
# Test valid URL
valid_url = "https://mcp.deepwiki.com/sse"
is_valid, error = await sse_client.validate_url(valid_url)
is_valid, error = await streamable_http_client.validate_url(valid_url)
# Either valid or accessible, or rate-limited (429) which indicates server is reachable
if not is_valid and "429" in error:
# Rate limiting indicates the server is accessible but limiting requests
Expand All @@ -1044,29 +1051,10 @@ async def test_url_validation(self, sse_client):

# Test invalid URL
invalid_url = "not_a_url"
is_valid, error = await sse_client.validate_url(invalid_url)
is_valid, error = await streamable_http_client.validate_url(invalid_url)
assert not is_valid
assert error != ""

@pytest.mark.asyncio
async def test_redirect_handling(self, sse_client):
"""Test redirect handling for SSE connections."""
# Test with the DeepWiki URL
url = "https://mcp.deepwiki.com/sse"

try:
# Check for redirects
final_url = await sse_client.pre_check_redirect(url)

# Should return a URL (either original or redirected)
assert final_url is not None
assert isinstance(final_url, str)
assert final_url.startswith("http")

except Exception as e:
# If the server is not accessible, skip the test
pytest.skip(f"DeepWiki server not accessible for redirect test: {e}")

@pytest.fixture
def mock_tool(self):
"""Create a mock MCP tool."""
Expand Down Expand Up @@ -1117,14 +1105,14 @@ async def test_validate_url_valid(self, sse_client):
mock_response.status_code = 200
mock_client.return_value.__aenter__.return_value.get.return_value = mock_response

is_valid, error_msg = await sse_client.validate_url("http://test.url", {})
is_valid, error_msg = await sse_client.validate_url("http://test.url")

assert is_valid is True
assert error_msg == ""

async def test_validate_url_invalid_format(self, sse_client):
"""Test URL validation with invalid format."""
is_valid, error_msg = await sse_client.validate_url("invalid-url", {})
is_valid, error_msg = await sse_client.validate_url("invalid-url")

assert is_valid is False
assert "Invalid URL format" in error_msg
Expand All @@ -1136,7 +1124,7 @@ async def test_validate_url_with_404_response(self, sse_client):
mock_response.status_code = 404
mock_client.return_value.__aenter__.return_value.get.return_value = mock_response

is_valid, error_msg = await sse_client.validate_url("http://test.url", {})
is_valid, error_msg = await sse_client.validate_url("http://test.url")

assert is_valid is True
assert error_msg == ""
Expand All @@ -1149,7 +1137,6 @@ async def test_connect_to_server_with_headers(self, sse_client):

with (
patch.object(sse_client, "validate_url", return_value=(True, "")),
patch.object(sse_client, "pre_check_redirect", return_value=test_url),
patch.object(sse_client, "_get_or_create_session") as mock_get_session,
):
# Mock session
Expand Down Expand Up @@ -1194,29 +1181,10 @@ async def test_headers_passed_to_session_manager(self, sse_client):
result_session = await sse_client._get_or_create_session()

# Verify session manager was called with correct parameters including normalized headers
mock_manager.get_session.assert_called_once_with("test_context", sse_client._connection_params, "sse")
assert result_session == mock_session

async def test_pre_check_redirect_with_headers(self, sse_client):
"""Test pre-check redirect functionality with custom headers."""
test_url = "http://test.url"
redirect_url = "http://redirect.url"
# Use pre-validated headers since pre_check_redirect expects already validated headers
test_headers = {"authorization": "Bearer token123"} # already normalized

with patch("httpx.AsyncClient") as mock_client:
mock_response = MagicMock()
mock_response.status_code = 307
mock_response.headers.get.return_value = redirect_url
mock_client.return_value.__aenter__.return_value.get.return_value = mock_response

result = await sse_client.pre_check_redirect(test_url, test_headers)

assert result == redirect_url
# Verify validated headers were passed to the request
mock_client.return_value.__aenter__.return_value.get.assert_called_with(
test_url, timeout=2.0, headers={"Accept": "text/event-stream", **test_headers}
mock_manager.get_session.assert_called_once_with(
"test_context", sse_client._connection_params, "streamable_http"
)
assert result_session == mock_session

async def test_run_tool_with_retry_on_connection_error(self, sse_client):
"""Test that run_tool retries on connection errors."""
Expand Down
32 changes: 16 additions & 16 deletions src/backend/tests/unit/components/data/test_mcp_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

This test suite validates the MCP component functionality using real MCP servers:
- Everything server (stdio mode) - provides echo and other tools
- DeepWiki server (SSE mode) - provides wiki-related tools
- HTTP/SSE servers (streamable HTTP mode) - provides various tools
"""

import shutil
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from lfx.base.mcp.util import MCPSessionManager, MCPSseClient, MCPStdioClient
from lfx.base.mcp.util import MCPSessionManager, MCPStdioClient, MCPStreamableHttpClient
from lfx.components.agents.mcp_component import MCPToolsComponent

from tests.base import ComponentTestBaseWithoutClient, VersionComponentMapping
Expand Down Expand Up @@ -45,9 +45,9 @@ async def test_component_initialization(self, component_class, default_kwargs):

# Check that the component has the expected attributes
assert hasattr(component, "stdio_client")
assert hasattr(component, "sse_client")
assert hasattr(component, "streamable_http_client")
assert isinstance(component.stdio_client, MCPStdioClient)
assert isinstance(component.sse_client, MCPSseClient)
assert isinstance(component.streamable_http_client, MCPStreamableHttpClient)

# Check that the component has a session manager
session_manager = component.stdio_client._get_session_manager()
Expand Down Expand Up @@ -87,11 +87,11 @@ async def test_stdio_mode_integration(self, component):
pytest.skip(f"Everything server not accessible: {e}")

@pytest.mark.asyncio
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need not remove sse yet . lets get the streamable http first and later we can give notice and then deprecate the SSE.

async def test_sse_mode_integration(self, component):
"""Test the component in SSE mode with DeepWiki server."""
# Configure for SSE mode
component.mode = "SSE"
component.sse_url = "https://mcp.deepwiki.com/sse"
async def test_streamable_http_mode_integration(self, component):
"""Test the component in Streamable HTTP mode with DeepWiki server."""
# Configure for Streamable HTTP mode
component.mode = "Streamable HTTP"
component.streamable_http_url = "https://mcp.deepwiki.com/mcp"

try:
# Mock the update_tool_list method to simulate server connection
Expand All @@ -111,27 +111,27 @@ async def test_sse_mode_integration(self, component):
@pytest.mark.asyncio
async def test_session_context_setting(self, component):
"""Test that session context is properly set."""
# Set session context
# Set session context on both clients
component.stdio_client.set_session_context("test_context")
component.sse_client.set_session_context("test_context")
component.streamable_http_client.set_session_context("test_context")

# Verify context was set
assert component.stdio_client._session_context == "test_context"
assert component.sse_client._session_context == "test_context"
assert component.streamable_http_client._session_context == "test_context"

@pytest.mark.asyncio
async def test_session_manager_sharing(self, component):
"""Test that session managers are shared through component cache."""
# Get session managers
# Get session managers from both clients
stdio_manager = component.stdio_client._get_session_manager()
sse_manager = component.sse_client._get_session_manager()
http_manager = component.streamable_http_client._get_session_manager()

# Both should be MCPSessionManager instances
assert isinstance(stdio_manager, MCPSessionManager)
assert isinstance(sse_manager, MCPSessionManager)
assert isinstance(http_manager, MCPSessionManager)

# They should be the same instance (shared through cache)
assert stdio_manager is sse_manager
assert stdio_manager is http_manager


class TestMCPComponentErrorHandling:
Expand Down
Loading
Loading