Skip to content

Commit 07f03a5

Browse files
committed
Merge pull request joeferner#120 from Sage/reader
added reader
2 parents 1a23553 + 1de9784 commit 07f03a5

File tree

10 files changed

+435
-2
lines changed

10 files changed

+435
-2
lines changed

README.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,42 @@ Javascript code:
185185
...
186186
```
187187
188+
### Querying large tables
189+
190+
To query large tables you should use a reader:
191+
192+
* `connection.reader(sql, args)` creates a reader
193+
* `reader.nextRow(callback)` returns the next row through the callback
194+
* `reader.nextRows(count, callback)` returns the next `count` rows through the callback. `count` is optional and `nextRows` uses the prefetch row count when `count` is omitted.
195+
* `connection.setPrefetchRowCount(count)` configures the prefetch row count for the connection. Prefetching can have a dramatic impact on performance but uses more memory.
196+
197+
Example:
198+
199+
```javascript
200+
connection.setPrefetchRowCount(50);
201+
var reader = connection.reader("SELECT * FROM auditlogs", []);
202+
203+
function doRead(cb) {
204+
reader.nextRow(function(err, row) {
205+
if (err) return cb(err);
206+
if (row) {
207+
// do something with row
208+
console.log("got " + JSON.stringify(row));
209+
// recurse to read next record
210+
return doRead(cb)
211+
} else {
212+
// we are done
213+
return cb();
214+
}
215+
})
216+
}
217+
218+
doRead(function(err) {
219+
if (err) throw err; // or log it
220+
console.log("all records processed");
221+
});
222+
```
223+
188224
# Limitations/Caveats
189225
190226
* Currently no native support for connection pooling (forthcoming; use generic-pool for now.)

binding.gyp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"sources": [ "src/connection.cpp",
66
"src/oracle_bindings.cpp",
77
"src/executeBaton.cpp",
8+
"src/reader.cpp",
89
"src/outParam.cpp" ],
910
"conditions": [
1011
["OS=='mac'", {

lib/oracle.js

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,22 @@ exports.OCCIDATE = 6;
3434
exports.OCCITIMESTAMP = 7;
3535
exports.OCCINUMBER = 8;
3636
exports.OCCIBLOB = 9;
37+
38+
// Reader.prototype.nextRow is implemented in JS rather than C++.
39+
// This is easier and also more efficient because we don't cross the JS/C++ boundary every time
40+
// we read a record.
41+
bindings.Reader.prototype.nextRow = function(cb) {
42+
var self = this;
43+
if (self._error || (self._rows && self._rows.length > 0)) {
44+
process.nextTick(function() {
45+
cb(self._error, self._rows && self._rows.shift());
46+
});
47+
} else {
48+
// nextRows willl use the prefetch row count as window size
49+
self.nextRows(function(err, result) {
50+
self._error = err || self._error;
51+
self._rows = result;
52+
cb(self._error, self._rows && self._rows.shift());
53+
});
54+
}
55+
};

src/connection.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "executeBaton.h"
44
#include "commitBaton.h"
55
#include "rollbackBaton.h"
6+
#include "reader.h"
67
#include "outParam.h"
78
#include <vector>
89
#include <node_version.h>
@@ -21,6 +22,7 @@ void Connection::Init(Handle<Object> target) {
2122

2223
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "execute", Execute);
2324
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "executeSync", ExecuteSync);
25+
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "reader", CreateReader);
2426
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "close", Close);
2527
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "isConnected", IsConnected);
2628
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "setAutoCommit", SetAutoCommit);
@@ -72,6 +74,24 @@ uni::CallbackType Connection::Execute(const uni::FunctionCallbackInfo& args) {
7274
UNI_RETURN(scope, args, Undefined());
7375
}
7476

77+
uni::CallbackType Connection::CreateReader(const uni::FunctionCallbackInfo& args) {
78+
UNI_SCOPE(scope);
79+
Connection* connection = ObjectWrap::Unwrap<Connection>(args.This());
80+
81+
REQ_STRING_ARG(0, sql);
82+
REQ_ARRAY_ARG(1, values);
83+
84+
String::Utf8Value sqlVal(sql);
85+
86+
ReaderBaton* baton = new ReaderBaton(connection, *sqlVal, &values);
87+
88+
Local<Object> readerHandle(uni::Deref(Reader::constructorTemplate)->GetFunction()->NewInstance());
89+
Reader* reader = ObjectWrap::Unwrap<Reader>(readerHandle);
90+
reader->setBaton(baton);
91+
92+
UNI_RETURN(scope, args, readerHandle);
93+
}
94+
7595
uni::CallbackType Connection::Close(const uni::FunctionCallbackInfo& args) {
7696
UNI_SCOPE(scope);
7797
try {
@@ -296,6 +316,15 @@ void Connection::CreateColumnsFromResultSet(oracle::occi::ResultSet* rs, Execute
296316
case oracle::occi::OCCI_TYPECODE_BLOB:
297317
col->type = VALUE_TYPE_BLOB;
298318
break;
319+
// The lines below are temporary mappings for RAW and ROWID types
320+
// I need them to test because my test db has columns like this but this mapping will need to be reviewed
321+
case 23: // see http://docs.oracle.com/cd/B14117_01/appdev.101/b10779/oci03typ.htm#421756
322+
//printf("datatype 23 for column %s\n", col->name.c_str());
323+
col->type = VALUE_TYPE_STRING;
324+
break;
325+
case 104: // rowid
326+
col->type = VALUE_TYPE_STRING;
327+
break;
299328
default:
300329
ostringstream message;
301330
message << "CreateColumnsFromResultSet: Unhandled oracle data type: " << type;

src/connection.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,14 @@ namespace uni {
6767
}
6868

6969
class Connection : public ObjectWrap {
70+
friend class Reader;
71+
friend class ReaderBaton;
7072
public:
7173
static void Init(Handle<Object> target);
7274
static uni::CallbackType New(const uni::FunctionCallbackInfo& args);
7375
static uni::CallbackType Execute(const uni::FunctionCallbackInfo& args);
7476
static uni::CallbackType ExecuteSync(const uni::FunctionCallbackInfo& args);
77+
static uni::CallbackType CreateReader(const uni::FunctionCallbackInfo& args);
7578
static uni::CallbackType Close(const uni::FunctionCallbackInfo& args);
7679
static uni::CallbackType IsConnected(const uni::FunctionCallbackInfo& args);
7780
static uni::CallbackType Commit(const uni::FunctionCallbackInfo& args);
@@ -93,13 +96,20 @@ class Connection : public ObjectWrap {
9396
void setConnection(oracle::occi::Environment* environment, oracle::occi::Connection* connection);
9497
oracle::occi::Environment* getEnvironment() { return m_environment; }
9598

96-
private:
99+
protected:
100+
// shared with Reader
101+
oracle::occi::Connection* getConnection() { return m_connection; }
102+
bool getAutoCommit() { return m_autoCommit; }
103+
int getPrefetchRowCount() { return m_prefetchRowCount; }
104+
97105
static int SetValuesOnStatement(oracle::occi::Statement* stmt, ExecuteBaton* baton);
98106
static void CreateColumnsFromResultSet(oracle::occi::ResultSet* rs, ExecuteBaton* baton, std::vector<column_t*> &columns);
99107
static row_t* CreateRowFromCurrentResultSetRow(oracle::occi::ResultSet* rs, ExecuteBaton* baton, std::vector<column_t*> &columns);
108+
static void handleResult(ExecuteBaton* baton, Handle<Value> (&argv)[2]);
109+
110+
private:
100111
static Local<Array> CreateV8ArrayFromRows(ExecuteBaton* baton, std::vector<column_t*> columns, std::vector<row_t*>* rows);
101112
static Local<Object> CreateV8ObjectFromRow(ExecuteBaton* baton, std::vector<column_t*> columns, row_t* currentRow);
102-
static void handleResult(ExecuteBaton* baton, Handle<Value> (&argv)[2]);
103113

104114
oracle::occi::Connection* m_connection;
105115
oracle::occi::Environment* m_environment;

src/oracle_bindings.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11

22
#include "connection.h"
33
#include "oracle_bindings.h"
4+
#include "reader.h"
45
#include "outParam.h"
56

67
Persistent<FunctionTemplate> OracleClient::s_ct;
@@ -185,6 +186,7 @@ extern "C" {
185186
static void init(Handle<Object> target) {
186187
OracleClient::Init(target);
187188
Connection::Init(target);
189+
Reader::Init(target);
188190
OutParam::Init(target);
189191
}
190192

src/reader.cpp

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
#include <string.h>
2+
#include "connection.h"
3+
#include "executeBaton.h"
4+
#include "reader.h"
5+
6+
using namespace std;
7+
8+
Persistent<FunctionTemplate> Reader::constructorTemplate;
9+
10+
void Reader::Init(Handle<Object> target) {
11+
UNI_SCOPE(scope);
12+
13+
Local<FunctionTemplate> t = FunctionTemplate::New(New);
14+
uni::Reset(constructorTemplate, t);
15+
uni::Deref(constructorTemplate)->InstanceTemplate()->SetInternalFieldCount(1);
16+
uni::Deref(constructorTemplate)->SetClassName(String::NewSymbol("Reader"));
17+
18+
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "nextRows", NextRows);
19+
target->Set(String::NewSymbol("Reader"), uni::Deref(constructorTemplate)->GetFunction());
20+
}
21+
22+
Reader::Reader(): ObjectWrap() {
23+
}
24+
25+
Reader::~Reader() {
26+
delete m_baton;
27+
m_baton = NULL;
28+
}
29+
30+
uni::CallbackType Reader::New(const uni::FunctionCallbackInfo& args) {
31+
UNI_SCOPE(scope);
32+
33+
Reader* reader = new Reader();
34+
reader->Wrap(args.This());
35+
36+
UNI_RETURN(scope, args, args.This());
37+
}
38+
39+
void Reader::setBaton(ReaderBaton* baton) {
40+
m_baton = baton;
41+
}
42+
43+
uni::CallbackType Reader::NextRows(const uni::FunctionCallbackInfo& args) {
44+
UNI_SCOPE(scope);
45+
Reader* reader = ObjectWrap::Unwrap<Reader>(args.This());
46+
ReaderBaton* baton = reader->m_baton;
47+
if (baton->error) {
48+
Local<String> message = String::New(baton->error->c_str());
49+
UNI_THROW(Exception::Error(message));
50+
}
51+
52+
if (args.Length() > 1) {
53+
REQ_INT_ARG(0, count);
54+
REQ_FUN_ARG(1, callback);
55+
baton->count = count;
56+
uni::Reset(baton->nextRowsCallback, callback);
57+
} else {
58+
REQ_FUN_ARG(0, callback);
59+
baton->count = baton->connection->getPrefetchRowCount();
60+
uni::Reset(baton->nextRowsCallback, callback);
61+
}
62+
if (baton->count <= 0) baton->count = 1;
63+
64+
uv_work_t* req = new uv_work_t();
65+
req->data = baton;
66+
uv_queue_work(uv_default_loop(), req, EIO_NextRows, (uv_after_work_cb)EIO_AfterNextRows);
67+
baton->connection->Ref();
68+
69+
UNI_RETURN(scope, args, Undefined());
70+
}
71+
72+
void Reader::EIO_NextRows(uv_work_t* req) {
73+
ReaderBaton* baton = static_cast<ReaderBaton*>(req->data);
74+
75+
baton->rows = NULL;
76+
baton->error = NULL;
77+
78+
try {
79+
if(! baton->connection->getConnection()) {
80+
baton->error = new std::string("Connection already closed");
81+
return;
82+
}
83+
if (!baton->rs) {
84+
baton->stmt = baton->connection->getConnection()->createStatement(baton->sql);
85+
baton->stmt->setAutoCommit(baton->connection->getAutoCommit());
86+
baton->stmt->setPrefetchRowCount(baton->count);
87+
Connection::SetValuesOnStatement(baton->stmt, baton);
88+
if (baton->error) goto cleanup;
89+
90+
int status = baton->stmt->execute();
91+
if(status != oracle::occi::Statement::RESULT_SET_AVAILABLE) {
92+
baton->error = new std::string("Connection already closed");
93+
return;
94+
}
95+
baton->rs = baton->stmt->getResultSet();
96+
Connection::CreateColumnsFromResultSet(baton->rs, baton, baton->columns);
97+
if (baton->error) goto cleanup;
98+
}
99+
baton->rows = new vector<row_t*>();
100+
101+
for (int i = 0; i < baton->count && baton->rs->next(); i++) {
102+
row_t* row = Connection::CreateRowFromCurrentResultSetRow(baton->rs, baton, baton->columns);
103+
if (baton->error) goto cleanup;
104+
baton->rows->push_back(row);
105+
}
106+
} catch(oracle::occi::SQLException &ex) {
107+
baton->error = new string(ex.getMessage());
108+
} catch (const exception& ex) {
109+
baton->error = new string(ex.what());
110+
} catch (...) {
111+
baton->error = new string("Unknown Error");
112+
}
113+
cleanup:
114+
// nothing for now, cleanup happens in destructor
115+
;
116+
}
117+
118+
#if NODE_MODULE_VERSION >= 0x000D
119+
void ReaderWeakReferenceCallback(Isolate* isolate, v8::Persistent<v8::Function>* callback, void* dummy)
120+
{
121+
callback->Dispose();
122+
}
123+
#else
124+
void ReaderWeakReferenceCallback(v8::Persistent<v8::Value> callback, void* dummy)
125+
{
126+
(Persistent<Function>(Function::Cast(*callback))).Dispose();
127+
}
128+
#endif
129+
130+
void Reader::EIO_AfterNextRows(uv_work_t* req, int status) {
131+
UNI_SCOPE(scope);
132+
ReaderBaton* baton = static_cast<ReaderBaton*>(req->data);
133+
134+
baton->connection->Unref();
135+
136+
try {
137+
Handle<Value> argv[2];
138+
Connection::handleResult(baton, argv);
139+
node::MakeCallback(Context::GetCurrent()->Global(), uni::Deref(baton->nextRowsCallback), 2, argv);
140+
} catch(const exception &ex) {
141+
Handle<Value> argv[2];
142+
argv[0] = Exception::Error(String::New(ex.what()));
143+
argv[1] = Undefined();
144+
node::MakeCallback(Context::GetCurrent()->Global(), uni::Deref(baton->nextRowsCallback), 2, argv);
145+
}
146+
baton->nextRowsCallback.MakeWeak((void*)NULL, ReaderWeakReferenceCallback);
147+
}
148+

src/reader.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
2+
#ifndef _reader_h_
3+
#define _reader_h_
4+
5+
6+
#include <v8.h>
7+
#include <node.h>
8+
#include <occi.h>
9+
#include "readerBaton.h"
10+
11+
using namespace node;
12+
using namespace v8;
13+
14+
class Reader : public ObjectWrap {
15+
public:
16+
static Persistent<FunctionTemplate> constructorTemplate;
17+
static void Init(Handle<Object> target);
18+
static uni::CallbackType New(const uni::FunctionCallbackInfo& args);
19+
static uni::CallbackType NextRows(const uni::FunctionCallbackInfo& args);
20+
static void EIO_NextRows(uv_work_t* req);
21+
static void EIO_AfterNextRows(uv_work_t* req, int status);
22+
23+
Reader();
24+
~Reader();
25+
26+
void setBaton(ReaderBaton* baton);
27+
28+
private:
29+
ReaderBaton* m_baton;
30+
};
31+
32+
#endif

0 commit comments

Comments
 (0)