Cog 414 fix postgres database deletion #163
Conversation
Added deletion of all tables in postgres database, but this fix causes an issue regarding creation of duplicate tables on next run Fix #COG-414
Resolve issue with database deletion by cleaning Metadata after every schema Fix #COG-414
Moved cleaning of MetaData to be after all tables have been dropped Fix #COG-414
Removed unnecessary print statement Refactor #COG-414
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py (2 hunks)
🧰 Additional context used
🔇 Additional comments (2)
cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py (2)
135-136: Ensure proper handling of schema and table names to prevent SQL injection

When constructing SQL queries with schema and table names, it's important to ensure that these identifiers are properly quoted to prevent SQL injection vulnerabilities, even if the values are retrieved from the database.
[security]
Apply this diff to use parameterized queries:

```diff
- drop_table_query = text(f"DROP TABLE IF EXISTS {schema[0]}.{table.name} CASCADE")
- await connection.execute(drop_table_query)
+ drop_table_query = text("DROP TABLE IF EXISTS :schema.:table_name CASCADE")
+ await connection.execute(
+     drop_table_query.bindparams(
+         schema=text(schema[0]),
+         table_name=text(table.name)
+     )
+ )
```

Alternatively, if you choose to drop entire schemas as suggested earlier, ensure that the schema names are properly quoted:

```diff
- drop_schema_query = text(f'DROP SCHEMA IF EXISTS "{schema[0]}" CASCADE')
+ drop_schema_query = text("DROP SCHEMA IF EXISTS :schema_name CASCADE")
+ await connection.execute(
+     drop_schema_query.bindparams(schema_name=text(schema[0]))
+ )
```
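One caveat worth noting for either diff: most database drivers only accept bind parameters for *values*, not for identifiers, so schema and table names generally have to be quoted rather than parameterized (SQLAlchemy exposes this through `dialect.identifier_preparer.quote()`). A minimal standalone sketch of such quoting follows; `quote_ident` and `build_drop_table` are hypothetical helpers written for illustration, not part of this PR.

```python
# Sketch of PostgreSQL-style identifier quoting. `quote_ident` is a
# hypothetical helper; real code would use SQLAlchemy's
# dialect.identifier_preparer.quote() instead.
def quote_ident(name: str) -> str:
    """Wrap an identifier in double quotes, doubling any embedded quotes."""
    if "\x00" in name:
        raise ValueError("identifier must not contain NUL bytes")
    return '"' + name.replace('"', '""') + '"'


def build_drop_table(schema: str, table: str) -> str:
    """Compose a DROP TABLE statement from safely quoted identifiers."""
    return f"DROP TABLE IF EXISTS {quote_ident(schema)}.{quote_ident(table)} CASCADE"
```

With this approach a malicious name like `users"; DROP DATABASE --` is rendered harmless inside the doubled quotes rather than terminating the statement.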
122-137: Verify that all user-defined schemas are correctly identified and excluded as needed

The query used to retrieve schema names excludes certain system schemas. Ensure that any additional schemas that should not be dropped (e.g., extensions or specific schemas required for your application) are also excluded.
Run the following script to list all schemas that will be dropped:
This script searches for the SQL query in your codebase and displays the schemas that are being selected for dropping. Review the list to ensure it's accurate.
✅ Verification successful
Let me try a different approach to verify this by searching for the schema-related SQL queries without file type restrictions.
This should help us identify where these schema-related queries are used and what exclusions are in place. Let's analyze the results to verify the schema handling implementation.
Let me check the broader context of this file to understand if there are any additional schema-related configurations or exclusions defined elsewhere.
This will help us understand:
- The complete context of the schema handling in SqlAlchemyAdapter
- Any configuration files that might define protected schemas
- Other schema-related code that might affect this implementation
After analyzing the complete implementation in SqlAlchemyAdapter.py, I can now provide a final response:
Schema exclusions are correctly implemented for system schemas
The current implementation in `delete_database()` correctly excludes the essential PostgreSQL system schemas (`pg_catalog`, `pg_toast`, `information_schema`). These are the standard system schemas that should never be dropped. The implementation is safe because:
- It explicitly excludes all critical system schemas
- The method is part of a complete database deletion operation, intended to clean up user-created schemas
- The implementation uses proper error handling and transaction management through the engine's connection
The review comment's concern about additional schemas that should not be dropped is not applicable here because:
- This is a database deletion operation, meant to clean up everything except system schemas
- Any application-specific schemas would be intentionally dropped as part of this operation
- The code is used in a controlled context where preserving user-defined schemas is not required
- The implementation follows PostgreSQL best practices for excluding system schemas during cleanup operations
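The exclusion logic described above can be sketched in isolation; `SYSTEM_SCHEMAS` and the sample schema names below are illustrative, not taken from the PR.

```python
# Sketch of the system-schema filter applied during database deletion.
SYSTEM_SCHEMAS = {"pg_catalog", "pg_toast", "information_schema"}


def droppable_schemas(all_schemas):
    """Return schemas safe to drop, excluding PostgreSQL system schemas."""
    return [s for s in all_schemas if s not in SYSTEM_SCHEMAS]


# Illustrative input mixing user and system schemas.
schemas = ["public", "pg_catalog", "app_data", "information_schema", "pg_toast"]
```

Only `public` and `app_data` survive the filter, which matches the behavior of the `NOT IN (...)` clause in the SQL query.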
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Description: List all schemas that are candidates for deletion.
# Expected: Only user-defined schemas that are safe to drop.
rg -t sql "SELECT schema_name FROM information_schema.schemata" -A 3 | sed -n '/WHERE schema_name NOT IN/,/;/p'
```

Length of output: 262
Script:
```bash
#!/bin/bash
# Search for schema-related SQL queries in the codebase
rg -l "SELECT schema_name FROM information_schema.schemata"

# Get more context around the schema exclusions
rg "schema_name NOT IN" -B 2 -A 2
```

Length of output: 818
Script:
```bash
#!/bin/bash
# Get the full content of SqlAlchemyAdapter.py to understand the context
cat "cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py"

# Search for any schema-related configurations in the codebase
rg -l "schema" --type yaml --type json --type ini --type toml

# Look for any other schema-related code
rg "schema\." -t python
```

Length of output: 6573
```python
result = await connection.execute(
    text("""
        SELECT schema_name FROM information_schema.schemata
        WHERE schema_name NOT IN ('pg_catalog', 'pg_toast', 'information_schema');
    """)
)
# Create a MetaData instance to load table information
metadata = MetaData()
# Drop all tables from all schemas
for schema in result.fetchall():
    # Load the schema information into the MetaData object
    await connection.run_sync(metadata.reflect, schema=schema[0])
    for table in metadata.sorted_tables:
        drop_table_query = text(f"DROP TABLE IF EXISTS {schema[0]}.{table.name} CASCADE")
        await connection.execute(drop_table_query)
    metadata.clear()
```
🛠️ Refactor suggestion
Consider dropping entire schemas instead of individual tables
Currently, the delete_database method iterates over each table within each schema and drops them individually. This can be inefficient, especially if there are many tables or other objects within the schemas. Dropping the entire schema in a single operation is more efficient and ensures that all objects within the schema are removed.
Apply this diff to simplify the database deletion process:

```diff
+ # Drop entire schemas instead of individual tables
  for schema in result.fetchall():
-     # Load the schema information into the MetaData object
-     await connection.run_sync(metadata.reflect, schema=schema[0])
-     for table in metadata.sorted_tables:
-         drop_table_query = text(f"DROP TABLE IF EXISTS {schema[0]}.{table.name} CASCADE")
-         await connection.execute(drop_table_query)
-     metadata.clear()
+     drop_schema_query = text(f'DROP SCHEMA IF EXISTS "{schema[0]}" CASCADE')
+     await connection.execute(drop_schema_query)
```

This modification:

- Drops each schema using the `DROP SCHEMA` command with the `CASCADE` option, which removes the schema and all dependent objects.
- Simplifies the code by eliminating the need to reflect metadata and iterate over tables.
- Enhances performance by reducing the number of database operations.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
result = await connection.execute(
    text("""
        SELECT schema_name FROM information_schema.schemata
        WHERE schema_name NOT IN ('pg_catalog', 'pg_toast', 'information_schema');
    """)
)
# Create a MetaData instance to load table information
metadata = MetaData()
# Drop entire schemas instead of individual tables
for schema in result.fetchall():
    drop_schema_query = text(f'DROP SCHEMA IF EXISTS "{schema[0]}" CASCADE')
    await connection.execute(drop_schema_query)
```
Fixed deletion of tables in SqlAlchemyAdapter so it works for sqlite and postgres Fix #COG-414
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py (3 hunks)
🔇 Additional comments (2)
cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py (2)
2-2: LGTM: Import statement is correctly placed.

The `Optional` import is appropriately added to support the new `schema_name` parameter.
126-141: Consider previous suggestion to use DROP SCHEMA CASCADE.

The current implementation drops tables individually, which:
- Is less efficient than dropping the entire schema
- Could potentially lead to deadlocks when multiple sessions are trying to drop tables concurrently
- Might miss other schema objects (views, functions, etc.)
Referring to the previous review suggestion to use `DROP SCHEMA CASCADE` for a more efficient and comprehensive cleanup.

Let's verify if there are any schema objects besides tables that need cleanup:

```bash
#!/bin/bash
# Search for CREATE statements to identify other schema objects
rg -i "CREATE (VIEW|FUNCTION|PROCEDURE|TRIGGER|TYPE|SEQUENCE)"
```
```diff
 async def delete_table(self, table_name: str, schema_name: Optional[str] = "public"):
     async with self.engine.begin() as connection:
-        await connection.execute(text(f"DROP TABLE IF EXISTS {table_name} CASCADE;"))
-        await connection.close()
+        if self.engine.dialect.name == "sqlite":
+            # SQLite doesn't support schema namespaces and the CASCADE keyword.
+            # However, foreign key constraint can be defined with ON DELETE CASCADE during table creation.
+            await connection.execute(text(f"DROP TABLE IF EXISTS {table_name};"))
+        else:
+            await connection.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name} CASCADE;"))
```
🛠️ Refactor suggestion
Consider using SQL parameters for table and schema names.
The implementation correctly handles different database types, but direct string interpolation in SQL queries could be vulnerable to SQL injection. Consider using parameters:
```diff
-            await connection.execute(text(f"DROP TABLE IF EXISTS {table_name};"))
+            await connection.execute(text("DROP TABLE IF EXISTS :table_name;"), {"table_name": table_name})
-            await connection.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name} CASCADE;"))
+            await connection.execute(
+                text("DROP TABLE IF EXISTS :schema_name.:table_name CASCADE;"),
+                {"schema_name": schema_name, "table_name": table_name}
+            )
```

📝 Committable suggestion

```python
async def delete_table(self, table_name: str, schema_name: Optional[str] = "public"):
    async with self.engine.begin() as connection:
        if self.engine.dialect.name == "sqlite":
            # SQLite doesn't support schema namespaces and the CASCADE keyword.
            # However, foreign key constraint can be defined with ON DELETE CASCADE during table creation.
            await connection.execute(text("DROP TABLE IF EXISTS :table_name;"), {"table_name": table_name})
        else:
            await connection.execute(
                text("DROP TABLE IF EXISTS :schema_name.:table_name CASCADE;"),
                {"schema_name": schema_name, "table_name": table_name}
            )
```
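As a side note on the SQLite comment in the suggestion: `ON DELETE CASCADE` declared at table-creation time does give cascading deletes, provided foreign-key enforcement is switched on for the connection. A self-contained sketch with hypothetical `parent`/`child` tables:

```python
import sqlite3

# Hypothetical parent/child tables demonstrating ON DELETE CASCADE in SQLite.
conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # off by default; required for FK enforcement
conn.execute("CREATE TABLE parent (id INTEGER PRIMARY KEY)")
conn.execute("""
    CREATE TABLE child (
        id INTEGER PRIMARY KEY,
        parent_id INTEGER REFERENCES parent(id) ON DELETE CASCADE
    )
""")
conn.execute("INSERT INTO parent (id) VALUES (1)")
conn.execute("INSERT INTO child (id, parent_id) VALUES (10, 1)")

# Deleting the parent row cascades to its child rows.
conn.execute("DELETE FROM parent WHERE id = 1")
remaining = conn.execute("SELECT COUNT(*) FROM child").fetchone()[0]
```

Without the `PRAGMA foreign_keys = ON` line SQLite silently ignores the constraint, so the cascade never fires; that is the main pitfall to watch for.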
Added ability to delete data from database by id Feature #COG-414
…SqlAlchemyAdapter Added support for schema namespace for getting tables Fix #COG-414
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (2)
cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py (2)
73-87: Add error handling to schema list retrieval.

While the implementation correctly filters system schemas, it should handle potential database errors gracefully.
Consider adding error handling:
```diff
 async def get_schema_list(self) -> List[str]:
     if self.engine.dialect.name == "postgresql":
         async with self.engine.begin() as connection:
-            result = await connection.execute(
-                text("""
-                    SELECT schema_name FROM information_schema.schemata
-                    WHERE schema_name NOT IN ('pg_catalog', 'pg_toast', 'information_schema');
-                """)
-            )
-            return [schema[0] for schema in result.fetchall()]
+            try:
+                result = await connection.execute(
+                    text("""
+                        SELECT schema_name FROM information_schema.schemata
+                        WHERE schema_name NOT IN ('pg_catalog', 'pg_toast', 'information_schema');
+                    """)
+                )
+                return [schema[0] for schema in result.fetchall()]
+            except Exception as e:
+                print(f"Error retrieving schema list: {e}")
+                return []
     return []
```
97-120: Ensure proper connection cleanup in get_table method.

While the implementation correctly handles different databases, it should ensure connections are properly cleaned up.
Consider using explicit connection cleanup:
```diff
 async def get_table(self, table_name: str, schema_name: Optional[str] = "public") -> Table:
     async with self.engine.begin() as connection:
         if self.engine.dialect.name == "sqlite":
-            # Load the schema information into the MetaData object
-            await connection.run_sync(Base.metadata.reflect)
-            if table_name in Base.metadata.tables:
-                return Base.metadata.tables[table_name]
-            else:
-                raise ValueError(f"Table '{table_name}' not found.")
+            try:
+                await connection.run_sync(Base.metadata.reflect)
+                if table_name in Base.metadata.tables:
+                    return Base.metadata.tables[table_name]
+                raise ValueError(f"Table '{table_name}' not found.")
+            finally:
+                await connection.close()
         else:
-            # Create a MetaData instance to load table information
-            metadata = MetaData()
-            # Load table information from schema into MetaData
-            await connection.run_sync(metadata.reflect, schema=schema_name)
-            # Define the full table name
-            full_table_name = f"{schema_name}.{table_name}"
-            # Check if table is in list of tables for the given schema
-            if full_table_name in metadata.tables:
-                return metadata.tables[full_table_name]
-            raise ValueError(f"Table '{full_table_name}' not found.")
+            try:
+                metadata = MetaData()
+                await connection.run_sync(metadata.reflect, schema=schema_name)
+                full_table_name = f"{schema_name}.{table_name}"
+                if full_table_name in metadata.tables:
+                    return metadata.tables[full_table_name]
+                raise ValueError(f"Table '{full_table_name}' not found.")
+            finally:
+                await connection.close()
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py (4 hunks)
🔇 Additional comments (3)
cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py (3)
2-4: LGTM: Import statements are properly organized.

The added imports support the new type hints in method signatures.
55-62: ⚠️ Potential issue: SQL injection vulnerability in table deletion.
The current implementation uses string interpolation for SQL queries, which could be vulnerable to SQL injection. This issue was previously identified and should be addressed.
Apply this diff to use parameterized queries:
```diff
-        await connection.execute(text(f"DROP TABLE IF EXISTS {table_name};"))
+        await connection.execute(
+            text("DROP TABLE IF EXISTS :table_name"),
+            {"table_name": table_name}
+        )
-        await connection.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name} CASCADE;"))
+        await connection.execute(
+            text("DROP TABLE IF EXISTS :schema_name.:table_name CASCADE"),
+            {"schema_name": schema_name, "table_name": table_name}
+        )
```
176-186: ⚠️ Potential issue: Consider dropping entire schemas for better efficiency.

The current implementation drops tables individually, which is less efficient than dropping entire schemas at once. This issue was previously identified.
The current implementation drops tables individually, which is less efficient than dropping entire schemas at once. This issue was previously identified.
As suggested in the previous review, consider this approach:
```diff
-            schema_list = await self.get_schema_list()
-            # Create a MetaData instance to load table information
-            metadata = MetaData()
-            # Drop all tables from all schemas
-            for schema_name in schema_list:
-                # Load the schema information into the MetaData object
-                await connection.run_sync(metadata.reflect, schema=schema_name)
-                for table in metadata.sorted_tables:
-                    drop_table_query = text(f"DROP TABLE IF EXISTS {schema_name}.{table.name} CASCADE")
-                    await connection.execute(drop_table_query)
-                metadata.clear()
+            schema_list = await self.get_schema_list()
+            for schema in schema_list:
+                drop_schema_query = text('DROP SCHEMA IF EXISTS :schema_name CASCADE')
+                await connection.execute(drop_schema_query, {"schema_name": schema})
```
```python
async def delete_data_by_id(self, table_name: str, data_id: UUID, schema_name: Optional[str] = "public"):
    """
    Delete data in given table based on id. Table must have an id Column.
    """
    async with self.get_async_session() as session:
        TableModel = await self.get_table(table_name, schema_name)
        await session.execute(TableModel.delete().where(TableModel.c.id == data_id))
        await session.commit()
```
Enhance delete_data_by_id method robustness.
The method needs additional error handling and validation to ensure reliable operation.
Consider these improvements:
```diff
 async def delete_data_by_id(self, table_name: str, data_id: UUID, schema_name: Optional[str] = "public"):
-    """
-    Delete data in given table based on id. Table must have an id Column.
-    """
+    """
+    Delete data in given table based on id.
+
+    Args:
+        table_name: Name of the table
+        data_id: UUID of the record to delete
+        schema_name: Optional schema name, defaults to "public"
+
+    Returns:
+        bool: True if deletion was successful, False otherwise
+
+    Raises:
+        ValueError: If table doesn't have an id column
+    """
     async with self.get_async_session() as session:
-        TableModel = await self.get_table(table_name, schema_name)
-        await session.execute(TableModel.delete().where(TableModel.c.id == data_id))
-        await session.commit()
+        try:
+            TableModel = await self.get_table(table_name, schema_name)
+            if 'id' not in TableModel.c:
+                raise ValueError(f"Table '{table_name}' does not have an 'id' column")
+            result = await session.execute(TableModel.delete().where(TableModel.c.id == data_id))
+            await session.commit()
+            return result.rowcount > 0
+        except Exception as e:
+            await session.rollback()
+            print(f"Error deleting record: {e}")
+            return False
```

📝 Committable suggestion
```python
async def delete_data_by_id(self, table_name: str, data_id: UUID, schema_name: Optional[str] = "public"):
    """
    Delete data in given table based on id.

    Args:
        table_name: Name of the table
        data_id: UUID of the record to delete
        schema_name: Optional schema name, defaults to "public"

    Returns:
        bool: True if deletion was successful, False otherwise

    Raises:
        ValueError: If table doesn't have an id column
    """
    async with self.get_async_session() as session:
        try:
            TableModel = await self.get_table(table_name, schema_name)
            if 'id' not in TableModel.c:
                raise ValueError(f"Table '{table_name}' does not have an 'id' column")
            result = await session.execute(TableModel.delete().where(TableModel.c.id == data_id))
            await session.commit()
            return result.rowcount > 0
        except Exception as e:
            await session.rollback()
            print(f"Error deleting record: {e}")
            return False
```
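The `rowcount > 0` return value proposed above can be exercised without SQLAlchemy; a minimal sketch using the stdlib `sqlite3` driver and a hypothetical `items` table:

```python
import sqlite3

# Hypothetical `items` table; delete-by-id reports whether a row was removed,
# mirroring the `result.rowcount > 0` check in the suggestion above.
def delete_by_id(conn: sqlite3.Connection, table: str, row_id: int) -> bool:
    # The identifier is interpolated (assumed validated upstream);
    # the id is passed as a bound parameter.
    cur = conn.execute(f"DELETE FROM {table} WHERE id = ?", (row_id,))
    conn.commit()
    return cur.rowcount > 0

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO items (id, name) VALUES (1, 'a')")

deleted = delete_by_id(conn, "items", 1)   # existing row is removed
missed = delete_by_id(conn, "items", 42)   # no matching row
```

Returning a boolean lets callers distinguish "nothing matched" from a successful delete, which a bare `commit()` cannot convey.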
Summary by CodeRabbit
New Features
Bug Fixes
Chores