This repository was archived by the owner on Oct 25, 2024. It is now read-only.
forked from gajus/slonik
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathQueryStream.ts
More file actions
124 lines (95 loc) · 2.87 KB
/
QueryStream.ts
File metadata and controls
124 lines (95 loc) · 2.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/* eslint-disable canonical/id-match */
import type {
ReadableOptions,
} from 'stream';
import {
Readable,
} from 'stream';
import type {
QueryResult,
} from 'pg';
import Cursor from 'pg-cursor';
/**
* @see https://github.com/brianc/node-pg-query-stream
* @see https://github.com/brianc/node-pg-query-stream/issues/51
*/
export class QueryStream extends Readable {
public _reading: boolean;
public _closed: boolean;
public _result: unknown;
public cursor: typeof Cursor;
public batchSize: number;
public handleRowDescription: Function;
public handlePortalSuspended: Function;
public handleDataRow: Function;
public handleCommandComplete: Function;
public handleReadyForQuery: Function;
public handleError: Function;
public constructor (text: unknown, values: unknown, options?: ReadableOptions & {batchSize?: number, }) {
super({
objectMode: true,
...options,
});
this.cursor = new Cursor(text, values);
this._reading = false;
this._closed = false;
this.batchSize = options?.batchSize ?? 100;
// delegate Submittable callbacks to cursor
this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor);
this.handleDataRow = this.cursor.handleDataRow.bind(this.cursor);
this.handlePortalSuspended = this.cursor.handlePortalSuspended.bind(this.cursor);
this.handleCommandComplete = this.cursor.handleCommandComplete.bind(this.cursor);
this.handleReadyForQuery = this.cursor.handleReadyForQuery.bind(this.cursor);
this.handleError = this.cursor.handleError.bind(this.cursor);
// pg client sets types via _result property
this._result = this.cursor._result;
}
public submit (connection: Object) {
this.cursor.submit(connection);
}
public close (callback: Function) {
this._closed = true;
// eslint-disable-next-line unicorn/consistent-function-scoping
const close = () => {
this.emit('close');
};
this.cursor.close(callback || close);
}
public _read (size: number) {
if (this._reading || this._closed) {
return;
}
this._reading = true;
const readAmount = Math.max(size, this.batchSize);
this.cursor.read(readAmount, (error: Error, rows: unknown[], result: QueryResult) => {
if (this._closed) {
return;
}
if (error) {
this.emit('error', error);
return;
}
if (!rows.length) {
this._closed = true;
setImmediate(() => {
this.emit('close');
});
this.push(null);
return;
}
// push each row into the stream
this._reading = false;
for (const row of rows) {
this.push({
fields: (result.fields || []).map((field) => {
return {
dataTypeId: field.dataTypeID,
name: field.name,
};
}),
row,
});
}
});
}
}