Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions evals/eval_with_modal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# File: eval_with_modal.py

import modal
import os
import json
from typing import Optional

app = modal.App("cognee-runner")

# Resolve the local cognee checkout to copy into the Modal image.
# Prefer the COGNEE_PATH environment variable; fall back to this file's
# directory so the script stays portable across machines (no hardcoded
# user-specific paths).
LOCAL_COGNEE_PATH = os.getenv("COGNEE_PATH", os.path.dirname(os.path.abspath(__file__)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Replace hardcoded path with environment variable or configuration.

The hardcoded local path /Users/vasilije/cognee is not portable and will fail for other users.

Replace with:

-LOCAL_COGNEE_PATH = "/Users/vasilije/cognee"
+LOCAL_COGNEE_PATH = os.getenv("COGNEE_PATH", os.path.dirname(os.path.abspath(__file__)))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# LOCAL_COGNEE_PATH = os.path.dirname(os.path.abspath(__file__))
LOCAL_COGNEE_PATH = "/Users/vasilije/cognee"
# LOCAL_COGNEE_PATH = os.path.dirname(os.path.abspath(__file__))
LOCAL_COGNEE_PATH = os.getenv("COGNEE_PATH", os.path.dirname(os.path.abspath(__file__)))
🧰 Tools
🪛 GitHub Actions: ruff format

[warning] File requires formatting with Ruff formatter

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will fix once i finish running it


# Container image for the evaluation workers: Debian slim base with poetry,
# plus a full copy of the local cognee checkout installed via `poetry install`.
image = (
    modal.Image.debian_slim()
    .pip_install("poetry")
    # Bakes the local source tree into the image at /root/cognee.
    .copy_local_dir(LOCAL_COGNEE_PATH, "/root/cognee")
    .run_commands(
        "cd /root/cognee && poetry install",
    )
)


@app.function(image=image, gpu="T4", concurrency_limit=5)
def run_single_repo(instance_data: dict, disable_cognee: bool = False):
    """Run process_single_repo.py for one SWE-bench instance inside the container.

    Args:
        instance_data: A single SWE-bench instance record; must contain
            "instance_id".
        disable_cognee: When True, the worker script is invoked with
            --disable-cognee.

    Returns:
        A (filename, content) tuple: the prediction file's name and its JSON
        text, or an empty string for content when the worker produced no file.

    Raises:
        subprocess.CalledProcessError: If the worker script exits non-zero
            (check=True surfaces failures instead of continuing silently).
    """
    # subprocess is only needed container-side; os and json are already
    # imported at module level (avoids the ruff F811 duplicate-import warning).
    import subprocess

    instance_json_str = json.dumps(instance_data)

    cmd = [
        "python",
        "process_single_repo.py",
        f"--instance_json={instance_json_str}",
    ]
    if disable_cognee:
        cmd.append("--disable-cognee")

    work_dir = "/root/cognee"
    subprocess.run(cmd, cwd=work_dir, check=True)

    instance_id = instance_data["instance_id"]
    filename = f"pred_{'nocognee' if disable_cognee else 'cognee'}_{instance_id}.json"
    path_in_container = os.path.join(work_dir, filename)

    # The worker writes its prediction next to the repo checkout; relay the
    # file's contents back to the local entrypoint. An empty content string
    # signals that the worker finished without producing a prediction file.
    if os.path.exists(path_in_container):
        with open(path_in_container, "r") as f:
            content = f.read()
        return (filename, content)
    return (filename, "")


@app.local_entrypoint()
def main(disable_cognee: bool = False, num_samples: int = 5):
    """
    Main entry point for Modal.

    Args:
        disable_cognee: If True, runs without Cognee
        num_samples: Number of samples to process
    """
    from swebench.harness.utils import load_swebench_dataset

    # The BM25-retrieval variant of the dataset is used for the no-cognee
    # baseline; the plain Lite split is used when cognee provides retrieval.
    dataset_name = (
        "princeton-nlp/SWE-bench_Lite_bm25_13K" if disable_cognee
        else "princeton-nlp/SWE-bench_Lite"
    )

    swe_dataset = load_swebench_dataset(dataset_name, split="test")
    swe_dataset = swe_dataset[:num_samples]

    # .remote() returns the worker's (filename, content) result for each
    # instance; concurrency is bounded by the function's concurrency_limit.
    calls = []
    for instance in swe_dataset:
        calls.append(run_single_repo.remote(instance, disable_cognee=disable_cognee))

    results = []
    for call in calls:
        filename, content = call
        if content:
            with open(filename, "w") as f:
                f.write(content)
            # Fixed: the f-string previously had no placeholder and printed a
            # literal "(unknown)" instead of the saved file's name.
            print(f"Saved {filename} locally.")
            results.append(filename)

    # Merge every per-instance prediction file into a single JSON list.
    merged = []
    for fname in results:
        with open(fname, "r") as f:
            merged.append(json.load(f))

    merged_filename = "merged_nocognee.json" if disable_cognee else "merged_cognee.json"
    with open(merged_filename, "w") as f:
        json.dump(merged, f, indent=2)

    print(f"Merged {len(results)} repos into {merged_filename}!")
    print("Done!")
14 changes: 14 additions & 0 deletions evals/get_started.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import modal

# Minimal Modal example app used to sanity-check remote execution.
app = modal.App("example-get-started")


@app.function()
def square(x):
    """Compute x squared on a remote Modal worker."""
    print("This code is running on a remote worker!")
    return x**2


@app.local_entrypoint()
def main():
    """Invoke square on a remote worker and print the result."""
    result = square.remote(42)
    print("the square is", result)
132 changes: 132 additions & 0 deletions evals/process_single_repo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# File: process_single_repo.py

import argparse
import json
import subprocess
import sys
import os
from pathlib import Path

from swebench.inference.make_datasets.create_instance import PATCH_EXAMPLE

from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.infrastructure.llm.prompts import read_query_prompt
from cognee.modules.retrieval.description_to_codepart_search import (
code_description_to_code_part_search,
)
from evals.eval_utils import download_github_repo


def check_install_package(package_name):
    """Return True if *package_name* is importable, installing it via pip if not.

    Returns False only when the package is missing and the pip install fails.
    """
    try:
        __import__(package_name)
    except ImportError:
        # Not importable yet -- try installing into the current interpreter.
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", package_name])
        except subprocess.CalledProcessError:
            return False
    return True

Comment on lines +21 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Replace runtime package installation with proper dependency management.

Installing packages at runtime is risky and could lead to inconsistent environments.

Move dependencies to pyproject.toml or requirements.txt:

-def check_install_package(package_name):
-    """Check if a pip package is installed and install it if not."""
-    try:
-        __import__(package_name)
-        return True
-    except ImportError:
-        try:
-            subprocess.check_call([sys.executable, "-m", "pip", "install", package_name])
-            return True
-        except subprocess.CalledProcessError:
-            return False

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 GitHub Actions: ruff format

[warning] File requires formatting with Ruff formatter


async def generate_patch_with_cognee(instance, include_docs=True):
    """Generate a candidate patch for a SWE-bench instance using Cognee.

    Builds a code graph over the instance's repository, retrieves code parts
    relevant to the problem statement, and asks the LLM for a patch.

    Args:
        instance: SWE-bench instance record; must contain "problem_statement".
        include_docs: Whether docs are included in graph building and retrieval.
            Previously hardcoded to True; now a backward-compatible parameter.

    Returns:
        The LLM's patch prediction as a string.

    Raises:
        ValueError: If the instance has no problem statement.
    """
    problem_statement = instance.get("problem_statement")
    if not problem_statement:
        raise ValueError("Missing 'problem_statement' in instance")

    repo_path = download_github_repo(instance, "../RAW_GIT_REPOS")
    instructions = read_query_prompt("patch_gen_kg_instructions.txt")

    # Build/refresh the code graph for the repository before retrieval.
    async for result in run_code_graph_pipeline(repo_path, include_docs=include_docs):
        print(result)

    retrieved_codeparts = await code_description_to_code_part_search(
        problem_statement, include_docs=include_docs
    )

    # Prompt layout mirrors SWE-bench's patch-generation format: the problem,
    # a worked patch example, then the retrieved code context.
    prompt = "\n".join(
        [
            problem_statement,
            "<patch>",
            PATCH_EXAMPLE,
            "</patch>",
            "Additional context to solve the problem:",
            retrieved_codeparts,
        ]
    )

    llm_client = get_llm_client()
    answer_prediction = await llm_client.acreate_structured_output(
        text_input=prompt,
        system_prompt=instructions,
        response_model=str,
    )

    return answer_prediction

Comment on lines +34 to +66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling and type hints to async functions.

The async functions lack proper error handling and type hints.

-async def generate_patch_with_cognee(instance):
+async def generate_patch_with_cognee(instance: dict) -> str:
+    """Generate patch using Cognee.
+    
+    Args:
+        instance: Repository instance data
+        
+    Returns:
+        str: Generated patch
+        
+    Raises:
+        ValueError: If required instance data is missing
+        RuntimeError: If patch generation fails
+    """
+    if not instance.get("problem_statement"):
+        raise ValueError("Missing problem statement in instance")
+
     repo_path = download_github_repo(instance, "../RAW_GIT_REPOS")
     include_docs = True
     problem_statement = instance["problem_statement"]
     instructions = read_query_prompt("patch_gen_kg_instructions.txt")
 
-    async for result in run_code_graph_pipeline(repo_path, include_docs=include_docs):
-        print(result)
+    try:
+        async for result in run_code_graph_pipeline(repo_path, include_docs=include_docs):
+            print(result)
+    except Exception as e:
+        raise RuntimeError(f"Failed to run code graph pipeline: {e}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def generate_patch_with_cognee(instance):
repo_path = download_github_repo(instance, "../RAW_GIT_REPOS")
include_docs = True
problem_statement = instance["problem_statement"]
instructions = read_query_prompt("patch_gen_kg_instructions.txt")
async for result in run_code_graph_pipeline(repo_path, include_docs=include_docs):
print(result)
retrieved_codeparts = await code_description_to_code_part_search(
problem_statement, include_docs=include_docs
)
prompt = "\n".join(
[
problem_statement,
"<patch>",
PATCH_EXAMPLE,
"</patch>",
"Additional context to solve the problem:",
retrieved_codeparts,
]
)
llm_client = get_llm_client()
answer_prediction = await llm_client.acreate_structured_output(
text_input=prompt,
system_prompt=instructions,
response_model=str,
)
return answer_prediction
async def generate_patch_with_cognee(instance: dict) -> str:
"""Generate patch using Cognee.
Args:
instance: Repository instance data
Returns:
str: Generated patch
Raises:
ValueError: If required instance data is missing
RuntimeError: If patch generation fails
"""
if not instance.get("problem_statement"):
raise ValueError("Missing problem statement in instance")
repo_path = download_github_repo(instance, "../RAW_GIT_REPOS")
include_docs = True
problem_statement = instance["problem_statement"]
instructions = read_query_prompt("patch_gen_kg_instructions.txt")
try:
async for result in run_code_graph_pipeline(repo_path, include_docs=include_docs):
print(result)
except Exception as e:
raise RuntimeError(f"Failed to run code graph pipeline: {e}")
retrieved_codeparts = await code_description_to_code_part_search(
problem_statement, include_docs=include_docs
)
prompt = "\n".join(
[
problem_statement,
"<patch>",
PATCH_EXAMPLE,
"</patch>",
"Additional context to solve the problem:",
retrieved_codeparts,
]
)
llm_client = get_llm_client()
answer_prediction = await llm_client.acreate_structured_output(
text_input=prompt,
system_prompt=instructions,
response_model=str,
)
return answer_prediction
🧰 Tools
🪛 GitHub Actions: ruff format

[warning] File requires formatting with Ruff formatter


async def generate_patch_without_cognee(instance, llm_client):
    """Ask the LLM for a patch directly from the instance text, without Cognee context."""
    system_prompt = read_query_prompt("patch_gen_instructions.txt")
    return await llm_client.acreate_structured_output(
        text_input=instance["text"],
        system_prompt=system_prompt,
        response_model=str,
    )


async def process_repo(instance, disable_cognee=False):
    """
    Process a single repository (a single instance).

    Returns a prediction record with the instance id, the generated patch,
    and a label identifying which pipeline produced it.
    """
    if disable_cognee:
        client = get_llm_client()
        patch = await generate_patch_without_cognee(instance, client)
        label = "without_cognee"
    else:
        patch = await generate_patch_with_cognee(instance)
        label = "with_cognee"

    return {
        "instance_id": instance["instance_id"],
        "model_patch": patch,
        "model_name_or_path": label,
    }


async def main():
    """
    Main entry: expects a single repository (instance) in JSON form.
    Example usage:
        python process_single_repo.py --instance_json='{"instance_id": "abc123", ...}'
    or called as an imported function from Modal.
    """
    parser = argparse.ArgumentParser(description="Process a single repo from SWE-Bench")
    parser.add_argument("--instance_json", type=str, required=True)
    parser.add_argument(
        "--disable-cognee", action="store_true", help="Disable Cognee for evaluation"
    )
    args = parser.parse_args()

    # Make sure runtime dependencies are importable before they are needed.
    for dependency in ("transformers", "sentencepiece", "swebench"):
        check_install_package(dependency)

    # Parse the instance JSON supplied on the command line.
    instance = json.loads(args.instance_json)

    result = await process_repo(instance, disable_cognee=args.disable_cognee)

    # One output file per instance, tagged by whether cognee was used.
    instance_id = instance["instance_id"]
    tag = "nocognee" if args.disable_cognee else "cognee"
    out_name = f"pred_{tag}_{instance_id}.json"

    with open(out_name, "w") as f:
        json.dump(result, f, indent=2)

    print(f"Finished processing instance_id={instance_id}. Saved to {out_name}")


if __name__ == "__main__":
    import asyncio

    # debug=True enables asyncio's slow-callback and coroutine-tracking
    # diagnostics, which add overhead; keep it off for normal runs.
    asyncio.run(main())
Comment on lines +129 to +132
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Remove debug mode in production.

Running asyncio with debug=True in production can impact performance.

 if __name__ == "__main__":
     import asyncio
 
-    asyncio.run(main(), debug=True)
+    asyncio.run(main())
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if __name__ == "__main__":
import asyncio
asyncio.run(main(), debug=True)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
🧰 Tools
🪛 GitHub Actions: ruff format

[warning] File requires formatting with Ruff formatter

Loading