2 changes: 1 addition & 1 deletion cognee/api/v1/cognify/cognify_v2.py
@@ -81,13 +81,13 @@ async def run_cognify_pipeline(dataset: Dataset, user: User):
        Task(classify_documents),
        Task(check_permissions_on_documents, user = user, permissions = ["write"]),
        Task(extract_chunks_from_documents), # Extract text chunks based on the document type.
-       Task(add_data_points, task_config = { "batch_size": 10 }),
        Task(extract_graph_from_data, graph_model = KnowledgeGraph, task_config = { "batch_size": 10 }), # Generate knowledge graphs from the document chunks.
        Task(
            summarize_text,
            summarization_model = cognee_config.summarization_model,
            task_config = { "batch_size": 10 }
        ),
+       Task(add_data_points, task_config = { "batch_size": 10 }),
    ]

    pipeline = run_tasks(tasks, data_documents, "cognify_pipeline")
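The only change in this file is ordering: add_data_points moves from before graph extraction to the end of the task list, so persistence runs only after extract_graph_from_data and summarize_text have finished mutating the chunks (in particular, after entities have been attached through the new DocumentChunk.contains field introduced below). A runnable toy sketch of why the order matters; the function bodies are hypothetical stand-ins, not cognee's tasks:

import asyncio

# Hypothetical stand-ins for the cognify tasks; only the ordering is the point.
async def extract_graph_from_data(chunks):
    for chunk in chunks:
        chunk.setdefault("contains", []).append("Entity(Alan Turing)")

async def add_data_points(chunks):
    # A real implementation would write to the vector and graph stores.
    print(f"persisting {len(chunks)} chunk(s), entities included")

async def main():
    chunks = [{"text": "Alan Turing worked at Bletchley Park."}]
    await extract_graph_from_data(chunks)  # mutates chunks first
    await add_data_points(chunks)          # persistence then sees chunk["contains"]

asyncio.run(main())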
@@ -29,7 +29,14 @@ def __init__(
        self.model = model
        self.dimensions = dimensions

+   MAX_RETRIES = 5
+   retry_count = 0
+
    async def embed_text(self, text: List[str]) -> List[List[float]]:
+       async def exponential_backoff(attempt):
+           wait_time = min(10 * (2 ** attempt), 60) # Max 60 seconds
+           await asyncio.sleep(wait_time)
+
        try:
            response = await litellm.aembedding(
                self.model,
@@ -38,23 +45,40 @@ async def embed_text(self, text: List[str]) -> List[List[float]]:
                api_base = self.endpoint,
                api_version = self.api_version
            )

+           self.retry_count = 0
+
            return [data["embedding"] for data in response.data]

        except litellm.exceptions.ContextWindowExceededError as error:
            if isinstance(text, list):
-               parts = [text[0:math.ceil(len(text)/2)], text[math.ceil(len(text)/2):]]
+               if len(text) == 1:
+                   parts = [text]
+               else:
+                   parts = [text[0:math.ceil(len(text)/2)], text[math.ceil(len(text)/2):]]

                parts_futures = [self.embed_text(part) for part in parts]
                embeddings = await asyncio.gather(*parts_futures)

                all_embeddings = []
                for embeddings_part in embeddings:
                    all_embeddings.extend(embeddings_part)

-               return [data["embedding"] for data in all_embeddings]
+               return all_embeddings

            logger.error("Context window exceeded for embedding text: %s", str(error))
            raise error

+       except litellm.exceptions.RateLimitError:
+           if self.retry_count >= self.MAX_RETRIES:
+               raise Exception("Rate limit exceeded and no more retries left.")
+
+           await exponential_backoff(self.retry_count)
+
+           self.retry_count += 1
+
+           return await self.embed_text(text)

        except Exception as error:
            logger.error("Error embedding text: %s", str(error))
            raise error
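The new retry path doubles a 10-second base delay per attempt and caps it at one minute, so with MAX_RETRIES = 5 a persistently rate-limited call sleeps 10 + 20 + 40 + 60 + 60 = 190 seconds across six attempts before the final exception. The same schedule as a self-contained sketch, with a generic exception standing in for litellm's rate-limit error:

import asyncio

async def call_with_backoff(call, max_retries = 5):
    # Retries `call` on a rate-limit signal, sleeping min(10 * 2**attempt, 60)
    # seconds between attempts, mirroring exponential_backoff above.
    for attempt in range(max_retries + 1):
        try:
            return await call()
        except RuntimeError:  # stand-in for litellm.exceptions.RateLimitError
            if attempt == max_retries:
                raise
            await asyncio.sleep(min(10 * (2 ** attempt), 60))

One design note: retry_count lives on the engine instance, so concurrent embed_text calls share (and reset) the same counter; the sketch above keeps the counter local to a single logical call instead.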
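The ContextWindowExceededError handler also gains an explicit base case: a single-element batch is passed through as-is rather than split into an empty half and itself. The divide-and-recurse shape, as a standalone sketch in which fits is a hypothetical predicate standing in for the embedding call succeeding (the real code discovers overflow via the raised exception):

import math

def split_until_fits(texts, fits):
    # Halve the batch until every part fits, bottoming out at single items,
    # mirroring the math.ceil(len(text) / 2) split in embed_text.
    if len(texts) == 1 or fits(texts):
        return [texts]
    middle = math.ceil(len(texts) / 2)
    return split_until_fits(texts[:middle], fits) + split_until_fits(texts[middle:], fits)

Note that when a single text by itself still exceeds the context window, parts = [text] re-submits the identical input, so the recursion in embed_text only terminates if litellm stops raising; truncating in that base case would make it safe.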
3 changes: 3 additions & 0 deletions cognee/modules/chunking/TextChunker.py
@@ -35,6 +35,7 @@ def read(self):
                is_part_of = self.document,
                chunk_index = self.chunk_index,
                cut_type = chunk_data["cut_type"],
+               contains = [],
                _metadata = {
                    "index_fields": ["text"],
                    "metadata_id": self.document.metadata_id
@@ -52,6 +53,7 @@ def read(self):
                is_part_of = self.document,
                chunk_index = self.chunk_index,
                cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
+               contains = [],
                _metadata = {
                    "index_fields": ["text"],
                    "metadata_id": self.document.metadata_id
@@ -73,6 +75,7 @@ def read(self):
                is_part_of = self.document,
                chunk_index = self.chunk_index,
                cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
+               contains = [],
                _metadata = {
                    "index_fields": ["text"],
                    "metadata_id": self.document.metadata_id
4 changes: 3 additions & 1 deletion cognee/modules/chunking/models/DocumentChunk.py
@@ -1,6 +1,7 @@
-from typing import Optional
+from typing import List, Optional
from cognee.infrastructure.engine import DataPoint
from cognee.modules.data.processing.document_types import Document
+from cognee.modules.engine.models import Entity

class DocumentChunk(DataPoint):
    __tablename__ = "document_chunk"
@@ -9,6 +10,7 @@ class DocumentChunk(DataPoint):
    chunk_index: int
    cut_type: str
    is_part_of: Document
+   contains: List[Entity] = None

    _metadata: Optional[dict] = {
        "index_fields": ["text"],
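The new contains edge is what lets a chunk carry its extracted entities into the stores. A minimal self-contained sketch of the containment pattern, using plain dataclasses rather than cognee's DataPoint (illustrative only):

from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Entity:
    name: str

@dataclass
class DocumentChunk:
    text: str
    contains: Optional[List[Entity]] = None  # mirrors `contains: List[Entity] = None`

chunk = DocumentChunk(text = "Alan Turing worked at Bletchley Park.")
if chunk.contains is None:  # the same guard expand_with_nodes_and_edges uses below
    chunk.contains = []
chunk.contains.append(Entity(name = "Alan Turing"))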
1 change: 1 addition & 0 deletions cognee/modules/chunking/models/__init__.py
@@ -0,0 +1 @@
+from .DocumentChunk import DocumentChunk
2 changes: 0 additions & 2 deletions cognee/modules/engine/models/Entity.py
@@ -1,5 +1,4 @@
from cognee.infrastructure.engine import DataPoint
-from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
from cognee.modules.engine.models.EntityType import EntityType


@@ -8,7 +7,6 @@ class Entity(DataPoint):
    name: str
    is_a: EntityType
    description: str
-   mentioned_in: DocumentChunk

    _metadata: dict = {
        "index_fields": ["name"],
3 changes: 0 additions & 3 deletions cognee/modules/engine/models/EntityType.py
@@ -1,13 +1,10 @@
from cognee.infrastructure.engine import DataPoint
-from cognee.modules.chunking.models.DocumentChunk import DocumentChunk


class EntityType(DataPoint):
    __tablename__ = "entity_type"
    name: str
    type: str
    description: str
-   exists_in: DocumentChunk

    _metadata: dict = {
        "index_fields": ["name"],
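Dropping Entity.mentioned_in and EntityType.exists_in inverts the chunk-entity relationship: instead of each entity pointing back at the chunk it appeared in, the chunk now points at its entities through the new DocumentChunk.contains field. The removed imports also break the cycle that would otherwise arise now that DocumentChunk imports Entity.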
21 changes: 13 additions & 8 deletions cognee/modules/graph/utils/expand_with_nodes_and_edges.py
@@ -1,6 +1,6 @@
from typing import Optional

-from cognee.infrastructure.engine import DataPoint
+from cognee.modules.chunking.models import DocumentChunk
from cognee.modules.engine.models import Entity, EntityType
from cognee.modules.engine.utils import (
    generate_edge_name,
@@ -11,17 +11,19 @@


def expand_with_nodes_and_edges(
-   graph_node_index: list[tuple[DataPoint, KnowledgeGraph]],
+   data_chunks: list[DocumentChunk],
+   chunk_graphs: list[KnowledgeGraph],
    existing_edges_map: Optional[dict[str, bool]] = None,
):
    if existing_edges_map is None:
        existing_edges_map = {}

    added_nodes_map = {}
    relationships = []
-   data_points = []

-   for graph_source, graph in graph_node_index:
+   for index, data_chunk in enumerate(data_chunks):
+       graph = chunk_graphs[index]

        if graph is None:
            continue

@@ -38,7 +40,6 @@ def expand_with_nodes_and_edges(
                    name = type_node_name,
                    type = type_node_name,
                    description = type_node_name,
-                   exists_in = graph_source,
                )
                added_nodes_map[f"{str(type_node_id)}_type"] = type_node
            else:
@@ -50,9 +51,13 @@
                    name = node_name,
                    is_a = type_node,
                    description = node.description,
-                   mentioned_in = graph_source,
                )
-               data_points.append(entity_node)

+               if data_chunk.contains is None:
+                   data_chunk.contains = []
+
+               data_chunk.contains.append(entity_node)

                added_nodes_map[f"{str(node_id)}_entity"] = entity_node

                # Add relationship that came from graphs.
@@ -80,4 +85,4 @@ def expand_with_nodes_and_edges(
            )
            existing_edges_map[edge_key] = True

-   return (data_points, relationships)
+   return (data_chunks, relationships)
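The signature change replaces the pre-zipped graph_node_index with two parallel lists, and the function now returns the mutated chunks instead of a separate data_points list. A hedged sketch of a call site (variable names assumed, not taken from the PR):

# chunk_graphs[i] is the KnowledgeGraph extracted from data_chunks[i].
# The call attaches Entity nodes to each chunk's `contains` list and returns
# the same chunks plus the relationship edges to persist alongside them.
data_chunks, relationships = expand_with_nodes_and_edges(data_chunks, chunk_graphs)

Returning the chunks themselves is what lets the reordered cognify pipeline above hand add_data_points objects that already reference their entities.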