Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use config superclass to keep track of outstanding threads
  • Loading branch information
webmakersteve committed Sep 27, 2016
commit 764a50010f6083b787d0860e6d137df7351652b9
25 changes: 5 additions & 20 deletions src/callbacks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,7 @@ void Delivery::dr_cb(RdKafka::Message &message) {

// Rebalance CB

RebalanceDispatcher::RebalanceDispatcher() {}
RebalanceDispatcher::~RebalanceDispatcher() {}

void RebalanceDispatcher::Add(const rebalance_event_t &e) {
scoped_mutex_lock lock(async_lock);
events.push_back(e);
}
/*

void RebalanceDispatcher::Flush() {
Nan::HandleScope scope;
Expand Down Expand Up @@ -453,26 +447,17 @@ void RebalanceDispatcher::Flush() {
Dispatch(argc, argv);
}
}

*/
Rebalance::Rebalance(Nan::Callback &cb) {}
Rebalance::~Rebalance() {}
Rebalance::Rebalance(NodeKafka::Consumer* that) :
that_(that) {
eof_cnt = 0;
}

void Rebalance::rebalance_cb(RdKafka::KafkaConsumer *consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*> &partitions) {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
that_->Assign(partitions);
} else {
that_->Unassign();
}

dispatcher.Add(rebalance_event_t(err, partitions));
dispatcher.Execute();
// dispatcher.Add(rebalance_event_t(err, partitions));
// dispatcher.Execute();

eof_cnt = 0;
}

// Partitioner callback
Expand Down
16 changes: 2 additions & 14 deletions src/callbacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,28 +174,16 @@ struct rebalance_event_t {
}
};

class RebalanceDispatcher : public Dispatcher {
public:
RebalanceDispatcher();
~RebalanceDispatcher();
void Add(const rebalance_event_t &);
void Flush();
protected:
std::vector<rebalance_event_t> events;
};

class Rebalance : public RdKafka::RebalanceCb {
public:
explicit Rebalance(NodeKafka::Consumer* that);
explicit Rebalance(Nan::Callback &);
~Rebalance();
// NAN_DISALLOW_ASSIGN_COPY_MOVE?
NodeKafka::Consumer* const that_;

void rebalance_cb(RdKafka::KafkaConsumer *, RdKafka::ErrorCode,
std::vector<RdKafka::TopicPartition*> &);
RebalanceDispatcher dispatcher;
private:
int eof_cnt;
v8::Persistent<v8::Function> m_cb;
};

