Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
expose stats_cb
  • Loading branch information
hqin committed Oct 26, 2016
commit 201b6b333f91b2abe545e9262688233b81617836
50 changes: 50 additions & 0 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,33 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
CallState_resume(cs);
}

static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{
Handle *h = opaque;
PyObject *eo=NULL, *result=NULL;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style is inconsistent, let's try to maintain same style as in existing file to ease readability.

Functions: Open-brace on same line as function decl.
Variable declarations: type SPACE variablename SPACE = SPACE value;
If statements: if SPACE (EXPR) SPACE {
EXPR: lhs SPACE operator SPACE rhs (e.g., json_len == 0)

CallState *cs=NULL;

cs = CallState_get(h);
if (json_len== 0 || !h->stats_cb) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We dont need to enable the C stats_cb if no Python stats_cb has been set, which remove the !h->stats_cb check

/* Neither data nor call back defined. */
goto done;
}

eo = Py_BuildValue("s", json);
result = PyObject_CallFunctionObjArgs(h->stats_cb, eo, NULL);
Py_DECREF(eo);

if (result) {
Py_DECREF(result);
} else {
CallState_crash(cs);
rd_kafka_yield(h->rk);
}

done:
CallState_resume(cs);
return 0;
}

/****************************************************************************
*
Expand All @@ -857,6 +884,10 @@ void Handle_clear (Handle *h) {
Py_DECREF(h->error_cb);
}

if (h->stats_cb) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: remove braces for single-line clauses (you can remove the ones for error_cb above too, shouldnt be there)

Py_DECREF(h->stats_cb);
}

PyThread_delete_key(h->tlskey);
}

Expand All @@ -867,6 +898,9 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg) {
if (h->error_cb)
Py_VISIT(h->error_cb);

if (h->stats_cb)
Py_VISIT(h->stats_cb);

return 0;
}

Expand Down Expand Up @@ -1123,6 +1157,17 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
}
Py_DECREF(ks);
continue;
} else if (!strcmp(k, "stats_cb")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to check here and for error_cb if the object is callable (PyCallable_Check(vo)) (Py_None is okay too)

if (h->stats_cb) {
Py_DECREF(h->stats_cb);
h->stats_cb = NULL;
}
if (vo != Py_None) {
h->stats_cb = vo;
Py_INCREF(h->stats_cb);
}
Py_DECREF(ks);
continue;
}

/* Special handling for certain config keys. */
Expand Down Expand Up @@ -1174,6 +1219,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,

if (h->error_cb)
rd_kafka_conf_set_error_cb(conf, error_cb);

if (h->stats_cb) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: remove braces

rd_kafka_conf_set_stats_cb(conf, stats_cb);
}

rd_kafka_topic_conf_set_opaque(tconf, h);
rd_kafka_conf_set_default_topic_conf(conf, tconf);

Expand Down
1 change: 1 addition & 0 deletions confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ typedef struct {
PyObject_HEAD
rd_kafka_t *rk;
PyObject *error_cb;
PyObject *stats_cb;
int tlskey; /* Thread-Local-Storage key */

union {
Expand Down
47 changes: 39 additions & 8 deletions examples/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,55 @@
#
# Example high-level Kafka 0.9 balanced Consumer
#

from confluent_kafka import Consumer, KafkaException, KafkaError
import sys
import getopt
import json
from pprint import pformat

if __name__ == '__main__':
if len(sys.argv) < 4:
sys.stderr.write('Usage: %s <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % sys.argv[0])
sys.exit(1)
def stats_cb(stats_json_str):
stats_json = json.loads(stats_json_str)
print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))

broker = sys.argv[1]
group = sys.argv[2]
topics = sys.argv[3:]
def print_usage_and_exit(program_name):
sys.stderr.write('Usage: %s [options..] <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % program_name)
options='''
Options:
-T <intvl> Enable statistics from Kafka at specified interval (ms)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the stats are from the client, not Kafka, so might want to rephrase that to something like:
Enable client statistics at specified interval (ms)

'''
sys.stderr.write(options)
sys.exit(1)


if __name__ == '__main__':
optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if len(argv) < 3:
print_usage_and_exit(sys.argv[0])

broker = argv[0]
group = argv[1]
topics = argv[2:]
# Consumer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000,
'default.topic.config': {'auto.offset.reset': 'smallest'}}

# Check to see if -T option exists
for opt in optlist:
if opt[0] != '-T':
continue
try:
intval = int(opt[1])
except:
sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
sys.exit(1)

if intval <= 0:
sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
sys.exit(1)

conf['stats_cb'] = stats_cb
conf['statistics.interval.ms'] = int(opt[1])

# Create Consumer instance
c = Consumer(**conf)
Expand Down
45 changes: 39 additions & 6 deletions examples/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,55 @@
# Example Kafka Producer.
# Reads lines from stdin and sends to Kafka.
#

from confluent_kafka import Producer
import sys
import getopt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we only the stats_cb example in of the examples (to keep the examples small).
Remove either this one or the other one.

import json
from pprint import pformat

def stats_cb(stats_json_str):
stats_json = json.loads(stats_json_str)
print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))

def print_usage_and_exit(program_name):
sys.stderr.write('Usage: %s [options..] <bootstrap-brokers> <topic>\n' % program_name)
options='''
Options:
-T <intvl> Enable statistics from Kafka at specified interval (ms)
'''
sys.stderr.write(options)
sys.exit(1)


if __name__ == '__main__':
if len(sys.argv) != 3:
sys.stderr.write('Usage: %s <bootstrap-brokers> <topic>\n' % sys.argv[0])
sys.exit(1)
optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
if len(argv) != 2:
print_usage_and_exit(sys.argv[0])

broker = sys.argv[1]
topic = sys.argv[2]
broker = argv[0]
topic = argv[1]

# Producer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {'bootstrap.servers': broker}

# Check to see if -T option exists
for opt in optlist:
if opt[0] != '-T':
continue
try:
intval = int(opt[1])
except:
sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
sys.exit(1)

if intval <= 0:
sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
sys.exit(1)

conf['stats_cb'] = stats_cb
conf['statistics.interval.ms'] = int(opt[1])

# Create Producer instance
p = Producer(**conf)

Expand Down