164 changes: 117 additions & 47 deletions apps/roam/src/utils/syncDgNodesToSupabase.ts
@@ -258,67 +258,137 @@ export const convertDgToSupabaseConcepts = async ({
}
};

const chunk = <T>(array: T[], size: number): T[][] => {
if (array.length === 0) return [];
  if (size <= 1) throw new Error(`chunk size must be > 1 (got ${size})`);
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
};
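
A quick usage sketch of the chunk helper above (values are illustrative, not from this PR):

const batches = chunk([1, 2, 3, 4, 5], 2);
// => [[1, 2], [3, 4], [5]]; the final batch may be smaller than `size`
// chunk([], 2) returns [], and sizes below 2 throw per the guard above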

const uploadNodesInBatches = async ({
supabase,
context,
nodes,
content_as_document,
}: {
supabase: DGSupabaseClient;
context: SupabaseContext;
nodes: LocalContentDataInput[];
content_as_document: boolean;
}): Promise<number> => {
const v_space_id = context.spaceId;
const v_creator_id = context.userId;
const batches = chunk(nodes, BATCH_SIZE);
let successes = 0;

for (let idx = 0; idx < batches.length; idx++) {
const batch = batches[idx];

const { error } = await supabase.rpc("upsert_content", {
data: batch as Json,
v_space_id,
v_creator_id,
content_as_document,
});

if (error) {
console.error(`upsert_content failed for batch ${idx + 1}:`, error);
break;
}
successes += batch.length;
}
return successes;
};
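
The helper stops at the first failing batch instead of continuing, so the returned count is also the number of rows known to have been sent. A minimal, hypothetical call site (`localContent` is a stand-in; BATCH_SIZE and the client/context setup come from elsewhere in this module):

const uploaded = await uploadNodesInBatches({
  supabase,
  context,
  nodes: localContent, // LocalContentDataInput[]
  content_as_document: true,
});
if (uploaded < localContent.length) {
  // a batch failed; the remaining batches were not attempted
  console.warn(`${localContent.length - uploaded} nodes not uploaded`);
}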

export const addMissingEmbeddings = async (
supabase: DGSupabaseClient,
context: SupabaseContext,
) => {
const response = await supabase
.from("my_contents")
.select(
"id, text, emb:ContentEmbedding_openai_text_embedding_3_small_1536(target_id)",
)
.eq("space_id", context.spaceId)
.is("emb", null)
.not("text", "is", null);
if (response.error) {
console.error(response.error);
return 0;
}
// Tell TS about the non-null values
const data = response.data as (Omit<
(typeof response.data)[number],
"text" | "id"
> & {
text: string;
id: number;
})[];
let successes = 0;
const batches = chunk(data, BATCH_SIZE);
for (let idx = 0; idx < batches.length; idx++) {
const batch = batches[idx];
try {
const nodesWithEmbeddings = await fetchEmbeddingsForNodes(batch);
const embeddings = nodesWithEmbeddings.map(
({ id, embedding_inline: { model, vector } }) => ({
target_id: id,
model,
vector: JSON.stringify(vector),
}),
);
const result = await supabase
.from("ContentEmbedding_openai_text_embedding_3_small_1536")
.upsert(embeddings, { onConflict: "target_id" })
.select();
if (result.error) {
console.error(result.error);
break;
}
successes += batch.length;
} catch (e) {
console.error(e);
break;
}
}
if (successes < data.length)
console.warn(
`Tried sending content embeddings, ${successes}/${data.length} sent`,
);
else console.log(`Done sending content embeddings`);
return successes;
};
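
For readers new to the query shape above: `emb:ContentEmbedding_...(target_id)` aliases an embedded relation, and `.is("emb", null)` keeps only content rows with no matching embedding row, i.e. an anti-join. A sketch of the row shape the select yields before the cast (illustrative, not generated types):

type MissingEmbeddingRow = {
  id: number | null; // cast to number above; ids are always present at runtime
  text: string | null; // .not("text", "is", null) guarantees non-null at runtime
  emb: { target_id: number } | null; // always null here because of .is("emb", null)
};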

export const upsertNodesToSupabaseAsContentWithEmbeddings = async (
roamNodes: RoamDiscourseNodeData[],
supabaseClient: DGSupabaseClient,
context: SupabaseContext,
): Promise<void> => {
const { userId } = context;

if (roamNodes.length === 0) {
return;
}
const allNodeInstancesAsLocalContent = convertRoamNodeToLocalContent({
nodes: roamNodes,
});

-  let nodesWithEmbeddings: LocalContentDataInput[];
-  try {
-    nodesWithEmbeddings = await fetchEmbeddingsForNodes(
-      allNodeInstancesAsLocalContent,
-    );
-  } catch (error) {
-    const message = error instanceof Error ? error.message : String(error);
-    console.error(
-      `upsertNodesToSupabaseAsContentWithEmbeddings: Embedding service failed – ${message}`,
-    );
-    return;
-  }
-
-  if (nodesWithEmbeddings.length !== allNodeInstancesAsLocalContent.length) {
-    console.error(
-      "upsertNodesToSupabaseAsContentWithEmbeddings: Mismatch between node and embedding counts.",
-    );
-    return;
-  }
-
-  const chunk = <T>(array: T[], size: number): T[][] => {
-    const chunks: T[][] = [];
-    for (let i = 0; i < array.length; i += size) {
-      chunks.push(array.slice(i, i + size));
-    }
-    return chunks;
-  };
-
-  const uploadBatches = async (batches: LocalContentDataInput[][]) => {
-    for (let idx = 0; idx < batches.length; idx++) {
-      const batch = batches[idx];
-
-      const { error } = await supabaseClient.rpc("upsert_content", {
-        data: batch as Json,
-        v_space_id: context.spaceId,
-        v_creator_id: userId,
-        content_as_document: true,
-      });
-
-      if (error) {
-        console.error(`upsert_content failed for batch ${idx + 1}:`, error);
-        throw error;
-      }
-    }
-  };
-
-  await uploadBatches(chunk(nodesWithEmbeddings, BATCH_SIZE));
+  const successes = await uploadNodesInBatches({
+    supabase: supabaseClient,
+    context,
+    nodes: allNodeInstancesAsLocalContent,
+    content_as_document: true,
+  });
+  if (successes < allNodeInstancesAsLocalContent.length)
+    console.warn(
+      `Tried sending content, ${successes}/${allNodeInstancesAsLocalContent.length} sent`,
+    );
+  else
+    console.log(
+      `Done sending ${allNodeInstancesAsLocalContent.length} contents`,
+    );
+  await addMissingEmbeddings(supabaseClient, context);
};
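
Net effect: content is uploaded first and vectors are backfilled afterwards, so an embedding-service outage no longer blocks the content sync. A hedged end-to-end sketch (the node-fetching call is an assumption based on the import of RoamDiscourseNodeData, not code from this PR):

const nodes: RoamDiscourseNodeData[] = await getAllDiscourseNodesSince(lastSyncTime); // hypothetical helper
await upsertNodesToSupabaseAsContentWithEmbeddings(nodes, supabaseClient, context);
// content rows are upserted in batches; addMissingEmbeddings then fills in
// vectors, and anything it misses is retried on the next sync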

const getDgNodeTypes = () => {
86 changes: 8 additions & 78 deletions apps/roam/src/utils/upsertNodesAsContentWithEmbeddings.ts
@@ -3,7 +3,7 @@ import { type RoamDiscourseNodeData } from "./getAllDiscourseNodesSince";
import { type SupabaseContext } from "./supabaseContext";
import { nextApiRoot } from "@repo/utils/execContext";
import type { DGSupabaseClient } from "@repo/database/lib/client";
-import type { Json, CompositeTypes } from "@repo/database/dbTypes";
+import type { Json, CompositeTypes, Enums } from "@repo/database/dbTypes";

type LocalContentDataInput = Partial<CompositeTypes<"content_local_input">>;

@@ -38,9 +38,13 @@ export const convertRoamNodeToLocalContent = ({
});
};

-export const fetchEmbeddingsForNodes = async (
-  nodes: LocalContentDataInput[],
-): Promise<LocalContentDataInput[]> => {
+export const fetchEmbeddingsForNodes = async <T extends { text: string }>(
+  nodes: T[],
+): Promise<
+  (T & {
+    embedding_inline: { model: Enums<"EmbeddingName">; vector: number[] };
+  })[]
+> => {
const allEmbeddings: number[][] = [];
const allNodesTexts = nodes.map((node) => node.text || "");

@@ -98,77 +102,3 @@ export const fetchEmbeddingsForNodes = async (
},
}));
};
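
Because the signature is now generic, callers get their own element type back augmented with `embedding_inline`, which is what lets addMissingEmbeddings in syncDgNodesToSupabase.ts read `id` off the result without a cast. An illustrative call (the row literal is hypothetical):

const rows = [{ id: 1, text: "First discourse node" }];
const withVectors = await fetchEmbeddingsForNodes(rows);
// typed as ({ id: number; text: string } & {
//   embedding_inline: { model: Enums<"EmbeddingName">; vector: number[] };
// })[]
console.log(withVectors[0].embedding_inline.model);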

const uploadBatches = async (
batches: LocalContentDataInput[][],
supabaseClient: DGSupabaseClient,
context: SupabaseContext,
) => {
const { spaceId, userId } = context;
for (let idx = 0; idx < batches.length; idx++) {
const batch = batches[idx];
const { error } = await supabaseClient.rpc("upsert_content", {
data: batch as unknown as Json,
v_space_id: spaceId,
v_creator_id: userId,
content_as_document: true,
});

if (error) {
console.error(`upsert_content failed for batch ${idx + 1}:`, error);
throw error;
}
}
};

export const upsertNodesToSupabaseAsContentWithEmbeddings = async (
roamNodes: RoamDiscourseNodeData[],
supabaseClient: DGSupabaseClient,
context: SupabaseContext,
): Promise<void> => {
if (!context?.userId) {
console.error("No Supabase context found.");
return;
}

if (roamNodes.length === 0) {
return;
}
const localContentNodes = convertRoamNodeToLocalContent({
nodes: roamNodes,
});

let nodesWithEmbeddings: LocalContentDataInput[];
try {
nodesWithEmbeddings = await fetchEmbeddingsForNodes(localContentNodes);
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
console.error(
`upsertNodesToSupabaseAsContentWithEmbeddings: Embedding service failed – ${errorMessage}`,
);
return;
}

if (nodesWithEmbeddings.length !== roamNodes.length) {
console.error(
"upsertNodesToSupabaseAsContentWithEmbeddings: Mismatch between node and embedding counts.",
);
return;
}

const batchSize = 200;

const chunk = <T>(array: T[], size: number): T[][] => {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
};

await uploadBatches(
chunk(nodesWithEmbeddings, batchSize),
supabaseClient,
context,
);
};
44 changes: 22 additions & 22 deletions packages/database/src/dbTypes.ts
@@ -1250,21 +1250,19 @@
Returns: {
arity: number | null
author_id: number | null
-created: string | null
+created: string
description: string | null
-epistemic_status:
-  | Database["public"]["Enums"]["EpistemicStatus"]
-  | null
-id: number | null
-is_schema: boolean | null
-last_modified: string | null
-literal_content: Json | null
-name: string | null
-reference_content: Json | null
-refs: number[] | null
+epistemic_status: Database["public"]["Enums"]["EpistemicStatus"]
+id: number
+is_schema: boolean
+last_modified: string
+literal_content: Json
+name: string
+reference_content: Json
+refs: number[]
represented_by_id: number | null
schema_id: number | null
-space_id: number | null
+space_id: number
}[]
}
concept_in_space: {
@@ -1384,19 +1382,21 @@
Returns: {
arity: number | null
author_id: number | null
-created: string
+created: string | null
description: string | null
-epistemic_status: Database["public"]["Enums"]["EpistemicStatus"]
-id: number
-is_schema: boolean
-last_modified: string
-literal_content: Json
-name: string
-reference_content: Json
-refs: number[]
+epistemic_status:
+  | Database["public"]["Enums"]["EpistemicStatus"]
+  | null
+id: number | null
+is_schema: boolean | null
+last_modified: string | null
+literal_content: Json | null
+name: string | null
+reference_content: Json | null
+refs: number[] | null
represented_by_id: number | null
schema_id: number | null
-space_id: number
+space_id: number | null
}[]
}
is_my_account: {
@@ -0,0 +1,16 @@
-- on update content text trigger
CREATE OR REPLACE FUNCTION public.after_update_content_text_trigger ()
RETURNS TRIGGER
SET search_path = ''
LANGUAGE plpgsql
AS $$
BEGIN
IF OLD.text IS DISTINCT FROM NEW.text THEN
DELETE FROM public."ContentEmbedding_openai_text_embedding_3_small_1536" WHERE target_id = NEW.id;
END IF;
RETURN NEW;
END;
$$ ;

CREATE TRIGGER on_update_text_trigger AFTER UPDATE ON public."Content"
FOR EACH ROW EXECUTE FUNCTION public.after_update_content_text_trigger () ;
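
Assuming the table names above, the intended behavior is: editing a Content row's text drops its cached vector, and the next sync's addMissingEmbeddings pass re-embeds it. Illustrative only:

-- updating the text fires the trigger and invalidates the cached embedding
UPDATE public."Content" SET text = 'revised wording' WHERE id = 42;
-- the matching embedding row is now gone, so this returns zero rows
SELECT * FROM public."ContentEmbedding_openai_text_embedding_3_small_1536"
  WHERE target_id = 42;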