class Partitioner : public RdKafka::PartitionerCb {
Expand Down
33 changes: 12 additions & 21 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

#include "src/config.h"

using RdKafka::Conf;
using Nan::MaybeLocal;
using Nan::Maybe;
using v8::Local;
Expand All @@ -24,9 +23,7 @@ using std::endl;

namespace NodeKafka {

namespace Config {

void DumpConfig(std::list<std::string> *dump) {
void Conf::DumpConfig(std::list<std::string> *dump) {
for (std::list<std::string>::iterator it = dump->begin();
it != dump->end(); ) {
std::cout << *it << " = ";
Expand All @@ -37,22 +34,8 @@ void DumpConfig(std::list<std::string> *dump) {
std::cout << std::endl;
}

template<typename T>
void LoadParameter(v8::Local<v8::Object> object, std::string field, const T &to) { // NOLINT
to = GetParameter<T>(object, field, to);
}

std::string GetValue(RdKafka::Conf* rdconf, const std::string name) {
std::string value;
if (rdconf->get(name, value) == RdKafka::Conf::CONF_UNKNOWN) {
return std::string();
}

return value;
}

RdKafka::Conf* Create(RdKafka::Conf::ConfType type, v8::Local<v8::Object> object, std::string &errstr) { // NOLINT
RdKafka::Conf* rdconf = RdKafka::Conf::create(type);
Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local<v8::Object> object, std::string &errstr) { // NOLINT
Conf* rdconf = static_cast<Conf*>(RdKafka::Conf::create(type));

v8::Local<v8::Array> property_names = object->GetOwnPropertyNames();

Expand All @@ -78,12 +61,20 @@ RdKafka::Conf* Create(RdKafka::Conf::ConfType type, v8::Local<v8::Object> object
delete rdconf;
return NULL;
}
} else {
Log("Value is a function");
if (string_key.compare("rebalance_cb") == 0) {
Nan::Callback cb(value.As<v8::Function>());
NodeKafka::Callbacks::Rebalance rebalance_cb(cb);
rdconf->set(string_key, &rebalance_cb, errstr);
}
}
}

return rdconf;

}

} // namespace Config
Conf::~Conf() {}

} // namespace NodeKafka
17 changes: 11 additions & 6 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@

#include "deps/librdkafka/src-cpp/rdkafkacpp.h"
#include "src/common.h"
#include "src/callbacks.h"

namespace NodeKafka {
namespace Config {

void DumpConfig(std::list<std::string> *);
template<typename T> void LoadParameter(v8::Local<v8::Object>, std::string, T &); // NOLINT
std::string GetValue(RdKafka::Conf*, const std::string);
RdKafka::Conf* Create(RdKafka::Conf::ConfType, v8::Local<v8::Object>, std::string &); // NOLINT
class Conf : public RdKafka::Conf {
public:
~Conf();

} // namespace Config
static Conf* create(RdKafka::Conf::ConfType, v8::Local<v8::Object>, std::string &); // NOLINT

static void DumpConfig(std::list<std::string> *);
protected:
bool m_has_rebalance_cb;
bool m_has_partitioner_cb;
};

} // namespace NodeKafka

Expand Down
73 changes: 5 additions & 68 deletions src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ consumer_commit_t::consumer_commit_t() {
*/

Consumer::Consumer(RdKafka::Conf* gconfig, RdKafka::Conf* tconfig):
Connection(gconfig, tconfig),
m_consume_cb(),
m_rebalance_cb(this) {
Connection(gconfig, tconfig) {
m_is_subscribed = false;
m_is_manually_rebalancing = false;

Expand Down Expand Up @@ -76,10 +74,6 @@ Baton Consumer::Connect() {

void Consumer::ActivateDispatchers() {
m_event_cb.dispatcher.Activate();
m_consume_cb.dispatcher.Activate();
if (m_is_manually_rebalancing) {
m_rebalance_cb.dispatcher.Activate();
}
}

Baton Consumer::Disconnect() {
Expand Down Expand Up @@ -107,10 +101,6 @@ Baton Consumer::Disconnect() {

void Consumer::DeactivateDispatchers() {
m_event_cb.dispatcher.Deactivate();
m_consume_cb.dispatcher.Deactivate();
if (m_is_manually_rebalancing) {
m_rebalance_cb.dispatcher.Deactivate();
}
}

bool Consumer::IsSubscribed() {
Expand Down Expand Up @@ -338,9 +328,6 @@ void Consumer::Init(v8::Local<v8::Object> exports) {
* @sa RdKafka::KafkaConsumer
*/

Nan::SetPrototypeMethod(tpl, "onConsume", NodeOnConsume);
Nan::SetPrototypeMethod(tpl, "onRebalance", NodeOnRebalance);

/*
* @brief Methods exposed to do with message retrieval
*/
Expand Down Expand Up @@ -385,15 +372,15 @@ void Consumer::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {

std::string errstr;

RdKafka::Conf* gconfig =
Config::Create(RdKafka::Conf::CONF_GLOBAL, info[0]->ToObject(), errstr);
Conf* gconfig =
Conf::create(RdKafka::Conf::CONF_GLOBAL, info[0]->ToObject(), errstr);

if (!gconfig) {
return Nan::ThrowError(errstr.c_str());
}

RdKafka::Conf* tconfig =
Config::Create(RdKafka::Conf::CONF_TOPIC, info[1]->ToObject(), errstr);
Conf* tconfig =
Conf::create(RdKafka::Conf::CONF_TOPIC, info[1]->ToObject(), errstr);

if (!tconfig) {
delete gconfig;
Expand Down Expand Up @@ -425,58 +412,8 @@ v8::Local<v8::Object> Consumer::NewInstance(v8::Local<v8::Value> arg) {
return scope.Escape(instance);
}

Baton Consumer::UseManualRebalancing(v8::Local<v8::Function> cb) {
m_rebalance_cb.dispatcher.AddCallback(cb);

std::string errstr;
if (!m_is_manually_rebalancing) {
// Okay. We want to set the rebalance CB now
m_is_manually_rebalancing = true;
if (RdKafka::Conf::CONF_OK !=
m_gconfig->set("rebalance_cb", &m_rebalance_cb, errstr)) {
return Baton(RdKafka::ERR__FAIL, errstr);
}
}

return Baton(RdKafka::ERR_NO_ERROR);
}

/* Node exposed methods */

NAN_METHOD(Consumer::NodeOnConsume) {
if (info.Length() < 1 || !info[0]->IsFunction()) {
// Just throw an exception
return Nan::ThrowError("Need to specify a callback");
}

Consumer* obj = ObjectWrap::Unwrap<Consumer>(info.This());

v8::Local<v8::Function> cb = info[0].As<v8::Function>();
obj->m_consume_cb.dispatcher.AddCallback(cb);

info.GetReturnValue().Set(Nan::True());
}

NAN_METHOD(Consumer::NodeOnRebalance) {
if (info.Length() < 1 || !info[0]->IsFunction()) {
// Just throw an exception
return Nan::ThrowError("Need to specify a callback");
}

Consumer* consumer = ObjectWrap::Unwrap<Consumer>(info.This());

v8::Local<v8::Function> cb = info[0].As<v8::Function>();

// Check if we need to put the dispatcher in
Baton b = consumer->UseManualRebalancing(cb);

if (b.err() != RdKafka::ERR_NO_ERROR) {
Nan::ThrowError(b.errstr().c_str());
}

info.GetReturnValue().Set(Nan::True());
}

NAN_METHOD(Consumer::NodeGetAssignments) {
Nan::HandleScope scope;

Expand Down
9 changes: 0 additions & 9 deletions src/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ struct consumer_commit_t {

class Consumer : public Connection {
public:
friend class NodeKafka::Callbacks::Rebalance;

static void Init(v8::Local<v8::Object>);
static v8::Local<v8::Object> NewInstance(v8::Local<v8::Value>);

Expand Down Expand Up @@ -77,8 +75,6 @@ class Consumer : public Connection {
void ActivateDispatchers();
void DeactivateDispatchers();

Baton UseManualRebalancing(v8::Local<v8::Function>);

protected:
static Nan::Persistent<v8::Function> constructor;
static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
Expand All @@ -94,12 +90,7 @@ class Consumer : public Connection {
bool m_is_subscribed;
bool m_is_manually_rebalancing;

Callbacks::Consume m_consume_cb;
Callbacks::Rebalance m_rebalance_cb;

// Node methods
static NAN_METHOD(NodeOnConsume);
static NAN_METHOD(NodeOnRebalance);
static NAN_METHOD(NodeConnect);
static NAN_METHOD(NodeSubscribe);
static NAN_METHOD(NodeSubscribeSync);
Expand Down
4 changes: 2 additions & 2 deletions src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ void Producer::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {
std::string errstr;

RdKafka::Conf* gconfig =
Config::Create(RdKafka::Conf::CONF_GLOBAL, info[0]->ToObject(), errstr);
Conf::create(RdKafka::Conf::CONF_GLOBAL, info[0]->ToObject(), errstr);

if (!gconfig) {
return Nan::ThrowError(errstr.c_str());
}

RdKafka::Conf* tconfig =
Config::Create(RdKafka::Conf::CONF_TOPIC, info[1]->ToObject(), errstr);
Conf::create(RdKafka::Conf::CONF_TOPIC, info[1]->ToObject(), errstr);

if (!tconfig) {
// No longer need this since we aren't instantiating anything
Expand Down
2 changes: 1 addition & 1 deletion src/topic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void Topic::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {
std::string errstr;

RdKafka::Conf* config =
Config::Create(RdKafka::Conf::CONF_TOPIC, info[1]->ToObject(), errstr);
Conf::create(RdKafka::Conf::CONF_TOPIC, info[1]->ToObject(), errstr);

if (!config) {
return Nan::ThrowError(errstr.c_str());
Expand Down