From 2cbbcc555100bceeefc6ed0419fc98a6ddddc437 Mon Sep 17 00:00:00 2001
From: Rita
Date: Fri, 8 Nov 2024 08:34:22 +0000
Subject: [PATCH 1/4] Updating cognify pipeline documentation

---
 docs/pipelines.md |  82 ++++++++++++++++
 docs/templates.md | 243 ----------------------------------------------
 2 files changed, 82 insertions(+), 243 deletions(-)
 create mode 100644 docs/pipelines.md
 delete mode 100644 docs/templates.md

diff --git a/docs/pipelines.md b/docs/pipelines.md
new file mode 100644
index 0000000000..8fac1683b8
--- /dev/null
+++ b/docs/pipelines.md
@@ -0,0 +1,82 @@
# PIPELINES

Cognee uses [tasks](https://github.com/topoteretes/cognee/blob/main/cognee/modules/pipelines/tasks/Task.py) grouped into pipelines that populate graph and vector stores. [These tasks](https://github.com/topoteretes/cognee/tree/main/cognee/tasks) analyze and enrich data, enhancing the quality of answers produced by Large Language Models (LLMs).

The tasks are managed and executed asynchronously using the `run_tasks` and `run_tasks_parallel` functions.

```python
pipeline = run_tasks(tasks, documents)
async for result in pipeline:
    print(result)
```

## Main pipeline: [cognee.cognify](https://github.com/topoteretes/cognee/blob/168cb5d1bf1964b5b0c645b2f3d8638d84554fda/cognee/api/v1/cognify/cognify_v2.py#L38)

This is the main pipeline currently implemented in cognee. It is designed to process data in a structured way and populate the graph and vector stores with the results.


This function is the entry point for processing datasets. It handles dataset retrieval, user authorization, and manages the execution of a pipeline of tasks that process documents.

### Parameters

- `dataset: Union[str, list[str]] = None`: A string or list of dataset names to be processed.
- `user: User = None`: The user requesting the processing. If not provided, the default user is retrieved.

### Steps in the Function

#### User Authentication

```python
if user is None:
    user = await get_default_user()
```

If no user is provided, the function retrieves the default user.

#### Handling Empty or String Dataset Input

```python
existing_datasets = await get_datasets(user.id)
if datasets is None or len(datasets) == 0:
    datasets = existing_datasets
if type(datasets[0]) == str:
    datasets = await get_datasets_by_name(datasets, user.id)
```

If no datasets are provided, the function retrieves all datasets owned by the user. If a list of dataset names (strings) is provided, they are converted into dataset objects.

#### Selecting datasets from the input list that are owned by the user

```python
existing_datasets_map = {
    generate_dataset_name(dataset.name): True for dataset in existing_datasets
}
```

#### Run Cognify Pipeline for Each Dataset

```python
awaitables = []

for dataset in datasets:
    dataset_name = generate_dataset_name(dataset.name)

    if dataset_name in existing_datasets_map:
        awaitables.append(run_cognify_pipeline(dataset, user))

    return await asyncio.gather(*awaitables)
```

The `run_cognify_pipeline` function is defined within `cognify` and is responsible for processing a single dataset. This is where most of the heavy lifting occurs. The function processes multiple datasets concurrently using `asyncio.gather`.
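
For orientation, a minimal end-to-end call might look like the sketch below. It assumes data was previously ingested with the top-level `cognee.add` helper and that everything runs inside an async context; adapt the dataset name and input to your own setup.

```python
import asyncio

import cognee


async def main():
    # Ingest some raw text under a named dataset (assumes the top-level
    # cognee.add helper; the exact ingestion call may differ in your version).
    await cognee.add("Cognee builds knowledge graphs from documents.", "example_dataset")

    # Run the cognify pipeline over that dataset. Omitting the argument would
    # process every dataset owned by the (default) user, as described above.
    await cognee.cognify(["example_dataset"])


asyncio.run(main())
```
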

#### Pipeline Tasks

The pipeline consists of several tasks, each responsible for different parts of the processing:

- `classify_documents`: Converts each document into one of the specific `Document` types: `PdfDocument`, `AudioDocument`, `ImageDocument`, or `TextDocument`.
- `check_permissions_on_documents`: Checks if the user has the necessary permissions to access the documents. In this case, it checks for "write" permission.
- `extract_chunks_from_documents`: Extracts text chunks based on the document type.
- `add_data_points`: Creates nodes and edges from the chunks and their properties. Adds them to the graph engine.
- `extract_graph_from_data`: Generates knowledge graphs from the document chunks.
- `summarize_text`: Extracts a summary for each chunk using an LLM.

diff --git a/docs/templates.md b/docs/templates.md
deleted file mode 100644
index 0e1ce42882..0000000000
--- a/docs/templates.md
+++ /dev/null
@@ -1,243 +0,0 @@
# TASKS

!!! tip "cognee uses tasks grouped into pipelines to populate graph and vector stores"

Cognee organizes tasks into pipelines that populate graph and vector stores. These tasks analyze and enrich data, enhancing the quality of answers produced by Large Language Models (LLMs).

This section provides a template to help you structure your data and build pipelines. \
These tasks serve as a starting point for using Cognee to create reliable LLM pipelines.

## Task 1: Category Extraction

Data enrichment is the process of enhancing raw data with additional information to make it more valuable. This template is a sample task that extracts categories from a document and populates a graph with the extracted categories.

Let's go over the steps to use this template [full code provided here](https://github.com/topoteretes/cognee/blob/main/cognee/tasks/chunk_naive_llm_classifier/chunk_naive_llm_classifier.py):

This function is designed to classify chunks of text using a specified language model. The goal is to categorize the text, map relationships, and store the results in a vector engine and a graph engine. The function is asynchronous, allowing for concurrent execution of tasks like classification and data point creation.

### Parameters

- `data_chunks: list[DocumentChunk]`: A list of text chunks to be classified. Each chunk represents a piece of text and includes metadata like `chunk_id` and `document_id`.
- `classification_model: Type[BaseModel]`: The model used to classify each chunk of text. This model is expected to output labels that categorize the text.

### Steps in the Function

#### Check for Empty Input

```python
if len(data_chunks) == 0:
    return data_chunks
```

If there are no data chunks provided, the function returns immediately with the input list (which is empty).

#### Classify Each Chunk

```python
chunk_classifications = await asyncio.gather(
    *[extract_categories(chunk.text, classification_model) for chunk in data_chunks],
)
```

The function uses `asyncio.gather` to concurrently classify each chunk of text. `extract_categories` is called for each chunk, and the results are collected in `chunk_classifications`.

#### Initialize Data Structures

```python
classification_data_points = []
```

A list is initialized to store the classification data points that will be used later for mapping relationships and storing in the vector engine.
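
Since the next steps read fields like `label.type` and subclass `.value` from the classification results, it may help to picture a hypothetical `classification_model`. The sketch below is purely illustrative and is not the model cognee actually ships with:

```python
from enum import Enum

from pydantic import BaseModel


class ContentSubtype(str, Enum):
    # Illustrative labels only -- not cognee's real taxonomy.
    ARTICLE = "article"
    PODCAST = "podcast"
    PHOTO = "photo"


class ContentLabel(BaseModel):
    type: str                        # e.g. "text", "audio", "image"
    subclass: list[ContentSubtype]   # finer-grained categories


class ContentClassification(BaseModel):
    label: ContentLabel
```
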

#### Generate UUIDs for Classifications

The function loops through each chunk and generates unique identifiers (UUIDs) for both the main classification type and its subclasses:

```python
classification_data_points.append(uuid5(NAMESPACE_OID, chunk_classification.label.type))
classification_data_points.append(uuid5(NAMESPACE_OID, classification_subclass.value))
```

These UUIDs are used to uniquely identify classifications and ensure consistency.

#### Retrieve or Create Vector Collection

```python
vector_engine = get_vector_engine()
collection_name = "classification"
```

The function interacts with a vector engine. It checks if the collection named "classification" exists. If it does, it retrieves existing data points to avoid duplicates. Otherwise, it creates the collection.

#### Prepare Data Points, Nodes, and Edges

The function then builds a list of `data_points` (representing the classification results) and constructs nodes and edges to represent relationships between chunks and their classifications:

```python
data_points.append(DataPoint[Keyword](...))
nodes.append((...))
edges.append((...))
```

- **Nodes**: Represent classifications (e.g., media type, subtype).
- **Edges**: Represent relationships between chunks and classifications (e.g., "is_media_type", "is_subtype_of").

#### Create Data Points and Relationships

If there are new nodes or edges to add, the function stores the data points in the vector engine and updates the graph engine with the new nodes and edges:

```python
await vector_engine.create_data_points(collection_name, data_points)
await graph_engine.add_nodes(nodes)
await graph_engine.add_edges(edges)
```

#### Return the Processed Chunks

Finally, the function returns the processed `data_chunks`, which can now be used further as needed:

```python
return data_chunks
```

## Pipeline 1: cognee pipeline

This is the main pipeline currently implemented in cognee. It is designed to process data in a structured way and populate the graph and vector stores with the results


This function is the entry point for processing datasets. It handles dataset retrieval, user authorization, and manages the execution of a pipeline of tasks that process documents.

### Parameters

- `datasets: Union[str, list[str]] = None`: A string or list of dataset names to be processed.
- `user: User = None`: The user requesting the processing. If not provided, the default user is retrieved.

### Steps in the Function

#### Database Engine Initialization

```python
db_engine = get_relational_engine()
```

The function starts by getting an instance of the relational database engine, which is used to retrieve datasets and other necessary data.

#### Handle Empty or String Dataset Input

```python
if datasets is None or len(datasets) == 0:
    return await cognify(await db_engine.get_datasets())
if type(datasets[0]) == str:
    datasets = await retrieve_datasets(datasets)
```

If no datasets are provided, the function retrieves all available datasets from the database. If a list of dataset names (strings) is provided, they are converted into dataset objects.

#### User Authentication

```python
if user is None:
    user = await get_default_user()
```

If no user is provided, the function retrieves the default user.

#### Run Cognify Pipeline for Each Dataset

```python
async def run_cognify_pipeline(dataset: Dataset):
    # Pipeline logic goes here...
```

The `run_cognify_pipeline` function is defined within `cognify` and is responsible for processing a single dataset. This is where most of the heavy lifting occurs.

#### Retrieve Dataset Data

The function fetches all the data associated with the dataset.

```python
data: list[Data] = await get_dataset_data(dataset_id=dataset.id)
```

#### Create Document Objects

Based on the file type (e.g., PDF, Audio, Image, Text), corresponding document objects are created.

```python
documents = [...]
```

#### Check Permissions

The user's permissions are checked to ensure they can access the documents.

```python
await check_permissions_on_documents(user, "read", document_ids)
```

#### Pipeline Status Logging

The function logs the start and end of the pipeline processing.

```python
async with update_status_lock:
    task_status = await get_pipeline_status([dataset_id])
    if dataset_id in task_status and task_status[dataset_id] == "DATASET_PROCESSING_STARTED":
        logger.info("Dataset %s is already being processed.", dataset_name)
        return
    await log_pipeline_status(dataset_id, "DATASET_PROCESSING_STARTED", {...})
```

#### Pipeline Tasks

The pipeline consists of several tasks, each responsible for different parts of the processing:

- `document_to_ontology`: Maps documents to an ontology structure.
- `source_documents_to_chunks`: Splits documents into chunks.
- `chunk_to_graph_decomposition`: Defines the graph structure for chunks.
- `chunks_into_graph`: Integrates chunks into the knowledge graph.
- `chunk_update_check`: Checks for updated or new chunks.
- `save_chunks_to_store`: Saves chunks to a vector store and graph database.

Parallel Tasks: `chunk_extract_summary` and `chunk_naive_llm_classifier` run in parallel to summarize and classify chunks.

- `chunk_remove_disconnected`: Cleans up obsolete chunks.

The tasks are managed and executed asynchronously using the `run_tasks` and `run_tasks_parallel` functions.

```python
pipeline = run_tasks(tasks, documents)
async for result in pipeline:
    print(result)
```

#### Handle Errors

If any errors occur during processing, they are logged, and the exception is raised.

```python
except Exception as error:
    await log_pipeline_status(dataset_id, "DATASET_PROCESSING_ERROR", {...})
    raise error
```

#### Processing Multiple Datasets

The function prepares to process multiple datasets concurrently using `asyncio.gather`.

```python
awaitables = []
for dataset in datasets:
    dataset_name = generate_dataset_name(dataset.name)
    if dataset_name in existing_datasets:
        awaitables.append(run_cognify_pipeline(dataset))
return await asyncio.gather(*awaitables)
```

From 8317eecb2ef67302a3f5e2f7e46b40f577954d83 Mon Sep 17 00:00:00 2001
From: Rita
Date: Fri, 8 Nov 2024 09:18:12 +0000
Subject: [PATCH 2/4] typo fix

---
 docs/pipelines.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/pipelines.md b/docs/pipelines.md
index 8fac1683b8..39be70702d 100644
--- a/docs/pipelines.md
+++ b/docs/pipelines.md
@@ -19,7 +19,7 @@ This function is the entry point for processing datasets. It handles dataset ret
 ### Parameters
 
-- `dataset: Union[str, list[str]] = None`: A string or list of dataset names to be processed.
+- `datasets: Union[str, list[str]] = None`: A string or list of dataset names to be processed.
 - `user: User = None`: The user requesting the processing. If not provided, the default user is retrieved.
 
 ### Steps in the Function

From 28705285087ac2436c6d739d05efe8060a834d86 Mon Sep 17 00:00:00 2001
From: Boris
Date: Fri, 8 Nov 2024 10:32:15 +0100
Subject: [PATCH 3/4] Update docs/pipelines.md

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
---
 docs/pipelines.md | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/docs/pipelines.md b/docs/pipelines.md
index 39be70702d..ad9e7be974 100644
--- a/docs/pipelines.md
+++ b/docs/pipelines.md
@@ -64,8 +64,7 @@ for dataset in datasets:
     if dataset_name in existing_datasets_map:
         awaitables.append(run_cognify_pipeline(dataset, user))
 
-    return await asyncio.gather(*awaitables)
-```
+return await asyncio.gather(*awaitables)
 
 The `run_cognify_pipeline` function is defined within `cognify` and is responsible for processing a single dataset. This is where most of the heavy lifting occurs. The function processes multiple datasets concurrently using `asyncio.gather`.

From 8156c90363943ee5cc7cc5913fe7dee55716f2cc Mon Sep 17 00:00:00 2001
From: Rita
Date: Fri, 8 Nov 2024 12:41:38 +0000
Subject: [PATCH 4/4] removing a minor confusing part

---
 docs/pipelines.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/pipelines.md b/docs/pipelines.md
index ad9e7be974..2392eab80e 100644
--- a/docs/pipelines.md
+++ b/docs/pipelines.md
@@ -12,7 +12,7 @@ async for result in pipeline:
 
 ## Main pipeline: [cognee.cognify](https://github.com/topoteretes/cognee/blob/168cb5d1bf1964b5b0c645b2f3d8638d84554fda/cognee/api/v1/cognify/cognify_v2.py#L38)
 
-This is the main pipeline currently implemented in cognee. It is designed to process data in a structured way and populate the graph and vector stores with the results.
+This is the main pipeline currently implemented in cognee. It is designed to process data in a structured way and populate the graph and vector stores.
 
 
 This function is the entry point for processing datasets. It handles dataset retrieval, user authorization, and manages the execution of a pipeline of tasks that process documents.