Merged
Changes from 2 commits
36 changes: 16 additions & 20 deletions confluent_kafka/src/confluent_kafka.c
@@ -749,7 +749,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {

parts = PyList_New(c_parts->cnt);

for (i = 0 ; i < c_parts->cnt ; i++) {
for (i = 0 ; i < (size_t)c_parts->cnt ; i++) {
Contributor: Make these size_t changes a separate commit, for tracking.

const rd_kafka_topic_partition_t *rktpar = &c_parts->elems[i];
PyList_SET_ITEM(parts, i,
TopicPartition_new0(
@@ -778,7 +778,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {

c_parts = rd_kafka_topic_partition_list_new((int)PyList_Size(plist));

for (i = 0 ; i < PyList_Size(plist) ; i++) {
for (i = 0 ; i < (size_t)PyList_Size(plist) ; i++) {
TopicPartition *tp = (TopicPartition *)
PyList_GetItem(plist, i);

@@ -825,9 +825,9 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
result = PyObject_CallFunctionObjArgs(h->error_cb, eo, NULL);
Py_DECREF(eo);

if (result) {
if (result)
Py_DECREF(result);
} else {
else {
CallState_crash(cs);
rd_kafka_yield(h->rk);
}
@@ -836,25 +836,24 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
CallState_resume(cs);
}

static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{
static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
Handle *h = opaque;
PyObject *eo=NULL, *result=NULL;
CallState *cs=NULL;
PyObject *eo = NULL, *result = NULL;
CallState *cs = NULL;

cs = CallState_get(h);
if (json_len== 0 || !h->stats_cb) {
/* Neither data nor call back defined. */
if (json_len == 0) {
/* No data returned*/
goto done;
}

eo = Py_BuildValue("s", json);
result = PyObject_CallFunctionObjArgs(h->stats_cb, eo, NULL);
Py_DECREF(eo);

if (result) {
if (result)
Py_DECREF(result);
} else {
else {
CallState_crash(cs);
rd_kafka_yield(h->rk);
}
@@ -880,13 +879,11 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
* Clear Python object references in Handle
*/
void Handle_clear (Handle *h) {
if (h->error_cb) {
if (h->error_cb)
Contributor: nitpick: There's trailing whitespace after the ). This might seem silly, but it will show up as a red blob in editors and git GUIs :)
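Side note: git diff --check flags trailing whitespace automatically, which helps catch these before review.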

Py_DECREF(h->error_cb);
}

if (h->stats_cb) {
if (h->stats_cb)
Py_DECREF(h->stats_cb);
}

PyThread_delete_key(h->tlskey);
}
@@ -1146,7 +1143,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
Py_DECREF(ks);
continue;

} else if (!strcmp(k, "error_cb")) {
} else if (!strcmp(k, "error_cb") && PyCallable_Check(vo)) {
Contributor: It will need to check PyCallable inside the error_cb if-clause, so that a non-callable value raises a meaningful error instead of falling through.
E.g.:

if (k=="error_cb") {
  if (!callable) {
    error_out("\"error_cb\" must be a callable");
    return..
  }
  h->error_cb = ..;
}

Same thing for stats_cb
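A hedged sketch, from the Python side, of the behavior that validation would give the user; the exact exception type is an assumption, not confirmed by this PR:

import confluent_kafka

try:
    # With the suggested check in place, a non-callable error_cb should
    # fail fast at construction time with a meaningful message.
    c = confluent_kafka.Consumer(**{'group.id': 'test',
                                    'error_cb': 'not callable'})
except Exception as e:
    print('rejected as expected: %s' % e)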

if (h->error_cb) {
Py_DECREF(h->error_cb);
h->error_cb = NULL;
@@ -1157,7 +1154,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
}
Py_DECREF(ks);
continue;
} else if (!strcmp(k, "stats_cb")) {
} else if (!strcmp(k, "stats_cb") && PyCallable_Check(vo)) {
if (h->stats_cb) {
Py_DECREF(h->stats_cb);
h->stats_cb = NULL;
@@ -1220,9 +1217,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
if (h->error_cb)
rd_kafka_conf_set_error_cb(conf, error_cb);

if (h->stats_cb) {
if (h->stats_cb)
rd_kafka_conf_set_stats_cb(conf, stats_cb);
}

rd_kafka_topic_conf_set_opaque(tconf, h);
rd_kafka_conf_set_default_topic_conf(conf, tconf);
2 changes: 1 addition & 1 deletion examples/consumer.py
@@ -32,7 +32,7 @@ def print_usage_and_exit(program_name):
sys.stderr.write('Usage: %s [options..] <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % program_name)
options='''
Options:
-T <intvl> Enable statistics from Kafka at specified interval (ms)
-T <intvl> Enable client statistics at specified interval (ms)
'''
sys.stderr.write(options)
sys.exit(1)
77 changes: 77 additions & 0 deletions examples/integration_test.py
@@ -24,6 +24,7 @@
import time
import uuid
import sys
import json

try:
from progress.bar import Bar
@@ -39,6 +40,7 @@

def error_cb (err):
print('Error: %s' % err)


class MyTestDr(object):
""" Producer: Delivery report callback """
@@ -353,6 +355,78 @@ def my_on_revoke (consumer, partitions):
c.close()


def verify_stats_cb():
""" Verify stats_cb """

def stats_cb(stats_json_str):
stats_json = json.loads(stats_json_str)
if 'test' in stats_json['topics']:
print("# app_offset stats for topic test partition 0: %d" % stats_json['topics']['test']['partitions']['0']['app_offset'])
Contributor: Maybe put this in a try block to give a meaningful error message if those dict keys are not available.
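A minimal sketch of that suggestion (relies on the json import this PR adds at the top of the file; not part of the PR itself):

def stats_cb(stats_json_str):
    stats_json = json.loads(stats_json_str)
    try:
        # Guard the nested lookups so a missing key yields a readable
        # message rather than a bare KeyError.
        app_offset = stats_json['topics']['test']['partitions']['0']['app_offset']
        print('# app_offset stats for topic test partition 0: %d' % app_offset)
    except KeyError as e:
        print('# stats: expected key %s not found in stats payload' % e)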


conf = {'bootstrap.servers': bootstrap_servers,
'group.id': uuid.uuid1(),
'session.timeout.ms': 6000,
'error_cb': error_cb,
'stats_cb': stats_cb,
'statistics.interval.ms': 200,
'default.topic.config': {
'auto.offset.reset': 'earliest'
}}

c = confluent_kafka.Consumer(**conf)
c.subscribe(["test"])

max_msgcnt = 1000000
bytecnt = 0
msgcnt = 0

print('Will now consume %d messages' % max_msgcnt)

if with_progress:
bar = Bar('Consuming', max=max_msgcnt,
suffix='%(index)d/%(max)d [%(eta_td)s]')
else:
bar = None

while True:
Contributor: Looks good, but this loop doesn't actually verify that stats were received.
I think it could probably finish on the first complete stats seen, to speed things up.
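One way to do that (a sketch, assuming the subscribed 'test' topic appearing in the payload counts as a complete stats emission):

seen_stats = {'complete': False}

def stats_cb(stats_json_str):
    stats_json = json.loads(stats_json_str)
    # Consider stats verified once the subscribed topic shows up.
    if 'test' in stats_json.get('topics', {}):
        seen_stats['complete'] = True

# ...then, inside the consume loop below:
#     if seen_stats['complete']:
#         break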

# Consume until EOF or error

msg = c.poll(timeout=20.0)
if msg is None:
raise Exception('Stalled at %d/%d message, no new messages for 20s' %
(msgcnt, max_msgcnt))

if msg.error():
if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
# Reached EOF for a partition, ignore.
continue
else:
raise confluent_kafka.KafkaException(msg.error())


bytecnt += len(msg)
msgcnt += 1

if bar is not None and (msgcnt % 10000) == 0:
bar.next(n=10000)

if msgcnt == 1:
t_first_msg = time.time()
if msgcnt >= max_msgcnt:
break

if bar is not None:
bar.finish()

if msgcnt > 0:
t_spent = time.time() - t_first_msg
print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' % \
(msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
(bytecnt / t_spent) / (1024*1024)))

print('closing consumer')
c.close()


if __name__ == '__main__':

@@ -377,6 +451,9 @@ def my_on_revoke (consumer, partitions):
print('=' * 30, 'Verifying Consumer performance', '=' * 30)
verify_consumer_performance()

print('=' * 30, 'Verifying stats_cb', '=' * 30)
verify_stats_cb()

print('=' * 30, 'Done', '=' * 30)


45 changes: 6 additions & 39 deletions examples/producer.py
@@ -19,55 +19,22 @@
# Example Kafka Producer.
# Reads lines from stdin and sends to Kafka.
#

from confluent_kafka import Producer
import sys
import getopt
import json
from pprint import pformat

def stats_cb(stats_json_str):
stats_json = json.loads(stats_json_str)
print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))

def print_usage_and_exit(program_name):
sys.stderr.write('Usage: %s [options..] <bootstrap-brokers> <topic>\n' % program_name)
options='''
Options:
-T <intvl> Enable statistics from Kafka at specified interval (ms)
'''
sys.stderr.write(options)
sys.exit(1)


if __name__ == '__main__':
optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
if len(argv) != 2:
print_usage_and_exit(sys.argv[0])
if len(sys.argv) != 3:
sys.stderr.write('Usage: %s <bootstrap-brokers> <topic>\n' % sys.argv[0])
sys.exit(1)

broker = argv[0]
topic = argv[1]
broker = sys.argv[1]
topic = sys.argv[2]

# Producer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {'bootstrap.servers': broker}

# Check to see if -T option exists
for opt in optlist:
if opt[0] != '-T':
continue
try:
intval = int(opt[1])
except:
sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
sys.exit(1)

if intval <= 0:
sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
sys.exit(1)

conf['stats_cb'] = stats_cb
conf['statistics.interval.ms'] = int(opt[1])

# Create Producer instance
p = Producer(**conf)

55 changes: 54 additions & 1 deletion tests/test_misc.py
@@ -1,7 +1,9 @@
#!/usr/bin/env python

import confluent_kafka

import json
import time
from pprint import pprint

def test_version():
print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version())
@@ -14,3 +16,54 @@ def test_version():
assert len(sver) > 0
assert iver > 0

def test_error_cb():
""" Tests error_cb. """

def error_cb(error_msg):
print('OK: error_cb() called')
assert error_msg.code() in (confluent_kafka.KafkaError._TRANSPORT, confluent_kafka.KafkaError._ALL_BROKERS_DOWN)

conf = {'bootstrap.servers': 'localhost:9093', # Purposely cause connection refused error
Contributor: This is actually probably the second most likely port to find a broker on localhost, so maybe you want to connect to localhost:22 or some such port that will definitely not be a Kafka broker.

'group.id':'test',
'socket.timeout.ms':'100',
'session.timeout.ms': 1000, # Avoid close() blocking too long
'error_cb': error_cb
}

kc = confluent_kafka.Consumer(**conf)

kc.subscribe(["test"])
kc.poll(timeout=0.001)
Contributor: Does not verify that error_cb was actually called.
Maybe construct it like so, without the sleep:

in test_error_cb:
    seen_error_cb = False   # flipped to True inside error_cb
    ...
    while not seen_error_cb:
        kc.poll(timeout=1)
    kc.close()

Test timeouts will handle the case where no callback was seen.
time.sleep(1)
kc.unsubscribe()

kc.close()

def test_stats_cb():
""" Tests stats_cb. """

def stats_cb(stats_json_str):
# print(stats_json_str)
try:
stats_json = json.loads(stats_json_str)
if 'type' in stats_json:
Contributor: Shouldn't this be an assertion checking that the JSON doc actually contains something we know?
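For instance (a sketch, not part of this PR):

def stats_cb(stats_json_str):
    stats_json = json.loads(stats_json_str)
    # Fail the test loudly if the payload doesn't look like librdkafka stats.
    assert 'type' in stats_json, 'stats JSON missing "type" field'
    print('stats_cb: type=%s' % stats_json['type'])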

print("stats_cb: type=%s" % stats_json['type'])
print('OK: stats_cb() called')
except Exception as e:
assert False

conf = {'group.id':'test',
'socket.timeout.ms':'100',
'session.timeout.ms': 1000, # Avoid close() blocking too long
'statistics.interval.ms': 200,
'stats_cb': stats_cb
}

kc = confluent_kafka.Consumer(**conf)

kc.subscribe(["test"])
Contributor: Same thing as suggested for error_cb.
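Applied to stats_cb, that suggestion might look like this (sketch; assumes a module-level flag as in the error_cb example above):

seen_stats_cb = False

def stats_cb(stats_json_str):
    global seen_stats_cb
    seen_stats_cb = True
    assert len(json.loads(stats_json_str)) > 0

# ...then replace the poll/sleep below with:
# while not seen_stats_cb:
#     kc.poll(timeout=1)
# kc.close()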

kc.poll(timeout=0.001)
time.sleep(1)

kc.close()