Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions src/node_platform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ void PerIsolatePlatformData::PostTask(std::unique_ptr<Task> task) {
}

void PerIsolatePlatformData::PostDelayedTask(
std::unique_ptr<Task> task, double delay_in_seconds) {
std::unique_ptr<Task> task,
double delay_in_seconds,
DelayedTask::Nestability nestability) {
if (flush_tasks_ == nullptr) {
// V8 may post tasks during Isolate disposal. In that case, the only
// sensible path forward is to discard the task.
Expand All @@ -263,18 +265,27 @@ void PerIsolatePlatformData::PostDelayedTask(
delayed->task = std::move(task);
delayed->platform_data = shared_from_this();
delayed->timeout = delay_in_seconds;
delayed->nestability = nestability;
foreground_delayed_tasks_.Push(std::move(delayed));
uv_async_send(flush_tasks_);
}

void PerIsolatePlatformData::PostDelayedTask(std::unique_ptr<Task> task,
double delay_in_seconds) {
PostDelayedTask(
std::move(task), delay_in_seconds, DelayedTask::Nestability::kNestable);
}

void PerIsolatePlatformData::PostNonNestableTask(std::unique_ptr<Task> task) {
PostTask(std::move(task));
}

void PerIsolatePlatformData::PostNonNestableDelayedTask(
std::unique_ptr<Task> task,
double delay_in_seconds) {
PostDelayedTask(std::move(task), delay_in_seconds);
PostDelayedTask(std::move(task),
delay_in_seconds,
DelayedTask::Nestability::kNonNestable);
}

PerIsolatePlatformData::~PerIsolatePlatformData() {
Expand Down Expand Up @@ -460,7 +471,12 @@ bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
// Timers may not guarantee queue ordering of events with the same delay if
// the delay is non-zero. This should not be a problem in practice.
uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
// Do not unref non-nestable tasks which can run call into JS.
// For the moment all nestable tasks are from GC or logging so it's
// okay to unref the timer for them.
if (delayed->nestability == DelayedTask::Nestability::kNestable) {
uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
}
uv_handle_count_++;

scheduled_delayed_tasks_.emplace_back(delayed.release(),
Expand Down
5 changes: 5 additions & 0 deletions src/node_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ class TaskQueue {
};

struct DelayedTask {
enum class Nestability { kNestable, kNonNestable };
std::unique_ptr<v8::Task> task;
uv_timer_t timer;
double timeout;
std::shared_ptr<PerIsolatePlatformData> platform_data;
Nestability nestability;
};

// This acts as the foreground task runner for a given Isolate.
Expand Down Expand Up @@ -86,6 +88,9 @@ class PerIsolatePlatformData :
void DeleteFromScheduledTasks(DelayedTask* task);
void DecreaseHandleCount();

void PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds,
DelayedTask::Nestability nestability);
static void FlushTasks(uv_async_t* handle);
void RunForegroundTask(std::unique_ptr<v8::Task> task);
static void RunForegroundTask(uv_timer_t* timer);
Expand Down
6 changes: 6 additions & 0 deletions test/fixtures/atomics-wait-async-timeout.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
'use strict';

const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
result.value.then((val) => console.log(val));
24 changes: 24 additions & 0 deletions test/parallel/test-atomics-wait-async-timeout-main-2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict';

require('../common');

if (process.argv[2] === 'child') {
const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000 * 1000);
result.value.then((val) => console.log(val));
return;
}

const { spawnSync } = require('child_process');
const assert = require('assert');

const child = spawnSync(process.execPath, [
__filename, 'child',
], { timeout: 1000 });

if (!child.error) {
console.log(child.stderr.toString());
console.log(child.stdout.toString());
}
assert.strictEqual(child.error.code, 'ETIMEDOUT');
19 changes: 19 additions & 0 deletions test/parallel/test-atomics-wait-async-timeout-main.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict';

require('../common');

if (process.argv[2] === 'child') {
const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
result.value.then((val) => console.log(val));
return;
}

const { spawnSync } = require('child_process');
const assert = require('assert');

const child = spawnSync(process.execPath, [__filename, 'child']);
assert.strictEqual(child.stdout.toString().trim(), 'timed-out');
assert.strictEqual(child.stderr.toString().trim(), '');
assert.strictEqual(child.status, 0);
28 changes: 28 additions & 0 deletions test/parallel/test-atomics-wait-async-timeout-worker-2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
'use strict';

const common = require('../common');
const { Worker, workerData } = require('worker_threads');
const assert = require('assert');

// Do not use isMainThread so that this test itself can be run inside a Worker.
if (!process.env.HAS_STARTED_WORKER) {
process.env.HAS_STARTED_WORKER = 1;
const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const w = new Worker(__filename, {
workerData: i32a
});
const chunks = [];
w.stdout.on('data', (chunk) => chunks.push(chunk));
w.on('exit', common.mustCall((code) => {
assert.strictEqual(Buffer.concat(chunks).toString().trim(), '');
assert.strictEqual(code, 1);
}));
setTimeout(() => {
w.terminate();
}, 1000);
} else {
const i32a = workerData;
const result = Atomics.waitAsync(i32a, 0, 0, 1000 * 1000);
result.value.then((val) => console.log(val));
}
23 changes: 23 additions & 0 deletions test/parallel/test-atomics-wait-async-timeout-worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
'use strict';

const common = require('../common');
const { Worker, workerData } = require('worker_threads');
const assert = require('assert');

// Do not use isMainThread so that this test itself can be run inside a Worker.
if (!process.env.HAS_STARTED_WORKER) {
process.env.HAS_STARTED_WORKER = 1;
const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const w = new Worker(__filename, { workerData: i32a });
const chunks = [];
w.stdout.on('data', (chunk) => chunks.push(chunk));
w.on('exit', common.mustCall((code) => {
assert.strictEqual(Buffer.concat(chunks).toString().trim(), 'timed-out');
assert.strictEqual(code, 0);
}));
} else {
const i32a = workerData;
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
result.value.then((val) => console.log(val));
}