Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 49 additions & 7 deletions cognee/api/v1/cognify/code_graph_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,47 @@
# NOTICE: This module contains deprecated functions.
# Use only the run_code_graph_pipeline function; all other functions are deprecated.
# Related issue: COG-906

import asyncio
import logging
from pathlib import Path
from typing import Union

from cognee.base_config import get_base_config
from cognee.infrastructure.databases.vector.embeddings import \
get_embedding_engine
from cognee.modules.cognify.config import get_cognify_config
from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.tasks.Task import Task
from cognee.modules.users.methods import get_default_user
from cognee.shared.data_models import KnowledgeGraph, MonitoringTool
from cognee.tasks.documents import (classify_documents,
extract_chunks_from_documents)
from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.ingestion import ingest_data_with_metadata
from cognee.tasks.repo_processor import (enrich_dependency_graph,
expand_dependency_graph,
get_data_list_for_user,
get_non_code_files,
get_repo_file_dependencies)
from cognee.tasks.repo_processor.get_source_code_chunks import \
get_source_code_chunks
from cognee.tasks.storage import add_data_points

from cognee.base_config import get_base_config
from cognee.shared.data_models import MonitoringTool

monitoring = get_base_config().monitoring_tool
if monitoring == MonitoringTool.LANGFUSE:
from langfuse.decorators import observe

from cognee.tasks.summarization import summarize_code
from cognee.tasks.summarization import summarize_code, summarize_text

logger = logging.getLogger("code_graph_pipeline")
update_status_lock = asyncio.Lock()

@observe
async def run_code_graph_pipeline(repo_path):
async def run_code_graph_pipeline(repo_path, include_docs=True):
import os
import pathlib

import cognee
from cognee.infrastructure.databases.relational import create_db_and_tables

Expand All @@ -38,12 +55,37 @@ async def run_code_graph_pipeline(repo_path):
await cognee.prune.prune_system(metadata=True)
await create_db_and_tables()

embedding_engine = get_embedding_engine()
cognee_config = get_cognify_config()
user = await get_default_user()

tasks = [
Task(get_repo_file_dependencies),
Task(enrich_dependency_graph, task_config={"batch_size": 50}),
Task(enrich_dependency_graph),
Task(expand_dependency_graph, task_config={"batch_size": 50}),
Task(get_source_code_chunks, embedding_model=embedding_engine.model, task_config={"batch_size": 50}),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does get_source_code_chunks come from?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hey @borisarzentar I messed up the commits on this PR, moved the meaningful part here: #395

I am closing this one now, sorry for the confusion

Task(summarize_code, task_config={"batch_size": 50}),
Task(add_data_points, task_config={"batch_size": 50}),
]

return run_tasks(tasks, repo_path, "cognify_code_pipeline")
if include_docs:
non_code_tasks = [
Task(get_non_code_files, task_config={"batch_size": 50}),
Task(ingest_data_with_metadata, dataset_name="repo_docs", user=user),
Task(get_data_list_for_user, dataset_name="repo_docs", user=user),
Task(classify_documents),
Task(extract_chunks_from_documents),
Task(extract_graph_from_data, graph_model=KnowledgeGraph, task_config={"batch_size": 50}),
Task(
summarize_text,
summarization_model=cognee_config.summarization_model,
task_config={"batch_size": 50}
),
]

if include_docs:
async for result in run_tasks(non_code_tasks, repo_path):
yield result

async for result in run_tasks(tasks, repo_path, "cognify_code_pipeline"):
yield result
Comment on lines +86 to +91
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve pipeline execution and error handling

The current implementation runs document and code tasks sequentially and lacks proper error handling. Consider:

  1. Running document and code tasks in parallel for better performance
  2. Adding specific error handling for document processing
  3. Implementing status tracking for document pipeline tasks

Example implementation:

try:
    if include_docs:
        doc_pipeline = run_tasks(non_code_tasks, repo_path)
    code_pipeline = run_tasks(tasks, repo_path, "cognify_code_pipeline")

    async for result in asyncio.gather(
        doc_pipeline if include_docs else [],
        code_pipeline
    ):
        yield result
except Exception as error:
    logger.error(f"Pipeline execution failed: {error}")
    # Add specific error handling for document vs code pipeline
    raise

1 change: 1 addition & 0 deletions cognee/tasks/repo_processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@

from .enrich_dependency_graph import enrich_dependency_graph
from .expand_dependency_graph import expand_dependency_graph
from .get_non_code_files import get_data_list_for_user, get_non_py_files
from .get_repo_file_dependencies import get_repo_file_dependencies
36 changes: 36 additions & 0 deletions cognee/tasks/repo_processor/get_non_code_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import os

import aiofiles

import cognee.modules.ingestion as ingestion
from cognee.infrastructure.engine import DataPoint
from cognee.modules.data.methods import get_datasets
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
from cognee.modules.data.methods.get_datasets_by_name import \
get_datasets_by_name
from cognee.modules.data.models import Data
from cognee.modules.data.operations.write_metadata import write_metadata
from cognee.modules.ingestion.data_types import BinaryData
from cognee.modules.users.methods import get_default_user
from cognee.shared.CodeGraphEntities import Repository


async def get_non_py_files(repo_path):
    """Recursively collect the paths of all non-Python files under *repo_path*.

    Args:
        repo_path: Root directory of the repository to scan.

    Returns:
        A list of file paths (joined onto *repo_path*) for every file whose
        name does not end in ``.py``. Returns an empty list when *repo_path*
        does not exist, keeping the return type consistent for callers that
        iterate the result.
    """
    # Bug fix: the original returned {} on this branch while returning a
    # list below, giving callers an inconsistent return type.
    if not os.path.exists(repo_path):
        return []

    return [
        os.path.join(root, file_name)
        for root, _, file_names in os.walk(repo_path)
        for file_name in file_names
        if not file_name.endswith(".py")
    ]


async def get_data_list_for_user(_, dataset_name, user):
    """Gather every Data record from the user's datasets matching *dataset_name*.

    The first positional argument is ignored (pipeline task plumbing). Looks up
    all datasets named *dataset_name* owned by *user* and concatenates the data
    documents of each into a single list.
    """
    matching_datasets = await get_datasets_by_name(dataset_name, user.id)

    documents: list[Data] = []
    for current_dataset in matching_datasets:
        documents += await get_dataset_data(dataset_id=current_dataset.id)

    return documents
Loading