Skip to content

Commit e4a4f63

Browse files
louwersaduh95
authored andcommitted
sqlite: fix crash session extension callbacks with workers
PR-URL: #59848 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Zeyu "Alex" Yang <[email protected]> Reviewed-By: Edy Silva <[email protected]>
1 parent 19a7b1e commit e4a4f63

File tree

5 files changed

+135
-26
lines changed

5 files changed

+135
-26
lines changed

src/node_sqlite.cc

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1623,26 +1623,28 @@ void Backup(const FunctionCallbackInfo<Value>& args) {
16231623
job->ScheduleBackup();
16241624
}
16251625

1626+
struct ConflictCallbackContext {
1627+
std::function<bool(std::string_view)> filterCallback;
1628+
std::function<int(int)> conflictCallback;
1629+
};
1630+
16261631
// the reason for using static functions here is that SQLite needs a
16271632
// function pointer
1628-
static std::function<int(int)> conflictCallback;
16291633

16301634
static int xConflict(void* pCtx, int eConflict, sqlite3_changeset_iter* pIter) {
1631-
if (!conflictCallback) return SQLITE_CHANGESET_ABORT;
1632-
return conflictCallback(eConflict);
1635+
auto ctx = static_cast<ConflictCallbackContext*>(pCtx);
1636+
if (!ctx->conflictCallback) return SQLITE_CHANGESET_ABORT;
1637+
return ctx->conflictCallback(eConflict);
16331638
}
16341639

1635-
static std::function<bool(std::string)> filterCallback;
1636-
16371640
static int xFilter(void* pCtx, const char* zTab) {
1638-
if (!filterCallback) return 1;
1639-
1640-
return filterCallback(zTab) ? 1 : 0;
1641+
auto ctx = static_cast<ConflictCallbackContext*>(pCtx);
1642+
if (!ctx->filterCallback) return 1;
1643+
return ctx->filterCallback(zTab) ? 1 : 0;
16411644
}
16421645

16431646
void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
1644-
conflictCallback = nullptr;
1645-
filterCallback = nullptr;
1647+
ConflictCallbackContext context;
16461648

