Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Updates for requested changes
  • Loading branch information
qix committed Oct 26, 2016
commit 339cb33486fde788c6cb5cde8757fb8117c627b7
18 changes: 12 additions & 6 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,18 @@ static PyObject *Message_offset (Message *self, PyObject *ignore) {


static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to have a single timestamp() method that returns a tuple: tstype, timestamp

if (self->timestamp > 0)
if (self->tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
return PyLong_FromLong(self->timestamp);
else
Py_RETURN_NONE;
}


static PyObject *Message_tstype (Message *self, PyObject *ignore) {
return PyLong_FromLong(self->tstype);
}


static PyMethodDef Message_methods[] = {
{ "error", (PyCFunction)Message_error, METH_NOARGS,
" The message object is also used to propagate errors and events, "
Expand Down Expand Up @@ -375,6 +380,11 @@ static PyMethodDef Message_methods[] = {
" :rtype: int or None\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should probably return a local timestamp in Python format (e.g float), and the doc string should be updated to reflect what is being returned.
The timestamp() method should also return the tstype (tstype,timestamp) tuple

"\n"
},
{ "tstype", (PyCFunction)Message_tstype, METH_NOARGS,
" :returns: message timestamp type.\n"
" :rtype: int\n"
"\n"
},
{ NULL }
};

Expand Down Expand Up @@ -508,11 +518,7 @@ PyObject *Message_new0 (const rd_kafka_message_t *rkm) {
self->partition = rkm->partition;
self->offset = rkm->offset;

rd_kafka_timestamp_type_t tstype;
self->timestamp = rd_kafka_message_timestamp(rkm, &tstype);
if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
// todo: make tstype available to python api
}
self->timestamp = rd_kafka_message_timestamp(rkm, &self->tstype);

return (PyObject *)self;
}
Expand Down
1 change: 1 addition & 0 deletions confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ typedef struct {
int32_t partition;
int64_t offset;
int64_t timestamp;
rd_kafka_timestamp_type_t tstype;
} Message;

extern PyTypeObject MessageType;
Expand Down
5 changes: 3 additions & 2 deletions examples/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ def verify_consumer():
'group.id': 'test.py',
'session.timeout.ms': 6000,
'enable.auto.commit': False,
'api.version.request': True,
'on_commit': print_commit_result,
'error_cb': error_cb,
'default.topic.config': {
Expand Down Expand Up @@ -243,9 +244,9 @@ def verify_consumer():
break

if False:
print('%s[%d]@%d: key=%s, value=%s' % \
print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' % \
(msg.topic(), msg.partition(), msg.offset(),
msg.key(), msg.value()))
msg.key(), msg.value(), msg.tstype(), repr(msg.timestamp())))

if (msg.offset() % 5) == 0:
# Async commit
Expand Down