Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions index.js

This file was deleted.

2 changes: 1 addition & 1 deletion lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module.exports = Client;

var Emitter = require('events').EventEmitter;
var util = require('util');
var Kafka = require('../kafka-native.js');
var Kafka = require('../librdkafka.js');
var assert = require('assert');

var LibrdKafkaError = require('./error');
Expand Down
2 changes: 1 addition & 1 deletion lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var error = require('./error');
var util = require('util');

module.exports = {
Consumer: util.deprecate('Use KafkaConsumer instead. This may be changed in a later version', KafkaConsumer),
Consumer: util.deprecate(KafkaConsumer, 'Use KafkaConsumer instead. This may be changed in a later version'),
Producer: Producer,
KafkaConsumer: KafkaConsumer,
CODES: {
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module.exports = KafkaConsumer;

var Client = require('./client');
var util = require('util');
var Kafka = require('../kafka-native.js');
var Kafka = require('../librdkafka.js');
var TopicReadable = require('./util/topicReadable');
var LibrdKafkaError = require('./error');

Expand Down
6 changes: 3 additions & 3 deletions lib/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module.exports = Producer;
var Client = require('./client');

var util = require('util');
var Kafka = require('../kafka-native.js');
var Kafka = require('../librdkafka.js');
var TopicWritable = require('./util/topicWritable');
var LibrdKafkaError = require('./error');

Expand Down Expand Up @@ -132,7 +132,7 @@ function maybeTopic(name, config) {
// this may be what we want
return name;
}
};
}

