Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixes based on pull request review feedback
  • Loading branch information
hqin committed Oct 28, 2016
commit d7ab475d434de28d1f57ee6955fa6891f65a69f4
31 changes: 25 additions & 6 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {

parts = PyList_New(c_parts->cnt);

for (i = 0 ; i < (size_t)c_parts->cnt ; i++) {
for (i = 0 ; i < c_parts->cnt ; i++) {
const rd_kafka_topic_partition_t *rktpar = &c_parts->elems[i];
PyList_SET_ITEM(parts, i,
TopicPartition_new0(
Expand Down Expand Up @@ -778,7 +778,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {

c_parts = rd_kafka_topic_partition_list_new((int)PyList_Size(plist));

for (i = 0 ; i < (size_t)PyList_Size(plist) ; i++) {
for (i = 0 ; i < PyList_Size(plist) ; i++) {
TopicPartition *tp = (TopicPartition *)
PyList_GetItem(plist, i);

Expand Down Expand Up @@ -879,10 +879,10 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
* Clear Python object references in Handle
*/
void Handle_clear (Handle *h) {
if (h->error_cb)
if (h->error_cb)
Py_DECREF(h->error_cb);

if (h->stats_cb)
if (h->stats_cb)
Py_DECREF(h->stats_cb);

PyThread_delete_key(h->tlskey);
Expand Down Expand Up @@ -1143,7 +1143,16 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
Py_DECREF(ks);
continue;

} else if (!strcmp(k, "error_cb") && PyCallable_Check(vo)) {
} else if (!strcmp(k, "error_cb")) {
if (!PyCallable_Check(vo)) {
PyErr_SetString(PyExc_TypeError,
"expected error_cb property "
"as a callable function");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(ks);
return NULL;
}
if (h->error_cb) {
Py_DECREF(h->error_cb);
h->error_cb = NULL;
Expand All @@ -1154,7 +1163,17 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
}
Py_DECREF(ks);
continue;
} else if (!strcmp(k, "stats_cb") && PyCallable_Check(vo)) {
} else if (!strcmp(k, "stats_cb")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to check here and for error_cb if the object is callable (PyCallable_Check(vo)) (Py_None is okay too)

if (!PyCallable_Check(vo)) {
PyErr_SetString(PyExc_TypeError,
"expected stats_cb property "
"as a callable function");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(ks);
return NULL;
}

if (h->stats_cb) {
Py_DECREF(h->stats_cb);
h->stats_cb = NULL;
Expand Down
11 changes: 8 additions & 3 deletions examples/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
bootstrap_servers = 'localhost'



# global variable to be set by stats_cb call back function
good_stats_cb_result = False

def error_cb (err):
print('Error: %s' % err)
Expand Down Expand Up @@ -359,9 +360,13 @@ def verify_stats_cb():
""" Verify stats_cb """

def stats_cb(stats_json_str):
global good_stats_cb_result
stats_json = json.loads(stats_json_str)
if 'test' in stats_json['topics']:
print("# app_offset stats for topic test partition 0: %d" % stats_json['topics']['test']['partitions']['0']['app_offset'])
app_offset = stats_json['topics']['test']['partitions']['0']['app_offset']
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably want to wrap this in a try: if any of the sub-dict keys are missing and just print the json blob on failure.

if app_offset > 0:
print("# app_offset stats for topic test partition 0: %d" % app_offset)
good_stats_cb_result = True

conf = {'bootstrap.servers': bootstrap_servers,
'group.id': uuid.uuid1(),
Expand All @@ -388,7 +393,7 @@ def stats_cb(stats_json_str):
else:
bar = None

while True:
while not good_stats_cb_result:
# Consume until EOF or error

msg = c.poll(timeout=20.0)
Expand Down
32 changes: 16 additions & 16 deletions tests/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ def test_version():
assert len(sver) > 0
assert iver > 0

# global variable for error_cb call back function
seen_error_cb = False

def test_error_cb():
""" Tests error_cb. """

def error_cb(error_msg):
print('OK: error_cb() called')
global seen_error_cb
seen_error_cb = True
assert error_msg.code() in (confluent_kafka.KafkaError._TRANSPORT, confluent_kafka.KafkaError._ALL_BROKERS_DOWN)

conf = {'bootstrap.servers': 'localhost:9093', # Purposely cause connection refused error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually probably the second most likely port to find a broker on localhost, so maybe you want to connect to localhost:22 or somesuch that will definately not a Kafka broker.

Expand All @@ -31,26 +35,23 @@ def error_cb(error_msg):
}

kc = confluent_kafka.Consumer(**conf)

kc.subscribe(["test"])
kc.poll(timeout=0.001)
time.sleep(1)
kc.unsubscribe()
while not seen_error_cb:
kc.poll(timeout=1)

kc.close()

# global variable for stats_cb call back function
seen_stats_cb = False

def test_stats_cb():
""" Tests stats_cb. """

def stats_cb(stats_json_str):
# print(stats_json_str)
try:
stats_json = json.loads(stats_json_str)
if 'type' in stats_json:
print("stats_cb: type=%s" % stats_json['type'])
print('OK: stats_cb() called')
except Exception as e:
assert False
global seen_stats_cb
seen_stats_cb = True
stats_json = json.loads(stats_json_str)
assert len(stats_json['name']) > 0

conf = {'group.id':'test',
'socket.timeout.ms':'100',
Expand All @@ -62,8 +63,7 @@ def stats_cb(stats_json_str):
kc = confluent_kafka.Consumer(**conf)

kc.subscribe(["test"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing as suggested for error_cb

kc.poll(timeout=0.001)
time.sleep(1)

while not seen_stats_cb:
kc.poll(timeout=1)
kc.close()