Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: use datapoints only
  • Loading branch information
lxobr committed Oct 20, 2025
commit 590c3ad7ec2dc687c6e9d088a1c672eb2c2f91cc
13 changes: 13 additions & 0 deletions cognee/tasks/feedback/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from .extract_feedback_interactions import extract_feedback_interactions
from .generate_improved_answers import generate_improved_answers
from .create_enrichments import create_enrichments
from .link_enrichments_to_feedback import link_enrichments_to_feedback
from .models import FeedbackEnrichment

# Public API of the feedback tasks package: the four pipeline tasks plus the
# FeedbackEnrichment model imported above.
__all__ = [
    "extract_feedback_interactions",
    "generate_improved_answers",
    "create_enrichments",
    "link_enrichments_to_feedback",
    "FeedbackEnrichment",
]
112 changes: 26 additions & 86 deletions cognee/tasks/feedback/create_enrichments.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,19 @@
logger = get_logger("create_enrichments")


def _validate_improved_answers(improved_answers: List[Dict]) -> bool:
"""Validate that all items contain required fields for enrichment creation."""
required_fields = [
"question",
"answer", # This is the original answer field from feedback_interaction
"improved_answer",
"new_context",
"feedback_id",
"interaction_id",
]
def _validate_enrichments(enrichments: List[FeedbackEnrichment]) -> bool:
"""Validate that all enrichments contain required fields for completion."""
return all(
all(item.get(field) is not None for field in required_fields) for item in improved_answers
enrichment.question is not None
and enrichment.original_answer is not None
and enrichment.improved_answer is not None
and enrichment.new_context is not None
and enrichment.feedback_id is not None
and enrichment.interaction_id is not None
for enrichment in enrichments
)


def _validate_uuid_fields(improved_answers: List[Dict]) -> bool:
"""Validate that feedback_id and interaction_id are valid UUID objects."""
try:
for item in improved_answers:
feedback_id = item.get("feedback_id")
interaction_id = item.get("interaction_id")
if not isinstance(feedback_id, type(feedback_id)) or not isinstance(
interaction_id, type(interaction_id)
):
return False
return True
except Exception:
return False


