Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
1cff8fd
lib: initial experimental AbortController implementation
jasnell May 23, 2020
c86e131
timers: allow promisified timeouts/immediates to be canceled
jasnell Jun 10, 2020
290ce9f
timers: fix multipleResolves in promisified timeouts/immediates
lundibundi Jun 18, 2020
d097171
timers: move promisified timers implementations
jasnell Jun 18, 2020
26b7626
timers: use AbortController with correct name/message
addaleax Aug 13, 2020
268686a
events: allow use of AbortController with once
jasnell Aug 24, 2020
51031f3
events: allow use of AbortController with on
jasnell Aug 24, 2020
affb52d
doc: revise AbortSignal text and example using events.once()
Trott Sep 1, 2020
561d832
doc: make AbortSignal text consistent in events.md
Trott Sep 3, 2020
b389182
events: make abort_controller event trusted
benjamingr Oct 26, 2020
f76ab5a
lib: let abort_controller target be EventTarget
watilde Oct 30, 2020
76cdf87
test: integrate abort_controller tests from wpt
watilde Oct 30, 2020
dc3d852
fs: add support for AbortSignal in readFile
benjamingr Nov 1, 2020
ceef7e3
fs: support abortsignal in writeFile
benjamingr Nov 6, 2020
01d2b67
events: add a few tests
benjamingr Oct 26, 2020
1ca549d
events: support emit on nodeeventtarget
benjamingr Oct 28, 2020
307bebc
events: define event handler as enumerable
benjamingr Nov 2, 2020
902503c
events: support event handlers on prototypes
benjamingr Nov 2, 2020
6bbd82f
events: define abort on prototype
benjamingr Nov 2, 2020
7d255e6
events: fire handlers in correct oder
benjamingr Nov 6, 2020
0131586
events: disabled manual construction AbortSignal
RaisinTen Nov 12, 2020
555a5a2
events: getEventListeners static
benjamingr Nov 6, 2020
e8bd59c
http: add support for abortsignal to http.request
benjamingr Nov 9, 2020
5ce6f91
http2: add support for AbortSignal to http2Session.request
MadaraUchiha Nov 10, 2020
8277ec1
child_process: add AbortSignal support
benjamingr Nov 28, 2020
6c86e68
test: increase execFile abort coverage
shootermv Dec 7, 2020
4f04104
child_process: clean event listener correctly
benjamingr Dec 7, 2020
44a304d
child_process: add signal support to spawn
benjamingr Dec 7, 2020
b9732d4
child_process: support AbortSignal in fork
benjamingr Dec 22, 2020
130b797
timers: refactor to use validateAbortSignal
Lxxyx Dec 22, 2020
1b04e63
dgram: support AbortSignal in createSocket
Jan 22, 2021
ef6af8c
fs: add AbortSignal support to watch
benjamingr Feb 2, 2021
88b3463
fs: fix pre-aborted writeFile AbortSignal file leak
Feb 16, 2021
a80d272
doc: recommend checking abortSignal.aborted first
jasnell Mar 11, 2021
6b3a21c
events: add max listener warning for EventTarget
jasnell Nov 6, 2020
a2cf563
lib: set abort-controller toStringTag
benjamingr Nov 14, 2020
436a2d2
lib: implement AbortSignal.abort()
jasnell Mar 10, 2021
63099e1
test: update dom/abort tests
jasnell Mar 15, 2021
10131cd
lib: add brand checks to AbortController and AbortSignal
MattiasBuelens Mar 11, 2021
9abc11a
test: fix unreliable test-fs-write-file.js
Trott Nov 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
events: allow use of AbortController with on
Signed-off-by: James M Snell <[email protected]>

PR-URL: #34912
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Denys Otrishko <[email protected]>
  • Loading branch information
jasnell authored and targos committed Apr 30, 2021
commit 51031f328d872c7e62735e4762cf3e2c0e8d10ba
32 changes: 31 additions & 1 deletion doc/api/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ Value: `Symbol.for('nodejs.rejection')`

See how to write a custom [rejection handler][rejection].

## `events.on(emitter, eventName)`
## `events.on(emitter, eventName[, options])`
<!-- YAML
added:
- v13.6.0
Expand All @@ -1013,6 +1013,9 @@ added:

* `emitter` {EventEmitter}
* `eventName` {string|symbol} The name of the event being listened for
* `options` {Object}
* `signal` {AbortSignal} An {AbortSignal} that can be used to cancel awaiting
events.
* Returns: {AsyncIterator} that iterates `eventName` events emitted by the `emitter`

```js
Expand Down Expand Up @@ -1042,6 +1045,33 @@ if the `EventEmitter` emits `'error'`. It removes all listeners when
exiting the loop. The `value` returned by each iteration is an array
composed of the emitted event arguments.

An {AbortSignal} may be used to cancel waiting on events:

```js
const { on, EventEmitter } = require('events');
const ac = new AbortController();

