Commit bde7855

nsfs refactor file_reader and file_writer
Signed-off-by: Guy Margalit <[email protected]>
Parent: 2017b80

12 files changed: +686 −226 lines


config.js

Lines changed: 3 additions & 1 deletion

@@ -827,6 +827,7 @@ config.NSFS_WARN_THRESHOLD_MS = 100;
 config.NSFS_CALCULATE_MD5 = false;
 config.NSFS_TRIGGER_FSYNC = true;
 config.NSFS_CHECK_BUCKET_BOUNDARIES = true;
+config.NSFS_CHECK_BUCKET_PATH_EXISTS = true;
 config.NSFS_REMOVE_PARTS_ON_COMPLETE = true;
 
 config.NSFS_BUF_POOL_WARNING_TIMEOUT = 2 * 60 * 1000;
@@ -1009,7 +1010,8 @@ config.NSFS_GLACIER_RESERVED_BUCKET_TAGS = {};
 // anonymous account name
 config.ANONYMOUS_ACCOUNT_NAME = 'anonymous';
 
-config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024;
+config.NSFS_UPLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024;
+config.NSFS_DOWNLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024;
 
 // we want to change our handling related to EACCESS error
 config.NSFS_LIST_IGNORE_ENTRY_ON_EACCES = true;
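
The config change fixes the NFSF_/NSFS_ typo in the upload threshold key, adds a matching download threshold, and introduces NSFS_CHECK_BUCKET_PATH_EXISTS. As a rough illustration only (the helper below is hypothetical; the actual consumers presumably live in the namespace_fs.js diff, which is not rendered on this page), an 8 MiB stream-memory threshold typically drives a decision like this:

'use strict';
const config = require('./config'); // adjust the path to the caller's location

// Hypothetical helper: buffer small downloads entirely in memory,
// stream larger ones through the buffer pool.
function should_buffer_download_in_memory(object_size_bytes) {
    return object_size_bytes <= config.NSFS_DOWNLOAD_STREAM_MEM_THRESHOLD;
}

console.log(should_buffer_download_in_memory(4 * 1024 * 1024)); // true for a 4 MiB object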

src/sdk/bucketspace_fs.js

Lines changed: 1 addition & 1 deletion

@@ -433,7 +433,7 @@ class BucketSpaceFS extends BucketSpaceSimpleFS {
     async get_bucket_lifecycle_configuration_rules(params) {
         try {
             const { name } = params;
-            dbg.log0('BucketSpaceFS.get_bucket_lifecycle_configuration_rules: Bucket name', name);
+            dbg.log1('BucketSpaceFS.get_bucket_lifecycle_configuration_rules: Bucket name', name);
             const bucket = await this.config_fs.get_bucket_by_name(name);
             return bucket.lifecycle_configuration_rules || [];
         } catch (error) {

src/sdk/namespace_fs.js

Lines changed: 83 additions & 161 deletions
Large diffs are not rendered by default.
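
Since the namespace_fs.js diff is collapsed here, below is a minimal sketch of the read pattern the refactor moves to. The FileReader constructor options and the read_into_stream() call mirror what the new unit test further down exercises; the function name and the res writable are illustrative, not the actual namespace_fs code.

'use strict';
const { FileReader } = require('../util/file_reader');

// Sketch only: option names come from the new FileReader unit test in this commit.
async function read_range_into(res, { fs_context, file, file_path, stat, start, end, signal, multi_buffer_pool }) {
    const file_reader = new FileReader({
        fs_context,
        file,               // an already-open native file handle
        file_path,
        stat,
        start,              // first byte to read
        end,                // end offset, apparently exclusive (the test compares against fs.createReadStream with end - 1)
        signal,             // AbortSignal; aborting rejects the pending read
        multi_buffer_pool,  // reads go through pooled buffers rather than per-read allocations
        highWaterMark: 64 * 1024,
    });
    // FileReader is also a stream.Readable, but read_into_stream() pushes the
    // pooled buffers straight into the target writable.
    await file_reader.read_into_stream(res);
}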

src/sdk/object_io.js

Lines changed: 1 addition & 1 deletion

@@ -435,7 +435,7 @@ class ObjectIO {
         ];
 
         await stream_utils.pipeline(transforms);
-        await stream_utils.wait_finished(uploader);
+        await stream.promises.finished(uploader); // probably redundant and pipeline already does it
 
         if (splitter.md5) complete_params.md5_b64 = splitter.md5.toString('base64');
         if (splitter.sha256) complete_params.sha256_b64 = splitter.sha256.toString('base64');
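
The same replacement of the promisified stream.finished helpers with Node's built-in promise API recurs throughout this commit. A small self-contained example of stream.promises.finished, which resolves once a stream is fully done and rejects if it errors or is destroyed:

'use strict';
const stream = require('stream');

async function demo() {
    const pass = new stream.PassThrough();
    pass.resume();                         // drain the readable side so it can end
    setImmediate(() => pass.end('done'));  // finish the writable side shortly after
    await stream.promises.finished(pass);  // replaces util.promisify(stream.finished)
    console.log('stream finished');
}

demo();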
Lines changed: 171 additions & 0 deletions (new file)

@@ -0,0 +1,171 @@
+/* Copyright (C) 2020 NooBaa */
+'use strict';
+
+const fs = require('fs');
+const path = require('path');
+const assert = require('assert');
+const buffer_utils = require('../../../util/buffer_utils');
+const native_fs_utils = require('../../../util/native_fs_utils');
+const { FileReader } = require('../../../util/file_reader');
+const { multi_buffer_pool } = require('../../../sdk/namespace_fs');
+
+const fs_context = {};
+
+describe('FileReader', () => {
+
+    const test_files = fs.readdirSync(__dirname).map(file => path.join(__dirname, file));
+
+    /**
+     * @param {(file_path: string, start?: number, end?: number) => void} tester
+     */
+    function describe_read_cases(tester) {
+        describe('list files and read entire', () => {
+            for (const file_path of test_files) {
+                tester(file_path);
+            }
+        });
+        describe('skip start cases', () => {
+            tester(__filename, 1, Infinity);
+            tester(__filename, 3, Infinity);
+            tester(__filename, 11, Infinity);
+            tester(__filename, 1023, Infinity);
+            tester(__filename, 1024, Infinity);
+            tester(__filename, 1025, Infinity);
+        });
+        describe('edge cases', () => {
+            tester(__filename, 0, 1);
+            tester(__filename, 0, 2);
+            tester(__filename, 0, 3);
+            tester(__filename, 1, 2);
+            tester(__filename, 1, 3);
+            tester(__filename, 2, 3);
+            tester(__filename, 0, 1023);
+            tester(__filename, 0, 1024);
+            tester(__filename, 0, 1025);
+            tester(__filename, 1, 1023);
+            tester(__filename, 1, 1024);
+            tester(__filename, 1, 1025);
+            tester(__filename, 1023, 1024);
+            tester(__filename, 1023, 1025);
+            tester(__filename, 1024, 1025);
+            tester(__filename, 123, 345);
+            tester(__filename, 1000000000, Infinity);
+        });
+    }
+
+    describe('as stream.Readable', () => {
+
+        describe_read_cases(tester);
+
+        function tester(file_path, start = 0, end = Infinity) {
+            const basename = path.basename(file_path);
+            it(`test read ${start}-${end} ${basename}`, async () => {
+                await native_fs_utils.use_file({
+                    fs_context,
+                    bucket_path: file_path,
+                    open_path: file_path,
+                    scope: async file => {
+                        const stat = await file.stat(fs_context);
+                        const aborter = new AbortController();
+                        const signal = aborter.signal;
+                        const file_reader = new FileReader({
+                            fs_context,
+                            file,
+                            file_path,
+                            stat,
+                            start,
+                            end,
+                            signal,
+                            multi_buffer_pool,
+                            highWaterMark: 1024, // bytes
+                        });
+                        const data = await buffer_utils.read_stream_join(file_reader);
+                        const node_fs_stream = fs.createReadStream(file_path, { start, end: end > 0 ? end - 1 : 0 });
+                        const node_fs_data = await buffer_utils.read_stream_join(node_fs_stream);
+                        assert.strictEqual(data.length, node_fs_data.length);
+                        assert.strictEqual(data.toString(), node_fs_data.toString());
+                    }
+                });
+            });
+        }
+    });
+
+    describe('read_into_stream with buffer pooling', () => {
+
+        describe_read_cases(tester);
+
+        function tester(file_path, start = 0, end = Infinity) {
+            const basename = path.basename(file_path);
+            it(`test read ${start}-${end} ${basename}`, async () => {
+                await native_fs_utils.use_file({
+                    fs_context,
+                    bucket_path: file_path,
+                    open_path: file_path,
+                    scope: async file => {
+                        const stat = await file.stat(fs_context);
+                        const aborter = new AbortController();
+                        const signal = aborter.signal;
+                        const file_reader = new FileReader({
+                            fs_context,
+                            file,
+                            file_path,
+                            stat,
+                            start,
+                            end,
+                            signal,
+                            multi_buffer_pool,
+                            highWaterMark: 1024, // bytes
+                        });
+                        const writable = buffer_utils.write_stream();
+                        await file_reader.read_into_stream(writable);
+                        const data = writable.join();
+                        const node_fs_stream = fs.createReadStream(file_path, { start, end: end > 0 ? end - 1 : 0 });
+                        const node_fs_data = await buffer_utils.read_stream_join(node_fs_stream);
+                        assert.strictEqual(data.length, node_fs_data.length);
+                        assert.strictEqual(data.toString(), node_fs_data.toString());
+                    }
+                });
+            });
+        }
+
+    });
+
+
+    describe('abort during read_into_stream', () => {
+
+        describe_read_cases(tester);
+
+        function tester(file_path, start = 0, end = Infinity) {
+            const basename = path.basename(file_path);
+            it(`test abort read ${start}-${end} ${basename}`, async () => {
+                await native_fs_utils.use_file({
+                    fs_context,
+                    bucket_path: file_path,
+                    open_path: file_path,
+                    scope: async file => {
+                        const stat = await file.stat(fs_context);
+                        const aborter = new AbortController();
+                        const signal = aborter.signal;
+                        const file_reader = new FileReader({
+                            fs_context,
+                            file,
+                            file_path,
+                            stat,
+                            start,
+                            end,
+                            signal,
+                            multi_buffer_pool,
+                            highWaterMark: 1024, // bytes
+                        });
+                        const writable = buffer_utils.write_stream();
+                        const promise = file_reader.read_into_stream(writable);
+                        setImmediate(() => aborter.abort()); // abort quickly
+                        await assert.rejects(promise, { name: 'AbortError' });
+                    }
+                });
+            });
+        }
+
+    });
+
+});

src/test/unrelated/for_await_stream.js

Lines changed: 1 addition & 3 deletions

@@ -1,11 +1,9 @@
 /* Copyright (C) 2020 NooBaa */
 'use strict';
 
-const util = require('util');
 const events = require('events');
 const stream = require('stream');
 const argv = require('minimist')(process.argv);
-const stream_finished = util.promisify(stream.finished);
 
 async function test(mode) {
     try {
@@ -68,7 +66,7 @@ async function test(mode) {
 
         }
 
-        await stream_finished(output);
+        await stream.promises.finished(output);
         console.log(`${mode}: done.`);
 
     } catch (err) {

src/test/unrelated/stream_pipe_to_multiple_targets.js

Lines changed: 2 additions & 5 deletions

@@ -1,7 +1,6 @@
 /* Copyright (C) 2016 NooBaa */
 'use strict';
 
-const util = require('util');
 const assert = require('assert');
 const stream = require('stream');
 const crypto = require('crypto');
@@ -81,8 +80,6 @@ class WriteTarget extends stream.Writable {
     }
 }
 
-const wait_finished = util.promisify(stream.finished);
-
 async function main() {
     try {
         console.log('main: starting ...');
@@ -145,8 +142,8 @@ async function main() {
         }
 
         await Promise.allSettled([
-            wait_finished(hub),
-            wait_finished(cache),
+            stream.promises.finished(hub),
+            stream.promises.finished(cache),
         ]);
 
         if (cache.destroyed) {

src/tools/file_writer_hashing.js

Lines changed: 4 additions & 7 deletions

@@ -6,7 +6,6 @@ const assert = require('assert');
 const FileWriter = require('../util/file_writer');
 const config = require('../../config');
 const nb_native = require('../util/nb_native');
-const stream_utils = require('../util/stream_utils');
 const P = require('../util/promise');
 const stream = require('stream');
 const fs = require('fs');
@@ -72,12 +71,11 @@ async function hash_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX)
     }());
     const target = new TargetHash();
     const file_writer = new FileWriter({
-        target_file: target,
+        target_file: /**@type {any}*/ (target),
         fs_context: DEFAULT_FS_CONFIG,
         namespace_resource_id: 'MajesticSloth'
     });
-    await stream_utils.pipeline([source_stream, file_writer]);
-    await stream_utils.wait_finished(file_writer);
+    await file_writer.write_entire_stream(source_stream);
     const write_hash = target.digest();
     console.log(
         'Hash target',
@@ -95,7 +93,7 @@ async function hash_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX)
 
 async function file_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX) {
     config.NSFS_DEFAULT_IOV_MAX = iov_max;
-    fs.mkdirSync(F_PREFIX);
+    fs.mkdirSync(F_PREFIX, { recursive: true });
     await P.map_with_concurrency(CONCURRENCY, Array(parts).fill(), async () => {
         let target_file;
         const data = crypto.randomBytes(PART_SIZE);
@@ -114,8 +112,7 @@ async function file_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX)
             fs_context: DEFAULT_FS_CONFIG,
             namespace_resource_id: 'MajesticSloth'
         });
-        await stream_utils.pipeline([source_stream, file_writer]);
-        await stream_utils.wait_finished(file_writer);
+        await file_writer.write_entire_stream(source_stream);
         if (XATTR) {
            await target_file.replacexattr(
                DEFAULT_FS_CONFIG,
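
Both call sites in this tool collapse the old two-step pipeline + wait_finished sequence into the new FileWriter.write_entire_stream() helper. A minimal sketch of the resulting pattern, assuming the caller supplies an fs_context and an already-open target file as the tool does above:

'use strict';
const FileWriter = require('../util/file_writer');

// Sketch of the new flow: write_entire_stream() consumes the source stream and
// resolves once the data has been fully written (previously this took an explicit
// stream_utils.pipeline([source_stream, file_writer]) plus wait_finished(file_writer)).
async function write_stream_to_file(fs_context, target_file, source_stream) {
    const file_writer = new FileWriter({
        target_file,
        fs_context,
        namespace_resource_id: 'MajesticSloth', // same id the tool uses
    });
    await file_writer.write_entire_stream(source_stream);
    return file_writer;
}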
