ENG-976 and ENG-973 Do not re-embed a node with an existing embedding. #503
Conversation
Updates to Preview Branch (eng-976-do-not-re-embed-a-node-with-an-existing-embedding) ↗︎
Tasks are run on every commit but only new migration files are pushed.
View logs for this Workflow Run ↗︎.

@CodeRabbit review

✅ Actions performed: Review triggered.
📝 Walkthrough

Refactored the embedding and node upsert pipeline to use batch processing, centralizing batching logic into helper functions. Introduced a database trigger to automatically delete outdated embeddings when Content text changes. Tightened type safety by making previously nullable fields in database type definitions non-nullable.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller
    participant syncDgNodesToSupabase
    participant uploadNodesInBatches
    participant addMissingEmbeddings
    participant Supabase as Supabase/RPC
    Caller->>syncDgNodesToSupabase: sync nodes
    syncDgNodesToSupabase->>syncDgNodesToSupabase: chunk nodes
    syncDgNodesToSupabase->>uploadNodesInBatches: batched upserts
    loop for each batch
        uploadNodesInBatches->>Supabase: RPC upsert batch
        Supabase-->>uploadNodesInBatches: success/failure
    end
    uploadNodesInBatches-->>syncDgNodesToSupabase: uploaded nodes (with failures logged)
    syncDgNodesToSupabase->>addMissingEmbeddings: populate embeddings for uploaded
    addMissingEmbeddings-->>syncDgNodesToSupabase: nodes with embeddings
    syncDgNodesToSupabase-->>Caller: combined success/failure report
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~30 minutes

The changes span multiple domains (Roam utilities, database types, migrations) with heterogeneous modifications. The embedding pipeline refactor introduces new batching logic requiring verification of correctness and error handling. The database type narrowing (nullable → non-nullable) has broad contract implications across the codebase. The trigger logic is straightforward but adds new database-level semantics. Overall complexity is moderate but requires attention to cross-component interactions.

Possibly related PRs
Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 1
🧹 Nitpick comments (5)
packages/database/supabase/schemas/embedding.sql (1)
128-140: Trigger deletes embeddings on text change — good, but consider multiple models

The trigger correctly guards on OLD.text != NEW.text and uses a fully-qualified table with search_path=''. If you ever add more embedding tables (512/large), this only cleans one table. Consider a generic function that deletes from all embedding tables for NEW.id.

Example pattern:

```sql
-- inside the trigger function
PERFORM 1;
DELETE FROM public."ContentEmbedding_openai_text_embedding_3_small_1536"
  WHERE target_id = NEW.id;
-- DELETE FROM other embedding tables as needed...
```

Or drive from EmbeddingName via a registry table.
apps/roam/src/utils/syncDgNodesToSupabase.ts (2)
261-267: Guard chunk inputs

Add fast-path for empty arrays and validate size > 0 to prevent accidental infinite loops if size were ever zeroed.

```diff
 const chunk = <T>(array: T[], size: number): T[][] => {
+  if (!array.length) return [];
+  if (size <= 0) throw new Error(`chunk size must be > 0 (got ${size})`);
   const chunks: T[][] = [];
   for (let i = 0; i < array.length; i += size) {
     chunks.push(array.slice(i, i + size));
   }
   return chunks;
 };
```
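For illustration, a few hypothetical call sites showing what the guards change (the arrays and sizes here are made up):

```ts
// With the guards in place:
chunk([], 500);       // -> [] immediately, no loop work
chunk([1, 2, 3], 2);  // -> [[1, 2], [3]]
chunk([1, 2, 3], 0);  // -> throws instead of looping forever
```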
269-303: Batch upsert control flow — add transient error retries

One failing batch stops the loop. Consider a small bounded retry with exponential backoff for 429/5xx to avoid aborting on transient errors.

```diff
-      const { error } = await supabase.rpc("upsert_content", {
+      let error: unknown = null;
+      for (let attempt = 0; attempt < 3; attempt++) {
+        ({ error } = await supabase.rpc("upsert_content", {
           data: batch as Json,
           v_space_id,
           v_creator_id,
           content_as_document,
-      });
+        }));
+        if (!error) break;
+        const delay = 250 * Math.pow(2, attempt);
+        await new Promise((r) => setTimeout(r, delay));
+      }
```

If helpful I can wire a reusable withRetry utility.
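A minimal sketch of such a withRetry helper, assuming failures are reported via a { error } result as the Supabase client does (the name, attempt count, and delays are illustrative, and it retries unconditionally rather than only on 429/5xx):

```ts
// Hypothetical helper: retries an operation that reports failure via a
// { error } result, with exponential backoff between attempts.
const withRetry = async <T extends { error: unknown }>(
  fn: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 250,
): Promise<T> => {
  for (let attempt = 0; ; attempt++) {
    const result = await fn();
    // Return on success, or once the attempt budget is exhausted.
    if (!result.error || attempt >= attempts - 1) return result;
    // Backoff: 250ms, 500ms, 1000ms, ...
    await new Promise((r) => setTimeout(r, baseDelayMs * 2 ** attempt));
  }
};
```

It could then wrap the RPC call above, e.g. `const { error } = await withRetry(() => supabase.rpc("upsert_content", { data: batch as Json, v_space_id, v_creator_id, content_as_document }));`.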
packages/database/supabase/migrations/20251017151558_update_content_text_delete_embeddings.sql (1)
1-16: Trigger migration mirrors schema — avoid drift

Function/trigger logic is sound. It duplicates the definition in schemas/embedding.sql; ensure any future edits change both or centralize in migrations only. Consider testing that only text changes cause deletes.
apps/roam/src/utils/upsertNodesAsContentWithEmbeddings.ts (1)
41-47: Generic return shape with embedding_inline — OK; add small resiliency

Implementation is clear. Consider a tiny retry/backoff around fetch for 429/5xx as noted in the TODO, and set a per-request timeout to avoid hanging.

Option sketch:

```diff
-  const response = await fetch(nextApiRoot() + "/embeddings/openai/small", {
+  const controller = new AbortController();
+  const timeout = setTimeout(() => controller.abort(), 20000);
+  const response = await fetch(nextApiRoot() + "/embeddings/openai/small", {
     method: "POST",
     headers: { "Content-Type": "application/json" },
     body: JSON.stringify({ input: batch }),
-  });
+    signal: controller.signal,
+  }).finally(() => clearTimeout(timeout));
```
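If this pattern ends up in more than one place, a small wrapper could keep call sites tidy. A sketch only; the helper name and the 20s default are assumptions:

```ts
// Hypothetical wrapper: fetch with a per-request timeout via AbortController.
const fetchWithTimeout = async (
  url: string,
  init: RequestInit = {},
  timeoutMs = 20000,
): Promise<Response> => {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    return await fetch(url, { ...init, signal: controller.signal });
  } finally {
    clearTimeout(timer);
  }
};
```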
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
apps/roam/src/utils/syncDgNodesToSupabase.ts (1 hunks)
apps/roam/src/utils/upsertNodesAsContentWithEmbeddings.ts (2 hunks)
packages/database/src/dbTypes.ts (2 hunks)
packages/database/supabase/migrations/20251017151558_update_content_text_delete_embeddings.sql (1 hunks)
packages/database/supabase/schemas/embedding.sql (2 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-05-30T14:49:24.016Z
Learnt from: maparent
PR: DiscourseGraphs/discourse-graph#182
File: apps/website/app/utils/supabase/dbUtils.ts:22-28
Timestamp: 2025-05-30T14:49:24.016Z
Learning: In apps/website/app/utils/supabase/dbUtils.ts, expanding the KNOWN_EMBEDDINGS and DEFAULT_DIMENSIONS mappings to support additional embedding models requires corresponding database model changes (creating new embedding tables), which should be scoped as separate work from API route implementations.
Applied to files:
apps/roam/src/utils/upsertNodesAsContentWithEmbeddings.ts
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Supabase Preview
🔇 Additional comments (7)
packages/database/supabase/schemas/embedding.sql (3)
33-47: View formatting only — OK

No semantic changes detected to projection or filters. Safe to leave as-is.

49-50: JOIN/WHERE whitespace changes — OK

Readability improved; behavior unchanged.

142-143: Trigger timing/scope — OK

AFTER UPDATE, FOR EACH ROW is appropriate; condition in body limits work to text changes. Keep.

apps/roam/src/utils/syncDgNodesToSupabase.ts (1)

374-389: New batch upload path + post-fill embeddings — OK with caveats above

Flow looks good; logs are helpful. Address the idempotent insert and obsolete handling in addMissingEmbeddings for full correctness.

apps/roam/src/utils/upsertNodesAsContentWithEmbeddings.ts (1)

6-6: Enums import — good

This enables strong typing for embedding model names.

packages/database/src/dbTypes.ts (2)

1385-1400: instances_of_schema typings — unchanged semantics, OK

Tilde lines reflect regeneration; no action needed.
1253-1266: Type nullability change confirmed — verify query result consumption

The database schema shows concept_in_relations now returns non-null fields (created, epistemic_status, id, is_schema, last_modified, literal_content, name, reference_content, refs, space_id), whereas the related function concepts_of_relation keeps these as nullable — this confirms a breaking type change.

However, PConceptBase = Partial<Tables<"Concept">> makes all fields optional in TypeScript regardless of database nullability, creating a type-system gap. The query construction at packages/database/src/lib/queries.ts:153 and filtering at lines 181–193 suggest the concept_in_relations relation results are used, but the actual property access patterns on those relation result objects could not be located through pattern matching.

Manual verification needed:

- Trace the complete flow where relations results from the my_concepts query are consumed and their fields accessed
- Confirm whether calling code relies on optional chaining (?.) or null coalescing (??) for the now-required fields
- Verify the PConceptBase = Partial<...> type is intentional or should be updated to match the new non-null guarantees (a possible shape is sketched below)
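A possible direction, sketched under the assumption that the generated Tables<"Concept"> type exposes the listed columns (the helper name, import path, and chosen column subset are illustrative):

```ts
// Illustrative import; the actual path to the generated types may differ.
import type { Tables } from "../dbTypes";

// Helper: make the given keys required and non-null, leave the rest as-is.
type NonNullRequired<T, K extends keyof T> = Omit<T, K> & {
  [P in K]-?: NonNullable<T[P]>;
};

// Hypothetical tightened shape for concept_in_relations rows: the columns the
// database now guarantees become required, everything else stays optional.
type ConceptInRelationRow = NonNullRequired<
  Partial<Tables<"Concept">>,
  "id" | "name" | "created" | "last_modified" | "space_id"
>;
```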
```ts
export const addMissingEmbeddings = async (
  supabase: DGSupabaseClient,
  context: SupabaseContext,
) => {
  const response = await supabase
    .from("my_contents")
    .select(
      "id, text, ContentEmbedding_openai_text_embedding_3_small_1536(target_id)",
    )
    .eq("space_id", context.spaceId)
    .is("ContentEmbedding_openai_text_embedding_3_small_1536", null)
    .not("text", "is", null);
  if (response.error) {
    console.error(response.error);
    return 0;
  }
  // Tell TS about the non-null values
  const data = response.data as (Omit<
    (typeof response.data)[number],
    "text" | "id"
  > & {
    text: string;
    id: number;
  })[];
  let successes = 0;
  const batches = chunk(data, BATCH_SIZE);
  for (let idx = 0; idx < batches.length; idx++) {
    const batch = batches[idx];
    try {
      const nodesWithEmbeddings = await fetchEmbeddingsForNodes(batch);
      const embeddings = nodesWithEmbeddings.map(
        ({ id, embedding_inline: { model, vector } }) => ({
          target_id: id,
          model,
          vector: JSON.stringify(vector),
        }),
      );
      const result = await supabase
        .from("ContentEmbedding_openai_text_embedding_3_small_1536")
        .insert(embeddings);
      if (result.error) {
        console.error(result.error);
        break;
      }
      successes += batch.length;
    } catch (e) {
      console.error(e);
      break;
    }
  }
  if (successes < data.length)
    console.warn(
      `Tried sending content embeddings, ${successes}/${data.length} sent`,
    );
  else console.log(`Done sending content embeddings`);
  return successes;
};
```
🧩 Analysis chain
Race on insert, “obsolete” handling, and vector casting
- Concurrency: SELECT…is(null) + INSERT can race; primary-key conflict will abort this and all remaining batches. Use an upsert with onConflict: 'target_id' to make it idempotent.
- Obsolete rows: Query treats any embedding row (including obsolete=true) as “exists,” so legacy obsolete embeddings will never be regenerated. Include obsolete=true in the “missing” condition.
- Vector casting: You send JSON.stringify(vector) into a pgvector column. Depending on driver/DB casting, this can fail or coerce incorrectly. Prefer server-side RPC upsert_content_embedding (accepts number[]) or send number[] if the client/REST casts to vector.
Proposed minimal fix (idempotent write + regenerate obsolete):
const response = await supabase
.from("my_contents")
.select(
- "id, text, ContentEmbedding_openai_text_embedding_3_small_1536(target_id)",
+ "id, text, ContentEmbedding_openai_text_embedding_3_small_1536(target_id, obsolete)",
)
.eq("space_id", context.spaceId)
- .is("ContentEmbedding_openai_text_embedding_3_small_1536", null)
+ .or(
+ 'ContentEmbedding_openai_text_embedding_3_small_1536.is.null,ContentEmbedding_openai_text_embedding_3_small_1536.obsolete.is.true'
+ )
.not("text", "is", null);
And make the write idempotent:
- const result = await supabase
- .from("ContentEmbedding_openai_text_embedding_3_small_1536")
- .insert(embeddings);
+ const result = await supabase
+ .from("ContentEmbedding_openai_text_embedding_3_small_1536")
+ .upsert(embeddings, { onConflict: "target_id" });
Alternative (safer casting and single-row idempotency) — call the RPC per item:
for (const { id, embedding_inline: { model, vector } } of nodesWithEmbeddings) {
const { error } = await supabase.rpc("upsert_content_embedding", {
content_id: id,
embedding_array: vector,
model,
});
if (error) { console.error(error); break; }
successes++;
}
Let me know which path you prefer and I can send a full patch.
🏁 Script executed:
#!/bin/bash
# Confirm no legacy obsolete=true rows block re-embedding
rg -n "obsolete\s*:\s*true" -g '!**/node_modules/**'
# Grep for existing uses of upsert_content_embedding to align behavior
rg -nP "upsert_content_embedding\s*\(" -S
Length of output: 1703
🏁 Script executed:
#!/bin/bash
# 1. Check ContentEmbedding table schema for obsolete column
rg -n "ContentEmbedding_openai_text_embedding_3_small_1536" packages/database/supabase/migrations packages/database/supabase/schemas -A 5 -B 2
# 2. Look for how embeddings are queried/used in Roam app
rg -n "ContentEmbedding_openai_text_embedding_3_small_1536" apps/roam -C 3
# 3. Check if addMissingEmbeddings is being called and how
rg -n "addMissingEmbeddings" apps/roam -C 2
Length of output: 28726
🏁 Script executed:
#!/bin/bash
# Confirm vector serialization in the function
sed -n '304,360p' apps/roam/src/utils/syncDgNodesToSupabase.ts
Length of output: 1635
Use upsert with obsolete regeneration and fix vector casting
The function has three substantiated issues that will cause incorrect behavior or data loss:
- Race condition on insert: Direct .insert() without upsert means concurrent syncs or retries will fail on primary-key conflict (target_id), aborting all remaining batches. Use .upsert(embeddings, { onConflict: "target_id" }) to make it idempotent.
- Obsolete rows never re-generated: The query checks .is("ContentEmbedding_openai_text_embedding_3_small_1536", null), but rows with obsolete=true are not null—they exist in the relation. Legacy obsolete embeddings will never be regenerated. Update the query condition to include obsolete=true via .or('ContentEmbedding_openai_text_embedding_3_small_1536.is.null,ContentEmbedding_openai_text_embedding_3_small_1536.obsolete.is.true'), and also select the obsolete field from the relation.
- Vector casting: Line 337 sends JSON.stringify(vector) to a pgvector column. Depending on driver behavior, this can fail or coerce incorrectly. The existing RPC upsert_content_embedding expects float[] and handles server-side conversion. Either call the RPC per item (safer, aligns with existing DB layer), or verify the REST client properly casts stringified arrays to vector.
Use the RPC approach for consistency with the database layer and to avoid cast ambiguity:
for (const { id, embedding_inline: { model, vector } } of nodesWithEmbeddings) {
const { error } = await supabase.rpc("upsert_content_embedding", {
content_id: id,
embedding_array: vector,
model,
});
if (error) { console.error(error); break; }
successes++;
}
🤖 Prompt for AI Agents
In apps/roam/src/utils/syncDgNodesToSupabase.ts around lines 304-360, the
current logic inserts embeddings with .insert(), only selects rows where the
embedding relation is null (so obsolete=true rows are skipped), and passes
JSON.stringify(vector) to a pgvector column; fix by: 1) changing the initial
select to also retrieve the relation's obsolete flag and to treat rows for
regeneration when the relation is null OR relation.obsolete = true; 2) stop
using .insert() for batch embedding writes — either use .upsert(embeddings, {
onConflict: "target_id" }) to make writes idempotent or, preferably for
type-safety and existing DB handling, call the Postgres RPC per embedding
(upsert_content_embedding) sending the raw float[] and model; 3) remove
JSON.stringify(vector) and send the vector as a native array argument to the RPC
(or as properly typed array for upsert) and increment successes per successful
RPC call, handling and logging errors and breaking on failure.
- Good point on the upsert.
- obsolete rows do not need to be regenerated.
- Supabase uses postgrest, which does not understand vectors, hence the string is the official workaround.
@maparent, thank you for the clarifications!
- Upsert fix: Great! Using .upsert(embeddings, { onConflict: "target_id" }) will make the operation idempotent and prevent conflicts on concurrent syncs or retries.
- Obsolete rows: Understood—if obsolete embeddings don't need regeneration, the current filter is correct. I'll note this as intentional behavior.
- Vector serialization: You're absolutely right about PostgREST's limitation with pgvector types. Using JSON.stringify(vector) is indeed the standard workaround when working with Supabase's REST client, since PostgREST doesn't natively support the vector type. The alternative RPC approach I suggested would only be beneficial if you already have such an RPC defined and want to centralize the logic, but for direct table access, the string approach is correct.

The only remaining change needed is switching from .insert() to .upsert() with the conflict resolution. Everything else looks good!
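For reference, a minimal sketch of the agreed write path inside the batch loop, reusing the variable names from addMissingEmbeddings above (surrounding error handling and the loop itself are elided):

```ts
// Sketch: idempotent embedding write for one batch, assuming the same
// nodesWithEmbeddings shape as in addMissingEmbeddings above.
const embeddings = nodesWithEmbeddings.map(
  ({ id, embedding_inline: { model, vector } }) => ({
    target_id: id,
    model,
    // PostgREST workaround: pgvector columns accept a stringified array.
    vector: JSON.stringify(vector),
  }),
);
const result = await supabase
  .from("ContentEmbedding_openai_text_embedding_3_small_1536")
  // Conflict on the primary key makes retries and concurrent syncs safe.
  .upsert(embeddings, { onConflict: "target_id" });
if (result.error) {
  console.error(result.error);
}
```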
✏️ Learnings added
Learnt from: maparent
PR: DiscourseGraphs/discourse-graph#503
File: apps/roam/src/utils/syncDgNodesToSupabase.ts:304-360
Timestamp: 2025-10-18T12:05:23.793Z
Learning: When inserting pgvector embeddings via Supabase (PostgREST) in TypeScript, use JSON.stringify(vector) to serialize the number[] array to a string, as PostgREST does not natively support the pgvector type. This is the official workaround for working with vector columns through the REST API.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
as for retry/backoff, I do not see the need because the sync function will soon be called repeatedly. (It may be reasonable to have an overall backoff.)
@maparent, that's a very reasonable design decision! Failing fast at the lower-level task and letting the higher-level task handle retries based on its own policy is a well-established pattern.
Your existing sync coordination mechanism already provides the overall backoff you mentioned:
- Periodic retry: SYNC_INTERVAL = "45s" ensures failed syncs are automatically retried
- Task coordination: proposeSyncTask prevents concurrent syncs and implements the interval-based gating
- Failure tracking: endSyncTask properly reports failures, which will trigger a retry on the next interval

This approach keeps the batch processing logic simple (break on error) while relying on the outer sync loop for resilience. The one change we agreed on—switching to .upsert() with conflict resolution—will make each batch idempotent, so partial progress during a failed sync won't cause issues on the next retry.

The implementation looks good with that upsert change in place!
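If an overall backoff were ever layered on top of the interval gating, one rough sketch (the multiplier, the cap, and the runSync shape are assumptions; only the 45s base mirrors SYNC_INTERVAL):

```ts
// Sketch: interval-gated sync with a simple overall backoff on failure.
// Assumes runSync() resolves to true on success and false on failure.
const BASE_INTERVAL_MS = 45_000; // mirrors SYNC_INTERVAL = "45s"
const MAX_INTERVAL_MS = 10 * 60_000;

let currentInterval = BASE_INTERVAL_MS;

const scheduleNextSync = (runSync: () => Promise<boolean>) => {
  setTimeout(async () => {
    const ok = await runSync();
    // Reset after a success, double (up to a cap) after a failure.
    currentInterval = ok
      ? BASE_INTERVAL_MS
      : Math.min(currentInterval * 2, MAX_INTERVAL_MS);
    scheduleNextSync(runSync);
  }, currentInterval);
};
```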
f831c7d to d47bde1 (Compare)
d47bde1 to 31aa1f1 (Compare)
https://linear.app/discourse-graphs/issue/ENG-976/do-not-re-embed-a-node-with-an-existing-embedding
Instead of recomputing embeddings of all the changed nodes (where text may not have changed), just resend all updated nodes. A database trigger will delete the embedding if the text was changed.
Then, look for nodes missing embeddings, and upsert those separately. This also handles ENG-973, i.e. makes the system more resilient if the embedding step failed.
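A rough sketch of the resulting flow, assuming the helper names from this PR (uploadNodesInBatches, addMissingEmbeddings) with illustrative signatures:

```ts
// Sketch only: uploadNodesInBatches and addMissingEmbeddings are the helpers
// introduced in this PR; the exact signatures and DiscourseNodeInput are
// stand-ins for whatever the Roam side actually passes.
const syncOnce = async (
  supabase: DGSupabaseClient,
  context: SupabaseContext,
  nodes: DiscourseNodeInput[],
) => {
  // 1. Batched upsert of all updated nodes; the new DB trigger deletes the
  //    embedding of any Content row whose text actually changed.
  const uploaded = await uploadNodesInBatches(supabase, context, nodes);

  // 2. Fill in embeddings for rows that have none (text just changed, or a
  //    previous embedding step failed); this is the ENG-973 resilience path.
  const embedded = await addMissingEmbeddings(supabase, context);

  return { uploaded, embedded };
};
```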
Summary by CodeRabbit
Release Notes
New Features
Improvements