diff --git a/doc/api/fs.md b/doc/api/fs.md index edd7054d6aab36..59c765d958a55c 100644 --- a/doc/api/fs.md +++ b/doc/api/fs.md @@ -3879,6 +3879,9 @@ details. * `file` {string|Buffer|URL|integer} filename or file descriptor -* `data` {string|Buffer|TypedArray|DataView|Object} +* `data` {string|Buffer|TypedArray|DataView|Object + |AsyncIterable|Iterable|Stream} * `options` {Object|string} * `encoding` {string|null} **Default:** `'utf8'` * `mode` {integer} **Default:** `0o666` diff --git a/lib/fs.js b/lib/fs.js index 310397bbed39e1..47732f2df851e5 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -32,6 +32,7 @@ const { ArrayPrototypePush, BigIntPrototypeToString, MathMax, + MathMin, Number, ObjectCreate, ObjectDefineProperties, @@ -44,6 +45,8 @@ const { StringPrototypeCharCodeAt, StringPrototypeIndexOf, StringPrototypeSlice, + SymbolAsyncIterator, + SymbolIterator, } = primordials; const { fs: constants } = internalBinding('constants'); @@ -85,10 +88,12 @@ const { const { FSReqCallback } = binding; const { toPathIfFileURL } = require('internal/url'); const internalUtil = require('internal/util'); +const { isCustomIterable } = require('internal/streams/utils'); const { constants: { kIoMaxLength, kMaxUserId, + kWriteFileMaxChunkSize, }, copyObject, Dirent, @@ -828,12 +833,12 @@ function write(fd, buffer, offset, length, position, callback) { } else { position = length; } - length = 'utf8'; + length = length || 'utf8'; } const str = String(buffer); validateEncoding(str, length); - callback = maybeCallback(position); + callback = maybeCallback(callback || position); const req = new FSReqCallback(); req.oncomplete = wrapper; @@ -2039,28 +2044,24 @@ function lutimesSync(path, atime, mtime) { handleErrorFromBinding(ctx); } -function writeAll(fd, isUserFd, buffer, offset, length, signal, callback) { +function writeAll( + fd, isUserFd, buffer, offset, length, signal, encoding, callback) { if (signal?.aborted) { - const abortError = new AbortError(); - if (isUserFd) { - callback(abortError); - } else { - fs.close(fd, (err) => { - callback(aggregateTwoErrors(err, abortError)); + handleWriteAllErrorCallback(fd, isUserFd, new AbortError(), callback); + return; + } + + if (isCustomIterable(buffer)) { + writeAllCustomIterable( + fd, isUserFd, buffer, offset, length, signal, encoding, callback) + .catch((reason) => { + handleWriteAllErrorCallback(fd, isUserFd, reason, callback); }); - } return; } - // write(fd, buffer, offset, length, position, callback) fs.write(fd, buffer, offset, length, null, (writeErr, written) => { if (writeErr) { - if (isUserFd) { - callback(writeErr); - } else { - fs.close(fd, (err) => { - callback(aggregateTwoErrors(err, writeErr)); - }); - } + handleWriteAllErrorCallback(fd, isUserFd, writeErr, callback); } else if (written === length) { if (isUserFd) { callback(null); @@ -2070,11 +2071,82 @@ function writeAll(fd, isUserFd, buffer, offset, length, signal, callback) { } else { offset += written; length -= written; - writeAll(fd, isUserFd, buffer, offset, length, signal, callback); + writeAll( + fd, isUserFd, buffer, offset, length, signal, encoding, callback); } }); } +async function writeAllCustomIterable( + fd, isUserFd, buffer, offset, length, signal, encoding, callback) { + if (signal?.aborted) { + handleWriteAllErrorCallback(fd, isUserFd, new AbortError(), callback); + return; + } + + const result = await buffer.next(); + if (result.done) { + if (isUserFd) { + callback(null); + } else { + fs.close(fd, callback); + } + return; + } + if (signal?.aborted) { + handleWriteAllErrorCallback(fd, isUserFd, new AbortError(), callback); + return; + } + const resultValue = isArrayBufferView(result.value) ? + result.value : Buffer.from(String(result.value), encoding); + const remaining = resultValue.byteLength; + const writeSize = MathMin(kWriteFileMaxChunkSize, remaining); + fs.write(fd, resultValue, resultValue.byteLength - remaining, writeSize, + null, (writeErr, written) => { + handleWriteAllCustomIterableCallback( + fd, isUserFd, buffer, resultValue, + resultValue.byteLength - remaining, writeSize, + signal, encoding, writeErr, remaining, written, callback); + } + ); +} + +function handleWriteAllCustomIterableCallback(fd, isUserFd, buffer, resultValue, + offset, length, signal, encoding, + writeErr, remaining, written, + callback) { + if (writeErr) { + handleWriteAllErrorCallback(fd, isUserFd, writeErr, callback); + return; + } + + remaining -= written; + if (remaining > 0) { + const writeSize = MathMin(kWriteFileMaxChunkSize, remaining); + fs.write(fd, resultValue, + resultValue.byteLength - remaining, writeSize, + null, (writeErr, written) => { + handleWriteAllCustomIterableCallback( + fd, isUserFd, buffer, resultValue, offset, length, + signal, encoding, writeErr, remaining, written, callback); + }); + return; + } + + writeAllCustomIterable( + fd, isUserFd, buffer, offset, length, signal, encoding, callback); +} + +function handleWriteAllErrorCallback(fd, isUserFd, writeErr, callback) { + if (isUserFd) { + callback(writeErr); + } else { + fs.close(fd, (err) => { + callback(aggregateTwoErrors(err, writeErr)); + }); + } +} + /** * Asynchronously writes data to the file. * @param {string | Buffer | URL | number} path @@ -2093,15 +2165,20 @@ function writeFile(path, data, options, callback) { options = getOptions(options, { encoding: 'utf8', mode: 0o666, flag: 'w' }); const flag = options.flag || 'w'; - if (!isArrayBufferView(data)) { + if (!isArrayBufferView(data) && !isCustomIterable(data)) { validateStringAfterArrayBufferView(data, 'data'); data = Buffer.from(String(data), options.encoding || 'utf8'); } + if (isCustomIterable(data)) { + data = data[SymbolIterator]?.() ?? data[SymbolAsyncIterator]?.(); + } + if (isFd(path)) { const isUserFd = true; const signal = options.signal; - writeAll(path, isUserFd, data, 0, data.byteLength, signal, callback); + writeAll(path, isUserFd, data, + 0, data.byteLength, signal, options.encoding, callback); return; } @@ -2114,7 +2191,8 @@ function writeFile(path, data, options, callback) { } else { const isUserFd = false; const signal = options.signal; - writeAll(fd, isUserFd, data, 0, data.byteLength, signal, callback); + writeAll(fd, isUserFd, data, + 0, data.byteLength, signal, options.encoding, callback); } }); } diff --git a/lib/internal/fs/promises.js b/lib/internal/fs/promises.js index 7a575e69491e38..a90d443e0822de 100644 --- a/lib/internal/fs/promises.js +++ b/lib/internal/fs/promises.js @@ -79,7 +79,7 @@ const pathModule = require('path'); const { promisify } = require('internal/util'); const { EventEmitterMixin } = require('internal/event_target'); const { watch } = require('internal/fs/watchers'); -const { isIterable } = require('internal/streams/utils'); +const { isCustomIterable } = require('internal/streams/utils'); const assert = require('internal/assert'); const kHandle = Symbol('kHandle'); @@ -730,10 +730,6 @@ async function writeFile(path, data, options) { writeFileHandle(fd, data, options.signal, options.encoding), fd.close); } -function isCustomIterable(obj) { - return isIterable(obj) && !isArrayBufferView(obj) && typeof obj !== 'string'; -} - async function appendFile(path, data, options) { options = getOptions(options, { encoding: 'utf8', mode: 0o666, flag: 'a' }); options = copyObject(options); diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index b6e744250799c6..04bc76b4e31df0 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -4,6 +4,7 @@ const { SymbolAsyncIterator, SymbolIterator, } = primordials; +const { isArrayBufferView } = require('internal/util/types'); function isReadable(obj) { return !!(obj && typeof obj.pipe === 'function' && @@ -27,7 +28,12 @@ function isIterable(obj, isAsync) { typeof obj[SymbolIterator] === 'function'; } +function isCustomIterable(obj) { + return isIterable(obj) && !isArrayBufferView(obj) && typeof obj !== 'string'; +} + module.exports = { + isCustomIterable, isIterable, isReadable, isStream, diff --git a/test/parallel/test-fs-write-file-async-iterators.js b/test/parallel/test-fs-write-file-async-iterators.js new file mode 100644 index 00000000000000..00d585c5e65707 --- /dev/null +++ b/test/parallel/test-fs-write-file-async-iterators.js @@ -0,0 +1,169 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const fs = require('fs'); +const join = require('path').join; +const { Readable } = require('stream'); + +const tmpdir = require('../common/tmpdir'); +tmpdir.refresh(); + +{ + const filenameIterable = join(tmpdir.path, 'testIterable.txt'); + const iterable = { + expected: 'abc', + *[Symbol.iterator]() { + yield 'a'; + yield 'b'; + yield 'c'; + } + }; + + fs.writeFile(filenameIterable, iterable, common.mustSucceed(() => { + const data = fs.readFileSync(filenameIterable, 'utf-8'); + assert.strictEqual(iterable.expected, data); + })); +} + +{ + const filenameBufferIterable = join(tmpdir.path, 'testBufferIterable.txt'); + const bufferIterable = { + expected: 'abc', + *[Symbol.iterator]() { + yield Buffer.from('a'); + yield Buffer.from('b'); + yield Buffer.from('c'); + } + }; + + fs.writeFile( + filenameBufferIterable, bufferIterable, common.mustSucceed(() => { + const data = fs.readFileSync(filenameBufferIterable, 'utf-8'); + assert.strictEqual(bufferIterable.expected, data); + }) + ); +} + +{ + const filenameLargeBuffer = join(tmpdir.path, 'testLargeBuffer.txt'); + const largeBuffer = { + expected: 'dogs running'.repeat(512 * 1024), + *[Symbol.iterator]() { + yield Buffer.from('dogs running'.repeat(512 * 1024), 'utf8'); + } + }; + + fs.writeFile( + filenameLargeBuffer, largeBuffer, common.mustSucceed(() => { + const data = fs.readFileSync(filenameLargeBuffer, 'utf-8'); + assert.strictEqual(largeBuffer.expected, data); + }) + ); +} + +{ + const filenameBufferIterableWithEncoding = + join(tmpdir.path, 'testBufferIterableWithEncoding.txt'); + const bufferIterableWithEncoding = { + expected: 'ümlaut sechzig', + *[Symbol.iterator]() { + yield Buffer.from('ümlaut'); + yield Buffer.from(' '); + yield Buffer.from('sechzig'); + } + }; + + fs.writeFile( + filenameBufferIterableWithEncoding, + bufferIterableWithEncoding, common.mustSucceed(() => { + const data = fs.readFileSync(filenameBufferIterableWithEncoding, 'utf-8'); + assert.strictEqual(bufferIterableWithEncoding.expected, data); + }) + ); +} + +{ + const filenameAsyncIterable = join(tmpdir.path, 'testAsyncIterable.txt'); + const asyncIterable = { + expected: 'abc', + *[Symbol.asyncIterator]() { + yield 'a'; + yield 'b'; + yield 'c'; + } + }; + + fs.writeFile(filenameAsyncIterable, asyncIterable, common.mustSucceed(() => { + const data = fs.readFileSync(filenameAsyncIterable, 'utf-8'); + assert.strictEqual(asyncIterable.expected, data); + })); +} + +{ + const filenameStream = join(tmpdir.path, 'testStream.txt'); + const stream = Readable.from(['a', 'b', 'c']); + const expected = 'abc'; + + fs.writeFile(filenameStream, stream, common.mustSucceed(() => { + const data = fs.readFileSync(filenameStream, 'utf-8'); + assert.strictEqual(expected, data); + })); +} + +{ + const filenameStreamWithEncoding = + join(tmpdir.path, 'testStreamWithEncoding.txt'); + const stream = Readable.from(['ümlaut', ' ', 'sechzig']); + const expected = 'ümlaut sechzig'; + + fs.writeFile( + filenameStreamWithEncoding, stream, 'latin1', common.mustSucceed(() => { + const data = fs.readFileSync(filenameStreamWithEncoding, 'latin1'); + assert.strictEqual(expected, data); + }) + ); +} + +{ + const controller = new AbortController(); + const signal = controller.signal; + const filenameIterableAbort = join(tmpdir.path, 'testIterableAbort.txt'); + const iterable = { + expected: 'abc', + *[Symbol.iterator]() { + yield 'a'; + yield 'b'; + yield 'c'; + } + }; + + + fs.writeFile(filenameIterableAbort, + iterable, { signal }, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + })); + + controller.abort(); +} + +{ + const controller = new AbortController(); + const signal = controller.signal; + const filenameAsyncIterableAbort = + join(tmpdir.path, 'testAsyncIterableAbort.txt'); + const asyncIterable = { + expected: 'abc', + *[Symbol.asyncIterator]() { + yield 'a'; + yield 'b'; + yield 'c'; + } + }; + + fs.writeFile(filenameAsyncIterableAbort, + asyncIterable, { signal }, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + })); + + process.nextTick(() => controller.abort()); +}