Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Added (global) on_commit callback.
  • Loading branch information
edenhill committed Apr 25, 2016
commit 77305ffef4df835f1e2b363b0a1b97c7ce9e22d5
62 changes: 62 additions & 0 deletions Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ static int Consumer_clear (Consumer *self) {
Py_DECREF(self->on_revoke);
self->on_revoke = NULL;
}
if (self->on_commit) {
Py_DECREF(self->on_commit);
self->on_commit = NULL;
}
return 0;
}

Expand Down Expand Up @@ -543,6 +547,18 @@ PyTypeObject ConsumerType = {
"\n"
" Create new Consumer instance using provided configuration dict.\n"
"\n"
" Special configuration properties:\n"
" ``on_commit``: Optional callback will be called when a commit "
"request has succeeded or failed.\n"
"\n"
"\n"
".. py:function:: on_commit(consumer, err, partitions)\n"
"\n"
" :param Consumer consumer: Consumer instance.\n"
" :param KafkaError err: Commit error object, or None on success.\n"
" :param list(TopicPartition) partitions: List of partitions with "
"their committed offsets or per-partition errors.\n"
"\n"
"\n", /*tp_doc*/
(traverseproc)Consumer_traverse, /* tp_traverse */
(inquiry)Consumer_clear, /* tp_clear */
Expand Down Expand Up @@ -622,6 +638,51 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
}


/**
 * @brief librdkafka offset-commit callback: forwards the commit result to
 *        the application's `on_commit` Python callback, if one was configured.
 *
 * Called from rd_kafka_poll()/consume() context with the GIL released;
 * re-acquires the GIL for the duration of the Python call.
 *
 * @param rk      librdkafka client handle (used only for rd_kafka_yield()).
 * @param err     Commit result (RD_KAFKA_RESP_ERR_NO_ERROR on success).
 * @param c_parts Committed partitions with per-partition offsets/errors.
 * @param opaque  The owning Consumer instance.
 */
static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                                       rd_kafka_topic_partition_list_t *c_parts,
                                       void *opaque) {
        Consumer *self = opaque;
        PyObject *parts, *k_err, *args, *result;

        /* Nothing to do if the application did not register a callback. */
        if (!self->on_commit)
                return;

        /* Re-acquire the GIL that was released before entering librdkafka. */
        PyEval_RestoreThread(self->thread_state);

        /* Instantiate error object (None on success). */
        k_err = KafkaError_new_or_None(err, NULL);

        /* Construct list of TopicPartition based on 'c_parts'. */
        parts = c_parts_to_py(c_parts);

        /* Either constructor may fail and return NULL: bail out rather than
         * Py_DECREF()ing a NULL pointer or passing NULL to Py_BuildValue(). */
        if (!k_err || !parts) {
                Py_XDECREF(k_err);
                Py_XDECREF(parts);
                self->callback_crashed++;
                self->thread_state = PyEval_SaveThread();
                return;
        }

        args = Py_BuildValue("(OOO)", self, k_err, parts);

        /* Py_BuildValue("O"..) took its own references. */
        Py_DECREF(k_err);
        Py_DECREF(parts);

        if (!args) {
                cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
                                 "Unable to build callback args");
                self->callback_crashed++;
                self->thread_state = PyEval_SaveThread();
                return;
        }

        result = PyObject_CallObject(self->on_commit, args);

        Py_DECREF(args);

        if (result)
                Py_DECREF(result);
        else {
                /* Callback raised: flag it and make librdkafka return to
                 * the application as soon as possible. */
                self->callback_crashed++;
                rd_kafka_yield(rk);
        }

        /* Release the GIL again before returning to librdkafka. */
        self->thread_state = PyEval_SaveThread();
}



static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
PyObject *kwargs) {
Expand All @@ -640,6 +701,7 @@ static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
}

rd_kafka_conf_set_rebalance_cb(conf, Consumer_rebalance_cb);
rd_kafka_conf_set_offset_commit_cb(conf, Consumer_offset_commit_cb);

self->rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
errstr, sizeof(errstr));
Expand Down
57 changes: 42 additions & 15 deletions confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,34 @@ static int producer_conf_set_special (Producer *self, rd_kafka_conf_t *conf,
}


/**
 * @brief Set single special consumer config value.
 *
 * @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised).
 */