async def _generate_enrichment_report(
question: str, improved_answer: str, new_context: str, report_prompt_location: str
) -> str:
Expand All @@ -65,80 +48,37 @@ async def _generate_enrichment_report(
return f"Educational content for: {question} - {improved_answer}"


async def _create_enrichment_datapoint(
    improved_answer_item: Dict,
    report_text: str,
    nodeset: NodeSet,
) -> Optional[FeedbackEnrichment]:
    """Create a single FeedbackEnrichment DataPoint with proper ID and nodeset assignment.

    Args:
        improved_answer_item: Record providing the ``question``, ``answer``,
            ``improved_answer``, ``feedback_id`` and ``interaction_id`` keys.
        report_text: Text stored in the datapoint's ``text`` field.
        nodeset: NodeSet the datapoint is assigned to via ``belongs_to_set``.

    Returns:
        The new FeedbackEnrichment, or None when construction fails for any
        reason (for example a missing key); the failure is logged.
    """
    try:
        question = improved_answer_item["question"]
        improved_answer = improved_answer_item["improved_answer"]

        # The id is a name-based UUID derived from the question and improved
        # answer, so the same pair always yields the same id.
        return FeedbackEnrichment(
            id=str(uuid5(NAMESPACE_OID, f"{question}_{improved_answer}")),
            text=report_text,
            question=question,
            # The source record stores the original answer under "answer".
            original_answer=improved_answer_item["answer"],
            improved_answer=improved_answer,
            feedback_id=improved_answer_item["feedback_id"],
            interaction_id=improved_answer_item["interaction_id"],
            belongs_to_set=[nodeset],
        )
    except Exception as exc:
        # Best-effort: a single bad record is logged and skipped by the
        # caller instead of aborting the whole batch.
        logger.error(
            "Failed to create enrichment datapoint",
            error=str(exc),
            question=improved_answer_item.get("question"),
        )
        return None


async def create_enrichments(
improved_answers: List[Dict],
enrichments: List[FeedbackEnrichment],
report_prompt_location: str = "feedback_report_prompt.txt",
) -> List[FeedbackEnrichment]:
"""Create FeedbackEnrichment DataPoint instances from improved answers."""
if not improved_answers:
logger.info("No improved answers provided; returning empty list")
"""Fill text and belongs_to_set fields of existing FeedbackEnrichment DataPoints."""
if not enrichments:
logger.info("No enrichments provided; returning empty list")
return []

if not _validate_improved_answers(improved_answers):
if not _validate_enrichments(enrichments):
logger.error("Input validation failed; missing required fields")
return []

if not _validate_uuid_fields(improved_answers):
logger.error("UUID validation failed; invalid feedback_id or interaction_id")
return []

logger.info("Creating enrichments", count=len(improved_answers))
logger.info("Completing enrichments", count=len(enrichments))

# Create nodeset once for all enrichments
nodeset = NodeSet(id=uuid5(NAMESPACE_OID, name="FeedbackEnrichment"), name="FeedbackEnrichment")

enrichments: List[FeedbackEnrichment] = []

for improved_answer_item in improved_answers:
question = improved_answer_item["question"]
improved_answer = improved_answer_item["improved_answer"]
new_context = improved_answer_item["new_context"]
completed_enrichments: List[FeedbackEnrichment] = []

for enrichment in enrichments:
report_text = await _generate_enrichment_report(
question, improved_answer, new_context, report_prompt_location
enrichment.question,
enrichment.improved_answer,
enrichment.new_context,
report_prompt_location,
)

enrichment = await _create_enrichment_datapoint(improved_answer_item, report_text, nodeset)
enrichment.text = report_text
enrichment.belongs_to_set = [nodeset]

if enrichment:
enrichments.append(enrichment)
else:
logger.warning(
"Failed to create enrichment",
question=question,
interaction_id=improved_answer_item.get("interaction_id"),
)
completed_enrichments.append(enrichment)

logger.info("Created enrichments", successful=len(enrichments))
return enrichments
logger.info("Completed enrichments", successful=len(completed_enrichments))
return completed_enrichments
68 changes: 36 additions & 32 deletions cognee/tasks/feedback/extract_feedback_interactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
from cognee.infrastructure.llm.prompts.read_query_prompt import read_query_prompt
from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.databases.graph import get_graph_engine
from uuid import uuid5, NAMESPACE_OID

from .utils import filter_negative_feedback
from .models import FeedbackEnrichment


logger = get_logger("extract_feedback_interactions")
Expand Down Expand Up @@ -49,11 +51,8 @@ def _match_feedback_nodes_to_interactions_by_edges(
feedback_nodes: List, interaction_nodes: List, graph_edges: List
) -> List[Tuple[Tuple, Tuple]]:
"""Match feedback to interactions using gives_feedback_to edges."""
# Build single lookup maps using normalized Cognee IDs
interaction_by_id = {node_id: (node_id, props) for node_id, props in interaction_nodes}
feedback_by_id = {node_id: (node_id, props) for node_id, props in feedback_nodes}

# Filter to only gives_feedback_to edges
feedback_edges = [
(source_id, target_id)
for source_id, target_id, rel, _ in graph_edges
Expand Down Expand Up @@ -103,23 +102,22 @@ async def _generate_human_readable_context_summary(
return raw_context_text or ""


def _has_required_feedback_fields(record: Dict) -> bool:
"""Validate required fields exist in the item dict."""
required_fields = [
"question",
"answer",
"context",
"feedback_text",
"feedback_id",
"interaction_id",
]
return all(record.get(field_name) is not None for field_name in required_fields)
def _has_required_feedback_fields(enrichment: FeedbackEnrichment) -> bool:
"""Validate required fields exist in the FeedbackEnrichment DataPoint."""
return (
enrichment.question is not None
and enrichment.original_answer is not None
and enrichment.context is not None
and enrichment.feedback_text is not None
and enrichment.feedback_id is not None
and enrichment.interaction_id is not None
)


async def _build_feedback_interaction_record(
feedback_node_id: str, feedback_props: Dict, interaction_node_id: str, interaction_props: Dict
) -> Optional[Dict]:
"""Build a single feedback-interaction record with context summary."""
) -> Optional[FeedbackEnrichment]:
"""Build a single FeedbackEnrichment DataPoint with context summary."""
try:
question_text = interaction_props.get("question")
original_answer_text = interaction_props.get("answer")
Expand All @@ -130,17 +128,23 @@ async def _build_feedback_interaction_record(
question_text or "", raw_context_text
)

feedback_interaction_record = {
"question": question_text,
"answer": original_answer_text,
"context": context_summary_text,
"feedback_text": feedback_text,
"feedback_id": UUID(str(feedback_node_id)),
"interaction_id": UUID(str(interaction_node_id)),
}

if _has_required_feedback_fields(feedback_interaction_record):
return feedback_interaction_record
enrichment = FeedbackEnrichment(
id=str(uuid5(NAMESPACE_OID, f"{question_text}_{interaction_node_id}")),
text="",
question=question_text,
original_answer=original_answer_text,
improved_answer="",
feedback_id=UUID(str(feedback_node_id)),
interaction_id=UUID(str(interaction_node_id)),
belongs_to_set=None,
context=context_summary_text,
feedback_text=feedback_text,
new_context="",
explanation="",
)

if _has_required_feedback_fields(enrichment):
return enrichment
else:
logger.warning("Skipping invalid feedback item", interaction=str(interaction_node_id))
return None
Expand All @@ -151,9 +155,9 @@ async def _build_feedback_interaction_record(

async def _build_feedback_interaction_records(
matched_feedback_interaction_pairs: List[Tuple[Tuple, Tuple]],
) -> List[Dict]:
"""Build all feedback-interaction records from matched pairs."""
feedback_interaction_records: List[Dict] = []
) -> List[FeedbackEnrichment]:
"""Build all FeedbackEnrichment DataPoints from matched pairs."""
feedback_interaction_records: List[FeedbackEnrichment] = []
for (feedback_node_id, feedback_props), (
interaction_node_id,
interaction_props,
Expand All @@ -168,8 +172,8 @@ async def _build_feedback_interaction_records(

async def extract_feedback_interactions(
subgraphs: List, last_n: Optional[int] = None
) -> List[Dict]:
"""Extract negative feedback-interaction pairs; fetch internally and use last_n param for limiting."""
) -> List[FeedbackEnrichment]:
"""Extract negative feedback-interaction pairs and create FeedbackEnrichment DataPoints."""
graph_nodes, graph_edges = await _fetch_feedback_and_interaction_graph_data()
if not graph_nodes:
return []
Expand Down
Loading