Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6d9fd17
feat: log pipeline runs
borisarzentar Feb 4, 2025
c5c61dc
Merge branch 'dev' into fear/metrics
alekszievr Feb 5, 2025
01061ae
logging pipeline complete and error
alekszievr Feb 5, 2025
e7bf4c5
pipeline run logging
alekszievr Feb 5, 2025
3f27649
remove legacy pipeline logging
alekszievr Feb 5, 2025
c9746ac
handle errors and use correct sql query
alekszievr Feb 5, 2025
56eb9df
filter for all pipeline ids
alekszievr Feb 5, 2025
3fe998d
Merge branch 'dev' into fear/metrics
alekszievr Feb 5, 2025
3d121d5
adjust tests
alekszievr Feb 5, 2025
cf10922
Merge branch 'dev' into fear/metrics
alekszievr Feb 10, 2025
3218e5e
adjust tests to new run_tasks signature
alekszievr Feb 6, 2025
4a1ef00
get pipeline status in a db engine compatible way
alekszievr Feb 10, 2025
1e40d03
adjust code graph pipeline to new run_tasks signature
alekszievr Feb 10, 2025
6f6e4bb
Merge branch 'dev' into fear/metrics
alekszievr Feb 10, 2025
f35a980
Merge branch 'dev' into fear/metrics
alekszievr Feb 10, 2025
8c7d12a
Merge branch 'dev' into fear/metrics
alekszievr Feb 10, 2025
8ea140b
adjust integration tests
alekszievr Feb 10, 2025
1bd82f2
Merge branch 'dev' into fear/metrics
alekszievr Feb 10, 2025
1fe9c0c
Use run_tasks_base in integration tests
alekszievr Feb 10, 2025
329ac20
More general type for data in pipeline run
alekszievr Feb 10, 2025
00baa81
Dataset id in add
alekszievr Feb 10, 2025
f9d38b8
adjust notebooks
alekszievr Feb 10, 2025
1cb5e6d
Merge branch 'dev' into fear/metrics
alekszievr Feb 10, 2025
5961ade
string conversion of data
alekszievr Feb 11, 2025
0b63bf9
index dataset id, search by dataset id, generate pipeline id from pip…
alekszievr Feb 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
pipeline run logging
  • Loading branch information
alekszievr committed Feb 5, 2025
commit e7bf4c57f92b0534579a5bed8562d7d3bad93849
2 changes: 1 addition & 1 deletion cognee/api/v1/add/add_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async def add(

tasks = [Task(resolve_data_directories), Task(ingest_data, dataset_name, user)]

pipeline = run_tasks(tasks, data, "add_pipeline")
pipeline = run_tasks(tasks=tasks, dataset_id=None, data=data, pipeline_id="add_pipeline")

async for result in pipeline:
print(result)
30 changes: 1 addition & 29 deletions cognee/api/v1/cognify/cognify_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ async def cognify(
async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
data_documents: list[Data] = await get_dataset_data(dataset_id=dataset.id)

document_ids_str = [str(document.id) for document in data_documents]

dataset_id = dataset.id
dataset_name = generate_dataset_name(dataset.name)

Expand All @@ -90,15 +88,6 @@ async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
logger.info("Dataset %s is already being processed.", dataset_name)
return

await log_pipeline_status(
dataset_id,
PipelineRunStatus.DATASET_PROCESSING_STARTED,
{
"dataset_name": dataset_name,
"files": document_ids_str,
},
)

try:
if not isinstance(tasks, list):
raise ValueError("Tasks must be a list")
Expand All @@ -117,27 +106,10 @@ async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):

send_telemetry("cognee.cognify EXECUTION COMPLETED", user.id)

await log_pipeline_status(
dataset_id,
PipelineRunStatus.DATASET_PROCESSING_COMPLETED,
{
"dataset_name": dataset_name,
"files": document_ids_str,
},
)

return pipeline_run_status

except Exception as error:
send_telemetry("cognee.cognify EXECUTION ERRORED", user.id)

await log_pipeline_status(
dataset_id,
PipelineRunStatus.DATASET_PROCESSING_ERRORED,
{
"dataset_name": dataset_name,
"files": document_ids_str,
},
)
raise error


Expand Down
4 changes: 2 additions & 2 deletions cognee/modules/pipelines/models/PipelineRun.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import enum
from uuid import uuid4
from datetime import datetime, timezone
from sqlalchemy import Column, DateTime, JSON, Enum, UUID
from sqlalchemy import Column, DateTime, JSON, Enum, UUID, String
from cognee.infrastructure.databases.relational import Base


Expand All @@ -20,5 +20,5 @@ class PipelineRun(Base):

status = Column(Enum(PipelineRunStatus))

pipeline_id = Column(UUID, index=True)
pipeline_id = Column(String, index=True)
run_info = Column(JSON)
6 changes: 3 additions & 3 deletions cognee/modules/pipelines/operations/get_pipeline_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ async def get_pipeline_status(pipeline_ids: list[UUID]):
PipelineRun,
func.row_number()
.over(
partition_by=PipelineRun.run_id,
partition_by=PipelineRun.id,
order_by=PipelineRun.created_at.desc(),
)
.label("rn"),
)
.filter(PipelineRun.run_id.in_(pipeline_ids))
.filter(PipelineRun.id.in_(pipeline_ids))
.subquery()
)

