feat: Added ability to check status of pipelines for MCP
dexters1 committed Apr 28, 2025
commit 55f1224c7986ccc25813b01bac29f9f23f8c1c48
4 changes: 2 additions & 2 deletions cognee-mcp/pyproject.toml
@@ -1,12 +1,12 @@
[project]
name = "cognee-mcp"
version = "0.2.3"
version = "0.3.0"
description = "A MCP server project"
readme = "README.md"
requires-python = ">=3.10"

dependencies = [
"cognee[postgres,codegraph,gemini,huggingface]==0.1.39",
"cognee[postgres,codegraph,gemini,huggingface]==0.1.40",
"fastmcp>=1.0",
"mcp==1.5.0",
"uv>=0.6.3",
26 changes: 24 additions & 2 deletions cognee-mcp/src/server.py
@@ -9,6 +9,8 @@
from contextlib import redirect_stdout
import mcp.types as types
from mcp.server import FastMCP
from cognee.api.v1.datasets.datasets import datasets as cognee_datasets
from cognee.modules.users.methods import get_default_user
from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline
from cognee.modules.search.types import SearchType
from cognee.shared.data_models import KnowledgeGraph
@@ -28,7 +30,6 @@ async def cognify_task(
"""Build knowledge graph from the input text"""
# NOTE: MCP uses stdout to communicate, we must redirect all output
# going to stdout ( like the print function ) to stderr.
# As cognify is an async background job the output had to be redirected again.
with redirect_stdout(sys.stderr):
logger.info("Cognify process starting.")
if graph_model_file and graph_model_name:
@@ -72,7 +73,6 @@ async def codify(repo_path: str) -> list:
async def codify_task(repo_path: str):
# NOTE: MCP uses stdout to communicate, we must redirect all output
# going to stdout ( like the print function ) to stderr.
# As codify is an async background job the output had to be redirected again.
with redirect_stdout(sys.stderr):
logger.info("Codify process starting.")
results = []
@@ -138,6 +138,28 @@ async def prune():
return [types.TextContent(type="text", text="Pruned")]


@mcp.tool()
async def cognify_status():
    """Get status of cognify pipeline"""
    with redirect_stdout(sys.stderr):
        user = await get_default_user()
        status = await cognee_datasets.get_status(
            [await cognee_datasets.get_unique_dataset_id("main_dataset", user)]
        )
        return [types.TextContent(type="text", text=status)]


@mcp.tool()
async def codify_status():
    """Get status of codify pipeline"""
    with redirect_stdout(sys.stderr):
        user = await get_default_user()
        status = await cognee_datasets.get_status(
            [await cognee_datasets.get_unique_dataset_id("codebase", user)]
        )
        return [types.TextContent(type="text", text=status)]


def node_to_string(node):
node_data = ", ".join(
[f'{key}: "{value}"' for key, value in node.items() if key in ["id", "name"]]
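With these two tools registered, any MCP client can poll pipeline progress. A minimal sketch using the mcp Python SDK over stdio; the launch command and arguments below are placeholders for however you start the cognee-mcp server locally:

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    # Placeholder command: point this at your local cognee-mcp entrypoint.
    server_params = StdioServerParameters(command="uv", args=["run", "cognee"])

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # The new status tools take no arguments.
            result = await session.call_tool("cognify_status", arguments={})
            print(result.content)


asyncio.run(main())
```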
3 changes: 2 additions & 1 deletion cognee/api/v1/cognify/code_graph_pipeline.py
@@ -14,6 +14,7 @@
from cognee.shared.data_models import KnowledgeGraph, MonitoringTool
from cognee.shared.utils import render_graph
from cognee.tasks.documents import classify_documents, extract_chunks_from_documents
from cognee.api.v1.datasets.datasets import datasets as cognee_datasets
from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.ingestion import ingest_data
from cognee.tasks.repo_processor import get_non_py_files, get_repo_file_dependencies
@@ -69,7 +70,7 @@ async def run_code_graph_pipeline(repo_path, include_docs=False):
),
]

dataset_id = uuid5(NAMESPACE_OID, "codebase")
dataset_id = await cognee_datasets.get_unique_dataset_id("codebase", user)

if include_docs:
non_code_pipeline_run = run_tasks(
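Previously every user's code graph run shared the id uuid5(NAMESPACE_OID, "codebase"); the new helper folds the owner's id into the hash, so the dataset id is scoped per user while staying deterministic, and codify_status can recompute the same id later. A minimal sketch of the difference (the user id below is a hypothetical placeholder):

```python
from uuid import NAMESPACE_OID, UUID, uuid5

user_id = UUID("00000000-0000-0000-0000-000000000001")  # hypothetical user id

old_id = uuid5(NAMESPACE_OID, "codebase")                 # shared by all users
new_id = uuid5(NAMESPACE_OID, f"codebase{str(user_id)}")  # unique per user

assert old_id != new_id
assert new_id == uuid5(NAMESPACE_OID, f"codebase{str(user_id)}")  # deterministic
```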
7 changes: 6 additions & 1 deletion cognee/api/v1/datasets/datasets.py
@@ -1,6 +1,7 @@
from uuid import UUID
from uuid import UUID, uuid5, NAMESPACE_OID
from cognee.modules.users.methods import get_default_user
from cognee.modules.ingestion import discover_directory_datasets
from cognee.modules.users.models import User
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status


@@ -12,6 +13,10 @@ async def list_datasets():
user = await get_default_user()
return await get_datasets(user.id)

@staticmethod
async def get_unique_dataset_id(dataset_name: str, user: User) -> UUID:
return uuid5(NAMESPACE_OID, f"{dataset_name}{str(user.id)}")

@staticmethod
def discover_datasets(directory_path: str):
return list(discover_directory_datasets(directory_path).keys())
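The same helpers can be driven outside the MCP server; a minimal sketch mirroring the cognify_status tool above (the dataset name "main_dataset" and printing the raw status mapping are assumptions):

```python
import asyncio

from cognee.api.v1.datasets.datasets import datasets as cognee_datasets
from cognee.modules.users.methods import get_default_user


async def main():
    user = await get_default_user()
    # Deterministic id: uuid5(NAMESPACE_OID, f"{dataset_name}{user.id}") always
    # yields the same UUID for the same (name, user) pair, so no lookup is needed.
    dataset_id = await cognee_datasets.get_unique_dataset_id("main_dataset", user)
    status = await cognee_datasets.get_status([dataset_id])
    print(status)


asyncio.run(main())
```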
14 changes: 10 additions & 4 deletions cognee/modules/data/methods/create_dataset.py
@@ -4,8 +4,13 @@
from sqlalchemy.orm import joinedload
from cognee.modules.data.models import Dataset

from cognee.api.v1.datasets.datasets import datasets as cognee_datasets
from cognee.modules.users.models import User


async def create_dataset(dataset_name: str, user: User, session: AsyncSession) -> Dataset:
owner_id = user.id

async def create_dataset(dataset_name: str, owner_id: UUID, session: AsyncSession) -> Dataset:
dataset = (
await session.scalars(
select(Dataset)
@@ -16,10 +21,11 @@ async def create_dataset(dataset_name: str, owner_id: UUID, session: AsyncSessio
).first()

if dataset is None:
# Dataset id should be generated based on dataset_name and owner_id so multiple users can use the same dataset_name
dataset = Dataset(
id=uuid5(NAMESPACE_OID, f"{dataset_name}{str(owner_id)}"), name=dataset_name, data=[]
# Dataset id should be generated based on dataset_name and owner_id/user so multiple users can use the same dataset_name
dataset_id = await cognee_datasets.get_unique_dataset_id(
dataset_name=dataset_name, user=user
)
dataset = Dataset(id=dataset_id, name=dataset_name, data=[])
dataset.owner_id = owner_id

session.add(dataset)
3 changes: 2 additions & 1 deletion cognee/modules/pipelines/operations/pipeline.py
@@ -5,6 +5,7 @@

from cognee.modules.data.methods import get_datasets, get_datasets_by_name
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
from cognee.api.v1.datasets.datasets import datasets as cognee_datasets
from cognee.modules.data.models import Data, Dataset
from cognee.modules.pipelines.operations.run_tasks import run_tasks
from cognee.modules.pipelines.models import PipelineRunStatus
@@ -93,7 +94,7 @@ async def run_pipeline(
elif isinstance(dataset, str):
check_dataset_name(dataset)
# Generate id based on unique dataset_id formula
dataset_id = uuid5(NAMESPACE_OID, f"{dataset}{str(user.id)}")
dataset_id = await cognee_datasets.get_unique_dataset_id(dataset_name=dataset, user=user)

if not data:
data: list[Data] = await get_dataset_data(dataset_id=dataset_id)
2 changes: 1 addition & 1 deletion cognee/tasks/ingestion/ingest_data.py
@@ -104,7 +104,7 @@ async def store_data_to_dataset(
db_engine = get_relational_engine()

async with db_engine.get_async_session() as session:
dataset = await create_dataset(dataset_name, user.id, session)
dataset = await create_dataset(dataset_name, user, session)

# Check to see if data should be updated
data_point = (
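create_dataset now receives the full User instead of a bare owner_id, so it can delegate id generation to get_unique_dataset_id. A minimal sketch of the updated call shape (the import path for get_relational_engine and the dataset name are assumptions based on the surrounding code):

```python
import asyncio

from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.methods.create_dataset import create_dataset
from cognee.modules.users.methods import get_default_user


async def ensure_dataset():
    user = await get_default_user()
    db_engine = get_relational_engine()

    async with db_engine.get_async_session() as session:
        # create_dataset derives the deterministic per-user dataset id internally,
        # instead of the caller hashing owner_id with uuid5 itself.
        return await create_dataset("main_dataset", user, session)


asyncio.run(ensure_dataset())
```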