Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
lib: add webstreams to Duplex.from()
Refs: #39519
PR-URL: #46190
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Robert Nagy <[email protected]>
  • Loading branch information
debadree25 committed May 23, 2023
commit 2c1a619cfbbab701f336eb8e95c2c0cd61ff01b1
38 changes: 23 additions & 15 deletions lib/internal/streams/duplexify.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const {
const { destroyer } = require('internal/streams/destroy');
const Duplex = require('internal/streams/duplex');
const Readable = require('internal/streams/readable');
const Writable = require('internal/streams/writable');
const { createDeferredPromise } = require('internal/util');
const from = require('internal/streams/from');

Expand All @@ -32,6 +33,16 @@ const {
FunctionPrototypeCall,
} = primordials;


const {
isBrandCheck,
} = require('internal/webstreams/util');

const isReadableStream =
isBrandCheck('ReadableStream');
const isWritableStream =
isBrandCheck('WritableStream');

// This is needed for pre node 17.
class Duplexify extends Duplex {
constructor(options) {
Expand Down Expand Up @@ -71,15 +82,13 @@ module.exports = function duplexify(body, name) {
return _duplexify({ writable: false, readable: false });
}

// TODO: Webstreams
// if (isReadableStream(body)) {
// return _duplexify({ readable: Readable.fromWeb(body) });
// }
if (isReadableStream(body)) {
return _duplexify({ readable: Readable.fromWeb(body) });
}

// TODO: Webstreams
// if (isWritableStream(body)) {
// return _duplexify({ writable: Writable.fromWeb(body) });
// }
if (isWritableStream(body)) {
return _duplexify({ writable: Writable.fromWeb(body) });
}

if (typeof body === 'function') {
const { value, write, final, destroy } = fromAsyncGen(body);
Expand Down Expand Up @@ -146,13 +155,12 @@ module.exports = function duplexify(body, name) {
});
}

// TODO: Webstreams.
// if (
// isReadableStream(body?.readable) &&
// isWritableStream(body?.writable)
// ) {
// return Duplexify.fromWeb(body);
// }
if (
isReadableStream(body?.readable) &&
isWritableStream(body?.writable)
) {
return Duplexify.fromWeb(body);
}

if (
typeof body?.writable === 'object' ||
Expand Down
102 changes: 102 additions & 0 deletions test/parallel/test-stream-duplex-from.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const common = require('../common');
const assert = require('assert');
const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream');
const { ReadableStream, WritableStream } = require('stream/web');
const { Blob } = require('buffer');

{
Expand Down Expand Up @@ -299,3 +300,104 @@ const { Blob } = require('buffer');
assert.strictEqual(res, 'foobar');
})).on('close', common.mustCall());
}

function makeATestReadableStream(value) {
return new ReadableStream({
start(controller) {
controller.enqueue(value);
controller.close();
}
});
}

function makeATestWritableStream(writeFunc) {
return new WritableStream({
write(chunk) {
writeFunc(chunk);
}
});
}

{
const d = Duplex.from({
readable: makeATestReadableStream('foo'),
});
assert.strictEqual(d.readable, true);
assert.strictEqual(d.writable, false);

d.on('data', common.mustCall((data) => {
assert.strictEqual(data.toString(), 'foo');
}));

d.on('end', common.mustCall(() => {
assert.strictEqual(d.readable, false);
}));
}

{
const d = Duplex.from(makeATestReadableStream('foo'));

assert.strictEqual(d.readable, true);
assert.strictEqual(d.writable, false);

d.on('data', common.mustCall((data) => {
assert.strictEqual(data.toString(), 'foo');
}));

d.on('end', common.mustCall(() => {
assert.strictEqual(d.readable, false);
}));
}

{
let ret = '';
const d = Duplex.from({
writable: makeATestWritableStream((chunk) => ret += chunk),
});

assert.strictEqual(d.readable, false);
assert.strictEqual(d.writable, true);

d.end('foo');
d.on('finish', common.mustCall(() => {
assert.strictEqual(ret, 'foo');
assert.strictEqual(d.writable, false);
}));
}

{
let ret = '';
const d = Duplex.from(makeATestWritableStream((chunk) => ret += chunk));

assert.strictEqual(d.readable, false);
assert.strictEqual(d.writable, true);

d.end('foo');
d.on('finish', common.mustCall(() => {
assert.strictEqual(ret, 'foo');
assert.strictEqual(d.writable, false);
}));
}

{
let ret = '';
const d = Duplex.from({
readable: makeATestReadableStream('foo'),
writable: makeATestWritableStream((chunk) => ret += chunk),
});

d.end('bar');

d.on('data', common.mustCall((data) => {
assert.strictEqual(data.toString(), 'foo');
}));

d.on('end', common.mustCall(() => {
assert.strictEqual(d.readable, false);
}));

d.on('finish', common.mustCall(() => {
assert.strictEqual(ret, 'bar');
assert.strictEqual(d.writable, false);
}));
}