Merged
Changes from 8 commits
2 changes: 2 additions & 0 deletions confluent_kafka/__init__.py
@@ -0,0 +1,2 @@
__all__ = ['cimpl','kafkatest']
from .cimpl import *
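With this __init__.py the C sources move into a submodule and their names are re-exported at package level, so user code keeps importing from the top-level package. A minimal sketch of what this enables (assuming the extension builds as confluent_kafka.cimpl, per the file moves below):

import confluent_kafka

# Consumer, Producer, KafkaError, etc. are defined in the C extension
# and re-exported here via `from .cimpl import *`.
c = confluent_kafka.Consumer
print(c is confluent_kafka.cimpl.Consumer)  # True: same class object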
69 changes: 66 additions & 3 deletions Consumer.c → confluent_kafka/cimpl/Consumer.c
@@ -37,6 +37,10 @@ static int Consumer_clear (Consumer *self) {
Py_DECREF(self->on_revoke);
self->on_revoke = NULL;
}
if (self->on_commit) {
Py_DECREF(self->on_commit);
self->on_commit = NULL;
}
return 0;
}

@@ -518,7 +522,7 @@ static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,

PyTypeObject ConsumerType = {
PyVarObject_HEAD_INIT(NULL, 0)
"confluent_kafka.Consumer", /*tp_name*/
"cimpl.Consumer", /*tp_name*/
sizeof(Consumer), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)Consumer_dealloc, /*tp_dealloc*/
@@ -543,6 +547,18 @@ PyTypeObject ConsumerType = {
"\n"
" Create new Consumer instance using provided configuration dict.\n"
"\n"
" Special configuration properties:\n"
" ``on_commit``: Optional callback that will be called when a commit "
"request has succeeded or failed.\n"
"\n"
"\n"
".. py:function:: on_commit(consumer, err, partitions)\n"
"\n"
" :param Consumer consumer: Consumer instance.\n"
Contributor Author commented:
I can't remember if passing the consumer instance as part of the callback came up before. I checked and we have it elsewhere. Is this valuable? Often in Python you just grab what you need via closure.

" :param KafkaError err: Commit error object, or None on success.\n"
" :param list(TopicPartition) partitions: List of partitions with "
"their committed offsets or per-partition errors.\n"
"\n"
"\n", /*tp_doc*/
(traverseproc)Consumer_traverse, /* tp_traverse */
(inquiry)Consumer_clear, /* tp_clear */
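A minimal sketch of how an application might use the ``on_commit`` hook documented above (the broker address, group id, and printed fields are illustrative placeholders, not part of this diff):

from confluent_kafka import Consumer

def commit_cb(consumer, err, partitions):
    # err is a KafkaError on failure, None on success (see docstring above)
    if err is not None:
        print('Commit failed: %s' % err)
    else:
        for p in partitions:
            print('Committed %s [%d] @ %d' % (p.topic, p.partition, p.offset))

c = Consumer({'bootstrap.servers': 'localhost:9092',  # placeholder broker
              'group.id': 'example-group',            # placeholder group
              'on_commit': commit_cb})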
@@ -566,8 +582,8 @@ PyTypeObject ConsumerType = {


static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *c_parts,
void *opaque) {
rd_kafka_topic_partition_list_t *c_parts,
void *opaque) {
Consumer *self = opaque;

PyEval_RestoreThread(self->thread_state);
@@ -590,6 +606,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
"Unable to build callback args");
self->thread_state = PyEval_SaveThread();
self->callback_crashed++;
return;
}

@@ -622,6 +639,51 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
}


static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *c_parts,
void *opaque) {
Consumer *self = opaque;
PyObject *parts, *k_err, *args, *result;

if (!self->on_commit)
return;

PyEval_RestoreThread(self->thread_state);

/* Instantiate error object */
k_err = KafkaError_new_or_None(err, NULL);

/* Construct list of TopicPartition based on 'c_parts' */
parts = c_parts_to_py(c_parts);

args = Py_BuildValue("(OOO)", self, k_err, parts);

Py_DECREF(k_err);
Py_DECREF(parts);

if (!args) {
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
"Unable to build callback args");
self->thread_state = PyEval_SaveThread();
self->callback_crashed++;
return;
}

result = PyObject_CallObject(self->on_commit, args);

Py_DECREF(args);

if (result)
Py_DECREF(result);
else {
self->callback_crashed++;
Contributor Author commented:
Should we be doing anything else if the callback fails? I know we return NULL from poll, but will that result in the exception effectively being raised again?

Contributor commented:
Returning NULL from a C Python method makes it automatically raise any exceptions generated during function execution, so we should be good here.

rd_kafka_yield(rk);
}

self->thread_state = PyEval_SaveThread();
}
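To illustrate the behavior discussed in the review thread above (the callback raising marks callback_crashed, rd_kafka_yield() aborts the poll, and returning NULL re-raises the pending exception in the caller), a hypothetical sketch, assuming poll() is the call that serves the commit callback:

def commit_cb(consumer, err, partitions):
    raise RuntimeError('boom')  # simulated callback failure

# c is a Consumer configured with on_commit=commit_cb, as in the earlier sketch
try:
    msg = c.poll(1.0)
except RuntimeError as e:
    # the exception raised inside on_commit surfaces from poll()
    print('callback error surfaced: %s' % e)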



static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
PyObject *kwargs) {
@@ -640,6 +702,7 @@ static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
}

rd_kafka_conf_set_rebalance_cb(conf, Consumer_rebalance_cb);
rd_kafka_conf_set_offset_commit_cb(conf, Consumer_offset_commit_cb);

self->rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
errstr, sizeof(errstr));
14 changes: 7 additions & 7 deletions Producer.c → confluent_kafka/cimpl/Producer.c
@@ -250,7 +250,7 @@ static PyObject *Producer_produce (Producer *self, PyObject *args,
"key",
"partition",
"callback",
"delivery_callback", /* Alias */
"on_delivery", /* Alias */
"partitioner",
NULL };

@@ -365,16 +365,16 @@ static PyMethodDef Producer_methods[] = {
"\n"
" Produce message to topic.\n"
" This is an asynchronous operation, an application may use the "
"``ondelivery`` argument to pass a function (or lambda) that "
"will be called from :py:func:`poll()` when the message has been "
"succesfully delivered or permanently fails delivery.\n"
"``callback`` (alias ``on_delivery``) argument to pass a function "
"(or lambda) that will be called from :py:func:`poll()` when the "
"message has been successfully delivered or permanently fails delivery.\n"
"\n"
" :param str topic: Topic to produce message to\n"
" :param str value: Message payload\n"
" :param str key: Message key\n"
" :param int partition: Partition to produce to, elses uses the "
"configured partitioner.\n"
" :param func ondelivery(err,msg): Delivery report callback to call "
" :param func on_delivery(err,msg): Delivery report callback to call "
"(from :py:func:`poll()` or :py:func:`flush()`) on succesful or "
"failed delivery\n"
"\n"
@@ -393,7 +393,7 @@ static PyMethodDef Producer_methods[] = {
"\n"
" Callbacks:\n"
"\n"
" - ``ondelivery`` callbacks from :py:func:`produce()`\n"
" - ``on_delivery`` callbacks from :py:func:`produce()`\n"
" - ...\n"
"\n"
" :param float timeout: Maximum time to block waiting for events.\n"
@@ -430,7 +430,7 @@ static PyObject *Producer_new (PyTypeObject *type, PyObject *args,

PyTypeObject ProducerType = {
PyVarObject_HEAD_INIT(NULL, 0)
"confluent_kafka.Producer", /*tp_name*/
"cimpl.Producer", /*tp_name*/
sizeof(Producer), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)Producer_dealloc, /*tp_dealloc*/
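A short usage sketch of produce() with the ``on_delivery`` keyword renamed above (broker and topic names are placeholders):

from confluent_kafka import Producer

def delivery_cb(err, msg):
    # err is a KafkaError on permanent failure, None on success
    if err is not None:
        print('Delivery failed: %s' % err)
    else:
        print('Delivered to %s [%d]' % (msg.topic(), msg.partition()))

p = Producer({'bootstrap.servers': 'localhost:9092'})  # placeholder broker
p.produce('mytopic', value='hello', key='k1', on_delivery=delivery_cb)
p.poll(0)    # serve delivery callbacks
p.flush()    # wait for outstanding messages before exit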
102 changes: 64 additions & 38 deletions confluent_kafka.c → confluent_kafka/cimpl/confluent_kafka.c
@@ -168,7 +168,7 @@ static PyObject* KafkaError_richcompare (KafkaError *self, PyObject *o2,

static PyTypeObject KafkaErrorType = {
PyVarObject_HEAD_INIT(NULL, 0)
"confluent_kafka.KafkaError", /*tp_name*/
"cimpl.KafkaError", /*tp_name*/
sizeof(KafkaError), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)KafkaError_dealloc, /*tp_dealloc*/
@@ -248,7 +248,7 @@ PyObject *KafkaError_new0 (rd_kafka_resp_err_t err, const char *fmt, ...) {
va_end(ap);
}

KafkaError_init(self, err, fmt ? buf : NULL);
KafkaError_init(self, err, fmt ? buf : rd_kafka_err2str(err));

return (PyObject *)self;
}
@@ -257,11 +257,10 @@ PyObject *KafkaError_new0 (rd_kafka_resp_err_t err, const char *fmt, ...) {
* @brief Internal factory to create KafkaError object.
* @returns a new KafkaError object if \p err != 0, else a None object.
*/
static PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err,
const char *str) {
PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str) {
if (!err)
Py_RETURN_NONE;
return KafkaError_new0(err, str);
return KafkaError_new0(err, "%s", str);
}


@@ -417,7 +416,7 @@ static PySequenceMethods Message_seq_methods = {

PyTypeObject MessageType = {
PyVarObject_HEAD_INIT(NULL, 0)
"confluent_kafka.Message", /*tp_name*/
"cimpl.Message", /*tp_name*/
sizeof(Message), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)Message_dealloc, /*tp_dealloc*/
@@ -661,7 +660,7 @@ static long TopicPartition_hash (TopicPartition *self) {

static PyTypeObject TopicPartitionType = {
PyVarObject_HEAD_INIT(NULL, 0)
"confluent_kafka.TopicPartition", /*tp_name*/
"cimpl.TopicPartition", /*tp_name*/
sizeof(TopicPartition), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)TopicPartition_dealloc, /*tp_dealloc*/
@@ -773,7 +772,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {

if (!PyList_Check(plist)) {
PyErr_SetString(PyExc_TypeError,
"requires list of confluent_kafka.TopicPartition");
"requires list of TopicPartition");
return NULL;
}

@@ -886,7 +885,7 @@ static int producer_conf_set_special (Producer *self, rd_kafka_conf_t *conf,
PyObject *vs;
const char *val;

if (!strcasecmp(name, "delivery_callback")) {
if (!strcasecmp(name, "on_delivery")) {
if (!PyCallable_Check(valobj)) {
cfl_PyErr_Format(
RD_KAFKA_RESP_ERR__INVALID_ARG,
@@ -966,6 +965,34 @@ }
}


/**
* @brief Set single special consumer config value.
*
* @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised).
*/
static int consumer_conf_set_special (Consumer *self, rd_kafka_conf_t *conf,
rd_kafka_topic_conf_t *tconf,
const char *name, PyObject *valobj) {

if (!strcasecmp(name, "on_commit")) {
if (!PyCallable_Check(valobj)) {
cfl_PyErr_Format(
RD_KAFKA_RESP_ERR__INVALID_ARG,
"%s requires a callable "
"object", name);
return -1;
}

self->on_commit = valobj;
Py_INCREF(self->on_commit);

return 1;
}

return 0;
}
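At the Python level this validation means a non-callable ``on_commit`` is rejected when the Consumer is constructed; a sketch (the exact exception type and message are assumptions based on cfl_PyErr_Format raising through KafkaException):

from confluent_kafka import Consumer, KafkaException

try:
    c = Consumer({'group.id': 'g', 'on_commit': 42})  # 42 is not callable
except KafkaException as e:
    print(e)  # expected to mention: on_commit requires a callable object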


/**
* Common config setup for Kafka client handles.
*
@@ -1004,6 +1031,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
const char *k;
const char *v;
char errstr[256];
int r;

if (!(ks = cfl_PyObject_Unistr(ko))) {
PyErr_SetString(PyExc_TypeError,
@@ -1028,24 +1056,22 @@ }
}

/* Special handling for certain config keys. */
if (ktype == RD_KAFKA_PRODUCER) {
int r;

if (ktype == RD_KAFKA_PRODUCER)
r = producer_conf_set_special((Producer *)self0,
conf, tconf, k, vo);
if (r == -1) {
/* Error */
Py_DECREF(ks);
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
return NULL;

} else if (r == 1) {
/* Handled */
continue;
}
else
r = consumer_conf_set_special((Consumer *)self0,
conf, tconf, k, vo);
if (r == -1) {
/* Error */
Py_DECREF(ks);
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
return NULL;

/* FALLTHRU */
} else if (r == 1) {
/* Handled */
continue;
}


@@ -1111,7 +1137,7 @@ static PyObject *version (PyObject *self, PyObject *args) {
return Py_BuildValue("si", "0.9.1", 0x00090100);
}
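Usage sketch of the two version accessors (the version() tuple is taken from this file; libversion()'s return shape follows its docstring below):

import confluent_kafka

print(confluent_kafka.version())     # ('0.9.1', 0x00090100), per version() above
print(confluent_kafka.libversion())  # (librdkafka version string, version int)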

static PyMethodDef confluent_kafka_methods[] = {
static PyMethodDef cimpl_methods[] = {
{"libversion", libversion, METH_NOARGS,
" Retrieve librdkafka version string and integer\n"
"\n"
@@ -1204,17 +1230,17 @@ static char *KafkaError_add_errs (PyObject *dict, const char *origdoc) {


#ifdef PY3
static struct PyModuleDef confluent_kafka_moduledef = {
static struct PyModuleDef cimpl_moduledef = {
PyModuleDef_HEAD_INIT,
"confluent_kafka", /* m_name */
"Confluent's Apache Kafka Python client", /* m_doc */
"cimpl", /* m_name */
"Confluent's Apache Kafka Python client (C implementation)", /* m_doc */
-1, /* m_size */
confluent_kafka_methods, /* m_methods */
cimpl_methods, /* m_methods */
};
#endif


static PyObject *_init_confluent_kafka (void) {
static PyObject *_init_cimpl (void) {
PyObject *m;

if (PyType_Ready(&KafkaErrorType) < 0)
@@ -1229,10 +1255,10 @@ return NULL;
return NULL;

#ifdef PY3
m = PyModule_Create(&confluent_kafka_moduledef);
m = PyModule_Create(&cimpl_moduledef);
#else
m = Py_InitModule3("confluent_kafka", confluent_kafka_methods,
"Confluent's Apache Kafka Python client");
m = Py_InitModule3("cimpl", cimpl_methods,
"Confluent's Apache Kafka Python client (C implementation)");
#endif
if (!m)
return NULL;
@@ -1257,7 +1283,7 @@ PyModule_AddObject(m, "Consumer", (PyObject *)&ConsumerType);
PyModule_AddObject(m, "Consumer", (PyObject *)&ConsumerType);

KafkaException = PyErr_NewExceptionWithDoc(
"confluent_kafka.KafkaException",
"cimpl.KafkaException",
"Kafka exception that wraps the :py:class:`KafkaError` "
"class.\n"
"\n"
@@ -1273,11 +1299,11 @@


#ifdef PY3
PyMODINIT_FUNC PyInit_confluent_kafka (void) {
return _init_confluent_kafka();
PyMODINIT_FUNC PyInit_cimpl (void) {
return _init_cimpl();
}
#else
PyMODINIT_FUNC initconfluent_kafka (void) {
_init_confluent_kafka();
PyMODINIT_FUNC initcimpl (void) {
_init_cimpl();
}
#endif