From 355f26cc1b98e171bd2404a0e1673183118153cc Mon Sep 17 00:00:00 2001 From: Benedict Date: Sun, 17 Aug 2025 22:40:35 +0800 Subject: [PATCH 01/17] Added multi-tenancy support --- mcp_clickhouse/mcp_env.py | 218 +++++++++++++++++++++----------------- 1 file changed, 121 insertions(+), 97 deletions(-) diff --git a/mcp_clickhouse/mcp_env.py b/mcp_clickhouse/mcp_env.py index 40c0424..a720a12 100644 --- a/mcp_clickhouse/mcp_env.py +++ b/mcp_clickhouse/mcp_env.py @@ -1,4 +1,4 @@ -"""Environment configuration for the MCP ClickHouse server. +"""Environment configuration for the MCP ClickHouse server with Multi-Tenancy support. This module handles all environment variable configuration with sensible defaults and type conversion. @@ -6,7 +6,7 @@ from dataclasses import dataclass import os -from typing import Optional +from typing import Optional, Dict, List from enum import Enum @@ -30,29 +30,40 @@ class ClickHouseConfig: This class handles all environment variable configuration with sensible defaults and type conversion. It provides typed methods for accessing each configuration value. - Required environment variables (only when CLICKHOUSE_ENABLED=true): - CLICKHOUSE_HOST: The hostname of the ClickHouse server - CLICKHOUSE_USER: The username for authentication - CLICKHOUSE_PASSWORD: The password for authentication + Required environment variables (only when CH__CLICKHOUSE_ENABLED=true): + CH__CLICKHOUSE_HOST: The hostname of the ClickHouse server + CH__CLICKHOUSE_USER: The username for authentication + CH__CLICKHOUSE_PASSWORD: The password for authentication Optional environment variables (with defaults): - CLICKHOUSE_PORT: The port number (default: 8443 if secure=True, 8123 if secure=False) - CLICKHOUSE_SECURE: Enable HTTPS (default: true) - CLICKHOUSE_VERIFY: Verify SSL certificates (default: true) - CLICKHOUSE_CONNECT_TIMEOUT: Connection timeout in seconds (default: 30) - CLICKHOUSE_SEND_RECEIVE_TIMEOUT: Send/receive timeout in seconds (default: 300) - CLICKHOUSE_DATABASE: Default database to use (default: None) - CLICKHOUSE_PROXY_PATH: Path to be added to the host URL. For instance, for servers behind an HTTP proxy (default: None) - CLICKHOUSE_MCP_SERVER_TRANSPORT: MCP server transport method - "stdio", "http", or "sse" (default: stdio) - CLICKHOUSE_MCP_BIND_HOST: Host to bind the MCP server to when using HTTP or SSE transport (default: 127.0.0.1) - CLICKHOUSE_MCP_BIND_PORT: Port to bind the MCP server to when using HTTP or SSE transport (default: 8000) - CLICKHOUSE_ENABLED: Enable ClickHouse server (default: true) + CH__CLICKHOUSE_PORT: The port number (default: 8443 if secure=True, 8123 if secure=False) + CH__CLICKHOUSE_SECURE: Enable HTTPS (default: true) + CH__CLICKHOUSE_VERIFY: Verify SSL certificates (default: true) + CH__CLICKHOUSE_CONNECT_TIMEOUT: Connection timeout in seconds (default: 30) + CH__CLICKHOUSE_SEND_RECEIVE_TIMEOUT: Send/receive timeout in seconds (default: 300) + CH__CLICKHOUSE_DATABASE: Default database to use (default: None) + CH__CLICKHOUSE_PROXY_PATH: Path to be added to the host URL. For instance, for servers behind an HTTP proxy (default: None) + CH__CLICKHOUSE_MCP_SERVER_TRANSPORT: MCP server transport method - "stdio", "http", or "sse" (default: stdio) + CH__CLICKHOUSE_MCP_BIND_HOST: Host to bind the MCP server to when using HTTP or SSE transport (default: 127.0.0.1) + CH__CLICKHOUSE_MCP_BIND_PORT: Port to bind the MCP server to when using HTTP or SSE transport (default: 8000) + CH__CLICKHOUSE_ENABLED: Enable ClickHouse server (default: true) """ - def __init__(self): + def __init__(self, tenant_prefix: str = ""): """Initialize the configuration from environment variables.""" + self.tenant_prefix = tenant_prefix if self.enabled: self._validate_required_vars() + + def _getenv(self, key: str, default=None, cast=str): + prefixed_key = f"CH_{self.tenant_prefix}_{key}" + val = os.getenv(prefixed_key, os.getenv(key, default)) + if val is not None and cast is not str: + try: + return cast(val) + except Exception: + raise ValueError(f"Invalid value for {prefixed_key or key}: {val}") + return val @property def enabled(self) -> bool: @@ -60,12 +71,12 @@ def enabled(self) -> bool: Default: True """ - return os.getenv("CLICKHOUSE_ENABLED", "true").lower() == "true" + return self._getenv("CLICKHOUSE_ENABLED", "true", cast=lambda v: v.lower() == "true") @property def host(self) -> str: """Get the ClickHouse host.""" - return os.environ["CLICKHOUSE_HOST"] + return self._getenv("CLICKHOUSE_HOST") @property def port(self) -> int: @@ -74,24 +85,23 @@ def port(self) -> int: Defaults to 8443 if secure=True, 8123 if secure=False. Can be overridden by CLICKHOUSE_PORT environment variable. """ - if "CLICKHOUSE_PORT" in os.environ: - return int(os.environ["CLICKHOUSE_PORT"]) - return 8443 if self.secure else 8123 + default = 8443 if self.secure else 8123 + return self._getenv("CLICKHOUSE_PORT", default, cast=int) @property def username(self) -> str: """Get the ClickHouse username.""" - return os.environ["CLICKHOUSE_USER"] + return self._getenv("CLICKHOUSE_USER") @property def password(self) -> str: """Get the ClickHouse password.""" - return os.environ["CLICKHOUSE_PASSWORD"] + return self._getenv("CLICKHOUSE_PASSWORD") @property def database(self) -> Optional[str]: """Get the default database name if set.""" - return os.getenv("CLICKHOUSE_DATABASE") + return self._getenv("CLICKHOUSE_DATABASE") @property def secure(self) -> bool: @@ -99,7 +109,7 @@ def secure(self) -> bool: Default: True """ - return os.getenv("CLICKHOUSE_SECURE", "true").lower() == "true" + return self._getenv("CLICKHOUSE_SECURE", "true", cast=lambda v: v.lower() == "true") @property def verify(self) -> bool: @@ -107,7 +117,7 @@ def verify(self) -> bool: Default: True """ - return os.getenv("CLICKHOUSE_VERIFY", "true").lower() == "true" + return self._getenv("CLICKHOUSE_VERIFY", "true", cast=lambda v: v.lower() == "true") @property def connect_timeout(self) -> int: @@ -115,7 +125,7 @@ def connect_timeout(self) -> int: Default: 30 """ - return int(os.getenv("CLICKHOUSE_CONNECT_TIMEOUT", "30")) + return self._getenv("CLICKHOUSE_CONNECT_TIMEOUT", 30, cast=int) @property def send_receive_timeout(self) -> int: @@ -123,44 +133,11 @@ def send_receive_timeout(self) -> int: Default: 300 (ClickHouse default) """ - return int(os.getenv("CLICKHOUSE_SEND_RECEIVE_TIMEOUT", "300")) + return self._getenv("CLICKHOUSE_SEND_RECEIVE_TIMEOUT", 300, cast=int) @property - def proxy_path(self) -> str: - return os.getenv("CLICKHOUSE_PROXY_PATH") - - @property - def mcp_server_transport(self) -> str: - """Get the MCP server transport method. - - Valid options: "stdio", "http", "sse" - Default: "stdio" - """ - transport = os.getenv("CLICKHOUSE_MCP_SERVER_TRANSPORT", TransportType.STDIO.value).lower() - - # Validate transport type - if transport not in TransportType.values(): - valid_options = ", ".join(f'"{t}"' for t in TransportType.values()) - raise ValueError(f"Invalid transport '{transport}'. Valid options: {valid_options}") - return transport - - @property - def mcp_bind_host(self) -> str: - """Get the host to bind the MCP server to. - - Only used when transport is "http" or "sse". - Default: "127.0.0.1" - """ - return os.getenv("CLICKHOUSE_MCP_BIND_HOST", "127.0.0.1") - - @property - def mcp_bind_port(self) -> int: - """Get the port to bind the MCP server to. - - Only used when transport is "http" or "sse". - Default: 8000 - """ - return int(os.getenv("CLICKHOUSE_MCP_BIND_PORT", "8000")) + def proxy_path(self) -> Optional[str]: + return self._getenv("CLICKHOUSE_PROXY_PATH") def get_client_config(self) -> dict: """Get the configuration dictionary for clickhouse_connect client. @@ -177,7 +154,7 @@ def get_client_config(self) -> dict: "verify": self.verify, "connect_timeout": self.connect_timeout, "send_receive_timeout": self.send_receive_timeout, - "client_name": "mcp_clickhouse", + "client_name": f"{self.tenant_prefix}mcp_clickhouse", } # Add optional database if set @@ -197,11 +174,10 @@ def _validate_required_vars(self) -> None: """ missing_vars = [] for var in ["CLICKHOUSE_HOST", "CLICKHOUSE_USER", "CLICKHOUSE_PASSWORD"]: - if var not in os.environ: + if not self._getenv(var): missing_vars.append(var) - if missing_vars: - raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}") + raise ValueError(f"Missing required environment variables for tenant '{self.tenant_prefix}': {', '.join(missing_vars)}") @dataclass @@ -215,23 +191,34 @@ class ChDBConfig: CHDB_DATA_PATH: The path to the chDB data directory (only required if CHDB_ENABLED=true) """ - def __init__(self): + def __init__(self, tenant_prefix: str = ""): """Initialize the configuration from environment variables.""" + self.tenant_prefix = tenant_prefix if self.enabled: self._validate_required_vars() + def _getenv(self, key: str, default=None, cast=str): + prefixed_key = f"CH_{self.tenant_prefix}_{key}" + val = os.getenv(prefixed_key, os.getenv(key, default)) + if val is not None and cast is not str: + try: + return cast(val) + except Exception: + raise ValueError(f"Invalid value for {prefixed_key or key}: {val}") + return val + @property def enabled(self) -> bool: """Get whether chDB is enabled. Default: False """ - return os.getenv("CHDB_ENABLED", "false").lower() == "true" + return self._getenv("CHDB_ENABLED", "false", cast=lambda v: v.lower() == "true") @property def data_path(self) -> str: """Get the chDB data path.""" - return os.getenv("CHDB_DATA_PATH", ":memory:") + return self._getenv("CHDB_DATA_PATH", ":memory:") def get_client_config(self) -> dict: """Get the configuration dictionary for chDB client. @@ -251,33 +238,70 @@ def _validate_required_vars(self) -> None: """ pass - -# Global instance placeholders for the singleton pattern -_CONFIG_INSTANCE = None -_CHDB_CONFIG_INSTANCE = None - - -def get_config(): +def get_mcp_config() -> dict: """ - Gets the singleton instance of ClickHouseConfig. - Instantiates it on the first call. + Get the MCP server configuration from environment variables. """ - global _CONFIG_INSTANCE - if _CONFIG_INSTANCE is None: - # Instantiate the config object here, ensuring load_dotenv() has likely run - _CONFIG_INSTANCE = ClickHouseConfig() - return _CONFIG_INSTANCE + # Global MCP transport config (single for all tenants) + MCP_TRANSPORT = os.getenv("CLICKHOUSE_MCP_SERVER_TRANSPORT", TransportType.STDIO.value).lower() + if MCP_TRANSPORT not in TransportType.values(): + raise ValueError(f"Invalid MCP transport '{MCP_TRANSPORT}'. Valid options: {TransportType.values()}") + MCP_BIND_HOST = os.getenv("CLICKHOUSE_MCP_BIND_HOST", "127.0.0.1") + MCP_BIND_PORT = int(os.getenv("CLICKHOUSE_MCP_BIND_PORT", 8000)) -def get_chdb_config() -> ChDBConfig: - """ - Gets the singleton instance of ChDBConfig. - Instantiates it on the first call. + return { + "mcp_server_transport": MCP_TRANSPORT, + "mcp_bind_host": MCP_BIND_HOST, + "mcp_bind_port": MCP_BIND_PORT, + } - Returns: - ChDBConfig: The chDB configuration instance - """ - global _CHDB_CONFIG_INSTANCE - if _CHDB_CONFIG_INSTANCE is None: - _CHDB_CONFIG_INSTANCE = ChDBConfig() - return _CHDB_CONFIG_INSTANCE +# Global instance placeholders for the singleton pattern +_CLICKHOUSE_TENANTS: Dict[str, ClickHouseConfig] = {} +_CHDB_TENANTS: Dict[str, ChDBConfig] = {} + +def load_clickhouse_configs() -> Dict[str, ClickHouseConfig]: + global _CLICKHOUSE_TENANTS + for key in os.environ: + if key.endswith("CLICKHOUSE_HOST") and key.startswith("CH_"): + # CH__CLICKHOUSE_HOST + tenant_prefix = key[len("CH_"): -len("_CLICKHOUSE_HOST")] + _CLICKHOUSE_TENANTS[tenant_prefix] = ClickHouseConfig(tenant_prefix=tenant_prefix) + if not _CLICKHOUSE_TENANTS and "CLICKHOUSE_HOST" in os.environ: + _CLICKHOUSE_TENANTS["default"] = ClickHouseConfig(tenant_prefix="") + + return _CLICKHOUSE_TENANTS + +def load_chdb_configs() -> Dict[str, ChDBConfig]: + global _CHDB_TENANTS + for key in os.environ: + if key.endswith("CHDB_DATA_PATH") and key.startswith("CH_"): + # CH__CLICKHOUSE_HOST + tenant_prefix = key[len("CH_"): -len("_CHDB_DATA_PATH")] + _CHDB_TENANTS[tenant_prefix] = ChDBConfig(tenant_prefix=tenant_prefix) + if not _CHDB_TENANTS and "CHDB_DATA_PATH" in os.environ: + _CHDB_TENANTS["default"] = ChDBConfig(tenant_prefix="") + return _CHDB_TENANTS + +def get_config(tenant: str = "default") -> ClickHouseConfig: + """Get ClickHouseConfig for a specific tenant.""" + global _CLICKHOUSE_TENANTS + + # Check for tenant in the global config map + if tenant not in _CLICKHOUSE_TENANTS: + raise ValueError(f"No ClickHouse config found for tenant '{tenant}'") + + return _CLICKHOUSE_TENANTS[tenant] + +def get_chdb_config(tenant: str = "default") -> ChDBConfig: + """Get ChDBConfig for a specific tenant.""" + global _CHDB_TENANTS + + # Check for tenant in the global config map + if tenant not in _CHDB_TENANTS: + raise ValueError(f"No ChDB config found for tenant '{tenant}'") + return _CHDB_TENANTS[tenant] + +def list_tenants() -> List[str]: + """Get list of all tenant names.""" + return [tenant for tenant in _CLICKHOUSE_TENANTS.keys()] \ No newline at end of file From 9c593be3906760ae747e7ab9acfc903302d0763f Mon Sep 17 00:00:00 2001 From: Benedict Date: Sun, 17 Aug 2025 22:41:45 +0800 Subject: [PATCH 02/17] Enable multi-tenancy for mcp methods --- mcp_clickhouse/mcp_server.py | 153 ++++++++++++++++++++--------------- 1 file changed, 89 insertions(+), 64 deletions(-) diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index 589ff2e..cef0bfe 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -17,7 +17,7 @@ from starlette.requests import Request from starlette.responses import PlainTextResponse -from mcp_clickhouse.mcp_env import get_config, get_chdb_config +from mcp_clickhouse.mcp_env import load_clickhouse_configs, load_chdb_configs, list_tenants, get_config, get_chdb_config from mcp_clickhouse.chdb_prompt import CHDB_PROMPT @@ -61,11 +61,24 @@ class Table: ) logger = logging.getLogger(MCP_SERVER_NAME) -QUERY_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=10) -atexit.register(lambda: QUERY_EXECUTOR.shutdown(wait=True)) +# List of Tenants +TENANTS = list_tenants() + +# Create a ThreadPoolExecutor per tenant +QUERY_EXECUTOR = { + tenant: concurrent.futures.ThreadPoolExecutor(max_workers=10) + for tenant in TENANTS +} + +# Ensure all executors are properly shutdown on exit +atexit.register(lambda: [executor.shutdown(wait=True) for executor in QUERY_EXECUTOR.values()]) + +# Default query timeout for selects SELECT_QUERY_TIMEOUT_SECS = 30 load_dotenv() +load_clickhouse_configs() +load_chdb_configs() mcp = FastMCP( name=MCP_SERVER_NAME, @@ -85,29 +98,40 @@ async def health_check(request: Request) -> PlainTextResponse: Returns OK if the server is running and can connect to ClickHouse. """ try: - # Check if ClickHouse is enabled by trying to create config - # If ClickHouse is disabled, this will succeed but connection will fail - clickhouse_enabled = os.getenv("CLICKHOUSE_ENABLED", "true").lower() == "true" - - if not clickhouse_enabled: - # If ClickHouse is disabled, check chDB status - chdb_config = get_chdb_config() - if chdb_config.enabled: - return PlainTextResponse("OK - MCP server running with chDB enabled") - else: - # Both ClickHouse and chDB are disabled - this is an error - return PlainTextResponse( - "ERROR - Both ClickHouse and chDB are disabled. At least one must be enabled.", - status_code=503, - ) - - # Try to create a client connection to verify ClickHouse connectivity - client = create_clickhouse_client() - version = client.server_version - return PlainTextResponse(f"OK - Connected to ClickHouse {version}") + reports = [] + + for tenant in TENANTS: + tenant_report = f"Tenant '{tenant}': " + + # Check if ClickHouse is enabled by trying to create config + # If ClickHouse is disabled, this will succeed but connection will fail + try: + clickhouse_config = get_config(tenant) + if clickhouse_config.enabled: + client = create_clickhouse_client(tenant) + version = client.server_version + tenant_report += f"ClickHouse OK (v{version})" + else: + tenant_report += "ClickHouse Disabled" + except Exception as e: + tenant_report += f"ClickHouse ERROR ({str(e)})" + + # Check chDB status if enabled + try: + chdb_config = get_chdb_config(tenant) + if chdb_config.enabled: + tenant_report += ", chDB OK" + else: + tenant_report += ", chDB Disabled" + except Exception as e: + tenant_report += f", chDB ERROR ({str(e)})" + + reports.append(tenant_report) + + return PlainTextResponse("\n".join(reports)) + except Exception as e: - # Return 503 Service Unavailable if we can't connect to ClickHouse - return PlainTextResponse(f"ERROR - Cannot connect to ClickHouse: {str(e)}", status_code=503) + return PlainTextResponse(f"ERROR - Health check failed: {str(e)}", status_code=503) def result_to_table(query_columns, result) -> List[Table]: @@ -128,10 +152,10 @@ def to_json(obj: Any) -> str: return obj -def list_databases(): +def list_databases(tenant: str): """List available ClickHouse databases""" logger.info("Listing all databases") - client = create_clickhouse_client() + client = create_clickhouse_client(tenant) result = client.command("SHOW DATABASES") # Convert newline-separated string to list and trim whitespace @@ -140,15 +164,15 @@ def list_databases(): else: databases = [result] - logger.info(f"Found {len(databases)} databases") + logger.info(f"Found {len(databases)} databases for tenant '{tenant}'") return json.dumps(databases) -def list_tables(database: str, like: Optional[str] = None, not_like: Optional[str] = None): +def list_tables(tenant: str, database: str, like: Optional[str] = None, not_like: Optional[str] = None): """List available ClickHouse tables in a database, including schema, comment, row count, and column count.""" - logger.info(f"Listing tables in database '{database}'") - client = create_clickhouse_client() + logger.info(f"Listing tables for tenant '{tenant}' in database '{database}'") + client = create_clickhouse_client(tenant) query = f"SELECT database, name, engine, create_table_query, dependencies_database, dependencies_table, engine_full, sorting_key, primary_key, total_rows, total_bytes, total_bytes_uncompressed, parts, active_parts, total_marks, comment FROM system.tables WHERE database = {format_query_value(database)}" if like: query += f" AND name LIKE {format_query_value(like)}" @@ -172,7 +196,7 @@ def list_tables(database: str, like: Optional[str] = None, not_like: Optional[st ) ] - logger.info(f"Found {len(tables)} tables") + logger.info(f"Found {len(tables)} tables for tenant '{tenant}'") return [asdict(table) for table in tables] @@ -188,38 +212,38 @@ def execute_query(query: str): raise ToolError(f"Query execution failed: {str(err)}") -def run_select_query(query: str): +def run_select_query(tenant: str, query: str): """Run a SELECT query in a ClickHouse database""" - logger.info(f"Executing SELECT query: {query}") + logger.info(f"Executing SELECT query for tenant '{tenant}': {query}") try: - future = QUERY_EXECUTOR.submit(execute_query, query) + future = QUERY_EXECUTOR[tenant].submit(execute_query, query) try: result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS) # Check if we received an error structure from execute_query if isinstance(result, dict) and "error" in result: - logger.warning(f"Query failed: {result['error']}") + logger.warning(f"Query failed for tenant '{tenant}': {result['error']}") # MCP requires structured responses; string error messages can cause # serialization issues leading to BrokenResourceError return { "status": "error", - "message": f"Query failed: {result['error']}", + "message": f"Query failed for tenant '{tenant}': {result['error']}", } return result except concurrent.futures.TimeoutError: - logger.warning(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}") + logger.warning(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds for tenant '{tenant}': {query}") future.cancel() - raise ToolError(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds") + raise ToolError(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds for tenant '{tenant}'") except ToolError: raise except Exception as e: - logger.error(f"Unexpected error in run_select_query: {str(e)}") - raise RuntimeError(f"Unexpected error during query execution: {str(e)}") + logger.error(f"Unexpected error in run_select_query for tenant '{tenant}': {str(e)}") + raise RuntimeError(f"Unexpected error during query execution for tenant '{tenant}': {str(e)}") -def create_clickhouse_client(): - client_config = get_config().get_client_config() +def create_clickhouse_client(tenant: str): + client_config = get_config(tenant).get_client_config() logger.info( - f"Creating ClickHouse client connection to {client_config['host']}:{client_config['port']} " + f"Creating ClickHouse client connection for tenant '{tenant}', to {client_config['host']}:{client_config['port']} " f"as {client_config['username']} " f"(secure={client_config['secure']}, verify={client_config['verify']}, " f"connect_timeout={client_config['connect_timeout']}s, " @@ -230,10 +254,10 @@ def create_clickhouse_client(): client = clickhouse_connect.get_client(**client_config) # Test the connection version = client.server_version - logger.info(f"Successfully connected to ClickHouse server version {version}") + logger.info(f"Successfully connected to ClickHouse server version {version} for tenant '{tenant}'") return client except Exception as e: - logger.error(f"Failed to connect to ClickHouse: {str(e)}") + logger.error(f"Failed to connect to ClickHouse for tenant '{tenant}': {str(e)}") raise @@ -267,10 +291,10 @@ def get_readonly_setting(client) -> str: return "1" # Default to basic read-only mode if setting isn't present -def create_chdb_client(): +def create_chdb_client(tenant: str): """Create a chDB client connection.""" if not get_chdb_config().enabled: - raise ValueError("chDB is not enabled. Set CHDB_ENABLED=true to enable it.") + raise ValueError(f"chDB is not enabled for tenant '{tenant}'. Set CHDB_ENABLED=true to enable it.") return _chdb_client @@ -297,33 +321,33 @@ def execute_chdb_query(query: str): return {"error": str(err)} -def run_chdb_select_query(query: str): +def run_chdb_select_query(tenant: str, query: str): """Run SQL in chDB, an in-process ClickHouse engine""" - logger.info(f"Executing chDB SELECT query: {query}") + logger.info(f"Executing chDB SELECT query for tenant '{tenant}': {query}") try: - future = QUERY_EXECUTOR.submit(execute_chdb_query, query) + future = QUERY_EXECUTOR[tenant].submit(execute_chdb_query, query) try: result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS) # Check if we received an error structure from execute_chdb_query if isinstance(result, dict) and "error" in result: - logger.warning(f"chDB query failed: {result['error']}") + logger.warning(f"chDB query failed for tenant '{tenant}': {result['error']}") return { "status": "error", - "message": f"chDB query failed: {result['error']}", + "message": f"chDB query failed for tenant '{tenant}': {result['error']}", } return result except concurrent.futures.TimeoutError: logger.warning( - f"chDB query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}" + f"chDB query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds for tenant '{tenant}': {query}" ) future.cancel() return { "status": "error", - "message": f"chDB query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds", + "message": f"chDB query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds for tenant '{tenant}'", } except Exception as e: - logger.error(f"Unexpected error in run_chdb_select_query: {e}") - return {"status": "error", "message": f"Unexpected error: {e}"} + logger.error(f"Unexpected error in run_chdb_select_query for tenant '{tenant}': {e}") + return {"status": "error", "message": f"Unexpected error for tenant '{tenant}': {e}"} def chdb_initial_prompt() -> str: @@ -331,25 +355,26 @@ def chdb_initial_prompt() -> str: return CHDB_PROMPT -def _init_chdb_client(): +def _init_chdb_client(tenant: str): """Initialize the global chDB client instance.""" try: - if not get_chdb_config().enabled: - logger.info("chDB is disabled, skipping client initialization") + if not get_chdb_config(tenant).enabled: + logger.info("chDB is disabled for tenant '{tenant}', skipping client initialization") return None - client_config = get_chdb_config().get_client_config() + client_config = get_chdb_config(tenant).get_client_config() data_path = client_config["data_path"] - logger.info(f"Creating chDB client with data_path={data_path}") + logger.info(f"Creating chDB client with data_path={data_path} for tenant '{tenant}'") client = chs.Session(path=data_path) - logger.info(f"Successfully connected to chDB with data_path={data_path}") + logger.info(f"Successfully connected to chDB with data_path={data_path} for tenant '{tenant}'") return client except Exception as e: - logger.error(f"Failed to initialize chDB client: {e}") + logger.error(f"Failed to initialize chDB client for tenant '{tenant}': {e}") return None # Register tools based on configuration +# For multi-tenancy, we will use global flags to bypass this if os.getenv("CLICKHOUSE_ENABLED", "true").lower() == "true": mcp.add_tool(Tool.from_function(list_databases)) mcp.add_tool(Tool.from_function(list_tables)) From 031448cd7e02ec51a49986252d782aa869a6feff Mon Sep 17 00:00:00 2001 From: Benedict Date: Sun, 17 Aug 2025 22:42:00 +0800 Subject: [PATCH 03/17] Return mcp global config in main --- mcp_clickhouse/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mcp_clickhouse/main.py b/mcp_clickhouse/main.py index 97599a4..0a622b5 100644 --- a/mcp_clickhouse/main.py +++ b/mcp_clickhouse/main.py @@ -1,9 +1,9 @@ from .mcp_server import mcp -from .mcp_env import get_config, TransportType +from .mcp_env import get_mcp_config, TransportType def main(): - config = get_config() + config = get_mcp_config() transport = config.mcp_server_transport # For HTTP and SSE transports, we need to specify host and port From 2496d55deac4bc3084c134ef4395533fdd167d45 Mon Sep 17 00:00:00 2001 From: Benedict Date: Sun, 17 Aug 2025 22:47:32 +0800 Subject: [PATCH 04/17] updated unit tests --- tests/test_chdb_tool.py | 11 ++++++----- tests/test_mcp_server.py | 30 +++++++++++++++--------------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/tests/test_chdb_tool.py b/tests/test_chdb_tool.py index 1e16a93..bceb4f8 100644 --- a/tests/test_chdb_tool.py +++ b/tests/test_chdb_tool.py @@ -6,24 +6,25 @@ load_dotenv() +tenant = "example" class TestChDBTools(unittest.TestCase): @classmethod def setUpClass(cls): """Set up the environment before chDB tests.""" - cls.client = create_chdb_client() + cls.client = create_chdb_client(tenant) def test_run_chdb_select_query_simple(self): """Test running a simple SELECT query in chDB.""" query = "SELECT 1 as test_value" - result = run_chdb_select_query(query) + result = run_chdb_select_query(tenant, query) self.assertIsInstance(result, list) self.assertIn("test_value", str(result)) def test_run_chdb_select_query_with_url_table_function(self): """Test running a SELECT query with url table function in chDB.""" query = "SELECT COUNT(1) FROM url('https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_0.parquet', 'Parquet')" - result = run_chdb_select_query(query) + result = run_chdb_select_query(tenant, query) print(result) self.assertIsInstance(result, list) self.assertIn("1000000", str(result)) @@ -31,7 +32,7 @@ def test_run_chdb_select_query_with_url_table_function(self): def test_run_chdb_select_query_failure(self): """Test running a SELECT query with an error in chDB.""" query = "SELECT * FROM non_existent_table_chDB" - result = run_chdb_select_query(query) + result = run_chdb_select_query(tenant, query) print(result) self.assertIsInstance(result, dict) self.assertEqual(result["status"], "error") @@ -40,7 +41,7 @@ def test_run_chdb_select_query_failure(self): def test_run_chdb_select_query_empty_result(self): """Test running a SELECT query that returns empty result in chDB.""" query = "SELECT 1 WHERE 1 = 0" - result = run_chdb_select_query(query) + result = run_chdb_select_query(tenant, query) print(result) self.assertIsInstance(result, list) self.assertEqual(len(result), 0) diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 0119790..7742248 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -9,7 +9,7 @@ # Load environment variables load_dotenv() - +tenant = "example" @pytest.fixture(scope="module") def event_loop(): @@ -22,7 +22,7 @@ def event_loop(): @pytest_asyncio.fixture(scope="module") async def setup_test_database(): """Set up test database and tables before running tests.""" - client = create_clickhouse_client() + client = create_clickhouse_client(tenant) # Test database and table names test_db = "test_mcp_db" @@ -93,7 +93,7 @@ async def test_list_databases(mcp_server, setup_test_database): test_db, _, _ = setup_test_database async with Client(mcp_server) as client: - result = await client.call_tool("list_databases", {}) + result = await client.call_tool("list_databases", {"tenant": tenant}) # The result should be a list containing at least one item assert len(result) >= 1 @@ -111,7 +111,7 @@ async def test_list_tables_basic(mcp_server, setup_test_database): test_db, test_table, test_table2 = setup_test_database async with Client(mcp_server) as client: - result = await client.call_tool("list_tables", {"database": test_db}) + result = await client.call_tool("list_tables", {"tenant": tenant, "database": test_db}) assert len(result) >= 1 tables = json.loads(result[0].text) @@ -147,7 +147,7 @@ async def test_list_tables_with_like_filter(mcp_server, setup_test_database): async with Client(mcp_server) as client: # Test with LIKE filter - result = await client.call_tool("list_tables", {"database": test_db, "like": "test_%"}) + result = await client.call_tool("list_tables", {"tenant": tenant, "database": test_db, "like": "test_%"}) tables_data = json.loads(result[0].text) @@ -168,7 +168,7 @@ async def test_list_tables_with_not_like_filter(mcp_server, setup_test_database) async with Client(mcp_server) as client: # Test with NOT LIKE filter - result = await client.call_tool("list_tables", {"database": test_db, "not_like": "test_%"}) + result = await client.call_tool("list_tables", {"tenant": tenant, "database": test_db, "not_like": "test_%"}) tables_data = json.loads(result[0].text) @@ -189,7 +189,7 @@ async def test_run_select_query_success(mcp_server, setup_test_database): async with Client(mcp_server) as client: query = f"SELECT id, name, age FROM {test_db}.{test_table} ORDER BY id" - result = await client.call_tool("run_select_query", {"query": query}) + result = await client.call_tool("run_select_query", {"tenant": tenant, "query": query}) query_result = json.loads(result[0].text) @@ -215,7 +215,7 @@ async def test_run_select_query_with_aggregation(mcp_server, setup_test_database async with Client(mcp_server) as client: query = f"SELECT COUNT(*) as count, AVG(age) as avg_age FROM {test_db}.{test_table}" - result = await client.call_tool("run_select_query", {"query": query}) + result = await client.call_tool("run_select_query", {"tenant": tenant, "query": query}) query_result = json.loads(result[0].text) @@ -232,7 +232,7 @@ async def test_run_select_query_with_join(mcp_server, setup_test_database): async with Client(mcp_server) as client: # Insert related data for join - client_direct = create_clickhouse_client() + client_direct = create_clickhouse_client(tenant) client_direct.command(f""" INSERT INTO {test_db}.{test_table2} (event_id, event_type, timestamp) VALUES (2001, 'purchase', '2024-01-01 14:00:00') @@ -243,7 +243,7 @@ async def test_run_select_query_with_join(mcp_server, setup_test_database): COUNT(DISTINCT event_type) as event_types_count FROM {test_db}.{test_table2} """ - result = await client.call_tool("run_select_query", {"query": query}) + result = await client.call_tool("run_select_query", {"tenant": tenant, "query": query}) query_result = json.loads(result[0].text) assert query_result["rows"][0][0] == 3 # login, logout, purchase @@ -260,7 +260,7 @@ async def test_run_select_query_error(mcp_server, setup_test_database): # Should raise ToolError with pytest.raises(ToolError) as exc_info: - await client.call_tool("run_select_query", {"query": query}) + await client.call_tool("run_select_query", {"tenant": tenant, "query": query}) assert "Query execution failed" in str(exc_info.value) @@ -274,7 +274,7 @@ async def test_run_select_query_syntax_error(mcp_server): # Should raise ToolError with pytest.raises(ToolError) as exc_info: - await client.call_tool("run_select_query", {"query": query}) + await client.call_tool("run_select_query", {"tenant": tenant, "query": query}) assert "Query execution failed" in str(exc_info.value) @@ -285,7 +285,7 @@ async def test_table_metadata_details(mcp_server, setup_test_database): test_db, test_table, _ = setup_test_database async with Client(mcp_server) as client: - result = await client.call_tool("list_tables", {"database": test_db}) + result = await client.call_tool("list_tables", {"tenant": tenant, "database": test_db}) tables = json.loads(result[0].text) # Find our test table @@ -323,7 +323,7 @@ async def test_system_database_access(mcp_server): """Test that we can access system databases.""" async with Client(mcp_server) as client: # List tables in system database - result = await client.call_tool("list_tables", {"database": "system"}) + result = await client.call_tool("list_tables", {"tenant": tenant, "database": "system"}) tables = json.loads(result[0].text) # System database should have many tables @@ -352,7 +352,7 @@ async def test_concurrent_queries(mcp_server, setup_test_database): # Execute all queries concurrently results = await asyncio.gather( - *[client.call_tool("run_select_query", {"query": query}) for query in queries] + *[client.call_tool("run_select_query", {"tenant": tenant, "query": query}) for query in queries] ) # Verify all queries succeeded From 206c7b58e3301f122b02de7818765180c76c3c9e Mon Sep 17 00:00:00 2001 From: Benedict Date: Sun, 17 Aug 2025 22:49:16 +0800 Subject: [PATCH 05/17] fixed chdb test cases --- tests/test_chdb_tool.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/test_chdb_tool.py b/tests/test_chdb_tool.py index bceb4f8..4c1e97d 100644 --- a/tests/test_chdb_tool.py +++ b/tests/test_chdb_tool.py @@ -6,16 +6,16 @@ load_dotenv() -tenant = "example" - class TestChDBTools(unittest.TestCase): @classmethod def setUpClass(cls): """Set up the environment before chDB tests.""" + tenant = "example" cls.client = create_chdb_client(tenant) def test_run_chdb_select_query_simple(self): """Test running a simple SELECT query in chDB.""" + tenant = "example" query = "SELECT 1 as test_value" result = run_chdb_select_query(tenant, query) self.assertIsInstance(result, list) @@ -23,6 +23,7 @@ def test_run_chdb_select_query_simple(self): def test_run_chdb_select_query_with_url_table_function(self): """Test running a SELECT query with url table function in chDB.""" + tenant = "example" query = "SELECT COUNT(1) FROM url('https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_0.parquet', 'Parquet')" result = run_chdb_select_query(tenant, query) print(result) @@ -31,6 +32,7 @@ def test_run_chdb_select_query_with_url_table_function(self): def test_run_chdb_select_query_failure(self): """Test running a SELECT query with an error in chDB.""" + tenant = "example" query = "SELECT * FROM non_existent_table_chDB" result = run_chdb_select_query(tenant, query) print(result) @@ -40,6 +42,7 @@ def test_run_chdb_select_query_failure(self): def test_run_chdb_select_query_empty_result(self): """Test running a SELECT query that returns empty result in chDB.""" + tenant = "example" query = "SELECT 1 WHERE 1 = 0" result = run_chdb_select_query(tenant, query) print(result) From 53515a083d3b050c64356392c87ab485de4d3a98 Mon Sep 17 00:00:00 2001 From: Benedict Date: Sun, 17 Aug 2025 22:53:13 +0800 Subject: [PATCH 06/17] fixed mcp test cases --- tests/test_mcp_server.py | 61 ++++++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 28 deletions(-) diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 7742248..4f2e116 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -9,7 +9,6 @@ # Load environment variables load_dotenv() -tenant = "example" @pytest.fixture(scope="module") def event_loop(): @@ -22,7 +21,10 @@ def event_loop(): @pytest_asyncio.fixture(scope="module") async def setup_test_database(): """Set up test database and tables before running tests.""" - client = create_clickhouse_client(tenant) + # Test tenant + test_tenant = "example" + + client = create_clickhouse_client(test_tenant) # Test database and table names test_db = "test_mcp_db" @@ -75,7 +77,7 @@ async def setup_test_database(): (1003, 'login', '2024-01-01 12:00:00') """) - yield test_db, test_table, test_table2 + yield test_tenant, test_db, test_table, test_table2 # Cleanup after tests client.command(f"DROP DATABASE IF EXISTS {test_db}") @@ -90,10 +92,10 @@ def mcp_server(): @pytest.mark.asyncio async def test_list_databases(mcp_server, setup_test_database): """Test the list_databases tool.""" - test_db, _, _ = setup_test_database + test_tenant, test_db, _, _ = setup_test_database async with Client(mcp_server) as client: - result = await client.call_tool("list_databases", {"tenant": tenant}) + result = await client.call_tool("list_databases", {"tenant": test_tenant}) # The result should be a list containing at least one item assert len(result) >= 1 @@ -108,10 +110,10 @@ async def test_list_databases(mcp_server, setup_test_database): @pytest.mark.asyncio async def test_list_tables_basic(mcp_server, setup_test_database): """Test the list_tables tool without filters.""" - test_db, test_table, test_table2 = setup_test_database + test_tenant, test_db, test_table, test_table2 = setup_test_database async with Client(mcp_server) as client: - result = await client.call_tool("list_tables", {"tenant": tenant, "database": test_db}) + result = await client.call_tool("list_tables", {"tenant": test_tenant, "database": test_db}) assert len(result) >= 1 tables = json.loads(result[0].text) @@ -143,11 +145,11 @@ async def test_list_tables_basic(mcp_server, setup_test_database): @pytest.mark.asyncio async def test_list_tables_with_like_filter(mcp_server, setup_test_database): """Test the list_tables tool with LIKE filter.""" - test_db, test_table, _ = setup_test_database + test_tenant, test_db, test_table, _ = setup_test_database async with Client(mcp_server) as client: # Test with LIKE filter - result = await client.call_tool("list_tables", {"tenant": tenant, "database": test_db, "like": "test_%"}) + result = await client.call_tool("list_tables", {"tenant": test_tenant, "database": test_db, "like": "test_%"}) tables_data = json.loads(result[0].text) @@ -164,11 +166,11 @@ async def test_list_tables_with_like_filter(mcp_server, setup_test_database): @pytest.mark.asyncio async def test_list_tables_with_not_like_filter(mcp_server, setup_test_database): """Test the list_tables tool with NOT LIKE filter.""" - test_db, _, test_table2 = setup_test_database + test_tenant, test_db, _, test_table2 = setup_test_database async with Client(mcp_server) as client: # Test with NOT LIKE filter - result = await client.call_tool("list_tables", {"tenant": tenant, "database": test_db, "not_like": "test_%"}) + result = await client.call_tool("list_tables", {"tenant": test_tenant, "database": test_db, "not_like": "test_%"}) tables_data = json.loads(result[0].text) @@ -185,11 +187,11 @@ async def test_list_tables_with_not_like_filter(mcp_server, setup_test_database) @pytest.mark.asyncio async def test_run_select_query_success(mcp_server, setup_test_database): """Test running a successful SELECT query.""" - test_db, test_table, _ = setup_test_database + test_tenant, test_db, test_table, _ = setup_test_database async with Client(mcp_server) as client: query = f"SELECT id, name, age FROM {test_db}.{test_table} ORDER BY id" - result = await client.call_tool("run_select_query", {"tenant": tenant, "query": query}) + result = await client.call_tool("run_select_query", {"tenant": test_tenant, "query": query}) query_result = json.loads(result[0].text) @@ -211,11 +213,11 @@ async def test_run_select_query_success(mcp_server, setup_test_database): @pytest.mark.asyncio async def test_run_select_query_with_aggregation(mcp_server, setup_test_database): """Test running a SELECT query with aggregation.""" - test_db, test_table, _ = setup_test_database + test_tenant, test_db, test_table, _ = setup_test_database async with Client(mcp_server) as client: query = f"SELECT COUNT(*) as count, AVG(age) as avg_age FROM {test_db}.{test_table}" - result = await client.call_tool("run_select_query", {"tenant": tenant, "query": query}) + result = await client.call_tool("run_select_query", {"tenant": test_tenant, "query": query}) query_result = json.loads(result[0].text) @@ -228,11 +230,11 @@ async def test_run_select_query_with_aggregation(mcp_server, setup_test_database @pytest.mark.asyncio async def test_run_select_query_with_join(mcp_server, setup_test_database): """Test running a SELECT query with JOIN.""" - test_db, test_table, test_table2 = setup_test_database + test_tenant, test_db, test_table, test_table2 = setup_test_database async with Client(mcp_server) as client: # Insert related data for join - client_direct = create_clickhouse_client(tenant) + client_direct = create_clickhouse_client(test_tenant) client_direct.command(f""" INSERT INTO {test_db}.{test_table2} (event_id, event_type, timestamp) VALUES (2001, 'purchase', '2024-01-01 14:00:00') @@ -243,7 +245,7 @@ async def test_run_select_query_with_join(mcp_server, setup_test_database): COUNT(DISTINCT event_type) as event_types_count FROM {test_db}.{test_table2} """ - result = await client.call_tool("run_select_query", {"tenant": tenant, "query": query}) + result = await client.call_tool("run_select_query", {"tenant": test_tenant, "query": query}) query_result = json.loads(result[0].text) assert query_result["rows"][0][0] == 3 # login, logout, purchase @@ -252,7 +254,7 @@ async def test_run_select_query_with_join(mcp_server, setup_test_database): @pytest.mark.asyncio async def test_run_select_query_error(mcp_server, setup_test_database): """Test running a SELECT query that results in an error.""" - test_db, _, _ = setup_test_database + test_tenant, test_db, _, _ = setup_test_database async with Client(mcp_server) as client: # Query non-existent table @@ -260,13 +262,15 @@ async def test_run_select_query_error(mcp_server, setup_test_database): # Should raise ToolError with pytest.raises(ToolError) as exc_info: - await client.call_tool("run_select_query", {"tenant": tenant, "query": query}) + await client.call_tool("run_select_query", {"tenant": test_tenant, "query": query}) assert "Query execution failed" in str(exc_info.value) @pytest.mark.asyncio -async def test_run_select_query_syntax_error(mcp_server): +async def test_run_select_query_syntax_error(mcp_server, setup_test_database): + test_tenant, _, _, _ = setup_test_database + """Test running a SELECT query with syntax error.""" async with Client(mcp_server) as client: # Invalid SQL syntax @@ -274,7 +278,7 @@ async def test_run_select_query_syntax_error(mcp_server): # Should raise ToolError with pytest.raises(ToolError) as exc_info: - await client.call_tool("run_select_query", {"tenant": tenant, "query": query}) + await client.call_tool("run_select_query", {"tenant": test_tenant, "query": query}) assert "Query execution failed" in str(exc_info.value) @@ -282,10 +286,10 @@ async def test_run_select_query_syntax_error(mcp_server): @pytest.mark.asyncio async def test_table_metadata_details(mcp_server, setup_test_database): """Test that table metadata is correctly retrieved.""" - test_db, test_table, _ = setup_test_database + test_tenant, test_db, test_table, _ = setup_test_database async with Client(mcp_server) as client: - result = await client.call_tool("list_tables", {"tenant": tenant, "database": test_db}) + result = await client.call_tool("list_tables", {"tenant": test_tenant, "database": test_db}) tables = json.loads(result[0].text) # Find our test table @@ -319,11 +323,12 @@ async def test_table_metadata_details(mcp_server, setup_test_database): @pytest.mark.asyncio -async def test_system_database_access(mcp_server): +async def test_system_database_access(mcp_server, setup_test_database): """Test that we can access system databases.""" + test_tenant, _, _, _ = setup_test_database async with Client(mcp_server) as client: # List tables in system database - result = await client.call_tool("list_tables", {"tenant": tenant, "database": "system"}) + result = await client.call_tool("list_tables", {"tenant": test_tenant, "database": "system"}) tables = json.loads(result[0].text) # System database should have many tables @@ -339,7 +344,7 @@ async def test_system_database_access(mcp_server): @pytest.mark.asyncio async def test_concurrent_queries(mcp_server, setup_test_database): """Test running multiple queries concurrently.""" - test_db, test_table, test_table2 = setup_test_database + test_tenant, test_db, test_table, test_table2 = setup_test_database async with Client(mcp_server) as client: # Run multiple queries concurrently @@ -352,7 +357,7 @@ async def test_concurrent_queries(mcp_server, setup_test_database): # Execute all queries concurrently results = await asyncio.gather( - *[client.call_tool("run_select_query", {"tenant": tenant, "query": query}) for query in queries] + *[client.call_tool("run_select_query", {"tenant": test_tenant, "query": query}) for query in queries] ) # Verify all queries succeeded From 908a10532ef79cc88c2a0aece54f1c72a03e265d Mon Sep 17 00:00:00 2001 From: Benedict Date: Sun, 17 Aug 2025 22:59:46 +0800 Subject: [PATCH 07/17] updated all test cases and added to-dos --- mcp_clickhouse/mcp_server.py | 8 ++++++++ tests/test_chdb_tool.py | 6 ++++++ tests/test_mcp_server.py | 5 +++++ tests/test_tool.py | 19 ++++++++++++------- 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index cef0bfe..8680333 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -215,6 +215,10 @@ def execute_query(query: str): def run_select_query(tenant: str, query: str): """Run a SELECT query in a ClickHouse database""" logger.info(f"Executing SELECT query for tenant '{tenant}': {query}") + + if tenant not in TENANTS: + tenant = "default" # TO-DO: Should return some error since tenant does not exist + try: future = QUERY_EXECUTOR[tenant].submit(execute_query, query) try: @@ -324,6 +328,10 @@ def execute_chdb_query(query: str): def run_chdb_select_query(tenant: str, query: str): """Run SQL in chDB, an in-process ClickHouse engine""" logger.info(f"Executing chDB SELECT query for tenant '{tenant}': {query}") + + if tenant not in TENANTS: + tenant = "default" # TO-DO: Should return some error since tenant does not exist + try: future = QUERY_EXECUTOR[tenant].submit(execute_chdb_query, query) try: diff --git a/tests/test_chdb_tool.py b/tests/test_chdb_tool.py index 4c1e97d..e164b26 100644 --- a/tests/test_chdb_tool.py +++ b/tests/test_chdb_tool.py @@ -6,6 +6,12 @@ load_dotenv() +## TO-DO +""" +- Set up multiple tenants +- Test with wrong tenant -> Should we execute default or not execute? +""" + class TestChDBTools(unittest.TestCase): @classmethod def setUpClass(cls): diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 4f2e116..689967b 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -17,6 +17,11 @@ def event_loop(): yield loop loop.close() +## TO-DO +""" +- Set up multiple tenants +- Test with wrong tenant -> Should we execute default or not execute? +""" @pytest_asyncio.fixture(scope="module") async def setup_test_database(): diff --git a/tests/test_tool.py b/tests/test_tool.py index 50878c4..08f0418 100644 --- a/tests/test_tool.py +++ b/tests/test_tool.py @@ -8,12 +8,17 @@ load_dotenv() +## TO-DO +""" +- Set up multiple tenants +- Test with wrong tenant -> Should we execute default or not execute? +""" class TestClickhouseTools(unittest.TestCase): @classmethod def setUpClass(cls): """Set up the environment before tests.""" - cls.client = create_clickhouse_client() + cls.client = create_clickhouse_client("example") # Prepare test database and table cls.test_db = "test_tool_db" @@ -43,21 +48,21 @@ def tearDownClass(cls): def test_list_databases(self): """Test listing databases.""" - result = list_databases() + result = list_databases("example") # Parse JSON response databases = json.loads(result) self.assertIn(self.test_db, databases) def test_list_tables_without_like(self): """Test listing tables without a 'LIKE' filter.""" - result = list_tables(self.test_db) + result = list_tables("example", self.test_db) self.assertIsInstance(result, list) self.assertEqual(len(result), 1) self.assertEqual(result[0]["name"], self.test_table) def test_list_tables_with_like(self): """Test listing tables with a 'LIKE' filter.""" - result = list_tables(self.test_db, like=f"{self.test_table}%") + result = list_tables("example", self.test_db, like=f"{self.test_table}%") self.assertIsInstance(result, list) self.assertEqual(len(result), 1) self.assertEqual(result[0]["name"], self.test_table) @@ -65,7 +70,7 @@ def test_list_tables_with_like(self): def test_run_select_query_success(self): """Test running a SELECT query successfully.""" query = f"SELECT * FROM {self.test_db}.{self.test_table}" - result = run_select_query(query) + result = run_select_query("example", query) self.assertIsInstance(result, dict) self.assertEqual(len(result["rows"]), 2) self.assertEqual(result["rows"][0][0], 1) @@ -77,13 +82,13 @@ def test_run_select_query_failure(self): # Should raise ToolError with self.assertRaises(ToolError) as context: - run_select_query(query) + run_select_query("example", query) self.assertIn("Query execution failed", str(context.exception)) def test_table_and_column_comments(self): """Test that table and column comments are correctly retrieved.""" - result = list_tables(self.test_db) + result = list_tables("example", self.test_db) self.assertIsInstance(result, list) self.assertEqual(len(result), 1) From 6e273845ef3ec22d0170f0287a58b158e0643446 Mon Sep 17 00:00:00 2001 From: Benedict Date: Sun, 17 Aug 2025 23:06:52 +0800 Subject: [PATCH 08/17] added to-do to README --- README.md | 3 +++ mcp_clickhouse/mcp_env.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index d83294f..99603f9 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,8 @@ # ClickHouse MCP Server +TO-DO: Update README + + [![PyPI - Version](https://img.shields.io/pypi/v/mcp-clickhouse)](https://pypi.org/project/mcp-clickhouse) An MCP server for ClickHouse. diff --git a/mcp_clickhouse/mcp_env.py b/mcp_clickhouse/mcp_env.py index a720a12..d67a996 100644 --- a/mcp_clickhouse/mcp_env.py +++ b/mcp_clickhouse/mcp_env.py @@ -242,7 +242,7 @@ def get_mcp_config() -> dict: """ Get the MCP server configuration from environment variables. """ - # Global MCP transport config (single for all tenants) + # Global MCP transport config MCP_TRANSPORT = os.getenv("CLICKHOUSE_MCP_SERVER_TRANSPORT", TransportType.STDIO.value).lower() if MCP_TRANSPORT not in TransportType.values(): raise ValueError(f"Invalid MCP transport '{MCP_TRANSPORT}'. Valid options: {TransportType.values()}") From cb9a2f4b67b2ec78f2b4e7236cb1f6532263a9a2 Mon Sep 17 00:00:00 2001 From: Benedict Date: Sun, 17 Aug 2025 23:37:28 +0800 Subject: [PATCH 09/17] fixed tenant instantiation in classes --- mcp_clickhouse/mcp_env.py | 62 +++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/mcp_clickhouse/mcp_env.py b/mcp_clickhouse/mcp_env.py index d67a996..198f801 100644 --- a/mcp_clickhouse/mcp_env.py +++ b/mcp_clickhouse/mcp_env.py @@ -30,33 +30,33 @@ class ClickHouseConfig: This class handles all environment variable configuration with sensible defaults and type conversion. It provides typed methods for accessing each configuration value. - Required environment variables (only when CH__CLICKHOUSE_ENABLED=true): - CH__CLICKHOUSE_HOST: The hostname of the ClickHouse server - CH__CLICKHOUSE_USER: The username for authentication - CH__CLICKHOUSE_PASSWORD: The password for authentication + Required environment variables (only when CH__CLICKHOUSE_ENABLED=true): + CH__CLICKHOUSE_HOST: The hostname of the ClickHouse server + CH__CLICKHOUSE_USER: The username for authentication + CH__CLICKHOUSE_PASSWORD: The password for authentication Optional environment variables (with defaults): - CH__CLICKHOUSE_PORT: The port number (default: 8443 if secure=True, 8123 if secure=False) - CH__CLICKHOUSE_SECURE: Enable HTTPS (default: true) - CH__CLICKHOUSE_VERIFY: Verify SSL certificates (default: true) - CH__CLICKHOUSE_CONNECT_TIMEOUT: Connection timeout in seconds (default: 30) - CH__CLICKHOUSE_SEND_RECEIVE_TIMEOUT: Send/receive timeout in seconds (default: 300) - CH__CLICKHOUSE_DATABASE: Default database to use (default: None) - CH__CLICKHOUSE_PROXY_PATH: Path to be added to the host URL. For instance, for servers behind an HTTP proxy (default: None) - CH__CLICKHOUSE_MCP_SERVER_TRANSPORT: MCP server transport method - "stdio", "http", or "sse" (default: stdio) - CH__CLICKHOUSE_MCP_BIND_HOST: Host to bind the MCP server to when using HTTP or SSE transport (default: 127.0.0.1) - CH__CLICKHOUSE_MCP_BIND_PORT: Port to bind the MCP server to when using HTTP or SSE transport (default: 8000) - CH__CLICKHOUSE_ENABLED: Enable ClickHouse server (default: true) + CH__CLICKHOUSE_PORT: The port number (default: 8443 if secure=True, 8123 if secure=False) + CH__CLICKHOUSE_SECURE: Enable HTTPS (default: true) + CH__CLICKHOUSE_VERIFY: Verify SSL certificates (default: true) + CH__CLICKHOUSE_CONNECT_TIMEOUT: Connection timeout in seconds (default: 30) + CH__CLICKHOUSE_SEND_RECEIVE_TIMEOUT: Send/receive timeout in seconds (default: 300) + CH__CLICKHOUSE_DATABASE: Default database to use (default: None) + CH__CLICKHOUSE_PROXY_PATH: Path to be added to the host URL. For instance, for servers behind an HTTP proxy (default: None) + CH__CLICKHOUSE_MCP_SERVER_TRANSPORT: MCP server transport method - "stdio", "http", or "sse" (default: stdio) + CH__CLICKHOUSE_MCP_BIND_HOST: Host to bind the MCP server to when using HTTP or SSE transport (default: 127.0.0.1) + CH__CLICKHOUSE_MCP_BIND_PORT: Port to bind the MCP server to when using HTTP or SSE transport (default: 8000) + CH__CLICKHOUSE_ENABLED: Enable ClickHouse server (default: true) """ + tenant: str - def __init__(self, tenant_prefix: str = ""): + def __init__(self): """Initialize the configuration from environment variables.""" - self.tenant_prefix = tenant_prefix if self.enabled: self._validate_required_vars() def _getenv(self, key: str, default=None, cast=str): - prefixed_key = f"CH_{self.tenant_prefix}_{key}" + prefixed_key = f"CH_{self.tenant}_{key}" val = os.getenv(prefixed_key, os.getenv(key, default)) if val is not None and cast is not str: try: @@ -154,7 +154,7 @@ def get_client_config(self) -> dict: "verify": self.verify, "connect_timeout": self.connect_timeout, "send_receive_timeout": self.send_receive_timeout, - "client_name": f"{self.tenant_prefix}mcp_clickhouse", + "client_name": f"{self.tenant}mcp_clickhouse", } # Add optional database if set @@ -177,7 +177,7 @@ def _validate_required_vars(self) -> None: if not self._getenv(var): missing_vars.append(var) if missing_vars: - raise ValueError(f"Missing required environment variables for tenant '{self.tenant_prefix}': {', '.join(missing_vars)}") + raise ValueError(f"Missing required environment variables for tenant '{self.tenant}': {', '.join(missing_vars)}") @dataclass @@ -190,15 +190,15 @@ class ChDBConfig: Required environment variables: CHDB_DATA_PATH: The path to the chDB data directory (only required if CHDB_ENABLED=true) """ + tenant: str - def __init__(self, tenant_prefix: str = ""): + def __init__(self): """Initialize the configuration from environment variables.""" - self.tenant_prefix = tenant_prefix if self.enabled: self._validate_required_vars() def _getenv(self, key: str, default=None, cast=str): - prefixed_key = f"CH_{self.tenant_prefix}_{key}" + prefixed_key = f"CH_{self.tenant}_{key}" val = os.getenv(prefixed_key, os.getenv(key, default)) if val is not None and cast is not str: try: @@ -264,11 +264,11 @@ def load_clickhouse_configs() -> Dict[str, ClickHouseConfig]: global _CLICKHOUSE_TENANTS for key in os.environ: if key.endswith("CLICKHOUSE_HOST") and key.startswith("CH_"): - # CH__CLICKHOUSE_HOST - tenant_prefix = key[len("CH_"): -len("_CLICKHOUSE_HOST")] - _CLICKHOUSE_TENANTS[tenant_prefix] = ClickHouseConfig(tenant_prefix=tenant_prefix) + # CH__CLICKHOUSE_HOST + tenant = key[len("CH_"): -len("_CLICKHOUSE_HOST")] + _CLICKHOUSE_TENANTS[tenant] = ClickHouseConfig(tenant=tenant) if not _CLICKHOUSE_TENANTS and "CLICKHOUSE_HOST" in os.environ: - _CLICKHOUSE_TENANTS["default"] = ClickHouseConfig(tenant_prefix="") + _CLICKHOUSE_TENANTS["default"] = ClickHouseConfig(tenant="") return _CLICKHOUSE_TENANTS @@ -276,11 +276,11 @@ def load_chdb_configs() -> Dict[str, ChDBConfig]: global _CHDB_TENANTS for key in os.environ: if key.endswith("CHDB_DATA_PATH") and key.startswith("CH_"): - # CH__CLICKHOUSE_HOST - tenant_prefix = key[len("CH_"): -len("_CHDB_DATA_PATH")] - _CHDB_TENANTS[tenant_prefix] = ChDBConfig(tenant_prefix=tenant_prefix) + # CH__CLICKHOUSE_HOST + tenant = key[len("CH_"): -len("_CHDB_DATA_PATH")] + _CHDB_TENANTS[tenant] = ChDBConfig(tenant=tenant) if not _CHDB_TENANTS and "CHDB_DATA_PATH" in os.environ: - _CHDB_TENANTS["default"] = ChDBConfig(tenant_prefix="") + _CHDB_TENANTS["default"] = ChDBConfig(tenant="") return _CHDB_TENANTS def get_config(tenant: str = "default") -> ClickHouseConfig: From bee564fc58800fc50bf6b14a693b70de111aabd6 Mon Sep 17 00:00:00 2001 From: Benedict Date: Mon, 18 Aug 2025 00:44:24 +0800 Subject: [PATCH 10/17] segregate clickhouse and chdb tenants --- mcp_clickhouse/mcp_env.py | 12 ++- mcp_clickhouse/mcp_server.py | 186 ++++++++++++++++++++++++----------- 2 files changed, 137 insertions(+), 61 deletions(-) diff --git a/mcp_clickhouse/mcp_env.py b/mcp_clickhouse/mcp_env.py index 198f801..57c30ad 100644 --- a/mcp_clickhouse/mcp_env.py +++ b/mcp_clickhouse/mcp_env.py @@ -302,6 +302,12 @@ def get_chdb_config(tenant: str = "default") -> ChDBConfig: raise ValueError(f"No ChDB config found for tenant '{tenant}'") return _CHDB_TENANTS[tenant] -def list_tenants() -> List[str]: - """Get list of all tenant names.""" - return [tenant for tenant in _CLICKHOUSE_TENANTS.keys()] \ No newline at end of file +def list_clickhouse_tenants() -> List[str]: + """Get list of all clickhouse tenant names.""" + global _CLICKHOUSE_TENANTS + return [tenant for tenant in _CLICKHOUSE_TENANTS.keys()] + +def list_chdb_tenants() -> List[str]: + """Get list of all chdb tenant names.""" + global _CHDB_TENANTS + return [tenant for tenant in _CHDB_TENANTS.keys()] \ No newline at end of file diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index 8680333..9c6cda3 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -17,7 +17,7 @@ from starlette.requests import Request from starlette.responses import PlainTextResponse -from mcp_clickhouse.mcp_env import load_clickhouse_configs, load_chdb_configs, list_tenants, get_config, get_chdb_config +from mcp_clickhouse.mcp_env import load_clickhouse_configs, load_chdb_configs, list_clickhouse_tenants, list_chdb_tenants, get_config, get_chdb_config from mcp_clickhouse.chdb_prompt import CHDB_PROMPT @@ -61,25 +61,33 @@ class Table: ) logger = logging.getLogger(MCP_SERVER_NAME) +# Load Configs +load_dotenv() +load_clickhouse_configs() +load_chdb_configs() + # List of Tenants -TENANTS = list_tenants() +CLICKHOUSE_TENANTS = list_clickhouse_tenants() +CHDB_TENANTS = list_chdb_tenants() + +# Create ThreadPoolExecutors for each tenant +CLICKHOUSE_QUERY_EXECUTOR = { + tenant: concurrent.futures.ThreadPoolExecutor(max_workers=10) + for tenant in CLICKHOUSE_TENANTS +} -# Create a ThreadPoolExecutor per tenant -QUERY_EXECUTOR = { +CHDB_QUERY_EXECUTOR = { tenant: concurrent.futures.ThreadPoolExecutor(max_workers=10) - for tenant in TENANTS + for tenant in CHDB_TENANTS } # Ensure all executors are properly shutdown on exit -atexit.register(lambda: [executor.shutdown(wait=True) for executor in QUERY_EXECUTOR.values()]) +atexit.register(lambda: [executor.shutdown(wait=True) for executor in CLICKHOUSE_QUERY_EXECUTOR.values()]) +atexit.register(lambda: [executor.shutdown(wait=True) for executor in CHDB_QUERY_EXECUTOR.values()]) # Default query timeout for selects SELECT_QUERY_TIMEOUT_SECS = 30 -load_dotenv() -load_clickhouse_configs() -load_chdb_configs() - mcp = FastMCP( name=MCP_SERVER_NAME, dependencies=[ @@ -100,8 +108,8 @@ async def health_check(request: Request) -> PlainTextResponse: try: reports = [] - for tenant in TENANTS: - tenant_report = f"Tenant '{tenant}': " + for tenant in CLICKHOUSE_TENANTS: + tenant_report = f"Tenant - '{tenant}': " # Check if ClickHouse is enabled by trying to create config # If ClickHouse is disabled, this will succeed but connection will fail @@ -151,10 +159,27 @@ def to_json(obj: Any) -> str: return {key: to_json(value) for key, value in obj.items()} return obj +def clickhouse_tenant_available(tenant: str): + if tenant in CLICKHOUSE_TENANTS: + return True + return False + +def chdb_tenant_available(tenant: str): + if tenant in CHDB_TENANTS: + return True + return False def list_databases(tenant: str): """List available ClickHouse databases""" logger.info("Listing all databases") + + if not clickhouse_tenant_available(tenant): + logger.warning(f"List databases not performed for non-existent tenant - '{tenant}'") + return { + "status": "error", + "message": f"List databases not performed for non-existent tenant - '{tenant}'" + } + client = create_clickhouse_client(tenant) result = client.command("SHOW DATABASES") @@ -164,14 +189,22 @@ def list_databases(tenant: str): else: databases = [result] - logger.info(f"Found {len(databases)} databases for tenant '{tenant}'") + logger.info(f"Found {len(databases)} databases for tenant - '{tenant}'") return json.dumps(databases) def list_tables(tenant: str, database: str, like: Optional[str] = None, not_like: Optional[str] = None): """List available ClickHouse tables in a database, including schema, comment, row count, and column count.""" - logger.info(f"Listing tables for tenant '{tenant}' in database '{database}'") + + if not clickhouse_tenant_available(tenant): + logger.warning(f"List tables not performed for non-existent tenant - '{tenant}'") + return { + "status": "error", + "message": f"List tables not performed for non-existent tenant - '{tenant}'" + } + + logger.info(f"Listing tables for tenant - '{tenant}' in database '{database}'") client = create_clickhouse_client(tenant) query = f"SELECT database, name, engine, create_table_query, dependencies_database, dependencies_table, engine_full, sorting_key, primary_key, total_rows, total_bytes, total_bytes_uncompressed, parts, active_parts, total_marks, comment FROM system.tables WHERE database = {format_query_value(database)}" if like: @@ -196,12 +229,12 @@ def list_tables(tenant: str, database: str, like: Optional[str] = None, not_like ) ] - logger.info(f"Found {len(tables)} tables for tenant '{tenant}'") + logger.info(f"Found {len(tables)} tables for tenant - '{tenant}'") return [asdict(table) for table in tables] -def execute_query(query: str): - client = create_clickhouse_client() +def execute_query(tenant: str, query: str): + client = create_clickhouse_client(tenant) try: read_only = get_readonly_setting(client) res = client.query(query, settings={"readonly": read_only}) @@ -214,40 +247,50 @@ def execute_query(query: str): def run_select_query(tenant: str, query: str): """Run a SELECT query in a ClickHouse database""" - logger.info(f"Executing SELECT query for tenant '{tenant}': {query}") - - if tenant not in TENANTS: - tenant = "default" # TO-DO: Should return some error since tenant does not exist + if not clickhouse_tenant_available(tenant): + logger.warning(f"Query not performed for non-existent tenant - '{tenant}'") + return { + "status": "error", + "message": f"Query not performed for non-existent tenant - '{tenant}'" + } + logger.info(f"Executing SELECT query for tenant - '{tenant}': {query}") try: - future = QUERY_EXECUTOR[tenant].submit(execute_query, query) + future = CLICKHOUSE_QUERY_EXECUTOR[tenant].submit(execute_query, {"tenant": tenant, "query": query}) try: result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS) # Check if we received an error structure from execute_query if isinstance(result, dict) and "error" in result: - logger.warning(f"Query failed for tenant '{tenant}': {result['error']}") + logger.warning(f"Query failed for tenant - '{tenant}': {result['error']}") # MCP requires structured responses; string error messages can cause # serialization issues leading to BrokenResourceError return { "status": "error", - "message": f"Query failed for tenant '{tenant}': {result['error']}", + "message": f"Query failed for tenant - '{tenant}': {result['error']}", } return result except concurrent.futures.TimeoutError: - logger.warning(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds for tenant '{tenant}': {query}") + logger.warning(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds for tenant - '{tenant}': {query}") future.cancel() - raise ToolError(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds for tenant '{tenant}'") + raise ToolError(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds for tenant - '{tenant}'") except ToolError: raise except Exception as e: - logger.error(f"Unexpected error in run_select_query for tenant '{tenant}': {str(e)}") - raise RuntimeError(f"Unexpected error during query execution for tenant '{tenant}': {str(e)}") + logger.error(f"Unexpected error in run_select_query for tenant - '{tenant}': {str(e)}") + raise RuntimeError(f"Unexpected error during query execution for tenant - '{tenant}': {str(e)}") def create_clickhouse_client(tenant: str): + if not clickhouse_tenant_available(tenant): + logger.warning(f"Clickhouse client not created for non-existent tenant - '{tenant}'") + return { + "status": "error", + "message": f"Clickhouse client not created for non-existent tenant - '{tenant}'" + } + client_config = get_config(tenant).get_client_config() logger.info( - f"Creating ClickHouse client connection for tenant '{tenant}', to {client_config['host']}:{client_config['port']} " + f"Creating ClickHouse client connection for tenant - '{tenant}', to {client_config['host']}:{client_config['port']} " f"as {client_config['username']} " f"(secure={client_config['secure']}, verify={client_config['verify']}, " f"connect_timeout={client_config['connect_timeout']}s, " @@ -258,10 +301,10 @@ def create_clickhouse_client(tenant: str): client = clickhouse_connect.get_client(**client_config) # Test the connection version = client.server_version - logger.info(f"Successfully connected to ClickHouse server version {version} for tenant '{tenant}'") + logger.info(f"Successfully connected to ClickHouse server version {version} for tenant - '{tenant}'") return client except Exception as e: - logger.error(f"Failed to connect to ClickHouse for tenant '{tenant}': {str(e)}") + logger.error(f"Failed to connect to ClickHouse for tenant - '{tenant}': {str(e)}") raise @@ -297,14 +340,28 @@ def get_readonly_setting(client) -> str: def create_chdb_client(tenant: str): """Create a chDB client connection.""" - if not get_chdb_config().enabled: - raise ValueError(f"chDB is not enabled for tenant '{tenant}'. Set CHDB_ENABLED=true to enable it.") + if not chdb_tenant_available(tenant): + logger.warning(f"chDB client not created for non-existent tenant - '{tenant}'") + return { + "status": "error", + "message": f"chDB client not created for non-existent tenant - '{tenant}'" + } + + if not get_chdb_config(tenant).enabled: + raise ValueError(f"chDB is not enabled for tenant - '{tenant}'. Set CHDB_ENABLED=true to enable it.") return _chdb_client -def execute_chdb_query(query: str): +def execute_chdb_query(tenant: str, query: str): """Execute a query using chDB client.""" - client = create_chdb_client() + if not chdb_tenant_available(tenant): + logger.warning(f"chDB Query not performed for non-existent tenant - '{tenant}'") + return { + "status": "error", + "message": f"chDB Query not performed for non-existent tenant - '{tenant}'" + } + + client = create_chdb_client(tenant) try: res = client.query(query, "JSON") if res.has_error(): @@ -327,35 +384,38 @@ def execute_chdb_query(query: str): def run_chdb_select_query(tenant: str, query: str): """Run SQL in chDB, an in-process ClickHouse engine""" - logger.info(f"Executing chDB SELECT query for tenant '{tenant}': {query}") - - if tenant not in TENANTS: - tenant = "default" # TO-DO: Should return some error since tenant does not exist + if not chdb_tenant_available(tenant): + logger.warning(f"chDB query not performed for non-existent tenant - '{tenant}'") + return { + "status": "error", + "message": f"chDB query not performed for non-existent tenant - '{tenant}'" + } + logger.info(f"Executing chDB SELECT query for tenant - '{tenant}': {query}") try: - future = QUERY_EXECUTOR[tenant].submit(execute_chdb_query, query) + future = CHDB_QUERY_EXECUTOR[tenant].submit(execute_chdb_query, query) try: result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS) # Check if we received an error structure from execute_chdb_query if isinstance(result, dict) and "error" in result: - logger.warning(f"chDB query failed for tenant '{tenant}': {result['error']}") + logger.warning(f"chDB query failed for tenant - '{tenant}': {result['error']}") return { "status": "error", - "message": f"chDB query failed for tenant '{tenant}': {result['error']}", + "message": f"chDB query failed for tenant - '{tenant}': {result['error']}", } return result except concurrent.futures.TimeoutError: logger.warning( - f"chDB query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds for tenant '{tenant}': {query}" + f"chDB query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds for tenant - '{tenant}': {query}" ) future.cancel() return { "status": "error", - "message": f"chDB query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds for tenant '{tenant}'", + "message": f"chDB query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds for tenant - '{tenant}'", } except Exception as e: - logger.error(f"Unexpected error in run_chdb_select_query for tenant '{tenant}': {e}") - return {"status": "error", "message": f"Unexpected error for tenant '{tenant}': {e}"} + logger.error(f"Unexpected error in run_chdb_select_query for tenant - '{tenant}': {e}") + return {"status": "error", "message": f"Unexpected error for tenant - '{tenant}': {e}"} def chdb_initial_prompt() -> str: @@ -365,35 +425,45 @@ def chdb_initial_prompt() -> str: def _init_chdb_client(tenant: str): """Initialize the global chDB client instance.""" + if not chdb_tenant_available(tenant): + logger.warning(f"chDB client not initialised for non-existent tenant - '{tenant}'") + return { + "status": "error", + "message": f"chDB client not initialised for non-existent tenant - '{tenant}'" + } + try: if not get_chdb_config(tenant).enabled: - logger.info("chDB is disabled for tenant '{tenant}', skipping client initialization") + logger.info("chDB is disabled for tenant - '{tenant}', skipping client initialization") return None client_config = get_chdb_config(tenant).get_client_config() data_path = client_config["data_path"] - logger.info(f"Creating chDB client with data_path={data_path} for tenant '{tenant}'") + logger.info(f"Creating chDB client with data_path={data_path} for tenant - '{tenant}'") client = chs.Session(path=data_path) - logger.info(f"Successfully connected to chDB with data_path={data_path} for tenant '{tenant}'") + logger.info(f"Successfully connected to chDB with data_path={data_path} for tenant - '{tenant}'") return client except Exception as e: - logger.error(f"Failed to initialize chDB client for tenant '{tenant}': {e}") + logger.error(f"Failed to initialize chDB client for tenant - '{tenant}': {e}") return None -# Register tools based on configuration -# For multi-tenancy, we will use global flags to bypass this -if os.getenv("CLICKHOUSE_ENABLED", "true").lower() == "true": +# Register tools +if not CLICKHOUSE_TENANTS: + logger.info("ClickHouse tools not registered") +else: mcp.add_tool(Tool.from_function(list_databases)) mcp.add_tool(Tool.from_function(list_tables)) mcp.add_tool(Tool.from_function(run_select_query)) logger.info("ClickHouse tools registered") - -if os.getenv("CHDB_ENABLED", "false").lower() == "true": - _chdb_client = _init_chdb_client() - if _chdb_client: - atexit.register(lambda: _chdb_client.close()) +if not CHDB_TENANTS: + logger.info("chDB tools and prompts not registered") +else: + for tenant in CHDB_TENANTS: + _chdb_client = _init_chdb_client(tenant) + if _chdb_client: + atexit.register(lambda: _chdb_client.close()) mcp.add_tool(Tool.from_function(run_chdb_select_query)) chdb_prompt = Prompt.from_function( From 2ef472a86d95fb72db5a04e3dfaffdc4dccaa268 Mon Sep 17 00:00:00 2001 From: Benedict Date: Mon, 18 Aug 2025 01:06:53 +0800 Subject: [PATCH 11/17] add unit tests for wrong tenant --- tests/test_chdb_tool.py | 16 ++++++++++------ tests/test_mcp_server.py | 35 +++++++++++++++++++++++++++++------ tests/test_tool.py | 30 ++++++++++++++++++++++++------ 3 files changed, 63 insertions(+), 18 deletions(-) diff --git a/tests/test_chdb_tool.py b/tests/test_chdb_tool.py index e164b26..1cef909 100644 --- a/tests/test_chdb_tool.py +++ b/tests/test_chdb_tool.py @@ -6,12 +6,6 @@ load_dotenv() -## TO-DO -""" -- Set up multiple tenants -- Test with wrong tenant -> Should we execute default or not execute? -""" - class TestChDBTools(unittest.TestCase): @classmethod def setUpClass(cls): @@ -19,6 +13,16 @@ def setUpClass(cls): tenant = "example" cls.client = create_chdb_client(tenant) + def test_run_chdb_select_query_wrong_tenant(self): + """Test running a simple SELECT query in chDB with wrong tenant.""" + tenant = "wrong_tenant" + query = "SELECT 1 as test_value" + result = run_chdb_select_query(tenant, query) + self.assertEqual(result, { + "status": "error", + "message": f"chDB query not performed for non-existent tenant - '{tenant}'" + }) + def test_run_chdb_select_query_simple(self): """Test running a simple SELECT query in chDB.""" tenant = "example" diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 689967b..2e52616 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -17,12 +17,6 @@ def event_loop(): yield loop loop.close() -## TO-DO -""" -- Set up multiple tenants -- Test with wrong tenant -> Should we execute default or not execute? -""" - @pytest_asyncio.fixture(scope="module") async def setup_test_database(): """Set up test database and tables before running tests.""" @@ -95,6 +89,35 @@ def mcp_server(): @pytest.mark.asyncio +async def test_list_databases_wrong_tenant(mcp_server): + """Test the list_databases tool with wrong tenant.""" + async with Client(mcp_server) as client: + result = await client.call_tool("list_databases", {"tenant": "wrong_tenant"}) + assert result == { + "status": "error", + "message": "List databases not performed for non-existent tenant - 'wrong_tenant'" + } + +@pytest.mark.asyncio +async def test_list_tables_wrong_tenant(mcp_server): + """Test the list_tables tool with wrong tenant.""" + async with Client(mcp_server) as client: + result = await client.call_tool("list_databases", {"tenant": "wrong_tenant"}) + assert result == { + "status": "error", + "message": "List tables not performed for non-existent tenant - 'wrong_tenant'" + } + +@pytest.mark.asyncio +async def test_run_select_query_wrong_tenant(mcp_server): + """Test the run_select_query tool with wrong tenant.""" + async with Client(mcp_server) as client: + result = await client.call_tool("list_databases", {"tenant": "wrong_tenant"}) + assert result == { + "status": "error", + "message": "Query not performed for non-existent tenant - 'wrong_tenant'" + } + async def test_list_databases(mcp_server, setup_test_database): """Test the list_databases tool.""" test_tenant, test_db, _, _ = setup_test_database diff --git a/tests/test_tool.py b/tests/test_tool.py index 08f0418..8c506d9 100644 --- a/tests/test_tool.py +++ b/tests/test_tool.py @@ -8,12 +8,6 @@ load_dotenv() -## TO-DO -""" -- Set up multiple tenants -- Test with wrong tenant -> Should we execute default or not execute? -""" - class TestClickhouseTools(unittest.TestCase): @classmethod def setUpClass(cls): @@ -46,6 +40,30 @@ def tearDownClass(cls): """Clean up the environment after tests.""" cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") + def test_list_databases_wrong_tenant(self): + """Test listing databases with wrong tenant.""" + result = list_databases("wrong_tenant") + self.assertEqual(result, { + "status": "error", + "message": "List databases not performed for non-existent tenant - 'wrong_tenant'" + }) + + def test_list_tables_wrong_tenant(self): + """Test listing tables with wrong tenant.""" + result = list_tables("wrong_tenant") + self.assertEqual(result, { + "status": "error", + "message": "List tables not performed for non-existent tenant - 'wrong_tenant'" + }) + + def test_run_select_query_wrong_tenant(self): + """Test run select query with wrong tenant.""" + result = list_databases("wrong_tenant") + self.assertEqual(result, { + "status": "error", + "message": "Query not performed for non-existent tenant - 'wrong_tenant'" + }) + def test_list_databases(self): """Test listing databases.""" result = list_databases("example") From c6aeeafb42ae05dbfd8ab51f93a0cad1db62bcd3 Mon Sep 17 00:00:00 2001 From: Benedict Date: Mon, 18 Aug 2025 21:15:30 +0800 Subject: [PATCH 12/17] remove bad import --- README.md | 3 --- mcp_clickhouse/mcp_server.py | 1 - 2 files changed, 4 deletions(-) diff --git a/README.md b/README.md index 99603f9..d83294f 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,5 @@ # ClickHouse MCP Server -TO-DO: Update README - - [![PyPI - Version](https://img.shields.io/pypi/v/mcp-clickhouse)](https://pypi.org/project/mcp-clickhouse) An MCP server for ClickHouse. diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index 9c6cda3..0a8964a 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -3,7 +3,6 @@ from typing import Optional, List, Any import concurrent.futures import atexit -import os import clickhouse_connect import chdb.session as chs From 7e4d0bf477e7addc7e0bee6241122e863d33856d Mon Sep 17 00:00:00 2001 From: Benedict Date: Mon, 18 Aug 2025 23:29:26 +0800 Subject: [PATCH 13/17] use __post_init__ instead of __init__ --- mcp_clickhouse/mcp_env.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mcp_clickhouse/mcp_env.py b/mcp_clickhouse/mcp_env.py index 57c30ad..5981867 100644 --- a/mcp_clickhouse/mcp_env.py +++ b/mcp_clickhouse/mcp_env.py @@ -50,7 +50,7 @@ class ClickHouseConfig: """ tenant: str - def __init__(self): + def __post_init__(self): """Initialize the configuration from environment variables.""" if self.enabled: self._validate_required_vars() @@ -192,7 +192,7 @@ class ChDBConfig: """ tenant: str - def __init__(self): + def __post_init__(self): """Initialize the configuration from environment variables.""" if self.enabled: self._validate_required_vars() From 5782a8c078e994f03a35d75e4294aacd227d327d Mon Sep 17 00:00:00 2001 From: Benedict Date: Tue, 19 Aug 2025 00:00:56 +0800 Subject: [PATCH 14/17] fixed tests --- mcp_clickhouse/mcp_server.py | 4 ++-- tests/test_chdb_tool.py | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index 0a8964a..83e79e8 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -255,7 +255,7 @@ def run_select_query(tenant: str, query: str): logger.info(f"Executing SELECT query for tenant - '{tenant}': {query}") try: - future = CLICKHOUSE_QUERY_EXECUTOR[tenant].submit(execute_query, {"tenant": tenant, "query": query}) + future = CLICKHOUSE_QUERY_EXECUTOR[tenant].submit(execute_query, tenant, query) try: result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS) # Check if we received an error structure from execute_query @@ -392,7 +392,7 @@ def run_chdb_select_query(tenant: str, query: str): logger.info(f"Executing chDB SELECT query for tenant - '{tenant}': {query}") try: - future = CHDB_QUERY_EXECUTOR[tenant].submit(execute_chdb_query, query) + future = CHDB_QUERY_EXECUTOR[tenant].submit(execute_chdb_query, tenant, query) try: result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS) # Check if we received an error structure from execute_chdb_query diff --git a/tests/test_chdb_tool.py b/tests/test_chdb_tool.py index 1cef909..e02e015 100644 --- a/tests/test_chdb_tool.py +++ b/tests/test_chdb_tool.py @@ -10,8 +10,7 @@ class TestChDBTools(unittest.TestCase): @classmethod def setUpClass(cls): """Set up the environment before chDB tests.""" - tenant = "example" - cls.client = create_chdb_client(tenant) + cls.client = create_chdb_client(tenant="example") def test_run_chdb_select_query_wrong_tenant(self): """Test running a simple SELECT query in chDB with wrong tenant.""" From 33e0d349f05209121d4416e91e0faba1fb8e898d Mon Sep 17 00:00:00 2001 From: Benedict Date: Tue, 19 Aug 2025 00:33:42 +0800 Subject: [PATCH 15/17] fixed all tests --- mcp_clickhouse/mcp_server.py | 63 ++++++++++++------------------------ tests/test_chdb_tool.py | 14 ++++---- tests/test_mcp_server.py | 36 +++++++++++---------- tests/test_tool.py | 42 +++++++++++++++--------- 4 files changed, 74 insertions(+), 81 deletions(-) diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index 83e79e8..8396549 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -170,15 +170,11 @@ def chdb_tenant_available(tenant: str): def list_databases(tenant: str): """List available ClickHouse databases""" - logger.info("Listing all databases") - if not clickhouse_tenant_available(tenant): - logger.warning(f"List databases not performed for non-existent tenant - '{tenant}'") - return { - "status": "error", - "message": f"List databases not performed for non-existent tenant - '{tenant}'" - } + logger.warning(f"List databases not performed for invalid tenant - '{tenant}'") + raise ToolError(f"List databases not performed for invalid tenant - '{tenant}'") + logger.info("Listing all databases") client = create_clickhouse_client(tenant) result = client.command("SHOW DATABASES") @@ -197,11 +193,8 @@ def list_tables(tenant: str, database: str, like: Optional[str] = None, not_like row count, and column count.""" if not clickhouse_tenant_available(tenant): - logger.warning(f"List tables not performed for non-existent tenant - '{tenant}'") - return { - "status": "error", - "message": f"List tables not performed for non-existent tenant - '{tenant}'" - } + logger.warning(f"List tables not performed for invalid tenant - '{tenant}'") + raise ToolError(f"List tables not performed for invalid tenant - '{tenant}'") logger.info(f"Listing tables for tenant - '{tenant}' in database '{database}'") client = create_clickhouse_client(tenant) @@ -233,6 +226,10 @@ def list_tables(tenant: str, database: str, like: Optional[str] = None, not_like def execute_query(tenant: str, query: str): + if not clickhouse_tenant_available(tenant): + logger.warning(f"Query not executed for invalid tenant - '{tenant}'") + raise ToolError(f"Query not executed for invalid tenant - '{tenant}'") + client = create_clickhouse_client(tenant) try: read_only = get_readonly_setting(client) @@ -247,11 +244,8 @@ def execute_query(tenant: str, query: str): def run_select_query(tenant: str, query: str): """Run a SELECT query in a ClickHouse database""" if not clickhouse_tenant_available(tenant): - logger.warning(f"Query not performed for non-existent tenant - '{tenant}'") - return { - "status": "error", - "message": f"Query not performed for non-existent tenant - '{tenant}'" - } + logger.warning(f"Select Query not performed for invalid tenant - '{tenant}'") + raise ToolError(f"Select Query not performed for invalid tenant - '{tenant}'") logger.info(f"Executing SELECT query for tenant - '{tenant}': {query}") try: @@ -281,11 +275,8 @@ def run_select_query(tenant: str, query: str): def create_clickhouse_client(tenant: str): if not clickhouse_tenant_available(tenant): - logger.warning(f"Clickhouse client not created for non-existent tenant - '{tenant}'") - return { - "status": "error", - "message": f"Clickhouse client not created for non-existent tenant - '{tenant}'" - } + logger.warning(f"Clickhouse client not created for invalid tenant - '{tenant}'") + raise ToolError(f"Clickhouse client not created for invalid tenant - '{tenant}'") client_config = get_config(tenant).get_client_config() logger.info( @@ -340,11 +331,8 @@ def get_readonly_setting(client) -> str: def create_chdb_client(tenant: str): """Create a chDB client connection.""" if not chdb_tenant_available(tenant): - logger.warning(f"chDB client not created for non-existent tenant - '{tenant}'") - return { - "status": "error", - "message": f"chDB client not created for non-existent tenant - '{tenant}'" - } + logger.warning(f"chDB client not created for invalid tenant - '{tenant}'") + raise ToolError(f"chDB client not created for invalid tenant - '{tenant}'") if not get_chdb_config(tenant).enabled: raise ValueError(f"chDB is not enabled for tenant - '{tenant}'. Set CHDB_ENABLED=true to enable it.") @@ -354,11 +342,8 @@ def create_chdb_client(tenant: str): def execute_chdb_query(tenant: str, query: str): """Execute a query using chDB client.""" if not chdb_tenant_available(tenant): - logger.warning(f"chDB Query not performed for non-existent tenant - '{tenant}'") - return { - "status": "error", - "message": f"chDB Query not performed for non-existent tenant - '{tenant}'" - } + logger.warning(f"chDB query not executed for invalid tenant - '{tenant}'") + raise ToolError(f"chDB query not executed for invalid tenant - '{tenant}'") client = create_chdb_client(tenant) try: @@ -384,11 +369,8 @@ def execute_chdb_query(tenant: str, query: str): def run_chdb_select_query(tenant: str, query: str): """Run SQL in chDB, an in-process ClickHouse engine""" if not chdb_tenant_available(tenant): - logger.warning(f"chDB query not performed for non-existent tenant - '{tenant}'") - return { - "status": "error", - "message": f"chDB query not performed for non-existent tenant - '{tenant}'" - } + logger.warning(f"chDB query not performed for invalid tenant - '{tenant}'") + raise ToolError(f"chDB query not performed for invalid tenant - '{tenant}'") logger.info(f"Executing chDB SELECT query for tenant - '{tenant}': {query}") try: @@ -425,11 +407,8 @@ def chdb_initial_prompt() -> str: def _init_chdb_client(tenant: str): """Initialize the global chDB client instance.""" if not chdb_tenant_available(tenant): - logger.warning(f"chDB client not initialised for non-existent tenant - '{tenant}'") - return { - "status": "error", - "message": f"chDB client not initialised for non-existent tenant - '{tenant}'" - } + logger.warning(f"chDB client not initialised for invalid tenant - '{tenant}'") + raise ToolError(f"chDB client not initialised for invalid tenant - '{tenant}'") try: if not get_chdb_config(tenant).enabled: diff --git a/tests/test_chdb_tool.py b/tests/test_chdb_tool.py index e02e015..36dc790 100644 --- a/tests/test_chdb_tool.py +++ b/tests/test_chdb_tool.py @@ -1,7 +1,7 @@ import unittest from dotenv import load_dotenv - +from fastmcp.exceptions import ToolError from mcp_clickhouse import create_chdb_client, run_chdb_select_query load_dotenv() @@ -16,11 +16,13 @@ def test_run_chdb_select_query_wrong_tenant(self): """Test running a simple SELECT query in chDB with wrong tenant.""" tenant = "wrong_tenant" query = "SELECT 1 as test_value" - result = run_chdb_select_query(tenant, query) - self.assertEqual(result, { - "status": "error", - "message": f"chDB query not performed for non-existent tenant - '{tenant}'" - }) + with self.assertRaises(ToolError) as cm: + run_chdb_select_query(tenant, query) + + self.assertIn( + f"chDB query not performed for invalid tenant - '{tenant}'", + str(cm.exception) + ) def test_run_chdb_select_query_simple(self): """Test running a simple SELECT query in chDB.""" diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 2e52616..cdfecd0 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -91,33 +91,35 @@ def mcp_server(): @pytest.mark.asyncio async def test_list_databases_wrong_tenant(mcp_server): """Test the list_databases tool with wrong tenant.""" + tenant = "wrong_tenant" async with Client(mcp_server) as client: - result = await client.call_tool("list_databases", {"tenant": "wrong_tenant"}) - assert result == { - "status": "error", - "message": "List databases not performed for non-existent tenant - 'wrong_tenant'" - } + with pytest.raises(ToolError) as exc_info: + await client.call_tool("list_databases", {"tenant": tenant}) + + assert f"List databases not performed for invalid tenant - '{tenant}'" in str(exc_info.value) @pytest.mark.asyncio -async def test_list_tables_wrong_tenant(mcp_server): +async def test_list_tables_wrong_tenant(mcp_server, setup_test_database): """Test the list_tables tool with wrong tenant.""" + _, test_db, test_table, _ = setup_test_database + tenant = "wrong_tenant" async with Client(mcp_server) as client: - result = await client.call_tool("list_databases", {"tenant": "wrong_tenant"}) - assert result == { - "status": "error", - "message": "List tables not performed for non-existent tenant - 'wrong_tenant'" - } + with pytest.raises(ToolError) as exc_info: + await client.call_tool("list_tables", {"tenant": tenant, "database": test_db}) + assert f"List tables not performed for invalid tenant - '{tenant}'" in str(exc_info.value) @pytest.mark.asyncio -async def test_run_select_query_wrong_tenant(mcp_server): +async def test_run_select_query_wrong_tenant(mcp_server, setup_test_database): """Test the run_select_query tool with wrong tenant.""" + _, test_db, test_table, _ = setup_test_database + tenant = "wrong_tenant" async with Client(mcp_server) as client: - result = await client.call_tool("list_databases", {"tenant": "wrong_tenant"}) - assert result == { - "status": "error", - "message": "Query not performed for non-existent tenant - 'wrong_tenant'" - } + with pytest.raises(ToolError) as exc_info: + await client.call_tool("run_select_query", {"tenant": tenant, "query": f"SELECT COUNT(*) FROM {test_db}.{test_table}",}) + + assert f"Query not performed for invalid tenant - '{tenant}'" in str(exc_info.value) +@pytest.mark.asyncio async def test_list_databases(mcp_server, setup_test_database): """Test the list_databases tool.""" test_tenant, test_db, _, _ = setup_test_database diff --git a/tests/test_tool.py b/tests/test_tool.py index 8c506d9..eda4c42 100644 --- a/tests/test_tool.py +++ b/tests/test_tool.py @@ -41,28 +41,38 @@ def tearDownClass(cls): cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") def test_list_databases_wrong_tenant(self): - """Test listing databases with wrong tenant.""" - result = list_databases("wrong_tenant") - self.assertEqual(result, { - "status": "error", - "message": "List databases not performed for non-existent tenant - 'wrong_tenant'" - }) + """Test listing tables with wrong tenant.""" + tenant = "wrong_tenant" + with self.assertRaises(ToolError) as cm: + list_databases(tenant) + + self.assertIn( + f"List databases not performed for invalid tenant - '{tenant}'", + str(cm.exception) + ) def test_list_tables_wrong_tenant(self): """Test listing tables with wrong tenant.""" - result = list_tables("wrong_tenant") - self.assertEqual(result, { - "status": "error", - "message": "List tables not performed for non-existent tenant - 'wrong_tenant'" - }) + tenant = "wrong_tenant" + with self.assertRaises(ToolError) as cm: + list_tables(tenant, self.test_db) + + self.assertIn( + f"List tables not performed for invalid tenant - '{tenant}'", + str(cm.exception) + ) def test_run_select_query_wrong_tenant(self): """Test run select query with wrong tenant.""" - result = list_databases("wrong_tenant") - self.assertEqual(result, { - "status": "error", - "message": "Query not performed for non-existent tenant - 'wrong_tenant'" - }) + tenant = "wrong_tenant" + query = f"SELECT * FROM {self.test_db}.{self.test_table}" + with self.assertRaises(ToolError) as cm: + run_select_query(tenant, query) + + self.assertIn( + f"Query not performed for invalid tenant - '{tenant}'", + str(cm.exception) + ) def test_list_databases(self): """Test listing databases.""" From 0b8530947e7fe0d258f740ac2b3b9c7643f68cc2 Mon Sep 17 00:00:00 2001 From: Benedict Date: Tue, 19 Aug 2025 23:40:09 +0800 Subject: [PATCH 16/17] removed CH_ prefix for custom tenants and added list tenants tools --- mcp_clickhouse/__init__.py | 4 +++ mcp_clickhouse/mcp_env.py | 56 +++++++++++++++++++----------------- mcp_clickhouse/mcp_server.py | 12 ++++++++ tests/test_mcp_server.py | 28 ++++++++++++++++++ tests/test_tool.py | 14 ++++++++- 5 files changed, 86 insertions(+), 28 deletions(-) diff --git a/mcp_clickhouse/__init__.py b/mcp_clickhouse/__init__.py index 879259d..e22e8a1 100644 --- a/mcp_clickhouse/__init__.py +++ b/mcp_clickhouse/__init__.py @@ -1,4 +1,6 @@ from .mcp_server import ( + list_clickhouse_tenants, + list_chdb_tenants, create_clickhouse_client, list_databases, list_tables, @@ -9,6 +11,8 @@ ) __all__ = [ + "list_clickhouse_tenants", + "list_chdb_tenants", "list_databases", "list_tables", "run_select_query", diff --git a/mcp_clickhouse/mcp_env.py b/mcp_clickhouse/mcp_env.py index 5981867..204d855 100644 --- a/mcp_clickhouse/mcp_env.py +++ b/mcp_clickhouse/mcp_env.py @@ -30,23 +30,23 @@ class ClickHouseConfig: This class handles all environment variable configuration with sensible defaults and type conversion. It provides typed methods for accessing each configuration value. - Required environment variables (only when CH__CLICKHOUSE_ENABLED=true): - CH__CLICKHOUSE_HOST: The hostname of the ClickHouse server - CH__CLICKHOUSE_USER: The username for authentication - CH__CLICKHOUSE_PASSWORD: The password for authentication + Required environment variables (only when _CLICKHOUSE_ENABLED=true): + _CLICKHOUSE_HOST: The hostname of the ClickHouse server + _CLICKHOUSE_USER: The username for authentication + _CLICKHOUSE_PASSWORD: The password for authentication Optional environment variables (with defaults): - CH__CLICKHOUSE_PORT: The port number (default: 8443 if secure=True, 8123 if secure=False) - CH__CLICKHOUSE_SECURE: Enable HTTPS (default: true) - CH__CLICKHOUSE_VERIFY: Verify SSL certificates (default: true) - CH__CLICKHOUSE_CONNECT_TIMEOUT: Connection timeout in seconds (default: 30) - CH__CLICKHOUSE_SEND_RECEIVE_TIMEOUT: Send/receive timeout in seconds (default: 300) - CH__CLICKHOUSE_DATABASE: Default database to use (default: None) - CH__CLICKHOUSE_PROXY_PATH: Path to be added to the host URL. For instance, for servers behind an HTTP proxy (default: None) - CH__CLICKHOUSE_MCP_SERVER_TRANSPORT: MCP server transport method - "stdio", "http", or "sse" (default: stdio) - CH__CLICKHOUSE_MCP_BIND_HOST: Host to bind the MCP server to when using HTTP or SSE transport (default: 127.0.0.1) - CH__CLICKHOUSE_MCP_BIND_PORT: Port to bind the MCP server to when using HTTP or SSE transport (default: 8000) - CH__CLICKHOUSE_ENABLED: Enable ClickHouse server (default: true) + _CLICKHOUSE_PORT: The port number (default: 8443 if secure=True, 8123 if secure=False) + _CLICKHOUSE_SECURE: Enable HTTPS (default: true) + _CLICKHOUSE_VERIFY: Verify SSL certificates (default: true) + _CLICKHOUSE_CONNECT_TIMEOUT: Connection timeout in seconds (default: 30) + _CLICKHOUSE_SEND_RECEIVE_TIMEOUT: Send/receive timeout in seconds (default: 300) + _CLICKHOUSE_DATABASE: Default database to use (default: None) + _CLICKHOUSE_PROXY_PATH: Path to be added to the host URL. For instance, for servers behind an HTTP proxy (default: None) + _CLICKHOUSE_MCP_SERVER_TRANSPORT: MCP server transport method - "stdio", "http", or "sse" (default: stdio) + _CLICKHOUSE_MCP_BIND_HOST: Host to bind the MCP server to when using HTTP or SSE transport (default: 127.0.0.1) + _CLICKHOUSE_MCP_BIND_PORT: Port to bind the MCP server to when using HTTP or SSE transport (default: 8000) + _CLICKHOUSE_ENABLED: Enable ClickHouse server (default: true) """ tenant: str @@ -56,7 +56,7 @@ def __post_init__(self): self._validate_required_vars() def _getenv(self, key: str, default=None, cast=str): - prefixed_key = f"CH_{self.tenant}_{key}" + prefixed_key = f"{self.tenant}_{key}" val = os.getenv(prefixed_key, os.getenv(key, default)) if val is not None and cast is not str: try: @@ -198,7 +198,7 @@ def __post_init__(self): self._validate_required_vars() def _getenv(self, key: str, default=None, cast=str): - prefixed_key = f"CH_{self.tenant}_{key}" + prefixed_key = f"{self.tenant}_{key}" val = os.getenv(prefixed_key, os.getenv(key, default)) if val is not None and cast is not str: try: @@ -263,24 +263,26 @@ def get_mcp_config() -> dict: def load_clickhouse_configs() -> Dict[str, ClickHouseConfig]: global _CLICKHOUSE_TENANTS for key in os.environ: - if key.endswith("CLICKHOUSE_HOST") and key.startswith("CH_"): - # CH__CLICKHOUSE_HOST - tenant = key[len("CH_"): -len("_CLICKHOUSE_HOST")] + if key.endswith("CLICKHOUSE_HOST") and not key.startswith("CLICKHOUSE_HOST"): + # _CLICKHOUSE_HOST + tenant = key[: -len("_CLICKHOUSE_HOST")] _CLICKHOUSE_TENANTS[tenant] = ClickHouseConfig(tenant=tenant) - if not _CLICKHOUSE_TENANTS and "CLICKHOUSE_HOST" in os.environ: - _CLICKHOUSE_TENANTS["default"] = ClickHouseConfig(tenant="") + elif key.endswith("CLICKHOUSE_HOST") and key.startswith("CLICKHOUSE_HOST"): + # default tenant -> _CLICKHOUSE_HOST + _CLICKHOUSE_TENANTS["default"] = ClickHouseConfig(tenant="") return _CLICKHOUSE_TENANTS def load_chdb_configs() -> Dict[str, ChDBConfig]: global _CHDB_TENANTS for key in os.environ: - if key.endswith("CHDB_DATA_PATH") and key.startswith("CH_"): - # CH__CLICKHOUSE_HOST - tenant = key[len("CH_"): -len("_CHDB_DATA_PATH")] + if key.endswith("CHDB_DATA_PATH") and not key.startswith("CHDB_DATA_PATH"): + # _CHDB_DATA_PATH + tenant = key[: -len("_CHDB_DATA_PATH")] _CHDB_TENANTS[tenant] = ChDBConfig(tenant=tenant) - if not _CHDB_TENANTS and "CHDB_DATA_PATH" in os.environ: - _CHDB_TENANTS["default"] = ChDBConfig(tenant="") + elif key.endswith("CHDB_DATA_PATH") and key.startswith("CHDB_DATA_PATH"): + # default tenant -> _CHDB_DATA_PATH + _CHDB_TENANTS["default"] = ChDBConfig(tenant="") return _CHDB_TENANTS def get_config(tenant: str = "default") -> ClickHouseConfig: diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index 8396549..904681c 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -168,6 +168,16 @@ def chdb_tenant_available(tenant: str): return True return False +def list_chdb_tenants(): + """List available Clickhouse tenants""" + global CLICKHOUSE_TENANTS + return CLICKHOUSE_TENANTS + +def list_chdb_tenants(): + """List available chDB tenants""" + global CHDB_TENANTS + return CHDB_TENANTS + def list_databases(tenant: str): """List available ClickHouse databases""" if not clickhouse_tenant_available(tenant): @@ -430,6 +440,7 @@ def _init_chdb_client(tenant: str): if not CLICKHOUSE_TENANTS: logger.info("ClickHouse tools not registered") else: + mcp.add_tool(Tool.from_function(list_clickhouse_tenants)) mcp.add_tool(Tool.from_function(list_databases)) mcp.add_tool(Tool.from_function(list_tables)) mcp.add_tool(Tool.from_function(run_select_query)) @@ -443,6 +454,7 @@ def _init_chdb_client(tenant: str): if _chdb_client: atexit.register(lambda: _chdb_client.close()) + mcp.add_tool(Tool.from_function(list_chdb_tenants)) mcp.add_tool(Tool.from_function(run_chdb_select_query)) chdb_prompt = Prompt.from_function( chdb_initial_prompt, diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index cdfecd0..a9d45cf 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -119,6 +119,34 @@ async def test_run_select_query_wrong_tenant(mcp_server, setup_test_database): assert f"Query not performed for invalid tenant - '{tenant}'" in str(exc_info.value) +@pytest.mark.asyncio +async def test_list_clickhouse_tenants(mcp_server): + """Test the list_clickhouse_tenants tool.""" + async with Client(mcp_server) as client: + result = await client.call_tool("list_clickhouse_tenants") + + # The result should be a list containing at least one item + assert len(result) >= 1 + assert isinstance(result[0].text, str) + tenants = json.loads(result[0].text) + + assert "default" in tenants # default tenant is defined in .env (no prefix) + assert "example" in tenants # example tenant is defined in .env (example prefix) + +@pytest.mark.asyncio +async def test_list_chdb_tenants(mcp_server): + """Test the list_chdb_tenants tool.""" + async with Client(mcp_server) as client: + result = await client.call_tool("list_chdb_tenants") + + # The result should be a list containing at least one item + assert len(result) >= 1 + assert isinstance(result[0].text, str) + tenants = json.loads(result[0].text) + + assert "default" in tenants # default tenant is defined in .env (no prefix) + assert "example" in tenants # example tenant is defined in .env (example prefix) + @pytest.mark.asyncio async def test_list_databases(mcp_server, setup_test_database): """Test the list_databases tool.""" diff --git a/tests/test_tool.py b/tests/test_tool.py index eda4c42..b74e004 100644 --- a/tests/test_tool.py +++ b/tests/test_tool.py @@ -4,7 +4,7 @@ from dotenv import load_dotenv from fastmcp.exceptions import ToolError -from mcp_clickhouse import create_clickhouse_client, list_databases, list_tables, run_select_query +from mcp_clickhouse import create_clickhouse_client, list_clickhouse_tenants, list_chdb_tenants, list_databases, list_tables, run_select_query load_dotenv() @@ -40,6 +40,18 @@ def tearDownClass(cls): """Clean up the environment after tests.""" cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") + def test_list_clickhouse_tenants(self): + tenants = list_clickhouse_tenants() + self.assertIn("example", tenants) + self.assertIn("default", tenants) + self.assertEqual(len(tenants), 2) + + def test_list_chdb_tenants(self): + tenants = list_chdb_tenants() + self.assertIn("example", tenants) + self.assertIn("default", tenants) + self.assertEqual(len(tenants), 2) + def test_list_databases_wrong_tenant(self): """Test listing tables with wrong tenant.""" tenant = "wrong_tenant" From ef80c341395b069403036b489c2533d4cca110ae Mon Sep 17 00:00:00 2001 From: Benedict Date: Wed, 20 Aug 2025 00:31:50 +0800 Subject: [PATCH 17/17] adjusted to single default tenant in unit tests --- README.md | 132 +++++++++++++++++++++++++++++++++++ mcp_clickhouse/mcp_env.py | 20 ++++-- mcp_clickhouse/mcp_server.py | 12 ++-- tests/test_chdb_tool.py | 17 +++-- tests/test_mcp_server.py | 81 +++++++++++---------- tests/test_tool.py | 25 +++---- 6 files changed, 212 insertions(+), 75 deletions(-) diff --git a/README.md b/README.md index d83294f..6393a8f 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,9 @@ An MCP server for ClickHouse. ### ClickHouse Tools +* `list_clickhouse_tenants` + * List all clickhouse tenants. + * `run_select_query` * Execute SQL queries on your ClickHouse cluster. * Input: `sql` (string): The SQL query to execute. @@ -24,6 +27,9 @@ An MCP server for ClickHouse. ### chDB Tools +* `list_chdb_tenants` + * List all chdb tenants. + * `run_chdb_select_query` * Execute SQL queries using [chDB](https://github.com/chdb-io/chdb)'s embedded ClickHouse engine. * Input: `sql` (string): The SQL query to execute. @@ -168,6 +174,132 @@ You can also enable both ClickHouse and chDB simultaneously: } ``` +Multi-tenancy configuration is also supported. This is enabled by defining custom prefixes in front of the base environment variables. The below configuration creates two tenants: `cluster1` and `cluster2`. + +```json +{ + "mcpServers": { + "mcp-clickhouse": { + "command": "uv", + "args": [ + "run", + "--with", + "mcp-clickhouse", + "--python", + "3.10", + "mcp-clickhouse" + ], + "env": { + "cluster1_CLICKHOUSE_HOST": "", + "cluster1_CLICKHOUSE_PORT": "", + "cluster1_CLICKHOUSE_USER": "", + "cluster1_CLICKHOUSE_PASSWORD": "", + "cluster1_CLICKHOUSE_SECURE": "true", + "cluster1_CLICKHOUSE_VERIFY": "true", + "cluster1_CLICKHOUSE_CONNECT_TIMEOUT": "30", + "cluster1_CLICKHOUSE_SEND_RECEIVE_TIMEOUT": "30", + "cluster1_CHDB_ENABLED": "true", + "cluster1_CHDB_DATA_PATH": "/path/to/chdb/data", + "cluster2_CLICKHOUSE_HOST": "", + "cluster2_CLICKHOUSE_PORT": "", + "cluster2_CLICKHOUSE_USER": "", + "cluster2_CLICKHOUSE_PASSWORD": "", + "cluster2_CLICKHOUSE_SECURE": "true", + "cluster2_CLICKHOUSE_VERIFY": "true", + "cluster2_CLICKHOUSE_CONNECT_TIMEOUT": "30", + "cluster2_CLICKHOUSE_SEND_RECEIVE_TIMEOUT": "30", + "cluster2_CHDB_ENABLED": "true", + "cluster2_CHDB_DATA_PATH": "/path/to/chdb/data" + } + } + } +} +``` + +If no custom prefix is defined, a `default` tenant is automatically assigned based on the original environment variables. Defining custom tenants using the reserved `default` prefix is not allowed. The below example creates two tenants: `default` and `custom`. + +```json +{ + "mcpServers": { + "mcp-clickhouse": { + "command": "uv", + "args": [ + "run", + "--with", + "mcp-clickhouse", + "--python", + "3.10", + "mcp-clickhouse" + ], + "env": { + "CLICKHOUSE_HOST": "", + "CLICKHOUSE_PORT": "", + "CLICKHOUSE_USER": "", + "CLICKHOUSE_PASSWORD": "", + "CLICKHOUSE_SECURE": "true", + "CLICKHOUSE_VERIFY": "true", + "CLICKHOUSE_CONNECT_TIMEOUT": "30", + "CLICKHOUSE_SEND_RECEIVE_TIMEOUT": "30", + "CHDB_ENABLED": "true", + "CHDB_DATA_PATH": "/path/to/chdb/data", + "custom_CLICKHOUSE_HOST": "", + "custom_CLICKHOUSE_PORT": "", + "custom_CLICKHOUSE_USER": "", + "custom_CLICKHOUSE_PASSWORD": "", + "custom_CLICKHOUSE_SECURE": "true", + "custom_CLICKHOUSE_VERIFY": "true", + "custom_CLICKHOUSE_CONNECT_TIMEOUT": "30", + "custom_CLICKHOUSE_SEND_RECEIVE_TIMEOUT": "30", + "custom_CHDB_ENABLED": "true", + "custom_CHDB_DATA_PATH": "/path/to/chdb/data" + } + } + } +} +``` + +The below example will throw an error as `default` prefix is used. + +```json +{ + "mcpServers": { + "mcp-clickhouse": { + "command": "uv", + "args": [ + "run", + "--with", + "mcp-clickhouse", + "--python", + "3.10", + "mcp-clickhouse" + ], + "env": { + "CLICKHOUSE_HOST": "", + "CLICKHOUSE_PORT": "", + "CLICKHOUSE_USER": "", + "CLICKHOUSE_PASSWORD": "", + "CLICKHOUSE_SECURE": "true", + "CLICKHOUSE_VERIFY": "true", + "CLICKHOUSE_CONNECT_TIMEOUT": "30", + "CLICKHOUSE_SEND_RECEIVE_TIMEOUT": "30", + "CHDB_ENABLED": "true", + "CHDB_DATA_PATH": "/path/to/chdb/data", + "default_CLICKHOUSE_HOST": "", + "default_CLICKHOUSE_PORT": "", + "default_CLICKHOUSE_USER": "", + "default_CLICKHOUSE_PASSWORD": "", + "default_CLICKHOUSE_SECURE": "true", + "default_CLICKHOUSE_VERIFY": "true", + "default_CLICKHOUSE_CONNECT_TIMEOUT": "30", + "default_CLICKHOUSE_SEND_RECEIVE_TIMEOUT": "30", + "default_CHDB_ENABLED": "true", + "default_CHDB_DATA_PATH": "/path/to/chdb/data" + } + } + } +} +``` + 3. Locate the command entry for `uv` and replace it with the absolute path to the `uv` executable. This ensures that the correct version of `uv` is used when starting the server. On a mac, you can find this path using `which uv`. 4. Restart Claude Desktop to apply the changes. diff --git a/mcp_clickhouse/mcp_env.py b/mcp_clickhouse/mcp_env.py index 204d855..176c3fd 100644 --- a/mcp_clickhouse/mcp_env.py +++ b/mcp_clickhouse/mcp_env.py @@ -57,6 +57,9 @@ def __post_init__(self): def _getenv(self, key: str, default=None, cast=str): prefixed_key = f"{self.tenant}_{key}" + if self.tenant == "": + prefixed_key = key # default + val = os.getenv(prefixed_key, os.getenv(key, default)) if val is not None and cast is not str: try: @@ -154,7 +157,7 @@ def get_client_config(self) -> dict: "verify": self.verify, "connect_timeout": self.connect_timeout, "send_receive_timeout": self.send_receive_timeout, - "client_name": f"{self.tenant}mcp_clickhouse", + "client_name": f"mcp_clickhouse_{self.tenant if self.tenant else 'default'}", } # Add optional database if set @@ -199,6 +202,9 @@ def __post_init__(self): def _getenv(self, key: str, default=None, cast=str): prefixed_key = f"{self.tenant}_{key}" + if self.tenant == "": + prefixed_key = key # default + val = os.getenv(prefixed_key, os.getenv(key, default)) if val is not None and cast is not str: try: @@ -266,9 +272,11 @@ def load_clickhouse_configs() -> Dict[str, ClickHouseConfig]: if key.endswith("CLICKHOUSE_HOST") and not key.startswith("CLICKHOUSE_HOST"): # _CLICKHOUSE_HOST tenant = key[: -len("_CLICKHOUSE_HOST")] + if tenant == "default": + raise ValueError("default is a reserved tenant") _CLICKHOUSE_TENANTS[tenant] = ClickHouseConfig(tenant=tenant) elif key.endswith("CLICKHOUSE_HOST") and key.startswith("CLICKHOUSE_HOST"): - # default tenant -> _CLICKHOUSE_HOST + # default tenant -> CLICKHOUSE_HOST _CLICKHOUSE_TENANTS["default"] = ClickHouseConfig(tenant="") return _CLICKHOUSE_TENANTS @@ -279,9 +287,11 @@ def load_chdb_configs() -> Dict[str, ChDBConfig]: if key.endswith("CHDB_DATA_PATH") and not key.startswith("CHDB_DATA_PATH"): # _CHDB_DATA_PATH tenant = key[: -len("_CHDB_DATA_PATH")] + if tenant == "default": + raise ValueError("default is a reserved tenant") _CHDB_TENANTS[tenant] = ChDBConfig(tenant=tenant) elif key.endswith("CHDB_DATA_PATH") and key.startswith("CHDB_DATA_PATH"): - # default tenant -> _CHDB_DATA_PATH + # default tenant -> CHDB_DATA_PATH _CHDB_TENANTS["default"] = ChDBConfig(tenant="") return _CHDB_TENANTS @@ -304,12 +314,12 @@ def get_chdb_config(tenant: str = "default") -> ChDBConfig: raise ValueError(f"No ChDB config found for tenant '{tenant}'") return _CHDB_TENANTS[tenant] -def list_clickhouse_tenants() -> List[str]: +def get_clickhouse_tenants() -> List[str]: """Get list of all clickhouse tenant names.""" global _CLICKHOUSE_TENANTS return [tenant for tenant in _CLICKHOUSE_TENANTS.keys()] -def list_chdb_tenants() -> List[str]: +def get_chdb_tenants() -> List[str]: """Get list of all chdb tenant names.""" global _CHDB_TENANTS return [tenant for tenant in _CHDB_TENANTS.keys()] \ No newline at end of file diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index 904681c..702a89e 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -16,7 +16,7 @@ from starlette.requests import Request from starlette.responses import PlainTextResponse -from mcp_clickhouse.mcp_env import load_clickhouse_configs, load_chdb_configs, list_clickhouse_tenants, list_chdb_tenants, get_config, get_chdb_config +from mcp_clickhouse.mcp_env import load_clickhouse_configs, load_chdb_configs, get_clickhouse_tenants, get_chdb_tenants, get_config, get_chdb_config from mcp_clickhouse.chdb_prompt import CHDB_PROMPT @@ -66,8 +66,8 @@ class Table: load_chdb_configs() # List of Tenants -CLICKHOUSE_TENANTS = list_clickhouse_tenants() -CHDB_TENANTS = list_chdb_tenants() +CLICKHOUSE_TENANTS = get_clickhouse_tenants() +CHDB_TENANTS = get_chdb_tenants() # Create ThreadPoolExecutors for each tenant CLICKHOUSE_QUERY_EXECUTOR = { @@ -168,15 +168,15 @@ def chdb_tenant_available(tenant: str): return True return False -def list_chdb_tenants(): +def list_clickhouse_tenants(): """List available Clickhouse tenants""" global CLICKHOUSE_TENANTS - return CLICKHOUSE_TENANTS + return json.dumps(CLICKHOUSE_TENANTS) def list_chdb_tenants(): """List available chDB tenants""" global CHDB_TENANTS - return CHDB_TENANTS + return json.dumps(CHDB_TENANTS) def list_databases(tenant: str): """List available ClickHouse databases""" diff --git a/tests/test_chdb_tool.py b/tests/test_chdb_tool.py index 36dc790..2947fa2 100644 --- a/tests/test_chdb_tool.py +++ b/tests/test_chdb_tool.py @@ -2,7 +2,7 @@ from dotenv import load_dotenv from fastmcp.exceptions import ToolError -from mcp_clickhouse import create_chdb_client, run_chdb_select_query +from mcp_clickhouse import list_chdb_tenants, create_chdb_client, run_chdb_select_query load_dotenv() @@ -10,7 +10,12 @@ class TestChDBTools(unittest.TestCase): @classmethod def setUpClass(cls): """Set up the environment before chDB tests.""" - cls.client = create_chdb_client(tenant="example") + cls.client = create_chdb_client(tenant="default") + + def test_list_chdb_tenants(self): + tenants = list_chdb_tenants() + self.assertIn("default", tenants) + self.assertEqual(len(tenants), 1) def test_run_chdb_select_query_wrong_tenant(self): """Test running a simple SELECT query in chDB with wrong tenant.""" @@ -26,7 +31,7 @@ def test_run_chdb_select_query_wrong_tenant(self): def test_run_chdb_select_query_simple(self): """Test running a simple SELECT query in chDB.""" - tenant = "example" + tenant = "default" query = "SELECT 1 as test_value" result = run_chdb_select_query(tenant, query) self.assertIsInstance(result, list) @@ -34,7 +39,7 @@ def test_run_chdb_select_query_simple(self): def test_run_chdb_select_query_with_url_table_function(self): """Test running a SELECT query with url table function in chDB.""" - tenant = "example" + tenant = "default" query = "SELECT COUNT(1) FROM url('https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_0.parquet', 'Parquet')" result = run_chdb_select_query(tenant, query) print(result) @@ -43,7 +48,7 @@ def test_run_chdb_select_query_with_url_table_function(self): def test_run_chdb_select_query_failure(self): """Test running a SELECT query with an error in chDB.""" - tenant = "example" + tenant = "default" query = "SELECT * FROM non_existent_table_chDB" result = run_chdb_select_query(tenant, query) print(result) @@ -53,7 +58,7 @@ def test_run_chdb_select_query_failure(self): def test_run_chdb_select_query_empty_result(self): """Test running a SELECT query that returns empty result in chDB.""" - tenant = "example" + tenant = "default" query = "SELECT 1 WHERE 1 = 0" result = run_chdb_select_query(tenant, query) print(result) diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index a9d45cf..cc8966f 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -20,10 +20,8 @@ def event_loop(): @pytest_asyncio.fixture(scope="module") async def setup_test_database(): """Set up test database and tables before running tests.""" - # Test tenant - test_tenant = "example" - client = create_clickhouse_client(test_tenant) + client = create_clickhouse_client(tenant= "default") # Test database and table names test_db = "test_mcp_db" @@ -76,7 +74,7 @@ async def setup_test_database(): (1003, 'login', '2024-01-01 12:00:00') """) - yield test_tenant, test_db, test_table, test_table2 + yield test_db, test_table, test_table2 # Cleanup after tests client.command(f"DROP DATABASE IF EXISTS {test_db}") @@ -101,7 +99,7 @@ async def test_list_databases_wrong_tenant(mcp_server): @pytest.mark.asyncio async def test_list_tables_wrong_tenant(mcp_server, setup_test_database): """Test the list_tables tool with wrong tenant.""" - _, test_db, test_table, _ = setup_test_database + test_db, test_table, _ = setup_test_database tenant = "wrong_tenant" async with Client(mcp_server) as client: with pytest.raises(ToolError) as exc_info: @@ -111,7 +109,7 @@ async def test_list_tables_wrong_tenant(mcp_server, setup_test_database): @pytest.mark.asyncio async def test_run_select_query_wrong_tenant(mcp_server, setup_test_database): """Test the run_select_query tool with wrong tenant.""" - _, test_db, test_table, _ = setup_test_database + test_db, test_table, _ = setup_test_database tenant = "wrong_tenant" async with Client(mcp_server) as client: with pytest.raises(ToolError) as exc_info: @@ -123,37 +121,36 @@ async def test_run_select_query_wrong_tenant(mcp_server, setup_test_database): async def test_list_clickhouse_tenants(mcp_server): """Test the list_clickhouse_tenants tool.""" async with Client(mcp_server) as client: - result = await client.call_tool("list_clickhouse_tenants") - - # The result should be a list containing at least one item - assert len(result) >= 1 + result = await client.call_tool("list_clickhouse_tenants", {}) + # The result should be a list containing one item + assert len(result) == 1 assert isinstance(result[0].text, str) - tenants = json.loads(result[0].text) + # Parse the result text (it's a JSON list of tenant names) + tenants = json.loads(result[0].text) + assert "default" in tenants # default tenant is defined in .env (no prefix) - assert "example" in tenants # example tenant is defined in .env (example prefix) @pytest.mark.asyncio async def test_list_chdb_tenants(mcp_server): """Test the list_chdb_tenants tool.""" async with Client(mcp_server) as client: - result = await client.call_tool("list_chdb_tenants") - - # The result should be a list containing at least one item - assert len(result) >= 1 + result = await client.call_tool("list_chdb_tenants", {}) + # The result should be a list containing one item + assert len(result) == 1 assert isinstance(result[0].text, str) + # Parse the result text (it's a JSON list of tenant names) tenants = json.loads(result[0].text) assert "default" in tenants # default tenant is defined in .env (no prefix) - assert "example" in tenants # example tenant is defined in .env (example prefix) @pytest.mark.asyncio async def test_list_databases(mcp_server, setup_test_database): """Test the list_databases tool.""" - test_tenant, test_db, _, _ = setup_test_database + test_db, _, _ = setup_test_database async with Client(mcp_server) as client: - result = await client.call_tool("list_databases", {"tenant": test_tenant}) + result = await client.call_tool("list_databases", {"tenant": "default"}) # The result should be a list containing at least one item assert len(result) >= 1 @@ -168,10 +165,10 @@ async def test_list_databases(mcp_server, setup_test_database): @pytest.mark.asyncio async def test_list_tables_basic(mcp_server, setup_test_database): """Test the list_tables tool without filters.""" - test_tenant, test_db, test_table, test_table2 = setup_test_database + test_db, test_table, test_table2 = setup_test_database async with Client(mcp_server) as client: - result = await client.call_tool("list_tables", {"tenant": test_tenant, "database": test_db}) + result = await client.call_tool("list_tables", {"tenant": "default", "database": test_db}) assert len(result) >= 1 tables = json.loads(result[0].text) @@ -203,11 +200,11 @@ async def test_list_tables_basic(mcp_server, setup_test_database): @pytest.mark.asyncio async def test_list_tables_with_like_filter(mcp_server, setup_test_database): """Test the list_tables tool with LIKE filter.""" - test_tenant, test_db, test_table, _ = setup_test_database + test_db, test_table, _ = setup_test_database async with Client(mcp_server) as client: # Test with LIKE filter - result = await client.call_tool("list_tables", {"tenant": test_tenant, "database": test_db, "like": "test_%"}) + result = await client.call_tool("list_tables", {"tenant": "default", "database": test_db, "like": "test_%"}) tables_data = json.loads(result[0].text) @@ -224,11 +221,11 @@ async def test_list_tables_with_like_filter(mcp_server, setup_test_database): @pytest.mark.asyncio async def test_list_tables_with_not_like_filter(mcp_server, setup_test_database): """Test the list_tables tool with NOT LIKE filter.""" - test_tenant, test_db, _, test_table2 = setup_test_database + test_db, _, test_table2 = setup_test_database async with Client(mcp_server) as client: # Test with NOT LIKE filter - result = await client.call_tool("list_tables", {"tenant": test_tenant, "database": test_db, "not_like": "test_%"}) + result = await client.call_tool("list_tables", {"tenant": "default", "database": test_db, "not_like": "test_%"}) tables_data = json.loads(result[0].text) @@ -245,11 +242,11 @@ async def test_list_tables_with_not_like_filter(mcp_server, setup_test_database) @pytest.mark.asyncio async def test_run_select_query_success(mcp_server, setup_test_database): """Test running a successful SELECT query.""" - test_tenant, test_db, test_table, _ = setup_test_database + test_db, test_table, _ = setup_test_database async with Client(mcp_server) as client: query = f"SELECT id, name, age FROM {test_db}.{test_table} ORDER BY id" - result = await client.call_tool("run_select_query", {"tenant": test_tenant, "query": query}) + result = await client.call_tool("run_select_query", {"tenant": "default", "query": query}) query_result = json.loads(result[0].text) @@ -271,11 +268,11 @@ async def test_run_select_query_success(mcp_server, setup_test_database): @pytest.mark.asyncio async def test_run_select_query_with_aggregation(mcp_server, setup_test_database): """Test running a SELECT query with aggregation.""" - test_tenant, test_db, test_table, _ = setup_test_database + test_db, test_table, _ = setup_test_database async with Client(mcp_server) as client: query = f"SELECT COUNT(*) as count, AVG(age) as avg_age FROM {test_db}.{test_table}" - result = await client.call_tool("run_select_query", {"tenant": test_tenant, "query": query}) + result = await client.call_tool("run_select_query", {"tenant": "default", "query": query}) query_result = json.loads(result[0].text) @@ -288,11 +285,11 @@ async def test_run_select_query_with_aggregation(mcp_server, setup_test_database @pytest.mark.asyncio async def test_run_select_query_with_join(mcp_server, setup_test_database): """Test running a SELECT query with JOIN.""" - test_tenant, test_db, test_table, test_table2 = setup_test_database + test_db, test_table, test_table2 = setup_test_database async with Client(mcp_server) as client: # Insert related data for join - client_direct = create_clickhouse_client(test_tenant) + client_direct = create_clickhouse_client("default") client_direct.command(f""" INSERT INTO {test_db}.{test_table2} (event_id, event_type, timestamp) VALUES (2001, 'purchase', '2024-01-01 14:00:00') @@ -303,7 +300,7 @@ async def test_run_select_query_with_join(mcp_server, setup_test_database): COUNT(DISTINCT event_type) as event_types_count FROM {test_db}.{test_table2} """ - result = await client.call_tool("run_select_query", {"tenant": test_tenant, "query": query}) + result = await client.call_tool("run_select_query", {"tenant": "default", "query": query}) query_result = json.loads(result[0].text) assert query_result["rows"][0][0] == 3 # login, logout, purchase @@ -312,7 +309,7 @@ async def test_run_select_query_with_join(mcp_server, setup_test_database): @pytest.mark.asyncio async def test_run_select_query_error(mcp_server, setup_test_database): """Test running a SELECT query that results in an error.""" - test_tenant, test_db, _, _ = setup_test_database + test_db, _, _ = setup_test_database async with Client(mcp_server) as client: # Query non-existent table @@ -320,14 +317,14 @@ async def test_run_select_query_error(mcp_server, setup_test_database): # Should raise ToolError with pytest.raises(ToolError) as exc_info: - await client.call_tool("run_select_query", {"tenant": test_tenant, "query": query}) + await client.call_tool("run_select_query", {"tenant": "default", "query": query}) assert "Query execution failed" in str(exc_info.value) @pytest.mark.asyncio async def test_run_select_query_syntax_error(mcp_server, setup_test_database): - test_tenant, _, _, _ = setup_test_database + _, _, _ = setup_test_database """Test running a SELECT query with syntax error.""" async with Client(mcp_server) as client: @@ -336,7 +333,7 @@ async def test_run_select_query_syntax_error(mcp_server, setup_test_database): # Should raise ToolError with pytest.raises(ToolError) as exc_info: - await client.call_tool("run_select_query", {"tenant": test_tenant, "query": query}) + await client.call_tool("run_select_query", {"tenant": "default", "query": query}) assert "Query execution failed" in str(exc_info.value) @@ -344,10 +341,10 @@ async def test_run_select_query_syntax_error(mcp_server, setup_test_database): @pytest.mark.asyncio async def test_table_metadata_details(mcp_server, setup_test_database): """Test that table metadata is correctly retrieved.""" - test_tenant, test_db, test_table, _ = setup_test_database + test_db, test_table, _ = setup_test_database async with Client(mcp_server) as client: - result = await client.call_tool("list_tables", {"tenant": test_tenant, "database": test_db}) + result = await client.call_tool("list_tables", {"tenant": "default", "database": test_db}) tables = json.loads(result[0].text) # Find our test table @@ -383,10 +380,10 @@ async def test_table_metadata_details(mcp_server, setup_test_database): @pytest.mark.asyncio async def test_system_database_access(mcp_server, setup_test_database): """Test that we can access system databases.""" - test_tenant, _, _, _ = setup_test_database + _, _, _ = setup_test_database async with Client(mcp_server) as client: # List tables in system database - result = await client.call_tool("list_tables", {"tenant": test_tenant, "database": "system"}) + result = await client.call_tool("list_tables", {"tenant": "default", "database": "system"}) tables = json.loads(result[0].text) # System database should have many tables @@ -402,7 +399,7 @@ async def test_system_database_access(mcp_server, setup_test_database): @pytest.mark.asyncio async def test_concurrent_queries(mcp_server, setup_test_database): """Test running multiple queries concurrently.""" - test_tenant, test_db, test_table, test_table2 = setup_test_database + test_db, test_table, test_table2 = setup_test_database async with Client(mcp_server) as client: # Run multiple queries concurrently @@ -415,7 +412,7 @@ async def test_concurrent_queries(mcp_server, setup_test_database): # Execute all queries concurrently results = await asyncio.gather( - *[client.call_tool("run_select_query", {"tenant": test_tenant, "query": query}) for query in queries] + *[client.call_tool("run_select_query", {"tenant": "default", "query": query}) for query in queries] ) # Verify all queries succeeded diff --git a/tests/test_tool.py b/tests/test_tool.py index b74e004..fb4b117 100644 --- a/tests/test_tool.py +++ b/tests/test_tool.py @@ -4,7 +4,7 @@ from dotenv import load_dotenv from fastmcp.exceptions import ToolError -from mcp_clickhouse import create_clickhouse_client, list_clickhouse_tenants, list_chdb_tenants, list_databases, list_tables, run_select_query +from mcp_clickhouse import create_clickhouse_client, list_clickhouse_tenants, list_databases, list_tables, run_select_query load_dotenv() @@ -12,7 +12,7 @@ class TestClickhouseTools(unittest.TestCase): @classmethod def setUpClass(cls): """Set up the environment before tests.""" - cls.client = create_clickhouse_client("example") + cls.client = create_clickhouse_client(tenant="default") # Prepare test database and table cls.test_db = "test_tool_db" @@ -42,15 +42,8 @@ def tearDownClass(cls): def test_list_clickhouse_tenants(self): tenants = list_clickhouse_tenants() - self.assertIn("example", tenants) self.assertIn("default", tenants) - self.assertEqual(len(tenants), 2) - - def test_list_chdb_tenants(self): - tenants = list_chdb_tenants() - self.assertIn("example", tenants) - self.assertIn("default", tenants) - self.assertEqual(len(tenants), 2) + self.assertEqual(len(tenants), 1) def test_list_databases_wrong_tenant(self): """Test listing tables with wrong tenant.""" @@ -88,21 +81,21 @@ def test_run_select_query_wrong_tenant(self): def test_list_databases(self): """Test listing databases.""" - result = list_databases("example") + result = list_databases("default") # Parse JSON response databases = json.loads(result) self.assertIn(self.test_db, databases) def test_list_tables_without_like(self): """Test listing tables without a 'LIKE' filter.""" - result = list_tables("example", self.test_db) + result = list_tables("default", self.test_db) self.assertIsInstance(result, list) self.assertEqual(len(result), 1) self.assertEqual(result[0]["name"], self.test_table) def test_list_tables_with_like(self): """Test listing tables with a 'LIKE' filter.""" - result = list_tables("example", self.test_db, like=f"{self.test_table}%") + result = list_tables("default", self.test_db, like=f"{self.test_table}%") self.assertIsInstance(result, list) self.assertEqual(len(result), 1) self.assertEqual(result[0]["name"], self.test_table) @@ -110,7 +103,7 @@ def test_list_tables_with_like(self): def test_run_select_query_success(self): """Test running a SELECT query successfully.""" query = f"SELECT * FROM {self.test_db}.{self.test_table}" - result = run_select_query("example", query) + result = run_select_query("default", query) self.assertIsInstance(result, dict) self.assertEqual(len(result["rows"]), 2) self.assertEqual(result["rows"][0][0], 1) @@ -122,13 +115,13 @@ def test_run_select_query_failure(self): # Should raise ToolError with self.assertRaises(ToolError) as context: - run_select_query("example", query) + run_select_query("default", query) self.assertIn("Query execution failed", str(context.exception)) def test_table_and_column_comments(self): """Test that table and column comments are correctly retrieved.""" - result = list_tables("example", self.test_db) + result = list_tables("default", self.test_db) self.assertIsInstance(result, list) self.assertEqual(len(result), 1)