Expand All @@ -29,7 +29,7 @@ async def get_pipeline_status(pipeline_ids: list[UUID]):

runs = (await session.execute(latest_runs)).scalars().all()

pipeline_statuses = {str(run.run_id): run.status for run in runs}
pipeline_statuses = {str(run.id): run.status for run in runs}

return pipeline_statuses

Expand Down
4 changes: 2 additions & 2 deletions cognee/modules/pipelines/operations/logPipelineRunComplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ async def logPipelineRunComplete(pipeline_id: UUID, dataset_id: UUID, data: list
pipeline_id=pipeline_id,
status=PipelineRunStatus.DATASET_PROCESSING_COMPLETED,
run_info={
"dataset_id": dataset_id,
"data": [data.id for data in data],
"dataset_id": str(dataset_id),
"data": [str(data.id) for data in data] if isinstance(data, list) else data,
},
)

Expand Down
7 changes: 4 additions & 3 deletions cognee/modules/pipelines/operations/logPipelineRunError.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
from cognee.modules.pipelines.models import PipelineRun, PipelineRunStatus


async def logPipelineRunError(pipeline_id: UUID, dataset_id: UUID, data: list[Data]):
async def logPipelineRunError(pipeline_id: str, dataset_id: UUID, data: list[Data], e: Exception):
pipeline_run_id = uuid4()

pipeline_run = PipelineRun(
id=pipeline_run_id,
pipeline_id=pipeline_id,
status=PipelineRunStatus.DATASET_PROCESSING_ERRORED,
run_info={
"dataset_id": dataset_id,
"data": [data.id for data in data],
"dataset_id": str(dataset_id),
"data": [str(data.id) for data in data] if isinstance(data, list) else data,
"error": str(e),
},
)

Expand Down
6 changes: 3 additions & 3 deletions cognee/modules/pipelines/operations/logPipelineRunStart.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
from cognee.modules.pipelines.models import PipelineRun, PipelineRunStatus


async def logPipelineRunStart(pipeline_id: UUID, dataset_id: UUID, data: list[Data]):
async def logPipelineRunStart(pipeline_id: str, dataset_id: UUID, data: list[Data]):
pipeline_run_id = uuid4()

pipeline_run = PipelineRun(
id=pipeline_run_id,
pipeline_id=pipeline_id,
status=PipelineRunStatus.DATASET_PROCESSING_STARTED,
run_info={
"dataset_id": dataset_id,
"data": [data.id for data in data],
"dataset_id": str(dataset_id),
"data": [str(data.id) for data in data] if isinstance(data, list) else data,
},
)

Expand Down
4 changes: 2 additions & 2 deletions cognee/modules/pipelines/operations/run_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ async def run_tasks(tasks: list[Task], dataset_id: UUID, data: list[Data], pipel
async for _ in run_tasks_with_telemetry(tasks, data, pipeline_id):
pass

yield await logPipelineRunComplete(pipeline_run.id, data)
yield await logPipelineRunComplete(pipeline_id, dataset_id, data)
except Exception as e:
yield await logPipelineRunError(pipeline_run.id, tasks, data, e)
yield await logPipelineRunError(pipeline_id, dataset_id, data, e)
raise e