(async () => {
const ee = new EventEmitter();

// Emit later on
process.nextTick(() => {
ee.emit('foo', 'bar');
ee.emit('foo', 42);
});

for await (const event of on(ee, 'foo', { signal: ac.signal })) {
// The execution of this inner block is synchronous and it
// processes one event at a time (even with await). Do not use
// if concurrent execution is required.
console.log(event); // prints ['bar'] [42]
}
// Unreachable here
})();

process.nextTick(() => ac.abort());
```

## `EventTarget` and `Event` API
<!-- YAML
added: v14.5.0
Expand Down
28 changes: 27 additions & 1 deletion lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,13 @@ function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
}
}

function on(emitter, event) {
function on(emitter, event, options) {
const { signal } = { ...options };
validateAbortSignal(signal, 'options.signal');
if (signal && signal.aborted) {
throw lazyDOMException('The operation was aborted', 'AbortError');
}

const unconsumedEvents = [];
const unconsumedPromises = [];
let error = null;
Expand Down Expand Up @@ -769,6 +775,15 @@ function on(emitter, event) {
return() {
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);

if (signal) {
eventTargetAgnosticRemoveListener(
signal,
'abort',
abortListener,
{ once: true });
}

finished = true;

for (const promise of unconsumedPromises) {
Expand Down Expand Up @@ -798,9 +813,20 @@ function on(emitter, event) {
addErrorHandlerIfEventEmitter(emitter, errorHandler);
}

if (signal) {
eventTargetAgnosticAddListener(
signal,
'abort',
abortListener,
{ once: true });
}

return iterator;

function abortListener() {
errorHandler(lazyDOMException('The operation was aborted', 'AbortError'));
}

function eventHandler(...args) {
const promise = unconsumedPromises.shift();
if (promise) {
Expand Down
121 changes: 119 additions & 2 deletions test/parallel/test-event-on-async-iterator.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Flags: --expose-internals
// Flags: --expose-internals --no-warnings --experimental-abortcontroller
'use strict';

const common = require('../common');
Expand Down Expand Up @@ -248,6 +248,117 @@ async function nodeEventTarget() {
clearInterval(interval);
}

async function abortableOnBefore() {
const ee = new EventEmitter();
const ac = new AbortController();
ac.abort();
[1, {}, null, false, 'hi'].forEach((signal) => {
assert.throws(() => on(ee, 'foo', { signal }), {
code: 'ERR_INVALID_ARG_TYPE'
});
});
assert.throws(() => on(ee, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
}

async function eventTargetAbortableOnBefore() {
const et = new EventTarget();
const ac = new AbortController();
ac.abort();
[1, {}, null, false, 'hi'].forEach((signal) => {
assert.throws(() => on(et, 'foo', { signal }), {
code: 'ERR_INVALID_ARG_TYPE'
});
});
assert.throws(() => on(et, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
}

async function abortableOnAfter() {
const ee = new EventEmitter();
const ac = new AbortController();

const i = setInterval(() => ee.emit('foo', 'foo'), 10);

async function foo() {
for await (const f of on(ee, 'foo', { signal: ac.signal })) {
assert.strictEqual(f, 'foo');
}
}

foo().catch(common.mustCall((error) => {
assert.strictEqual(error.name, 'AbortError');
})).finally(() => {
clearInterval(i);
});

process.nextTick(() => ac.abort());
}

async function eventTargetAbortableOnAfter() {
const et = new EventTarget();
const ac = new AbortController();

const i = setInterval(() => et.dispatchEvent(new Event('foo')), 10);

async function foo() {
for await (const f of on(et, 'foo', { signal: ac.signal })) {
assert(f);
}
}

foo().catch(common.mustCall((error) => {
assert.strictEqual(error.name, 'AbortError');
})).finally(() => {
clearInterval(i);
});

process.nextTick(() => ac.abort());
}

async function eventTargetAbortableOnAfter2() {
const et = new EventTarget();
const ac = new AbortController();

const i = setInterval(() => et.dispatchEvent(new Event('foo')), 10);

async function foo() {
for await (const f of on(et, 'foo', { signal: ac.signal })) {
assert(f);
// Cancel after a single event has been triggered.
ac.abort();
}
}

foo().catch(common.mustCall((error) => {
assert.strictEqual(error.name, 'AbortError');
})).finally(() => {
clearInterval(i);
});
}

async function abortableOnAfterDone() {
const ee = new EventEmitter();
const ac = new AbortController();

const i = setInterval(() => ee.emit('foo', 'foo'), 1);
let count = 0;

async function foo() {
for await (const f of on(ee, 'foo', { signal: ac.signal })) {
assert.strictEqual(f[0], 'foo');
if (++count === 5)
break;
}
ac.abort(); // No error will occur
}

foo().finally(() => {
clearInterval(i);
});
}

async function run() {
const funcs = [
Expand All @@ -260,7 +371,13 @@ async function run() {
iterableThrow,
eventTarget,
errorListenerCount,
nodeEventTarget
nodeEventTarget,
abortableOnBefore,
abortableOnAfter,
eventTargetAbortableOnBefore,
eventTargetAbortableOnAfter,
eventTargetAbortableOnAfter2,
abortableOnAfterDone
];

for (const fn of funcs) {
Expand Down