6 changes: 3 additions & 3 deletions cognee/modules/pipelines/operations/run_tasks.py
@@ -2,7 +2,7 @@

import asyncio
from uuid import UUID
from typing import Any
from typing import Any, List
from functools import wraps
from sqlalchemy import select

@@ -60,9 +60,9 @@ async def wrapper(*args, distributed=None, **kwargs):

@override_run_tasks(run_tasks_distributed)
async def run_tasks(
tasks: list[Task],
tasks: List[Task],
dataset_id: UUID,
data: Any = None,
data: List[Any] = None,
user: User = None,
pipeline_name: str = "unknown_pipeline",
context: dict = None,
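Note on the signature change above: `data` is now annotated as `List[Any]`, so callers are expected to pass a list even when there is only one payload. A minimal sketch of a call under the new signature (the task and the surrounding setup here are placeholders, not part of this PR):

# Sketch only: shows the list-wrapping convention, not a full pipeline setup.
from cognee.pipelines import run_tasks, Task

def my_ingest_task(data):
    # Placeholder task; real tasks would transform the data into DataPoints.
    return data

async def run_my_pipeline(dataset_id, payload):
    pipeline = run_tasks(
        [Task(my_ingest_task)],
        dataset_id=dataset_id,
        data=[payload],  # a single payload is still passed as a one-element list
    )
    async for status in pipeline:
        print(status)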
2 changes: 1 addition & 1 deletion cognee/tasks/storage/index_data_points.py
@@ -45,7 +45,7 @@ async def index_data_points(data_points: list[DataPoint]):
index_name = index_name_and_field[:first_occurence]
field_name = index_name_and_field[first_occurence + 1 :]
try:
# In case the ammount if indexable points is too large we need to send them in batches
# In case the amount of indexable points is too large we need to send them in batches
batch_size = 100
for i in range(0, len(indexable_points), batch_size):
batch = indexable_points[i : i + batch_size]
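The corrected comment documents the batching behaviour in `index_data_points`: when there are many indexable points, they are sent to the vector index in fixed-size batches. A generic sketch of that pattern, with `create_vector_index_points` standing in for the real vector-engine call (which is not visible in this hunk):

# Generic batching sketch; `create_vector_index_points` is a stand-in for
# whatever call actually pushes a batch to the vector index.
async def index_in_batches(indexable_points, create_vector_index_points, batch_size=100):
    for i in range(0, len(indexable_points), batch_size):
        batch = indexable_points[i : i + batch_size]
        await create_vector_index_points(batch)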
87 changes: 58 additions & 29 deletions examples/low_level/pipeline.py
@@ -1,69 +1,77 @@
import os
import json
import asyncio
from typing import List, Any
from cognee import prune
from cognee import visualize_graph
from cognee.low_level import setup, DataPoint
from cognee.modules.data.methods import load_or_create_datasets
from cognee.modules.users.methods import get_default_user
from cognee.pipelines import run_tasks, Task
from cognee.tasks.storage import add_data_points


class Person(DataPoint):
name: str
# Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
metadata: dict = {"index_fields": ["name"]}


class Department(DataPoint):
name: str
employees: list[Person]
# Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
metadata: dict = {"index_fields": ["name"]}


class CompanyType(DataPoint):
name: str = "Company"
# Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
metadata: dict = {"index_fields": ["name"]}


class Company(DataPoint):
name: str
departments: list[Department]
is_type: CompanyType
# Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
metadata: dict = {"index_fields": ["name"]}


def ingest_files():
companies_file_path = os.path.join(os.path.dirname(__file__), "companies.json")
companies = json.loads(open(companies_file_path, "r").read())

people_file_path = os.path.join(os.path.dirname(__file__), "people.json")
people = json.loads(open(people_file_path, "r").read())

def ingest_files(data: List[Any]):
people_data_points = {}
departments_data_points = {}
companies_data_points = {}

for person in people:
new_person = Person(name=person["name"])
people_data_points[person["name"]] = new_person
for data_item in data:
people = data_item["people"]
companies = data_item["companies"]

if person["department"] not in departments_data_points:
departments_data_points[person["department"]] = Department(
name=person["department"], employees=[new_person]
)
else:
departments_data_points[person["department"]].employees.append(new_person)
for person in people:
new_person = Person(name=person["name"])
people_data_points[person["name"]] = new_person

companies_data_points = {}
if person["department"] not in departments_data_points:
departments_data_points[person["department"]] = Department(
name=person["department"], employees=[new_person]
)
else:
departments_data_points[person["department"]].employees.append(new_person)

# Create a single CompanyType node, so we connect all companies to it.
companyType = CompanyType()
# Create a single CompanyType node, so we connect all companies to it.
companyType = CompanyType()

for company in companies:
new_company = Company(name=company["name"], departments=[], is_type=companyType)
companies_data_points[company["name"]] = new_company
for company in companies:
new_company = Company(name=company["name"], departments=[], is_type=companyType)
companies_data_points[company["name"]] = new_company

for department_name in company["departments"]:
if department_name not in departments_data_points:
departments_data_points[department_name] = Department(
name=department_name, employees=[]
)
for department_name in company["departments"]:
if department_name not in departments_data_points:
departments_data_points[department_name] = Department(
name=department_name, employees=[]
)

new_company.departments.append(departments_data_points[department_name])
new_company.departments.append(departments_data_points[department_name])

return companies_data_points.values()
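Based on the keys accessed in the reworked `ingest_files`, each item in `data` is expected to look roughly like the structure below (field names come from the code above; the concrete values are illustrative):

# Illustrative shape of one data item passed to ingest_files; the field names
# are taken from the accesses above, the values are made up.
example_data_item = {
    "people": [
        {"name": "Alice", "department": "Engineering"},
        {"name": "Bob", "department": "Sales"},
    ],
    "companies": [
        {"name": "Acme Corp", "departments": ["Engineering", "Sales"]},
    ],
}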

@@ -72,9 +80,30 @@ async def main():
await prune.prune_data()
await prune.prune_system(metadata=True)

# Create relational database tables
await setup()

pipeline = run_tasks([Task(ingest_files), Task(add_data_points)])
# If no user is provided use default user
user = await get_default_user()

# Create dataset object to keep track of pipeline status
datasets = await load_or_create_datasets(["test_dataset"], [], user)

# Prepare data for pipeline
companies_file_path = os.path.join(os.path.dirname(__file__), "companies.json")
companies = json.loads(open(companies_file_path, "r").read())
people_file_path = os.path.join(os.path.dirname(__file__), "people.json")
people = json.loads(open(people_file_path, "r").read())

# Run tasks expects a list of data even if it is just one document
data = [{"companies": companies, "people": people}]

pipeline = run_tasks(
[Task(ingest_files), Task(add_data_points)],
dataset_id=datasets[0].id,
data=data,
incremental_loading=False,
)

async for status in pipeline:
print(status)
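For completeness, a minimal way to launch this example, assuming the usual asyncio entry point (the remainder of the file is hidden in this diff and may differ):

# Assumed launcher for the example script; not part of the visible hunk.
import asyncio

if __name__ == "__main__":
    asyncio.run(main())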