Added error_cb for propagating generic errors from librdkafka to app (issue #14)
edenhill committed Jul 13, 2016
commit 0e7b694b455923610858231f862c81a41b08f566
65 changes: 65 additions & 0 deletions confluent_kafka/src/confluent_kafka.c
@@ -801,6 +801,39 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
}


/****************************************************************************
*
*
* Common callbacks
*
*
*
*
****************************************************************************/
static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) {
Handle *h = opaque;
PyObject *eo, *result;

PyEval_RestoreThread(h->thread_state);
if (!h->error_cb) {
/* No callback defined */
goto done;
}

eo = KafkaError_new0(err, "%s", reason);
result = PyObject_CallFunctionObjArgs(h->error_cb, eo, NULL);
Py_DECREF(eo);

if (result) {
Py_DECREF(result);
} else {
h->callback_crashed++;
rd_kafka_yield(h->rk);
}

done:
h->thread_state = PyEval_SaveThread();
}


/****************************************************************************
@@ -814,6 +847,28 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
****************************************************************************/



/**
* Clear Python object references in Handle
*/
void Handle_clear (Handle *h) {
if (h->error_cb) {
Py_DECREF(h->error_cb);
}
}

/**
* GC traversal for Python object references
*/
int Handle_traverse (Handle *h, visitproc visit, void *arg) {
if (h->error_cb)
Py_VISIT(h->error_cb);

return 0;
}



/**
* Populate topic conf from provided dict.
*
@@ -1053,6 +1108,14 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,

Py_DECREF(ks);
continue;

} else if (!strcmp(k, "error_cb")) {
if (h->error_cb)
Py_DECREF(h->error_cb);
h->error_cb = vo;
Py_INCREF(h->error_cb);
Py_DECREF(ks);
continue;
}

/* Special handling for certain config keys. */
@@ -1102,6 +1165,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
Py_DECREF(ks);
}

if (h->error_cb)
rd_kafka_conf_set_error_cb(conf, error_cb);
rd_kafka_topic_conf_set_opaque(tconf, h);
rd_kafka_conf_set_default_topic_conf(conf, tconf);

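The hunk above is the whole propagation path: error_cb() wraps librdkafka's (err, reason) pair in a KafkaError via KafkaError_new0(), invokes the Python callable stored on the Handle, and, if that callable raises, bumps callback_crashed and calls rd_kafka_yield() so the exception surfaces from the poll() call that was serving the callback. A minimal application-side sketch of that contract (broker address, topic name and the specific error-code check are illustrative, not part of this commit):

```python
from confluent_kafka import Producer, KafkaError

def error_cb(err):
    # err is a KafkaError describing a client-level (non per-message) error
    if err.code() == KafkaError._ALL_BROKERS_DOWN:
        # Raising here is expected to propagate out of the poll() below
        raise RuntimeError('all brokers down: %s' % err)
    print('client error: %s' % err)

p = Producer({'bootstrap.servers': 'localhost:9092',  # placeholder address
              'error_cb': error_cb})
p.produce('mytopic', 'some payload')
p.poll(1)  # callbacks, including error_cb, are dispatched from poll()
```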
3 changes: 3 additions & 0 deletions docs/index.rst
@@ -78,6 +78,9 @@ The Python bindings also provide some additional configuration properties:
* ``default.topic.config``: value is a dict of topic-level configuration
properties that are applied to all used topics for the instance.

* ``error_cb``: Callback for generic/global error events. This callback is served by
  poll().

Review comment (Contributor): Should we be including the argument lists for these callbacks?

* ``on_delivery`` (**Producer**): value is a Python function reference
that is called once for each produced message to indicate the final
delivery result (success or failure).
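As the doc addition above notes, error_cb is just another key in the configuration dict and is only invoked while the application sits in poll(). A hedged consumer-side usage sketch (group id, broker address and topic are placeholders):

```python
from confluent_kafka import Consumer

def error_cb(err):
    print('client-level error: %s' % err)

c = Consumer({'bootstrap.servers': 'localhost:9092',
              'group.id': 'example-group',
              'error_cb': error_cb})
c.subscribe(['mytopic'])

while True:
    msg = c.poll(timeout=1.0)  # error_cb is served from inside this call
    if msg is None:
        continue
    if msg.error():
        print('message-level error: %s' % msg.error())
        continue
    print('consumed: %r' % msg.value())
```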
8 changes: 7 additions & 1 deletion examples/integration_test.py
@@ -37,6 +37,8 @@



def error_cb (err):
print('Error: %s' % err)

class MyTestDr(object):
""" Producer: Delivery report callback """
@@ -77,6 +79,7 @@ def verify_producer():

# Producer config
conf = {'bootstrap.servers': bootstrap_servers,
'error_cb': error_cb,
'default.topic.config':{'produce.offset.report': True}}

# Create producer
@@ -112,7 +115,8 @@ def verify_producer():

def verify_producer_performance(with_dr_cb=True):
""" Time how long it takes to produce and delivery X messages """
conf = {'bootstrap.servers': bootstrap_servers}
conf = {'bootstrap.servers': bootstrap_servers,
'error_cb': error_cb}

p = confluent_kafka.Producer(**conf)

@@ -207,6 +211,7 @@ def verify_consumer():
'session.timeout.ms': 6000,
'enable.auto.commit': False,
'on_commit': print_commit_result,
'error_cb': error_cb,
'default.topic.config': {
'auto.offset.reset': 'earliest'
}}
@@ -275,6 +280,7 @@ def verify_consumer_performance():
conf = {'bootstrap.servers': bootstrap_servers,
'group.id': uuid.uuid1(),
'session.timeout.ms': 6000,
'error_cb': error_cb,
'default.topic.config': {
'auto.offset.reset': 'earliest'
}}
4 changes: 4 additions & 0 deletions tests/test_Producer.py
@@ -13,7 +13,11 @@ def test_basic_api():
assert str(e) == "expected configuration dict"


def error_cb (err):
print('error_cb', err)

p = Producer({'socket.timeout.ms':10,
'error_cb': error_cb,
'default.topic.config': {'message.timeout.ms': 10}})

p.produce('mytopic')
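A standalone way to see the callback fire, in the spirit of the unit test above: point the client at an address where nothing is listening and keep the timeouts tiny, then poll; error_cb should be invoked with transport/broker-down errors. This is a hedged sketch, not part of the commit, and the address and timeouts are illustrative:

```python
from confluent_kafka import Producer

seen = []

def error_cb(err):
    seen.append(err)

p = Producer({'bootstrap.servers': '127.0.0.1:1',  # nothing listening here
              'socket.timeout.ms': 10,
              'error_cb': error_cb,
              'default.topic.config': {'message.timeout.ms': 10}})
p.produce('mytopic', 'payload')
p.poll(2)  # gives error_cb a chance to run
print('errors observed: %d' % len(seen))
```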