-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathwrite.ts
More file actions
56 lines (48 loc) · 1.67 KB
/
write.ts
File metadata and controls
56 lines (48 loc) · 1.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import { WriteError } from '../errors/errors.js';
import type { WriteStream, WriteRequest } from '../grpc/plugin.js';
import type { Table } from '../schema/table.js';
import { decodeTable, decodeRecord, getPrimaryKeys } from '../schema/table.js';
import type { DeleteStaleFunction } from './delete-stale.js';
import type { OverwriteFunction } from './overwrite.js';
export const createWrite = (
//eslint-disable-next-line @typescript-eslint/no-explicit-any
memoryDB: Record<string, any>,
tables: Record<string, Table>,
overwrite: OverwriteFunction,
deleteStale: DeleteStaleFunction,
) => {
return async (stream: WriteStream) => {
for await (const data of stream) {
const request = data as WriteRequest;
switch (request.message) {
case 'migrate_table': {
// Update table schema in the `tables` map
const table = decodeTable(request.migrate_table.table);
tables[table.name] = table;
break;
}
case 'insert': {
const [tableName, batches] = decodeRecord(request.insert.record);
if (!memoryDB[tableName]) {
memoryDB[tableName] = [];
}
const tableSchema = tables[tableName];
const pks = getPrimaryKeys(tableSchema);
for (const batch of batches) {
for (const record of batch) {
overwrite(tableSchema, pks, record);
}
}
break;
}
case 'delete': {
deleteStale(request.delete);
break;
}
default: {
throw new WriteError(`Unknown request message type`, { props: { message: request.message } });
}
}
}
};
};