diff --git a/cognee-mcp/Dockerfile b/cognee-mcp/Dockerfile
index 68dce18440..6608102c8c 100644
--- a/cognee-mcp/Dockerfile
+++ b/cognee-mcp/Dockerfile
@@ -65,6 +65,9 @@ ENV PYTHONUNBUFFERED=1
 ENV MCP_LOG_LEVEL=DEBUG
 ENV PYTHONPATH=/app
 
+# Add labels for API mode usage
+LABEL org.opencontainers.image.description="Cognee MCP Server with API mode support"
+
 # Use the application name from pyproject.toml for normal operation
 # For testing, we'll override this with a direct command
 ENTRYPOINT ["/app/entrypoint.sh"]
diff --git a/cognee-mcp/README.md b/cognee-mcp/README.md
index aa64429934..d14bc9fa12 100644
--- a/cognee-mcp/README.md
+++ b/cognee-mcp/README.md
@@ -38,7 +38,8 @@ Build memory for Agents and query from any client that speaks MCP – in your t
 ## ✨ Features
 
 - Multiple transports – choose Streamable HTTP --transport http (recommended for web deployments), SSE --transport sse (real‑time streaming), or stdio (classic pipe, default)
-- Integrated logging – all actions written to a rotating file (see get_log_file_location()) and mirrored to console in dev
+- **API Mode** – connect to an already running Cognee FastAPI server instead of using cognee directly (see [API Mode](#-api-mode) below)
+- Integrated logging – all actions written to a rotating file (see get_log_file_location()) and mirrored to console in dev
 - Local file ingestion – feed .md, source files, Cursor rule‑sets, etc. straight from disk
 - Background pipelines – long‑running cognify & codify jobs spawn off‑thread; check progress with status tools
 - Developer rules bootstrap – one call indexes .cursorrules, .cursor/rules, AGENT.md, and friends into the developer_rules nodeset
@@ -91,7 +92,7 @@ To use different LLM providers / database configurations, and for more info chec
 
 ## 🐳 Docker Usage
 
-If you’d rather run cognee-mcp in a container, you have two options:
+If you'd rather run cognee-mcp in a container, you have two options:
 
 1. **Build locally**
    1. Make sure you are in /cognee root directory and have a fresh `.env` containing only your `LLM_API_KEY` (and your chosen settings).
@@ -128,6 +129,64 @@ If you’d rather run cognee-mcp in a container, you have two options:
    - ✅ Direct: `python src/server.py --transport http`
    - ❌ Direct: `-e TRANSPORT_MODE=http` (won't work)
 
+### **Docker API Mode**
+
+To connect the MCP Docker container to a Cognee API server running on your host machine:
+
+#### **Simple Usage (Automatic localhost handling):**
+```bash
+# Start your Cognee API server on the host
+python -m cognee.api.client
+
+# Run MCP container in API mode - localhost is automatically converted!
+docker run \
+  -e TRANSPORT_MODE=sse \
+  -e API_URL=http://localhost:8000 \
+  -e API_TOKEN=your_auth_token \
+  -p 8001:8000 \
+  --rm -it cognee/cognee-mcp:main
+```
+
+**Note:** The container will automatically convert `localhost` to `host.docker.internal` on Mac/Windows/Docker Desktop. You'll see a message in the logs showing the conversion.
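+The message looks like this (abridged; the lines come straight from the echo statements in `entrypoint.sh`):
+
+```
+⚠️  Warning: API_URL contains localhost/127.0.0.1
+   Original: http://localhost:8000
+   Converted to: http://host.docker.internal:8000
+```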
+
+#### **Explicit host.docker.internal (Mac/Windows):**
+```bash
+# Or explicitly use host.docker.internal
+docker run \
+  -e TRANSPORT_MODE=sse \
+  -e API_URL=http://host.docker.internal:8000 \
+  -e API_TOKEN=your_auth_token \
+  -p 8001:8000 \
+  --rm -it cognee/cognee-mcp:main
+```
+
+#### **On Linux (use host network or container IP):**
+```bash
+# Option 1: Use host network (simplest)
+docker run \
+  --network host \
+  -e TRANSPORT_MODE=sse \
+  -e API_URL=http://localhost:8000 \
+  -e API_TOKEN=your_auth_token \
+  --rm -it cognee/cognee-mcp:main
+
+# Option 2: Use the host IP address
+# First, get your host IP: ip addr show docker0
+docker run \
+  -e TRANSPORT_MODE=sse \
+  -e API_URL=http://172.17.0.1:8000 \
+  -e API_TOKEN=your_auth_token \
+  -p 8001:8000 \
+  --rm -it cognee/cognee-mcp:main
+```
+
+**Environment variables for API mode:**
+- `API_URL`: URL of the running Cognee API server
+- `API_TOKEN`: Authentication token (optional; required only if the API has authentication enabled)
+
+**Note:** When running in API mode:
+- Database migrations are automatically skipped (the API server handles its own database)
+- Some features are limited (see [API Mode Limitations](#-api-mode))
+
 ## 🔗 MCP Client Configuration
@@ -255,6 +314,76 @@ You can configure both transports simultaneously for testing:
 
 **Note:** Only enable the server you're actually running to avoid connection errors.
 
+## 🌐 API Mode
+
+The MCP server can operate in two modes:
+
+### **Direct Mode** (Default)
+The MCP server directly imports and uses the cognee library. This is the default mode, with full feature support.
+
+### **API Mode**
+The MCP server connects to an already running Cognee FastAPI server via HTTP requests. This is useful when:
+- You have a centralized Cognee API server running
+- You want to separate the MCP server from the knowledge graph backend
+- You need multiple MCP servers to share the same knowledge graph
+
+**Starting the MCP server in API mode:**
+```bash
+# Start your Cognee FastAPI server first (default port 8000)
+cd /path/to/cognee
+python -m cognee.api.client

+# Then start the MCP server in API mode
+cd cognee-mcp
+python src/server.py --api-url http://localhost:8000 --api-token YOUR_AUTH_TOKEN
+```
+
+**API Mode with different transports:**
+```bash
+# With SSE transport
+python src/server.py --transport sse --api-url http://localhost:8000 --api-token YOUR_TOKEN
+
+# With HTTP transport
+python src/server.py --transport http --api-url http://localhost:8000 --api-token YOUR_TOKEN
+```
+
+**API Mode with Docker:**
+```bash
+# On Mac/Windows (use host.docker.internal to access the host)
+docker run \
+  -e TRANSPORT_MODE=sse \
+  -e API_URL=http://host.docker.internal:8000 \
+  -e API_TOKEN=YOUR_TOKEN \
+  -p 8001:8000 \
+  --rm -it cognee/cognee-mcp:main
+
+# On Linux (use host network)
+docker run \
+  --network host \
+  -e TRANSPORT_MODE=sse \
+  -e API_URL=http://localhost:8000 \
+  -e API_TOKEN=YOUR_TOKEN \
+  --rm -it cognee/cognee-mcp:main
+```
+
+**Command-line arguments for API mode:**
+- `--api-url`: Base URL of the running Cognee FastAPI server (e.g., `http://localhost:8000`)
+- `--api-token`: Authentication token for the API (optional; required only if the API has authentication enabled)
+
+**Docker environment variables for API mode:**
+- `API_URL`: Base URL of the running Cognee FastAPI server
+- `API_TOKEN`: Authentication token (optional; required only if the API has authentication enabled)
+
+**API Mode limitations:**
+Some features are only available in direct mode:
+- `codify` (code graph pipeline)
+- `cognify_status` / `codify_status` (pipeline status tracking)
+- `prune` (data reset)
+- `get_developer_rules` (developer rules retrieval)
+- `list_data` with a specific dataset_id (detailed data listing)
+
+Basic operations like `cognify`, `search`, `delete`, and `list_data` (all datasets) work in both modes.
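+
+Under the hood, API mode translates each tool call into a plain HTTP request against the FastAPI server. A `search` call, for example, is roughly equivalent to the request below (a sketch based on the payload built in `src/cognee_client.py`; the query text and token are placeholders):
+
+```bash
+curl -X POST http://localhost:8000/api/v1/search \
+  -H "Content-Type: application/json" \
+  -H "Authorization: Bearer YOUR_TOKEN" \
+  -d '{"query": "What did I store?", "search_type": "GRAPH_COMPLETION", "top_k": 10}'
+```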
+
 
 ## 💻 Basic Usage
 
 The MCP server exposes its functionality through tools. Call them from any MCP client (Cursor, Claude Desktop, Cline, Roo and more).
diff --git a/cognee-mcp/entrypoint.sh b/cognee-mcp/entrypoint.sh
index e3ff849e07..2f122bbfdc 100644
--- a/cognee-mcp/entrypoint.sh
+++ b/cognee-mcp/entrypoint.sh
@@ -14,61 +14,94 @@ HTTP_PORT=${HTTP_PORT:-8000}
 echo "Debug port: $DEBUG_PORT"
 echo "HTTP port: $HTTP_PORT"
 
-# Run Alembic migrations with proper error handling.
-# Note on UserAlreadyExists error handling:
-# During database migrations, we attempt to create a default user. If this user
-# already exists (e.g., from a previous deployment or migration), it's not a
-# critical error and shouldn't prevent the application from starting. This is
-# different from other migration errors which could indicate database schema
-# inconsistencies and should cause the startup to fail. This check allows for
-# smooth redeployments and container restarts while maintaining data integrity.
-echo "Running database migrations..."
+# Check if API mode is enabled
+if [ -n "$API_URL" ]; then
+  echo "API mode enabled: $API_URL"
+  echo "Skipping database migrations (API server handles its own database)"
+else
+  echo "Direct mode: Using local cognee instance"
+  # Run Alembic migrations with proper error handling.
+  # Note on UserAlreadyExists error handling:
+  # During database migrations, we attempt to create a default user. If this user
+  # already exists (e.g., from a previous deployment or migration), it's not a
+  # critical error and shouldn't prevent the application from starting. This is
+  # different from other migration errors which could indicate database schema
+  # inconsistencies and should cause the startup to fail. This check allows for
+  # smooth redeployments and container restarts while maintaining data integrity.
+  echo "Running database migrations..."
 
-MIGRATION_OUTPUT=$(alembic upgrade head)
-MIGRATION_EXIT_CODE=$?
+  MIGRATION_OUTPUT=$(alembic upgrade head)
+  MIGRATION_EXIT_CODE=$?
 
-if [[ $MIGRATION_EXIT_CODE -ne 0 ]]; then
-  if [[ "$MIGRATION_OUTPUT" == *"UserAlreadyExists"* ]] || [[ "$MIGRATION_OUTPUT" == *"User default_user@example.com already exists"* ]]; then
-    echo "Warning: Default user already exists, continuing startup..."
-  else
-    echo "Migration failed with unexpected error."
-    exit 1
+  if [[ $MIGRATION_EXIT_CODE -ne 0 ]]; then
+    if [[ "$MIGRATION_OUTPUT" == *"UserAlreadyExists"* ]] || [[ "$MIGRATION_OUTPUT" == *"User default_user@example.com already exists"* ]]; then
+      echo "Warning: Default user already exists, continuing startup..."
+    else
+      echo "Migration failed with unexpected error."
+      exit 1
+    fi
   fi
-fi
-echo "Database migrations done."
+  echo "Database migrations done."
+fi
 
 echo "Starting Cognee MCP Server with transport mode: $TRANSPORT_MODE"
 
 # Add startup delay to ensure DB is ready
 sleep 2
 
+# Build API arguments if API_URL is set
+API_ARGS=""
+if [ -n "$API_URL" ]; then
+  # Handle localhost in API_URL - convert to a host-accessible address
+  if echo "$API_URL" | grep -q "localhost" || echo "$API_URL" | grep -q "127.0.0.1"; then
+    echo "⚠️  Warning: API_URL contains localhost/127.0.0.1"
+    echo "   Original: $API_URL"
+
+    # Try to use host.docker.internal (works on Mac/Windows and recent Linux with Docker Desktop)
+    FIXED_API_URL=$(echo "$API_URL" | sed 's/localhost/host.docker.internal/g' | sed 's/127\.0\.0\.1/host.docker.internal/g')
+
+    echo "   Converted to: $FIXED_API_URL"
+    echo "   This will work on Mac/Windows/Docker Desktop."
+    echo "   On Linux without Docker Desktop, you may need to:"
+    echo "   - Use --network host, OR"
+    echo "   - Set API_URL=http://172.17.0.1:8000 (Docker bridge IP)"
+
+    API_URL="$FIXED_API_URL"
+  fi
+
+  API_ARGS="--api-url $API_URL"
+  if [ -n "$API_TOKEN" ]; then
+    API_ARGS="$API_ARGS --api-token $API_TOKEN"
+  fi
+fi
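+
+# Example (sketch): with API_URL=http://localhost:8000 and API_TOKEN=secret,
+# API_ARGS becomes "--api-url http://host.docker.internal:8000 --api-token secret"
+# and is appended to every exec invocation below, e.g.:
+#   cognee-mcp --transport sse --host 0.0.0.0 --port $HTTP_PORT --no-migration $API_ARGS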
+
 # Modified startup with transport mode selection and error handling
 if [ "$ENVIRONMENT" = "dev" ] || [ "$ENVIRONMENT" = "local" ]; then
   if [ "$DEBUG" = "true" ]; then
     echo "Waiting for the debugger to attach..."
     if [ "$TRANSPORT_MODE" = "sse" ]; then
-      exec python -m debugpy --wait-for-client --listen 0.0.0.0:$DEBUG_PORT -m cognee-mcp --transport sse --host 0.0.0.0 --port $HTTP_PORT --no-migration
+      exec python -m debugpy --wait-for-client --listen 0.0.0.0:$DEBUG_PORT -m cognee-mcp --transport sse --host 0.0.0.0 --port $HTTP_PORT --no-migration $API_ARGS
     elif [ "$TRANSPORT_MODE" = "http" ]; then
-      exec python -m debugpy --wait-for-client --listen 0.0.0.0:$DEBUG_PORT -m cognee-mcp --transport http --host 0.0.0.0 --port $HTTP_PORT --no-migration
+      exec python -m debugpy --wait-for-client --listen 0.0.0.0:$DEBUG_PORT -m cognee-mcp --transport http --host 0.0.0.0 --port $HTTP_PORT --no-migration $API_ARGS
     else
-      exec python -m debugpy --wait-for-client --listen 0.0.0.0:$DEBUG_PORT -m cognee-mcp --transport stdio --no-migration
+      exec python -m debugpy --wait-for-client --listen 0.0.0.0:$DEBUG_PORT -m cognee-mcp --transport stdio --no-migration $API_ARGS
     fi
   else
     if [ "$TRANSPORT_MODE" = "sse" ]; then
-      exec cognee-mcp --transport sse --host 0.0.0.0 --port $HTTP_PORT --no-migration
+      exec cognee-mcp --transport sse --host 0.0.0.0 --port $HTTP_PORT --no-migration $API_ARGS
     elif [ "$TRANSPORT_MODE" = "http" ]; then
-      exec cognee-mcp --transport http --host 0.0.0.0 --port $HTTP_PORT --no-migration
+      exec cognee-mcp --transport http --host 0.0.0.0 --port $HTTP_PORT --no-migration $API_ARGS
     else
-      exec cognee-mcp --transport stdio --no-migration
+      exec cognee-mcp --transport stdio --no-migration $API_ARGS
    fi
   fi
 else
   if [ "$TRANSPORT_MODE" = "sse" ]; then
-    exec cognee-mcp --transport sse --host 0.0.0.0 --port $HTTP_PORT --no-migration
+    exec cognee-mcp --transport sse --host 0.0.0.0 --port $HTTP_PORT --no-migration $API_ARGS
   elif [ "$TRANSPORT_MODE" = "http" ]; then
-    exec cognee-mcp --transport http --host 0.0.0.0 --port $HTTP_PORT --no-migration
+    exec cognee-mcp --transport http --host 0.0.0.0 --port $HTTP_PORT --no-migration $API_ARGS
   else
-    exec cognee-mcp --transport stdio --no-migration
+    exec cognee-mcp --transport stdio --no-migration $API_ARGS
   fi
 fi
diff --git a/cognee-mcp/pyproject.toml b/cognee-mcp/pyproject.toml
index e5b31ad469..37646e3e68 100644
--- a/cognee-mcp/pyproject.toml
+++ b/cognee-mcp/pyproject.toml
@@ -13,6 +13,7 @@ dependencies = [
     "fastmcp>=2.10.0,<3.0.0",
     "mcp>=1.12.0,<2.0.0",
     "uv>=0.6.3,<1.0.0",
+    "httpx>=0.27.0,<1.0.0",
 ]
 
 authors = [
diff --git a/cognee-mcp/src/__init__.py b/cognee-mcp/src/__init__.py
index 933bcd73f4..f1c3706c61 100644
--- a/cognee-mcp/src/__init__.py
+++ b/cognee-mcp/src/__init__.py
@@ -1,4 +1,7 @@
-from .server import main as server_main
+try:
+    from .server import main as server_main
+except ImportError:
+    from server import main as server_main
 
 import warnings
 import sys
diff --git a/cognee-mcp/src/cognee_client.py b/cognee-mcp/src/cognee_client.py
new file mode 100644
index 0000000000..a2fd3345f8
--- /dev/null
+++ b/cognee-mcp/src/cognee_client.py
@@ -0,0 +1,338 @@
+"""
+Cognee client abstraction that supports both direct function calls and HTTP API calls.
+
+This module provides a unified interface for interacting with Cognee, supporting:
+- Direct mode: directly imports and calls cognee functions (default behavior)
+- API mode: makes HTTP requests to a running Cognee FastAPI server
+"""
+
+import json
+import sys
+from typing import Optional, Any, List, Dict
+from uuid import UUID
+from contextlib import redirect_stdout
+
+import httpx
+
+from cognee.shared.logging_utils import get_logger
+
+logger = get_logger()
+
+
+class CogneeClient:
+    """
+    Unified client for interacting with Cognee via direct calls or HTTP API.
+
+    Parameters
+    ----------
+    api_url : str, optional
+        Base URL of the Cognee API server (e.g., "http://localhost:8000").
+        If None, uses direct cognee function calls.
+    api_token : str, optional
+        Authentication token for the API (optional; required only if the API has authentication enabled).
+    """
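+
+    # Typical usage (illustrative sketch only; the URL and token are placeholders):
+    #
+    #     client = CogneeClient(api_url="http://localhost:8000", api_token="...")
+    #     await client.add("some text", dataset_name="main_dataset")
+    #     await client.cognify()
+    #     results = await client.search("my query", "GRAPH_COMPLETION")
+    #     await client.close()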
+
+    def __init__(self, api_url: Optional[str] = None, api_token: Optional[str] = None):
+        self.api_url = api_url.rstrip("/") if api_url else None
+        self.api_token = api_token
+        self.use_api = bool(api_url)
+
+        if self.use_api:
+            logger.info(f"Cognee client initialized in API mode: {self.api_url}")
+            self.client = httpx.AsyncClient(timeout=300.0)  # 5-minute timeout for long operations
+        else:
+            logger.info("Cognee client initialized in direct mode")
+            # Import cognee only if we're using direct mode
+            import cognee as _cognee
+
+            self.cognee = _cognee
+
+    def _get_headers(self) -> Dict[str, str]:
+        """Get headers for API requests."""
+        headers = {"Content-Type": "application/json"}
+        if self.api_token:
+            headers["Authorization"] = f"Bearer {self.api_token}"
+        return headers
+
+    async def add(
+        self, data: Any, dataset_name: str = "main_dataset", node_set: Optional[List[str]] = None
+    ) -> Dict[str, Any]:
+        """
+        Add data to Cognee for processing.
+
+        Parameters
+        ----------
+        data : Any
+            Data to add (text, file path, etc.)
+        dataset_name : str
+            Name of the dataset to add data to
+        node_set : List[str], optional
+            List of node identifiers for graph organization
+
+        Returns
+        -------
+        Dict[str, Any]
+            Result of the add operation
+        """
+        if self.use_api:
+            # API mode: send the data as a multipart file upload
+            endpoint = f"{self.api_url}/api/v1/add"
+
+            files = {"data": ("data.txt", str(data), "text/plain")}
+            form_data = {
+                "datasetName": dataset_name,
+            }
+            if node_set is not None:
+                form_data["node_set"] = json.dumps(node_set)
+
+            # Don't use _get_headers() here: httpx must set the multipart
+            # Content-Type itself, so only pass the Authorization header.
+            response = await self.client.post(
+                endpoint,
+                files=files,
+                data=form_data,
+                headers={"Authorization": f"Bearer {self.api_token}"} if self.api_token else {},
+            )
+            response.raise_for_status()
+            return response.json()
+        else:
+            # Direct mode: call cognee directly
+            with redirect_stdout(sys.stderr):
+                await self.cognee.add(data, dataset_name=dataset_name, node_set=node_set)
+            return {"status": "success", "message": "Data added successfully"}
+
+    async def cognify(
+        self,
+        datasets: Optional[List[str]] = None,
+        custom_prompt: Optional[str] = None,
+        graph_model: Any = None,
+    ) -> Dict[str, Any]:
+        """
+        Transform data into a knowledge graph.
+
+        Parameters
+        ----------
+        datasets : List[str], optional
+            List of dataset names to process
+        custom_prompt : str, optional
+            Custom prompt for entity extraction
+        graph_model : Any, optional
+            Custom graph model (only used in direct mode)
+
+        Returns
+        -------
+        Dict[str, Any]
+            Result of the cognify operation
+        """
+        if self.use_api:
+            # API mode: make HTTP request
+            endpoint = f"{self.api_url}/api/v1/cognify"
+            payload = {
+                "datasets": datasets or ["main_dataset"],
+                "run_in_background": False,
+            }
+            if custom_prompt:
+                payload["custom_prompt"] = custom_prompt
+
+            response = await self.client.post(endpoint, json=payload, headers=self._get_headers())
+            response.raise_for_status()
+            return response.json()
+        else:
+            # Direct mode: call cognee directly
+            with redirect_stdout(sys.stderr):
+                kwargs = {}
+                if datasets:
+                    kwargs["datasets"] = datasets
+                if custom_prompt:
+                    kwargs["custom_prompt"] = custom_prompt
+                if graph_model:
+                    kwargs["graph_model"] = graph_model
+
+                await self.cognee.cognify(**kwargs)
+            return {"status": "success", "message": "Cognify completed successfully"}
+
+    async def search(
+        self,
+        query_text: str,
+        query_type: str,
+        datasets: Optional[List[str]] = None,
+        system_prompt: Optional[str] = None,
+        top_k: int = 10,
+    ) -> Any:
+        """
+        Search the knowledge graph.
+
+        Parameters
+        ----------
+        query_text : str
+            The search query
+        query_type : str
+            Type of search (e.g., "GRAPH_COMPLETION", "INSIGHTS", etc.)
+        datasets : List[str], optional
+            List of datasets to search
+        system_prompt : str, optional
+            System prompt for completion searches
+        top_k : int
+            Maximum number of results
+
+        Returns
+        -------
+        Any
+            Search results
+        """
+        if self.use_api:
+            # API mode: make HTTP request
+            endpoint = f"{self.api_url}/api/v1/search"
+            payload = {"query": query_text, "search_type": query_type.upper(), "top_k": top_k}
+            if datasets:
+                payload["datasets"] = datasets
+            if system_prompt:
+                payload["system_prompt"] = system_prompt
+
+            response = await self.client.post(endpoint, json=payload, headers=self._get_headers())
+            response.raise_for_status()
+            return response.json()
+        else:
+            # Direct mode: call cognee directly
+            from cognee.modules.search.types import SearchType
+
+            with redirect_stdout(sys.stderr):
+                results = await self.cognee.search(
+                    query_type=SearchType[query_type.upper()], query_text=query_text
+                )
+            return results
+
+    async def delete(self, data_id: UUID, dataset_id: UUID, mode: str = "soft") -> Dict[str, Any]:
+        """
+        Delete data from a dataset.
+
+        Parameters
+        ----------
+        data_id : UUID
+            ID of the data to delete
+        dataset_id : UUID
+            ID of the dataset containing the data
+        mode : str
+            Deletion mode ("soft" or "hard")
+
+        Returns
+        -------
+        Dict[str, Any]
+            Result of the deletion
+        """
+        if self.use_api:
+            # API mode: make HTTP request
+            endpoint = f"{self.api_url}/api/v1/delete"
+            params = {"data_id": str(data_id), "dataset_id": str(dataset_id), "mode": mode}
+
+            response = await self.client.delete(
+                endpoint, params=params, headers=self._get_headers()
+            )
+            response.raise_for_status()
+            return response.json()
+        else:
+            # Direct mode: call cognee directly
+            from cognee.modules.users.methods import get_default_user
+
+            with redirect_stdout(sys.stderr):
+                user = await get_default_user()
+                result = await self.cognee.delete(
+                    data_id=data_id, dataset_id=dataset_id, mode=mode, user=user
+                )
+            return result
+
+    async def prune_data(self) -> Dict[str, Any]:
+        """
+        Prune all data from the knowledge graph.
+
+        Returns
+        -------
+        Dict[str, Any]
+            Result of the prune operation
+        """
+        if self.use_api:
+            # Note: the API doesn't expose a prune endpoint, so raise for now
+            raise NotImplementedError("Prune operation is not available via API")
+        else:
+            # Direct mode: call cognee directly
+            with redirect_stdout(sys.stderr):
+                await self.cognee.prune.prune_data()
+            return {"status": "success", "message": "Data pruned successfully"}
+
+    async def prune_system(self, metadata: bool = True) -> Dict[str, Any]:
+        """
+        Prune system data from the knowledge graph.
+
+        Parameters
+        ----------
+        metadata : bool
+            Whether to prune metadata
+
+        Returns
+        -------
+        Dict[str, Any]
+            Result of the prune operation
+        """
+        if self.use_api:
+            # Note: the API doesn't expose a prune endpoint
+            raise NotImplementedError("Prune system operation is not available via API")
+        else:
+            # Direct mode: call cognee directly
+            with redirect_stdout(sys.stderr):
+                await self.cognee.prune.prune_system(metadata=metadata)
+            return {"status": "success", "message": "System pruned successfully"}
+
+    async def get_pipeline_status(self, dataset_ids: List[UUID], pipeline_name: str) -> str:
+        """
+        Get the status of a pipeline run.
+
+        Parameters
+        ----------
+        dataset_ids : List[UUID]
+            List of dataset IDs
+        pipeline_name : str
+            Name of the pipeline
+
+        Returns
+        -------
+        str
+            Status information
+        """
+        if self.use_api:
+            # Note: this would need a custom endpoint on the API side
+            raise NotImplementedError("Pipeline status is not available via API")
+        else:
+            # Direct mode: call cognee directly
+            from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
+
+            with redirect_stdout(sys.stderr):
+                status = await get_pipeline_status(dataset_ids, pipeline_name)
+            return str(status)
+
+    async def list_datasets(self) -> List[Dict[str, Any]]:
+        """
+        List all datasets.
+
+        Returns
+        -------
+        List[Dict[str, Any]]
+            List of datasets
+        """
+        if self.use_api:
+            # API mode: make HTTP request
+            endpoint = f"{self.api_url}/api/v1/datasets"
+            response = await self.client.get(endpoint, headers=self._get_headers())
+            response.raise_for_status()
+            return response.json()
+        else:
+            # Direct mode: call cognee directly
+            from cognee.modules.users.methods import get_default_user
+            from cognee.modules.data.methods import get_datasets
+
+            with redirect_stdout(sys.stderr):
+                user = await get_default_user()
+                datasets = await get_datasets(user.id)
+            return [
+                {"id": str(d.id), "name": d.name, "created_at": str(d.created_at)}
+                for d in datasets
+            ]
+
+    async def close(self):
+        """Close the HTTP client if in API mode."""
+        if self.use_api and hasattr(self, "client"):
+            await self.client.aclose()
diff --git a/cognee-mcp/src/server.py b/cognee-mcp/src/server.py
index e7c82c99bd..ce6dad88a9 100755
--- a/cognee-mcp/src/server.py
+++ b/cognee-mcp/src/server.py
@@ -2,28 +2,27 @@
 import os
 import sys
 import argparse
-import cognee
 import asyncio
 import subprocess
 from pathlib import Path
+from typing import Optional
 from cognee.shared.logging_utils import get_logger, setup_logging, get_log_file_location
 import importlib.util
 from contextlib import redirect_stdout
 import mcp.types as types
 from mcp.server import FastMCP
-from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
-from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id
-from cognee.modules.users.methods import get_default_user
-from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline
-from cognee.modules.search.types import SearchType
-from cognee.shared.data_models import KnowledgeGraph
 from cognee.modules.storage.utils import JSONEncoder
 from starlette.responses import JSONResponse
 from starlette.middleware import Middleware
 from starlette.middleware.cors import CORSMiddleware
 import uvicorn
 
+# Support running both as a package (cognee-mcp) and as a script (python src/server.py)
+try:
+    from .cognee_client import CogneeClient
+except ImportError:
+    from cognee_client import CogneeClient
+
 try:
     from cognee.tasks.codingagents.coding_rule_associations import (
@@ -41,6 +40,8 @@
 logger = get_logger()
 
+# Global client shared by all MCP tools; initialized in main() from --api-url/--api-token
+cognee_client: Optional[CogneeClient] = None
+
 
 async def run_sse_with_cors():
     """Custom SSE transport with CORS middleware."""
@@ -141,11 +142,20 @@ async def cognify_task(file_path: str) -> None:
     with redirect_stdout(sys.stderr):
         logger.info(f"Starting cognify for: {file_path}")
         try:
-            await cognee.add(file_path, node_set=["developer_rules"])
-            model = KnowledgeGraph
+            await cognee_client.add(file_path, node_set=["developer_rules"])
+
+            model = None
             if graph_model_file and graph_model_name:
-                model = load_class(graph_model_file, graph_model_name)
-            await cognee.cognify(graph_model=model)
+                if cognee_client.use_api:
+                    logger.warning(
+                        "Custom graph models are not supported in API mode, ignoring."
+                    )
+                else:
+                    from cognee.shared.data_models import KnowledgeGraph
+
+                    model = load_class(graph_model_file, graph_model_name)
+
+            await cognee_client.cognify(graph_model=model)
             logger.info(f"Cognify finished for: {file_path}")
         except Exception as e:
             logger.error(f"Cognify failed for {file_path}: {str(e)}")
@@ -293,15 +303,20 @@ async def cognify_task(
     # going to stdout ( like the print function ) to stderr.
     with redirect_stdout(sys.stderr):
         logger.info("Cognify process starting.")
+
+        graph_model = None
         if graph_model_file and graph_model_name:
-            graph_model = load_class(graph_model_file, graph_model_name)
-        else:
-            graph_model = KnowledgeGraph
+            if cognee_client.use_api:
+                logger.warning("Custom graph models are not supported in API mode, ignoring.")
+            else:
+                from cognee.shared.data_models import KnowledgeGraph
+
+                graph_model = load_class(graph_model_file, graph_model_name)
 
-        await cognee.add(data)
+        await cognee_client.add(data)
 
         try:
-            await cognee.cognify(graph_model=graph_model, custom_prompt=custom_prompt)
+            await cognee_client.cognify(custom_prompt=custom_prompt, graph_model=graph_model)
             logger.info("Cognify process finished.")
         except Exception as e:
             logger.error("Cognify process failed.")
@@ -354,16 +369,19 @@ async def save_user_agent_interaction(data: str) -> None:
     with redirect_stdout(sys.stderr):
         logger.info("Save interaction process starting.")
 
-        await cognee.add(data, node_set=["user_agent_interaction"])
+        await cognee_client.add(data, node_set=["user_agent_interaction"])
 
         try:
-            await cognee.cognify()
+            await cognee_client.cognify()
             logger.info("Save interaction process finished.")
 
-            logger.info("Generating associated rules from interaction data.")
-
-            await add_rule_associations(data=data, rules_nodeset_name="coding_agent_rules")
-
-            logger.info("Associated rules generated from interaction data.")
+            # Rule associations only work in direct mode
+            if not cognee_client.use_api:
+                logger.info("Generating associated rules from interaction data.")
+                await add_rule_associations(data=data, rules_nodeset_name="coding_agent_rules")
+                logger.info("Associated rules generated from interaction data.")
+            else:
+                logger.warning("Rule associations are not available in API mode, skipping.")
         except Exception as e:
             logger.error("Save interaction process failed.")
@@ -420,11 +438,18 @@ async def codify(repo_path: str) -> list:
     - All stdout is redirected to stderr to maintain MCP communication integrity
     """
+    if cognee_client.use_api:
+        error_msg = "❌ Codify operation is not available in API mode. Please use direct mode for the code graph pipeline."
+        logger.error(error_msg)
+        return [types.TextContent(type="text", text=error_msg)]
+
     async def codify_task(repo_path: str):
         # NOTE: MCP uses stdout to communicate, we must redirect all output
         # going to stdout ( like the print function ) to stderr.
         with redirect_stdout(sys.stderr):
             logger.info("Codify process starting.")
+            from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline
+
             results = []
             async for result in run_code_graph_pipeline(repo_path, False):
                 results.append(result)
@@ -566,20 +591,40 @@ async def search_task(search_query: str, search_type: str) -> str:
         # NOTE: MCP uses stdout to communicate, we must redirect all output
         # going to stdout ( like the print function ) to stderr.
         with redirect_stdout(sys.stderr):
-            search_results = await cognee.search(
-                query_type=SearchType[search_type.upper()], query_text=search_query
+            search_results = await cognee_client.search(
+                query_text=search_query, query_type=search_type
             )
 
-            if search_type.upper() == "CODE":
-                return json.dumps(search_results, cls=JSONEncoder)
-            elif (
-                search_type.upper() == "GRAPH_COMPLETION" or search_type.upper() == "RAG_COMPLETION"
-            ):
-                return str(search_results[0])
-            elif search_type.upper() == "CHUNKS":
-                return str(search_results)
+            # Handle different result formats based on API vs direct mode
+            if cognee_client.use_api:
+                # API mode returns JSON-serialized results
+                if isinstance(search_results, str):
+                    return search_results
+                elif isinstance(search_results, list):
+                    if (
+                        search_type.upper() in ["GRAPH_COMPLETION", "RAG_COMPLETION"]
+                        and len(search_results) > 0
+                    ):
+                        return str(search_results[0])
+                    return str(search_results)
+                else:
+                    return json.dumps(search_results, cls=JSONEncoder)
             else:
-                return str(search_results)
+                # Direct mode processing
+                if search_type.upper() == "CODE":
+                    return json.dumps(search_results, cls=JSONEncoder)
+                elif (
+                    search_type.upper() == "GRAPH_COMPLETION"
+                    or search_type.upper() == "RAG_COMPLETION"
+                ):
+                    return str(search_results[0])
+                elif search_type.upper() == "CHUNKS":
+                    return str(search_results)
+                elif search_type.upper() == "INSIGHTS":
+                    results = retrieved_edges_to_string(search_results)
+                    return results
+                else:
+                    return str(search_results)
 
     search_results = await search_task(search_query, search_type)
 
     return [types.TextContent(type="text", text=search_results)]
@@ -612,6 +657,10 @@ async def get_developer_rules() -> list:
     async def fetch_rules_from_cognee() -> str:
         """Collect all developer rules from Cognee"""
         with redirect_stdout(sys.stderr):
+            if cognee_client.use_api:
+                logger.warning("Developer rules retrieval is not available in API mode")
+                return "Developer rules retrieval is not available in API mode"
+
             developer_rules = await get_existing_rules(rules_nodeset_name="coding_agent_rules")
             return developer_rules
@@ -651,17 +700,25 @@ async def list_data(dataset_id: str = None) -> list:
     with redirect_stdout(sys.stderr):
         try:
-            user = await get_default_user()
             output_lines = []
 
             if dataset_id:
-                # List data for specific dataset
-                logger.info(f"Listing data for dataset: {dataset_id}")
-                dataset_uuid = UUID(dataset_id)
+                # Detailed data listing for a specific dataset is only available in direct mode
+                if cognee_client.use_api:
+                    return [
+                        types.TextContent(
+                            type="text",
+                            text="❌ Detailed data listing for specific datasets is not available in API mode.\nPlease use the API directly or use direct mode.",
+                        )
+                    ]
 
-                # Get the dataset information
+                from cognee.modules.users.methods import get_default_user
                 from cognee.modules.data.methods import get_dataset, get_dataset_data
 
+                logger.info(f"Listing data for dataset: {dataset_id}")
+                dataset_uuid = UUID(dataset_id)
+                user = await get_default_user()
+
                 dataset = await get_dataset(user.id, dataset_uuid)
 
                 if not dataset:
@@ -689,11 +746,9 @@ async def list_data(dataset_id: str = None) -> list:
                     output_lines.append("   (No data items in this dataset)")
 
             else:
-                # List all datasets
+                # List all datasets - works in both modes
                 logger.info("Listing all datasets")
-                from cognee.modules.data.methods import get_datasets
-
-                datasets = await get_datasets(user.id)
+                datasets = await cognee_client.list_datasets()
 
                 if not datasets:
                     return [
@@ -708,20 +763,21 @@ async def list_data(dataset_id: str = None) -> list:
                 output_lines.append("")
 
                 for i, dataset in enumerate(datasets, 1):
-                    # Get data count for each dataset
-                    from cognee.modules.data.methods import get_dataset_data
-
-                    data_items = await get_dataset_data(dataset.id)
-
-                    output_lines.append(f"{i}. 📁 {dataset.name}")
-                    output_lines.append(f"   Dataset ID: {dataset.id}")
-                    output_lines.append(f"   Created: {dataset.created_at}")
-                    output_lines.append(f"   Data items: {len(data_items)}")
+                    # In API mode, dataset entries are plain dicts; the direct-mode
+                    # client also formats them as dicts, so handle objects defensively
+                    if isinstance(dataset, dict):
+                        output_lines.append(f"{i}. 📁 {dataset.get('name', 'Unnamed')}")
+                        output_lines.append(f"   Dataset ID: {dataset.get('id')}")
+                        output_lines.append(f"   Created: {dataset.get('created_at', 'N/A')}")
+                    else:
+                        output_lines.append(f"{i}. 📁 {dataset.name}")
+                        output_lines.append(f"   Dataset ID: {dataset.id}")
+                        output_lines.append(f"   Created: {dataset.created_at}")
                     output_lines.append("")
 
-                output_lines.append("💡 To see data items in a specific dataset, use:")
-                output_lines.append('   list_data(dataset_id="your-dataset-id-here")')
-                output_lines.append("")
+                if not cognee_client.use_api:
+                    output_lines.append("💡 To see data items in a specific dataset, use:")
+                    output_lines.append('   list_data(dataset_id="your-dataset-id-here")')
+                    output_lines.append("")
 
                 output_lines.append("🗑️ To delete specific data, use:")
                 output_lines.append('   delete(data_id="data-id", dataset_id="dataset-id")')
@@ -790,12 +846,9 @@ async def delete(data_id: str, dataset_id: str, mode: str = "soft") -> list:
             data_uuid = UUID(data_id)
             dataset_uuid = UUID(dataset_id)
 
-            # Get default user for the operation
-            user = await get_default_user()
-
-            # Call the cognee delete function
-            result = await cognee.delete(
-                data_id=data_uuid, dataset_id=dataset_uuid, mode=mode, user=user
+            # Call the cognee delete function via the client
+            result = await cognee_client.delete(
+                data_id=data_uuid, dataset_id=dataset_uuid, mode=mode
             )
 
             logger.info(f"Delete operation completed successfully: {result}")
@@ -842,11 +895,21 @@ async def prune():
     -----
     - This operation cannot be undone. All memory data will be permanently deleted.
     - The function prunes both data content (using prune_data) and system metadata (using prune_system)
+    - This operation is not available in API mode
     """
     with redirect_stdout(sys.stderr):
-        await cognee.prune.prune_data()
-        await cognee.prune.prune_system(metadata=True)
-        return [types.TextContent(type="text", text="Pruned")]
+        try:
+            await cognee_client.prune_data()
+            await cognee_client.prune_system(metadata=True)
+            return [types.TextContent(type="text", text="Pruned")]
+        except NotImplementedError:
+            error_msg = "❌ Prune operation is not available in API mode"
+            logger.error(error_msg)
+            return [types.TextContent(type="text", text=error_msg)]
+        except Exception as e:
+            error_msg = f"❌ Prune operation failed: {str(e)}"
+            logger.error(error_msg)
+            return [types.TextContent(type="text", text=error_msg)]
 
 
 @mcp.tool()
@@ -869,13 +932,26 @@ async def cognify_status():
     - The function retrieves pipeline status specifically for the "cognify_pipeline" on the "main_dataset"
     - Status information includes job progress, execution time, and completion status
     - The status is returned in string format for easy reading
+    - This operation is not available in API mode
     """
     with redirect_stdout(sys.stderr):
-        user = await get_default_user()
-        status = await get_pipeline_status(
-            [await get_unique_dataset_id("main_dataset", user)], "cognify_pipeline"
-        )
-        return [types.TextContent(type="text", text=str(status))]
+        try:
+            from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id
+            from cognee.modules.users.methods import get_default_user
+
+            user = await get_default_user()
+            status = await cognee_client.get_pipeline_status(
+                [await get_unique_dataset_id("main_dataset", user)], "cognify_pipeline"
+            )
+            return [types.TextContent(type="text", text=str(status))]
+        except NotImplementedError:
+            error_msg = "❌ Pipeline status is not available in API mode"
+            logger.error(error_msg)
+            return [types.TextContent(type="text", text=error_msg)]
+        except Exception as e:
+            error_msg = f"❌ Failed to get cognify status: {str(e)}"
+            logger.error(error_msg)
+            return [types.TextContent(type="text", text=error_msg)]
@@ -898,13 +974,26 @@ async def codify_status():
     - The function retrieves pipeline status specifically for the "cognify_code_pipeline" on the "codebase" dataset
     - Status information includes job progress, execution time, and completion status
     - The status is returned in string format for easy reading
+    - This operation is not available in API mode
     """
     with redirect_stdout(sys.stderr):
-        user = await get_default_user()
-        status = await get_pipeline_status(
-            [await get_unique_dataset_id("codebase", user)], "cognify_code_pipeline"
-        )
-        return [types.TextContent(type="text", text=str(status))]
+        try:
+            from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id
+            from cognee.modules.users.methods import get_default_user
+
+            user = await get_default_user()
+            status = await cognee_client.get_pipeline_status(
+                [await get_unique_dataset_id("codebase", user)], "cognify_code_pipeline"
+            )
+            return [types.TextContent(type="text", text=str(status))]
+        except NotImplementedError:
+            error_msg = "❌ Pipeline status is not available in API mode"
+            logger.error(error_msg)
+            return [types.TextContent(type="text", text=error_msg)]
+        except Exception as e:
+            error_msg = f"❌ Failed to get codify status: {str(e)}"
+            logger.error(error_msg)
+            return [types.TextContent(type="text", text=error_msg)]
 
 
 def node_to_string(node):
@@ -938,6 +1027,8 @@ def load_class(model_file, model_name):
 
 
 async def main():
+    global cognee_client
+
     parser = argparse.ArgumentParser()
 
     parser.add_argument(
@@ -981,12 +1072,30 @@ async def main():
         help="Argument stops database migration from being attempted",
     )
 
+    # Cognee API connection options
+    parser.add_argument(
+        "--api-url",
+        default=None,
+        help="Base URL of a running Cognee FastAPI server (e.g., http://localhost:8000). "
+        "If provided, the MCP server will connect to the API instead of using cognee directly.",
+    )
+
+    parser.add_argument(
+        "--api-token",
+        default=None,
+        help="Authentication token for the API (optional; required only if the API has authentication enabled).",
+    )
+
     args = parser.parse_args()
 
+    # Initialize the global CogneeClient
+    cognee_client = CogneeClient(api_url=args.api_url, api_token=args.api_token)
+
     mcp.settings.host = args.host
     mcp.settings.port = args.port
 
-    if not args.no_migration:
+    # Skip migrations when in API mode (the API server handles its own database)
+    if not args.no_migration and not args.api_url:
         # Run Alembic migrations from the main cognee directory where alembic.ini is located
         logger.info("Running database migrations...")
         migration_result = subprocess.run(
@@ -1009,6 +1118,8 @@ async def main():
             sys.exit(1)
 
         logger.info("Database migrations done.")
+    elif args.api_url:
+        logger.info("Skipping database migrations (using API mode)")
 
     logger.info(f"Starting MCP server with transport: {args.transport}")
     if args.transport == "stdio":
diff --git a/cognee-mcp/uv.lock b/cognee-mcp/uv.lock
index e317416133..dbbc6542c0 100644
--- a/cognee-mcp/uv.lock
+++ b/cognee-mcp/uv.lock
@@ -1,5 +1,5 @@
 version = 1
-revision = 2
+revision = 3
 requires-python = ">=3.10"
 resolution-markers = [
     "python_full_version >= '3.14' and platform_python_implementation != 'PyPy' and sys_platform != 'emscripten'",
@@ -737,6 +737,7 @@ source = { editable = "." }
 dependencies = [
     { name = "cognee", extra = ["codegraph", "docs", "gemini", "huggingface", "neo4j", "postgres"] },
     { name = "fastmcp" },
+    { name = "httpx" },
     { name = "mcp" },
     { name = "uv" },
 ]
@@ -750,6 +751,7 @@ dev = [
 requires-dist = [
     { name = "cognee", extras = ["postgres", "codegraph", "gemini", "huggingface", "docs", "neo4j"], specifier = "==0.3.4" },
     { name = "fastmcp", specifier = ">=2.10.0,<3.0.0" },
+    { name = "httpx", specifier = ">=0.27.0,<1.0.0" },
     { name = "mcp", specifier = ">=1.12.0,<2.0.0" },
     { name = "uv", specifier = ">=0.6.3,<1.0.0" },
 ]
@@ -1026,7 +1028,7 @@ version = "3.24.0"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
     { name = "attrs" },
-    { name = "docstring-parser", marker = "python_full_version < '4.0'" },
+    { name = "docstring-parser", marker = "python_full_version < '4'" },
     { name = "rich" },
     { name = "rich-rst" },
     { name = "typing-extensions", marker = "python_full_version < '3.11'" },
@@ -1309,17 +1311,17 @@ name = "fastembed"
 version = "0.6.0"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
-    { name = "huggingface-hub" },
-    { name = "loguru" },
-    { name = "mmh3" },
+    { name = "huggingface-hub", marker = "python_full_version < '3.13'" },
+    { name = "loguru", marker = "python_full_version < '3.13'" },
+    { name = "mmh3", marker = "python_full_version < '3.13'" },
     { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" },
-    { name = "numpy", version = "2.3.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" },
-    { name = "onnxruntime" },
-    { name = "pillow" },
-    { name = "py-rust-stemmers" },
-    { name = "requests" },
-    { name = "tokenizers" },
-    { name = "tqdm" },
+    { name = "numpy", version = "2.3.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11' and python_full_version < '3.13'" },
+    { name = "onnxruntime", marker = "python_full_version < '3.13'" },
+    { name = "pillow", marker = "python_full_version < '3.13'" },
+    { name = "py-rust-stemmers", marker = "python_full_version < '3.13'" },
+    { name = "requests", marker = "python_full_version < '3.13'" },
+    { name = "tokenizers", marker = "python_full_version < '3.13'" },
+    { name = "tqdm", marker = "python_full_version < '3.13'" },
 ]
 sdist = { url = "https://files.pythonhosted.org/packages/c6/f4/036a656c605f63dc25f11284f60f69900a54a19c513e1ae60d21d6977e75/fastembed-0.6.0.tar.gz", hash = "sha256:5c9ead25f23449535b07243bbe1f370b820dcc77ec2931e61674e3fe7ff24733", size = 50731, upload-time = "2025-02-26T13:50:33.031Z" }
 wheels = [
@@ -2526,8 +2528,8 @@ name = "loguru"
 version = "0.7.3"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
-    { name = "colorama", marker = "sys_platform == 'win32'" },
-    { name = "win32-setctime", marker = "sys_platform == 'win32'" },
+    { name = "colorama", marker = "python_full_version < '3.13' and sys_platform == 'win32'" },
+    { name = "win32-setctime", marker = "python_full_version < '3.13' and sys_platform == 'win32'" },
 ]
 sdist = { url = "https://files.pythonhosted.org/packages/3a/05/a1dae3dffd1116099471c643b8924f5aa6524411dc6c63fdae648c4f1aca/loguru-0.7.3.tar.gz", hash = "sha256:19480589e77d47b8d85b2c827ad95d49bf31b0dcde16593892eb51dd18706eb6", size = 63559, upload-time = "2024-12-06T11:20:56.608Z" }
 wheels = [
diff --git a/cognee/api/v1/ui/ui.py b/cognee/api/v1/ui/ui.py
index 7711cb6acf..51088c3e1e 100644
--- a/cognee/api/v1/ui/ui.py
+++ b/cognee/api/v1/ui/ui.py
@@ -502,24 +502,48 @@ def start_ui(
     if start_mcp:
         logger.info("Starting Cognee MCP server with Docker...")
 
-        cwd = os.getcwd()
-        env_file = os.path.join(cwd, ".env")
         try:
-            image = "cognee/cognee-mcp:main"
+            image = "cognee/cognee-mcp:feature-standalone-mcp"  # TODO: change to "cognee/cognee-mcp:main" right before merging into main
             subprocess.run(["docker", "pull", image], check=True)
+
+            import uuid
+
+            container_name = f"cognee-mcp-{uuid.uuid4().hex[:8]}"
+
+            docker_cmd = [
+                "docker",
+                "run",
+                "--name",
+                container_name,
+                "-p",
+                f"{mcp_port}:8000",
+                "--rm",
+                "-e",
+                "TRANSPORT_MODE=sse",
+            ]
+
+            if start_backend:
+                docker_cmd.extend(
+                    [
+                        "-e",
+                        f"API_URL=http://localhost:{backend_port}",
+                    ]
+                )
+                logger.info(
+                    f"Configuring MCP to connect to backend API at http://localhost:{backend_port}"
+                )
+                logger.info("(localhost will be auto-converted to host.docker.internal)")
+            else:
+                cwd = os.getcwd()
+                env_file = os.path.join(cwd, ".env")
+                docker_cmd.extend(["--env-file", env_file])
+
+            docker_cmd.append(image)
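+
+            # Resulting invocation (sketch, start_backend case; the hex suffix is random):
+            #   docker run --name cognee-mcp-<hex> -p {mcp_port}:8000 --rm \
+            #     -e TRANSPORT_MODE=sse -e API_URL=http://localhost:{backend_port} {image}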
+
             mcp_process = subprocess.Popen(
-                [
-                    "docker",
-                    "run",
-                    "-p",
-                    f"{mcp_port}:8000",
-                    "--rm",
-                    "--env-file",
-                    env_file,
-                    "-e",
-                    "TRANSPORT_MODE=sse",
-                    "cognee/cognee-mcp:main",
-                ],
+                docker_cmd,
                 stdout=subprocess.PIPE,
                 stderr=subprocess.PIPE,
                 preexec_fn=os.setsid if hasattr(os, "setsid") else None,
@@ -528,8 +552,13 @@ def start_ui(
             _stream_process_output(mcp_process, "stdout", "[MCP]", "\033[34m")  # Blue
             _stream_process_output(mcp_process, "stderr", "[MCP]", "\033[34m")  # Blue
 
-            pid_callback(mcp_process.pid)
-            logger.info(f"✓ Cognee MCP server starting on http://127.0.0.1:{mcp_port}/sse")
+            # Pass both PID and container name using a tuple
+            pid_callback((mcp_process.pid, container_name))
+
+            mode_info = "API mode" if start_backend else "direct mode"
+            logger.info(
+                f"✓ Cognee MCP server starting on http://127.0.0.1:{mcp_port}/sse ({mode_info})"
+            )
         except Exception as e:
             logger.error(f"Failed to start MCP server with Docker: {str(e)}")
 
     # Start backend server if requested
diff --git a/cognee/cli/_cognee.py b/cognee/cli/_cognee.py
index b257d37dec..1539d1acf2 100644
--- a/cognee/cli/_cognee.py
+++ b/cognee/cli/_cognee.py
@@ -175,19 +175,59 @@ def main() -> int:
     # Handle UI flag
     if hasattr(args, "start_ui") and args.start_ui:
         spawned_pids = []
+        docker_container = None
 
         def signal_handler(signum, frame):
             """Handle Ctrl+C and other termination signals"""
-            nonlocal spawned_pids
-            fmt.echo("\nShutting down UI server...")
+            nonlocal spawned_pids, docker_container
+            try:
+                fmt.echo("\nShutting down UI server...")
+            except (BrokenPipeError, OSError):
+                pass
+
+            # First, stop the Docker container if one is running
+            if docker_container:
+                try:
+                    result = subprocess.run(
+                        ["docker", "stop", docker_container],
+                        capture_output=True,
+                        timeout=10,
+                        check=False,
+                    )
+                    try:
+                        if result.returncode == 0:
+                            fmt.success(f"✓ Docker container {docker_container} stopped.")
+                        else:
+                            fmt.warning(
+                                f"Could not stop container {docker_container}: {result.stderr.decode()}"
+                            )
+                    except (BrokenPipeError, OSError):
+                        pass
+                except subprocess.TimeoutExpired:
+                    try:
+                        fmt.warning(
+                            f"Timeout stopping container {docker_container}, forcing removal..."
+                        )
+                    except (BrokenPipeError, OSError):
+                        pass
+                    subprocess.run(
+                        ["docker", "rm", "-f", docker_container], capture_output=True, check=False
+                    )
+                except Exception:
+                    pass
+
+            # Then, stop regular processes
             for pid in spawned_pids:
                 try:
                     if hasattr(os, "killpg"):
                         # Unix-like systems: Use process groups
                         pgid = os.getpgid(pid)
                         os.killpg(pgid, signal.SIGTERM)
-                        fmt.success(f"✓ Process group {pgid} (PID {pid}) terminated.")
+                        try:
+                            fmt.success(f"✓ Process group {pgid} (PID {pid}) terminated.")
+                        except (BrokenPipeError, OSError):
+                            pass
                     else:
                         # Windows: Use taskkill to terminate process and its children
                         subprocess.run(
@@ -195,24 +235,35 @@ def signal_handler(signum, frame):
                             capture_output=True,
                             check=False,
                         )
-                        fmt.success(f"✓ Process {pid} and its children terminated.")
-                except (OSError, ProcessLookupError, subprocess.SubprocessError) as e:
-                    fmt.warning(f"Could not terminate process {pid}: {e}")
+                        try:
+                            fmt.success(f"✓ Process {pid} and its children terminated.")
+                        except (BrokenPipeError, OSError):
+                            pass
+                except (OSError, ProcessLookupError, subprocess.SubprocessError):
+                    pass
 
             sys.exit(0)
 
         signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C
         signal.signal(signal.SIGTERM, signal_handler)  # Termination request
+        if hasattr(signal, "SIGHUP"):
+            signal.signal(signal.SIGHUP, signal_handler)
 
         try:
             from cognee import start_ui
 
             fmt.echo("Starting cognee UI...")
 
-            # Callback to capture PIDs of all spawned processes
-            def pid_callback(pid):
-                nonlocal spawned_pids
-                spawned_pids.append(pid)
+            # Callback to capture PIDs and the Docker container of all spawned processes
+            def pid_callback(pid_or_tuple):
+                nonlocal spawned_pids, docker_container
+                # Handle both regular PIDs and (PID, container_name) tuples
+                if isinstance(pid_or_tuple, tuple):
+                    pid, container_name = pid_or_tuple
+                    spawned_pids.append(pid)
+                    docker_container = container_name
+                else:
+                    spawned_pids.append(pid_or_tuple)
 
             frontend_port = 3000
             start_backend, backend_port = True, 8000