From 573b956a8a582f2422b6f2bc59b3bb2bc4305b3e Mon Sep 17 00:00:00 2001
From: Igor Ilic
Date: Tue, 5 Aug 2025 12:44:34 +0200
Subject: [PATCH 1/2] fix: Resolve issue with low level pipeline example

---
 examples/low_level/pipeline.py | 32 +++++++++++++++++++++++++-------
 1 file changed, 25 insertions(+), 7 deletions(-)

diff --git a/examples/low_level/pipeline.py b/examples/low_level/pipeline.py
index 47f8294f0f..3f097f0a1d 100644
--- a/examples/low_level/pipeline.py
+++ b/examples/low_level/pipeline.py
@@ -1,9 +1,12 @@
 import os
 import json
 import asyncio
+from typing import List, Any
 from cognee import prune
 from cognee import visualize_graph
 from cognee.low_level import setup, DataPoint
+from cognee.modules.data.methods import load_or_create_datasets
+from cognee.modules.users.methods import get_default_user
 from cognee.pipelines import run_tasks, Task
 from cognee.tasks.storage import add_data_points
 
@@ -27,12 +30,9 @@ class Company(DataPoint):
     is_type: CompanyType
 
 
-def ingest_files():
-    companies_file_path = os.path.join(os.path.dirname(__file__), "companies.json")
-    companies = json.loads(open(companies_file_path, "r").read())
-
-    people_file_path = os.path.join(os.path.dirname(__file__), "people.json")
-    people = json.loads(open(people_file_path, "r").read())
+def ingest_files(data: List[Any]):
+    people = data[0]["people"]
+    companies = data[0]["companies"]
 
     people_data_points = {}
     departments_data_points = {}
@@ -72,9 +72,27 @@ async def main():
     await prune.prune_data()
     await prune.prune_system(metadata=True)
 
+    # Create relational database tables
     await setup()
 
-    pipeline = run_tasks([Task(ingest_files), Task(add_data_points)])
+    # If no user is provided use default user
+    user = await get_default_user()
+
+    # Create dataset object to keep track of pipeline status
+    datasets = await load_or_create_datasets(["test_dataset"], [], user)
+
+    # Prepare data for pipeline
+    companies_file_path = os.path.join(os.path.dirname(__file__), "companies.json")
+    companies = json.loads(open(companies_file_path, "r").read())
+    people_file_path = os.path.join(os.path.dirname(__file__), "people.json")
+    people = json.loads(open(people_file_path, "r").read())
+
+    pipeline = run_tasks(
+        [Task(ingest_files), Task(add_data_points)],
+        dataset_id=datasets[0].id,
+        data=[{"companies": companies, "people": people}],
+        incremental_loading=False,
+    )
 
     async for status in pipeline:
         print(status)

From 61721b9639974040b786e138e709815771bf2b7e Mon Sep 17 00:00:00 2001
From: Igor Ilic
Date: Tue, 5 Aug 2025 13:18:11 +0200
Subject: [PATCH 2/2] fix: resolve issue with low level pipeline

---
 .../modules/pipelines/operations/run_tasks.py |  6 +-
 cognee/tasks/storage/index_data_points.py     |  2 +-
 examples/low_level/pipeline.py                | 61 +++++++++++--------
 3 files changed, 40 insertions(+), 29 deletions(-)

diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py
index 1f503f7d27..cc52c947ba 100644
--- a/cognee/modules/pipelines/operations/run_tasks.py
+++ b/cognee/modules/pipelines/operations/run_tasks.py
@@ -2,7 +2,7 @@
 import asyncio
 
 from uuid import UUID
-from typing import Any
+from typing import Any, List
 from functools import wraps
 from sqlalchemy import select
 
