feat: feedback enrichment #1571
Conversation
Please make sure all the checkboxes are checked:
Important: Review skipped. Auto reviews are disabled on base/target branches other than the default branch; check the settings in the CodeRabbit UI to change this.
Walkthrough
This PR introduces comprehensive enhancements to Cognee, including web scraping capabilities (BeautifulSoup and Tavily integration), a new feedback enrichment system, API mode support for MCP with a dual-mode CogneeClient, distributed Kuzu locking via Redis, Mistral LLM provider support, pipeline batching with configurable batch sizes, removal of the INSIGHTS search type, complete removal of MemgraphAdapter, and systematic exception chaining improvements throughout the codebase.
Sequence Diagram(s)
sequenceDiagram
actor User
participant UI as Frontend/CLI
participant API as Cognee API
participant MCP as MCP Server
participant Direct as Direct Mode<br/>(In-Process)
participant APIClient as CogneeClient<br/>(API Mode)
User->>UI: Invoke action<br/>(add/cognify/search)
alt API Mode Enabled
UI->>APIClient: Create with api_url
APIClient->>MCP: POST /api/v1/{operation}
MCP->>API: Route to operation
API-->>MCP: Response (JSON)
MCP-->>APIClient: HTTP Response
APIClient-->>UI: Parsed result
else Direct Mode
UI->>Direct: Invoke cognee.{operation}
Direct->>API: Execute operation
API-->>Direct: Result
Direct-->>UI: Direct result
end
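The diagram above describes the dual-mode dispatch at a high level. Below is a minimal sketch of how such a client could branch between API mode and direct mode; the CogneeClient name, api_url/api_token fields, and the cognee.search signature come from this PR, while the endpoint path, payload keys, and import paths are illustrative assumptions, not the PR's actual wire format.

from typing import Optional

import httpx


class CogneeClientSketch:
    """Illustrative dual-mode client: HTTP API when api_url is set, in-process cognee otherwise."""

    def __init__(self, api_url: Optional[str] = None, api_token: Optional[str] = None):
        self.api_url = api_url
        self.api_token = api_token
        self.client = httpx.AsyncClient(base_url=api_url) if api_url else None

    def _headers(self) -> dict:
        return {"Authorization": f"Bearer {self.api_token}"} if self.api_token else {}

    async def search(self, search_query: str, search_type: str = "GRAPH_COMPLETION"):
        if self.client:  # API mode: POST to the running Cognee API
            response = await self.client.post(
                "/api/v1/search",  # assumed endpoint shape, per the diagram
                json={"query": search_query, "searchType": search_type},  # assumed payload keys
                headers=self._headers(),
            )
            response.raise_for_status()
            return response.json()
        # Direct mode: call the library in-process
        import cognee
        from cognee import SearchType  # import path may differ by cognee version

        return await cognee.search(
            query_type=SearchType[search_type], query_text=search_query
        )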
sequenceDiagram
participant Task as Pipeline Task
participant Batching as Batch Processor<br/>(data_per_batch)
participant DataItem as Data Item<br/>Processor
participant Telemetry as Telemetry
participant DB as Database
Task->>Batching: run_tasks(data[], batch_size=20)
loop For each batch of 20 items
Batching->>DataItem: process_batch(items)
alt Incremental Mode
DataItem->>DB: Check prior status
alt Not completed
DataItem->>Telemetry: run_with_telemetry
Telemetry->>DB: Update status
else Already completed
DataItem-->>Batching: Skip (already done)
end
else Regular Mode
DataItem->>Telemetry: run_with_telemetry
end
DataItem-->>Batching: Results
end
Batching-->>Task: All results collected
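For the batching flow sketched above, the core loop is simple: slice the data into fixed-size chunks and await one chunk at a time. This is a generic sketch, not the PR's run_tasks implementation; the function and parameter names here are illustrative.

import asyncio
from typing import Any, Awaitable, Callable, Iterable, List


async def run_in_batches(
    items: Iterable[Any],
    process_item: Callable[[Any], Awaitable[Any]],
    batch_size: int = 20,
) -> List[Any]:
    """Process items in fixed-size batches, awaiting each batch before starting the next."""
    items = list(items)
    results: List[Any] = []
    for start in range(0, len(items), batch_size):
        batch = items[start : start + batch_size]
        # One concurrent task per data item in the batch (mirrors the diagram above)
        results.extend(await asyncio.gather(*(process_item(item) for item in batch)))
    return results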
sequenceDiagram
participant User
participant FeedbackTask as Feedback Task
participant LLM as LLM Service
participant Retriever as GraphCompletion<br/>Retriever
participant Graph as Graph DB
User->>FeedbackTask: Extract feedback
FeedbackTask->>Graph: Fetch negative feedback + interactions
FeedbackTask-->>Graph: Build enrichments
FeedbackTask->>FeedbackTask: For each enrichment
FeedbackTask->>Retriever: Generate improved answer<br/>(CoT)
Retriever->>LLM: Reaction prompt
LLM-->>Retriever: Improved answer + explanation
Retriever-->>FeedbackTask: Result
FeedbackTask->>LLM: Create report<br/>(feedback_report_prompt)
LLM-->>FeedbackTask: Report text
FeedbackTask->>Graph: Add enrichment edges<br/>(enriches_feedback,<br/>improves_interaction)
Graph->>Graph: Index new edges
FeedbackTask-->>User: Enrichments complete
sequenceDiagram
participant User
participant WebScraper as Web Scraper Task
participant Crawler as Crawler<br/>(BS4/Tavily)
participant LLM as LLM
participant Graph as Graph DB
User->>WebScraper: web_scraper_task(url, extraction_rules)
WebScraper->>Crawler: fetch_page_content(urls)
alt Using BeautifulSoup
Crawler->>Crawler: Check robots.txt
Crawler->>Crawler: Fetch HTML (+ Playwright if needed)
Crawler->>Crawler: Extract via selectors/XPath
else Using Tavily
Crawler->>Crawler: TavilyClient.search
end
Crawler-->>WebScraper: {url: content}
WebScraper->>LLM: Generate descriptions<br/>(WebPage, WebSite)
LLM-->>WebScraper: Descriptions
WebScraper->>Graph: Add WebPage/WebSite nodes
WebScraper->>Graph: Add edges (is_part_of, is_scraping)
Graph->>Graph: Index data points & edges
WebScraper-->>User: Result with graph data
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~75 minutes
Rationale: This PR exhibits high complexity across multiple dimensions.
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 49
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (16)
cognee/infrastructure/loaders/LoaderEngine.py (3)
80-97: Add None check for file_info to prevent AttributeError.
The filetype.guess() function can return None when it cannot determine the file type from the magic bytes. Accessing file_info.extension and file_info.mime on lines 87 and 96 without checking for None will cause an AttributeError, leading to unclear error messages instead of properly handling unsupported file types.
Apply this diff to handle the case when file type cannot be determined:
  file_info = filetype.guess(file_path)
+
+ # Handle case where file type cannot be determined
+ if file_info is None:
+     logger.warning(f"Could not determine file type for: {file_path}")
+     # Try to use fallback loaders that might handle unknown types
+     for loader_name in self.default_loader_priority:
+         if loader_name in self._loaders:
+             loader = self._loaders[loader_name]
+             # Try loader without type info
+             if loader.can_handle(extension=None, mime_type=None):
+                 return loader
+     return None

  # Try preferred loaders first
  if preferred_loaders:
140-140: Correct the type hint from any to Any.
The return type annotation uses lowercase any instead of Any from the typing module (which is already imported on line 2). Python's built-in any is a function, not a type annotation.
Apply this diff:
- def get_loader_info(self, loader_name: str) -> Dict[str, any]:
+ def get_loader_info(self, loader_name: str) -> Dict[str, Any]:
105-130: All callers have been correctly updated; fix stale documentation in LoaderInterface.py.
Verification confirms all three call sites in cognee/tasks/ingestion/data_item_to_text_file.py (lines 51, 59, 72) correctly use the new signature with file_path instead of file_stream.
However, the abstract load method docstring in cognee/infrastructure/loaders/LoaderInterface.py (lines 62-69) still references the removed file_stream parameter: "file_stream: If file stream is provided it will be used to process file instead". Remove this stale documentation line to keep the interface contract accurate.
cognee/infrastructure/databases/graph/neo4j_driver/deadlock_retry.py (3)
43-60: Critical: Inconsistent retry logic between exception handlers.
The two exception handlers use different comparison operators, causing different retry behavior:
- Line 48 (Neo4jError): if attempt > max_retries: allows the final attempt when attempt == max_retries
- Line 57 (DatabaseUnavailable): if attempt >= max_retries: prevents the final attempt when attempt == max_retries
With max_retries=10, Neo4jError gets 11 attempts while DatabaseUnavailable gets only 9.
Apply this diff to make retry behavior consistent:
  except DatabaseUnavailable:
-     if attempt >= max_retries:
+     if attempt > max_retries:
          raise  # Re-raise the original error
      await wait()
12-26: Remove stale parameters from docstring.
The docstring lists initial_backoff, backoff_factor, and jitter parameters that don't exist in the function signature. These parameters likely belong to the calculate_backoff function instead.
Apply this diff:
""" Decorator that automatically retries an asynchronous function when rate limit errors occur. This decorator implements an exponential backoff strategy with jitter to handle rate limit errors efficiently. Args: max_retries: Maximum number of retry attempts. - initial_backoff: Initial backoff time in seconds. - backoff_factor: Multiplier for exponential backoff. - jitter: Jitter factor to avoid the thundering herd problem. Returns: The decorated async function. """
37-39: Update misleading log message.
The log message states "Neo4j rate limit hit" but this decorator handles deadlocks (DeadlockDetected), transient errors (Neo.TransientError), and database unavailability, not just rate limits.
Apply this diff:
  backoff_time = calculate_backoff(attempt)
  logger.warning(
-     f"Neo4j rate limit hit, retrying in {backoff_time:.2f}s "
+     f"Neo4j transient error, retrying in {backoff_time:.2f}s "
      f"Attempt {attempt}/{max_retries}"
  )
cognee/infrastructure/databases/graph/kuzu/adapter.py (3)
224-256: Use the cache lock context manager and ensure DB close on exceptions.
Manual acquire/release is error-prone; prefer a with-context once CacheDBInterface.hold_lock is fixed. Also make sure self.close() happens even on exceptions.
Apply this diff:
- def blocking_query(): - lock_acquired = False - try: - if cache_config.shared_kuzu_lock: - self.redis_lock.acquire_lock() - lock_acquired = True - if not self.connection: - logger.info("Reconnecting to Kuzu database...") - self._initialize_connection() - - result = self.connection.execute(query, params) - rows = [] - - while result.has_next(): - row = result.get_next() - processed_rows = [] - for val in row: - if hasattr(val, "as_py"): - val = val.as_py() - processed_rows.append(val) - rows.append(tuple(processed_rows)) - - return rows - except Exception as e: - logger.error(f"Query execution failed: {str(e)}") - raise - finally: - if cache_config.shared_kuzu_lock and lock_acquired: - try: - self.close() - finally: - self.redis_lock.release_lock() + def blocking_query(): + def _exec_once() -> list[tuple]: + if not self.connection: + logger.info("Reconnecting to Kuzu database...") + self._initialize_connection() + result = self.connection.execute(query, params) + rows: list[tuple] = [] + while result.has_next(): + row = result.get_next() + processed_rows = [] + for val in row: + if hasattr(val, "as_py"): + val = val.as_py() + processed_rows.append(val) + rows.append(tuple(processed_rows)) + return rows + + try: + if cache_config.shared_kuzu_lock: + with self.redis_lock.hold_lock(): + try: + return _exec_once() + finally: + self.close() + else: + return _exec_once() + except Exception as e: + logger.error(f"Query execution failed: {str(e)}") + raiseThis assumes CacheDBInterface.hold_lock calls acquire_lock()/release_lock as corrected. Based on learnings.
1427-1461: Bug: get_graph_metrics unpacks a dict and indexes wrong shape; function will fail.
get_model_independent_graph_data returns a dict, not (nodes, edges). The current code will throw and returns meaningless metrics.
Apply this minimal, correct implementation:
- # Get basic graph data
- nodes, edges = await self.get_model_independent_graph_data()
- num_nodes = len(nodes[0]["nodes"]) if nodes else 0
- num_edges = len(edges[0]["elements"]) if edges else 0
+ # Get basic counts with dedicated queries
+ node_count_rows = await self.query("MATCH (n:Node) RETURN COUNT(n)")
+ edge_count_rows = await self.query("MATCH ()-[r:EDGE]->() RETURN COUNT(r)")
+ num_nodes = int(node_count_rows[0][0]) if node_count_rows else 0
+ num_edges = int(edge_count_rows[0][0]) if edge_count_rows else 0
The rest of the method can remain as-is for optional metrics computations.
1789-1799: Bug: UNWIND list formatting in collect_events produces a nested list.
event_collection_cypher expects a comma-separated string of quoted IDs, but a raw Python list is formatted as "['a','b']" leading to
UNWIND [['a','b']] AS uid. Quote and join explicitly.Apply this diff:
- query = event_collection_cypher.format(quoted=ids) + quoted_ids = ", ".join(f\"'{uid}'\" for uid in ids) + query = event_collection_cypher.format(quoted=quoted_ids)cognee/tests/test_search_db.py (1)
226-226: Critical: Undefined variable 'text'.
Line 226 references text, which is not defined in scope. This appears to be a copy-paste error from the earlier refactoring. Should this be explanation_file_path_quantum?
Apply this diff:
- await cognee.add([text], dataset_name) + await cognee.add([explanation_file_path_quantum], dataset_name)cognee/api/v1/update/update.py (1)
66-66: Fix inconsistent docstring.
The docstring at line 66 still says "Optional specific dataset UUID" but dataset_id is now required (line 13).
Apply this diff:
- dataset_id: Optional specific dataset UUID to use instead of dataset_name.
+ dataset_id: UUID of the dataset containing the data to update.
cognee/modules/retrieval/graph_completion_retriever.py (1)
235-237: Add missing index_graph_edges() call after add_edges().
The codebase establishes a consistent pattern of calling index_graph_edges() immediately after add_edges(). At cognee/modules/retrieval/graph_completion_retriever.py:237, this call is missing, despite being present in the analogous code at cognee/modules/retrieval/user_qa_feedback.py:78-79 and throughout task modules (add_data_points.py, extract_graph_from_data.py, link_enrichments_to_feedback.py, etc.). Add the missing indexing call after line 237.
cognee/api/v1/cognify/cognify.py (1)
171-175: Docstring references unsupported parameter ontology_file_path.
The example passes ontology_file_path, but cognify() does not accept it. Update the example or expose a supported way (e.g., via config/ontology_config).
Suggested fix:
- await cognee.cognify(
-     datasets=["research_papers"],
-     graph_model=ScientificPaper,
-     ontology_file_path="scientific_ontology.owl"
- )
+ await cognee.cognify(
+     datasets=["research_papers"],
+     graph_model=ScientificPaper,
+     config={"ontology_config": {"ontology_resolver": my_resolver}}
+ )
cognee/modules/pipelines/operations/run_tasks.py (1)
162-164: Error propagation contradicts comment; likely inverted.Comment says “don’t raise” on incremental loading, but code re-raises unless PipelineRunFailedError. Make re-raise conditional on not incremental.
- # In case of error during incremental loading of data just let the user know the pipeline Errored, don't raise error
- if not isinstance(error, PipelineRunFailedError):
-     raise error
+ # If not incremental, re-raise to fail the run; otherwise, yield the error and continue
+ if not incremental_loading:
+     raise
cognee-mcp/src/server.py (1)
116-127: Fix runtime error: LLMGateway.read_query_prompt() method does not exist.
The mistral adapter at cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/mistral/adapter.py:121 calls LLMGateway.read_query_prompt(system_prompt), but read_query_prompt is a standalone function in cognee/infrastructure/llm/prompts/read_query_prompt.py, not a class method. Add the import and call it directly:
from cognee.infrastructure.llm.prompts.read_query_prompt import read_query_prompt
# Then at line 121:
system_prompt = read_query_prompt(system_prompt)
cognee/tasks/storage/index_graph_edges.py (1)
52-57: Bug: edge type counting ignores the tuple's type (breaks on the Neo4j path).
You only count relationship_name found inside a props dict. Many adapters return edges as (src, dst, type, props) and don't duplicate the type in props, so no edge types are counted and nothing gets indexed.
Fix by extracting from the 3rd tuple element first, then falling back to props or model attributes.
Apply:
- edge_types = Counter( - item.get("relationship_name") - for edge in edges_data - for item in edge - if isinstance(item, dict) and "relationship_name" in item - ) + edge_types: Counter[str] = Counter() + for edge in edges_data: + rel = None + # Common tuple shape: (source_id, target_id, rel_type, props) + if isinstance(edge, (list, tuple)) and len(edge) >= 3 and isinstance(edge[2], str): + rel = edge[2] + # Fallback: search props for 'relationship_name' + elif isinstance(edge, (list, tuple)): + rel = next( + (it.get("relationship_name") for it in edge if isinstance(it, dict) and "relationship_name" in it), + None, + ) + else: + # If it's a model-like EdgeData object + rel = getattr(edge, "relationship_name", None) or getattr(edge, "type", None) + if rel: + edge_types[rel] += 1Based on learnings.
🧹 Nitpick comments (79)
cognee/api/health.py (1)
289-294: LGTM! Good refactor for maintainability.
Deriving critical_comps dynamically from critical_checks eliminates the risk of keeping a hard-coded list in sync. The logic correctly identifies when critical components are unhealthy.
Minor observation: The llm_provider and embedding_service checks are commented as "non-critical" (lines 188, 217) but reside in critical_checks. They effectively behave as non-critical because they return DEGRADED rather than UNHEALTHY on failure. Moving them to non_critical_checks would align the code structure with the comments, though the current approach works correctly.
cognee/modules/data/methods/get_dataset_data.py (1)
15-15: Add index to data_size column for ORDER BY performance.
The data_size column in cognee/modules/data/models/Data.py (line 36) lacks an index but is used in an ORDER BY clause in the query. For datasets with large numbers of records, this can cause performance degradation. Add index=True to the column definition or document why sorting without an index is acceptable for this use case.
cognee/api/v1/responses/dispatch_function.py (2)
59-63: Consider deriving valid_search_types from the SearchType enum.
The hardcoded fallback list on line 62 duplicates knowledge about valid search types that should come from a single source of truth. If the SearchType enum changes in the future, this fallback list must be manually updated, creating a maintenance burden and risk of inconsistency.
Apply this diff to derive the list programmatically from the enum:
+from enum import Enum + async def handle_search(arguments: Dict[str, Any], user) -> list: """Handle search function call""" search_tool = next((tool for tool in DEFAULT_TOOLS if tool["name"] == "search"), None) required_params = ( search_tool["parameters"].get("required", []) if search_tool else ["search_query"] ) query = arguments.get("search_query") if not query and "search_query" in required_params: return "Error: Missing required 'search_query' parameter" search_type_str = arguments.get("search_type", "GRAPH_COMPLETION") valid_search_types = ( search_tool["parameters"]["properties"]["search_type"]["enum"] if search_tool - else ["CODE", "GRAPH_COMPLETION", "NATURAL_LANGUAGE"] + else [member.name for member in SearchType] )
62-62: INSIGHTS removal is consistent: enum and fallback list properly synchronized.
Verification confirms the SearchType enum has been updated to remove the INSIGHTS member, and the fallback list on line 62 now correctly contains only valid types: ["CODE", "GRAPH_COMPLETION", "NATURAL_LANGUAGE"]. The enum consistency concern is resolved.
Optional: Consider deriving the fallback list programmatically from the enum's valid members to avoid duplication and reduce maintenance burden if new search types are added in the future. The current hardcoded approach works correctly but increases risk of divergence between enum and fallback definitions.
cognee/infrastructure/llm/prompts/feedback_user_context_prompt.txt (1)
4-5: Consider hyphenating compound adjectives for formal style.
For more formal grammar, consider "one-paragraph" and "human-readable" with hyphens. However, the current phrasing is clear and functional for an LLM prompt.
-Provide a one paragraph human readable summary of this interaction context,
+Provide a one-paragraph human-readable summary of this interaction context,
cognee-frontend/src/app/(graph)/GraphVisualization.tsx (1)
220-232: Make zoomToFit wrapper return void; drop undefined as any.
Returning a value from a void-typed method is unnecessary and the as any escape hatch is avoidable. No-op early and call through.
Apply:
- const zoomToFit: ForceGraphMethods["zoomToFit"] = ( + const zoomToFit: ForceGraphMethods["zoomToFit"] = ( durationMs?: number, padding?: number, nodeFilter?: (node: NodeObject) => boolean ) => { if (!graphRef.current) { console.warn("GraphVisualization: graphRef not ready yet"); - return undefined as any; + return; } - return graphRef.current.zoomToFit?.(durationMs, padding, nodeFilter); + graphRef.current.zoomToFit?.(durationMs, padding, nodeFilter); };Optional: set sensible defaults matching resize behavior.
- const zoomToFit: ForceGraphMethods["zoomToFit"] = ( - durationMs?: number, - padding?: number, - nodeFilter?: (node: NodeObject) => boolean - ) => { + const zoomToFit: ForceGraphMethods["zoomToFit"] = ( + durationMs = 1000, + padding = 50, + nodeFilter?: (node: NodeObject) => boolean + ) => {cognee/tests/subprocesses/writer.py (1)
8-16: Use string UUIDs for DB parameters.
Kuzu parameter binding may not accept uuid.UUID directly. Emit str to avoid driver/type issues.
Apply:
- document = PdfDocument( - id=uuid.uuid4(), + document = PdfDocument( + id=str(uuid.uuid4()), name=name, raw_data_location=name, external_metadata="test_external_metadata", mime_type="test_mime", )Optional: if this test should intentionally keep the DB handle open, keep as is; otherwise consider closing the adapter when done.
cognee/api/v1/ui/ui.py (1)
556-561: pid_callback now receives a tuple; widen its type hint.
start_ui declares pid_callback: Callable[[int], None], but here you pass (pid, container_name). Update the annotation (and docstring) to reflect both forms.
Outside this hunk, change the signature to:
def start_ui(
    pid_callback: Callable[[int | tuple[int, str]], None],
    ...
) -> Optional[subprocess.Popen]:
    ...
This matches usage in cognee/cli/_cognee.py and avoids type-checker noise.
cognee/tasks/web_scraper/models.py (2)
1-3: Avoid shared mutable defaults; use Field(default_factory=...) for metadata.Using a bare dict literal as a default creates a shared mutable default across instances. Switch to Field(default_factory=...) and keep typing consistent.
Apply this diff:
-from cognee.infrastructure.engine import DataPoint +from cognee.infrastructure.engine import DataPoint +from pydantic import Field @@ - metadata: dict = {"index_fields": ["name", "description", "content"]} + metadata: dict = Field(default_factory=lambda: {"index_fields": ["name", "description", "content"]}) @@ - metadata: dict = {"index_fields": ["name", "description"]} + metadata: dict = Field(default_factory=lambda: {"index_fields": ["name", "description"]}) @@ - metadata: dict = {"index_fields": ["name", "description"]} + metadata: dict = Field(default_factory=lambda: {"index_fields": ["name", "description"]})If DataPoint.metadata uses a dedicated alias/type (e.g., MetaData), consider aligning the annotation to that alias for consistency. Based on learnings.
Also applies to: 19-20, 33-34, 46-46
42-42: Constrain ScrapingJob.status to a finite set (Enum or Literal).Prevent invalid states by using an Enum or Literal[“active”, “paused”, “completed”, “failed”].
Example:
+from enum import Enum
+
+class ScrapingStatus(str, Enum):
+    active = "active"
+    paused = "paused"
+    completed = "completed"
+    failed = "failed"
@@
-    status: str  # "active", "paused", "completed", "failed"
+    status: ScrapingStatus
cognee/modules/pipelines/operations/run_tasks_data_item.py (2)
100-105: Telemetry naming: pass the human-friendly pipeline_name instead of pipeline_id.run_tasks_with_telemetry emits events keyed by pipeline_name. Passing IDs degrades observability.
Apply this diff:
- pipeline_name=pipeline_id, + pipeline_name=pipeline_name,Repeat in the regular path:
- pipeline_name=pipeline_id, + pipeline_name=pipeline_name,Alternatively rename the parameter in run_tasks_data_item_regular to accept pipeline_name explicitly for clarity.
Also applies to: 179-185
106-111: Unify generator yield shape with the docstring/type hint.Function annotations/docstring say the generators yield dicts; currently they yield PipelineRunYield objects. Wrap them for consistency (and include data_id in incremental).
Incremental:
- yield PipelineRunYield( - pipeline_run_id=pipeline_run_id, - dataset_id=dataset.id, - dataset_name=dataset.name, - payload=result, - ) + yield { + "run_info": PipelineRunYield( + pipeline_run_id=pipeline_run_id, + dataset_id=dataset.id, + dataset_name=dataset.name, + payload=result, + ), + "data_id": data_id, + }Regular:
- yield PipelineRunYield( - pipeline_run_id=pipeline_run_id, - dataset_id=dataset.id, - dataset_name=dataset.name, - payload=result, - ) + yield { + "run_info": PipelineRunYield( + pipeline_run_id=pipeline_run_id, + dataset_id=dataset.id, + dataset_name=dataset.name, + payload=result, + ) + }Also applies to: 186-191
cognee-mcp/src/cognee_client.py (3)
78-89: Use_get_headers()for consistency and fix hardcoded filename.Two issues here:
- Headers are constructed inline instead of using the
_get_headers()helper, creating inconsistency with other methods.- The hardcoded filename
"data.txt"may be misleading when uploading non-text data.Apply this diff:
- files = {"data": ("data.txt", str(data), "text/plain")} + files = {"data": ("data", str(data), "text/plain")} form_data = { "datasetName": dataset_name, } if node_set is not None: form_data["node_set"] = json.dumps(node_set) response = await self.client.post( endpoint, files=files, data=form_data, - headers={"Authorization": f"Bearer {self.api_token}"} if self.api_token else {}, + headers={"Authorization": f"Bearer {self.api_token}"} if self.api_token else None, )Note: Using
Noneinstead of{}for empty headers is more idiomatic with httpx.
94-96: Document the redirect_stdout pattern.The
redirect_stdout(sys.stderr)pattern is used throughout but not explained. Consider adding a comment explaining why stdout is redirected to stderr in direct mode.
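To make the intent explicit, a sketch of the pattern with the missing comment could look like the following; the rationale in the comment (keeping the MCP stdout protocol stream clean) is an assumption about why the redirection exists, and add_direct is a hypothetical wrapper, not the actual server code.

import sys
from contextlib import redirect_stdout


async def add_direct(data, dataset_name: str):
    # MCP servers speak their protocol over stdout, so any stray print() from the
    # library would corrupt that stream; route it to stderr instead (assumed rationale).
    with redirect_stdout(sys.stderr):
        import cognee

        return await cognee.add(data, dataset_name)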
85-92: Consider wrapping HTTP exceptions for better error messages.HTTP errors from
httpx(e.g.,HTTPStatusError,RequestError) will propagate directly to callers. For better user experience, consider catching these and raising custom exceptions with more context about what operation failed.Example:
try: response = await self.client.post(...) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: raise CogneeAPIError(f"Failed to add data: {e}") from ecognee-mcp/README.md (3)
132-189: API Mode: add explicit security and networking cautions.
- Note that API_TOKEN will end up in container env and shell history; advise using Docker secrets or env files with least privilege.
- Caution that --network host exposes container ports to host namespace; recommend only for dev or document risks.
- Mention Linux case where host.docker.internal may not exist unless configured; you already show alternatives—link to Docker docs here.
123-131: Clarify transport config parity (env vs args).Add a one‑liner mapping table for SSE/HTTP paths (e.g., SSE at /sse, HTTP at /mcp) to avoid ambiguity when switching between Docker and direct modes.
317-386: API Mode limitations: cross‑link exact tool behavior.For each limited tool (codify, prune, status, list_data by dataset), add a quick pointer to the equivalent API endpoint or note “not supported via API.” Helps users decide when to use Direct vs API.
cognee/infrastructure/llm/prompts/feedback_reaction_prompt.txt (1)
9-14: Harden output format and non‑speculative guardrails.Add: “Do not fabricate facts; if information is missing, state the limitation briefly.” Also require single‑line “Answer:” and “Explanation:” to simplify parsing.
cognee/cli/commands/cognify_command.py (1)
125-128: Include docs_url for richer CLI errors.When raising CliCommandException, pass
docs_url=self.docs_urlto improve UX.- raise CliCommandException(str(e), error_code=1) from e + raise CliCommandException(str(e), error_code=1, docs_url=self.docs_url) from e - raise CliCommandException(f"Error during cognification: {str(e)}", error_code=1) from e + raise CliCommandException( + f"Error during cognification: {str(e)}", + error_code=1, + docs_url=self.docs_url, + ) from ecognee/tests/test_kuzu.py (2)
41-45: Validate test data paths and prefer Pathlib.Add existence assertions for both files; use Path objects for clarity and OS safety.
- explanation_file_path_nlp = os.path.join( - pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt" - ) + base = pathlib.Path(__file__).parent + explanation_file_path_nlp = base / "test_data" / "Natural_language_processing.txt" + assert explanation_file_path_nlp.exists() ... - explanation_file_path_quantum = os.path.join( - pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt" - ) + explanation_file_path_quantum = base / "test_data" / "Quantum_computers.txt" + assert explanation_file_path_quantum.exists()Also applies to: 46-51
85-87: Relax brittle history count.Exact 6 may fluctuate with pipeline changes; assert “>= expected minimum” or derive from executed calls.
- assert len(history) == 6, "Search history is not correct." + assert len(history) >= 6, f"Expected at least 6 history entries, got {len(history)}"cognee/tests/test_add_docling_document.py (2)
18-20: Ensure cleanup even on failure.Wrap prune calls in try/finally or add a final prune to avoid cross‑test contamination.
- await cognee.prune.prune_data() - await cognee.prune.prune_system(metadata=True) + try: + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + ... + finally: + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True)
35-53: Reduce brittleness of assertions.
- Color assertion may fail on minor extraction variance; prefer set inclusion against tokenized words.
- For the “light bulbs” check, allow synonyms (e.g., “zero,” “don’t”) or use regex.
- lowercase_answer = answer[0].lower() - assert ("no" in lowercase_answer) or ("none" in lowercase_answer) + import re + assert re.search(r"\b(no|none|zero|don'?t)\b", answer[0].lower())cognee/tests/subprocesses/simple_cognify_2.py (1)
24-31: Graceful event loop teardown.Add
loop.close()aftershutdown_asyncgens()to release resources.finally: loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close()cognee-mcp/src/__init__.py (1)
1-4: Constrain fallback import to avoid shadowing unrelated top-level modules and to preserve real import errors.Current try/except will also catch ImportError raised inside
.serverand then import a possibly unrelatedserveron sys.path. Gate the fallback to only fire when running as a script (__package__ is None) and re-raise otherwise.Apply:
-try: - from .server import main as server_main -except ImportError: - from server import main as server_main +try: + from .server import main as server_main +except ImportError as e: + # Only fall back when executed as a script where relative imports don't work. + if __package__ is None: + from server import main as server_main # local dev/script mode + else: + raisecognee/tests/test_library.py (2)
105-109: Avoid writing visualization output to the user’s home in CI.visualize_graph() writes to HOME when no path is provided. Provide a temp path to keep tests hermetic.
- await visualize_graph() + import tempfile, os + with tempfile.TemporaryDirectory() as tmp: + out = os.path.join(tmp, "graph.html") + await visualize_graph(destination_file_path=out)
92-103: Scope all three unscoped search calls with dataset_ids to prevent cross-test flakiness.The test includes three unscoped
cognee.search()calls (lines 54, 68, and 92) that can pull results from other datasets/tests, causing flakiness. Thecognee.search()function acceptsdataset_idsparameter; usepipeline_run_obj.dataset_idwhich is available at line 83.search_results = await cognee.search( - query_type=SearchType.GRAPH_COMPLETION, query_text=random_node_name + query_type=SearchType.GRAPH_COMPLETION, + query_text=random_node_name, + dataset_ids=[pipeline_run_obj.dataset_id], ) search_results = await cognee.search( - query_type=SearchType.SUMMARIES, query_text=random_node_name + query_type=SearchType.SUMMARIES, + query_text=random_node_name, + dataset_ids=[pipeline_run_obj.dataset_id], ) search_results = await cognee.search( query_type=SearchType.GRAPH_COMPLETION, query_text="What information do you contain?", + dataset_ids=[pipeline_run_obj.dataset_id], )cognee/tasks/feedback/models.py (1)
21-26: Minor: align with base typing and doc clarity.Consider keeping
belongs_to_set: Optional[List[DataPoint]](NodeSet is a DataPoint) to match base expectations, and add brief field docstrings if used in public APIs. Optional only.cognee/tasks/storage/add_data_points.py (2)
31-37: Docstring drift: edge indexing is now unconditional.Update the comment to match behavior.
- - Optionally updates the edge index via `index_graph_edges`. + - Updates the edge index via `index_graph_edges`.
71-76: Add failure boundaries to avoid partial writes or full‑pipeline failure on indexing errors.A failure in indexing (nodes or edges) after successful graph writes currently fails the entire call with no rollback. Log and continue, or surface a structured warning.
- await graph_engine.add_nodes(nodes) - await index_data_points(nodes) + await graph_engine.add_nodes(nodes) + try: + await index_data_points(nodes) + except Exception as e: + # Do not fail persistence; surface degraded retrieval explicitly. + # Consider metrics/telemetry hook here. + print(f"Warning: node indexing failed: {e}") @@ - await graph_engine.add_edges(edges) - await index_graph_edges(edges) + await graph_engine.add_edges(edges) + try: + if edges: + await index_graph_edges(edges) + except Exception as e: + print(f"Warning: edge indexing failed: {e}")If you need strict atomicity, confirm whether your graph/vector backends support transactions to implement a real rollback instead.
cognee/tests/test_concurrent_subprocess_access.py (1)
21-42: Ensure the Redis‑based lock is actually enabled for this test.Defaults in CacheConfig set
caching=Falseandshared_kuzu_lock=False. Explicitly enable in the test or skip if Redis isn’t available.async def concurrent_subprocess_access(): + # Ensure shared Redis lock is enabled for Kùzu + os.environ.setdefault("CACHE_CACHING", "true") + os.environ.setdefault("CACHE_SHARED_KUZU_LOCK", "true") + os.environ.setdefault("CACHE_HOST", os.environ.get("CACHE_HOST", "127.0.0.1")) + os.environ.setdefault("CACHE_PORT", os.environ.get("CACHE_PORT", "6379"))If CI doesn’t provide Redis, guard with a skip:
# at top of file +# import socket, pytest +# def _redis_available(host, port): +# try: s=socket.create_connection((host, int(port)), timeout=1); s.close(); return True +# except OSError: return False +# if not _redis_available(os.environ.get("CACHE_HOST","127.0.0.1"), os.environ.get("CACHE_PORT","6379")): +# pytest.skip("Redis not available; skipping lock test", allow_module_level=True)cognee/api/v1/cognify/cognify.py (2)
54-55: Document and define semantics for data_per_batchAdd data_per_batch to the Args section and clarify precedence vs per-Task task_config batch_size (currently hardcoded to 10 below). Consider threading data_per_batch into task_config defaults to avoid divergence.
Apply:
@@ - Args: + Args: @@ temporal_cognify: bool = False, - data_per_batch: int = 20, + data_per_batch: int = 20, + Number of data points processed per batch across the pipeline. If a Task + specifies task_config["batch_size"], that value takes precedence unless + overridden by this argument.
38-38: Unused lockupdate_status_lock is unused; remove to avoid dead code.
- update_status_lock = asyncio.Lock() + # removed unused update_status_lockcognee-mcp/entrypoint.sh (1)
50-52: DB readiness sleep may flakeReplace fixed sleep with a small wait loop against DB (or HTTP health) to reduce startup races.
-# Add startup delay to ensure DB is ready -sleep 2 +# Wait for DB/HTTP health (example: HTTP on $HTTP_PORT if applicable) +for i in {1..30}; do + curl -sf "http://127.0.0.1:${HTTP_PORT}/health" && break + sleep 1 +done || echo "Warning: health check did not pass; continuing..."cognee/tasks/feedback/__init__.py (1)
1-13: Export ImprovedAnswerResponse for convenienceExpose ImprovedAnswerResponse to avoid reaching into submodule for types.
-from .generate_improved_answers import generate_improved_answers +from .generate_improved_answers import generate_improved_answers, ImprovedAnswerResponse @@ "link_enrichments_to_feedback", "FeedbackEnrichment", + "ImprovedAnswerResponse", ]cognee/tasks/feedback/generate_improved_answers.py (3)
6-9: Remove unused importsLLMGateway and resolve_edges_to_text are not used here.
-from cognee.infrastructure.llm import LLMGateway @@ -from cognee.modules.graph.utils import resolve_edges_to_text
72-81: Minor: remove else after return (pylint R1705)Simplify control flow.
- if completion: - enrichment.improved_answer = completion.answer - enrichment.new_context = new_context_text - enrichment.explanation = completion.explanation - return enrichment - else: - logger.warning( - "Failed to get structured completion from retriever", question=enrichment.question - ) - return None + if completion: + enrichment.improved_answer = completion.answer + enrichment.new_context = new_context_text + enrichment.explanation = completion.explanation + return enrichment + logger.warning( + "Failed to get structured completion from retriever", question=enrichment.question + ) + return None
92-101: Nit: top_k default is 20 but not documented elsewhereIf this should match a global setting, consider centralizing or documenting.
cognee/cli/_cognee.py (3)
184-219: Harden shutdown: handle missing Docker, add kill fallback, and dedupe safe-printing.
- If Docker CLI is missing (FileNotFoundError), emit a clear warning instead of a silent pass.
- After sending SIGTERM (or docker stop), consider a short wait and fallback to SIGKILL/docker rm -f if still alive.
- Repeated try/except around fmt.echo/success/warning can be centralized via a small safe_echo wrapper.
@@ - # First, stop Docker container if running + # First, stop Docker container if running if docker_container: try: result = subprocess.run( ["docker", "stop", docker_container], capture_output=True, timeout=10, check=False, ) @@ except subprocess.TimeoutExpired: @@ - except Exception: - pass + except FileNotFoundError: + try: + fmt.warning("Docker CLI not found; skipping container shutdown.") + except (BrokenPipeError, OSError): + pass + except Exception: + passOptionally, after process termination below, poll briefly and escalate to SIGKILL if still alive.
Please confirm whether you want me to provide a concrete SIGKILL escalation patch and a minimal safe_echo helper.
220-245: Terminate robustness: add wait-and-escalate path for stubborn processes.Currently we send SIGTERM/taskkill once without confirming exit. Add a short wait and, if needed, escalate (SIGKILL on Unix, second taskkill on Windows with error check). This prevents orphans in CI.
@@ - for pid in spawned_pids: + for pid in spawned_pids: try: if hasattr(os, "killpg"): @@ - os.killpg(pgid, signal.SIGTERM) + os.killpg(pgid, signal.SIGTERM) + try: + os.waitpid(-pgid, os.WNOHANG) # non-blocking check + except Exception: + pass + # Optional: small sleep + SIGKILL if still running @@ else: # Windows: Use taskkill to terminate process and its children subprocess.run( ["taskkill", "/F", "/T", "/PID", str(pid)], capture_output=True, check=False, )
257-266: Document tuple-aware pid_callback.Annotating the callback clarifies tuple support and prevents misuse.
- def pid_callback(pid_or_tuple): + from typing import Union, Tuple + def pid_callback(pid_or_tuple: Union[int, Tuple[int, str]]) -> None: nonlocal spawned_pids, docker_containercognee/tasks/feedback/link_enrichments_to_feedback.py (1)
16-30: Optional: include feedback_weight and widen metadata typing.Edges elsewhere carry
feedback_weight(see GraphCompletionRetriever.save_qa). Aligning metadata helps downstream analysis.-def _create_edge_tuple( - source_id: UUID, target_id: UUID, relationship_name: str -) -> Tuple[UUID, UUID, str, dict]: +from typing import Dict, Any + +def _create_edge_tuple( + source_id: UUID, target_id: UUID, relationship_name: str +) -> Tuple[UUID, UUID, str, Dict[str, Any]]: @@ { "relationship_name": relationship_name, "source_node_id": source_id, "target_node_id": target_id, "ontology_valid": False, + "feedback_weight": 0, },cognee/tasks/ingestion/save_data_item_to_storage.py (2)
21-26: HTMLContent validation is too naive.Checking only “<” and “>” yields many false positives/negatives. Consider a minimal parse attempt (e.g., strip and require at least one tag-like pattern) or defer to caller.
class HTMLContent(str): def __new__(cls, value: str): - if not ("<" in value and ">" in value): + import re + if not re.search(r"<[a-zA-Z][^>]*>", value or ""): raise ValueError("Not valid HTML-like content") return super().__new__(cls, value)
38-43: Avoid string-based type detection for Docling.
"docling" in str(type(...))is brittle. Prefer a guarded import and isinstance check; fall back gracefully if Docling isn’t installed.- if "docling" in str(type(data_item)): - from docling_core.types import DoclingDocument - - if isinstance(data_item, DoclingDocument): - data_item = data_item.export_to_text() + try: + from docling_core.types import DoclingDocument # type: ignore + if isinstance(data_item, DoclingDocument): + data_item = data_item.export_to_text() + except ImportError: + passcognee/tests/test_neptune_analytics_vector.py (1)
55-60: Avoid IndexError if search returns no results.Add a precondition check and a helpful failure message.
- random_node = (await vector_engine.search("Entity_name", "Quantum computer"))[0] + results = await vector_engine.search("Entity_name", "Quantum computer") + assert results, "Vector engine returned no nodes for 'Quantum computer'" + random_node = results[0]cognee/modules/retrieval/graph_completion_cot_retriever.py (1)
73-80: Too many locals in _run_cot_completion; consider extracting helpers.Pylint flags high local var count. Extract prompt-building and validation blocks into small private helpers to reduce complexity.
Would you like me to propose a small split into
_build_user_and_system_prompts(...)and_validate_and_followup(...)?cognee/infrastructure/databases/cache/redis/RedisAdapter.py (1)
7-11: Constructor has too many positional args; prefer a config object and timeouts.
Pass a small config/dataclass (host, port, timeouts) and keep args keyword-only to avoid misuse. Consider adding socket_connect_timeout/socket_timeout on the client for resilience.
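A minimal sketch of that shape, assuming redis-py; RedisCacheConfig and its field defaults are invented for illustration, not part of the PR:

from dataclasses import dataclass

import redis


@dataclass(frozen=True)
class RedisCacheConfig:
    host: str = "127.0.0.1"
    port: int = 6379
    socket_connect_timeout: float = 5.0
    socket_timeout: float = 5.0


class RedisAdapter:
    def __init__(self, *, config: RedisCacheConfig = RedisCacheConfig()):
        # Keyword-only constructor: callers can't mix up positional host/port/timeouts.
        self.client = redis.Redis(
            host=config.host,
            port=config.port,
            socket_connect_timeout=config.socket_connect_timeout,
            socket_timeout=config.socket_timeout,
        )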
cognee/modules/pipelines/operations/run_tasks.py (1)
92-115: Safer concurrency: capture per-task exceptions without aborting the batch.gather() without return_exceptions=True cancels siblings on first failure and bypasses your later “errored_results” check. Consider collecting exceptions and converting them to PipelineRunErrored entries for uniform handling.
- results.extend(await asyncio.gather(*data_item_tasks)) + batch_results = await asyncio.gather(*data_item_tasks, return_exceptions=True) + # Normalize exceptions into error entries to be handled downstream + for br in batch_results: + if isinstance(br, Exception): + results.append({"run_info": PipelineRunErrored(payload=repr(br))}) + else: + results.append(br)cognee/tasks/feedback/extract_feedback_interactions.py (4)
138-151: Be robust to non-UUID node ids.Graph ids may not be UUIDs. Fall back to a stable uuid5 when parsing fails; reduces noisy error logging.
- enrichment = FeedbackEnrichment( + # Normalize IDs to UUIDs + try: + feedback_uuid = UUID(str(feedback_node_id)) + except ValueError: + feedback_uuid = uuid5(NAMESPACE_OID, str(feedback_node_id)) + try: + interaction_uuid = UUID(str(interaction_node_id)) + except ValueError: + interaction_uuid = uuid5(NAMESPACE_OID, str(interaction_node_id)) + + enrichment = FeedbackEnrichment( id=str(uuid5(NAMESPACE_OID, f"{question_text}_{interaction_node_id}")), text="", question=question_text, original_answer=original_answer_text, improved_answer="", - feedback_id=UUID(str(feedback_node_id)), - interaction_id=UUID(str(interaction_node_id)), + feedback_id=feedback_uuid, + interaction_id=interaction_uuid, belongs_to_set=None, context=context_summary_text, feedback_text=feedback_text, new_context="", explanation="", )
153-157: Remove unnecessary else-after-return.- if _has_required_feedback_fields(enrichment): - return enrichment - else: - logger.warning("Skipping invalid feedback item", interaction=str(interaction_node_id)) - return None + if _has_required_feedback_fields(enrichment): + return enrichment + logger.warning("Skipping invalid feedback item", interaction=str(interaction_node_id)) + return None
180-186: Parameter ‘subgraphs’ is unused.Either consume it (prefer: allow passing pre-fetched (nodes, edges)) or drop it from the signature to avoid confusion.
Would you like me to wire subgraphs as an optional (nodes, edges) tuple and fall back to fetching when None?
82-95: Date sorting should parse timestamps.String sort is fragile unless guaranteed ISO 8601. Parse to datetime for correctness.
- def _recency_key(pair): + from datetime import datetime + def _to_dt(v): + try: return datetime.fromisoformat(v.replace("Z", "+00:00")) + except Exception: return datetime.min + def _recency_key(pair): _, (_, interaction_props) = pair - created_at = interaction_props.get("created_at") or "" - updated_at = interaction_props.get("updated_at") or "" - return (created_at, updated_at) + created_at = _to_dt(interaction_props.get("created_at") or "") + updated_at = _to_dt(interaction_props.get("updated_at") or "") + return (created_at, updated_at)cognee/tests/test_neo4j.py (2)
50-56: Avoid IndexError before asserting results exist.Assert non-emptiness before indexing the first result.
- vector_engine = get_vector_engine() - random_node = (await vector_engine.search("Entity_name", "Quantum computer"))[0] + vector_engine = get_vector_engine() + search_hits = await vector_engine.search("Entity_name", "Quantum computer") + assert search_hits, "No vector hits for 'Quantum computer'." + random_node = search_hits[0]
86-90: Brittle history count.Hard-coding len(history) == 6 will drift as flows evolve. Prefer >= expected minimum or assert specific recent entries.
If history semantics are strict, point me to the spec and I’ll align the assertion precisely.
cognee/infrastructure/llm/structured_output_framework/litellm_instructor/llm/mistral/adapter.py (2)
98-101: Catch Pydantic validation as well.Structured parsing can raise
pydantic.ValidationError. Consider adding:- except JSONSchemaValidationError as e: + except (JSONSchemaValidationError, Exception) as e:Or explicitly import and catch
ValidationErrorto provide clearer error messages.
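A hedged sketch of the explicit-catch approach; parse_structured_output and the re-raised ValueError are placeholders, and JSONSchemaValidationError could be added to the except tuple wherever it is importable:

from pydantic import ValidationError


def parse_structured_output(response_model, raw_llm_output: str):
    """Placeholder helper: validate raw LLM JSON against a pydantic model class."""
    try:
        # pydantic v2 API; raises ValidationError on schema mismatch
        return response_model.model_validate_json(raw_llm_output)
    except ValidationError as error:
        # Re-raise with context, chaining the original error
        raise ValueError(f"Structured output failed validation: {error}") from error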
1-6: Remove unused imports.
acompletionis unused. Drop it to avoid confusion.cognee/tasks/web_scraper/utils.py (1)
39-41: Docstring return type mismatch.Function returns
Dict[str, str]for both paths; comment says "dict for Tavily". Clarify it’s string content for Tavily too.cognee-mcp/src/server.py (6)
13-13: Import style nit.Prefer
from mcp import typesto avoid star-import-like module path usage.
154-159: Remove unused importKnowledgeGraph.
KnowledgeGraphis imported but not used after loading a custom model. Drop it.
458-459: Typo: "succesfully".- logger.info("Codify process finished succesfully.") + logger.info("Codify process finished successfully.")
598-627: Tighten control flow to reduce nested elifs.A few
elifblocks follow areturnpath (Pylint R1705). Refactor to early returns for readability. No behavior change.
701-714: Early return inside tool on API mode is fine; consider mirroring in docstring.Docstring for
list_datamentions both modes; note the API-mode limitation fordataset_idin docs to avoid surprises.
1029-1093: Global init is OK; add a null-guard for defensive safety.
If tools are invoked programmatically without main(), cognee_client would be None. Consider asserting in each tool or initializing a default client when cognee_client is None.
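One way to implement that guard; _require_client is a hypothetical helper that assumes the module-level cognee_client and CogneeClient from cognee-mcp/src/server.py, and the fallback-to-direct-mode behavior is an assumption rather than what the PR does:

def _require_client() -> "CogneeClient":
    """Hypothetical guard for tools called before main() wires up the global client."""
    global cognee_client
    if cognee_client is None:
        # Assumption: CogneeClient() with no api_url falls back to direct (in-process) mode.
        cognee_client = CogneeClient()
    return cognee_client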
90-101: Docs/examples: clarify URL ingestion path and prerequisites.
- Note that Tavily requires
TAVILY_API_KEYand BeautifulSoup requiresbeautifulsoup4(and optionallylxml/html5lib).- In examples, consider showing
SoupCrawlerConfig(extraction_rules=...)explicitly.Also applies to: 169-181
cognee/tasks/web_scraper/config.py (1)
13-24: Config models LGTM; minor: consider stricter header typing.If practical, type
headersasDict[Literal["User-Agent"], str]or aMapping[str, str]to match httpx. Optional.cognee/tasks/storage/index_graph_edges.py (2)
83-88: Defensive default for batch size.Avoid AttributeError if embedding_engine/get_batch_size is absent.
- batch_size = vector_engine.embedding_engine.get_batch_size() + # Be defensive: not all engines expose get_batch_size() + batch_size_getter = getattr(getattr(vector_engine, "embedding_engine", None), "get_batch_size", None) + batch_size = batch_size_getter() if callable(batch_size_getter) else 20
45-47: Clearer deprecation message.Message is confusing. Suggest rewording.
- logger.warning( - "Your graph edge embedding is deprecated, please pass edges to the index_graph_edges directly." - ) + logger.warning( + "Implicit edge loading in index_graph_edges() is deprecated; pass edges_data explicitly." + )cognee/tasks/web_scraper/bs4_crawler.py (2)
353-368: Close Playwright page/context explicitly to avoid leaks.Ensure page and context are closed even on exceptions.
- async with async_playwright() as p: - browser = await p.chromium.launch(headless=True) - try: - context = await browser.new_context() - page = await context.new_page() - await page.goto( - url, - wait_until="networkidle", - timeout=int((timeout or self.timeout) * 1000), - ) - if js_wait: - await asyncio.sleep(js_wait) - return await page.content() - finally: - await browser.close() + async with async_playwright() as p: + browser = await p.chromium.launch(headless=True) + context = await browser.new_context() + page = await context.new_page() + try: + await page.goto( + url, + wait_until="networkidle", + timeout=int((timeout or self.timeout) * 1000), + ) + if js_wait: + await asyncio.sleep(js_wait) + return await page.content() + finally: + try: + await page.close() + finally: + await context.close() + await browser.close()
23-25: Minor: clarify install hint.Playwright usually needs both package install and browser install.
- "Failed to import playwright, make sure to install using pip install playwright>=1.9.0" + "Playwright not installed. Run: pip install playwright && playwright install"cognee/tests/tasks/web_scraping/web_scraping_test.py (1)
120-137: Mark cron job test async for pytest.-async def test_cron_web_scraper(): +@pytest.mark.asyncio +async def test_cron_web_scraper():cognee/tasks/web_scraper/web_scraper_task.py (8)
11-11: Don’t freeze env vars at import; defer TAVILY_API_KEY to runtime.Defaulting params to os.getenv(...) is evaluated at import time. Use Optional and resolve inside check_arguments.
@@ -from typing import Union, List +from typing import Union, List, Optional @@ async def cron_web_scraper_task( - tavily_api_key: str = os.getenv("TAVILY_API_KEY"), + tavily_api_key: Optional[str] = None, @@ async def web_scraper_task( - tavily_api_key: str = os.getenv("TAVILY_API_KEY"), + tavily_api_key: Optional[str] = None, @@ def check_arguments(tavily_api_key, extraction_rules, tavily_config, soup_crawler_config): - preferred_tool = "beautifulsoup" + preferred_tool = "beautifulsoup" + # fallback to env only at runtime + tavily_api_key = tavily_api_key or os.getenv("TAVILY_API_KEY")Also applies to: 49-53, 123-127, 350-366
260-263: Plumb real HTTP metadata; avoid hard-coded 200/text/html.Status code, content type, and last_modified are hard-coded. Prefer returning these from fetch_page_content (and underlying fetchers) and set WebPage fields/description accordingly.
Short-term: default when unknown, but don’t claim 200/text/html if not verified.
Also applies to: 271-274
155-157: Validate URLs before fetching to reduce SSRF and bad input.Filter to http/https and optionally block private/loopback ranges before calling fetch_page_content.
@@ async def web_scraper_task( - if isinstance(url, str): - url = [url] + if isinstance(url, str): + url = [url] + # basic scheme guard + url = [u for u in url if urlparse(u).scheme in ("http", "https")] + if not url: + raise ValueError("No valid http(s) URLs provided") @@ - results = await fetch_page_content( + results = await fetch_page_content( urls=url,For stronger defense, add CIDR checks to exclude RFC1918, loopback, link-local.
Also applies to: 194-201
3-6: Generalize docstring (not Kuzu-specific).Module works with the configured graph engine; remove Kuzu mention to avoid misleading docs.
- and ScrapingJob data points, and store them in a Kuzu graph database. It supports + and ScrapingJob data points, and store them in the configured graph database. It supports
393-396: Remove unnecessary else after return (R1705).- if full_path.startswith(base_path): - return full_path[len(base_path) :].lstrip("/") - else: - return full_path.lstrip("/") + if full_path.startswith(base_path): + return full_path[len(base_path) :].lstrip("/") + return full_path.lstrip("/")
44-53: Too many args; consider an options object.
Both functions take 7 params and web_scraper_task has many locals/statements. Consider a WebScraperOptions dataclass to group configs and reduce churn in call sites.
Also applies to: 118-127
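A sketch of such an options object; beyond the names mentioned in this PR (SoupCrawlerConfig, TavilyConfig, extraction_rules, tavily_api_key), the fields, defaults, and the options= keyword are illustrative assumptions:

from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class WebScraperOptions:
    """Illustrative grouping of web_scraper_task's many parameters into one object."""
    urls: Union[str, List[str]]
    extraction_rules: Optional[dict] = None
    preferred_tool: str = "beautifulsoup"  # or "tavily"
    tavily_api_key: Optional[str] = None   # resolved from the environment at runtime if None
    soup_crawler_config: Optional["SoupCrawlerConfig"] = None  # config type from this PR
    tavily_config: Optional["TavilyConfig"] = None             # config type from this PR


# Call sites would then pass one object instead of seven separate arguments, e.g.:
# await web_scraper_task(options=WebScraperOptions(urls="https://example.com"))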
106-106: Minor: rely on logger timestamps.The logger already timestamps; you can drop datetime.now() in the message for cleaner logs.
104-115: Optional: return the scheduled job id.When scheduling, consider returning the job id for observability/control.
- if not scheduler.running: - scheduler.start() - return + if not scheduler.running: + scheduler.start() + return job_name
cognee/api/v1/add/add.py
Outdated
except ImportError:
    logger.debug(f"Unable to import {str(ImportError)}")
    pass
Import error logging uses the exception type, not the instance.
logger.debug(f"Unable to import {str(ImportError)}") prints the class. Capture the exception.
-except ImportError:
- logger.debug(f"Unable to import {str(ImportError)}")
- pass
+except ImportError as e:
+ logger.debug(f"Unable to import web scraping config/context: {e}")🤖 Prompt for AI Agents
In cognee/api/v1/add/add.py around lines 26-28, the except block logs the
ImportError class instead of the caught exception instance; change the handler
to capture the exception (e.g., except ImportError as e) and log the actual
exception details (use e or repr(e), or logger.exception/traceback if stack is
needed) instead of str(ImportError); keep or remove the pass as appropriate.
@contextmanager
def hold_lock(self):
    """
    Context manager for safely acquiring and releasing the lock.
    """
    self.acquire()
    try:
        yield
    finally:
        self.release()
🧩 Analysis chain
Fix method name mismatch in hold_lock (calls undefined acquire/release).
hold_lock calls self.acquire()/self.release(), but the abstract API exposes acquire_lock()/release_lock(). This will raise at runtime and is flagged by static analysis. Use the declared methods.
Apply this diff:
@contextmanager
def hold_lock(self):
"""
Context manager for safely acquiring and releasing the lock.
"""
- self.acquire()
+ self.acquire_lock()
try:
yield
finally:
- self.release()
+ self.release_lock()Optionally yield the lock if callers need it: yield self.lock.
Run to find other mismatches (e.g., RedisAdapter.hold_lock uses acquire/release):
🏁 Script executed:
#!/bin/bash
rg -nC2 -e '\bhold_lock\s*\(' -e '\bacquire_lock\s*\(' -e '\brelease_lock\s*\(' -e '\bacquire\s*\(' -e '\brelease\s*\(' cognee/infrastructure/databases/cacheLength of output: 4573
Fix method name mismatch in hold_lock—calls undefined acquire/release methods.
The review comment is accurate. Both cache_db_interface.py (lines 38, 42) and RedisAdapter.py (lines 45, 49) call self.acquire() and self.release(), which don't exist. The abstract interface and all implementations define acquire_lock() and release_lock() instead.
Apply the provided diff to cache_db_interface.py. Additionally, the same issue exists in RedisAdapter.py and should be fixed identically:
- Line 45: self.acquire() → self.acquire_lock()
- Line 49: self.release() → self.release_lock()
🧰 Tools
🪛 Pylint (4.0.1)
[error] 38-38: Instance of 'CacheDBInterface' has no 'acquire' member
(E1101)
[error] 42-42: Instance of 'CacheDBInterface' has no 'release' member
(E1101)
🤖 Prompt for AI Agents
In cognee/infrastructure/databases/cache/cache_db_interface.py around lines
33-42 and in RedisAdapter.py at the occurrences on lines 45 and 49, the
hold_lock context manager calls undefined methods self.acquire() and
self.release(); change those calls to the existing method names
self.acquire_lock() and self.release_lock() respectively so the interface and
implementations match (update the two calls in cache_db_interface.py and the two
calls in RedisAdapter.py).
hajdul88
left a comment
I left some comments. In general, I think the biggest issue is that it sometimes breaks, and CoT rounds are not getting created when access control is ON.
I believe it would have been a better idea to create a reasoning enrichment task/module instead of extending and using the cot retriever as it is a different problem. In this way the prompt is getting searched in the graph in the first round and the retriever is designed for a QA type of use.
Some tests are missing (unit + end to end memify enrichment loop).
Description
Type of Change
Screenshots/Videos (if applicable)
Pre-submission Checklist
DCO Affirmation
I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.