Merged
lib: add support for readable byte streams to .toWeb()
Add support for the creation of ReadableByteStream to Readable.toWeb()
and Duplex.toWeb(). This enables the use of .getReader({ mode: "byob" })
on e.g. socket().toWeb().

Refs: #56004 (comment)
Refs: https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_byte_streams
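For illustration, a minimal sketch of what this enables. This is not part of the commit; the host, port, and request are placeholders, and `type: 'bytes'` is the option this commit adds:

```js
const net = require('node:net');
const { Duplex } = require('node:stream');

// Hypothetical endpoint; any TCP server works.
const socket = net.connect(80, 'example.org');

socket.once('connect', async () => {
  // With this commit, passing { type: 'bytes' } yields a readable byte
  // stream, so a BYOB reader can fill caller-supplied buffers.
  const { readable } = Duplex.toWeb(socket, { type: 'bytes' });
  const reader = readable.getReader({ mode: 'byob' });

  socket.write('GET / HTTP/1.0\r\nHost: example.org\r\n\r\n');

  const { value, done } = await reader.read(new Uint8Array(1024));
  if (!done) console.log('read %d bytes', value.byteLength);
  socket.destroy();
});
```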
seriousme committed Jul 10, 2025
commit c5c4351fa60f763306d40890b3f3a15b5b69f66b
6 changes: 5 additions & 1 deletion doc/api/stream.md
@@ -3201,6 +3201,8 @@ changes:
If no value is provided, the size will be `1` for all the chunks.
* `chunk` {any}
* Returns: {number}
* `type` {string} Must be `'bytes'` or `undefined`.
  If `type` is set to `'bytes'`, the `strategy` option is ignored.
* Returns: {ReadableStream}
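For illustration, a minimal sketch of the option described above (not part of the diff; the behavior shown is what this commit adds):

```js
const { Readable } = require('node:stream');

const nodeReadable = new Readable({
  read() {
    this.push(Buffer.from('hello'));
    this.push(null);
  },
});

// { type: 'bytes' } requests a readable byte stream; per the docs above,
// the strategy option would be ignored in this mode.
const webReadable = Readable.toWeb(nodeReadable, { type: 'bytes' });

const reader = webReadable.getReader({ mode: 'byob' });
reader.read(new Uint8Array(5)).then(({ value, done }) => {
  console.log(done, Buffer.from(value).toString()); // false 'hello'
});
```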

### `stream.Writable.fromWeb(writableStream[, options])`
@@ -3374,7 +3376,7 @@ duplex.write('hello');
duplex.once('readable', () => console.log('readable', duplex.read()));
```

### `stream.Duplex.toWeb(streamDuplex)`
### `stream.Duplex.toWeb(streamDuplex[, options])`

<!-- YAML
added: v17.0.0
@@ -3387,6 +3389,8 @@ changes:
-->

* `streamDuplex` {stream.Duplex}
* `options` {Object}
* `type` {string}
* Returns: {Object}
* `readable` {ReadableStream}
* `writable` {WritableStream}
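A hedged sketch of the extended signature (illustrative only, not part of the diff), using a `PassThrough` as the duplex:

```js
const { Duplex, PassThrough } = require('node:stream');

const pass = new PassThrough();
const { readable, writable } = Duplex.toWeb(pass, { type: 'bytes' });

// Bytes written to the writable side come back out of the readable side.
writable.getWriter().write(new Uint8Array([1, 2, 3]));

readable.getReader({ mode: 'byob' })
  .read(new Uint8Array(3))
  .then(({ value }) => console.log(value)); // Uint8Array(3) [ 1, 2, 3 ]
```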
4 changes: 2 additions & 2 deletions lib/internal/streams/duplex.js
@@ -191,8 +191,8 @@ Duplex.fromWeb = function(pair, options) {
options);
};

Duplex.toWeb = function(duplex) {
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex);
Duplex.toWeb = function(duplex, options) {
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex, options);
};

let duplexify;
30 changes: 24 additions & 6 deletions lib/internal/webstreams/adapters.js
@@ -418,6 +418,7 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj
* @param {Readable} streamReadable
* @param {{
* strategy : QueuingStrategy,
* type : string
* }} [options]
* @returns {ReadableStream}
*/
@@ -432,13 +433,15 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
'stream.Readable',
streamReadable);
}
const isBYOB = options?.type === 'bytes';

if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
const readable = new ReadableStream();
readable.cancel();
return readable;
}


const objectMode = streamReadable.readableObjectMode;
const highWaterMark = streamReadable.readableHighWaterMark;

@@ -491,15 +494,27 @@
streamReadable.on('data', onData);

return new ReadableStream({
start(c) { controller = c; },
type: isBYOB ? 'bytes' : undefined,
start(c) {
controller = c;
if (isBYOB) {
streamReadable.once('end', () => {
// Close the controller
controller.close();
// and unlock the last BYOB read request
controller.byobRequest?.respond(0);
wasCanceled = true;
});
}
},

pull() { streamReadable.resume(); },

cancel(reason) {
wasCanceled = true;
destroy(streamReadable, reason);
},
}, strategy);
}, isBYOB ? undefined : strategy);
}
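Editorial note on the `start()` logic above: closing a byte stream while a BYOB read is outstanding leaves that read pending unless the `byobRequest` is settled, which is why the code responds with zero bytes after `controller.close()`. A standalone sketch of the same pattern (illustrative, not part of the diff):

```js
// Minimal reproduction of the close-then-respond(0) pattern.
const rs = new ReadableStream({
  type: 'bytes',
  start(controller) {
    // Defer so the BYOB read below is already pending when we close.
    queueMicrotask(() => {
      controller.close();
      // Respond with 0 bytes so the pending read(view) settles as done.
      controller.byobRequest?.respond(0);
    });
  },
});

rs.getReader({ mode: 'byob' })
  .read(new Uint8Array(16))
  .then(({ done }) => console.log('done:', done)); // done: true
```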

/**
Expand Down Expand Up @@ -601,9 +616,10 @@ function newStreamReadableFromReadableStream(readableStream, options = kEmptyObj

/**
* @param {Duplex} duplex
* @param {{ type?: string }} [options]
* @returns {ReadableWritablePair}
*/
function newReadableWritablePairFromDuplex(duplex) {
function newReadableWritablePairFromDuplex(duplex, options = kEmptyObject) {
// Not using the internal/streams/utils isWritableNodeStream and
// isReadableNodeStream utilities here because they will return false
// if the duplex was created with writable or readable options set to
@@ -615,9 +631,11 @@ function newReadableWritablePairFromDuplex(duplex) {
throw new ERR_INVALID_ARG_TYPE('duplex', 'stream.Duplex', duplex);
}

validateObject(options, 'options');

if (isDestroyed(duplex)) {
const writable = new WritableStream();
const readable = new ReadableStream();
const readable = new ReadableStream(options);
writable.close();
readable.cancel();
return { readable, writable };
Expand All @@ -633,8 +651,8 @@ function newReadableWritablePairFromDuplex(duplex) {

const readable =
isReadable(duplex) ?
newReadableStreamFromStreamReadable(duplex) :
new ReadableStream();
newReadableStreamFromStreamReadable(duplex, options) :
new ReadableStream(options);

if (!isReadable(duplex))
readable.cancel();
23 changes: 23 additions & 0 deletions test/parallel/test-stream-duplex.js
@@ -131,3 +131,26 @@ process.on('exit', () => {
assert.deepStrictEqual(Buffer.from(result.value), dataToRead);
}));
}

// Duplex.toWeb BYOB
{
const dataToRead = Buffer.from('hello');
const dataToWrite = Buffer.from('world');

const duplex = Duplex({
read() {
this.push(dataToRead);
this.push(null);
},
write: common.mustCall((chunk) => {
assert.strictEqual(chunk, dataToWrite);
})
});

const { writable, readable } = Duplex.toWeb(duplex, { type: 'bytes' });
writable.getWriter().write(dataToWrite);
const data = new Uint8Array(dataToRead.length);
readable.getReader({ mode: 'byob' }).read(data).then(common.mustCall((result) => {
assert.deepStrictEqual(Buffer.from(result.value), dataToRead);
}));
}
70 changes: 70 additions & 0 deletions test/parallel/test-stream-readable-to-web-byob.js
@@ -0,0 +1,70 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto) { common.skip('missing crypto'); }

const { Readable } = require('stream');
const process = require('process');
const { randomBytes } = require('crypto');
const assert = require('assert');

// Based on: https://github.com/nodejs/node/issues/46347#issuecomment-1413886707
// edit: make it cross-platform as /dev/urandom is not available on Windows
{
let currentMemoryUsage = process.memoryUsage().arrayBuffers;

// We initialize a stream but don't start consuming it yet
const randomNodeStream = new Readable({
read(size) {
randomBytes(size, (err, buffer) => {
if (err) {
// If an error occurs, emit an 'error' event
this.emit('error', err);
return;
}

// Push the random bytes to the stream
this.push(buffer);
});
}
});
// after 1 second, it'll get converted to a web stream
let randomWebStream;

// We check memory usage every second
// since it's a stream, it shouldn't be higher than the chunk size
const reportMemoryUsage = () => {
const { arrayBuffers } = process.memoryUsage();
currentMemoryUsage = arrayBuffers;

assert(currentMemoryUsage <= 256 * 1024 * 1024);
};
setInterval(reportMemoryUsage, 1000);

// after 1 second we use Readable.toWeb
// memory usage should stay pretty much the same since it's still a stream
setTimeout(() => {
randomWebStream = Readable.toWeb(randomNodeStream, { type: 'bytes' });
}, 1000);

// after 2 seconds we start consuming the stream
// memory usage will grow, but the old chunks should be garbage-collected pretty quickly
setTimeout(async () => {

const reader = randomWebStream.getReader({ mode: 'byob' });

let done = false;
while (!done) {
// Read 16 bytes of data from the stream
const result = await reader.read(new Uint8Array(16));
done = result.done;
// We consume the stream, but we don't do anything with the data
// This is to ensure that the stream is being consumed
// and that the memory usage is being reported correctly
}
}, 2000);

setTimeout(() => {
// Test considered passed if we don't crash
process.exit(0);
}, 5000);
Member:

This would cause a lot of flakiness and general slowness. Don't use timers.

Contributor Author:

The problem in the doc generation seems to be fixed now with "fix: correct link from webstream to streamduplextowebstreamduplex".

With regard to the use of timers during tests:
`test/parallel/test-stream-readable-to-web-byob.js`
is a slightly modified copy of the already existing
`test/parallel/test-stream-readable-to-web.js`,
which also uses timers:

setTimeout(() => {
// Test considered passed if we don't crash
process.exit(0);
}, 5000);
}

Do you have any suggestions on how to code these tests without using timers?

Kind regards,
Hans

Contributor Author:

I noticed that `test/parallel/test-stream-readable-to-web.js` is doing a rather crude memory leak test, which indeed does not make sense to replicate for this byte-stream case, as the new code only modifies the parameters for the creation of a new ReadableStream and does not touch any buffers itself.

Therefore I replaced the memory leak test with a simple test to make sure that the byte stream works as intended. This test does not need timers at all.

Kind regards,
Hans

}
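For reference, a timer-free check along the lines the author describes might look like this. This is a sketch, not the PR's final test; it reads fixed-size chunks until `done` instead of sampling memory on an interval:

```js
'use strict';
const common = require('../common');
const { Readable } = require('stream');
const assert = require('assert');

const nodeStream = new Readable({
  read() {
    this.push(Buffer.from('hello world'));
    this.push(null);
  },
});

const reader = Readable.toWeb(nodeStream, { type: 'bytes' })
  .getReader({ mode: 'byob' });

(async () => {
  const received = [];
  // Keep reading into fresh 16-byte views until the stream reports done.
  let result = await reader.read(new Uint8Array(16));
  while (!result.done) {
    received.push(Buffer.from(result.value));
    result = await reader.read(new Uint8Array(16));
  }
  assert.strictEqual(Buffer.concat(received).toString(), 'hello world');
})().then(common.mustCall());
```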
11 changes: 11 additions & 0 deletions test/parallel/test-stream-readable-to-web-termination-byob.js
@@ -0,0 +1,11 @@
'use strict';
require('../common');
const { Readable } = require('stream');
{
const r = Readable.from([]);
// Cancelling reader while closing should not cause uncaught exceptions
r.on('close', () => reader.cancel());

const reader = Readable.toWeb(r, { type: 'bytes' }).getReader({ mode: 'byob' });
reader.read(new Uint8Array(16));
}