Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import os
from os import path
import logging
from uuid import UUID
from typing import Optional
from typing import AsyncGenerator, List
from contextlib import asynccontextmanager
from sqlalchemy import text, select, MetaData, Table
from sqlalchemy import text, select, MetaData, Table, delete
from sqlalchemy.orm import joinedload
from sqlalchemy.exc import NoResultFound
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker

from cognee.infrastructure.databases.exceptions import EntityNotFoundError
from cognee.modules.data.models.Data import Data

from ..ModelBase import Base


logger = logging.getLogger(__name__)

class SQLAlchemyAdapter():
def __init__(self, connection_string: str):
self.db_path: str = None
Expand Down Expand Up @@ -86,9 +94,9 @@ async def get_schema_list(self) -> List[str]:
return [schema[0] for schema in result.fetchall()]
return []

async def delete_data_by_id(self, table_name: str, data_id: UUID, schema_name: Optional[str] = "public"):
async def delete_entity_by_id(self, table_name: str, data_id: UUID, schema_name: Optional[str] = "public"):
"""
Delete data in given table based on id. Table must have an id Column.
Delete entity in given table based on id. Table must have an id Column.
"""
if self.engine.dialect.name == "sqlite":
async with self.get_async_session() as session:
Expand All @@ -107,6 +115,42 @@ async def delete_data_by_id(self, table_name: str, data_id: UUID, schema_name: O
await session.commit()


async def delete_data_entity(self, data_id: UUID):
    """
    Delete the Data row with the given id, first removing its local file
    when (a) this row is the only reference to that file in the database
    and (b) the file lives under cognee's data root directory (i.e. it was
    created by cognee, not supplied by the user).

    Args:
        data_id: primary key of the Data row to delete.

    Raises:
        EntityNotFoundError: if no Data row with `data_id` exists.
    """
    async with self.get_async_session() as session:
        if self.engine.dialect.name == "sqlite":
            # Foreign key constraints are disabled by default in SQLite (for
            # backwards compatibility), so they must be enabled for each
            # database connection/session separately.
            await session.execute(text("PRAGMA foreign_keys = ON;"))

        try:
            data_entity = (await session.scalars(select(Data).where(Data.id == data_id))).one()
        except (ValueError, NoResultFound) as e:
            # Chain the original cause so the lookup failure stays visible.
            raise EntityNotFoundError(message=f"Entity not found: {str(e)}") from e

        # Check if other data objects point to the same raw data location.
        raw_data_location_entities = (await session.execute(
            select(Data.raw_data_location).where(Data.raw_data_location == data_entity.raw_data_location))).all()

        # Don't delete the local file unless this is the only reference
        # to the file in the database.
        if len(raw_data_location_entities) == 1:
            # Delete the local file only if it was created by cognee.
            from cognee.base_config import get_base_config
            config = get_base_config()

            file_path = raw_data_location_entities[0].raw_data_location

            # NOTE(review): substring containment is a loose ownership check —
            # a sibling directory such as "<data_root>_backup" would also
            # match; a normalized path-prefix comparison would be stricter.
            if config.data_root_directory in file_path:
                if os.path.exists(file_path):
                    try:
                        os.remove(file_path)
                    except OSError as error:
                        # File removal can fail (permissions, concurrent
                        # deletion, FS errors). Log it but still delete the
                        # database row so the entity doesn't linger.
                        logger.error("Failed to delete local file %s: %s", file_path, error)
                else:
                    # Report bug as file should exist
                    logger.error("Local file which should exist can't be found.")

        await session.execute(delete(Data).where(Data.id == data_id))
        await session.commit()

async def get_table(self, table_name: str, schema_name: Optional[str] = "public") -> Table:
"""
Dynamically loads a table using the given table name and schema name.
Expand Down
2 changes: 1 addition & 1 deletion cognee/modules/data/methods/delete_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ async def delete_data(data: Data):

db_engine = get_relational_engine()

return await db_engine.delete_data_by_id(data.__tablename__, data.id)
return await db_engine.delete_data_entity(data.id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Based on the gathered information, I can now provide the final response:

Error handling improvements needed for file operations

The delete_data_entity implementation has proper error handling for:

  • Non-existent data IDs (raises EntityNotFoundError)
  • Concurrent deletion (handled via foreign key constraints)

However, the file deletion operation using os.remove() lacks try-catch error handling, which could fail due to:

  • Permission issues
  • File already being deleted
  • File system errors

Action items:

  • Wrap the os.remove() operation in a try-catch block
  • Handle potential OSError/IOError exceptions
  • Consider logging file deletion failures
🔗 Analysis chain

Verify error handling in delete_data_entity

The function now uses delete_data_entity which should handle local file deletion. Ensure that error handling is comprehensive, particularly for:

  1. Non-existent data IDs
  2. Failed file deletions
  3. Concurrent deletion attempts

Let me gather more information about the error handling implementation since the previous results were truncated.


Let me check one more aspect to see how the file deletion errors are handled.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Look for error handling in the delete_data_entity implementation

echo "Checking error handling in delete_data_entity implementation..."
rg -A 10 "async def delete_data_entity"

echo "\nChecking for try-catch blocks around file operations..."
ast-grep --pattern 'try {
  $$$
  delete_data_entity
  $$$
}'

Length of output: 1758


Script:

#!/bin/bash
# Get full implementation of delete_data_entity including try-catch blocks
rg -A 20 "async def delete_data_entity"

# Look for any file deletion operations in the same file
echo "\nChecking file deletion operations..."
rg "unlink|remove|delete.*file" cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py

Length of output: 3324


Script:

#!/bin/bash
# Look for try-catch blocks around the file deletion operation
rg -B 5 -A 5 "os.remove\(.*raw_data_location" cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py

# Check if there are any custom exceptions defined for file operations
echo "\nChecking for file operation related exceptions..."
rg "class.*Error" cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py

Length of output: 940

2 changes: 1 addition & 1 deletion cognee/modules/data/methods/delete_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
async def delete_dataset(dataset: Dataset):
    """Remove the given dataset's row from the relational database by id."""
    engine = get_relational_engine()
    return await engine.delete_entity_by_id(dataset.__tablename__, dataset.id)
54 changes: 44 additions & 10 deletions cognee/tests/test_pgvector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,53 @@
import logging
import pathlib
import cognee

from cognee.modules.data.models import Data
from cognee.api.v1.search import SearchType
from cognee.modules.retrieval.brute_force_triplet_search import brute_force_triplet_search
from cognee.modules.users.methods import get_default_user

logging.basicConfig(level=logging.DEBUG)

async def test_local_file_deletion(data_text, file_location):
    """
    Verify delete_data_entity's local-file cleanup behavior:
    1. a file created by cognee (under the data root) is removed together
       with its Data row;
    2. a pre-existing user file (outside the data root) is kept when its
       Data row is deleted.
    """
    from sqlalchemy import select
    import hashlib
    from cognee.infrastructure.databases.relational import get_relational_engine

    engine = get_relational_engine()

    async with engine.get_async_session() as session:
        # Hash the contents to locate the Data row cognee created for them.
        # NOTE: md5 matches how Data.content_hash is computed at ingestion;
        # do not switch algorithms here without changing it there too.
        encoded_text = data_text.encode("utf-8")
        data_hash = hashlib.md5(encoded_text).hexdigest()

        # Get data entry from database based on hash of contents.
        data = (await session.scalars(select(Data).where(Data.content_hash == data_hash))).one()
        assert os.path.isfile(data.raw_data_location), f"Data location doesn't exist: {data.raw_data_location}"

        # Deleting the entity must also remove the cognee-created local file.
        await engine.delete_data_entity(data.id)
        assert not os.path.exists(
            data.raw_data_location), f"Data location still exists after deletion: {data.raw_data_location}"

    async with engine.get_async_session() as session:
        # Get data entry from database based on file path.
        data = (await session.scalars(select(Data).where(Data.raw_data_location == file_location))).one()
        assert os.path.isfile(data.raw_data_location), f"Data location doesn't exist: {data.raw_data_location}"

        # Local files not created by cognee must NOT be deleted with the entity.
        await engine.delete_data_entity(data.id)
        assert os.path.exists(data.raw_data_location), f"Data location doesn't exist: {data.raw_data_location}"

async def test_getting_of_documents(dataset_name_1):
    """
    Verify document retrieval for permission-filtered search:
    1. exactly one document is visible when filtering by dataset_name_1;
    2. both documents are visible when no dataset filter is applied.
    """
    from cognee.modules.users.permissions.methods import get_document_ids_for_user

    # Test getting of documents for search per dataset.
    user = await get_default_user()
    document_ids = await get_document_ids_for_user(user.id, [dataset_name_1])
    assert len(document_ids) == 1, f"Number of expected documents doesn't match {len(document_ids)} != 1"

    # Test getting of documents for search when no dataset is provided.
    # Reuse the user fetched above instead of querying the database again.
    document_ids = await get_document_ids_for_user(user.id)
    assert len(document_ids) == 2, f"Number of expected documents doesn't match {len(document_ids)} != 2"
Comment on lines +40 to +50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve test function structure and assertions

Several improvements needed:

  1. Add docstring
  2. Improve assertion messages
  3. Remove duplicate user retrieval
 async def test_getting_of_documents(dataset_name_1):
+    """
+    Test document retrieval functionality:
+    1. Verify correct document count when filtering by dataset
+    2. Verify correct document count when no dataset filter is applied
+    """
     # Test getting of documents for search per dataset
     from cognee.modules.users.permissions.methods import get_document_ids_for_user
     user = await get_default_user()
     document_ids = await get_document_ids_for_user(user.id, [dataset_name_1])
-    assert len(document_ids) == 1, f"Number of expected documents doesn't match {len(document_ids)} != 1"
+    assert len(document_ids) == 1, f"Expected 1 document when filtering by dataset, but got {len(document_ids)}"

     # Test getting of documents for search when no dataset is provided
-    user = await get_default_user()  # Duplicate user retrieval
     document_ids = await get_document_ids_for_user(user.id)
-    assert len(document_ids) == 2, f"Number of expected documents doesn't match {len(document_ids)} != 2"
+    assert len(document_ids) == 2, f"Expected 2 documents without dataset filter, but got {len(document_ids)}"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def test_getting_of_documents(dataset_name_1):
# Test getting of documents for search per dataset
from cognee.modules.users.permissions.methods import get_document_ids_for_user
user = await get_default_user()
document_ids = await get_document_ids_for_user(user.id, [dataset_name_1])
assert len(document_ids) == 1, f"Number of expected documents doesn't match {len(document_ids)} != 1"
# Test getting of documents for search when no dataset is provided
user = await get_default_user()
document_ids = await get_document_ids_for_user(user.id)
assert len(document_ids) == 2, f"Number of expected documents doesn't match {len(document_ids)} != 2"
async def test_getting_of_documents(dataset_name_1):
"""
Test document retrieval functionality:
1. Verify correct document count when filtering by dataset
2. Verify correct document count when no dataset filter is applied
"""
# Test getting of documents for search per dataset
from cognee.modules.users.permissions.methods import get_document_ids_for_user
user = await get_default_user()
document_ids = await get_document_ids_for_user(user.id, [dataset_name_1])
assert len(document_ids) == 1, f"Expected 1 document when filtering by dataset, but got {len(document_ids)}"
# Test getting of documents for search when no dataset is provided
document_ids = await get_document_ids_for_user(user.id)
assert len(document_ids) == 2, f"Expected 2 documents without dataset filter, but got {len(document_ids)}"



async def main():
cognee.config.set_vector_db_config(
Expand Down Expand Up @@ -67,16 +108,7 @@ async def main():

from cognee.infrastructure.databases.vector import get_vector_engine

# Test getting of documents for search per dataset
from cognee.modules.users.permissions.methods import get_document_ids_for_user
user = await get_default_user()
document_ids = await get_document_ids_for_user(user.id, [dataset_name_1])
assert len(document_ids) == 1, f"Number of expected documents doesn't match {len(document_ids)} != 1"

# Test getting of documents for search when no dataset is provided
user = await get_default_user()
document_ids = await get_document_ids_for_user(user.id)
assert len(document_ids) == 2, f"Number of expected documents doesn't match {len(document_ids)} != 2"
await test_getting_of_documents(dataset_name_1)

vector_engine = get_vector_engine()
random_node = (await vector_engine.search("entity_name", "Quantum computer"))[0]
Expand Down Expand Up @@ -106,6 +138,8 @@ async def main():
results = await brute_force_triplet_search('What is a quantum computer?')
assert len(results) > 0

await test_local_file_deletion(text, explanation_file_path)

await cognee.prune.prune_data()
assert not os.path.isdir(data_directory_path), "Local data files are not deleted"

Expand Down
Loading