Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,42 @@ Javascript code:
...
```

### Querying large tables

To query large tables you should use a reader:

* `connection.reader(sql, args)` creates a reader
* `reader.nextRow(callback)` returns the next row through the callback
* `reader.nextRows(count, callback)` returns the next `count` rows through the callback. `count` is optional and `nextRows` uses the prefetch row count when `count` is omitted.
* `connection.setPrefetchRowCount(count)` configures the prefetch row count for the connection. Prefetching can have a dramatic impact on performance but uses more memory.

Example:

```javascript
connection.setPrefetchRowCount(50);
var reader = connection.reader("SELECT * FROM auditlogs", []);

function doRead(cb) {
reader.nextRow(function(err, row) {
if (err) return cb(err);
if (row) {
// do something with row
console.log("got " + JSON.stringify(row));
// recurse to read next record
return doRead(cb)
} else {
// we are done
return cb();
}
})
}

doRead(function(err) {
if (err) throw err; // or log it
console.log("all records processed");
});
```

# Limitations/Caveats

* Currently no native support for connection pooling (forthcoming; use generic-pool for now.)
Expand Down
1 change: 1 addition & 0 deletions binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"sources": [ "src/connection.cpp",
"src/oracle_bindings.cpp",
"src/executeBaton.cpp",
"src/reader.cpp",
"src/outParam.cpp" ],
"conditions": [
["OS=='mac'", {
Expand Down
19 changes: 19 additions & 0 deletions lib/oracle.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,22 @@ exports.OCCIDATE = 6;
exports.OCCITIMESTAMP = 7;
exports.OCCINUMBER = 8;
exports.OCCIBLOB = 9;

// Reader.prototype.nextRow is implemented in JS rather than C++.
// This is easier and also more efficient because we don't cross the JS/C++ boundary every time
// we read a record.
bindings.Reader.prototype.nextRow = function(cb) {
var self = this;
if (self._error || (self._rows && self._rows.length > 0)) {
process.nextTick(function() {
cb(self._error, self._rows && self._rows.shift());
});
} else {
// nextRows willl use the prefetch row count as window size
self.nextRows(function(err, result) {
self._error = err || self._error;
self._rows = result;
cb(self._error, self._rows && self._rows.shift());
});
}
};
29 changes: 29 additions & 0 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "executeBaton.h"
#include "commitBaton.h"
#include "rollbackBaton.h"
#include "reader.h"
#include "outParam.h"
#include <vector>
#include <node_version.h>
Expand All @@ -21,6 +22,7 @@ void Connection::Init(Handle<Object> target) {

NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "execute", Execute);
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "executeSync", ExecuteSync);
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "reader", CreateReader);
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "close", Close);
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "isConnected", IsConnected);
NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "setAutoCommit", SetAutoCommit);
Expand Down Expand Up @@ -72,6 +74,24 @@ uni::CallbackType Connection::Execute(const uni::FunctionCallbackInfo& args) {
UNI_RETURN(scope, args, Undefined());
}

uni::CallbackType Connection::CreateReader(const uni::FunctionCallbackInfo& args) {
UNI_SCOPE(scope);
Connection* connection = ObjectWrap::Unwrap<Connection>(args.This());

REQ_STRING_ARG(0, sql);
REQ_ARRAY_ARG(1, values);

String::Utf8Value sqlVal(sql);

ReaderBaton* baton = new ReaderBaton(connection, *sqlVal, &values);

Local<Object> readerHandle(uni::Deref(Reader::constructorTemplate)->GetFunction()->NewInstance());
Reader* reader = ObjectWrap::Unwrap<Reader>(readerHandle);
reader->setBaton(baton);

UNI_RETURN(scope, args, readerHandle);
}

uni::CallbackType Connection::Close(const uni::FunctionCallbackInfo& args) {
UNI_SCOPE(scope);
try {
Expand Down Expand Up @@ -296,6 +316,15 @@ void Connection::CreateColumnsFromResultSet(oracle::occi::ResultSet* rs, Execute
case oracle::occi::OCCI_TYPECODE_BLOB:
col->type = VALUE_TYPE_BLOB;
break;
// The lines below are temporary mappings for RAW and ROWID types
// I need them to test because my test db has columns like this but this mapping will need to be reviewed
case 23: // see http://docs.oracle.com/cd/B14117_01/appdev.101/b10779/oci03typ.htm#421756
//printf("datatype 23 for column %s\n", col->name.c_str());
col->type = VALUE_TYPE_STRING;
break;
case 104: // rowid
col->type = VALUE_TYPE_STRING;
break;
default:
ostringstream message;
message << "CreateColumnsFromResultSet: Unhandled oracle data type: " << type;
Expand Down
14 changes: 12 additions & 2 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,14 @@ namespace uni {
}

class Connection : public ObjectWrap {
friend class Reader;
friend class ReaderBaton;
public:
static void Init(Handle<Object> target);
static uni::CallbackType New(const uni::FunctionCallbackInfo& args);
static uni::CallbackType Execute(const uni::FunctionCallbackInfo& args);
static uni::CallbackType ExecuteSync(const uni::FunctionCallbackInfo& args);
static uni::CallbackType CreateReader(const uni::FunctionCallbackInfo& args);
static uni::CallbackType Close(const uni::FunctionCallbackInfo& args);
static uni::CallbackType IsConnected(const uni::FunctionCallbackInfo& args);
static uni::CallbackType Commit(const uni::FunctionCallbackInfo& args);
Expand All @@ -93,13 +96,20 @@ class Connection : public ObjectWrap {
void setConnection(oracle::occi::Environment* environment, oracle::occi::Connection* connection);
oracle::occi::Environment* getEnvironment() { return m_environment; }

private:
protected:
// shared with Reader
oracle::occi::Connection* getConnection() { return m_connection; }
bool getAutoCommit() { return m_autoCommit; }
int getPrefetchRowCount() { return m_prefetchRowCount; }

static int SetValuesOnStatement(oracle::occi::Statement* stmt, ExecuteBaton* baton);
static void CreateColumnsFromResultSet(oracle::occi::ResultSet* rs, ExecuteBaton* baton, std::vector<column_t*> &columns);
static row_t* CreateRowFromCurrentResultSetRow(oracle::occi::ResultSet* rs, ExecuteBaton* baton, std::vector<column_t*> &columns);
static void handleResult(ExecuteBaton* baton, Handle<Value> (&argv)[2]);

private:
static Local<Array> CreateV8ArrayFromRows(ExecuteBaton* baton, std::vector<column_t*> columns, std::vector<row_t*>* rows);
static Local<Object> CreateV8ObjectFromRow(ExecuteBaton* baton, std::vector<column_t*> columns, row_t* currentRow);
static void handleResult(ExecuteBaton* baton, Handle<Value> (&argv)[2]);

oracle::occi::Connection* m_connection;
oracle::occi::Environment* m_environment;
Expand Down
2 changes: 2 additions & 0 deletions src/oracle_bindings.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

#include "connection.h"
#include "oracle_bindings.h"
#include "reader.h"
#include "outParam.h"

Persistent<FunctionTemplate> OracleClient::s_ct;
Expand Down Expand Up @@ -185,6 +186,7 @@ extern "C" {
static void init(Handle<Object> target) {
OracleClient::Init(target);
Connection::Init(target);
Reader::Init(target);
OutParam::Init(target);
}

Expand Down
148 changes: 148 additions & 0 deletions src/reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
#include <string.h>
#include "connection.h"
#include "executeBaton.h"
#include "reader.h"

using namespace std;

Persistent<FunctionTemplate> Reader::constructorTemplate;

void Reader::Init(Handle<Object> target) {
UNI_SCOPE(scope);

Local<FunctionTemplate> t = FunctionTemplate::New(New);
uni::Reset(constructorTemplate, t);
uni::Deref(constructorTemplate)->InstanceTemplate()->SetInternalFieldCount(1);
uni::Deref(constructorTemplate)->SetClassName(String::NewSymbol("Reader"));

NODE_SET_PROTOTYPE_METHOD(uni::Deref(constructorTemplate), "nextRows", NextRows);
target->Set(String::NewSymbol("Reader"), uni::Deref(constructorTemplate)->GetFunction());
}

Reader::Reader(): ObjectWrap() {
}

Reader::~Reader() {
delete m_baton;
m_baton = NULL;
}

uni::CallbackType Reader::New(const uni::FunctionCallbackInfo& args) {
UNI_SCOPE(scope);

Reader* reader = new Reader();
reader->Wrap(args.This());

UNI_RETURN(scope, args, args.This());
}

void Reader::setBaton(ReaderBaton* baton) {
m_baton = baton;
}

uni::CallbackType Reader::NextRows(const uni::FunctionCallbackInfo& args) {
UNI_SCOPE(scope);
Reader* reader = ObjectWrap::Unwrap<Reader>(args.This());
ReaderBaton* baton = reader->m_baton;
if (baton->error) {
Local<String> message = String::New(baton->error->c_str());
UNI_THROW(Exception::Error(message));
}

if (args.Length() > 1) {
REQ_INT_ARG(0, count);
REQ_FUN_ARG(1, callback);
baton->count = count;
uni::Reset(baton->nextRowsCallback, callback);
} else {
REQ_FUN_ARG(0, callback);
baton->count = baton->connection->getPrefetchRowCount();
uni::Reset(baton->nextRowsCallback, callback);
}
if (baton->count <= 0) baton->count = 1;

uv_work_t* req = new uv_work_t();
req->data = baton;
uv_queue_work(uv_default_loop(), req, EIO_NextRows, (uv_after_work_cb)EIO_AfterNextRows);
baton->connection->Ref();

UNI_RETURN(scope, args, Undefined());
}

void Reader::EIO_NextRows(uv_work_t* req) {
ReaderBaton* baton = static_cast<ReaderBaton*>(req->data);

baton->rows = NULL;
baton->error = NULL;

try {
if(! baton->connection->getConnection()) {
baton->error = new std::string("Connection already closed");
return;
}
if (!baton->rs) {
baton->stmt = baton->connection->getConnection()->createStatement(baton->sql);
baton->stmt->setAutoCommit(baton->connection->getAutoCommit());
baton->stmt->setPrefetchRowCount(baton->count);
Connection::SetValuesOnStatement(baton->stmt, baton);
if (baton->error) goto cleanup;

int status = baton->stmt->execute();
if(status != oracle::occi::Statement::RESULT_SET_AVAILABLE) {
baton->error = new std::string("Connection already closed");
return;
}
baton->rs = baton->stmt->getResultSet();
Connection::CreateColumnsFromResultSet(baton->rs, baton, baton->columns);
if (baton->error) goto cleanup;
}
baton->rows = new vector<row_t*>();

for (int i = 0; i < baton->count && baton->rs->next(); i++) {
row_t* row = Connection::CreateRowFromCurrentResultSetRow(baton->rs, baton, baton->columns);
if (baton->error) goto cleanup;
baton->rows->push_back(row);
}
} catch(oracle::occi::SQLException &ex) {
baton->error = new string(ex.getMessage());
} catch (const exception& ex) {
baton->error = new string(ex.what());
} catch (...) {
baton->error = new string("Unknown Error");
}
cleanup:
// nothing for now, cleanup happens in destructor
;
}

#if NODE_MODULE_VERSION >= 0x000D
void ReaderWeakReferenceCallback(Isolate* isolate, v8::Persistent<v8::Function>* callback, void* dummy)
{
callback->Dispose();
}
#else
void ReaderWeakReferenceCallback(v8::Persistent<v8::Value> callback, void* dummy)
{
(Persistent<Function>(Function::Cast(*callback))).Dispose();
}
#endif

void Reader::EIO_AfterNextRows(uv_work_t* req, int status) {
UNI_SCOPE(scope);
ReaderBaton* baton = static_cast<ReaderBaton*>(req->data);

baton->connection->Unref();

try {
Handle<Value> argv[2];
Connection::handleResult(baton, argv);
node::MakeCallback(Context::GetCurrent()->Global(), uni::Deref(baton->nextRowsCallback), 2, argv);
} catch(const exception &ex) {
Handle<Value> argv[2];
argv[0] = Exception::Error(String::New(ex.what()));
argv[1] = Undefined();
node::MakeCallback(Context::GetCurrent()->Global(), uni::Deref(baton->nextRowsCallback), 2, argv);
}
baton->nextRowsCallback.MakeWeak((void*)NULL, ReaderWeakReferenceCallback);
}

32 changes: 32 additions & 0 deletions src/reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@

#ifndef _reader_h_
#define _reader_h_


#include <v8.h>
#include <node.h>
#include <occi.h>
#include "readerBaton.h"

using namespace node;
using namespace v8;

class Reader : public ObjectWrap {
public:
static Persistent<FunctionTemplate> constructorTemplate;
static void Init(Handle<Object> target);
static uni::CallbackType New(const uni::FunctionCallbackInfo& args);
static uni::CallbackType NextRows(const uni::FunctionCallbackInfo& args);
static void EIO_NextRows(uv_work_t* req);
static void EIO_AfterNextRows(uv_work_t* req, int status);

Reader();
~Reader();

void setBaton(ReaderBaton* baton);

private:
ReaderBaton* m_baton;
};

#endif
Loading