
Conversation

@borisarzentar
Member

Description

DCO Affirmation

I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.

@borisarzentar borisarzentar self-assigned this Apr 1, 2025
@pull-checklist

pull-checklist bot commented Apr 1, 2025

Please make sure all the checkboxes are checked:

  • I have tested these changes locally.
  • I have reviewed the code changes.
  • I have added end-to-end and unit tests (if applicable).
  • I have updated the documentation and README.md file (if necessary).
  • I have removed unnecessary code and debug statements.
  • PR title is clear and follows the convention.
  • I have tagged reviewers or team members for feedback.

@borisarzentar borisarzentar requested a review from hajdul88 April 1, 2025 10:37
@coderabbitai
Contributor

coderabbitai bot commented Apr 1, 2025

Walkthrough

This PR refines the task pipeline system by introducing explicit task configuration through a new TaskConfig and updating task dependencies with merge_needs. The execution flow is modified to allow task functions (e.g., run_tasks_with_telemetry and run_tasks_base) to accept a context parameter and operate asynchronously, with detailed telemetry events. New execution state classes and exception handling (e.g., WrongTaskOrderException) are added, alongside several integration tests and a graph visualization call, to cover multiple execution scenarios.
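For orientation, here is a minimal sketch of how a pipeline might be declared under the new scheme, modeled on the toy tasks used in this PR's integration tests. The Task, TaskConfig, and merge_needs names come from this PR, but the exact call forms (in particular whether merge_needs takes positional callables) are assumptions, not the verified API.

# Illustrative sketch only; call signatures are assumed from this PR's walkthrough.
from cognee.modules.pipelines import Task, merge_needs
from cognee.modules.pipelines.tasks import TaskConfig

async def number_generator(num):
    for i in range(num):
        yield i + 1

async def add_one(num):
    yield num + 1

async def multiply_by_two(num):
    yield num * 2

async def combine(added, multiplied):
    yield added + multiplied

tasks = [
    Task(number_generator, 5),
    # Broadcast: both tasks consume number_generator's output.
    Task(add_one, task_config=TaskConfig(needs=[number_generator])),
    Task(multiply_by_two, task_config=TaskConfig(needs=[number_generator])),
    # Merge: combine waits on both upstream tasks.
    Task(combine, task_config=TaskConfig(needs=[merge_needs(add_one, multiply_by_two)])),
]

Running these through run_tasks (with the new context parameter) would then stream TaskExecutionStarted, TaskExecutionInfo, and TaskExecutionCompleted events asynchronously, as described above.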

Changes

  • cognee/api/v1/add/add.py: Updated the add function to pass a new task_config parameter for the ingest_data task.
  • cognee/api/v1/cognify/cognify.py, cognee/api/v1/cognify/code_graph_pipeline.py: Modified pipeline functions to include a context parameter and restructured task dependencies via TaskConfig and merge_needs.
  • cognee/modules/pipelines/__init__.py, cognee/modules/pipelines/operations/needs.py: Removed the old run_tasks_parallel import and added imports and definitions for merge_needs and MergeNeeds for task dependency management.
  • cognee/modules/pipelines/exceptions.py: Introduced a new WrongTaskOrderException to handle incorrect task ordering.
  • cognee/modules/pipelines/operations/run_tasks.py, cognee/modules/pipelines/operations/run_tasks_base.py: Refactored task execution: replaced the old run_tasks_base with a new implementation, updated function signatures to accept a context, and streamlined telemetry logging.
  • cognee/modules/pipelines/tasks/Task.py, cognee/modules/pipelines/tasks/__init__.py: Added new execution state classes (TaskExecutionStarted, TaskExecutionCompleted, TaskExecutionErrored, TaskExecutionInfo) and updated Task to use an instance of TaskConfig with an asynchronous run method.
  • cognee/tasks/graph/extract_graph_from_data.py, cognee/tasks/ingestion/resolve_data_directories.py, cognee/tasks/storage/add_data_points.py, cognee/tasks/summarization/summarize_text.py: Adjusted task logic: removed legacy calls to add_data_points, altered return types, added an optional data_point_connections parameter, and updated the input type for summarization.
  • cognee/tests/integration/run_toy_tasks/*.py: Introduced several new test files for broadcast, circular dependency, multiple inputs, and simple task execution scenarios; removed outdated test files.
  • cognee/tests/test_library.py: Added a graph visualization step by importing and invoking visualize_graph.

Suggested reviewers

  • hajdul88

Poem

I'm a rabbit hopping through lines of code,
With TaskConfig and merge_needs lighting my road.
Tasks now run async with a clever new beat,
Telemetry dancing as processes meet.
In a burrow of tests and pipelines so bright,
I nibble on bugs and hop into the night.
CodeRabbit cheers these changes—a real delight!


@borisarzentar borisarzentar requested a review from dexters1 April 1, 2025 10:37
@borisarzentar borisarzentar marked this pull request as draft April 1, 2025 10:37
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 14

🔭 Outside diff range comments (2)
cognee/tasks/graph/extract_graph_from_data.py (2)

17-22: 🛠️ Refactor suggestion

Update the return type annotation to match the new return value

The function now returns a tuple of (graph_nodes, graph_edges) but the return type annotation still indicates List[DocumentChunk]. This inconsistency should be fixed to maintain code clarity and enable proper type checking.

async def integrate_chunk_graphs(
    data_chunks: list[DocumentChunk],
    chunk_graphs: list,
    graph_model: Type[BaseModel],
    ontology_adapter: OntologyResolver,
-) -> List[DocumentChunk]:
+) -> tuple[list, list]:

For even better type safety, consider specifying the exact types in the tuple rather than using generic list:

) -> tuple[list[DataPoint], list]:  # Replace with actual types for nodes and edges

44-54: 🛠️ Refactor suggestion

Update extract_graph_from_data return type and docstring

The extract_graph_from_data function returns the result of integrate_chunk_graphs, which now returns a tuple of (graph_nodes, graph_edges) instead of List[DocumentChunk]. The function signature and docstring should be updated to reflect this change.

async def extract_graph_from_data(
    data_chunks: list[DocumentChunk],
    graph_model: Type[BaseModel],
    ontology_adapter: OntologyResolver = OntologyResolver(),
-) -> List[DocumentChunk]:
+) -> tuple[list, list]:  # Consider using more specific types
-    """Extracts and integrates a knowledge graph from the text content of document chunks using a specified graph model."""
+    """Extracts a knowledge graph from the text content of document chunks and returns the resulting nodes and edges."""
    chunk_graphs = await asyncio.gather(
        *[extract_content_graph(chunk.text, graph_model) for chunk in data_chunks]
    )
    return await integrate_chunk_graphs(data_chunks, chunk_graphs, graph_model, ontology_adapter)
🧰 Tools
🪛 Ruff (0.8.2)

48-48: Do not perform function call OntologyResolver in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)
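The Ruff B008 hint above exists because a default like ontology_adapter=OntologyResolver() is evaluated once, at function definition time, and then shared across calls. A minimal sketch of the conventional fix, with a stand-in class so the snippet is self-contained:

class OntologyResolver:
    # Stand-in for the real resolver; shown only to keep the sketch runnable.
    pass

async def extract_graph_from_data(data_chunks, graph_model, ontology_adapter=None):
    # Construct the default inside the body, once per call rather than at import time.
    ontology_adapter = ontology_adapter or OntologyResolver()
    ...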

🧹 Nitpick comments (13)
cognee/modules/pipelines/exceptions.py (1)

1-2: Consider implementing a proper constructor for the exception class.

The current implementation defines message as a typed attribute but doesn't initialize it in a constructor. While this will work (Python's Exception base class handles the message), a more idiomatic approach would be to explicitly initialize the message in an __init__ method.

class WrongTaskOrderException(Exception):
-    message: str
+    def __init__(self, message: str):
+        self.message = message
+        super().__init__(message)

This makes the exception usage clearer and ensures the message is properly initialized and accessible both as exception.message and through the standard exception.args[0].

cognee/modules/pipelines/tasks/__init__.py (1)

1-1: Use __all__ to explicitly define exported symbols.

The current import statement correctly exposes the classes at the package level, but static analysis tools flag these imports as unused. Adding an __all__ list would make the exports explicit and silence these warnings.

from .Task import Task, TaskConfig, TaskExecutionResult
+
+__all__ = ["Task", "TaskConfig", "TaskExecutionResult"]

This approach clearly indicates which symbols are being exported from the package and satisfies linting tools while maintaining the same functionality.

🧰 Tools
🪛 Ruff (0.8.2)

1-1: .Task.Task imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


1-1: .Task.TaskConfig imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


1-1: .Task.TaskExecutionResult imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)

cognee/tasks/graph/extract_graph_from_data.py (1)

23-23: Update docstring to reflect new return value

The docstring doesn't accurately reflect the function's new behavior of returning graph nodes and edges rather than updated DocumentChunk objects.

-    """Updates DocumentChunk objects, integrates data points and edges into databases."""
+    """Processes document chunks and returns graph nodes and edges without integrating them into databases."""
cognee/modules/pipelines/__init__.py (1)

2-3: Consider adding __all__ to explicitly define the public API.

Static analysis shows these imports are unused, which is expected in an __init__.py file that re-exports symbols. Consider adding an __all__ list to explicitly define the module's public API.

from .tasks.Task import Task
from .operations.run_tasks_v2 import run_tasks
from .operations.input_output import merge_inputs, MergeInputs
+
+__all__ = ["Task", "run_tasks", "merge_inputs", "MergeInputs"]
🧰 Tools
🪛 Ruff (0.8.2)

2-2: .operations.run_tasks_v2.run_tasks imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


3-3: .operations.input_output.merge_inputs imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


3-3: .operations.input_output.MergeInputs imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)

cognee/modules/pipelines/operations/input_output.py (3)

7-9: Consider adding docstrings to MergeInputs.

Defining a short docstring describing the rationale behind merging outputs would enhance readability and maintainability.
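A docstring along these lines would do; the needs field name is inferred from the surrounding review context and may not match the implementation exactly:

from typing import Callable
from pydantic import BaseModel

class MergeInputs(BaseModel):
    """Marks a group of upstream tasks whose outputs should be merged
    into a single combined input for the dependent task."""

    needs: list[Callable]  # field name assumed; align with the actual attribute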


11-12: merge_inputs function is succinct and readable.

The function clearly encapsulates the creation of a MergeInputs object. Consider adding basic doctests or unit tests to confirm correct behavior.


27-39: Validate that dependencies exist in results before accessing them.

If a task dependency is not yet processed or is missing in results, get_input_results will fail. Consider adding defensive checks or error handling to improve robustness.
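For example, a guard of roughly this shape could fail fast with a descriptive error. This sketch skips the MergeInputs branch for brevity and assumes the dependency list lives on task.task_config, as in the later revision of this PR; the message wording is illustrative.

from cognee.modules.pipelines import MergeInputs
from cognee.modules.pipelines.exceptions import WrongTaskOrderException

def get_input_results(results: dict, task):
    # Fail fast if any plain (non-merged) dependency has not produced a result yet.
    missing = [
        need
        for need in task.task_config.needs
        if not isinstance(need, MergeInputs) and need not in results
    ]
    if missing:
        names = ", ".join(getattr(need, "__name__", repr(need)) for need in missing)
        raise WrongTaskOrderException(f"Missing results for dependencies: {names}")
    ...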

cognee/modules/pipelines/operations/run_tasks_v2.py (4)

21-21: Refine logger name for clarity.

Using "run_tasks(tasks: [Task], data)" as the logger name is unusual. Typically, module-level loggers are named after the module (e.g., get_logger(__name__)) for better identification.

-logger = get_logger("run_tasks(tasks: [Task], data)")
+logger = get_logger(__name__)

39-43: Fix spelling of the variable name.

dependendent_task is misspelled. For clarity and code consistency, rename it to dependent_task.

-for dependendent_task in task_graph[task.executable]:
-    dependents.setdefault(dependendent_task, []).append(task.executable)
+for dependent_task in task_graph[task.executable]:
+    dependents.setdefault(dependent_task, []).append(task.executable)

54-59: Consider combining these conditional branches.

According to the static analysis hint, lines 55-58 can be merged. Ensure that this refactor does not break the logic of storing partial vs. final results.

- if not task_execution_info.is_done:
-     results[task.executable] = task_execution_info.result
- elif task_execution_info.is_done and task.executable not in results:
-     results[task.executable] = task_execution_info.result
+ if (not task_execution_info.is_done) or (task_execution_info.is_done and task.executable not in results):
+     results[task.executable] = task_execution_info.result
🧰 Tools
🪛 Ruff (0.8.2)

55-58: Combine if branches using logical or operator

Combine if branches

(SIM114)


71-75: Correct minor typo in the exception message.

“disconneted” is misspelled; replace it with “disconnected.”

-f"{number_of_executed_tasks}/{len(tasks)} tasks executed. You likely have some disconneted tasks or circular dependency."
+f"{number_of_executed_tasks}/{len(tasks)} tasks executed. You likely have some disconnected tasks or circular dependency."
cognee/modules/pipelines/tasks/Task.py (2)

8-8: Remove or clarify commented-out code.

The TaskExecutable union is commented out. If it’s no longer needed, remove it to keep the code clean; otherwise, clarify why it remains commented.


46-91: Reduce duplication between generator and async generator blocks.

Both branches share similar logic for handling and batching results. Consider extracting a helper function to reduce repetition and improve maintainability.
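One possible shape for the shared helper, assuming both branches ultimately iterate results and batch them by output_batch_size (the names here are illustrative, not part of the codebase):

from typing import Any, AsyncIterator, Callable

async def emit_in_batches(
    results: AsyncIterator[Any],
    batch_size: int,
    wrap: Callable[[list], Any],  # e.g. wraps a batch in a TaskExecutionInfo-like event
) -> AsyncIterator[Any]:
    batch: list = []
    async for item in results:
        batch.append(item)
        if len(batch) >= batch_size:
            yield wrap(batch)
            batch = []
    if batch:  # flush the final partial batch
        yield wrap(batch)

The synchronous generator branch could reuse this by first adapting the sync generator into an async iterator.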

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ff0878d and 12086c0.

📒 Files selected for processing (18)
  • cognee/api/v1/add/add.py (2 hunks)
  • cognee/api/v1/cognify/cognify.py (2 hunks)
  • cognee/modules/pipelines/__init__.py (1 hunks)
  • cognee/modules/pipelines/exceptions.py (1 hunks)
  • cognee/modules/pipelines/operations/input_output.py (1 hunks)
  • cognee/modules/pipelines/operations/run_tasks.py (3 hunks)
  • cognee/modules/pipelines/operations/run_tasks_v2.py (1 hunks)
  • cognee/modules/pipelines/tasks/Task.py (1 hunks)
  • cognee/modules/pipelines/tasks/__init__.py (1 hunks)
  • cognee/tasks/graph/extract_graph_from_data.py (1 hunks)
  • cognee/tasks/ingestion/resolve_data_directories.py (1 hunks)
  • cognee/tasks/storage/add_data_points.py (2 hunks)
  • cognee/tasks/summarization/summarize_text.py (1 hunks)
  • cognee/tests/integration/run_toy_tasks/run_tasks_broadcast_test.py (1 hunks)
  • cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py (1 hunks)
  • cognee/tests/integration/run_toy_tasks/run_tasks_multiple_inputs_test.py (1 hunks)
  • cognee/tests/integration/run_toy_tasks/run_tasks_simple_test.py (1 hunks)
  • cognee/tests/integration/run_toy_tasks/run_tasks_test.py (0 hunks)
💤 Files with no reviewable changes (1)
  • cognee/tests/integration/run_toy_tasks/run_tasks_test.py
🧰 Additional context used
🧬 Code Definitions (8)
cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py (3)
cognee/modules/pipelines/tasks/Task.py (3)
  • Task (21-98)
  • TaskConfig (16-18)
  • run (32-98)
cognee/modules/pipelines/exceptions.py (1)
  • WrongTaskOrderException (1-2)
cognee/modules/pipelines/operations/run_tasks_v2.py (1)
  • run_tasks_base (24-74)
cognee/tests/integration/run_toy_tasks/run_tasks_simple_test.py (5)
cognee/modules/pipelines/tasks/Task.py (3)
  • Task (21-98)
  • TaskConfig (16-18)
  • run (32-98)
cognee/modules/pipelines/operations/run_tasks_v2.py (1)
  • run_tasks_base (24-74)
cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py (5)
  • run_and_check_tasks (9-39)
  • number_generator (10-12)
  • add_one (14-15)
  • multiply_by_two (17-18)
  • test_run_tasks (42-43)
cognee/tests/integration/run_toy_tasks/run_tasks_multiple_inputs_test.py (5)
  • run_and_check_tasks (7-38)
  • number_generator (8-10)
  • add_one (12-13)
  • multiply_by_two (18-20)
  • test_run_tasks (41-42)
cognee/tests/integration/run_toy_tasks/run_tasks_broadcast_test.py (5)
  • run_and_check_tasks (7-33)
  • number_generator (8-10)
  • add_one (12-13)
  • multiply_by_two (15-16)
  • test_run_tasks (36-37)
cognee/modules/pipelines/tasks/__init__.py (1)
cognee/modules/pipelines/tasks/Task.py (3)
  • Task (21-98)
  • TaskConfig (16-18)
  • TaskExecutionResult (11-13)
cognee/tests/integration/run_toy_tasks/run_tasks_broadcast_test.py (3)
cognee/modules/pipelines/tasks/Task.py (3)
  • Task (21-98)
  • TaskConfig (16-18)
  • run (32-98)
cognee/modules/pipelines/operations/run_tasks_v2.py (1)
  • run_tasks_base (24-74)
cognee/tests/integration/run_toy_tasks/run_tasks_simple_test.py (5)
  • run_and_check_tasks (7-33)
  • number_generator (8-10)
  • add_one (12-13)
  • multiply_by_two (15-16)
  • test_run_tasks (36-37)
cognee/tests/integration/run_toy_tasks/run_tasks_multiple_inputs_test.py (4)
cognee/modules/pipelines/tasks/Task.py (3)
  • Task (21-98)
  • TaskConfig (16-18)
  • run (32-98)
cognee/modules/pipelines/operations/run_tasks_v2.py (1)
  • run_tasks_base (24-74)
cognee/tests/integration/run_toy_tasks/run_tasks_simple_test.py (5)
  • run_and_check_tasks (7-33)
  • number_generator (8-10)
  • add_one (12-13)
  • multiply_by_two (15-16)
  • test_run_tasks (36-37)
cognee/tests/integration/run_toy_tasks/run_tasks_broadcast_test.py (5)
  • run_and_check_tasks (7-33)
  • number_generator (8-10)
  • add_one (12-13)
  • multiply_by_two (15-16)
  • test_run_tasks (36-37)
cognee/modules/pipelines/operations/run_tasks_v2.py (3)
cognee/modules/pipelines/operations/input_output.py (2)
  • get_input_results (27-39)
  • get_input_tasks (15-24)
cognee/modules/pipelines/exceptions.py (1)
  • WrongTaskOrderException (1-2)
cognee/modules/pipelines/operations/run_tasks.py (2)
  • run_tasks_with_telemetry (22-67)
  • run_tasks (70-96)
cognee/api/v1/cognify/cognify.py (2)
cognee/modules/pipelines/operations/input_output.py (1)
  • merge_inputs (11-12)
cognee/modules/pipelines/tasks/Task.py (2)
  • Task (21-98)
  • TaskConfig (16-18)
cognee/modules/pipelines/tasks/Task.py (1)
cognee/modules/pipelines/operations/input_output.py (1)
  • MergeInputs (7-8)
🪛 Ruff (0.8.2)
cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py

10-10: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


14-14: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


17-17: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

cognee/tests/integration/run_toy_tasks/run_tasks_simple_test.py

8-8: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


12-12: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


15-15: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

cognee/modules/pipelines/tasks/__init__.py

1-1: .Task.Task imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


1-1: .Task.TaskConfig imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


1-1: .Task.TaskExecutionResult imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)

cognee/tests/integration/run_toy_tasks/run_tasks_broadcast_test.py

8-8: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


12-12: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


15-15: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

cognee/tasks/storage/add_data_points.py

9-9: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

cognee/tests/integration/run_toy_tasks/run_tasks_multiple_inputs_test.py

8-8: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


12-12: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


15-15: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


18-18: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

cognee/modules/pipelines/__init__.py

2-2: .operations.run_tasks_v2.run_tasks imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


3-3: .operations.input_output.merge_inputs imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


3-3: .operations.input_output.MergeInputs imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)

cognee/modules/pipelines/operations/run_tasks_v2.py

55-58: Combine if branches using logical or operator

Combine if branches

(SIM114)


127-127: Do not perform function call uuid4 in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

cognee/modules/pipelines/operations/run_tasks.py

10-10: cognee.modules.users.models.User imported but unused

Remove unused import: cognee.modules.users.models.User

(F401)

⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: lint (ubuntu-latest, 3.11.x)
  • GitHub Check: docker-compose-test
  • GitHub Check: Build Cognee Backend Docker App Image
🔇 Additional comments (27)
cognee/tasks/ingestion/resolve_data_directories.py (1)

44-44: Minor formatting improvement.

Adding this blank line improves code readability by better separating the conditional blocks from the return statement.

cognee/tasks/summarization/summarize_text.py (1)

5-5:


❓ Verification inconclusive

Verify all callers after parameter type change.

The function's parameter type has been changed from list[DocumentChunk] to list[DataPoint]. While both types appear to have compatible interfaces (both have .text and .id attributes used in the function), this change requires all callers to provide the correct type.

Ensure that all places where summarize_text is called have been updated to provide list[DataPoint] instead of list[DocumentChunk]. This could be verified with:

Also applies to: 10-10


🏁 Script executed:

#!/bin/bash
# Find all usages of the summarize_text function to verify they're providing the correct type
rg -A 2 -B 2 "summarize_text\(" --type py

Length of output: 1722


Verify Summarize Function Callers for Type Consistency

The updated summarize_text function in cognee/tasks/summarization/summarize_text.py now expects a parameter of type list[DataPoint] instead of list[DocumentChunk]. The grep output indicates that at least two callers—one in cognee/modules/retrieval/graph_summary_completion_retriever.py and another in cognee/modules/retrieval/context_providers/SummarizedTripletSearchContextProvider.py—pass a variable named direct_text as the first argument.

  • Action Required:
    • Confirm whether direct_text in these callers is indeed a list[DataPoint]. If it’s not (for example, if it’s a string or a list of DocumentChunk), the callers must be updated accordingly.
    • Additionally, note that there is another summarize_text defined in cognee/modules/retrieval/utils/completion.py which accepts a str as the first parameter. Please verify that there is no mix-up between these functions and that the appropriate version is called in each context.
cognee/api/v1/add/add.py (2)

4-5: Appropriate update of imports to support TaskConfig

The import of TaskConfig from cognee.modules.pipelines.tasks is correctly added to support the changes in the task configuration. This aligns with the broader refactoring to explicitly define task dependencies.


40-48: Good refactoring to explicitly define task dependencies

This change improves code clarity by explicitly defining that ingest_data depends on the output of resolve_data_directories through the TaskConfig mechanism. This makes the task dependency graph more explicit and maintainable.

cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py (3)

1-7: Good test setup for circular dependency detection

The imports correctly bring in the necessary components to test circular dependency detection in the new run_tasks_v2 implementation.


20-40: Well-structured test for circular dependency detection

This test correctly verifies that a WrongTaskOrderException is raised when circular dependencies are detected (add_one depends on multiply_by_two, and multiply_by_two depends on add_one). It also validates intermediate results against expected values.


42-48: Good test execution setup

The test is correctly set up to run the async test function and includes a main block for direct execution.

cognee/tasks/storage/add_data_points.py (1)

41-42: Good implementation of additional graph edge handling

The implementation correctly checks if data_point_connections is provided before attempting to add them to the graph. This maintains backward compatibility while adding new functionality.

cognee/tasks/graph/extract_graph_from_data.py (1)

42-42:


✅ Verification successful

Function return value changed

The function now returns graph_nodes and graph_edges as a tuple instead of returning data_chunks. This change appears to be part of a refactoring to separate data extraction from storage operations, which is a good architectural improvement.

Verify that all callers of this function have been updated to handle the new return type:


🏁 Script executed:

#!/bin/bash
# Find all calls to integrate_chunk_graphs to ensure they handle the new return type
rg "integrate_chunk_graphs\(" --type py -A 5 -B 5

Length of output: 2047


Verified: Callers now propagate the new tuple return type

We’ve confirmed that both cognee/tasks/graph/extract_graph_from_data.py and cognee/tasks/graph/extract_graph_from_data_v2.py simply return the result of integrate_chunk_graphs, so the new (graph_nodes, graph_edges) tuple is propagated consistently. No destructuring issues were found at the call sites; however, please double-check that any downstream code consuming these values handles the tuple format correctly.

  • Verify that any further processing of the returned tuple unpacks the values as intended.
  • Consider updating type annotations and documentation to explicitly reflect the new tuple return type.
cognee/tests/integration/run_toy_tasks/run_tasks_simple_test.py (1)

21-33: Task configuration looks good.

The task pipeline is correctly set up with a chain of dependencies: multiply_by_two depends on add_one, which depends on number_generator. The assertions correctly validate the expected results.

cognee/tests/integration/run_toy_tasks/run_tasks_broadcast_test.py (1)

21-33: Broadcast pattern implemented correctly.

This test correctly verifies the "broadcast" pattern, where multiple tasks (both add_one and multiply_by_two) take input from the same source task (number_generator). The expected results array matches this configuration.

cognee/tests/integration/run_toy_tasks/run_tasks_multiple_inputs_test.py (1)

25-38: Multiple inputs configuration is correctly implemented.

This test effectively verifies the functionality of tasks with multiple inputs. The task multiply_by_two correctly receives inputs from both add_one and add_two. The expected results array accounts for all output values from this execution graph.

cognee/modules/pipelines/operations/run_tasks.py (3)

3-3: Imports look fine.

No issues with adding the UUID utilities for pipeline identification.


11-12: Imports are appropriately utilized.

These imports (get_default_user, get_current_settings, get_logger, and run_tasks_base) are all directly referenced and serve clear purposes in this file.

Also applies to: 14-14, 17-17


40-41: Yielding results within run_tasks_with_telemetry.

Using the asynchronous generator to yield run_task_info from the new run_tasks_base is a coherent approach. This maintains a streaming pattern of result emission.

cognee/modules/pipelines/operations/input_output.py (3)

1-2: Imports are clear and necessary.

The BaseModel from Pydantic and Any from typing are used in this file, so these imports are valid.


4-4: Task import is appropriate.

The Task class is referenced later in this module, ensuring typed declarations for function parameters.


15-25: Ensure handling of nested merged inputs if needed.

get_input_tasks flattens a single level of MergeInputs. If nested merges are not required, this is fine. Otherwise, consider a recursive approach or additional checks.

Would you like a script to scan for any nested MergeInputs usage in the codebase to confirm this logic suffices?
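If nested merges ever become legal, a recursive variant might look like the following sketch, assuming MergeInputs keeps its dependencies on a needs attribute (an assumption, since the field name is not shown here):

from cognee.modules.pipelines import MergeInputs

def flatten_input_tasks(dependency):
    # Recursively unwrap MergeInputs so arbitrarily nested merges
    # resolve to a flat list of task callables.
    if isinstance(dependency, MergeInputs):
        tasks = []
        for need in dependency.needs:
            tasks.extend(flatten_input_tasks(need))
        return tasks
    return [dependency]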

cognee/api/v1/cognify/cognify.py (6)

13-14: New imports for pipeline tasks are well placed.

run_tasks and merge_inputs are both used in subsequent lines, as are Task and TaskConfig, making the import statements concise and appropriate.


125-129: task_config with user permissions.

Providing the user and permissions=["write"] in check_permissions_on_documents is logical. Ensure tests confirm this permission check is effective.


130-135: Batch size configuration for extract_chunks_from_documents.

Setting output_batch_size=10 clarifies how chunks are yielded downstream. This is a practical design for controlling data flow.


136-141: Graph extraction task config.

Accepting extract_chunks_from_documents as input for extract_graph_from_data aligns with a clear data flow. The usage of TaskConfig is consistent with the new pipeline approach.


142-146: Summarization task config.

Pointing summarize_text to extract_chunks_from_documents as input ensures that only processed text chunks are summarized. This is a coherent pipeline design.


147-149: New add_data_points task merges summary and graph outputs.

Combining summarize_text and extract_graph_from_data via merge_inputs is a neat solution for a unified downstream feed. Confirm that all necessary fields are included to avoid partial data merges.

cognee/modules/pipelines/operations/run_tasks_v2.py (2)

1-15: Overall Implementation Looks Solid!

The new module effectively orchestrates task execution, logs important milestones, and handles errors. The imports and initial setup appear correct, with well-organized structure.


24-27: Consider yielding or documenting behavior for empty tasks.

When no tasks are provided, the function immediately returns without yielding any value or result. This might be intentional. If you intend to notify callers of an empty run, consider yielding an empty execution result or documenting that this function will exit silently.
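If silent exit is not intended, a small explicit guard would make the behavior visible. A sketch, with the parameter list abbreviated and a plain standard-library logger for self-containment:

import logging

logger = logging.getLogger(__name__)

async def run_tasks_base(tasks, data=None, context=None):
    if not tasks:
        # Hypothetical guard: surface the empty run instead of returning silently.
        logger.warning("run_tasks_base called with no tasks; nothing will be executed.")
        return
    ...  # rest of the generator body yields execution events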

cognee/modules/pipelines/tasks/Task.py (1)

25-30: Constructor approach is concise.

Automatically assigning task_config = TaskConfig() if none is provided offers a clean initialization strategy. Nice job!

@borisarzentar borisarzentar requested a review from soobrosa April 1, 2025 10:54
@borisarzentar borisarzentar marked this pull request as ready for review April 1, 2025 20:12
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (19)
cognee/modules/pipelines/__init__.py (1)

3-3: Add merge_needs and MergeNeeds to the __all__ list

These imports are flagged by the static analyzer as unused. Since they are in the __init__.py file, they are likely meant to be part of the public API. Add them to an __all__ list to properly export them.

from .tasks.Task import Task
from .operations.run_tasks import run_tasks
from .operations.needs import merge_needs, MergeNeeds

+__all__ = ["Task", "run_tasks", "merge_needs", "MergeNeeds"]
🧰 Tools
🪛 Ruff (0.8.2)

3-3: .operations.needs.merge_needs imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


3-3: .operations.needs.MergeNeeds imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)

cognee/tests/test_library.py (1)

80-84: Consider removing commented code

There's a commented line # await render_graph() that appears to be replaced by the new visualization code. If this function is no longer used, the commented code should be removed.

Additionally, consider creating the directory for the visualization file if it doesn't exist to prevent potential runtime errors:

graph_file_path = os.path.join(
    pathlib.Path(__file__).parent, ".artifacts", "graph_visualization.html"
)
+os.makedirs(os.path.dirname(graph_file_path), exist_ok=True)
await visualize_graph(graph_file_path)
cognee/modules/pipelines/tasks/__init__.py (1)

1-8: Add imported classes to __all__ list

These imports are flagged by the static analyzer as unused. Since they are in the __init__.py file, they are likely meant to be part of the public API. Add them to an __all__ list to properly export them.

from .Task import (
    Task,
    TaskConfig,
    TaskExecutionInfo,
    TaskExecutionCompleted,
    TaskExecutionStarted,
    TaskExecutionErrored,
)

+__all__ = [
+    "Task",
+    "TaskConfig",
+    "TaskExecutionInfo",
+    "TaskExecutionCompleted",
+    "TaskExecutionStarted",
+    "TaskExecutionErrored",
+]
🧰 Tools
🪛 Ruff (0.8.2)

2-2: .Task.Task imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


3-3: .Task.TaskConfig imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


4-4: .Task.TaskExecutionInfo imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


5-5: .Task.TaskExecutionCompleted imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


6-6: .Task.TaskExecutionStarted imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


7-7: .Task.TaskExecutionErrored imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)

cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py (2)

25-25: Fix typo in error message

The error message contains a typo: "disconneted" should be "disconnected".

-        match="1/3 tasks executed. You likely have some disconneted tasks or circular dependency.",
+        match="1/3 tasks executed. You likely have some disconnected tasks or circular dependency.",

Note: You may need to update this typo in the WrongTaskOrderException implementation as well.


20-21: Consider using a more descriptive variable name than 'index'

The variable index is used to track the current position in the expected results. Consider renaming it to a more descriptive name like result_index to better indicate its purpose.

-    index = 0
+    result_index = 0
     expected_results = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 22]

Make sure to update all references to this variable in the code.

cognee/modules/pipelines/operations/needs.py (1)

27-60: Consider adding more explanatory comments to complex result handling logic.

The result processing logic is complex, particularly for merged needs. Adding more explanatory comments would help future maintainers understand the rationale behind certain steps.

 def get_need_task_results(results, task: Task):
     input_results = []
 
     for task_dependency in task.task_config.needs:
         if isinstance(task_dependency, MergeNeeds):
+            # For merged needs, we need to collect results from all dependency tasks
+            # and organize them properly for the dependent task
             task_results = []
             max_result_length = 0
 
             for task_need in task_dependency.needs:
                 result = results[task_need]
                 task_results.append(result)
 
                 if isinstance(result, tuple):
+                    # Track the maximum tuple length to ensure we can accommodate all results
                     max_result_length = max(max_result_length, len(result))
 
+            # Initialize a list of lists to hold the final results
+            # Each inner list will contain results for one parameter position
             final_results = [[] for _ in range(max_result_length)]
 
             for result in task_results:
                 if isinstance(result, tuple):
+                    # For tuple results, distribute each tuple element to the corresponding parameter position
                     for i, value in enumerate(result):
                         final_results[i].extend(value)
                 else:
+                    # For non-tuple results, add them to the first parameter position
                     final_results[0].extend(result)
 
+            # Add the organized results to the input results
             input_results.extend(final_results)
         else:
+            # For single task dependencies, handle the results directly
             result = results[task_dependency]
 
             if isinstance(result, tuple):
+                # For tuple results, add each element as a separate input
                 input_results.extend(result)
             else:
+                # For non-tuple results, add them as a single input
                 input_results.append(result)
 
     return input_results
cognee/modules/pipelines/operations/run_tasks_base.py (2)

12-14: Document the function’s return type in the docstring.
While the docstring provides a summary of the function’s behavior, it does not clarify the return type or the nature of the yielded data. Explicitly stating the yielded return types can help developers quickly understand how to consume this function’s output.
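A docstring of roughly this shape would cover it; the parameter list is abbreviated and the event names are the ones this PR exports from cognee.modules.pipelines.tasks:

async def run_tasks_base(tasks, data=None, context=None):
    """Run the given tasks in dependency order.

    Yields:
        Task execution events: TaskExecutionInfo objects carrying
        intermediate results on `.result`, and TaskExecutionCompleted
        objects carrying each task's final result.

    Raises:
        WrongTaskOrderException: if disconnected tasks or a circular
            dependency prevent all tasks from executing.
    """
    ...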


60-63: Fix the spelling of "disconneted" in the exception message.
A minor typographical fix to ensure clarity for users encountering this error.

- f"{number_of_executed_tasks}/{len(tasks)} tasks executed. You likely have some disconneted tasks or circular dependency."
+ f"{number_of_executed_tasks}/{len(tasks)} tasks executed. You likely have some disconnected tasks or circular dependency."
cognee/api/v1/cognify/code_graph_pipeline.py (2)

48-55: Check for concurrency or performance constraints.
The get_repo_file_dependencies task can be I/O intensive. Applying a large output_batch_size is beneficial but consider if you need an upper limit to avoid high memory usage in very large repositories.
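If memory does become a concern, a cap could be applied where the batch size is chosen. In this sketch, both the environment variable and the default are illustrative, not existing cognee configuration:

import os

from cognee.modules.pipelines.tasks import TaskConfig

# Hypothetical upper bound; neither the variable name nor the default exists in cognee.
MAX_OUTPUT_BATCH_SIZE = int(os.environ.get("COGNEE_MAX_OUTPUT_BATCH_SIZE", "500"))

def capped_task_config(requested_batch_size: int) -> TaskConfig:
    return TaskConfig(output_batch_size=min(requested_batch_size, MAX_OUTPUT_BATCH_SIZE))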


106-106: Consider removing hard-coded user path.
Using a user-specific path like "/Users/borisarzentar/Projects/graphrag" may not be portable for other environments. You can make it configurable or accept a parameter to improve maintainability.
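For example, the path could come from an environment variable with a portable fallback; the variable name here is illustrative, not an existing cognee setting:

import os

repo_path = os.environ.get(
    "COGNEE_REPO_PATH",
    os.path.join(os.path.expanduser("~"), "Projects", "graphrag"),
)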

cognee/modules/pipelines/operations/run_tasks.py (2)

21-23: Add a docstring for run_tasks_with_telemetry.
A brief explanation of what telemetry events are logged, plus the expected usage of the function, can improve maintainability.


107-114: Document the context parameter in run_tasks.
You have added a context parameter, but there is no explanation about what it contains or how it’s used. Documenting it in a docstring prevents confusion for future maintainers.

🧰 Tools
🪛 Ruff (0.8.2)

110-110: Do not perform function call uuid4 in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)
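The B008 hint above flags the uuid4() call in the signature: a call in a default is evaluated once at definition time, so every invocation would share the same id. A sketch of the standard remedy, with the surrounding parameter names assumed rather than copied from the file:

from typing import Optional
from uuid import UUID, uuid4

async def run_tasks(tasks, data=None, pipeline_id: Optional[UUID] = None, context=None):
    # Generate a fresh id per call instead of one shared id at definition time.
    pipeline_id = pipeline_id or uuid4()
    ...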

cognee/modules/pipelines/tasks/Task.py (7)

8-8: Remove or uncomment the TaskExecutable type definition.

The commented-out TaskExecutable type definition could be useful for type hinting but is currently not being used.

-# TaskExecutable = Union[Callable[..., Any], Callable[..., Coroutine[Any, Any, Any]], AsyncGenerator[Any, Any, Any], Generator[Any, Any, Any]]
+TaskExecutable = Union[Callable[..., Any], Callable[..., Coroutine[Any, Any, Any]], AsyncGenerator[Any, Any, Any], Generator[Any, Any, Any]]

11-13: Add docstrings to the new TaskExecution classes.

The new execution state classes lack documentation. Adding docstrings would help users understand the purpose and usage of each class.

 class TaskExecutionStarted(BaseModel):
+    """Emitted when a task begins execution."""
     task: Callable


 class TaskExecutionCompleted(BaseModel):
+    """Emitted when a task successfully completes execution."""
     task: Callable
     result: Any = None


 class TaskExecutionErrored(BaseModel):
+    """Emitted when a task encounters an error during execution."""
     task: Callable
     error: Exception

     model_config = {"arbitrary_types_allowed": True}


 class TaskExecutionInfo(BaseModel):
+    """Emitted during task execution to provide intermediate results."""
     result: Any = None
     task: Callable

Also applies to: 15-18, 20-25, 27-30


32-34: Add a docstring to the TaskConfig class.

The TaskConfig class lacks documentation about its purpose and configuration options.

 class TaskConfig(BaseModel):
+    """
+    Configuration for task execution.
+    
+    Attributes:
+        output_batch_size: Number of results to accumulate before yielding a batch
+        needs: List of dependencies (tasks or MergeNeeds) that must complete before this task
+    """
     output_batch_size: int = 1
     needs: list[Union[Callable, MergeNeeds]] = []

41-47: Consider adding type annotation for the 'executable' parameter.

The executable parameter lacks type annotation, which would help with code understanding and IDE support. Consider using the TaskExecutable type definition that's currently commented out.

-    def __init__(self, executable, *args, task_config: TaskConfig = None, **kwargs):
+    def __init__(self, executable: Callable, *args, task_config: TaskConfig = None, **kwargs):
         self.executable = executable
         self.default_params = {"args": args, "kwargs": kwargs}
         self.result = None
 
         self.task_config = task_config or TaskConfig()

48-59: Add return type annotation to the async run method.

The run method is now async and yields various execution state objects, but lacks a return type annotation that would help with type checking and code clarity.

-    async def run(self, *args, **kwargs):
+    async def run(self, *args, **kwargs) -> AsyncGenerator[Union[TaskExecutionStarted, TaskExecutionInfo, TaskExecutionCompleted, TaskExecutionErrored], None]:
         combined_args = args + self.default_params["args"]
         combined_kwargs = {
             **self.default_params["kwargs"],
             **kwargs,
         }
 
         yield TaskExecutionStarted(
             task=self.executable,
         )

103-104: Consider adding TaskExecutionInfo for regular function results.

Unlike generator functions, regular functions don't yield intermediate TaskExecutionInfo events. For consistency, consider yielding a TaskExecutionInfo for regular functions before the completion event.

             else:  # Regular function
                 task_result = self.executable(*combined_args, **combined_kwargs)
+                
+                # For consistency with other executable types
+                yield TaskExecutionInfo(
+                    result=task_result,
+                    task=self.executable,
+                )

1-48: Overall architectural observations on the task execution framework.

The new task execution framework provides detailed telemetry and more flexible configuration with TaskConfig, which is good. However, I have some architectural suggestions:

  1. Consider using inheritance or composition to avoid duplicate code in generator and async generator handling.
  2. The current implementation might be verbose for simple tasks; consider adding factory methods or builders for common patterns (a sketch follows below).
  3. Add documentation about how to properly consume and interpret the yielded execution state objects.

Would you like me to propose a refactored design that addresses these architectural concerns?
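As one concrete reading of point 2 above, a convenience factory might look like this. It is a hypothetical helper layered on the Task and TaskConfig API from this PR, not existing cognee code:

from cognee.modules.pipelines import Task
from cognee.modules.pipelines.tasks import TaskConfig

def simple_task(executable, *args, needs=None, batch_size=1, **kwargs):
    # Hypothetical factory: wrap an executable and its config in one call.
    return Task(
        executable,
        *args,
        task_config=TaskConfig(needs=needs or [], output_batch_size=batch_size),
        **kwargs,
    )

# Usage: simple_task(multiply_by_two, needs=[add_one], batch_size=10)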

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 12086c0 and e947257.

📒 Files selected for processing (14)
  • cognee/api/v1/cognify/code_graph_pipeline.py (3 hunks)
  • cognee/api/v1/cognify/cognify.py (3 hunks)
  • cognee/modules/pipelines/__init__.py (1 hunks)
  • cognee/modules/pipelines/operations/needs.py (1 hunks)
  • cognee/modules/pipelines/operations/run_tasks.py (3 hunks)
  • cognee/modules/pipelines/operations/run_tasks_base.py (1 hunks)
  • cognee/modules/pipelines/tasks/Task.py (1 hunks)
  • cognee/modules/pipelines/tasks/__init__.py (1 hunks)
  • cognee/tests/integration/run_toy_tasks/run_task_from_queue_test.py (0 hunks)
  • cognee/tests/integration/run_toy_tasks/run_tasks_broadcast_test.py (1 hunks)
  • cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py (1 hunks)
  • cognee/tests/integration/run_toy_tasks/run_tasks_multiple_inputs_test.py (1 hunks)
  • cognee/tests/integration/run_toy_tasks/run_tasks_simple_test.py (1 hunks)
  • cognee/tests/test_library.py (2 hunks)
💤 Files with no reviewable changes (1)
  • cognee/tests/integration/run_toy_tasks/run_task_from_queue_test.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • cognee/api/v1/cognify/cognify.py
🧰 Additional context used
🧬 Code Definitions (11)
cognee/modules/pipelines/tasks/__init__.py (1)
cognee/modules/pipelines/tasks/Task.py (6)
  • Task (37-114)
  • TaskConfig (32-34)
  • TaskExecutionInfo (27-29)
  • TaskExecutionCompleted (15-17)
  • TaskExecutionStarted (11-12)
  • TaskExecutionErrored (20-24)
cognee/tests/test_library.py (2)
cognee/api/v1/users/routers/get_visualize_router.py (1)
  • visualize (15-26)
cognee/api/v1/visualize/visualize.py (1)
  • visualize_graph (14-27)
cognee/modules/pipelines/operations/run_tasks_base.py (2)
cognee/modules/pipelines/operations/needs.py (2)
  • get_need_task_results (27-60)
  • get_task_needs (15-24)
cognee/modules/pipelines/tasks/Task.py (4)
  • Task (37-114)
  • TaskExecutionCompleted (15-17)
  • TaskExecutionInfo (27-29)
  • run (48-114)
cognee/api/v1/cognify/code_graph_pipeline.py (5)
cognee/modules/pipelines/operations/needs.py (1)
  • merge_needs (11-12)
cognee/modules/pipelines/tasks/Task.py (2)
  • Task (37-114)
  • TaskConfig (32-34)
cognee/tasks/repo_processor/get_repo_file_dependencies.py (1)
  • get_repo_file_dependencies (50-92)
cognee/tasks/repo_processor/get_non_code_files.py (1)
  • get_non_py_files (4-125)
cognee/tasks/ingestion/ingest_data.py (1)
  • ingest_data (17-153)
cognee/modules/pipelines/operations/run_tasks.py (6)
cognee/modules/pipelines/operations/log_pipeline_run_start.py (1)
  • log_pipeline_run_start (8-35)
cognee/modules/pipelines/operations/log_pipeline_run_complete.py (1)
  • log_pipeline_run_complete (8-35)
cognee/modules/pipelines/operations/log_pipeline_run_error.py (1)
  • log_pipeline_run_error (8-41)
cognee/shared/utils.py (1)
  • send_telemetry (77-102)
cognee/modules/pipelines/tasks/Task.py (4)
  • Task (37-114)
  • TaskExecutionCompleted (15-17)
  • TaskExecutionErrored (20-24)
  • TaskExecutionStarted (11-12)
cognee/modules/pipelines/operations/run_tasks_base.py (1)
  • run_tasks_base (12-63)
cognee/modules/pipelines/tasks/Task.py (1)
cognee/modules/pipelines/operations/needs.py (1)
  • MergeNeeds (7-8)
cognee/modules/pipelines/__init__.py (1)
cognee/modules/pipelines/operations/needs.py (2)
  • merge_needs (11-12)
  • MergeNeeds (7-8)
cognee/tests/integration/run_toy_tasks/run_tasks_broadcast_test.py (3)
cognee/modules/pipelines/tasks/Task.py (4)
  • Task (37-114)
  • TaskConfig (32-34)
  • TaskExecutionInfo (27-29)
  • run (48-114)
cognee/modules/pipelines/operations/run_tasks_base.py (1)
  • run_tasks_base (12-63)
cognee/tests/integration/run_toy_tasks/run_tasks_simple_test.py (5)
  • run_and_check_tasks (7-33)
  • number_generator (8-10)
  • add_one (12-13)
  • multiply_by_two (15-16)
  • test_run_tasks (36-37)
cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py (3)
cognee/modules/pipelines/tasks/Task.py (4)
  • Task (37-114)
  • TaskConfig (32-34)
  • TaskExecutionInfo (27-29)
  • run (48-114)
cognee/modules/pipelines/exceptions.py (1)
  • WrongTaskOrderException (1-2)
cognee/modules/pipelines/operations/run_tasks_base.py (1)
  • run_tasks_base (12-63)
cognee/tests/integration/run_toy_tasks/run_tasks_multiple_inputs_test.py (3)
cognee/modules/pipelines/tasks/Task.py (4)
  • Task (37-114)
  • TaskConfig (32-34)
  • TaskExecutionInfo (27-29)
  • run (48-114)
cognee/modules/pipelines/operations/run_tasks_base.py (1)
  • run_tasks_base (12-63)
cognee/tests/integration/run_toy_tasks/run_tasks_simple_test.py (5)
  • run_and_check_tasks (7-33)
  • number_generator (8-10)
  • add_one (12-13)
  • multiply_by_two (15-16)
  • test_run_tasks (36-37)
cognee/tests/integration/run_toy_tasks/run_tasks_simple_test.py (4)
cognee/modules/pipelines/tasks/Task.py (4)
  • Task (37-114)
  • TaskConfig (32-34)
  • TaskExecutionInfo (27-29)
  • run (48-114)
cognee/modules/pipelines/operations/run_tasks_base.py (1)
  • run_tasks_base (12-63)
cognee/tests/integration/run_toy_tasks/run_tasks_multiple_inputs_test.py (5)
  • run_and_check_tasks (7-38)
  • number_generator (8-10)
  • add_one (12-13)
  • multiply_by_two (18-20)
  • test_run_tasks (41-42)
cognee/tests/integration/run_toy_tasks/run_tasks_broadcast_test.py (5)
  • run_and_check_tasks (7-33)
  • number_generator (8-10)
  • add_one (12-13)
  • multiply_by_two (15-16)
  • test_run_tasks (36-37)
🪛 Ruff (0.8.2)
cognee/modules/pipelines/tasks/__init__.py

2-2: .Task.Task imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


3-3: .Task.TaskConfig imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


4-4: .Task.TaskExecutionInfo imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


5-5: .Task.TaskExecutionCompleted imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


6-6: .Task.TaskExecutionStarted imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


7-7: .Task.TaskExecutionErrored imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)

cognee/modules/pipelines/__init__.py

3-3: .operations.needs.merge_needs imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)


3-3: .operations.needs.MergeNeeds imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)

cognee/tests/integration/run_toy_tasks/run_tasks_broadcast_test.py

8-8: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


12-12: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


15-15: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py

10-10: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


14-14: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


17-17: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

cognee/tests/integration/run_toy_tasks/run_tasks_multiple_inputs_test.py

8-8: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


12-12: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


15-15: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


18-18: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

cognee/tests/integration/run_toy_tasks/run_tasks_simple_test.py

8-8: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


12-12: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


15-15: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

⏰ Context from checks skipped due to timeout of 90000ms (33)
  • GitHub Check: run_notebook_test / test
  • GitHub Check: test
  • GitHub Check: run_simple_example_test / test
  • GitHub Check: run_eval_framework_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_networkx_metrics_test / test
  • GitHub Check: run_multimedia_example_test / test
  • GitHub Check: Test on macos-15
  • GitHub Check: test
  • GitHub Check: Test on macos-13
  • GitHub Check: Test on macos-15
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_simple_example_test / test
  • GitHub Check: run_dynamic_steps_example_test / test
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: Test on macos-13
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: test
  • GitHub Check: chromadb test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: Test on macos-15
  • GitHub Check: Test on macos-13
  • GitHub Check: test
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: Test cognee server start
  • GitHub Check: test
  • GitHub Check: windows-latest
  • GitHub Check: run_simple_example_test
  • GitHub Check: docker-compose-test
🔇 Additional comments (21)
cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py (1)

10-18: Fix mutable default arguments

Using mutable objects (like empty dictionaries) as default function parameters can lead to unexpected behavior if the default is modified between function calls.

-    def number_generator(num, context={}):
+    def number_generator(num, context=None):
+        context = context or {}
         for i in range(num):
             yield i + 1

-    async def add_one(num, context={}):
+    async def add_one(num, context=None):
+        context = context or {}
         yield num + 1

-    async def multiply_by_two(num, context={}):
+    async def multiply_by_two(num, context=None):
+        context = context or {}
         yield num * 2
🧰 Tools
🪛 Ruff (0.8.2)

10-10: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


14-14: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


17-17: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)
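
For readers unfamiliar with B006, here is a minimal standalone demonstration of the pitfall, independent of cognee's code: the default object is created once, at definition time, and is then shared by every call.

def append_item(item, bucket=[]):  # the default list is created once, at definition time
    bucket.append(item)
    return bucket

print(append_item(1))  # [1]
print(append_item(2))  # [1, 2] - the same list leaked across calls

def append_item_fixed(item, bucket=None):
    if bucket is None:
        bucket = []  # a fresh list on every call
    bucket.append(item)
    return bucket

print(append_item_fixed(1))  # [1]
print(append_item_fixed(2))  # [2]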

cognee/tests/integration/run_toy_tasks/run_tasks_simple_test.py (4)

8-10: Fix mutable default argument.

Using a mutable data structure ({}) as a default parameter value can lead to unexpected behavior, as the default value is evaluated only once at function definition time.

-def number_generator(num, context={}):
+def number_generator(num, context=None):
+    if context is None:
+        context = {}
     for i in range(num):
         yield i + 1
🧰 Tools
🪛 Ruff (0.8.2)

8-8: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


12-13: Fix mutable default argument.

Using a mutable data structure ({}) as a default parameter value can lead to unexpected behavior in asynchronous contexts.

-async def add_one(num, context={}):
+async def add_one(num, context=None):
+    if context is None:
+        context = {}
     yield num + 1
🧰 Tools
🪛 Ruff (0.8.2)

12-12: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


15-16: Fix mutable default argument.

Using a mutable data structure ({}) as a default parameter value is an anti-pattern in Python.

-async def multiply_by_two(num, context={}):
+async def multiply_by_two(num, context=None):
+    if context is None:
+        context = {}
     yield num * 2
🧰 Tools
🪛 Ruff (0.8.2)

15-15: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


19-19:

✅ Verification successful

Verify the expected results for the multiply_by_two function.

The expected result for the last element (22) doesn't match the calculation logic based on the task dependencies. Since multiply_by_two depends on add_one, the last result should be (11 + 1) * 2 = 24.

-    expected_results = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 22]
+    expected_results = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 24]

Run the following script to verify:


🏁 Script executed:

#!/bin/bash
# Calculate the expected result manually
echo "Last number from number_generator: 11"
echo "After add_one: 11 + 1 = 12"
echo "After multiply_by_two: 12 * 2 = 24"
echo "But expected_results has 22 at the last position"

# Find other occurrences of this pattern in the codebase
rg -A 1 "expected_results.*22" --type py

Length of output: 988


Update Expected Result in Task Integration Tests

The manual calculation confirms that starting with 11, applying add_one gives 12, and then multiply_by_two correctly computes 24—not 22. Please update the expected result accordingly in the test file.

  • In cognee/tests/integration/run_toy_tasks/run_tasks_simple_test.py, change the last element from 22 to 24.
  • (Optionally verify that similar changes are not also needed in run_tasks_circular_test.py if they share the same logic.)
-    expected_results = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 22]
+    expected_results = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 24]
cognee/tests/integration/run_toy_tasks/run_tasks_broadcast_test.py (4)

8-10: Fix mutable default argument.

Using a mutable data structure ({}) as a default parameter value can lead to unexpected behavior, as the default value is evaluated only once at function definition time.

-def number_generator(num, context={}):
+def number_generator(num, context=None):
+    if context is None:
+        context = {}
     for i in range(num):
         yield i + 1
🧰 Tools
🪛 Ruff (0.8.2)

8-8: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


12-13: Fix mutable default argument.

Using a mutable data structure ({}) as a default parameter value is problematic in asynchronous functions.

-async def add_one(num, context={}):
+async def add_one(num, context=None):
+    if context is None:
+        context = {}
     yield num + 1
🧰 Tools
🪛 Ruff (0.8.2)

12-12: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


15-16: Fix mutable default argument.

Using a mutable data structure ({}) as a default parameter value can cause hard-to-debug issues.

-async def multiply_by_two(num, context={}):
+async def multiply_by_two(num, context=None):
+    if context is None:
+        context = {}
     yield num * 2
🧰 Tools
🪛 Ruff (0.8.2)

15-15: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


19-33: The test correctly validates the broadcast dependency pattern.

This test demonstrates the broadcast pattern where multiple tasks depend on the same source task (number_generator). The expected results correctly reflect that multiply_by_two operates directly on the output of number_generator rather than depending on add_one.
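
As a rough sketch, the broadcast wiring presumably looks like this (import path and Task/TaskConfig signatures taken from elsewhere in this PR; the test's actual code may differ in detail):

from cognee.modules.pipelines.tasks.Task import Task, TaskConfig

def number_generator(num, context=None):
    for i in range(num):
        yield i + 1

async def add_one(num, context=None):
    yield num + 1

async def multiply_by_two(num, context=None):
    yield num * 2

# Both downstream tasks declare the same upstream need - the broadcast:
tasks = [
    Task(number_generator),
    Task(add_one, task_config=TaskConfig(needs=[number_generator])),
    Task(multiply_by_two, task_config=TaskConfig(needs=[number_generator])),
]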

cognee/modules/pipelines/operations/needs.py (3)

7-9: Clear model for merging task needs.

The MergeNeeds class provides a clean way to represent merged dependencies.


11-13: Well-designed helper function for combining task dependencies.

The merge_needs function provides a convenient API for combining multiple task dependencies.


15-25: Effective task needs resolution.

The function correctly handles both regular tasks and MergeNeeds instances, aggregating them into a single list of input tasks.
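
From the line ranges cited above, a plausible reconstruction of the module is roughly the following sketch (for orientation only, not the verbatim source):

class MergeNeeds:
    # Wraps several upstream tasks whose outputs should be merged into one input.
    def __init__(self, needs: list):
        self.needs = needs

def merge_needs(*needs) -> MergeNeeds:
    return MergeNeeds(list(needs))

def get_task_needs(task) -> list:
    # Flatten plain task references and MergeNeeds wrappers into a single
    # list of upstream executables.
    input_tasks = []
    for need in task.task_config.needs:
        if isinstance(need, MergeNeeds):
            input_tasks.extend(need.needs)
        else:
            input_tasks.append(need)
    return input_tasks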

cognee/tests/integration/run_toy_tasks/run_tasks_multiple_inputs_test.py (5)

8-10: Fix mutable default argument.

Using a mutable data structure ({}) as a default parameter value can lead to unexpected behavior, as the default value is evaluated only once at function definition time.

-def number_generator(num, context={}):
+def number_generator(num, context=None):
+    if context is None:
+        context = {}
     for i in range(num):
         yield i + 1
🧰 Tools
🪛 Ruff (0.8.2)

8-8: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


12-13: Fix mutable default argument.

Using a mutable data structure ({}) as a default parameter value can cause shared state issues across function calls.

-async def add_one(num, context={}):
+async def add_one(num, context=None):
+    if context is None:
+        context = {}
     yield num + 1
🧰 Tools
🪛 Ruff (0.8.2)

12-12: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


15-16: Fix mutable default argument.

Using a mutable data structure ({}) as a default parameter value can lead to unexpected behavior.

-async def add_two(num, context={}):
+async def add_two(num, context=None):
+    if context is None:
+        context = {}
     yield num + 2
🧰 Tools
🪛 Ruff (0.8.2)

15-15: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


18-20: Fix mutable default argument.

Using a mutable data structure ({}) as a default parameter value in a function that takes multiple inputs can be especially problematic.

-async def multiply_by_two(num1, num2, context={}):
+async def multiply_by_two(num1, num2, context=None):
+    if context is None:
+        context = {}
     yield num1 * 2
     yield num2 * 2
🧰 Tools
🪛 Ruff (0.8.2)

18-18: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


22-38: Well-structured test for multiple input task dependencies.

This test effectively demonstrates how a task can receive inputs from multiple upstream tasks. The test validates the correct execution order and result handling with multiple dependencies.

The expected results correctly reflect:

  • 10 items from number_generator (1-10)
  • 1 item from add_one (11)
  • 1 item from add_two (12)
  • 2 items from multiply_by_two (22, 24)
cognee/api/v1/cognify/code_graph_pipeline.py (3)

4-4: Imports look correct.
The newly imported merge_needs and TaskConfig are consistent with your usage below. No issues found.

Also applies to: 13-13


96-97: Consistent variable naming.
Renaming run_status to run_info clarifies that it carries broader execution information. Good improvement!

Also applies to: 99-100


61-88:

❓ Verification inconclusive

Verify the correctness of the new task dependencies.
A chain of tasks depends on each other (e.g., [get_non_py_files], [ingest_data], etc.). Ensure that the final pipeline accounts for any tasks that must be executed in parallel or that require additional data.

Please run a local test of run_code_graph_pipeline with an example repository containing both Python and non-Python files to confirm that these dependencies chain together as expected.


🏁 Script executed:

#!/bin/bash
# We'll look for logs or yields from each new task in the pipeline.
ast-grep --pattern $'Task($_, task_config=TaskConfig(needs: $_))' | rg 'def run_code_graph_pipeline'

Length of output: 101


Action: Manually Verify the Task Dependency Chain in run_code_graph_pipeline

The automated check did not yield any output to confirm the expected dependency chain. Please run a manual test of run_code_graph_pipeline with a sample repository containing both Python and non-Python files. In particular, verify that:

  • The task dependencies are correctly chained (e.g., get_non_py_files → ingest_data → classify_documents → extract_chunks_from_documents).
  • Tasks that should run in parallel or reassemble outputs (e.g., the merged needs in add_data_points) function as expected.
  • All tasks emit appropriate logs or outputs confirming their execution order.

Once you’ve manually verified the behavior and ensured that the tasks execute in the intended order, no further changes are needed.

cognee/modules/pipelines/operations/run_tasks.py (1)

42-49: Telemetry handling is well-structured.
Collecting events for task start, completion, and error states cleanly integrates with the Task event model. This modular approach is easy to extend for additional logging or analytics.

Also applies to: 52-60, 62-79

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
cognee/eval_framework/corpus_builder/task_getters/get_cascade_graph_tasks.py (2)

32-38: Permissions pipeline check
Chaining classify_documents before check_permissions_on_documents is logical. Ensure you have tests that validate how permissions affect downstream tasks.


39-43: Output batch size
Setting output_batch_size=50 can be appropriate but might need tuning for different scenarios. If this value could vary significantly, consider making it a config parameter.

notebooks/hr_demo.ipynb (1)

422-426: Consider validating max_chunk_size.

Using get_max_chunk_tokens() for max_chunk_size is appropriate. However, consider handling unexpected or error values (e.g., negative or zero) that could lead to pipeline errors.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e947257 and 3bbbe72.

📒 Files selected for processing (6)
  • cognee/api/v1/add/add.py (2 hunks)
  • cognee/api/v1/cognify/code_graph_pipeline.py (2 hunks)
  • cognee/eval_framework/corpus_builder/task_getters/get_cascade_graph_tasks.py (2 hunks)
  • notebooks/cognee_code_graph_demo.ipynb (1 hunks)
  • notebooks/cognee_demo.ipynb (4 hunks)
  • notebooks/hr_demo.ipynb (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • cognee/api/v1/add/add.py
  • cognee/api/v1/cognify/code_graph_pipeline.py
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Test on macos-13
  • GitHub Check: Test on macos-13
  • GitHub Check: windows-latest
  • GitHub Check: run_simple_example_test
🔇 Additional comments (18)
notebooks/cognee_code_graph_demo.ipynb (2)

77-77: Improved import statement for better module organization

The import statement has been updated to import both Task and TaskConfig from the same module, which aligns with the new pipeline task configuration approach being introduced in this PR.


83-83: Enhanced task configuration with explicit dependencies

The task configuration has been significantly improved by:

  1. Using the structured TaskConfig class instead of a raw dictionary
  2. Explicitly defining the dependency on get_repo_file_dependencies using the needs parameter
  3. Renaming the parameter from batch_size to output_batch_size for better clarity

This change makes the task pipeline more maintainable as dependencies are now explicitly declared rather than being implicit.
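
For illustration, the new style reads roughly like this; the produce/consume task bodies are placeholders, and only Task and TaskConfig come from the PR:

from cognee.modules.pipelines.tasks.Task import Task, TaskConfig

async def produce(data, context=None):
    yield data

async def consume(item, context=None):
    yield item

tasks = [
    Task(produce),
    Task(
        consume,
        task_config=TaskConfig(
            needs=[produce],       # explicit upstream dependency
            output_batch_size=50,  # forward results downstream in batches of 50
        ),
    ),
]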

notebooks/cognee_demo.ipynb (7)

522-523: Updated imports support the new task configuration system.

The imports have been updated to include TaskConfig and merge_needs, which are essential components of the new pipeline inputs configuration system introduced in this PR.


543-547: Task configuration enhanced with explicit dependencies and batch processing.

The extract_chunks_from_documents task now uses TaskConfig to explicitly define its dependency on classify_documents and sets an output batch size of 10, which should improve processing efficiency for larger document sets.


548-552: Knowledge graph extraction now has explicit dependency configuration.

The task configuration for extract_graph_from_data clearly defines its dependency on the output from extract_chunks_from_documents, making the pipeline flow more explicit and maintainable.


553-557: Text summarization task now has explicit dependency configuration.

The summarization task now properly defines its dependency on the text chunks extracted earlier in the pipeline using the new TaskConfig class.


558-561: Data points aggregation uses merged dependencies.

The add_data_points task now explicitly requires the outputs from both the summarization and knowledge graph extraction tasks through the merge_needs function, demonstrating the new capability to combine multiple task outputs.


1050-1050: Notebook kernel display name updated.

The kernel display name has been changed from "py312" to ".venv", likely to reflect a change in the development environment setup.


1064-1064: Python version downgraded.

The Python version has been changed from 3.12.8 to 3.11.8. This might be to ensure compatibility with certain dependencies or to align with the project's overall Python version requirements.

Is this version change intentional? Please confirm that all dependencies work correctly with Python 3.11.8.

cognee/eval_framework/corpus_builder/task_getters/get_cascade_graph_tasks.py (5)

5-6: New imports introduced
The usage of merge_needs for combining dependencies and TaskConfig for specifying task requirements is clear and consistent with pipeline best practices.


30-31: Configuration usage
Initializing cognee_config via get_cognify_config() keeps the approach flexible and centralized. No issues identified with this pattern.


44-47: Order of graph extraction
Defining extract_graph_from_data to run after chunk extraction ensures the data is properly pre-processed. Implementation looks good.


48-52: Summarization model assignment
Referencing cognee_config.summarization_model keeps everything flexible. Just confirm the selected model is installed and compatible across all environments.


53-56: Possible nested list in dependencies
merge_needs(summarize_text, extract_graph_from_data) likely returns a list. Wrapping it with [ ... ] could create a nested list, potentially causing pipeline resolution issues. Consider removing the extra brackets.

- task_config=TaskConfig(needs=[merge_needs(summarize_text, extract_graph_from_data)]),
+ task_config=TaskConfig(needs=merge_needs(summarize_text, extract_graph_from_data)),
notebooks/hr_demo.ipynb (4)

401-402: Imports for new task configuration look correct.

These references to TaskConfig and merge_needs are valid and properly used later in the pipeline. No issues found.


427-431: Graph extraction dependency is well-defined.

extract_chunks_from_documents as a prerequisite aligns with the requirement of chunk-based graph generation. Implementation is consistent with the pipeline design.


435-436: Summarization dependency is logically sound.

Depending on chunk extraction ensures summaries only occur after the text has been segmented. Overall looks good.


437-439:

❓ Verification inconclusive

Confirm merged task prerequisites are intentional.

By merging dependencies on both summarize_text and extract_graph_from_data, this final task ensures that both tasks complete before data points are added. Verify that concurrency or ordering meets your design expectations.


🏁 Script executed:

#!/bin/bash
# This script demonstrates how you might verify the final dependency merge logic in practice.
# Run existing pipeline or test suite to ensure that `summarize_text` and `extract_graph_from_data`
# are both completed before `add_data_points` is invoked.

echo "Running pipeline tests..."
# If a test script exists (e.g., `test_pipeline.py`), we’d run it:
# python -m unittest tests/test_pipeline.py
# (Substitute above with your actual test commands)

Length of output: 60


Action Required: Confirm pipeline dependency ordering for merged tasks

The code merging dependencies on both summarize_text and extract_graph_from_data for the add_data_points task appears intentional. However, the current verification script only prints a message without actually validating that both tasks complete before add_data_points is invoked. Please ensure that your test suite includes assertions to confirm that the dependency merge works as expected (i.e., that both prerequisite tasks finish execution prior to add_data_points). If needed, consider enhancing the tests to simulate the concurrent/task ordering scenario to verify your design intent.

@borisarzentar borisarzentar changed the title feat: pipeline inputs config feat: enable pipeline task needs mapping Apr 2, 2025
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (1)
cognee/modules/pipelines/tasks/Task.py (1)

90-115: Consider refactoring duplicate generator handling code.

The code for handling async generators (lines 90-115) is almost identical to the code for handling regular generators (lines 63-88). Consider refactoring to reduce duplication.

You could extract the common logic into a separate method that takes a generator/async generator and handles the batching, with a parameter indicating whether it's async or not.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3bbbe72 and 2bcaefc.

📒 Files selected for processing (9)
  • cognee/modules/pipelines/exceptions.py (1 hunks)
  • cognee/modules/pipelines/operations/run_tasks.py (3 hunks)
  • cognee/modules/pipelines/tasks/Task.py (1 hunks)
  • cognee/modules/pipelines/tasks/types.py (1 hunks)
  • cognee/tasks/storage/add_data_points.py (2 hunks)
  • cognee/tests/integration/run_toy_tasks/run_tasks_broadcast_test.py (1 hunks)
  • cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py (1 hunks)
  • cognee/tests/integration/run_toy_tasks/run_tasks_multiple_inputs_test.py (1 hunks)
  • cognee/tests/integration/run_toy_tasks/run_tasks_simple_test.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • cognee/tasks/storage/add_data_points.py
  • cognee/tests/integration/run_toy_tasks/run_tasks_broadcast_test.py
  • cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py
  • cognee/tests/integration/run_toy_tasks/run_tasks_simple_test.py
  • cognee/tests/integration/run_toy_tasks/run_tasks_multiple_inputs_test.py
🧰 Additional context used
🧬 Code Definitions (1)
cognee/modules/pipelines/tasks/Task.py (2)
cognee/modules/pipelines/operations/needs.py (1)
  • MergeNeeds (7-8)
cognee/modules/pipelines/exceptions.py (1)
  • TaskExecutionException (5-8)
⏰ Context from checks skipped due to timeout of 90000ms (31)
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: Test on macos-15
  • GitHub Check: run_dynamic_steps_example_test / test
  • GitHub Check: test
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: run_simple_example_test / test
  • GitHub Check: Test on macos-13
  • GitHub Check: Test on macos-15
  • GitHub Check: run_multimedia_example_test / test
  • GitHub Check: test
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: Test on macos-15
  • GitHub Check: test
  • GitHub Check: Test on macos-13
  • GitHub Check: test
  • GitHub Check: windows-latest
  • GitHub Check: test
  • GitHub Check: run_eval_framework_test / test
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: Test on macos-13
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_networkx_metrics_test / test
  • GitHub Check: run_simple_example_test / test
  • GitHub Check: run_simple_example_test
  • GitHub Check: docker-compose-test
  • GitHub Check: Build Cognee Backend Docker App Image
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: chromadb test
  • GitHub Check: test
🔇 Additional comments (4)
cognee/modules/pipelines/tasks/types.py (1)

1-9: Well-structured type alias definition for task executables.

The TaskExecutable type alias is well-defined, covering all the necessary function types that can be used in the pipeline system: regular functions, async functions, async generators, and standard generators. This provides strong typing support for the task execution system.

cognee/modules/pipelines/operations/run_tasks.py (1)

41-79: Enhanced telemetry with task execution state handling.

The task execution telemetry has been improved to handle different task execution states (started, completed, errored) which provides more granular tracking. This is a good enhancement that will help with monitoring and debugging pipeline executions.

cognee/modules/pipelines/tasks/Task.py (2)

63-88: Well-implemented batch handling for generators.

The batch handling code for generators correctly handles the case when the final batch is incomplete. This is good attention to detail that ensures all results are properly yielded even if they don't fill a complete batch.
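
The pattern being praised is the classic flush of a trailing partial batch; a standalone illustration (not the PR's code):

def batched(values, batch_size):
    batch = []
    for value in values:
        batch.append(value)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the incomplete final batch
        yield batch

print(list(batched(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]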


32-34: Well-designed TaskConfig class for task configuration.

The TaskConfig class provides a clean, structured way to configure tasks with output batch sizes and dependencies. This is a significant improvement over using a raw dictionary for configuration.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
cognee/modules/pipelines/tasks/Task.py (1)

60-90: Consider reducing duplication in generator handling.

There's significant code duplication between the generator handling (lines 67-88) and async generator handling (lines 94-115). Consider extracting common logic to a separate method to reduce duplication and improve maintainability.

Because an async generator cannot return a value, end_result is passed in and filled by reference:

async def _process_results(self, results, end_result, is_async=False):
    task_result = []

    async def process_value(value):
        task_result.append(value)
        end_result.append(value)

        if self.task_config.output_batch_size == 1:
            yield TaskExecutionInfo(
                result=value,
                task=self.executable,
            )
        elif self.task_config.output_batch_size == len(task_result):
            yield TaskExecutionInfo(
                result=task_result.copy(),
                task=self.executable,
            )
            task_result.clear()  # Reset for the next batch

    if is_async:
        async for value in results:
            async for info in process_value(value):
                yield info
    else:
        for value in results:
            async for info in process_value(value):
                yield info

    # Yield any remaining items in the final batch if it's not empty
    if task_result and self.task_config.output_batch_size > 1:
        yield TaskExecutionInfo(
            result=task_result,
            task=self.executable,
        )

Then use it like:

if inspect.isgeneratorfunction(self.executable):  # Generator
    generator = self.executable(*combined_args, **combined_kwargs)
    end_result = []
    async for info in self._process_results(generator, end_result, is_async=False):
        yield info
elif inspect.isasyncgenfunction(self.executable):  # Async Generator
    generator = self.executable(*combined_args, **combined_kwargs)
    end_result = []
    async for info in self._process_results(generator, end_result, is_async=True):
        yield info
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2bcaefc and 58fb85f.

📒 Files selected for processing (3)
  • cognee/modules/pipelines/exceptions.py (1 hunks)
  • cognee/modules/pipelines/operations/run_tasks.py (3 hunks)
  • cognee/modules/pipelines/tasks/Task.py (1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
cognee/modules/pipelines/tasks/Task.py (2)
cognee/modules/pipelines/operations/needs.py (1)
  • MergeNeeds (7-8)
cognee/modules/pipelines/exceptions.py (1)
  • TaskExecutionException (9-18)
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: chromadb test
  • GitHub Check: Test on macos-15
  • GitHub Check: run_notebook_test / test
  • GitHub Check: Test on macos-13
  • GitHub Check: run_notebook_test / test
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: Test on macos-15
  • GitHub Check: Test on macos-15
  • GitHub Check: Test on macos-13
  • GitHub Check: Test on macos-13
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: test
  • GitHub Check: windows-latest
  • GitHub Check: docker-compose-test
  • GitHub Check: run_simple_example_test
🔇 Additional comments (14)
cognee/modules/pipelines/exceptions.py (2)

1-7: Properly implemented exception class with initialization.

The WrongTaskOrderException class correctly initializes the message attribute and calls the parent class constructor.


9-19: Comprehensive exception capturing task execution details.

The TaskExecutionException properly captures error type, message, and traceback information, enabling detailed error reporting in task execution flows.
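
A plausible shape for such an exception, inferred from this description (the class in the PR may differ):

import traceback

class TaskExecutionException(Exception):
    def __init__(self, error: BaseException):
        self.error_type = type(error).__name__
        self.message = str(error)
        # Render the full traceback attached to the original error.
        self.traceback = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
        super().__init__(self.message)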

cognee/modules/pipelines/operations/run_tasks.py (5)

15-16: Good structural updates with proper imports.

The imports have been updated to include the new task execution state classes and the refactored run_tasks_base function.


21-23: API signature improved with context parameter.

The function now accepts an optional context dictionary parameter, improving flexibility and aligning with modern dependency injection patterns.


41-79: Enhanced telemetry with structured task execution states.

The updated telemetry implementation uses the new task execution state classes to provide more structured and consistent telemetry data. The async iteration over run_tasks_base results is a good pattern for event-driven processing.
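
The dispatch pattern described here looks roughly like the sketch below; the state class names come from the PR, while the event attribute names and the send_telemetry call are assumptions:

async def run_tasks_with_telemetry(tasks, data, user, pipeline_name, context=None):
    async for event in run_tasks_base(tasks, data=data, user=user, context=context):
        if isinstance(event, TaskExecutionStarted):
            send_telemetry("Task Started", user.id, {"task": event.task.__name__})
        elif isinstance(event, TaskExecutionCompleted):
            send_telemetry("Task Completed", user.id, {"task": event.task.__name__})
        elif isinstance(event, TaskExecutionErrored):
            send_telemetry("Task Errored", user.id, {"error": event.error.message})
            raise event.error
        elif isinstance(event, TaskExecutionInfo):
            yield event.result  # stream intermediate results to the caller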


108-114: Dataset ID handling is now consistent with function signature.

The update to make dataset_id optional in the function signature and then defaulting it to uuid4() if it's None resolves the inconsistency mentioned in previous reviews.


125-126: Updated function call with context parameter.

The call to run_tasks_with_telemetry has been updated to include the new context parameter, maintaining consistency with the updated function signature.

cognee/modules/pipelines/tasks/Task.py (7)

1-9: Good imports for the new task execution framework.

The imports have been updated to include necessary modules for the new task execution state classes and exception handling.


11-24: Well-structured task execution state classes.

The introduction of dedicated classes for different task execution states (TaskExecutionStarted, TaskExecutionCompleted, TaskExecutionErrored) improves type safety and provides clear semantics for the task execution lifecycle.

Note that TaskExecutionErrored correctly uses the TaskExecutionException type for the error field, resolving a type mismatch issue mentioned in previous reviews.


27-34: Good task information and configuration classes.

The new TaskExecutionInfo and TaskConfig classes provide structured ways to represent task information and configuration options. The output_batch_size parameter in TaskConfig allows for flexible batching of results.


41-47: Constructor updated to use TaskConfig.

The initialization has been improved to use the new TaskConfig class, providing better structure for task configuration options.


48-58: Enhanced run method with async support and execution state tracking.

The run method is now async and yields structured task execution states, starting with TaskExecutionStarted to signal the beginning of task execution.


59-118: Comprehensive support for different callable types.

The updated run method now handles different types of callables (async functions, generators, async generators, regular functions) with appropriate logic for each. The implementation includes proper batching of results based on the output_batch_size configuration.

The code also addresses the previously identified issue with incomplete final batches by including explicit handling for remaining items when output_batch_size > 1 (lines 83-88 for generators and 110-115 for async generators).


119-136: Improved error handling with structured error information.

The error handling has been enhanced to capture detailed error information including type, message, and traceback using the TaskExecutionException class. This provides more context for debugging task execution failures.

@borisarzentar borisarzentar changed the title feat: enable pipeline task needs mapping feat: pipeline tasks needs mapping Apr 2, 2025
),
Task(
add_data_points,
task_config=TaskConfig(needs=[merge_needs(summarize_text, extract_graph_from_data)]),
Collaborator

Is it possible to have merge_needs done automatically in TaskConfig if more than one task is needed?

It's not user-friendly; I think most would expect this to work with just a list of tasks.

Collaborator

Yes that's a good idea actually

Member Author

It is optional. If you don't provide it, inputs will just go directly into the task, in the order they appear in the list. But in some cases you don't want inputs passed that way, and merge_needs is for that: use it when you want to transform the data from needs before it enters the task.
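
Concretely, the contrast presumably looks like this (hypothetical example; the exact merge semantics live in get_need_task_results, not shown here):

from cognee.modules.pipelines import merge_needs  # import paths assumed from this PR
from cognee.modules.pipelines.tasks.Task import Task, TaskConfig

async def task_a(data, context=None):
    yield data + 1

async def task_b(data, context=None):
    yield data * 2

async def combine(*inputs, context=None):
    yield inputs

# Plain needs: each upstream result is passed as its own argument, in list order.
plain = Task(combine, task_config=TaskConfig(needs=[task_a, task_b]))

# merge_needs: upstream results are merged into one input before entering the task.
merged = Task(combine, task_config=TaskConfig(needs=[merge_needs(task_a, task_b)]))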

pipeline_name: str = "unknown_pipeline",
context: dict = None,
):
dataset_id = dataset_id or uuid4()
Collaborator

@dexters1 dexters1 Apr 2, 2025

Currently dataset_id is defined in the following way during the add pipeline:

id=uuid5(NAMESPACE_OID, f"{dataset_name}{str(owner_id)}")

We might need to reconsider how the dataset_id is determined and have it be uniform.

It's created like this so that a user can't create a new dataset with the same name, among other reasons
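
The benefit of the uuid5 scheme is determinism; a quick standalone illustration:

from uuid import NAMESPACE_OID, uuid4, uuid5

owner_id = uuid4()  # stand-in for a real user id
dataset_name = "my_dataset"

# uuid5 is a hash: the same name + owner always maps to the same id,
# so "creating" the dataset twice cannot yield two distinct datasets.
id_a = uuid5(NAMESPACE_OID, f"{dataset_name}{str(owner_id)}")
id_b = uuid5(NAMESPACE_OID, f"{dataset_name}{str(owner_id)}")
assert id_a == id_b

# uuid4 is random: every call produces a brand-new id.
assert uuid4() != uuid4()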

Collaborator

I agree. On Modal I used the UUID5 hash, so it would be nice to keep the doc hashes the same and consistent.

Member Author

In rare cases we don't send dataset_id, so we create one then. dataset_id should always be present, but we omit it for some demos and examples.


async def main():
async for run_status in run_code_graph_pipeline("REPO_PATH"):
async for run_status in run_code_graph_pipeline("/Users/borisarzentar/Projects/graphrag"):
Collaborator

You left the hardcoded path here

Member Author

Good catch!

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (4)
cognee/modules/pipelines/operations/run_tasks_base.py (4)

12-12: Function signature includes unused context parameter.

The function signature includes a context parameter, but it's never used within the function body. Consider either using this parameter or removing it if it's not needed.


29-30: Fix typo in variable name.

The variable name dependendent_task has a typo and should be dependent_task.

- for dependendent_task in task_graph[task.executable]:
-     dependents.setdefault(dependendent_task, []).append(task.executable)
+ for dependent_task in task_graph[task.executable]:
+     dependents.setdefault(dependent_task, []).append(task.executable)

55-58: Fix typo in variable name throughout the function.

The variable name dependendent_task is misspelled throughout the function.

- for dependendent_task in dependents.get(task.executable, []):
-     task_graph[dependendent_task].remove(task.executable)  # Mark dependency as resolved
-     if not task_graph[dependendent_task]:  # If all dependencies resolved, add to queue
-         ready_queue.append(tasks_map[dependendent_task])
+ for dependent_task in dependents.get(task.executable, []):
+     task_graph[dependent_task].remove(task.executable)  # Mark dependency as resolved
+     if not task_graph[dependent_task]:  # If all dependencies resolved, add to queue
+         ready_queue.append(tasks_map[dependent_task])

62-62: Fix typo in error message.

The error message has a typo: "disconneted" should be "disconnected".

- f"{number_of_executed_tasks}/{len(tasks)} tasks executed. You likely have some disconneted tasks or circular dependency."
+ f"{number_of_executed_tasks}/{len(tasks)} tasks executed. You likely have some disconnected tasks or circular dependency."
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 58fb85f and a1c5619.

📒 Files selected for processing (2)
  • cognee/api/v1/cognify/code_graph_pipeline.py (2 hunks)
  • cognee/modules/pipelines/operations/run_tasks_base.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • cognee/api/v1/cognify/code_graph_pipeline.py
🧰 Additional context used
🧬 Code Definitions (1)
cognee/modules/pipelines/operations/run_tasks_base.py (2)
cognee/modules/pipelines/operations/needs.py (2)
  • get_need_task_results (27-60)
  • get_task_needs (15-24)
cognee/modules/pipelines/tasks/Task.py (4)
  • Task (37-136)
  • TaskExecutionCompleted (15-17)
  • TaskExecutionInfo (27-29)
  • run (48-136)
⏰ Context from checks skipped due to timeout of 90000ms (34)
  • GitHub Check: run_dynamic_steps_example_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: Test on macos-15
  • GitHub Check: run_simple_example_test / test
  • GitHub Check: run_simple_example_test / test
  • GitHub Check: Test on macos-15
  • GitHub Check: run_notebook_test / test
  • GitHub Check: Test on macos-13
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_networkx_metrics_test / test
  • GitHub Check: run_eval_framework_test / test
  • GitHub Check: run_multimedia_example_test / test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: chromadb test
  • GitHub Check: windows-latest
  • GitHub Check: Test cognee server start
  • GitHub Check: Test on macos-13
  • GitHub Check: test
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: Test on macos-15
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: Test on macos-13
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: docker-compose-test
  • GitHub Check: test
  • GitHub Check: Build Cognee Backend Docker App Image
  • GitHub Check: run_simple_example_test
🔇 Additional comments (4)
cognee/modules/pipelines/operations/run_tasks_base.py (4)

24-24: Potential collision risk in tasks_map.

If multiple tasks use the same task.executable, tasks_map will overwrite the earlier occurrence. This might lead to unexpected dependencies or missed tasks. Consider detecting or handling collisions more explicitly.

- tasks_map = {task.executable: task for task in tasks}
+ tasks_map = {}
+ for task in tasks:
+     if task.executable in tasks_map:
+         # Handle collision: e.g., raise an exception or generate a unique key
+         raise ValueError(f"Duplicate task.executable found: {task.executable}")
+     tasks_map[task.executable] = task

33-33: Well-implemented task dependency resolution.

Good approach for identifying tasks without dependencies and adding them to the ready queue. This ensures tasks are executed in the correct order based on their dependencies.
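
For reference, this ready-queue approach is Kahn's algorithm for topological ordering; a standalone sketch over string task names (the PR operates on Task objects instead):

from collections import deque

def topological_order(dependencies):
    # dependencies maps each task name to the names of the tasks it needs.
    remaining = {task: set(needs) for task, needs in dependencies.items()}
    dependents = {}
    for task, needs in dependencies.items():
        for need in needs:
            dependents.setdefault(need, []).append(task)

    ready = deque(task for task, needs in remaining.items() if not needs)
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for dependent in dependents.get(task, []):
            remaining[dependent].discard(task)
            if not remaining[dependent]:  # all dependencies resolved
                ready.append(dependent)

    if len(order) != len(dependencies):
        raise RuntimeError("Disconnected tasks or circular dependency detected.")
    return order

print(topological_order({"a": [], "b": ["a"], "c": ["a", "b"]}))  # ['a', 'b', 'c']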


42-52: Nice implementation of asynchronous task execution with continuous result updates.

The code efficiently handles asynchronous task execution while continuously updating results as they become available. The use of different task execution information types allows for granular control over result handling.


60-63: Good final verification of task execution completeness.

The code properly verifies that all tasks were executed, throwing a WrongTaskOrderException with a clear message if there are issues. This helps identify potential problems like circular dependencies or disconnected tasks in the dependency graph.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
cognee/modules/pipelines/operations/run_tasks_base.py (1)

12-12: Consider adding a docstring describing the context parameter.

Although the signature includes a context argument, its usage is not clearly specified. Adding a short docstring or comment clarifying how context might be used or passed along can improve maintainability.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a1c5619 and e80158d.

📒 Files selected for processing (2)
  • cognee/modules/pipelines/operations/run_tasks_base.py (1 hunks)
  • cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py (1 hunks)
🧰 Additional context used
🧬 Code Definitions (2)
cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py (3)
cognee/modules/pipelines/tasks/Task.py (4)
  • Task (37-136)
  • TaskConfig (32-34)
  • TaskExecutionInfo (27-29)
  • run (48-136)
cognee/modules/pipelines/exceptions.py (1)
  • WrongTaskOrderException (1-6)
cognee/modules/pipelines/operations/run_tasks_base.py (1)
  • run_tasks_base (12-63)
cognee/modules/pipelines/operations/run_tasks_base.py (2)
cognee/modules/pipelines/operations/needs.py (2)
  • get_need_task_results (27-60)
  • get_task_needs (15-24)
cognee/modules/pipelines/tasks/Task.py (4)
  • Task (37-136)
  • TaskExecutionCompleted (15-17)
  • TaskExecutionInfo (27-29)
  • run (48-136)
⏰ Context from checks skipped due to timeout of 90000ms (32)
  • GitHub Check: Test on macos-15
  • GitHub Check: Test on macos-13
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: run_dynamic_steps_example_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: Test on macos-15
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_simple_example_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: run_notebook_test / test
  • GitHub Check: Test on macos-13
  • GitHub Check: run_multimedia_example_test / test
  • GitHub Check: run_eval_framework_test / test
  • GitHub Check: run_networkx_metrics_test / test
  • GitHub Check: run_simple_example_test / test
  • GitHub Check: Test on macos-15
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: Test on macos-13
  • GitHub Check: chromadb test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: windows-latest
  • GitHub Check: Test on ubuntu-22.04
  • GitHub Check: test
  • GitHub Check: test
  • GitHub Check: docker-compose-test
  • GitHub Check: Build Cognee Backend Docker App Image
  • GitHub Check: run_simple_example_test
🔇 Additional comments (3)
cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py (2)

42-47: Good approach for local test invocation.

The if __name__ == "__main__": guard allows running the test standalone. This is helpful for quick debugging and doesn’t conflict with the standard pytest flow.


9-21:

❓ Verification inconclusive

Validate the length of the expected_results array vs. generator outputs.

This test sets expected_results to 12 items, but the first task is a generator yielding 10 values, and a circular dependency condition should prevent the second and third tasks from fully running. Please confirm that the pipeline yields enough values before raising the exception (and doesn’t cause out-of-bounds access).


🏁 Script executed:

#!/bin/bash
# Description: Check how many times the pipeline yields before raising the exception.
# We'll search for 'yield' statements or logs from the tasks to confirm the actual yield count.

rg 'yield ' -A 1

Length of output: 11483


Action Required: Confirm Pipeline Output Length in Circular Task Test

The test in cognee/tests/integration/run_toy_tasks/run_tasks_circular_test.py sets an expected_results array with 12 items while the number_generator() yields 10 values and each async task (i.e. add_one() and multiply_by_two()) is expected to yield one additional value. Please verify that the pipeline actually produces these 12 outputs—even under the circular dependency condition where downstream tasks might be pre-empted—so that no out-of-bounds access occurs when indexing the results.

  • Double-check that the generator yields exactly 10 items.
  • Ensure that the async tasks yield their expected single values.
  • Confirm that the pipeline handles the circular dependency by stopping further processing only after collecting all necessary yield values.
cognee/modules/pipelines/operations/run_tasks_base.py (1)

24-24: Potential collision risk in tasks_map.

As mentioned in a previous review, if multiple tasks share the same task.executable, this dictionary comprehension overwrites earlier entries, leading to unexpected behavior. Consider raising an exception or handling duplicates explicitly.

@hajdul88 hajdul88 self-requested a review April 3, 2025 07:47
@borisarzentar borisarzentar merged commit 0ce6fad into dev Apr 3, 2025
41 of 42 checks passed
@borisarzentar borisarzentar deleted the feat/pipeline-inputs branch April 3, 2025 08:53
borisarzentar added a commit that referenced this pull request Apr 10, 2025
borisarzentar added a commit that referenced this pull request Apr 10, 2025
Reverts #690

I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
borisarzentar added a commit that referenced this pull request Apr 10, 2025
Reverts #690

I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.