Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions apps/roam/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import {
installDiscourseFloatingMenu,
removeDiscourseFloatingMenu,
} from "./components/DiscourseFloatingMenu";
import { initializeSupabaseSync } from "./utils/syncDgNodesToSupabase";
import {
initializeSupabaseSync,
setSyncActivity,
} from "./utils/syncDgNodesToSupabase";
import { initPluginTimer } from "./utils/pluginTimer";

const initPostHog = () => {
Expand Down Expand Up @@ -137,7 +140,7 @@ export default runExtension(async (onloadArgs) => {
},
listActiveQueries: () => listActiveQueries(extensionAPI),
isDiscourseNode: isDiscourseNode,
// @ts-ignore - we are still using roamjs-components global definition
// @ts-expect-error - we are still using roamjs-components global definition
getDiscourseNodes: getDiscourseNodes,
};

Expand All @@ -152,8 +155,9 @@ export default runExtension(async (onloadArgs) => {
],
observers: observers,
unload: () => {
setSyncActivity(false);
window.roamjs.extension?.smartblocks?.unregisterCommand("QUERYBUILDER");
// @ts-ignore - tldraw throws a warning on multiple loads
// @ts-expect-error - tldraw throws a warning on multiple loads
delete window[Symbol.for("__signia__")];
document.removeEventListener(
"roamjs:query-builder:action",
Expand Down
218 changes: 137 additions & 81 deletions apps/roam/src/utils/syncDgNodesToSupabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,33 @@ import { fetchEmbeddingsForNodes } from "./upsertNodesAsContentWithEmbeddings";
import { convertRoamNodeToLocalContent } from "./upsertNodesAsContentWithEmbeddings";
import { getRoamUrl } from "roamjs-components/dom";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused

import { render as renderToast } from "roamjs-components/components/Toast";
import type { DGSupabaseClient } from "@repo/database/lib/client";
import type { Json, CompositeTypes } from "@repo/database/dbTypes";
import { createClient, type DGSupabaseClient } from "@repo/database/lib/client";
import type { Json, CompositeTypes, Enums } from "@repo/database/dbTypes";

type LocalContentDataInput = Partial<CompositeTypes<"content_local_input">>;
type AccountLocalInput = CompositeTypes<"account_local_input">;
const { createClient } = require("@repo/database/lib/client");

const SYNC_FUNCTION = "embedding";
// Minimal interval between syncs of all clients for this task.
const SYNC_INTERVAL = "45s";
// Interval between syncs for each client individually
const BASE_SYNC_INTERVAL = 5 * 60 * 1000; // 5 minutes
const SYNC_TIMEOUT = "20s";
const BATCH_SIZE = 200;
const DEFAULT_TIME = "1970-01-01";
const DEFAULT_TIME = new Date("1970-01-01");

class FatalError extends Error {}

type SyncTaskInfo = {
lastUpdateTime: string | null;
spaceId: number;
worker: string;
lastUpdateTime?: Date;
nextUpdateTime?: Date;
shouldProceed: boolean;
};

export const endSyncTask = async (
worker: string,
status: "complete" | "failed",
status: Enums<"task_status">,
showToast: boolean = false,
): Promise<void> => {
try {
const supabaseClient = await getLoggedInClient();
Expand All @@ -60,13 +64,15 @@ export const endSyncTask = async (
});
if (error) {
console.error("endSyncTask: Error calling end_sync_task:", error);
renderToast({
id: "discourse-embedding-error",
content: "Failed to complete discourse node embeddings sync",
intent: "danger",
timeout: 5000,
});
} else {
if (showToast)
renderToast({
id: "discourse-embedding-error",
content: "Failed to complete discourse node embeddings sync",
intent: "danger",
timeout: 5000,
});
return;
} else if (showToast) {
if (status === "complete") {
renderToast({
id: "discourse-embedding-complete",
Expand All @@ -85,39 +91,23 @@ export const endSyncTask = async (
}
} catch (error) {
console.error("endSyncTask: Error calling end_sync_task:", error);
renderToast({
id: "discourse-embedding-error",
content: "Failed to complete discourse node embeddings sync",
intent: "danger",
timeout: 5000,
});
if (showToast)
renderToast({
id: "discourse-embedding-error",
content: "Failed to complete discourse node embeddings sync",
intent: "danger",
timeout: 5000,
});
}
};

export const proposeSyncTask = async (): Promise<SyncTaskInfo> => {
export const proposeSyncTask = async (
worker: string,
supabaseClient: DGSupabaseClient,
context: SupabaseContext,
): Promise<SyncTaskInfo> => {
try {
const supabaseClient = await getLoggedInClient();
const context = supabaseClient ? await getSupabaseContext() : null;
if (!context || !supabaseClient) {
console.error("proposeSyncTask: Unable to obtain Supabase context.");
return {
lastUpdateTime: null,
spaceId: 0,
worker: "",
shouldProceed: false,
};
}
const worker = window.roamAlphaAPI.user.uid();
if (!worker) {
console.error("proposeSyncTask: Unable to obtain user UID.");
return {
lastUpdateTime: null,
spaceId: 0,
worker: "",
shouldProceed: false,
};
}

const now = new Date();
const { data, error } = await supabaseClient.rpc("propose_sync_task", {
s_target: context.spaceId,
s_function: SYNC_FUNCTION,
Expand All @@ -126,36 +116,36 @@ export const proposeSyncTask = async (): Promise<SyncTaskInfo> => {
timeout: SYNC_TIMEOUT,
});

const { spaceId } = context;

if (error) {
console.error(
`proposeSyncTask: propose_sync_task failed – ${error.message}`,
);
return { lastUpdateTime: null, spaceId, worker, shouldProceed: false };
return { shouldProceed: false };
}

if (typeof data === "string") {
const timestamp = new Date(data);
const now = new Date();

if (timestamp > now) {
return { lastUpdateTime: null, spaceId, worker, shouldProceed: false };
return {
nextUpdateTime: timestamp,
shouldProceed: false,
};
} else {
return { lastUpdateTime: data, spaceId, worker, shouldProceed: true };
return {
lastUpdateTime: timestamp,
shouldProceed: true,
};
}
}

return { lastUpdateTime: null, spaceId, worker, shouldProceed: true };
return { shouldProceed: true };
} catch (error) {
console.error(
`proposeSyncTask: Unexpected error while contacting sync-task API:`,
error,
);
return {
lastUpdateTime: null,
spaceId: 0,
worker: "",
shouldProceed: false,
};
}
Expand Down Expand Up @@ -188,7 +178,6 @@ const upsertNodeSchemaToContent = async ({

]
`;
//@ts-ignore - backend to be added to roamjs-components
const result = (await window.roamAlphaAPI.data.async.q(
query,
nodeTypesUids,
Expand Down Expand Up @@ -369,30 +358,77 @@ const upsertUsers = async (
}
};

export const createOrUpdateDiscourseEmbedding = async () => {
const { shouldProceed, lastUpdateTime, worker } = await proposeSyncTask();

if (!shouldProceed) {
return;
let doSync = true;
let numFailures = 0;
const MAX_FAILURES = 5;
type TimeoutValue = ReturnType<typeof setTimeout>;
let activeTimeout: TimeoutValue | null = null;
// TODO: Maybe also pause sync while the window is not active?

export const setSyncActivity = (active: boolean) => {
doSync = active;
if (!active && activeTimeout !== null) {
clearTimeout(activeTimeout);
activeTimeout = null;
} else if (active && activeTimeout === null) {
activeTimeout = setTimeout(
// eslint-disable-next-line @typescript-eslint/no-misused-promises
createOrUpdateDiscourseEmbedding,
100,
);
}
};

export const createOrUpdateDiscourseEmbedding = async (showToast = false) => {
if (!doSync) return;
console.debug("starting createOrUpdateDiscourseEmbedding");
let success = true;
let claimed = false;
const worker = window.roamAlphaAPI.user.uid();

try {
if (!worker) {
throw new FatalError("Unable to obtain user UID.");
}
if (!createClient()) {
// not worth retrying
// TODO: Differentiate setup vs connetion error
throw new FatalError("Could not access supabase.");
}
const supabaseClient = await getLoggedInClient();
if (!supabaseClient) {
// TODO: Distinguish connection vs credentials error
throw new Error("Could not log in to client.");
}
const context = await getSupabaseContext();
if (!context) {
// not worth retrying: setup error
throw new FatalError("Error connecting to client.");
}
const { shouldProceed, lastUpdateTime, nextUpdateTime } =
await proposeSyncTask(worker, supabaseClient, context);
if (!shouldProceed) {
if (nextUpdateTime === undefined) {
throw new Error("Can't obtain sync task");
}
console.debug("postponed to ", nextUpdateTime);
if (doSync) {
activeTimeout = setTimeout(
createOrUpdateDiscourseEmbedding, // eslint-disable-line @typescript-eslint/no-misused-promises
Math.max(0, nextUpdateTime.valueOf() - Date.now()) + 100,
);
}
return;
}
claimed = true;
const allUsers = await getAllUsers();
const time = lastUpdateTime === null ? DEFAULT_TIME : lastUpdateTime;
const time = (lastUpdateTime || DEFAULT_TIME).toISOString();
const { allDgNodeTypes, dgNodeTypesWithSettings } = getDgNodeTypes();

const allNodeInstances = await getAllDiscourseNodesSince(
time,
dgNodeTypesWithSettings,
);
const supabaseClient = await getLoggedInClient();
if (!supabaseClient) return null;
const context = await getSupabaseContext();
if (!context) {
console.error("No Supabase context found.");
await endSyncTask(worker, "failed");
return;
}
await upsertUsers(allUsers, supabaseClient, context);
await upsertNodesToSupabaseAsContentWithEmbeddings(
allNodeInstances,
Expand All @@ -407,25 +443,45 @@ export const createOrUpdateDiscourseEmbedding = async () => {
context,
});
await cleanupOrphanedNodes(supabaseClient, context);
await endSyncTask(worker, "complete");
await endSyncTask(worker, "complete", showToast);
} catch (error) {
console.error("createOrUpdateDiscourseEmbedding: Process failed:", error);
await endSyncTask(worker, "failed");
throw error;
success = false;
if (worker && claimed) await endSyncTask(worker, "failed", showToast);
if (error instanceof FatalError) {
doSync = false;
return;
}
}
let timeout = BASE_SYNC_INTERVAL;
if (success) {
numFailures = 0;
} else {
numFailures += 1;
if (numFailures >= MAX_FAILURES) {
doSync = false;
return;
}
const jitter = 0.9 + Math.random() * 0.2; // 0.9x–1.1x
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this used for?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maparent I am curious about this as well

timeout *= 2 ** numFailures * jitter;
}
if (activeTimeout != null) {
clearTimeout(activeTimeout);
activeTimeout = null;
}
if (doSync) {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
activeTimeout = setTimeout(createOrUpdateDiscourseEmbedding, timeout);
}
};

export const initializeSupabaseSync = async () => {
const supabase = createClient();
if (supabase === null) return;
const result = await supabase
.from("Space")
.select()
.eq("url", getRoamUrl())
.maybeSingle();
if (!result.data) {
return;
if (supabase === null) {
doSync = false;
} else {
createOrUpdateDiscourseEmbedding();
doSync = true;
// eslint-disable-next-line @typescript-eslint/no-misused-promises
activeTimeout = setTimeout(createOrUpdateDiscourseEmbedding, 100, true);
}
};