Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Rebalance event passes topic partition parameters through callback
  • Loading branch information
webmakersteve committed Sep 27, 2016
commit e205a49bdcd9e5f630260844fc3f15e3a93462b4
23 changes: 23 additions & 0 deletions src/callbacks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,29 @@ void RebalanceDispatcher::Flush() {
break;
}

std::vector<rebalance_topic_partition_t> parts = _events[i].partitions;

v8::Local<v8::Array> tp_array = Nan::New<v8::Array>();

for (size_t i = 0; i < parts.size(); i++) {
v8::Local<v8::Object> tp_obj = Nan::New<v8::Object>();
rebalance_topic_partition_t tp = parts[i];

Nan::Set(tp_obj, Nan::New("topic").ToLocalChecked(),
Nan::New<v8::String>(tp.topic.c_str()).ToLocalChecked());
Nan::Set(tp_obj, Nan::New("partition").ToLocalChecked(),
Nan::New<v8::Number>(tp.partition));

if (tp.offset >= 0) {
Nan::Set(tp_obj, Nan::New("offset").ToLocalChecked(),
Nan::New<v8::Number>(tp.offset));
}

tp_array->Set(i, tp_obj);
}
// Now convert the TopicPartition list to a JS array
Nan::Set(jsobj, Nan::New("assignment").ToLocalChecked(), tp_array);

argv[0] = jsobj;