static int consumer_conf_set_special (Consumer *self, rd_kafka_conf_t *conf,
                                      rd_kafka_topic_conf_t *tconf,
                                      const char *name, PyObject *valobj) {

        /* Only 'on_commit' is recognized here; anything else is a
         * regular configuration property. */
        if (strcasecmp(name, "on_commit"))
                return 0;

        if (!PyCallable_Check(valobj)) {
                cfl_PyErr_Format(
                        RD_KAFKA_RESP_ERR__INVALID_ARG,
                        "%s requires a callable "
                        "object", name);
                return -1;
        }

        /* Hold a reference to the callback for the Consumer's lifetime
         * (released in Consumer_clear()). */
        Py_INCREF(valobj);
        self->on_commit = valobj;

        return 1;
}


/**
* Common config setup for Kafka client handles.
*
Expand Down Expand Up @@ -1004,6 +1032,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
const char *k;
const char *v;
char errstr[256];
int r;

if (!(ks = cfl_PyObject_Unistr(ko))) {
PyErr_SetString(PyExc_TypeError,
Expand All @@ -1028,24 +1057,22 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
}

/* Special handling for certain config keys. */
if (ktype == RD_KAFKA_PRODUCER) {
int r;

if (ktype == RD_KAFKA_PRODUCER)
r = producer_conf_set_special((Producer *)self0,
conf, tconf, k, vo);
if (r == -1) {
/* Error */
Py_DECREF(ks);
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
return NULL;

} else if (r == 1) {
/* Handled */
continue;
}
else
r = consumer_conf_set_special((Consumer *)self0,
conf, tconf, k, vo);
if (r == -1) {
/* Error */
Py_DECREF(ks);
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
return NULL;

/* FALLTHRU */
} else if (r == 1) {
/* Handled */
continue;
}


Expand Down
1 change: 1 addition & 0 deletions confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ typedef struct {
int rebalance_assigned; /* Rebalance: Callback performed assign() call.*/
PyObject *on_assign; /* Rebalance: on_assign callback */
PyObject *on_revoke; /* Rebalance: on_revoke callback */
PyObject *on_commit; /* Commit callback */
int callback_crashed;
PyThreadState *thread_state;
} Consumer;
Expand Down
4 changes: 4 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ providing a dict of configuration properties to the instance constructor, e.g.::

conf = {'bootstrap.servers': 'mybroker.com',
'group.id': 'mygroup', 'session.timeout.ms': 6000,
'on_commit': my_commit_callback,
'default.topic.config': {'auto.offset.reset': 'smallest'}}
consumer = confluent_kafka.Consumer(**conf)

Expand All @@ -40,5 +41,8 @@ The Python bindings also provide some additional configuration properties:
This property may also be set per-message by passing ``callback=somefunc``
to the confluent_kafka.Producer.produce() function.

* ``on_commit`` (**Consumer**): Callback used to indicate success or failure
of commit requests.



8 changes: 8 additions & 0 deletions integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ def verify_producer_performance(with_dr_cb=True):
(t_delivery_spent - t_produce_spent))


def print_commit_result (consumer, err, partitions):
    """ on_commit callback: report the outcome of an offset commit request. """
    if err is None:
        print('# Committed offsets for: %s' % partitions)
    else:
        print('# Failed to commit offsets: %s: %s' % (err, partitions))


def verify_consumer():
""" Verify basic Consumer functionality """

Expand All @@ -199,6 +206,7 @@ def verify_consumer():
'group.id': 'test.py',
'session.timeout.ms': 6000,
'enable.auto.commit': False,
'on_commit': print_commit_result,
'default.topic.config': {
'auto.offset.reset': 'earliest'
}}
Expand Down
6 changes: 5 additions & 1 deletion tests/test_Consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ def test_basic_api():
except TypeError as e:
assert str(e) == "expected configuration dict"

kc = Consumer({'group.id':'test', 'socket.timeout.ms':'100'})
def dummy_commit_cb (consumer, err, partitions):
    # No-op on_commit callback: exists only so the test can verify that
    # the 'on_commit' config key accepts a callable.
    pass

kc = Consumer({'group.id':'test', 'socket.timeout.ms':'100',
'on_commit': dummy_commit_cb})

kc.subscribe(["test"])
kc.unsubscribe()
Expand Down