diff --git a/README.md b/README.md index d83294f..ddf1b27 100644 --- a/README.md +++ b/README.md @@ -314,6 +314,9 @@ The following environment variables are used to configure the ClickHouse and chD * `CLICKHOUSE_MCP_BIND_PORT`: Port to bind the MCP server to when using HTTP or SSE transport * Default: `"8000"` * Only used when transport is `"http"` or `"sse"` +* `CLICKHOUSE_MCP_QUERY_TIMEOUT`: Timeout in seconds for SELECT tools + * Default: `"30"` + * Increase this if you see `Query timed out after ...` errors for heavy queries * `CLICKHOUSE_ENABLED`: Enable/disable ClickHouse functionality * Default: `"true"` * Set to `"false"` to disable ClickHouse tools when using chDB only diff --git a/mcp_clickhouse/main.py b/mcp_clickhouse/main.py index 97599a4..db3877b 100644 --- a/mcp_clickhouse/main.py +++ b/mcp_clickhouse/main.py @@ -1,17 +1,17 @@ from .mcp_server import mcp -from .mcp_env import get_config, TransportType +from .mcp_env import get_mcp_config, TransportType def main(): - config = get_config() - transport = config.mcp_server_transport + mcp_config = get_mcp_config() + transport = mcp_config.server_transport # For HTTP and SSE transports, we need to specify host and port http_transports = [TransportType.HTTP.value, TransportType.SSE.value] if transport in http_transports: # Use the configured bind host (defaults to 127.0.0.1, can be set to 0.0.0.0) # and bind port (defaults to 8000) - mcp.run(transport=transport, host=config.mcp_bind_host, port=config.mcp_bind_port) + mcp.run(transport=transport, host=mcp_config.bind_host, port=mcp_config.bind_port) else: # For stdio transport, no host or port is needed mcp.run(transport=transport) diff --git a/mcp_clickhouse/mcp_env.py b/mcp_clickhouse/mcp_env.py index aec6826..64e0e8a 100644 --- a/mcp_clickhouse/mcp_env.py +++ b/mcp_clickhouse/mcp_env.py @@ -43,9 +43,6 @@ class ClickHouseConfig: CLICKHOUSE_SEND_RECEIVE_TIMEOUT: Send/receive timeout in seconds (default: 300) CLICKHOUSE_DATABASE: Default database to use (default: None) CLICKHOUSE_PROXY_PATH: Path to be added to the host URL. For instance, for servers behind an HTTP proxy (default: None) - CLICKHOUSE_MCP_SERVER_TRANSPORT: MCP server transport method - "stdio", "http", or "sse" (default: stdio) - CLICKHOUSE_MCP_BIND_HOST: Host to bind the MCP server to when using HTTP or SSE transport (default: 127.0.0.1) - CLICKHOUSE_MCP_BIND_PORT: Port to bind the MCP server to when using HTTP or SSE transport (default: 8000) CLICKHOUSE_ENABLED: Enable ClickHouse server (default: true) """ @@ -129,39 +126,6 @@ def send_receive_timeout(self) -> int: def proxy_path(self) -> str: return os.getenv("CLICKHOUSE_PROXY_PATH") - @property - def mcp_server_transport(self) -> str: - """Get the MCP server transport method. - - Valid options: "stdio", "http", "sse" - Default: "stdio" - """ - transport = os.getenv("CLICKHOUSE_MCP_SERVER_TRANSPORT", TransportType.STDIO.value).lower() - - # Validate transport type - if transport not in TransportType.values(): - valid_options = ", ".join(f'"{t}"' for t in TransportType.values()) - raise ValueError(f"Invalid transport '{transport}'. Valid options: {valid_options}") - return transport - - @property - def mcp_bind_host(self) -> str: - """Get the host to bind the MCP server to. - - Only used when transport is "http" or "sse". - Default: "127.0.0.1" - """ - return os.getenv("CLICKHOUSE_MCP_BIND_HOST", "127.0.0.1") - - @property - def mcp_bind_port(self) -> int: - """Get the port to bind the MCP server to. - - Only used when transport is "http" or "sse". - Default: 8000 - """ - return int(os.getenv("CLICKHOUSE_MCP_BIND_PORT", "8000")) - def get_client_config(self) -> dict: """Get the configuration dictionary for clickhouse_connect client. @@ -282,3 +246,49 @@ def get_chdb_config() -> ChDBConfig: if _CHDB_CONFIG_INSTANCE is None: _CHDB_CONFIG_INSTANCE = ChDBConfig() return _CHDB_CONFIG_INSTANCE + + +@dataclass +class MCPServerConfig: + """Configuration for MCP server-level settings. + + These settings control the server transport and tool behavior and are + intentionally independent of ClickHouse connection validation. + + Optional environment variables (with defaults): + CLICKHOUSE_MCP_SERVER_TRANSPORT: "stdio", "http", or "sse" (default: stdio) + CLICKHOUSE_MCP_BIND_HOST: Bind host for HTTP/SSE (default: 127.0.0.1) + CLICKHOUSE_MCP_BIND_PORT: Bind port for HTTP/SSE (default: 8000) + CLICKHOUSE_MCP_QUERY_TIMEOUT: SELECT tool timeout in seconds (default: 30) + """ + + @property + def server_transport(self) -> str: + transport = os.getenv("CLICKHOUSE_MCP_SERVER_TRANSPORT", TransportType.STDIO.value).lower() + if transport not in TransportType.values(): + valid_options = ", ".join(f'"{t}"' for t in TransportType.values()) + raise ValueError(f"Invalid transport '{transport}'. Valid options: {valid_options}") + return transport + + @property + def bind_host(self) -> str: + return os.getenv("CLICKHOUSE_MCP_BIND_HOST", "127.0.0.1") + + @property + def bind_port(self) -> int: + return int(os.getenv("CLICKHOUSE_MCP_BIND_PORT", "8000")) + + @property + def query_timeout(self) -> int: + return int(os.getenv("CLICKHOUSE_MCP_QUERY_TIMEOUT", "30")) + + +_MCP_CONFIG_INSTANCE = None + + +def get_mcp_config() -> MCPServerConfig: + """Gets the singleton instance of MCPServerConfig.""" + global _MCP_CONFIG_INSTANCE + if _MCP_CONFIG_INSTANCE is None: + _MCP_CONFIG_INSTANCE = MCPServerConfig() + return _MCP_CONFIG_INSTANCE diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index a3fa081..01639b2 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -17,7 +17,7 @@ from starlette.requests import Request from starlette.responses import PlainTextResponse -from mcp_clickhouse.mcp_env import get_config, get_chdb_config +from mcp_clickhouse.mcp_env import get_config, get_chdb_config, get_mcp_config from mcp_clickhouse.chdb_prompt import CHDB_PROMPT @@ -63,7 +63,6 @@ class Table: QUERY_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=10) atexit.register(lambda: QUERY_EXECUTOR.shutdown(wait=True)) -SELECT_QUERY_TIMEOUT_SECS = 30 load_dotenv() @@ -186,7 +185,8 @@ def run_select_query(query: str): try: future = QUERY_EXECUTOR.submit(execute_query, query) try: - result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS) + timeout_secs = get_mcp_config().query_timeout + result = future.result(timeout=timeout_secs) # Check if we received an error structure from execute_query if isinstance(result, dict) and "error" in result: logger.warning(f"Query failed: {result['error']}") @@ -198,9 +198,9 @@ def run_select_query(query: str): } return result except concurrent.futures.TimeoutError: - logger.warning(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}") + logger.warning(f"Query timed out after {timeout_secs} seconds: {query}") future.cancel() - raise ToolError(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds") + raise ToolError(f"Query timed out after {timeout_secs} seconds") except ToolError: raise except Exception as e: @@ -295,7 +295,8 @@ def run_chdb_select_query(query: str): try: future = QUERY_EXECUTOR.submit(execute_chdb_query, query) try: - result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS) + timeout_secs = get_mcp_config().query_timeout + result = future.result(timeout=timeout_secs) # Check if we received an error structure from execute_chdb_query if isinstance(result, dict) and "error" in result: logger.warning(f"chDB query failed: {result['error']}") @@ -306,12 +307,12 @@ def run_chdb_select_query(query: str): return result except concurrent.futures.TimeoutError: logger.warning( - f"chDB query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}" + f"chDB query timed out after {timeout_secs} seconds: {query}" ) future.cancel() return { "status": "error", - "message": f"chDB query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds", + "message": f"chDB query timed out after {timeout_secs} seconds", } except Exception as e: logger.error(f"Unexpected error in run_chdb_select_query: {e}")