Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,19 @@ static PyObject *Message_offset (Message *self, PyObject *ignore) {
}


static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to have a single timestamp() method that returns a tuple: tstype, timestamp

if (self->tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
return PyLong_FromLong(self->timestamp);
else
Py_RETURN_NONE;
}


static PyObject *Message_tstype (Message *self, PyObject *ignore) {
return PyLong_FromLong(self->tstype);
}


static PyMethodDef Message_methods[] = {
{ "error", (PyCFunction)Message_error, METH_NOARGS,
" The message object is also used to propagate errors and events, "
Expand Down Expand Up @@ -362,6 +375,16 @@ static PyMethodDef Message_methods[] = {
" :rtype: int or None\n"
"\n"
},
{ "timestamp", (PyCFunction)Message_timestamp, METH_NOARGS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to return the timestamp type as well (none, append, create).
Im not sure whether it should be some class level constants or strings, probably the former. @ewencp?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the timestamp type as a class level constant, e.g.:
TIMESTAMP_NOT_AVAILABLE, TIMESTAMP_CREATE_TIME, TIMESTAMP_APPEND_TIME
Map it directly to the librdkafka counterparts.

" :returns: message timestamp or None if not available.\n"
" :rtype: int or None\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should probably return a local timestamp in Python format (e.g float), and the doc string should be updated to reflect what is being returned.
The timestamp() method should also return the tstype (tstype,timestamp) tuple

"\n"
},
{ "tstype", (PyCFunction)Message_tstype, METH_NOARGS,
" :returns: message timestamp type.\n"
" :rtype: int\n"
"\n"
},
{ NULL }
};

Expand Down Expand Up @@ -495,6 +518,8 @@ PyObject *Message_new0 (const rd_kafka_message_t *rkm) {
self->partition = rkm->partition;
self->offset = rkm->offset;

self->timestamp = rd_kafka_message_timestamp(rkm, &self->tstype);

return (PyObject *)self;
}

Expand Down
2 changes: 2 additions & 0 deletions confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ typedef struct {
PyObject *error;
int32_t partition;
int64_t offset;
int64_t timestamp;
rd_kafka_timestamp_type_t tstype;
} Message;

extern PyTypeObject MessageType;
Expand Down
5 changes: 3 additions & 2 deletions examples/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ def verify_consumer():
'group.id': 'test.py',
'session.timeout.ms': 6000,
'enable.auto.commit': False,
'api.version.request': True,
'on_commit': print_commit_result,
'error_cb': error_cb,
'default.topic.config': {
Expand Down Expand Up @@ -243,9 +244,9 @@ def verify_consumer():
break

if False:
print('%s[%d]@%d: key=%s, value=%s' % \
print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' % \
(msg.topic(), msg.partition(), msg.offset(),
msg.key(), msg.value()))
msg.key(), msg.value(), msg.tstype(), repr(msg.timestamp())))

if (msg.offset() % 5) == 0:
# Async commit
Expand Down