Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 53 additions & 7 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {

parts = PyList_New(c_parts->cnt);

for (i = 0 ; i < c_parts->cnt ; i++) {
for (i = 0 ; i < (size_t)c_parts->cnt ; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make these size_t changes a separate commit, for tracking.

const rd_kafka_topic_partition_t *rktpar = &c_parts->elems[i];
PyList_SET_ITEM(parts, i,
TopicPartition_new0(
Expand Down Expand Up @@ -778,7 +778,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {

c_parts = rd_kafka_topic_partition_list_new((int)PyList_Size(plist));

for (i = 0 ; i < PyList_Size(plist) ; i++) {
for (i = 0 ; i < (size_t)PyList_Size(plist) ; i++) {
TopicPartition *tp = (TopicPartition *)
PyList_GetItem(plist, i);

Expand Down Expand Up @@ -825,9 +825,9 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
result = PyObject_CallFunctionObjArgs(h->error_cb, eo, NULL);
Py_DECREF(eo);

if (result) {
if (result)
Py_DECREF(result);
} else {
else {
CallState_crash(cs);
rd_kafka_yield(h->rk);
}
Expand All @@ -836,6 +836,32 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
CallState_resume(cs);
}

/**
 * @brief librdkafka statistics callback: forwards the statistics JSON
 *        document to the application's Python \c stats_cb.
 *
 * @param rk       librdkafka client instance (unused; handle comes via opaque)
 * @param json     statistics document (JSON string)
 * @param json_len length of \p json in bytes
 * @param opaque   the Handle registered as conf opaque
 *
 * @returns 0 always, so librdkafka retains ownership of (and frees) \p json.
 */
static int stats_cb (rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
        Handle *h = opaque;
        PyObject *eo = NULL, *result = NULL;
        CallState *cs = NULL;

        cs = CallState_get(h);
        if (json_len == 0) {
                /* No data returned */
                goto done;
        }

        /* Use the explicit length rather than relying on NUL termination. */
        eo = Py_BuildValue("s#", json, (Py_ssize_t)json_len);
        if (!eo) {
                /* String conversion failed (e.g., invalid UTF-8):
                 * propagate the Python exception instead of calling the
                 * callback with a missing argument and Py_DECREF(NULL). */
                CallState_crash(cs);
                rd_kafka_yield(h->rk);
                goto done;
        }

        result = PyObject_CallFunctionObjArgs(h->stats_cb, eo, NULL);
        Py_DECREF(eo);

        if (result)
                Py_DECREF(result);
        else {
                /* Callback raised: propagate to the calling poll()/etc. */
                CallState_crash(cs);
                rd_kafka_yield(h->rk);
        }

 done:
        CallState_resume(cs);
        return 0;
}

/****************************************************************************
*
Expand All @@ -853,9 +879,11 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
* Clear Python object references in Handle
*/
/**
 * @brief Clear Python object references held by the Handle.
 *
 * Py_CLEAR (rather than a checked Py_DECREF) sets the member to NULL
 * after decrefing, guarding against double-decref if clear is re-entered.
 */
void Handle_clear (Handle *h) {
        Py_CLEAR(h->error_cb);
        Py_CLEAR(h->stats_cb);

        PyThread_delete_key(h->tlskey);
}
Expand All @@ -867,6 +895,9 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg) {
if (h->error_cb)
Py_VISIT(h->error_cb);

if (h->stats_cb)
Py_VISIT(h->stats_cb);

return 0;
}

Expand Down Expand Up @@ -1112,7 +1143,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
Py_DECREF(ks);
continue;

} else if (!strcmp(k, "error_cb")) {
} else if (!strcmp(k, "error_cb") && PyCallable_Check(vo)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will need to check PyCallable inside error_cb if-clause.
E.g.:

if (k=="error_cb") {
  if (!callable) {
    error_out("\"error_cb\" must be a callable");
    return..
  }
  h->error_cb = ..;
}.

Same thing for stats_cb

if (h->error_cb) {
Py_DECREF(h->error_cb);
h->error_cb = NULL;
Expand All @@ -1123,6 +1154,17 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
}
Py_DECREF(ks);
continue;
} else if (!strcmp(k, "stats_cb") && PyCallable_Check(vo)) {
if (h->stats_cb) {
Py_DECREF(h->stats_cb);
h->stats_cb = NULL;
}
if (vo != Py_None) {
h->stats_cb = vo;
Py_INCREF(h->stats_cb);
}
Py_DECREF(ks);
continue;
}

/* Special handling for certain config keys. */
Expand Down Expand Up @@ -1174,6 +1216,10 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,

if (h->error_cb)
rd_kafka_conf_set_error_cb(conf, error_cb);

if (h->stats_cb)
rd_kafka_conf_set_stats_cb(conf, stats_cb);

rd_kafka_topic_conf_set_opaque(tconf, h);
rd_kafka_conf_set_default_topic_conf(conf, tconf);

Expand Down
1 change: 1 addition & 0 deletions confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ typedef struct {
PyObject_HEAD
rd_kafka_t *rk;
PyObject *error_cb;
PyObject *stats_cb;
int tlskey; /* Thread-Local-Storage key */

union {
Expand Down
47 changes: 39 additions & 8 deletions examples/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,55 @@
#
# Example high-level Kafka 0.9 balanced Consumer
#

from confluent_kafka import Consumer, KafkaException, KafkaError
import sys
import getopt
import json
from pprint import pformat

if __name__ == '__main__':
if len(sys.argv) < 4:
sys.stderr.write('Usage: %s <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % sys.argv[0])
sys.exit(1)
def stats_cb(stats_json_str):
    """Statistics callback: parse and pretty-print the librdkafka stats JSON."""
    parsed = json.loads(stats_json_str)
    formatted = pformat(parsed)
    print('\nKAFKA Stats: {}\n'.format(formatted))

broker = sys.argv[1]
group = sys.argv[2]
topics = sys.argv[3:]
def print_usage_and_exit(program_name):
    """Write the usage/help text to stderr and terminate with exit status 1."""
    sys.stderr.write('Usage: %s [options..] <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % program_name)
    options = '''
Options:
 -T <intvl>   Enable client statistics at specified interval (ms)
'''
    sys.stderr.write(options)
    sys.exit(1)


if __name__ == '__main__':
optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if len(argv) < 3:
print_usage_and_exit(sys.argv[0])

broker = argv[0]
group = argv[1]
topics = argv[2:]
# Consumer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000,
'default.topic.config': {'auto.offset.reset': 'smallest'}}

# Check to see if -T option exists
for opt in optlist:
if opt[0] != '-T':
continue
try:
intval = int(opt[1])
except:
sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
sys.exit(1)

if intval <= 0:
sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
sys.exit(1)

conf['stats_cb'] = stats_cb
conf['statistics.interval.ms'] = int(opt[1])

# Create Consumer instance
c = Consumer(**conf)
Expand Down
77 changes: 77 additions & 0 deletions examples/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import time
import uuid
import sys
import json

try:
from progress.bar import Bar
Expand All @@ -39,6 +40,7 @@

def error_cb(err):
    """Generic client-error callback: log the error to stdout."""
    msg = 'Error: %s' % err
    print(msg)


class MyTestDr(object):
""" Producer: Delivery report callback """
Expand Down Expand Up @@ -353,6 +355,78 @@ def my_on_revoke (consumer, partitions):
c.close()


def verify_stats_cb():
    """ Verify that stats_cb is invoked with parseable JSON statistics. """

    # Shared state between the callback and the consume loop below.
    stats_state = {'count': 0}

    def stats_cb(stats_json_str):
        stats_json = json.loads(stats_json_str)
        stats_state['count'] += 1
        try:
            print("# app_offset stats for topic test partition 0: %d" %
                  stats_json['topics']['test']['partitions']['0']['app_offset'])
        except KeyError:
            # Topic/partition stats may not be present in every emission;
            # that is fine, we only need proof the callback fired.
            pass

    conf = {'bootstrap.servers': bootstrap_servers,
            'group.id': uuid.uuid1(),
            'session.timeout.ms': 6000,
            'error_cb': error_cb,
            'stats_cb': stats_cb,
            'statistics.interval.ms': 200,
            'default.topic.config': {
                'auto.offset.reset': 'earliest'
            }}

    c = confluent_kafka.Consumer(**conf)
    c.subscribe(["test"])

    max_msgcnt = 1000000
    bytecnt = 0
    msgcnt = 0

    print('Will now consume %d messages' % max_msgcnt)

    if with_progress:
        bar = Bar('Consuming', max=max_msgcnt,
                  suffix='%(index)d/%(max)d [%(eta_td)s]')
    else:
        bar = None

    while True:
        # Consume until EOF, error, max_msgcnt, or stats have been received.
        msg = c.poll(timeout=20.0)
        if msg is None:
            raise Exception('Stalled at %d/%d message, no new messages for 20s' %
                            (msgcnt, max_msgcnt))

        if msg.error():
            if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
                # Reached EOF for a partition, ignore.
                continue
            else:
                raise confluent_kafka.KafkaException(msg.error())

        bytecnt += len(msg)
        msgcnt += 1

        if bar is not None and (msgcnt % 10000) == 0:
            bar.next(n=10000)

        if msgcnt == 1:
            t_first_msg = time.time()
        if msgcnt >= max_msgcnt:
            break
        # The point of this test is verifying stats_cb, not throughput:
        # finish early once statistics have been delivered at least once.
        if stats_state['count'] > 0 and msgcnt >= 100:
            break

    if bar is not None:
        bar.finish()

    if msgcnt > 0:
        t_spent = time.time() - t_first_msg
        print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' % \
              (msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
               (bytecnt / t_spent) / (1024*1024)))

    print('closing consumer')
    c.close()

    # Fail loudly if the statistics callback was never triggered.
    assert stats_state['count'] > 0, 'stats_cb was never called'


if __name__ == '__main__':

Expand All @@ -377,6 +451,9 @@ def my_on_revoke (consumer, partitions):
print('=' * 30, 'Verifying Consumer performance', '=' * 30)
verify_consumer_performance()

print('=' * 30, 'Verifying stats_cb', '=' * 30)
verify_stats_cb()

print('=' * 30, 'Done', '=' * 30)


55 changes: 54 additions & 1 deletion tests/test_misc.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#!/usr/bin/env python

import confluent_kafka

import json
import time
from pprint import pprint

def test_version():
print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version())
Expand All @@ -14,3 +16,54 @@ def test_version():
assert len(sver) > 0
assert iver > 0

def test_error_cb():
    """ Tests that error_cb is invoked on client-level errors. """

    # Shared flag so the test can verify the callback actually fired.
    seen = {'called': False}

    def error_cb(error_msg):
        print('OK: error_cb() called')
        assert error_msg.code() in (confluent_kafka.KafkaError._TRANSPORT,
                                    confluent_kafka.KafkaError._ALL_BROKERS_DOWN)
        seen['called'] = True

    conf = {'bootstrap.servers': 'localhost:9093',  # Purposely cause connection refused error
            'group.id': 'test',
            'socket.timeout.ms': '100',
            'session.timeout.ms': 1000,  # Avoid close() blocking too long
            'error_cb': error_cb
            }

    kc = confluent_kafka.Consumer(**conf)

    kc.subscribe(["test"])

    # Poll until the error callback fires instead of a fixed sleep; the
    # test-suite timeout guards against the callback never being invoked.
    while not seen['called']:
        kc.poll(timeout=1)

    kc.unsubscribe()

    kc.close()

def test_stats_cb():
    """ Tests that stats_cb is invoked with valid JSON statistics. """

    # Shared flag so the test can verify the callback actually fired.
    seen = {'called': False}

    def stats_cb(stats_json_str):
        # The payload must be a parseable JSON document containing the
        # well-known 'type' field; assert directly so a malformed payload
        # fails the test with a meaningful error.
        stats_json = json.loads(stats_json_str)
        assert 'type' in stats_json, 'malformed stats: %s' % stats_json_str
        print("stats_cb: type=%s" % stats_json['type'])
        print('OK: stats_cb() called')
        seen['called'] = True

    conf = {'group.id': 'test',
            'socket.timeout.ms': '100',
            'session.timeout.ms': 1000,  # Avoid close() blocking too long
            'statistics.interval.ms': 200,
            'stats_cb': stats_cb
            }

    kc = confluent_kafka.Consumer(**conf)

    kc.subscribe(["test"])

    # Poll until statistics have been emitted instead of a fixed sleep;
    # the test-suite timeout guards against the callback never firing.
    while not seen['called']:
        kc.poll(timeout=1)

    kc.close()