Skip to content

Commit e436fca

Browse files
committed
implement Statement#each with ev_async
1 parent 09e6814 commit e436fca

4 files changed

Lines changed: 170 additions & 15 deletions

File tree

lib/sqlite3.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,19 @@ Database.prototype.all = function(sql) {
4242
return this;
4343
}
4444

45+
// Database#each(sql, [bind1, bind2, ...], [callback])
46+
Database.prototype.each = function(sql) {
47+
var statement = new Statement(this, sql);
48+
statement.each.apply(statement, Array.prototype.slice.call(arguments, 1)).finalize();
49+
return this;
50+
}
51+
4552
Database.prototype.execute = function() {
4653
console.warn('Database#execute() is deprecated. Use Database#all() instead.');
4754
return this.all.apply(this, arguments);
4855
};
56+
57+
Database.prototype.query = function() {
58+
console.warn('Database#query() is deprecated. Use Database#each() instead.');
59+
return this.each.apply(this, arguments);
60+
};

src/statement.cc

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ void Statement::Init(Handle<Object> target) {
2727
NODE_SET_PROTOTYPE_METHOD(constructor_template, "get", Get);
2828
NODE_SET_PROTOTYPE_METHOD(constructor_template, "run", Run);
2929
NODE_SET_PROTOTYPE_METHOD(constructor_template, "all", All);
30+
NODE_SET_PROTOTYPE_METHOD(constructor_template, "each", Each);
3031
NODE_SET_PROTOTYPE_METHOD(constructor_template, "reset", Reset);
3132
NODE_SET_PROTOTYPE_METHOD(constructor_template, "finalize", Finalize);
3233

@@ -101,14 +102,14 @@ Handle<Value> Statement::New(const Arguments& args) {
101102

102103
Statement* stmt = new Statement(db);
103104
stmt->Wrap(args.This());
105+
104106
PrepareBaton* baton = new PrepareBaton(db, Local<Function>::Cast(args[2]), stmt);
105107
baton->sql = std::string(*String::Utf8Value(sql));
106108
db->Schedule(EIO_BeginPrepare, baton);
107109

108110
return args.This();
109111
}
110112

111-
112113
void Statement::EIO_BeginPrepare(Database::Baton* baton) {
113114
assert(baton->db->open);
114115
baton->db->pending++;
@@ -520,6 +521,117 @@ int Statement::EIO_AfterAll(eio_req *req) {
520521
return 0;
521522
}
522523

524+
Handle<Value> Statement::Each(const Arguments& args) {
525+
HandleScope scope;
526+
Statement* stmt = ObjectWrap::Unwrap<Statement>(args.This());
527+
528+
Baton* baton = stmt->Bind<Baton>(args);
529+
if (baton == NULL) {
530+
return ThrowException(Exception::Error(String::New("Data type is not supported")));
531+
}
532+
else {
533+
stmt->Schedule(EIO_BeginEach, baton);
534+
return args.This();
535+
}
536+
}
537+
538+
void Statement::EIO_BeginEach(Baton* baton) {
539+
STATEMENT_BEGIN(Each);
540+
}
541+
542+
int Statement::EIO_Each(eio_req *req) {
543+
STATEMENT_INIT(Baton);
544+
545+
Async* async = new Async(stmt, baton->callback, AsyncEach);
546+
547+
sqlite3_mutex* mtx = sqlite3_db_mutex(stmt->db->handle);
548+
549+
int retrieved = 0;
550+
551+
// Make sure that we also reset when there are no parameters.
552+
if (!baton->parameters.size()) {
553+
sqlite3_reset(stmt->handle);
554+
}
555+
556+
if (stmt->Bind(baton->parameters)) {
557+
while (true) {
558+
sqlite3_mutex_enter(mtx);
559+
stmt->status = sqlite3_step(stmt->handle);
560+
if (stmt->status == SQLITE_ROW) {
561+
sqlite3_mutex_leave(mtx);
562+
Data::Row* row = new Data::Row();
563+
GetRow(row, stmt->handle);
564+
565+
pthread_mutex_lock(&async->mutex);
566+
async->data.push_back(row);
567+
retrieved++;
568+
pthread_mutex_unlock(&async->mutex);
569+
570+
ev_async_send(EV_DEFAULT_ &async->watcher);
571+
}
572+
else {
573+
if (stmt->status != SQLITE_DONE) {
574+
stmt->message = std::string(sqlite3_errmsg(stmt->db->handle));
575+
}
576+
sqlite3_mutex_leave(mtx);
577+
break;
578+
}
579+
}
580+
}
581+
582+
async->completed = true;
583+
ev_async_send(EV_DEFAULT_ &async->watcher);
584+
585+
return 0;
586+
}
587+
588+
void Statement::AsyncEach(EV_P_ ev_async *w, int revents) {
589+
HandleScope scope;
590+
Async* async = static_cast<Async*>(w->data);
591+
592+
while (true) {
593+
// Get the contents out of the data cache for us to process in the JS callback.
594+
Data::Rows rows;
595+
pthread_mutex_lock(&async->mutex);
596+
rows.swap(async->data);
597+
pthread_mutex_unlock(&async->mutex);
598+
599+
if (rows.empty()) {
600+
break;
601+
}
602+
603+
if (!async->callback.IsEmpty() && async->callback->IsFunction()) {
604+
Local<Value> argv[2];
605+
argv[0] = Local<Value>::New(Null());
606+
607+
Data::Rows::const_iterator it = rows.begin();
608+
Data::Rows::const_iterator end = rows.end();
609+
for (int i = 0; it < end; it++, i++) {
610+
argv[1] = RowToJS(*it);
611+
TRY_CATCH_CALL(async->stmt->handle_, async->callback, 2, argv);
612+
delete *it;
613+
}
614+
}
615+
}
616+
617+
if (async->completed) {
618+
delete async;
619+
w->data = NULL;
620+
}
621+
}
622+
623+
int Statement::EIO_AfterEach(eio_req *req) {
624+
HandleScope scope;
625+
STATEMENT_INIT(Baton);
626+
627+
if (stmt->status != SQLITE_DONE) {
628+
Error(baton);
629+
}
630+
631+
STATEMENT_END();
632+
return 0;
633+
}
634+
523635
Handle<Value> Statement::Reset(const Arguments& args) {
524636
HandleScope scope;
525637
Statement* stmt = ObjectWrap::Unwrap<Statement>(args.This());

src/statement.h

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,41 @@ class Statement : public EventEmitter {
117117
Baton* baton;
118118
};
119119

120+
typedef void (*Async_Callback)(EV_P_ ev_async *w, int revents);
121+
122+
struct Async {
123+
ev_async watcher;
124+
Statement* stmt;
125+
Data::Rows data;
126+
pthread_mutex_t mutex;
127+
Persistent<Function> callback;
128+
bool completed;
129+
130+
Async(Statement* st, Handle<Function> cb, Async_Callback async_cb) :
131+
stmt(st), completed(false) {
132+
watcher.data = this;
133+
ev_async_init(&watcher, async_cb);
134+
ev_async_start(EV_DEFAULT_UC_ &watcher);
135+
callback = Persistent<Function>::New(cb);
136+
stmt->Ref();
137+
pthread_mutex_init(&mutex, NULL);
138+
}
139+
140+
~Async() {
141+
callback.Dispose();
142+
stmt->Unref();
143+
pthread_mutex_destroy(&mutex);
144+
ev_async_stop(EV_DEFAULT_UC_ &watcher);
145+
}
146+
};
147+
120148
Statement(Database* db_) : EventEmitter(),
121-
db(db_),
122-
handle(NULL),
123-
status(SQLITE_OK),
124-
prepared(false),
125-
locked(false),
126-
finalized(false) {
149+
db(db_),
150+
handle(NULL),
151+
status(SQLITE_OK),
152+
prepared(false),
153+
locked(false),
154+
finalized(false) {
127155
db->Ref();
128156
}
129157

@@ -140,8 +168,11 @@ class Statement : public EventEmitter {
140168
EIO_DEFINITION(Get);
141169
EIO_DEFINITION(Run);
142170
EIO_DEFINITION(All);
171+
EIO_DEFINITION(Each);
143172
EIO_DEFINITION(Reset);
144173

174+
static void AsyncEach(EV_P_ ev_async *w, int revents);
175+
145176
static Handle<Value> Finalize(const Arguments& args);
146177
static void Finalize(Baton* baton);
147178
void Finalize();

test/unicode.test.js

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
var sqlite3 = require('sqlite3');
22
var assert = require('assert');
33

4+
function randomString() {
5+
var str = '';
6+
for (var i = Math.random() * 300; i > 0; i--) {
7+
str += String.fromCharCode(Math.floor(Math.random() * 65536));
8+
}
9+
return str;
10+
};
11+
412
exports['test unicode characters'] = function(beforeExit) {
513
var db = new sqlite3.Database(':memory:');
614

7-
function randomString() {
8-
var str = '';
9-
for (var i = Math.random() * 300; i > 0; i--) {
10-
str += String.fromCharCode(Math.floor(Math.random() * 65536));
11-
}
12-
return str;
13-
};
14-
1515
// Generate random data.
1616
var data = [];
1717
var length = Math.floor(Math.random() * 1000) + 200;

0 commit comments

Comments
 (0)