8 changes: 8 additions & 0 deletions cognee/api/v1/add/add.py
@@ -26,6 +26,7 @@ async def add(
preferred_loaders: Optional[List[Union[str, dict[str, dict[str, Any]]]]] = None,
incremental_loading: bool = True,
data_per_batch: Optional[int] = 20,
importance_weight: float = 0.5,
):
"""
Add data to Cognee for knowledge graph processing.
@@ -85,6 +86,9 @@ async def add(
extraction_rules: Optional dictionary of rules (e.g., CSS selectors, XPath) for extracting specific content from web pages using BeautifulSoup
tavily_config: Optional configuration for Tavily API, including API key and extraction settings
soup_crawler_config: Optional configuration for BeautifulSoup crawler, specifying concurrency, crawl delay, and extraction rules.
importance_weight: A float between 0.0 and 1.0 representing the importance of the
ingested data. This weight will influence search result ranking.
Defaults to 0.5.

Returns:
PipelineRunInfo: Information about the ingestion pipeline execution including:
@@ -164,6 +168,9 @@ async def add(
- TAVILY_API_KEY: YOUR_TAVILY_API_KEY

"""
if not 0.0 <= importance_weight <= 1.0:
raise ValueError("importance_weight must be a float between 0.0 and 1.0")

if preferred_loaders is not None:
transformed = {}
for item in preferred_loaders:
@@ -182,6 +189,7 @@ async def add(
node_set,
dataset_id,
preferred_loaders,
importance_weight,
),
]

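Caller-side, the new parameter is used like this (a minimal sketch; the document text is a placeholder and the weight value is arbitrary):

import asyncio
import cognee

async def main():
    # Weight this document more heavily when search results are ranked later.
    await cognee.add(
        "Post-incident review for the March outage ...",  # placeholder content
        importance_weight=0.8,  # must lie in [0.0, 1.0]; otherwise add() raises ValueError
    )

asyncio.run(main())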
10 changes: 9 additions & 1 deletion cognee/infrastructure/engine/models/DataPoint.py
@@ -1,6 +1,6 @@
import pickle
from uuid import UUID, uuid4
from pydantic import BaseModel, Field, ConfigDict
from pydantic import BaseModel, Field, ConfigDict, field_validator
from datetime import datetime, timezone
from typing_extensions import TypedDict
from typing import Optional, Any, Dict, List
@@ -49,6 +49,14 @@ class DataPoint(BaseModel):
metadata: Optional[MetaData] = {"index_fields": []}
type: str = Field(default_factory=lambda: DataPoint.__name__)
belongs_to_set: Optional[List["DataPoint"]] = None
importance_weight: Optional[float] = Field(default=0.5, ge=0.0, le=1.0)

@field_validator('importance_weight', mode='before')
@classmethod
def set_default_weight_on_none(cls, v):
if v is None:
return 0.5
return v

def __init__(self, **data):
super().__init__(**data)
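A quick sanity check of the new field and its validator (a sketch; it assumes the remaining DataPoint fields keep their defaults):

from cognee.infrastructure.engine.models.DataPoint import DataPoint

print(DataPoint().importance_weight)                         # 0.5, the field default
print(DataPoint(importance_weight=None).importance_weight)   # 0.5, the before-validator replaces an explicit None
print(DataPoint(importance_weight=0.9).importance_weight)    # 0.9
# DataPoint(importance_weight=1.5) raises a ValidationError because of le=1.0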
1 change: 1 addition & 0 deletions cognee/modules/chunking/LangchainChunker.py
@@ -48,6 +48,7 @@ async def read(self):
chunk_index=self.chunk_index,
cut_type="missing",
contains=[],
importance_weight=self.document.importance_weight,
metadata={
"index_fields": ["text"],
},
3 changes: 3 additions & 0 deletions cognee/modules/chunking/TextChunker.py
@@ -30,6 +30,7 @@ async def read(self):
chunk_index=self.chunk_index,
cut_type=chunk_data["cut_type"],
contains=[],
importance_weight=self.document.importance_weight,
metadata={
"index_fields": ["text"],
},
@@ -49,6 +50,7 @@ async def read(self):
chunk_index=self.chunk_index,
cut_type=paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
contains=[],
importance_weight=self.document.importance_weight,
metadata={
"index_fields": ["text"],
},
@@ -71,6 +73,7 @@ async def read(self):
chunk_index=self.chunk_index,
cut_type=paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
contains=[],
importance_weight=self.document.importance_weight,
metadata={"index_fields": ["text"]},
)
except Exception as e:
5 changes: 4 additions & 1 deletion cognee/modules/data/models/Data.py
@@ -1,6 +1,6 @@
from datetime import datetime, timezone
from uuid import uuid4
from sqlalchemy import UUID, Column, DateTime, String, JSON, Integer
from sqlalchemy import UUID, Column, DateTime, String, JSON, Integer, Float
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import relationship

@@ -34,9 +34,11 @@ class Data(Base):
pipeline_status = Column(MutableDict.as_mutable(JSON))
token_count = Column(Integer)
data_size = Column(Integer, nullable=True) # File size in bytes
importance_weight = Column(Float, nullable=False, default=0.5)
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))


datasets = relationship(
"Dataset",
secondary=DatasetData.__tablename__,
@@ -55,5 +57,6 @@ def to_json(self) -> dict:
"createdAt": self.created_at.isoformat(),
"updatedAt": self.updated_at.isoformat() if self.updated_at else None,
"nodeSet": self.node_set,
"importanceWeight": self.importance_weight,
# "datasets": [dataset.to_json() for dataset in self.datasets]
}
48 changes: 38 additions & 10 deletions cognee/modules/graph/cognee_graph/CogneeGraph.py
@@ -205,11 +205,24 @@ async def map_vector_distances_to_graph_nodes(self, node_distances) -> None:
for category, scored_results in node_distances.items():
for scored_result in scored_results:
node_id = str(scored_result.id)
score = scored_result.score
node = self.get_node(node_id)
if node:
node.add_attribute("vector_distance", score)
mapped_nodes += 1
if not node:
continue

# vector_distance → similarity
vector_distance = scored_result.score
vector_score = 1 - vector_distance

# if importance_weight is missing, fall back to the 0.5 default
importance_weight = node.attributes.get("importance_weight", 0.5)

final_score = vector_score * importance_weight

node.add_attribute("vector_distance", vector_distance)
node.add_attribute("importance_weight", importance_weight)
node.add_attribute("importance_score", final_score)

mapped_nodes += 1
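For reference, the node score introduced above works out like this (numbers invented, not taken from the PR):

vector_distance = 0.2                    # distance reported by the vector engine; smaller is closer
importance_weight = 0.8                  # weight stored on the node at ingestion time
vector_score = 1 - vector_distance       # 0.8
importance_score = vector_score * importance_weight
print(importance_score)                  # 0.64, the value the ranking now compares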

async def map_vector_distances_to_graph_edges(
self, vector_engine, query_vector, edge_distances
@@ -238,17 +251,32 @@ async def map_vector_distances_to_graph_edges(
)
distance = embedding_map.get(edge_key, None)
if distance is not None:
edge.attributes["vector_distance"] = distance
vector_score = 1 - distance
else:
vector_score = 0

# if the edge carries no importance_weight, fall back to 1.0 so the vector score passes through unchanged
importance_weight = edge.attributes.get("importance_weight", 1.0)

final_score = vector_score * importance_weight

edge.add_attribute("vector_distance", distance)
edge.add_attribute("importance_weight", importance_weight)
edge.add_attribute("importance_score", final_score)

except Exception as ex:
logger.error(f"Error mapping vector distances to edges: {str(ex)}")
raise ex

async def calculate_top_triplet_importances(self, k: int) -> List[Edge]:
def score(edge):
n1 = edge.node1.attributes.get("vector_distance", 1)
n2 = edge.node2.attributes.get("vector_distance", 1)
e = edge.attributes.get("vector_distance", 1)
"""
Rank triplets using merged importance_score:
importance_score = vector_similarity * importance_weight
"""
def final_score(edge: Edge):
n1 = edge.node1.attributes.get("importance_score", 0)
n2 = edge.node2.attributes.get("importance_score", 0)
e = edge.attributes.get("importance_score", 0)
return n1 + n2 + e

return heapq.nsmallest(k, self.edges, key=score)
return heapq.nlargest(k, self.edges, key=final_score)
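A minimal sketch of the new ranking step, with plain tuples standing in for Edge objects (scores invented):

import heapq

# Each triplet: (node1 importance_score, node2 importance_score, edge importance_score).
triplets = [(0.64, 0.40, 0.50), (0.10, 0.20, 0.05), (0.90, 0.80, 0.70)]
top_two = heapq.nlargest(2, triplets, key=sum)
print(top_two)   # [(0.9, 0.8, 0.7), (0.64, 0.4, 0.5)], highest summed score first

Switching from heapq.nsmallest on raw vector distances to heapq.nlargest on the merged score is what lets importance_weight influence which triplets are returned.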
18 changes: 12 additions & 6 deletions cognee/modules/graph/utils/expand_with_nodes_and_edges.py
@@ -47,6 +47,7 @@ def _process_ontology_nodes(
name=ont_node_name,
description=ont_node_name,
ontology_valid=True,
importance_weight=data_chunk.importance_weight,
)

elif ontology_node.category == "individuals":
@@ -58,11 +59,12 @@ def _process_ontology_nodes(
description=ont_node_name,
ontology_valid=True,
belongs_to_set=data_chunk.belongs_to_set,
importance_weight=data_chunk.importance_weight,
)


def _process_ontology_edges(
ontology_edges: list, existing_edges_map: dict, ontology_relationships: list
ontology_edges: list, existing_edges_map: dict, ontology_relationships: list, data_chunk: DocumentChunk,
) -> None:
"""Process ontology edges and add them if new"""
for source, relation, target in ontology_edges:
@@ -82,6 +84,7 @@ def _process_ontology_edges(
"source_node_id": source_node_id,
"target_node_id": target_node_id,
"ontology_valid": True,
"importance_weight": data_chunk.importance_weight,
},
)
)
@@ -132,13 +135,14 @@ def _create_type_node(
type=node_name,
description=node_name,
ontology_valid=ontology_validated,
importance_weight=data_chunk.importance_weight,
)

added_nodes_map[type_node_key] = type_node

# Process ontology nodes and edges
_process_ontology_nodes(ontology_nodes, data_chunk, added_nodes_map, added_ontology_nodes_map)
_process_ontology_edges(ontology_edges, existing_edges_map, ontology_relationships)
_process_ontology_edges(ontology_edges, existing_edges_map, ontology_relationships, data_chunk)

return type_node

@@ -191,13 +195,14 @@ def _create_entity_node(
description=node_description,
ontology_valid=ontology_validated,
belongs_to_set=data_chunk.belongs_to_set,
importance_weight=data_chunk.importance_weight,
)

added_nodes_map[entity_node_key] = entity_node

# Process ontology nodes and edges
_process_ontology_nodes(ontology_nodes, data_chunk, added_nodes_map, added_ontology_nodes_map)
_process_ontology_edges(ontology_edges, existing_edges_map, ontology_relationships)
_process_ontology_edges(ontology_edges, existing_edges_map, ontology_relationships, data_chunk)

return entity_node

@@ -267,8 +272,8 @@ def _process_graph_nodes(


def _process_graph_edges(
graph: KnowledgeGraph, name_mapping: dict, existing_edges_map: dict, relationships: list
) -> None:
graph: KnowledgeGraph, name_mapping: dict, existing_edges_map: dict, relationships: list,
data_chunk: DocumentChunk) -> None:
"""Process edges in a knowledge graph"""
for edge in graph.edges:
# Apply name mapping if exists
@@ -291,6 +296,7 @@ def _process_graph_edges(
"source_node_id": source_node_id,
"target_node_id": target_node_id,
"ontology_valid": False,
"importance_weight": data_chunk.importance_weight,
},
)
)
@@ -379,7 +385,7 @@ def expand_with_nodes_and_edges(
)

# Then process edges
_process_graph_edges(graph, name_mapping, existing_edges_map, relationships)
_process_graph_edges(graph, name_mapping, existing_edges_map, relationships, data_chunk)

# Return combined results
graph_nodes = data_chunks + list(added_ontology_nodes_map.values())
7 changes: 5 additions & 2 deletions cognee/modules/memify/memify.py
@@ -21,6 +21,7 @@
from cognee.tasks.codingagents.coding_rule_associations import (
add_rule_associations,
)
from cognee.tasks.memify.propagate_importance_weights import propagate_importance_weights

logger = get_logger("memify")

@@ -69,13 +70,15 @@ async def memify(
if not extraction_tasks:
extraction_tasks = [Task(extract_subgraph_chunks)]
if not enrichment_tasks:
enrichment_tasks = [
default_enrichment_tasks = [
Task(propagate_importance_weights),
Task(
add_rule_associations,
rules_nodeset_name="coding_agent_rules",
task_config={"batch_size": 1},
)
),
]
enrichment_tasks = default_enrichment_tasks

await setup()

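One consequence of the guard above: callers who pass their own enrichment_tasks replace this default list, so weight propagation has to be re-added explicitly if it is still wanted. A sketch (the Task import path is an assumption; reuse whatever memify.py itself imports, and my_enrichment is a hypothetical extra step):

from cognee.modules.pipelines.tasks.task import Task  # assumed import path
from cognee.tasks.memify.propagate_importance_weights import propagate_importance_weights

async def my_enrichment(subgraphs):  # hypothetical custom enrichment task
    return subgraphs

custom_enrichment_tasks = [
    Task(propagate_importance_weights),  # keep the new default so weights still propagate
    Task(my_enrichment),
]
# then: await memify(..., enrichment_tasks=custom_enrichment_tasks)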
32 changes: 27 additions & 5 deletions cognee/modules/retrieval/chunks_retriever.py
@@ -24,8 +24,12 @@ class ChunksRetriever(BaseRetriever):
def __init__(
self,
top_k: Optional[int] = 5,
default_importance_weight: float = 0.5,
):
self.top_k = top_k
self.default_importance_weight = default_importance_weight
self.candidate = top_k * 10  # over-fetch candidates so re-ranking by importance has room to reorder
self.vector_engine = get_vector_engine()

async def get_context(self, query: str) -> Any:
"""
@@ -48,18 +52,36 @@ async def get_context(self, query: str) -> Any:
f"Starting chunk retrieval for query: '{query[:100]}{'...' if len(query) > 100 else ''}'"
)

vector_engine = get_vector_engine()
vector_engine = self.vector_engine

try:
found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.top_k)
found_chunks = await vector_engine.search("DocumentChunk_text", query, limit=self.candidate)
logger.info(f"Found {len(found_chunks)} chunks from vector search")
except CollectionNotFoundError as error:
logger.error("DocumentChunk_text collection not found in vector database")
raise NoDataError("No data found in the system, please add data first.") from error

chunk_payloads = [result.payload for result in found_chunks]
logger.info(f"Returning {len(chunk_payloads)} chunk payloads")
return chunk_payloads
rescored = []
for item in found_chunks:
payload = item.payload or {}
importance_weight = payload.get("importance_weight", self.default_importance_weight)

distance_score = item.score if hasattr(item, "score") and item.score is not None else 0.0
similarity_score = 1 / (1 + distance_score)
final_score = similarity_score * importance_weight
text_preview = payload.get('text', '')[:20]
logger.debug(
f"Chunk: {text_preview:<20} | VecScore: {distance_score:.4f} | Weight: {importance_weight} | Final: {final_score:.4f}")
rescored.append((final_score, payload))

# sort descending by final_score
rescored.sort(key=lambda x: x[0], reverse=True)

# take top_k after re-ranking
top_payloads = [p for (_, p) in rescored[: self.top_k]]

logger.info(f"Returning {len(top_payloads)} re-ranked chunk payloads")
return top_payloads

async def get_completion(
self, query: str, context: Optional[Any] = None, session_id: Optional[str] = None
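To make the re-ranking in get_context concrete, a worked example with invented distances and weights:

# Each candidate: (name, vector distance from the search, stored importance_weight).
candidates = [("chunk A", 0.10, 0.5), ("chunk B", 0.30, 1.0), ("chunk C", 0.05, 0.2)]

rescored = []
for name, distance, weight in candidates:
    similarity = 1 / (1 + distance)      # same transform get_context applies
    rescored.append((similarity * weight, name))

rescored.sort(reverse=True)
print(rescored)   # chunk B (~0.77) outranks chunk A (~0.45) and chunk C (~0.19)

Chunk C is the closest vector match, but its low importance_weight pushes it to the bottom, which is the behaviour the PR is after.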