/**
* Create a topic by topic name and config
Expand Down Expand Up @@ -217,7 +217,7 @@ Producer.prototype.produceSync = function(msg) {
* @see Producer#produceSync
* @deprecated
*/
Producer.prototype.sendMessageSync = util.deprecate(this.produceSync,
Producer.prototype.sendMessageSync = util.deprecate(Producer.prototype.produceSync,
'this.sendMessageSync: use this.produceSync instead');

/**
Expand Down
File renamed without changes.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
"license": "MIT",
"devDependencies": {
"jshint": "2.x",
"jsdoc": "^3.4.0",
"toolkit-jsdoc": "^1.0.0",
"mocha": "2.x",
"node-gyp": "3.x"
},
"dependencies": {
"bindings": "1.x",
"jsdoc": "^3.4.0",
"nan": "2.x",
"toolkit-jsdoc": "^1.0.0"
"nan": "2.x"
},
"engines": {
"npm": "^2.7.3"
Expand Down
30 changes: 22 additions & 8 deletions src/callbacks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ namespace NodeKafka {
namespace Callbacks {

Dispatcher::Dispatcher() {
async = new uv_async_t;
uv_async_init(uv_default_loop(), async, AsyncMessage_);

async->data = this;

async = NULL;
uv_mutex_init(&async_lock);
}

Expand All @@ -39,17 +35,35 @@ Dispatcher::~Dispatcher() {
callbacks[i].Reset();
}

// async->data = this;

uv_mutex_destroy(&async_lock);
}

// Only run this if we aren't already listening
void Dispatcher::Activate() {
if (!async) {
async = new uv_async_t;
uv_async_init(uv_default_loop(), async, AsyncMessage_);

async->data = this;
}
}

// Should be able to run this regardless of whether it is active or not
void Dispatcher::Deactivate() {
if (async) {
uv_close(reinterpret_cast<uv_handle_t*>(async), NULL);
async = NULL;
}
}

bool Dispatcher::HasCallbacks() {
return callbacks.size() > 0;
}

void Dispatcher::Execute() {
uv_async_send(async);
if (async) {
uv_async_send(async);
}
}

void Dispatcher::Dispatch(const int _argc, Local<Value> _argv[]) {
Expand Down
2 changes: 2 additions & 0 deletions src/callbacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Dispatcher {
bool HasCallbacks();
virtual void Flush() = 0;
void Execute();
void Activate();
void Deactivate();
protected:
std::vector<v8::Persistent<v8::Function, v8::CopyablePersistentTraits<v8::Function> > > callbacks; // NOLINT

Expand Down
3 changes: 3 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class Connection : public Nan::ObjectWrap {

Callbacks::Event m_event_cb;

virtual void ActivateDispatchers() = 0;
virtual void DeactivateDispatchers() = 0;

protected:
Connection(RdKafka::Conf*, RdKafka::Conf*);
~Connection();
Expand Down
87 changes: 50 additions & 37 deletions src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,56 @@ Baton Consumer::Connect() {
return Baton(RdKafka::ERR_NO_ERROR);
}

void Consumer::ActivateDispatchers() {
m_event_cb.dispatcher.Activate();
m_consume_cb.dispatcher.Activate();
m_rebalance_cb.dispatcher.Activate();
}

Baton Consumer::Disconnect() {
// Only close client if it is connected
RdKafka::ErrorCode err = RdKafka::ERR_NO_ERROR;

if (IsConnected()) {
m_is_closing = true;
{
scoped_mutex_lock lock(m_connection_lock);

RdKafka::KafkaConsumer* consumer =
dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
err = consumer->close();

delete m_client;
m_client = NULL;

RdKafka::wait_destroyed(1000);
}
}

m_is_closing = false;

return Baton(err);
}

void Consumer::DeactivateDispatchers() {
m_event_cb.dispatcher.Deactivate();
m_consume_cb.dispatcher.Deactivate();
m_rebalance_cb.dispatcher.Deactivate();
}

bool Consumer::IsSubscribed() {
if (!IsConnected()) {
return false;
}

if (!m_is_subscribed) {
return false;
}

return true;
}


bool Consumer::HasAssignedPartitions() {
return !m_partitions.empty();
}
Expand Down Expand Up @@ -349,43 +399,6 @@ v8::Local<v8::Object> Consumer::NewInstance(v8::Local<v8::Value> arg) {
return scope.Escape(instance);
}

Baton Consumer::Disconnect() {
// Only close client if it is connected
RdKafka::ErrorCode err = RdKafka::ERR_NO_ERROR;

if (IsConnected()) {
m_is_closing = true;
{
scoped_mutex_lock lock(m_connection_lock);

RdKafka::KafkaConsumer* consumer =
dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
err = consumer->close();

delete m_client;
m_client = NULL;

RdKafka::wait_destroyed(1000);
}
}

m_is_closing = false;

return Baton(err);
}

bool Consumer::IsSubscribed() {
if (!IsConnected()) {
return false;
}

if (!m_is_subscribed) {
return false;
}

return true;
}

/* Node exposed methods */

NAN_METHOD(Consumer::NodeOnConsume) {
Expand Down
3 changes: 3 additions & 0 deletions src/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ class Consumer : public Connection {
Baton Subscribe(std::vector<std::string>);
NodeKafka::Message* Consume();

void ActivateDispatchers();
void DeactivateDispatchers();

protected:
static Nan::Persistent<v8::Function> constructor;
static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
Expand Down
10 changes: 10 additions & 0 deletions src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ Baton Producer::Connect() {
return Baton(RdKafka::ERR_NO_ERROR);
}

void Producer::ActivateDispatchers() {
m_event_cb.dispatcher.Activate(); // From connection
m_dr_cb.dispatcher.Activate();
}

void Producer::DeactivateDispatchers() {
m_event_cb.dispatcher.Deactivate(); // From connection
m_dr_cb.dispatcher.Deactivate();
}

void Producer::Disconnect() {
if (IsConnected()) {
scoped_mutex_lock lock(m_connection_lock);
Expand Down
3 changes: 3 additions & 0 deletions src/producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class Producer : public Connection {
Baton Produce(void*, size_t, RdKafka::Topic*, int32_t, std::string*);
std::string Name();

void ActivateDispatchers();
void DeactivateDispatchers();

protected:
static Nan::Persistent<v8::Function> constructor;
static void New(const Nan::FunctionCallbackInfo<v8::Value>&);
Expand Down
19 changes: 13 additions & 6 deletions src/workers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ void ProducerConnect::HandleOKCallback() {

v8::Local<v8::Value> argv[argc] = { Nan::Null(), obj};

// Activate the dispatchers
producer->ActivateDispatchers();

callback->Call(argc, argv);
}

Expand Down Expand Up @@ -242,16 +245,15 @@ void ProducerDisconnect::HandleOKCallback() {
const unsigned int argc = 2;
v8::Local<v8::Value> argv[argc] = { Nan::Null(), Nan::True()};

// Deactivate the dispatchers
producer->DeactivateDispatchers();

callback->Call(argc, argv);
}

void ProducerDisconnect::HandleErrorCallback() {
Nan::HandleScope scope;

const unsigned int argc = 1;
v8::Local<v8::Value> argv[argc] = { Nan::Error(ErrorMessage()) };

callback->Call(argc, argv);
// This should never run
assert(0);
}

ProducerProduce::ProducerProduce(
Expand Down Expand Up @@ -327,6 +329,7 @@ void ConsumerConnect::HandleOKCallback() {
Nan::New(consumer->Name()).ToLocalChecked());

v8::Local<v8::Value> argv[argc] = { Nan::Null(), obj };
consumer->ActivateDispatchers();

callback->Call(argc, argv);
}
Expand Down Expand Up @@ -370,6 +373,8 @@ void ConsumerDisconnect::HandleOKCallback() {
const unsigned int argc = 2;
v8::Local<v8::Value> argv[argc] = { Nan::Null(), Nan::True() };

consumer->DeactivateDispatchers();

callback->Call(argc, argv);
}

Expand All @@ -379,6 +384,8 @@ void ConsumerDisconnect::HandleErrorCallback() {
const unsigned int argc = 1;
v8::Local<v8::Value> argv[argc] = { GetErrorObject() };

consumer->DeactivateDispatchers();

callback->Call(argc, argv);
}

Expand Down