4 changes: 3 additions & 1 deletion config.js
@@ -827,6 +827,7 @@ config.NSFS_WARN_THRESHOLD_MS = 100;
config.NSFS_CALCULATE_MD5 = false;
config.NSFS_TRIGGER_FSYNC = true;
config.NSFS_CHECK_BUCKET_BOUNDARIES = true;
config.NSFS_CHECK_BUCKET_PATH_EXISTS = true;
config.NSFS_REMOVE_PARTS_ON_COMPLETE = true;

config.NSFS_BUF_POOL_WARNING_TIMEOUT = 2 * 60 * 1000;
@@ -1009,7 +1010,8 @@ config.NSFS_GLACIER_RESERVED_BUCKET_TAGS = {};
// anonymous account name
config.ANONYMOUS_ACCOUNT_NAME = 'anonymous';

config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024;
config.NSFS_UPLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024;
config.NSFS_DOWNLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024;

// we want to change our handling related to EACCESS error
config.NSFS_LIST_IGNORE_ENTRY_ON_EACCES = true;
14 changes: 14 additions & 0 deletions docs/NooBaaNonContainerized/ConfigFileCustomizations.md
@@ -532,6 +532,20 @@ Warning: After setting this configuration, NooBaa will skip schema validations a
3. systemctl restart noobaa
```

### 37. Check bucket path exists -
* <u>Key</u>: `NSFS_CHECK_BUCKET_PATH_EXISTS`
* <u>Type</u>: Boolean
* <u>Default</u>: true
* <u>Description</u>: Enable/Disable bucket path existence checks. Disabling this check is EXPERIMENTAL and NOT recommended for production! When disabled, it reduces some latency on object operations, but calls to non-existing bucket paths will result in unexpected behavior (e.g. they could return NO_SUCH_OBJECT instead of NO_SUCH_BUCKET).
* <u>Steps</u>:
```
1. Open /path/to/config_dir/config.json file.
2. Set the config key -
Example:
"NSFS_CHECK_BUCKET_PATH_EXISTS": false
```
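
For context, when this check is enabled the bucket path is verified to exist before object operations are served. The following is a minimal, hypothetical sketch of such a gate (the require path, helper name and error mapping below are illustrative assumptions, not NooBaa's actual code):
```
'use strict';
const fs = require('fs');
const config = require('../../config'); // illustrative path

// Hypothetical gate: skip the extra stat of the bucket path when the check is disabled,
// trading the NO_SUCH_BUCKET guarantee for lower latency on object operations.
async function maybe_check_bucket_path_exists(bucket_path) {
    if (!config.NSFS_CHECK_BUCKET_PATH_EXISTS) return;
    try {
        await fs.promises.stat(bucket_path);
    } catch (err) {
        if (err.code === 'ENOENT') throw new Error('NO_SUCH_BUCKET');
        throw err;
    }
}

module.exports = { maybe_check_bucket_path_exists };
```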


## Config.json File Examples
The following is an example of a config.json file -

2 changes: 1 addition & 1 deletion src/sdk/bucketspace_fs.js
@@ -433,7 +433,7 @@ class BucketSpaceFS extends BucketSpaceSimpleFS {
async get_bucket_lifecycle_configuration_rules(params) {
try {
const { name } = params;
dbg.log0('BucketSpaceFS.get_bucket_lifecycle_configuration_rules: Bucket name', name);
dbg.log1('BucketSpaceFS.get_bucket_lifecycle_configuration_rules: Bucket name', name);
const bucket = await this.config_fs.get_bucket_by_name(name);
return bucket.lifecycle_configuration_rules || [];
} catch (error) {
223 changes: 72 additions & 151 deletions src/sdk/namespace_fs.js

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/sdk/object_io.js
@@ -435,7 +435,8 @@ class ObjectIO {
];

await stream_utils.pipeline(transforms);
await stream_utils.wait_finished(uploader);
// Explicitly wait for finish as a defensive measure although pipeline should do it
await stream.promises.finished(uploader);

if (splitter.md5) complete_params.md5_b64 = splitter.md5.toString('base64');
if (splitter.sha256) complete_params.sha256_b64 = splitter.sha256.toString('base64');
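
The same substitution, replacing `stream_utils.wait_finished` and `util.promisify(stream.finished)` with Node's built-in promise API, repeats in the test and tool files below. A minimal standalone sketch of the pattern, using only Node core streams (file paths and names here are illustrative):
```
'use strict';
const fs = require('fs');
const stream = require('stream');

// Copy src to dst: pipeline resolves once the whole chain has finished (and cleans up on error);
// the extra finished() wait is defensive, mirroring the change above, and resolves immediately
// if the target has already finished.
async function copy_file(src, dst) {
    const source = fs.createReadStream(src);
    const target = fs.createWriteStream(dst);
    await stream.promises.pipeline(source, target);
    await stream.promises.finished(target);
}

copy_file(process.argv[2], process.argv[3]).catch(err => {
    console.error(err);
    process.exit(1);
});
```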
170 changes: 170 additions & 0 deletions src/test/unit_tests/internal/test_file_reader.test.js
@@ -0,0 +1,170 @@
/* Copyright (C) 2020 NooBaa */
'use strict';

const fs = require('fs');
const path = require('path');
const assert = require('assert');
const buffer_utils = require('../../../util/buffer_utils');
const native_fs_utils = require('../../../util/native_fs_utils');
const { FileReader } = require('../../../util/file_reader');
const { multi_buffer_pool } = require('../../../sdk/namespace_fs');

const fs_context = {};

describe('FileReader', () => {

const test_files = fs.readdirSync(__dirname).map(file => path.join(__dirname, file));

/**
* @param {(file_path: string, start?: number, end?: number) => void} tester
*/
function describe_read_cases(tester) {
describe('list files and read entire', () => {
for (const file_path of test_files) {
tester(file_path);
}
});
describe('skip start cases', () => {
tester(__filename, 1, Infinity);
tester(__filename, 3, Infinity);
tester(__filename, 11, Infinity);
tester(__filename, 1023, Infinity);
tester(__filename, 1024, Infinity);
tester(__filename, 1025, Infinity);
});
describe('edge cases', () => {
tester(__filename, 0, 1);
tester(__filename, 0, 2);
tester(__filename, 0, 3);
tester(__filename, 1, 2);
tester(__filename, 1, 3);
tester(__filename, 2, 3);
tester(__filename, 0, 1023);
tester(__filename, 0, 1024);
tester(__filename, 0, 1025);
tester(__filename, 1, 1023);
tester(__filename, 1, 1024);
tester(__filename, 1, 1025);
tester(__filename, 1023, 1024);
tester(__filename, 1023, 1025);
tester(__filename, 1024, 1025);
tester(__filename, 123, 345);
tester(__filename, 1000000000, Infinity);
});
}

describe('as stream.Readable', () => {

describe_read_cases(tester);

function tester(file_path, start = 0, end = Infinity) {
const basename = path.basename(file_path);
it(`test read ${start}-${end} ${basename}`, async () => {
await native_fs_utils.use_file({
fs_context,
bucket_path: file_path,
open_path: file_path,
scope: async file => {
const stat = await file.stat(fs_context);
const aborter = new AbortController();
const signal = aborter.signal;
const file_reader = new FileReader({
fs_context,
file,
file_path,
stat,
start,
end,
signal,
multi_buffer_pool,
highWaterMark: 1024, // bytes
});
const data = await buffer_utils.read_stream_join(file_reader);
const node_fs_stream = fs.createReadStream(file_path, { start, end: end > 0 ? end - 1 : 0 }); // fs.createReadStream treats 'end' as inclusive, hence end - 1
const node_fs_data = await buffer_utils.read_stream_join(node_fs_stream);
assert.strictEqual(data.length, node_fs_data.length);
assert.strictEqual(data.toString(), node_fs_data.toString());
}
});
});
}
});

describe('read_into_stream with buffer pooling', () => {

describe_read_cases(tester);

function tester(file_path, start = 0, end = Infinity) {
const basename = path.basename(file_path);
it(`test read ${start}-${end} ${basename}`, async () => {
await native_fs_utils.use_file({
fs_context,
bucket_path: file_path,
open_path: file_path,
scope: async file => {
const stat = await file.stat(fs_context);
const aborter = new AbortController();
const signal = aborter.signal;
const file_reader = new FileReader({
fs_context,
file,
file_path,
stat,
start,
end,
signal,
multi_buffer_pool,
highWaterMark: 1024, // bytes
});
const writable = buffer_utils.write_stream();
await file_reader.read_into_stream(writable);
const data = writable.join();
const node_fs_stream = fs.createReadStream(file_path, { start, end: end > 0 ? end - 1 : 0 }); // fs.createReadStream treats 'end' as inclusive, hence end - 1
const node_fs_data = await buffer_utils.read_stream_join(node_fs_stream);
assert.strictEqual(data.length, node_fs_data.length);
assert.strictEqual(data.toString(), node_fs_data.toString());
}
});
});
}

});


// Abort tests are disabled temporarily due to flakiness
//
// describe('abort during read_into_stream', () => {
// tester(__filename);
// function tester(file_path, start = 0, end = Infinity) {
// const basename = path.basename(file_path);
// it(`test abort read ${start}-${end} ${basename}`, async () => {
// await native_fs_utils.use_file({
// fs_context,
// bucket_path: file_path,
// open_path: file_path,
// scope: async file => {
// const stat = await file.stat(fs_context);
// const aborter = new AbortController();
// const signal = aborter.signal;
// const file_reader = new FileReader({
// fs_context,
// file,
// file_path,
// stat,
// start,
// end,
// signal,
// multi_buffer_pool,
// highWaterMark: 1024, // bytes
// });
// const writable = buffer_utils.write_stream();
// const promise = file_reader.read_into_stream(writable);
// setImmediate(() => aborter.abort()); // abort quickly
// await assert.rejects(promise, { name: 'AbortError' });
// }
// });
// });
// }
// });

});
4 changes: 1 addition & 3 deletions src/test/unrelated/for_await_stream.js
@@ -1,11 +1,9 @@
/* Copyright (C) 2020 NooBaa */
'use strict';

const util = require('util');
const events = require('events');
const stream = require('stream');
const argv = require('minimist')(process.argv);
const stream_finished = util.promisify(stream.finished);

async function test(mode) {
try {
@@ -68,7 +66,7 @@ async function test(mode) {

}

await stream_finished(output);
await stream.promises.finished(output);
console.log(`${mode}: done.`);

} catch (err) {
7 changes: 2 additions & 5 deletions src/test/unrelated/stream_pipe_to_multiple_targets.js
@@ -1,7 +1,6 @@
/* Copyright (C) 2016 NooBaa */
'use strict';

const util = require('util');
const assert = require('assert');
const stream = require('stream');
const crypto = require('crypto');
@@ -81,8 +80,6 @@ class WriteTarget extends stream.Writable {
}
}

const wait_finished = util.promisify(stream.finished);

async function main() {
try {
console.log('main: starting ...');
@@ -145,8 +142,8 @@ async function main() {
}

await Promise.allSettled([
wait_finished(hub),
wait_finished(cache),
stream.promises.finished(hub),
stream.promises.finished(cache),
]);

if (cache.destroyed) {
11 changes: 4 additions & 7 deletions src/tools/file_writer_hashing.js
@@ -6,7 +6,6 @@ const assert = require('assert');
const FileWriter = require('../util/file_writer');
const config = require('../../config');
const nb_native = require('../util/nb_native');
const stream_utils = require('../util/stream_utils');
const P = require('../util/promise');
const stream = require('stream');
const fs = require('fs');
@@ -72,12 +71,11 @@ async function hash_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX)
}());
const target = new TargetHash();
const file_writer = new FileWriter({
target_file: target,
target_file: /**@type {any}*/ (target),
fs_context: DEFAULT_FS_CONFIG,
namespace_resource_id: 'MajesticSloth'
});
await stream_utils.pipeline([source_stream, file_writer]);
await stream_utils.wait_finished(file_writer);
await file_writer.write_entire_stream(source_stream);
const write_hash = target.digest();
console.log(
'Hash target',
@@ -95,7 +93,7 @@ async function hash_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX)

async function file_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX) {
config.NSFS_DEFAULT_IOV_MAX = iov_max;
fs.mkdirSync(F_PREFIX);
fs.mkdirSync(F_PREFIX, { recursive: true });
await P.map_with_concurrency(CONCURRENCY, Array(parts).fill(), async () => {
let target_file;
const data = crypto.randomBytes(PART_SIZE);
@@ -114,8 +112,7 @@ async function file_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX)
fs_context: DEFAULT_FS_CONFIG,
namespace_resource_id: 'MajesticSloth'
});
await stream_utils.pipeline([source_stream, file_writer]);
await stream_utils.wait_finished(file_writer);
await file_writer.write_entire_stream(source_stream);
if (XATTR) {
await target_file.replacexattr(
DEFAULT_FS_CONFIG,
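
The `write_entire_stream` call above replaces the previous pipeline-then-wait pair with a single FileWriter method. Purely as a guess at the shape of such a helper (not the actual FileWriter implementation from this PR), it could be as small as:
```
'use strict';
const stream = require('stream');

// Hypothetical writer, only to illustrate what a write_entire_stream helper might encapsulate.
class ExampleWriter extends stream.Writable {
    constructor() {
        super();
        this.bytes = 0;
    }
    _write(chunk, encoding, callback) {
        this.bytes += chunk.length; // a real writer would persist the chunk
        callback();
    }
    // Pipe a readable source into this writer and wait until writing has finished.
    async write_entire_stream(source_stream) {
        await stream.promises.pipeline(source_stream, this);
        await stream.promises.finished(this); // defensive, as in object_io.js above
    }
}

module.exports = ExampleWriter;
```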