Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions doc/api/globals.md
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,9 @@ navigator.locks.request('shared_resource', { mode: 'shared' }, async (lock) => {

See [`worker_threads.locks`][] for detailed API documentation.

Use [`process.locksCounters()`][] to monitor lock acquisitions, contention, and
queue sizes across the process.
Comment on lines +842 to +843
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be either all singular or all plural no ?

Suggested change
Use [`process.locksCounters()`][] to monitor lock acquisitions, contention, and
queue sizes across the process.
Use [`process.locksCounters()`][] to monitor lock acquisitions, contentions, and
queue sizes across the process.


## Class: `PerformanceEntry`

<!-- YAML
Expand Down Expand Up @@ -1376,6 +1379,7 @@ A browser-compatible implementation of [`WritableStreamDefaultWriter`][].
[`localStorage`]: https://developer.mozilla.org/en-US/docs/Web/API/Window/localStorage
[`module`]: modules.md#module
[`perf_hooks.performance`]: perf_hooks.md#perf_hooksperformance
[`process.locksCounters()`]: process.md#processlockscounters
[`process.nextTick()`]: process.md#processnexttickcallback-args
[`process` object]: process.md#process
[`require()`]: modules.md#requireid
Expand Down
55 changes: 55 additions & 0 deletions doc/api/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -3735,6 +3735,60 @@ console.log(resourceUsage());
*/
```

## `process.locksCounters()`

<!-- YAML
added: REPLACEME
-->

* Returns: {Object} Web Locks usage statistics for the current process.
* `totalAborts` {bigint} Lock requests aborted (either `ifAvailable` could not
be granted, or the callback rejected/threw).
* `totalSteals` {bigint} Lock requests granted with `{ steal: true }`.
* `totalExclusiveAcquired` {bigint} Total exclusive locks acquired.
* `totalSharedAcquired` {bigint} Total shared locks acquired.
* `holdersExclusive` {number} Exclusive locks currently held.
* `holdersShared` {number} Shared locks currently held.
* `pendingExclusive` {number} Exclusive lock requests currently queued.
* `pendingShared` {number} Shared lock requests currently queued.

Returns an object containing lock usage metrics for the current process to provide visibility into
[`navigator.locks`][] (Web Locks API).

```mjs
import process from 'node:process';
import { locks } from 'node:worker_threads';

const before = process.locksCounters();

await locks.request('my_resource', async () => {
const current = process.locksCounters();
console.log(current.holdersExclusive); // 1
console.log(current.totalExclusiveAcquired - before.totalExclusiveAcquired); // 1n
});

const after = process.locksCounters();
console.log(after.holdersExclusive); // 0 (released)
console.log(after.totalExclusiveAcquired - before.totalExclusiveAcquired); // 1n (cumulative)
```

```cjs
const process = require('node:process');
const { locks } = require('node:worker_threads');

const before = process.locksCounters();

locks.request('my_resource', async () => {
const current = process.locksCounters();
console.log(current.holdersExclusive); // 1
console.log(current.totalExclusiveAcquired - before.totalExclusiveAcquired); // 1n
}).then(() => {
const after = process.locksCounters();
console.log(after.holdersExclusive); // 0 (released)
console.log(after.totalExclusiveAcquired - before.totalExclusiveAcquired); // 1n (cumulative)
});
```

## `process.send(message[, sendHandle[, options]][, callback])`

<!-- YAML
Expand Down Expand Up @@ -4535,6 +4589,7 @@ cases:
[`module.getSourceMapsSupport()`]: module.md#modulegetsourcemapssupport
[`module.isBuiltin(id)`]: module.md#moduleisbuiltinmodulename
[`module.setSourceMapsSupport()`]: module.md#modulesetsourcemapssupportenabled-options
[`navigator.locks`]: globals.md#navigatorlocks
[`net.Server`]: net.md#class-netserver
[`net.Socket`]: net.md#class-netsocket
[`os.constants.dlopen`]: os.md#dlopen-constants
Expand Down
6 changes: 5 additions & 1 deletion doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,10 @@ added: v24.5.0
An instance of a [`LockManager`][LockManager] that can be used to coordinate
access to resources that may be shared across multiple threads within the same
process. The API mirrors the semantics of the
[browser `LockManager`][]
[browser `LockManager`][].

Use [`process.locksCounters()`][] to monitor lock acquisitions, contention, and
queue sizes across the process.

### Class: `Lock`

Expand Down Expand Up @@ -2249,6 +2252,7 @@ thread spawned will spawn another until the application crashes.
[`process.env`]: process.md#processenv
[`process.execArgv`]: process.md#processexecargv
[`process.exit()`]: process.md#processexitcode
[`process.locksCounters()`]: process.md#processlockscounters
[`process.stderr`]: process.md#processstderr
[`process.stdin`]: process.md#processstdin
[`process.stdout`]: process.md#processstdout
Expand Down
1 change: 1 addition & 0 deletions lib/internal/bootstrap/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ const rawMethods = internalBinding('process_methods');
process.cpuUsage = wrapped.cpuUsage;
process.threadCpuUsage = wrapped.threadCpuUsage;
process.resourceUsage = wrapped.resourceUsage;
process.locksCounters = wrapped.locksCounters;
process.memoryUsage = wrapped.memoryUsage;
process.constrainedMemory = rawMethods.constrainedMemory;
process.availableMemory = rawMethods.availableMemory;
Expand Down
19 changes: 19 additions & 0 deletions lib/internal/process/per_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ function wrapProcessMethods(binding) {
threadCpuUsage: _threadCpuUsage,
memoryUsage: _memoryUsage,
rss,
locksCounters: _locksCounters,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have cpuUsage, threadCpuUsage, memoryUsage, resourceUsage... maybe we should call it lockUsage instead ?

resourceUsage: _resourceUsage,
loadEnvFile: _loadEnvFile,
execve: _execve,
Expand Down Expand Up @@ -349,6 +350,23 @@ function wrapProcessMethods(binding) {
};
}

const locksCountersBuffer = new Float64Array(8);
const locksCountersBigIntView = new BigUint64Array(locksCountersBuffer.buffer, 0, 4);

function locksCounters() {
_locksCounters(locksCountersBuffer);
return {
totalAborts: locksCountersBigIntView[0],
totalSteals: locksCountersBigIntView[1],
totalExclusiveAcquired: locksCountersBigIntView[2],
totalSharedAcquired: locksCountersBigIntView[3],
holdersExclusive: locksCountersBuffer[4],
holdersShared: locksCountersBuffer[5],
pendingExclusive: locksCountersBuffer[6],
pendingShared: locksCountersBuffer[7],
};
}

/**
* Loads the `.env` file to process.env.
* @param {string | URL | Buffer | undefined} path
Expand All @@ -372,6 +390,7 @@ function wrapProcessMethods(binding) {
kill,
exit,
execve,
locksCounters,
loadEnvFile,
};
}
Expand Down
51 changes: 50 additions & 1 deletion src/node_locks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,10 @@ void LockManager::ProcessQueue(Environment* env) {
* remove later
*/
if (if_available_request) {
{
Mutex::ScopedLock scoped_lock(mutex_);
counters.total_aborts++;
}
Local<Value> null_arg = Null(isolate);
Local<Value> callback_result;
{
Expand Down Expand Up @@ -456,7 +460,17 @@ void LockManager::ProcessQueue(Environment* env) {
grantable_request->released_promise());
{
Mutex::ScopedLock scoped_lock(mutex_);
held_locks_[grantable_request->name()].push_back(granted_lock);
auto& resource_locks = held_locks_[grantable_request->name()];
resource_locks.push_back(granted_lock);
if (grantable_request->steal()) {
counters.total_steals++;
}

if (grantable_request->mode() == Lock::Mode::Exclusive) {
counters.total_exclusive_acquired++;
} else {
counters.total_shared_acquired++;
}
}

// Create and store the new granted lock
Expand Down Expand Up @@ -715,6 +729,10 @@ void LockManager::ReleaseLockAndProcessQueue(Environment* env,
// stolen.
if (!lock->is_stolen()) {
if (was_rejected) {
{
Mutex::ScopedLock scoped_lock(mutex_);
counters.total_aborts++;
}
// Propagate rejection from the user callback
if (lock->released_promise()
->Reject(context, callback_result)
Expand Down Expand Up @@ -800,6 +818,37 @@ void LockManager::CleanupEnvironment(Environment* env_to_cleanup) {
registered_envs_.erase(env_to_cleanup);
}

LockManager::LocksCountersSnapshot LockManager::GetCountersSnapshot() const {
LocksCountersSnapshot snapshot;
Mutex::ScopedLock scoped_lock(mutex_);

snapshot.total_steals = counters.total_steals;
snapshot.total_aborts = counters.total_aborts;
snapshot.total_exclusive_acquired = counters.total_exclusive_acquired;
snapshot.total_shared_acquired = counters.total_shared_acquired;

for (const auto& pending_request : pending_queue_) {
if (pending_request->mode() == Lock::Mode::Exclusive) {
snapshot.pending_exclusive++;
} else {
snapshot.pending_shared++;
}
}

for (const auto& resource : held_locks_) {
const auto& locks = resource.second;
for (const auto& lock : locks) {
if (lock->mode() == Lock::Mode::Exclusive) {
snapshot.holders_exclusive++;
} else {
snapshot.holders_shared++;
}
}
}

return snapshot;
}

// Cleanup hook wrapper
void LockManager::OnEnvironmentCleanup(void* arg) {
Environment* env = static_cast<Environment*>(arg);
Expand Down
18 changes: 18 additions & 0 deletions src/node_locks.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,17 @@ class LockManager final {
std::shared_ptr<Lock> lock,
v8::Local<v8::Value> result,
bool was_rejected = false);
struct LocksCountersSnapshot {
uint64_t total_steals = 0;
uint64_t total_aborts = 0;
uint64_t total_exclusive_acquired = 0;
uint64_t total_shared_acquired = 0;
size_t holders_exclusive = 0;
size_t holders_shared = 0;
size_t pending_exclusive = 0;
size_t pending_shared = 0;
};
LocksCountersSnapshot GetCountersSnapshot() const;

private:
LockManager() = default;
Expand All @@ -171,6 +182,13 @@ class LockManager final {
static LockManager current_;

mutable Mutex mutex_;
struct Counters {
uint64_t total_steals = 0;
uint64_t total_aborts = 0;
uint64_t total_exclusive_acquired = 0;
uint64_t total_shared_acquired = 0;
};
Counters counters;
// All entries for a given Environment* are purged in CleanupEnvironment().
std::unordered_map<std::u16string, std::deque<std::shared_ptr<Lock>>>
held_locks_;
Expand Down
23 changes: 23 additions & 0 deletions src/node_process_methods.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "node_errors.h"
#include "node_external_reference.h"
#include "node_internals.h"
#include "node_locks.h"
#include "node_process-inl.h"
#include "path.h"
#include "util-inl.h"
Expand Down Expand Up @@ -39,6 +40,7 @@ typedef int mode_t;

namespace node {

using node::worker::locks::LockManager;
using v8::Array;
using v8::ArrayBuffer;
using v8::CFunction;
Expand Down Expand Up @@ -156,6 +158,25 @@ static void ThreadCPUUsage(const FunctionCallbackInfo<Value>& args) {
fields[1] = MICROS_PER_SEC * rusage.ru_stime.tv_sec + rusage.ru_stime.tv_usec;
}

static void LocksCounters(const FunctionCallbackInfo<Value>& args) {
LockManager::LocksCountersSnapshot snapshot =
LockManager::GetCurrent()->GetCountersSnapshot();

Local<ArrayBuffer> ab = get_fields_array_buffer(args, 0, 8);

uint64_t* bigint_fields = static_cast<uint64_t*>(ab->Data());
bigint_fields[0] = snapshot.total_aborts;
bigint_fields[1] = snapshot.total_steals;
bigint_fields[2] = snapshot.total_exclusive_acquired;
bigint_fields[3] = snapshot.total_shared_acquired;

double* fields = static_cast<double*>(ab->Data());
fields[4] = static_cast<double>(snapshot.holders_exclusive);
fields[5] = static_cast<double>(snapshot.holders_shared);
fields[6] = static_cast<double>(snapshot.pending_exclusive);
fields[7] = static_cast<double>(snapshot.pending_shared);
}

static void Cwd(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(env->has_run_bootstrapping_code());
Expand Down Expand Up @@ -770,6 +791,7 @@ static void CreatePerIsolateProperties(IsolateData* isolate_data,
SetMethod(isolate, target, "rss", Rss);
SetMethod(isolate, target, "cpuUsage", CPUUsage);
SetMethod(isolate, target, "threadCpuUsage", ThreadCPUUsage);
SetMethodNoSideEffect(isolate, target, "locksCounters", LocksCounters);
SetMethod(isolate, target, "resourceUsage", ResourceUsage);

SetMethod(isolate, target, "_debugEnd", DebugEnd);
Expand Down Expand Up @@ -819,6 +841,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Rss);
registry->Register(CPUUsage);
registry->Register(ThreadCPUUsage);
registry->Register(LocksCounters);
registry->Register(ResourceUsage);

registry->Register(GetActiveRequests);
Expand Down
Loading
Loading