Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
parse strings, and create error queue
  • Loading branch information
dlarocque committed Aug 7, 2025
commit 70985379ddaa7e2aaa6293d9d186152a32b291a3
69 changes: 55 additions & 14 deletions packages/ai/src/platform/browser/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,57 @@ export class BrowserWebSocketHandler implements WebSocketHandler {
}

const messageQueue: unknown[] = [];
const errorQueue: Error[] = [];
let resolvePromise: (() => void) | null = null;
let isClosed = false;

const messageListener = async (event: MessageEvent): Promise<void> => {
let data: string;
if (event.data instanceof Blob) {
try {
const obj = JSON.parse(await event.data.text()) as unknown;
messageQueue.push(obj);
if (resolvePromise) {
resolvePromise();
resolvePromise = null;
}
} catch (e) {
console.warn('Failed to parse WebSocket message to JSON:', e);
}
data = await event.data.text();
} else if (typeof event.data === 'string') {
data = event.data;
} else {
throw new AIError(
AIErrorCode.PARSE_FAILED,
`Failed to parse WebSocket response to JSON. ` +
`Expected data to be a Blob, but was ${typeof event.data}.`
errorQueue.push(
new AIError(
AIErrorCode.PARSE_FAILED,
`Failed to parse WebSocket response. Expected data to be a Blob or string, but was ${typeof event.data}.`
)
);
if (resolvePromise) {
resolvePromise();
resolvePromise = null;
}
return;
}

try {
const obj = JSON.parse(data) as unknown;
messageQueue.push(obj);
} catch (e) {
const err = e as Error;
errorQueue.push(
new AIError(
AIErrorCode.PARSE_FAILED,
`Error parsing WebSocket message to JSON: ${err.message}`
)
);
}

if (resolvePromise) {
resolvePromise();
resolvePromise = null;
}
};

const errorListener = (): void => {
errorQueue.push(
new AIError(AIErrorCode.FETCH_ERROR, 'WebSocket connection error.')
);
if (resolvePromise) {
resolvePromise();
resolvePromise = null;
}
};

const closeListener = (): void => {
Expand All @@ -108,12 +137,18 @@ export class BrowserWebSocketHandler implements WebSocketHandler {
// Clean up listeners to prevent memory leaks
this.ws?.removeEventListener('message', messageListener);
this.ws?.removeEventListener('close', closeListener);
this.ws?.removeEventListener('error', errorListener);
};

this.ws.addEventListener('message', messageListener);
this.ws.addEventListener('close', closeListener);
this.ws.addEventListener('error', errorListener);

while (!isClosed) {
if (errorQueue.length > 0) {
const error = errorQueue.shift()!;
throw error;
}
if (messageQueue.length > 0) {
yield messageQueue.shift()!;
} else {
Expand All @@ -122,6 +157,12 @@ export class BrowserWebSocketHandler implements WebSocketHandler {
});
}
}

// If the loop terminated because isClosed is true, check for any final errors
if (errorQueue.length > 0) {
const error = errorQueue.shift()!;
throw error;
}
}

close(code?: number, reason?: string): Promise<void> {
Expand Down
73 changes: 57 additions & 16 deletions packages/ai/src/platform/node/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,28 +92,57 @@ export class NodeWebSocketHandler implements WebSocketHandler {
}

const messageQueue: unknown[] = [];
const errorQueue: Error[] = [];
let resolvePromise: (() => void) | null = null;
let isClosed = false;

const messageListener = async (event: MessageEvent): Promise<void> => {
let data: string;
if (event.data instanceof Blob) {
try {
const obj = JSON.parse(await event.data.text()) as unknown;
messageQueue.push(obj);
if (resolvePromise) {
resolvePromise();
resolvePromise = null;
}
} catch (e) {
console.warn('Failed to parse WebSocket message to JSON:', e);
}
data = await event.data.text();
} else if (typeof event.data === 'string') {
data = event.data;
} else {
throw new AIError(
AIErrorCode.PARSE_FAILED,
`Failed to parse WebSocket response to JSON. ` +
`Expected data to be a Blob, but was ${typeof event.data}.`
errorQueue.push(
new AIError(
AIErrorCode.PARSE_FAILED,
`Failed to parse WebSocket response. Expected data to be a Blob or string, but was ${typeof event.data}.`
)
);
if (resolvePromise) {
resolvePromise();
resolvePromise = null;
}
return;
}

try {
const obj = JSON.parse(data) as unknown;
messageQueue.push(obj);
} catch (e) {
const err = e as Error;
errorQueue.push(
new AIError(
AIErrorCode.PARSE_FAILED,
`Error parsing WebSocket message to JSON: ${err.message}`
)
);
}

if (resolvePromise) {
resolvePromise();
resolvePromise = null;
}
};

const errorListener = (): void => {
errorQueue.push(
new AIError(AIErrorCode.FETCH_ERROR, 'WebSocket connection error.')
);
if (resolvePromise) {
resolvePromise();
resolvePromise = null;
}
};

const closeListener = (): void => {
Expand All @@ -122,15 +151,21 @@ export class NodeWebSocketHandler implements WebSocketHandler {
resolvePromise();
resolvePromise = null;
}
// Clean up listeners to prevent memory leaks
// Clean up listeners to prevent memory leaks.
this.ws?.removeEventListener('message', messageListener);
this.ws?.removeEventListener('close', closeListener);
this.ws?.removeEventListener('error', errorListener);
};

this.ws.addEventListener('message', messageListener);
this.ws.addEventListener('close', closeListener);
this.ws.addEventListener('error', errorListener);

while (!isClosed) {
if (errorQueue.length > 0) {
const error = errorQueue.shift()!;
throw error;
}
if (messageQueue.length > 0) {
yield messageQueue.shift()!;
} else {
Expand All @@ -139,6 +174,12 @@ export class NodeWebSocketHandler implements WebSocketHandler {
});
}
}

// If the loop terminated because isClosed is true, check for any final errors
if (errorQueue.length > 0) {
const error = errorQueue.shift()!;
throw error;
}
}

close(code?: number, reason?: string): Promise<void> {
Expand All @@ -148,7 +189,7 @@ export class NodeWebSocketHandler implements WebSocketHandler {
}

this.ws.addEventListener('close', () => resolve(), { once: true });
// Calling 'close' during these states results in an error.
// Calling 'close' during these states results in an error
if (
this.ws.readyState === WebSocket.CLOSED ||
this.ws.readyState === WebSocket.CONNECTING
Expand Down
Loading