Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add timestamp() to librdkafka messages
  • Loading branch information
qix committed Oct 26, 2016
commit 2e6c56d8fb81ff8a3da2a8cf73fabde478b97d35
19 changes: 19 additions & 0 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,14 @@ static PyObject *Message_offset (Message *self, PyObject *ignore) {
}


static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to have a single timestamp() method that returns a tuple: tstype, timestamp

if (self->timestamp > 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be >=?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should depend on the tstype, not the value of the timestamp.
E.g., even a seemingly invalid timestamp with a proper tstype is valid (because it might mean something in the future)

return PyLong_FromLong(self->timestamp);
else
Py_RETURN_NONE;
}


static PyMethodDef Message_methods[] = {
{ "error", (PyCFunction)Message_error, METH_NOARGS,
" The message object is also used to propagate errors and events, "
Expand Down Expand Up @@ -362,6 +370,11 @@ static PyMethodDef Message_methods[] = {
" :rtype: int or None\n"
"\n"
},
{ "timestamp", (PyCFunction)Message_timestamp, METH_NOARGS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to return the timestamp type as well (none, append, create).
Im not sure whether it should be some class level constants or strings, probably the former. @ewencp?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the timestamp type as a class level constant, e.g.:
TIMESTAMP_NOT_AVAILABLE, TIMESTAMP_CREATE_TIME, TIMESTAMP_APPEND_TIME
Map it directly to the librdkafka counterparts.

" :returns: message timestamp or None if not available.\n"
" :rtype: int or None\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should probably return a local timestamp in Python format (e.g float), and the doc string should be updated to reflect what is being returned.
The timestamp() method should also return the tstype (tstype,timestamp) tuple

"\n"
},
{ NULL }
};

Expand Down Expand Up @@ -495,6 +508,12 @@ PyObject *Message_new0 (const rd_kafka_message_t *rkm) {
self->partition = rkm->partition;
self->offset = rkm->offset;

rd_kafka_timestamp_type_t tstype;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put variable declarations at the beginning of the scope to be C89 safe

self->timestamp = rd_kafka_message_timestamp(rkm, &tstype);
if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
// todo: make tstype available to python api
}

return (PyObject *)self;
}

Expand Down
1 change: 1 addition & 0 deletions confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ typedef struct {
PyObject *error;
int32_t partition;
int64_t offset;
int64_t timestamp;
} Message;

extern PyTypeObject MessageType;
Expand Down