Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/async_hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const {
enableHooks,
disableHooks,
executionAsyncResource,
aliveResources,
// Internal Embedder API
newAsyncId,
getDefaultTriggerAsyncId,
Expand Down Expand Up @@ -312,6 +313,7 @@ module.exports = {
executionAsyncId,
triggerAsyncId,
executionAsyncResource,
aliveResources,
// Embedder API
AsyncResource,
};
44 changes: 44 additions & 0 deletions lib/internal/async_hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
const {
Error,
FunctionPrototypeBind,
ObjectAssign,
ObjectDefineProperty,
ObjectValues,
Symbol,
} = primordials;

Expand Down Expand Up @@ -99,6 +101,7 @@ const emitAfterNative = emitHookFactory(after_symbol, 'emitAfterNative');
const emitDestroyNative = emitHookFactory(destroy_symbol, 'emitDestroyNative');
const emitPromiseResolveNative =
emitHookFactory(promise_resolve_symbol, 'emitPromiseResolveNative');
let internalTimers;

const topLevelResource = {};

Expand Down Expand Up @@ -427,6 +430,44 @@ function triggerAsyncId() {
return async_id_fields[kTriggerAsyncId];
}

function aliveResources() {
if (internalTimers == null) {
internalTimers = require('internal/timers');
}
const resources = async_wrap.getAliveResources();

const timers = {};
for (const list of ObjectValues(internalTimers.timerListMap)) {
var timer = list._idlePrev === list ? null : list._idlePrev;

while (timer !== null) {
timers[timer[internalTimers.async_id_symbol]] = timer;

timer = timer._idlePrev === list ? null : list._idlePrev;
}
}

const immediates = {};
const queue = internalTimers.outstandingQueue.head != null ?
internalTimers.outstandingQueue : internalTimers.immediateQueue;
var immediate = queue.head;
while (immediate !== null) {
immediates[immediate[internalTimers.async_id_symbol]] = immediate;

immediate = immediate._idleNext;
}

return ObjectAssign({}, resources, timers, immediates);
}

function _getActiveRequests() {
return ObjectValues(async_wrap.getActiveRequests());
}

function _getActiveHandles() {
return ObjectValues(async_wrap.getActiveHandles());
}


module.exports = {
executionAsyncId,
Expand All @@ -447,6 +488,9 @@ module.exports = {
clearAsyncIdStack,
hasAsyncIdStack,
executionAsyncResource,
aliveResources,
_getActiveRequests,
_getActiveHandles,
// Internal Embedder API
newAsyncId,
getOrSetAsyncId,
Expand Down
10 changes: 7 additions & 3 deletions lib/internal/bootstrap/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ process.config = JSONParse(internalBinding('native_module').config);

// Bootstrappers for all threads, including worker threads and main thread
const perThreadSetup = require('internal/process/per_thread');
const {
nativeHooks,
_getActiveRequests,
_getActiveHandles
} = require('internal/async_hooks');
const rawMethods = internalBinding('process_methods');

// Set up methods on the process object for all threads
Expand All @@ -69,8 +74,8 @@ const rawMethods = internalBinding('process_methods');
process.uptime = rawMethods.uptime;

// TODO(joyeecheung): either remove them or make them public
process._getActiveRequests = rawMethods._getActiveRequests;
process._getActiveHandles = rawMethods._getActiveHandles;
process._getActiveRequests = _getActiveRequests;
process._getActiveHandles = _getActiveHandles;

// TODO(joyeecheung): remove these
process.reallyExit = rawMethods.reallyExit;
Expand Down Expand Up @@ -105,7 +110,6 @@ if (credentials.implementsPosixCredentials) {
// process. They use the same functions as the JS embedder API. These callbacks
// are setup immediately to prevent async_wrap.setupHooks() from being hijacked
// and the cost of doing so is negligible.
const { nativeHooks } = require('internal/async_hooks');
internalBinding('async_wrap').setupHooks(nativeHooks);

const {
Expand Down
11 changes: 6 additions & 5 deletions lib/internal/timers.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ const kRefed = Symbol('refed');
// Create a single linked list instance only once at startup
const immediateQueue = new ImmediateList();

// If an uncaught exception was thrown during execution of immediateQueue,
// this queue will store all remaining Immediates that need to run upon
// resolution of all error handling (if process is still alive).
const outstandingQueue = new ImmediateList();

let nextExpiry = Infinity;
let refCount = 0;

Expand Down Expand Up @@ -405,11 +410,6 @@ function setPosition(node, pos) {
}

function getTimerCallbacks(runNextTicks) {
// If an uncaught exception was thrown during execution of immediateQueue,
// this queue will store all remaining Immediates that need to run upon
// resolution of all error handling (if process is still alive).
const outstandingQueue = new ImmediateList();

function processImmediate() {
const queue = outstandingQueue.head !== null ?
outstandingQueue : immediateQueue;
Expand Down Expand Up @@ -599,6 +599,7 @@ module.exports = {
setUnrefTimeout,
getTimerDuration,
immediateQueue,
outstandingQueue,
getTimerCallbacks,
immediateInfoFields: {
kCount,
Expand Down
65 changes: 65 additions & 0 deletions src/async_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,68 @@ static void RegisterDestroyHook(const FunctionCallbackInfo<Value>& args) {
p->env->AddCleanupHook(DestroyParamCleanupHook, p);
}

static void GetActiveRequests(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);

Local<Context> ctx = env->context();
Local<Object> return_obj = Object::New(args.GetIsolate());

for (ReqWrapBase* req_wrap : *env->req_wrap_queue()) {
AsyncWrap* w = req_wrap->GetAsyncWrap();
if (w->persistent().IsEmpty()) continue;
double async_id = w->get_async_id();
Local<Object> req_object = w->object();
return_obj->Set(ctx, Number::New(args.GetIsolate(), async_id), req_object);
}

args.GetReturnValue().Set(return_obj);
}

// Non-static, friend of HandleWrap. Could have been a HandleWrap method but
// implemented here for consistency with GetActiveRequests().
void GetActiveHandles(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);

Local<Context> ctx = env->context();
Local<Object> return_obj = Object::New(args.GetIsolate());

for (auto w : *env->handle_wrap_queue()) {
if (!HandleWrap::HasRef(w)) continue;
double async_id = w->get_async_id();
Local<Object> handle_object = w->object();
return_obj->Set(
ctx, Number::New(args.GetIsolate(), async_id), handle_object);
}

args.GetReturnValue().Set(return_obj);
}

static void GetAliveResources(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);

Local<Context> ctx = env->context();
Local<Object> return_obj = Object::New(args.GetIsolate());

for (ReqWrapBase* req_wrap : *env->req_wrap_queue()) {
AsyncWrap* w = req_wrap->GetAsyncWrap();
if (w->persistent().IsEmpty()) continue;
double async_id = w->get_async_id();
Local<Object> req_resource = w->GetResource();
return_obj->Set(
ctx, Number::New(args.GetIsolate(), async_id), req_resource);
}

for (auto w : *env->handle_wrap_queue()) {
if (!HandleWrap::HasRef(w)) continue;
double async_id = w->get_async_id();
Local<Object> handle_resource = w->GetResource();
return_obj->Set(
ctx, Number::New(args.GetIsolate(), async_id), handle_resource);
}

args.GetReturnValue().Set(return_obj);
}

void AsyncWrap::GetAsyncId(const FunctionCallbackInfo<Value>& args) {
AsyncWrap* wrap;
args.GetReturnValue().Set(kInvalidAsyncId);
Expand Down Expand Up @@ -479,6 +541,9 @@ void AsyncWrap::Initialize(Local<Object> target,
env->SetMethod(target, "enablePromiseHook", EnablePromiseHook);
env->SetMethod(target, "disablePromiseHook", DisablePromiseHook);
env->SetMethod(target, "registerDestroyHook", RegisterDestroyHook);
env->SetMethod(target, "getActiveRequests", GetActiveRequests);
env->SetMethod(target, "getActiveHandles", GetActiveHandles);
env->SetMethod(target, "getAliveResources", GetAliveResources);

PropertyAttribute ReadOnlyDontDelete =
static_cast<PropertyAttribute>(ReadOnly | DontDelete);
Expand Down
32 changes: 0 additions & 32 deletions src/node_process_methods.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,36 +276,6 @@ static void Uptime(const FunctionCallbackInfo<Value>& args) {
args.GetReturnValue().Set(result);
}

static void GetActiveRequests(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);

std::vector<Local<Value>> request_v;
for (ReqWrapBase* req_wrap : *env->req_wrap_queue()) {
AsyncWrap* w = req_wrap->GetAsyncWrap();
if (w->persistent().IsEmpty())
continue;
request_v.emplace_back(w->GetOwner());
}

args.GetReturnValue().Set(
Array::New(env->isolate(), request_v.data(), request_v.size()));
}

// Non-static, friend of HandleWrap. Could have been a HandleWrap method but
// implemented here for consistency with GetActiveRequests().
void GetActiveHandles(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);

std::vector<Local<Value>> handle_v;
for (auto w : *env->handle_wrap_queue()) {
if (!HandleWrap::HasRef(w))
continue;
handle_v.emplace_back(w->GetOwner());
}
args.GetReturnValue().Set(
Array::New(env->isolate(), handle_v.data(), handle_v.size()));
}

static void ResourceUsage(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);

Expand Down Expand Up @@ -481,8 +451,6 @@ static void InitializeProcessMethods(Local<Object> target,
env->SetMethod(target, "hrtimeBigInt", HrtimeBigInt);
env->SetMethod(target, "resourceUsage", ResourceUsage);

env->SetMethod(target, "_getActiveRequests", GetActiveRequests);
env->SetMethod(target, "_getActiveHandles", GetActiveHandles);
env->SetMethod(target, "_kill", Kill);

env->SetMethodNoSideEffect(target, "cwd", Cwd);
Expand Down
21 changes: 21 additions & 0 deletions test/parallel/test-async-hooks-alive-resources.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
'use strict';

require('../common');
const assert = require('assert');
const async_hooks = require('async_hooks');
const { aliveResources } = async_hooks;

let lastInitedAsyncId;
let lastInitedResource;
// Setup init hook such parameters are validated
async_hooks.createHook({
init(asyncId, type, triggerAsyncId, resource) {
lastInitedAsyncId = asyncId;
lastInitedResource = resource;
}
}).enable();

setTimeout(() => {}, 1);

const actual = aliveResources()[lastInitedAsyncId];
assert.strictEqual(actual, lastInitedResource);
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
const common = require('../common');
const assert = require('assert');
const fs = require('fs');
const { aliveResources } = require('async_hooks');

for (let i = 0; i < 12; i++)
fs.open(__filename, 'r', common.mustCall());

assert.strictEqual(process._getActiveRequests().length, 12);
assert.strictEqual(Object.values(aliveResources()).length, 12);
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
require('../common');
const assert = require('assert');
const net = require('net');
const { aliveResources } = require('async_hooks');

const NUM = 8;
const connections = [];
const clients = [];
Expand Down Expand Up @@ -30,18 +32,18 @@ function clientConnected(client) {


function checkAll() {
const handles = process._getActiveHandles();
const handles = Object.values(aliveResources());

clients.forEach(function(item) {
assert.ok(handles.includes(item));
assert.ok(handles.includes(item._handle));
item.destroy();
});

connections.forEach(function(item) {
assert.ok(handles.includes(item));
assert.ok(handles.includes(item._handle));
item.end();
});

assert.ok(handles.includes(server));
assert.ok(handles.includes(server._handle));
server.close();
}
21 changes: 21 additions & 0 deletions test/parallel/test-handle-wrap-isrefed.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
const common = require('../common');
const strictEqual = require('assert').strictEqual;
const { internalBinding } = require('internal/test/binding');
const { aliveResources } = require('async_hooks');

// child_process
{
Expand Down Expand Up @@ -106,5 +107,25 @@ const { kStateSymbol } = require('internal/dgram');
false, 'tcp_wrap: not unrefed on close')));
}

// timers
{
const { Timeout } = require('internal/timers');
strictEqual(Object.values(aliveResources()).filter(
(handle) => (handle instanceof Timeout)).length, 0);
const timer = setTimeout(() => {}, 500);
const handles = Object.values(aliveResources()).filter(
(handle) => (handle instanceof Timeout));
strictEqual(handles.length, 1);
const handle = handles[0];
strictEqual(Object.getPrototypeOf(handle).hasOwnProperty('hasRef'),
true, 'timer: hasRef() missing');
strictEqual(handle.hasRef(), true);
timer.unref();
strictEqual(handle.hasRef(),
false, 'timer: unref() ineffective');
timer.ref();
strictEqual(handle.hasRef(),
true, 'timer: ref() ineffective');
}

// See also test/pseudo-tty/test-handle-wrap-isrefed-tty.js
4 changes: 3 additions & 1 deletion test/pseudo-tty/ref_keeps_node_running.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require('../common');
const { internalBinding } = require('internal/test/binding');
const { TTY, isTTY } = internalBinding('tty_wrap');
const strictEqual = require('assert').strictEqual;
const { aliveResources } = require('async_hooks');

strictEqual(isTTY(0), true, 'fd 0 is not a TTY');

Expand All @@ -14,7 +15,8 @@ handle.readStart();
handle.onread = () => {};

function isHandleActive(handle) {
return process._getActiveHandles().some((active) => active === handle);
return Object.values(aliveResources())
.some((active) => active === handle);
}

strictEqual(isHandleActive(handle), true, 'TTY handle not initially active');
Expand Down