From 3eaadf32e068a3af4a825c706155f6486643a6d5 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 19 Jan 2023 19:31:29 +0530 Subject: [PATCH 01/13] stream: add abort signal for ReadableStream and WritableStream Refs: https://github.com/nodejs/node/issues/39316 --- lib/internal/streams/add-abort-signal.js | 28 ++++--- lib/internal/streams/utils.js | 2 + lib/internal/webstreams/readablestream.js | 4 +- lib/internal/webstreams/writablestream.js | 4 + .../test-webstreams-abort-controller.js | 76 +++++++++++++++++++ 5 files changed, 104 insertions(+), 10 deletions(-) create mode 100644 test/parallel/test-webstreams-abort-controller.js diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js index 9bcd202ec63c1e..94763d75140076 100644 --- a/lib/internal/streams/add-abort-signal.js +++ b/lib/internal/streams/add-abort-signal.js @@ -5,6 +5,12 @@ const { codes, } = require('internal/errors'); +const { + isNodeStream, + isWebStream, + kControllerErrorFunction, +} = require('internal/streams/utils'); + const eos = require('internal/streams/end-of-stream'); const { ERR_INVALID_ARG_TYPE } = codes; @@ -18,24 +24,28 @@ const validateAbortSignal = (signal, name) => { } }; -function isNodeStream(obj) { - return !!(obj && typeof obj.pipe === 'function'); -} - module.exports.addAbortSignal = function addAbortSignal(signal, stream) { validateAbortSignal(signal, 'signal'); - if (!isNodeStream(stream)) { - throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream); + if (!isNodeStream(stream) && !isWebStream(stream)) { + throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream); } return module.exports.addAbortSignalNoValidate(signal, stream); }; + module.exports.addAbortSignalNoValidate = function(signal, stream) { if (typeof signal !== 'object' || !('aborted' in signal)) { return stream; } - const onAbort = () => { - stream.destroy(new AbortError(undefined, { cause: signal.reason })); - }; + let onAbort; + if (isNodeStream(stream)) { + onAbort = () => { + stream.destroy(new AbortError(undefined, { cause: signal.reason })); + }; + } else { + onAbort = () => { + stream[kControllerErrorFunction](new AbortError(undefined, { cause: signal.reason })); + }; + } if (signal.aborted) { onAbort(); } else { diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 74faca5fe9bb2a..c9e61ca8cdd8eb 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -13,6 +13,7 @@ const kIsReadable = Symbol('kIsReadable'); const kIsDisturbed = Symbol('kIsDisturbed'); const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise'); +const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction'); function isReadableNodeStream(obj, strict = false) { return !!( @@ -305,6 +306,7 @@ module.exports = { isReadable, kIsReadable, kIsClosedPromise, + kControllerErrorFunction, isClosed, isDestroyed, isDuplexNodeStream, diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 0df8f7aa7f6f35..fe18d352c29a37 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -86,6 +86,7 @@ const { kIsErrored, kIsReadable, kIsClosedPromise, + kControllerErrorFunction, } = require('internal/streams/utils'); const { @@ -263,6 +264,7 @@ class ReadableStream { }; this[kIsClosedPromise] = createDeferredPromise(); + this[kControllerErrorFunction] = () => {}; // The spec requires handling of the strategy first // here. Specifically, if getting the size and @@ -1893,7 +1895,6 @@ function readableStreamClose(stream) { assert(stream[kState].state === 'readable'); stream[kState].state = 'closed'; stream[kIsClosedPromise].resolve(); - const { reader, } = stream[kState]; @@ -2332,6 +2333,7 @@ function setupReadableStreamDefaultController( stream, }; stream[kState].controller = controller; + stream[kControllerErrorFunction] = controller.error.bind(controller); const startResult = startAlgorithm(); diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index a8922c08456358..f5e2d5a5071a59 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -71,6 +71,7 @@ const { const { kIsClosedPromise, + kControllerErrorFunction, } = require('internal/streams/utils'); const { @@ -199,6 +200,7 @@ class WritableStream { }; this[kIsClosedPromise] = createDeferredPromise(); + this[kControllerErrorFunction] = () => {}; const size = extractSizeAlgorithm(strategy?.size); const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1); @@ -370,6 +372,7 @@ function TransferredWritableStream() { }, }; this[kIsClosedPromise] = createDeferredPromise(); + this[kControllerErrorFunction] = () => {}; }, [], WritableStream)); } @@ -1282,6 +1285,7 @@ function setupWritableStreamDefaultController( writeAlgorithm, }; stream[kState].controller = controller; + stream[kControllerErrorFunction] = controller.error.bind(controller); writableStreamUpdateBackpressure( stream, diff --git a/test/parallel/test-webstreams-abort-controller.js b/test/parallel/test-webstreams-abort-controller.js new file mode 100644 index 00000000000000..5e8c150c99058d --- /dev/null +++ b/test/parallel/test-webstreams-abort-controller.js @@ -0,0 +1,76 @@ +'use strict'; + +const common = require('../common'); +const { finished, addAbortSignal } = require('stream'); +const { ReadableStream, WritableStream } = require('stream/web'); +const assert = require('assert'); + +{ + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('hello'); + controller.close(); + } + }); + + const reader = rs.getReader(); + + const ac = new AbortController(); + + addAbortSignal(ac.signal, rs); + + finished(rs, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + })); + + ac.abort(); + + assert.rejects(reader.read(), 'AbortError: The operation was aborted.'); +} + +{ + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.enqueue('c'); + controller.close(); + } + }); + + const ac = new AbortController(); + + addAbortSignal(ac.signal, rs); + + assert.rejects((async () => { + for await (const chunk of rs) { + if (chunk === 'b') { + ac.abort(); + } + } + })(), /AbortError/); +} + +{ + const values = []; + const ws = new WritableStream({ + write(chunk) { + values.push(chunk); + } + }); + + finished(ws, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + assert.deepStrictEqual(values, ['a']); + })); + + const ac = new AbortController(); + + addAbortSignal(ac.signal, ws); + + const writer = ws.getWriter(); + + writer.write('a').then(() => { + ac.abort(); + }); +} From 45b1f4e381f84db791a76ae67362858f3e636f30 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 20 Jan 2023 22:49:18 +0530 Subject: [PATCH 02/13] fixup! make onAbort const Co-authored-by: Antoine du Hamel --- lib/internal/streams/add-abort-signal.js | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js index 94763d75140076..d6c8ca4c9c7842 100644 --- a/lib/internal/streams/add-abort-signal.js +++ b/lib/internal/streams/add-abort-signal.js @@ -36,16 +36,13 @@ module.exports.addAbortSignalNoValidate = function(signal, stream) { if (typeof signal !== 'object' || !('aborted' in signal)) { return stream; } - let onAbort; - if (isNodeStream(stream)) { - onAbort = () => { + const onAbort = isNodeStream(stream) ? + () => { stream.destroy(new AbortError(undefined, { cause: signal.reason })); - }; - } else { - onAbort = () => { + } : + () => { stream[kControllerErrorFunction](new AbortError(undefined, { cause: signal.reason })); }; - } if (signal.aborted) { onAbort(); } else { From 134d3e4e1c52eede90ccd1a8007ffbfa2c812447 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 20 Jan 2023 22:49:45 +0530 Subject: [PATCH 03/13] fixup! fix test assertion Co-authored-by: Antoine du Hamel --- test/parallel/test-webstreams-abort-controller.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-webstreams-abort-controller.js b/test/parallel/test-webstreams-abort-controller.js index 5e8c150c99058d..545703cd1365b6 100644 --- a/test/parallel/test-webstreams-abort-controller.js +++ b/test/parallel/test-webstreams-abort-controller.js @@ -25,7 +25,7 @@ const assert = require('assert'); ac.abort(); - assert.rejects(reader.read(), 'AbortError: The operation was aborted.'); + assert.rejects(reader.read(), 'AbortError: The operation was aborted.').then(common.mustCall()); } { From 3020c01c73b953fbc67873338ddf3f970845164f Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 20 Jan 2023 22:50:03 +0530 Subject: [PATCH 04/13] fixup! fix assertion Co-authored-by: Antoine du Hamel --- test/parallel/test-webstreams-abort-controller.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-webstreams-abort-controller.js b/test/parallel/test-webstreams-abort-controller.js index 545703cd1365b6..f848f691200fca 100644 --- a/test/parallel/test-webstreams-abort-controller.js +++ b/test/parallel/test-webstreams-abort-controller.js @@ -48,7 +48,7 @@ const assert = require('assert'); ac.abort(); } } - })(), /AbortError/); + })(), /AbortError/).then(common.mustCall()); } { From 7424c76474e2453eee9c37c9a330f1ab07d232d9 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 20 Jan 2023 22:52:33 +0530 Subject: [PATCH 05/13] fixup! use primordial Co-authored-by: Antoine du Hamel --- lib/internal/webstreams/writablestream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index f5e2d5a5071a59..04a4db15300682 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -1285,7 +1285,7 @@ function setupWritableStreamDefaultController( writeAlgorithm, }; stream[kState].controller = controller; - stream[kControllerErrorFunction] = controller.error.bind(controller); + stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller); writableStreamUpdateBackpressure( stream, From 85c10478cf049f7bb28ad37d60253565d7b74714 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 20 Jan 2023 22:52:50 +0530 Subject: [PATCH 06/13] fixup! use primordial Co-authored-by: Antoine du Hamel --- lib/internal/webstreams/readablestream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index fe18d352c29a37..cfdacba7b3fa94 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -2333,7 +2333,7 @@ function setupReadableStreamDefaultController( stream, }; stream[kState].controller = controller; - stream[kControllerErrorFunction] = controller.error.bind(controller); + stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller); const startResult = startAlgorithm(); From cbcaf7725d0a36c4c6472ade05ccec6d8cd70ca5 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 22 Jan 2023 00:58:21 +0530 Subject: [PATCH 07/13] fixup! add more tests --- .../test-webstreams-abort-controller.js | 136 +++++++++++++++--- 1 file changed, 114 insertions(+), 22 deletions(-) diff --git a/test/parallel/test-webstreams-abort-controller.js b/test/parallel/test-webstreams-abort-controller.js index f848f691200fca..1468e418c6cb4a 100644 --- a/test/parallel/test-webstreams-abort-controller.js +++ b/test/parallel/test-webstreams-abort-controller.js @@ -5,13 +5,27 @@ const { finished, addAbortSignal } = require('stream'); const { ReadableStream, WritableStream } = require('stream/web'); const assert = require('assert'); -{ - const rs = new ReadableStream({ +function createTestReadableStream() { + return new ReadableStream({ start(controller) { - controller.enqueue('hello'); + controller.enqueue('a'); + controller.enqueue('b'); + controller.enqueue('c'); controller.close(); } }); +} + +function createTestWritableStream(values) { + return new WritableStream({ + write(chunk) { + values.push(chunk); + } + }); +} + +{ + const rs = createTestReadableStream(); const reader = rs.getReader(); @@ -21,22 +35,18 @@ const assert = require('assert'); finished(rs, common.mustCall((err) => { assert.strictEqual(err.name, 'AbortError'); + assert.rejects(reader.read(), /AbortError/).then(common.mustCall()); + assert.rejects(reader.closed, /AbortError/).then(common.mustCall()); })); - ac.abort(); - - assert.rejects(reader.read(), 'AbortError: The operation was aborted.').then(common.mustCall()); + reader.read().then(common.mustCall((result) => { + assert.strictEqual(result.value, 'a'); + ac.abort(); + })); } { - const rs = new ReadableStream({ - start(controller) { - controller.enqueue('a'); - controller.enqueue('b'); - controller.enqueue('c'); - controller.close(); - } - }); + const rs = createTestReadableStream(); const ac = new AbortController(); @@ -52,25 +62,107 @@ const assert = require('assert'); } { - const values = []; - const ws = new WritableStream({ - write(chunk) { - values.push(chunk); - } - }); + const rs1 = createTestReadableStream(); - finished(ws, common.mustCall((err) => { + const rs2 = createTestReadableStream(); + + const ac = new AbortController(); + + addAbortSignal(ac.signal, rs1); + addAbortSignal(ac.signal, rs2); + + const reader1 = rs1.getReader(); + const reader2 = rs2.getReader(); + + finished(rs1, common.mustCall((err) => { assert.strictEqual(err.name, 'AbortError'); - assert.deepStrictEqual(values, ['a']); + assert.rejects(reader1.read(), /AbortError/).then(common.mustCall()); + assert.rejects(reader1.closed, /AbortError/).then(common.mustCall()); + })); + + finished(rs2, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + assert.rejects(reader2.read(), /AbortError/).then(common.mustCall()); + assert.rejects(reader2.closed, /AbortError/).then(common.mustCall()); + })); + + ac.abort(); +} + +{ + const rs = createTestReadableStream(); + + const { 0: rs1, 1: rs2 } = rs.tee(); + + const ac = new AbortController(); + + addAbortSignal(ac.signal, rs); + + const reader1 = rs1.getReader(); + const reader2 = rs2.getReader(); + + finished(rs1, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + assert.rejects(reader1.read(), /AbortError/).then(common.mustCall()); + assert.rejects(reader1.closed, /AbortError/).then(common.mustCall()); })); + finished(rs2, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + assert.rejects(reader2.read(), /AbortError/).then(common.mustCall()); + assert.rejects(reader2.closed, /AbortError/).then(common.mustCall()); + })); + + ac.abort(); +} + +{ + const values = []; + const ws = createTestWritableStream(values); + const ac = new AbortController(); addAbortSignal(ac.signal, ws); const writer = ws.getWriter(); + finished(ws, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + assert.deepStrictEqual(values, ['a']); + assert.rejects(writer.write('b'), /AbortError/).then(common.mustCall()); + assert.rejects(writer.closed, /AbortError/).then(common.mustCall()); + })); + writer.write('a').then(() => { ac.abort(); }); } + +{ + const values = []; + + const ws1 = createTestWritableStream(values); + const ws2 = createTestWritableStream(values); + + const ac = new AbortController(); + + addAbortSignal(ac.signal, ws1); + addAbortSignal(ac.signal, ws2); + + const writer1 = ws1.getWriter(); + const writer2 = ws2.getWriter(); + + finished(ws1, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + assert.rejects(writer1.write('a'), /AbortError/).then(common.mustCall()); + assert.rejects(writer1.closed, /AbortError/).then(common.mustCall()); + })); + + finished(ws2, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + assert.rejects(writer2.write('a'), /AbortError/).then(common.mustCall()); + assert.rejects(writer2.closed, /AbortError/).then(common.mustCall()); + })); + + ac.abort(); +} From 89ebb9a3c079ba0ccdbe97dfc2b3e3c84bbbcc71 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 23 Jan 2023 01:20:55 +0530 Subject: [PATCH 08/13] fixup! add doc --- doc/api/stream.md | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index ac9ea0afd8f331..2525260defafc1 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -3231,14 +3231,14 @@ added: v15.4.0 --> * `signal` {AbortSignal} A signal representing possible cancellation -* `stream` {Stream} a stream to attach a signal to +* `stream` {Stream} | {ReadableStream} | {WritableStream} a stream to attach a signal to Attaches an AbortSignal to a readable or writeable stream. This lets code control stream destruction using an `AbortController`. Calling `abort` on the `AbortController` corresponding to the passed `AbortSignal` will behave the same way as calling `.destroy(new AbortError())` -on the stream. +on the stream, and `controller.error(new AbortError())` for webstreams. ```js const fs = require('node:fs'); @@ -3276,6 +3276,37 @@ const stream = addAbortSignal( })(); ``` +Or using an `AbortSignal` with a ReadableStream: + +```js +const controller = new AbortController(); +const rs = new ReadableStream({ + start(controller) { + controller.enqueue('hello'); + controller.enqueue('world'); + controller.close(); + }, +}); + +addAbortSignal(controller.signal, rs); + +finished(rs, (err) => { + if (err) { + if (err.name === 'AbortError') { + // The operation was cancelled + } + } +}); + +const reader = rs.getReader(); + +reader.read().then(({ value, done }) => { + console.log(value); // hello + console.log(done); // false + controller.abort(); +}); +``` + ## API for stream implementers From 1047e5048ac1abd0013ae4fd7810aaf1166214a7 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 23 Jan 2023 01:32:23 +0530 Subject: [PATCH 09/13] fixup! lint --- doc/api/stream.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 2525260defafc1..a37da89e2ea285 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -3231,7 +3231,9 @@ added: v15.4.0 --> * `signal` {AbortSignal} A signal representing possible cancellation -* `stream` {Stream} | {ReadableStream} | {WritableStream} a stream to attach a signal to +* `stream` {Stream} | {ReadableStream} | {WritableStream} + +A stream to attach a signal to Attaches an AbortSignal to a readable or writeable stream. This lets code control stream destruction using an `AbortController`. From 2eaae4053cf9b989cec12e748459a94e839cbcc9 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 23 Jan 2023 01:39:46 +0530 Subject: [PATCH 10/13] fixup! doc Co-authored-by: Antoine du Hamel --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index a37da89e2ea285..0cc3ed83d70938 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -3231,7 +3231,7 @@ added: v15.4.0 --> * `signal` {AbortSignal} A signal representing possible cancellation -* `stream` {Stream} | {ReadableStream} | {WritableStream} +* `stream` {Stream|ReadableStream|WritableStream} A stream to attach a signal to From d9464cdd17a1bf6702eb8a6db1317310f669b660 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 23 Jan 2023 01:41:27 +0530 Subject: [PATCH 11/13] fixup! full stop Co-authored-by: Antoine du Hamel --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 0cc3ed83d70938..d559f484af7554 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -3233,7 +3233,7 @@ added: v15.4.0 * `signal` {AbortSignal} A signal representing possible cancellation * `stream` {Stream|ReadableStream|WritableStream} -A stream to attach a signal to +A stream to attach a signal to. Attaches an AbortSignal to a readable or writeable stream. This lets code control stream destruction using an `AbortController`. From 3f2c929462efed47fd59760df317a9ae32ab314a Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 23 Jan 2023 01:55:03 +0530 Subject: [PATCH 12/13] fixup! lint trailing space --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index d559f484af7554..876492be087ca4 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -3231,7 +3231,7 @@ added: v15.4.0 --> * `signal` {AbortSignal} A signal representing possible cancellation -* `stream` {Stream|ReadableStream|WritableStream} +* `stream` {Stream|ReadableStream|WritableStream} A stream to attach a signal to. From 6c25869355b646ed94b3757b39e8f4fc127a73e6 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 23 Jan 2023 14:49:32 +0530 Subject: [PATCH 13/13] fixup! add version info --- doc/api/stream.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index 876492be087ca4..af700ab0c81a58 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -3228,6 +3228,11 @@ readable.getReader().read().then((result) => { * `signal` {AbortSignal} A signal representing possible cancellation