Dispatch(argc, argv);
Expand Down
38 changes: 32 additions & 6 deletions src/callbacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,40 @@ class Delivery : public RdKafka::DeliveryReportCb {

// Rebalance dispatcher

struct rebalance_topic_partition_t {
std::string topic;
int partition;
int64_t offset;

rebalance_topic_partition_t(std::string p_topic, int p_partition, int64_t p_offset):
topic(p_topic),
partition(p_partition),
offset(p_offset) {}
};

struct rebalance_event_t {
RdKafka::ErrorCode err;
std::vector<RdKafka::TopicPartition*> partitions;

rebalance_event_t(RdKafka::ErrorCode _err,
std::vector<RdKafka::TopicPartition*> _partitions):
err(_err),
partitions(_partitions) {}
std::vector<rebalance_topic_partition_t> partitions;

rebalance_event_t(RdKafka::ErrorCode p_err,
std::vector<RdKafka::TopicPartition*> p_partitions):
err(p_err) {
// Iterate over the topic partitions because we won't have them later
for (size_t topic_partition_i = 0;
topic_partition_i < p_partitions.size(); topic_partition_i++) {
RdKafka::TopicPartition* topic_partition =
p_partitions[topic_partition_i];

rebalance_topic_partition_t tp(
topic_partition->topic(),
topic_partition->partition(),
topic_partition->offset()
);

partitions.push_back(tp);

}
}
};

class RebalanceDispatcher : public Dispatcher {
Expand Down
150 changes: 150 additions & 0 deletions src/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,154 @@ std::vector<std::string> v8ArrayToStringVector(v8::Local<v8::Array> parameter) {
return newItem;
}

namespace TopicPartition {

/**
* @brief RdKafka::TopicPartition vector to a v8 Array
*
* @see v8ArrayToTopicPartitionVector
*/
v8::Local<v8::Array> ToV8Array(
std::vector<RdKafka::TopicPartition*> topic_partition_list) {
v8::Local<v8::Array> array = Nan::New<v8::Array>();
for (size_t topic_partition_i = 0;
topic_partition_i < topic_partition_list.size(); topic_partition_i++) {
RdKafka::TopicPartition* topic_partition =
topic_partition_list[topic_partition_i];

// We have the list now let's get the properties from it
v8::Local<v8::Object> obj = Nan::New<v8::Object>();

Nan::Set(obj, Nan::New("offset").ToLocalChecked(),
Nan::New<v8::Number>(topic_partition->offset()));
Nan::Set(obj, Nan::New("partition").ToLocalChecked(),
Nan::New<v8::Number>(topic_partition->partition()));
Nan::Set(obj, Nan::New("topic").ToLocalChecked(),
Nan::New<v8::String>(topic_partition->topic().c_str()).ToLocalChecked());

array->Set(topic_partition_i, obj);
}

return array;
}

} // namespace TopicPartition

namespace Metadata {

/**
* @brief RdKafka::Metadata to v8::Object
*
*/
v8::Local<v8::Object> ToV8Object(RdKafka::Metadata* metadata) {
v8::Local<v8::Object> obj = Nan::New<v8::Object>();

v8::Local<v8::Array> broker_data = Nan::New<v8::Array>();
v8::Local<v8::Array> topic_data = Nan::New<v8::Array>();

const BrokerMetadataList* brokers = metadata->brokers(); // NOLINT

unsigned int broker_i = 0;

for (BrokerMetadataList::const_iterator it = brokers->begin();
it != brokers->end(); ++it, broker_i++) {
// Start iterating over brokers and set the object up

const RdKafka::BrokerMetadata* x = *it;

v8::Local<v8::Object> current_broker = Nan::New<v8::Object>();

Nan::Set(current_broker, Nan::New("id").ToLocalChecked(),
Nan::New<v8::Number>(x->id()));
Nan::Set(current_broker, Nan::New("host").ToLocalChecked(),
Nan::New<v8::String>(x->host().c_str()).ToLocalChecked());
Nan::Set(current_broker, Nan::New("port").ToLocalChecked(),
Nan::New<v8::Number>(x->port()));

broker_data->Set(broker_i, current_broker);
}

unsigned int topic_i = 0;

const TopicMetadataList* topics = metadata->topics();

for (TopicMetadataList::const_iterator it = topics->begin();
it != topics->end(); ++it, topic_i++) {
// Start iterating over topics

const RdKafka::TopicMetadata* x = *it;

v8::Local<v8::Object> current_topic = Nan::New<v8::Object>();

Nan::Set(current_topic, Nan::New("name").ToLocalChecked(),
Nan::New<v8::String>(x->topic().c_str()).ToLocalChecked());

v8::Local<v8::Array> current_topic_partitions = Nan::New<v8::Array>();

const PartitionMetadataList* current_partition_data = x->partitions();

unsigned int partition_i = 0;
PartitionMetadataList::const_iterator itt;

for (itt = current_partition_data->begin();
itt != current_partition_data->end(); ++itt, partition_i++) {
// partition iterate
const RdKafka::PartitionMetadata* xx = *itt;

v8::Local<v8::Object> current_partition = Nan::New<v8::Object>();

Nan::Set(current_partition, Nan::New("id").ToLocalChecked(),
Nan::New<v8::Number>(xx->id()));
Nan::Set(current_partition, Nan::New("leader").ToLocalChecked(),
Nan::New<v8::Number>(xx->leader()));

const std::vector<int32_t> * replicas = xx->replicas();
const std::vector<int32_t> * isrs = xx->isrs();

std::vector<int32_t>::const_iterator r_it;
std::vector<int32_t>::const_iterator i_it;

unsigned int r_i = 0;
unsigned int i_i = 0;

v8::Local<v8::Array> current_replicas = Nan::New<v8::Array>();

for (r_it = replicas->begin(); r_it != replicas->end(); ++r_it, r_i++) {
current_replicas->Set(r_i, Nan::New<v8::Int32>(*r_it));
}

v8::Local<v8::Array> current_isrs = Nan::New<v8::Array>();

for (i_it = isrs->begin(); i_it != isrs->end(); ++i_it, i_i++) {
current_isrs->Set(r_i, Nan::New<v8::Int32>(*i_it));
}

Nan::Set(current_partition, Nan::New("replicas").ToLocalChecked(),
current_replicas);
Nan::Set(current_partition, Nan::New("isrs").ToLocalChecked(),
current_isrs);

current_topic_partitions->Set(partition_i, current_partition);
} // iterate over partitions

Nan::Set(current_topic, Nan::New("partitions").ToLocalChecked(),
current_topic_partitions);

topic_data->Set(topic_i, current_topic);
} // End iterating over topics

Nan::Set(obj, Nan::New("orig_broker_id").ToLocalChecked(),
Nan::New<v8::Number>(metadata->orig_broker_id()));

Nan::Set(obj, Nan::New("orig_broker_name").ToLocalChecked(),
Nan::New<v8::String>(metadata->orig_broker_name()).ToLocalChecked());

Nan::Set(obj, Nan::New("topics").ToLocalChecked(), topic_data);
Nan::Set(obj, Nan::New("brokers").ToLocalChecked(), broker_data);

return obj;
}

} // namespace Metadata

} // namespace NodeKafka
16 changes: 16 additions & 0 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

#include "deps/librdkafka/src-cpp/rdkafkacpp.h"

typedef std::vector<const RdKafka::BrokerMetadata*> BrokerMetadataList;
typedef std::vector<const RdKafka::PartitionMetadata*> PartitionMetadataList;
typedef std::vector<const RdKafka::TopicMetadata *> TopicMetadataList;

namespace NodeKafka {

void Log(std::string);
Expand Down Expand Up @@ -45,6 +49,18 @@ class scoped_mutex_lock {
uv_mutex_t &async_lock;
};

namespace TopicPartition {

v8::Local<v8::Array> ToV8Array(std::vector<RdKafka::TopicPartition*>);

}

namespace Metadata {

v8::Local<v8::Object> ToV8Object(RdKafka::Metadata*);

} // namespace Metadata

} // namespace NodeKafka

#endif // SRC_COMMON_H_
1 change: 0 additions & 1 deletion src/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ size_t Message::Size() {

void Message::Free(char * data, void * hint) {
Message* m = static_cast<Message*>(hint);
// @note Am I responsible for freeing data as well?
delete m;
}

Expand Down
Loading