Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Include constants, add unit test, use tuple
  • Loading branch information
qix committed Nov 8, 2016
commit a8d2701bbc2ee592c9830697c5089977e4a3736c
25 changes: 9 additions & 16 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -327,15 +327,9 @@ static PyObject *Message_offset (Message *self, PyObject *ignore) {


static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to have a single timestamp() method that returns a tuple: tstype, timestamp

if (self->tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
return PyLong_FromLong(self->timestamp);
else
Py_RETURN_NONE;
}


static PyObject *Message_tstype (Message *self, PyObject *ignore) {
return PyLong_FromLong(self->tstype);
return Py_BuildValue("iL",
self->tstype,
self->timestamp);
}


Expand Down Expand Up @@ -376,13 +370,8 @@ static PyMethodDef Message_methods[] = {
"\n"
},
{ "timestamp", (PyCFunction)Message_timestamp, METH_NOARGS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to return the timestamp type as well (none, append, create).
Im not sure whether it should be some class level constants or strings, probably the former. @ewencp?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the timestamp type as a class level constant, e.g.:
TIMESTAMP_NOT_AVAILABLE, TIMESTAMP_CREATE_TIME, TIMESTAMP_APPEND_TIME
Map it directly to the librdkafka counterparts.

" :returns: message timestamp or None if not available.\n"
" :rtype: int or None\n"
"\n"
},
{ "tstype", (PyCFunction)Message_tstype, METH_NOARGS,
" :returns: message timestamp type.\n"
" :rtype: int\n"
" :returns: tuple of message tstype, and timestamp.\n"
" :rtype: (int, int)\n"
"\n"
},
{ NULL }
Expand Down Expand Up @@ -1444,6 +1433,10 @@ static PyObject *_init_cimpl (void) {
Py_INCREF(KafkaException);
PyModule_AddObject(m, "KafkaException", KafkaException);

PyModule_AddIntConstant(m, "TIMESTAMP_NOT_AVAILABLE", RD_KAFKA_TIMESTAMP_NOT_AVAILABLE);
PyModule_AddIntConstant(m, "TIMESTAMP_CREATE_TIME", RD_KAFKA_TIMESTAMP_CREATE_TIME);
PyModule_AddIntConstant(m, "TIMESTAMP_LOG_APPEND_TIME", RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME);

return m;
}

Expand Down
3 changes: 2 additions & 1 deletion examples/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,10 @@ def verify_consumer():
break

if False:
tstype, timestamp = msg.timestamp()
print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' % \
(msg.topic(), msg.partition(), msg.offset(),
msg.key(), msg.value(), msg.tstype(), repr(msg.timestamp())))
msg.key(), msg.value(), tstype, timestamp))

if (msg.offset() % 5) == 0:
# Async commit
Expand Down
5 changes: 4 additions & 1 deletion tests/test_Consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

from confluent_kafka import Consumer, TopicPartition, KafkaError, KafkaException
from confluent_kafka import Consumer, TopicPartition, KafkaError, KafkaException, TIMESTAMP_NOT_AVAILABLE


def test_basic_api():
Expand Down Expand Up @@ -36,6 +36,9 @@ def dummy_assign_revoke (consumer, partitions):
else:
print('OK: consumed message')

if msg is not None:
assert msg.timestamp() == (TIMESTAMP_NOT_AVAILABLE, -1)

partitions = list(map(lambda p: TopicPartition("test", p), range(0,100,3)))
kc.assign(partitions)

Expand Down
6 changes: 6 additions & 0 deletions tests/test_enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ def test_enums():
KafkaError class without an instantiated object. """
print(confluent_kafka.KafkaError._NO_OFFSET)
print(confluent_kafka.KafkaError.REBALANCE_IN_PROGRESS)

def test_tstype_enums():
""" Make sure librdkafka tstype enums are available. """
assert confluent_kafka.TIMESTAMP_NOT_AVAILABLE == 0
assert confluent_kafka.TIMESTAMP_CREATE_TIME == 1
assert confluent_kafka.TIMESTAMP_LOG_APPEND_TIME == 2