diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c index 3539f16f7..441425ddd 100644 --- a/confluent_kafka/src/confluent_kafka.c +++ b/confluent_kafka/src/confluent_kafka.c @@ -326,6 +326,13 @@ static PyObject *Message_offset (Message *self, PyObject *ignore) { } +static PyObject *Message_timestamp (Message *self, PyObject *ignore) { + return Py_BuildValue("iL", + self->tstype, + self->timestamp); +} + + static PyMethodDef Message_methods[] = { { "error", (PyCFunction)Message_error, METH_NOARGS, " The message object is also used to propagate errors and events, " @@ -362,6 +369,11 @@ static PyMethodDef Message_methods[] = { " :rtype: int or None\n" "\n" }, + { "timestamp", (PyCFunction)Message_timestamp, METH_NOARGS, + " :returns: tuple of message tstype, and timestamp.\n" + " :rtype: (int, int)\n" + "\n" + }, { NULL } }; @@ -495,6 +507,8 @@ PyObject *Message_new0 (const rd_kafka_message_t *rkm) { self->partition = rkm->partition; self->offset = rkm->offset; + self->timestamp = rd_kafka_message_timestamp(rkm, &self->tstype); + return (PyObject *)self; } @@ -1419,6 +1433,10 @@ static PyObject *_init_cimpl (void) { Py_INCREF(KafkaException); PyModule_AddObject(m, "KafkaException", KafkaException); + PyModule_AddIntConstant(m, "TIMESTAMP_NOT_AVAILABLE", RD_KAFKA_TIMESTAMP_NOT_AVAILABLE); + PyModule_AddIntConstant(m, "TIMESTAMP_CREATE_TIME", RD_KAFKA_TIMESTAMP_CREATE_TIME); + PyModule_AddIntConstant(m, "TIMESTAMP_LOG_APPEND_TIME", RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME); + return m; } diff --git a/confluent_kafka/src/confluent_kafka.h b/confluent_kafka/src/confluent_kafka.h index 5e8eb7989..ceeef848a 100644 --- a/confluent_kafka/src/confluent_kafka.h +++ b/confluent_kafka/src/confluent_kafka.h @@ -219,6 +219,8 @@ typedef struct { PyObject *error; int32_t partition; int64_t offset; + int64_t timestamp; + rd_kafka_timestamp_type_t tstype; } Message; extern PyTypeObject MessageType; diff --git a/examples/integration_test.py b/examples/integration_test.py index f1f9a6009..d92597b86 100755 --- a/examples/integration_test.py +++ b/examples/integration_test.py @@ -210,6 +210,7 @@ def verify_consumer(): 'group.id': 'test.py', 'session.timeout.ms': 6000, 'enable.auto.commit': False, + 'api.version.request': True, 'on_commit': print_commit_result, 'error_cb': error_cb, 'default.topic.config': { @@ -243,9 +244,10 @@ def verify_consumer(): break if False: - print('%s[%d]@%d: key=%s, value=%s' % \ + tstype, timestamp = msg.timestamp() + print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' % \ (msg.topic(), msg.partition(), msg.offset(), - msg.key(), msg.value())) + msg.key(), msg.value(), tstype, timestamp)) if (msg.offset() % 5) == 0: # Async commit diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index 8d538123e..78d630877 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -from confluent_kafka import Consumer, TopicPartition, KafkaError, KafkaException +from confluent_kafka import Consumer, TopicPartition, KafkaError, KafkaException, TIMESTAMP_NOT_AVAILABLE def test_basic_api(): @@ -36,6 +36,9 @@ def dummy_assign_revoke (consumer, partitions): else: print('OK: consumed message') + if msg is not None: + assert msg.timestamp() == (TIMESTAMP_NOT_AVAILABLE, -1) + partitions = list(map(lambda p: TopicPartition("test", p), range(0,100,3))) kc.assign(partitions) diff --git a/tests/test_enums.py b/tests/test_enums.py index 64f46221e..f0bd29ec0 100644 --- a/tests/test_enums.py +++ b/tests/test_enums.py @@ -7,3 +7,9 @@ def test_enums(): KafkaError class without an instantiated object. """ print(confluent_kafka.KafkaError._NO_OFFSET) print(confluent_kafka.KafkaError.REBALANCE_IN_PROGRESS) + +def test_tstype_enums(): + """ Make sure librdkafka tstype enums are available. """ + assert confluent_kafka.TIMESTAMP_NOT_AVAILABLE == 0 + assert confluent_kafka.TIMESTAMP_CREATE_TIME == 1 + assert confluent_kafka.TIMESTAMP_LOG_APPEND_TIME == 2