16471649
DatabaseSync* db;
16481650
ASSIGN_OR_RETURN_UNWRAP(&db, args.This());
@@ -1678,7 +1680,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
16781680
return;
16791681
}
16801682
Local<Function> conflictFunc = conflictValue.As<Function>();
1681-
conflictCallback = [env, conflictFunc](int conflictType) -> int {
1683+
context.conflictCallback = [env, conflictFunc](int conflictType) -> int {
16821684
Local<Value> argv[] = {Integer::New(env->isolate(), conflictType)};
16831685
TryCatch try_catch(env->isolate());
16841686
Local<Value> result =
@@ -1716,15 +1718,18 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
17161718

17171719
Local<Function> filterFunc = filterValue.As<Function>();
17181720

1719-
filterCallback = [env, filterFunc](std::string item) -> bool {
1721+
context.filterCallback = [env,
1722+
filterFunc](std::string_view item) -> bool {
17201723
// TODO(@jasnell): The use of ToLocalChecked here means that if
17211724
// the filter function throws an error the process will crash.
17221725
// The filterCallback should be updated to avoid the check and
17231726
// propagate the error correctly.
1724-
Local<Value> argv[] = {String::NewFromUtf8(env->isolate(),
1725-
item.c_str(),
1726-
NewStringType::kNormal)
1727-
.ToLocalChecked()};
1727+
Local<Value> argv[] = {
1728+
String::NewFromUtf8(env->isolate(),
1729+
item.data(),
1730+
NewStringType::kNormal,
1731+
static_cast<int>(item.size()))
1732+
.ToLocalChecked()};
17281733
Local<Value> result =
17291734
filterFunc->Call(env->context(), Null(env->isolate()), 1, argv)
17301735
.ToLocalChecked();
@@ -1740,7 +1745,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
17401745
const_cast<void*>(static_cast<const void*>(buf.data())),
17411746
xFilter,
17421747
xConflict,
1743-
nullptr);
1748+
static_cast<void*>(&context));
17441749
if (r == SQLITE_OK) {
17451750
args.GetReturnValue().Set(true);
17461751
return;

test/parallel/test-sqlite-session.js

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ const {
77
constants,
88
} = require('node:sqlite');
99
const { test, suite } = require('node:test');
10+
const { nextDb } = require('../sqlite/next-db.js');
11+
const { Worker } = require('worker_threads');
12+
const { once } = require('events');
1013

1114
/**
1215
* Convenience wrapper around assert.deepStrictEqual that sets a null
@@ -539,3 +542,74 @@ test('session.close() - closing twice', (t) => {
539542
message: 'session is not open'
540543
});
541544
});
545+
546+
test('concurrent applyChangeset with workers', async (t) => {
547+
// Before adding this test, the callbacks were stored in static variables
548+
// this could result in a crash
549+
// this test is a regression test for that scenario
550+
551+
function modeToString(mode) {
552+
if (mode === constants.SQLITE_CHANGESET_ABORT) return 'SQLITE_CHANGESET_ABORT';
553+
if (mode === constants.SQLITE_CHANGESET_OMIT) return 'SQLITE_CHANGESET_OMIT';
554+
}
555+
556+
const dbPath = nextDb();
557+
const db1 = new DatabaseSync(dbPath);
558+
const db2 = new DatabaseSync(':memory:');
559+
const createTable = `
560+
CREATE TABLE data(
561+
key INTEGER PRIMARY KEY,
562+
value TEXT
563+
) STRICT`;
564+
db1.exec(createTable);
565+
db2.exec(createTable);
566+
db1.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'hello');
567+
db1.close();
568+
const session = db2.createSession();
569+
db2.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'world');
570+
const changeset = session.changeset(); // Changeset with conflict (for db1)
571+
572+
const iterations = 10;
573+
for (let i = 0; i < iterations; i++) {
574+
const workers = [];
575+
const expectedResults = new Map([
576+
[constants.SQLITE_CHANGESET_ABORT, false],
577+
[constants.SQLITE_CHANGESET_OMIT, true]]
578+
);
579+
580+
// Launch two workers (abort and omit modes)
581+
for (const mode of [constants.SQLITE_CHANGESET_ABORT, constants.SQLITE_CHANGESET_OMIT]) {
582+
const worker = new Worker(`${__dirname}/../sqlite/worker.js`, {
583+
workerData: {
584+
dbPath,
585+
changeset,
586+
mode
587+
},
588+
});
589+
workers.push(worker);
590+
}
591+
592+
const results = await Promise.all(workers.map(async (worker) => {
593+
const [message] = await once(worker, 'message');
594+
return message;
595+
}));
596+
597+
// Verify each result
598+
for (const res of results) {
599+
if (res.errorMessage) {
600+
if (res.errcode === 5) { // SQLITE_BUSY
601+
break; // ignore
602+
}
603+
t.assert.fail(`Worker error: ${res.error.message}`);
604+
}
605+
const expected = expectedResults.get(res.mode);
606+
t.assert.strictEqual(
607+
res.result,
608+
expected,
609+
`Iteration ${i}: Worker (${modeToString(res.mode)}) expected ${expected} but got ${res.result}`
610+
);
611+
}
612+
613+
workers.forEach((worker) => worker.terminate()); // Cleanup
614+
}
615+
});

test/parallel/test-sqlite.js

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,10 @@
11
'use strict';
22
const { spawnPromisified, skipIfSQLiteMissing } = require('../common');
33
skipIfSQLiteMissing();
4-
const tmpdir = require('../common/tmpdir');
5-
const { join } = require('node:path');
64
const { DatabaseSync, constants } = require('node:sqlite');
75
const { suite, test } = require('node:test');
86
const { pathToFileURL } = require('node:url');
9-
let cnt = 0;
10-
11-
tmpdir.refresh();
12-
13-
function nextDb() {
14-
return join(tmpdir.path, `database-${cnt++}.db`);
15-
}
7+
const { nextDb } = require('../sqlite/next-db.js');
168

179
suite('accessing the node:sqlite module', () => {
1810
test('cannot be accessed without the node: scheme', (t) => {

test/sqlite/next-db.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
'use strict';
2+
require('../common');
3+
const tmpdir = require('../common/tmpdir');
4+
const { join } = require('node:path');
5+
6+
let cnt = 0;
7+
8+
tmpdir.refresh();
9+
10+
function nextDb() {
11+
return join(tmpdir.path, `database-${cnt++}.db`);
12+
}
13+
14+
module.exports = { nextDb };

test/sqlite/worker.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// This worker is used for one of the tests in test-sqlite-session.js
2+
3+
'use strict';
4+
require('../common');
5+
const { parentPort, workerData } = require('worker_threads');
6+
const { DatabaseSync, constants } = require('node:sqlite');
7+
const { changeset, mode, dbPath } = workerData;
8+
9+
const db = new DatabaseSync(dbPath);
10+
11+
const options = {};
12+
if (mode !== constants.SQLITE_CHANGESET_ABORT && mode !== constants.SQLITE_CHANGESET_OMIT) {
13+
throw new Error('Unexpected value for mode');
14+
}
15+
options.onConflict = () => mode;
16+
17+
try {
18+
const result = db.applyChangeset(changeset, options);
19+
parentPort.postMessage({ mode, result, error: null });
20+
} catch (error) {
21+
parentPort.postMessage({ mode, result: null, errorMessage: error.message, errcode: error.errcode });
22+
} finally {
23+
db.close(); // Just to make sure it is closed ASAP
24+
}

0 commit comments

Comments
 (0)