@@ -60,9 +60,9 @@ async def wrapper(*args, distributed=None, **kwargs):
 
 @override_run_tasks(run_tasks_distributed)
 async def run_tasks(
-    tasks: list[Task],
+    tasks: List[Task],
     dataset_id: UUID,
-    data: Any = None,
+    data: List[Any] = None,
     user: User = None,
     pipeline_name: str = "unknown_pipeline",
     context: dict = None,
diff --git a/cognee/tasks/storage/index_data_points.py b/cognee/tasks/storage/index_data_points.py
index 452e7f2ac6..2b6205f4da 100644
--- a/cognee/tasks/storage/index_data_points.py
+++ b/cognee/tasks/storage/index_data_points.py
@@ -45,7 +45,7 @@ async def index_data_points(data_points: list[DataPoint]):
         index_name = index_name_and_field[:first_occurence]
         field_name = index_name_and_field[first_occurence + 1 :]
         try:
-            # In case the ammount if indexable points is too large we need to send them in batches
+            # In case the amount of indexable points is too large we need to send them in batches
             batch_size = 100
             for i in range(0, len(indexable_points), batch_size):
                 batch = indexable_points[i : i + batch_size]
diff --git a/examples/low_level/pipeline.py b/examples/low_level/pipeline.py
index 3f097f0a1d..804e42ff78 100644
--- a/examples/low_level/pipeline.py
+++ b/examples/low_level/pipeline.py
@@ -13,57 +13,65 @@
 
 class Person(DataPoint):
     name: str
+    # Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
+    metadata: dict = {"index_fields": ["name"]}
 
 
 class Department(DataPoint):
     name: str
     employees: list[Person]
+    # Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
+    metadata: dict = {"index_fields": ["name"]}
 
 
 class CompanyType(DataPoint):
     name: str = "Company"
+    # Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
+    metadata: dict = {"index_fields": ["name"]}
 
 
 class Company(DataPoint):
     name: str
     departments: list[Department]
     is_type: CompanyType
+    # Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
+    metadata: dict = {"index_fields": ["name"]}
 
 
 def ingest_files(data: List[Any]):
-    people = data[0]["people"]
-    companies = data[0]["companies"]
-
     people_data_points = {}
     departments_data_points = {}
+    companies_data_points = {}
 
-    for person in people:
-        new_person = Person(name=person["name"])
-        people_data_points[person["name"]] = new_person
+    for data_item in data:
+        people = data_item["people"]
+        companies = data_item["companies"]
 
-        if person["department"] not in departments_data_points:
-            departments_data_points[person["department"]] = Department(
-                name=person["department"], employees=[new_person]
-            )
-        else:
-            departments_data_points[person["department"]].employees.append(new_person)
+        for person in people:
+            new_person = Person(name=person["name"])
+            people_data_points[person["name"]] = new_person
 
-    companies_data_points = {}
+            if person["department"] not in departments_data_points:
+                departments_data_points[person["department"]] = Department(
+                    name=person["department"], employees=[new_person]
+                )
+            else:
+                departments_data_points[person["department"]].employees.append(new_person)
 
-    # Create a single CompanyType node, so we connect all companies to it.
-    companyType = CompanyType()
+        # Create a single CompanyType node, so we connect all companies to it.
+        companyType = CompanyType()
 
-    for company in companies:
-        new_company = Company(name=company["name"], departments=[], is_type=companyType)
-        companies_data_points[company["name"]] = new_company
+        for company in companies:
+            new_company = Company(name=company["name"], departments=[], is_type=companyType)
+            companies_data_points[company["name"]] = new_company
 
-        for department_name in company["departments"]:
-            if department_name not in departments_data_points:
-                departments_data_points[department_name] = Department(
-                    name=department_name, employees=[]
-                )
+            for department_name in company["departments"]:
+                if department_name not in departments_data_points:
+                    departments_data_points[department_name] = Department(
+                        name=department_name, employees=[]
+                    )
 
-            new_company.departments.append(departments_data_points[department_name])
+                new_company.departments.append(departments_data_points[department_name])
 
     return companies_data_points.values()
 
@@ -87,10 +95,13 @@ async def main():
     people_file_path = os.path.join(os.path.dirname(__file__), "people.json")
     people = json.loads(open(people_file_path, "r").read())
 
+    # Run tasks expects a list of data even if it is just one document
+    data = [{"companies": companies, "people": people}]
+
     pipeline = run_tasks(
         [Task(ingest_files), Task(add_data_points)],
        dataset_id=datasets[0].id,
-        data=[{"companies": companies, "people": people}],
+        data=data,
         incremental_loading=False,
     )