diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py index 78bf705add9f..74491e4dff99 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py @@ -29,6 +29,19 @@ PROP_PARTITION_KEY, PROP_PARTITION_KEY_AMQP_SYMBOL, PROP_TIMESTAMP, + PROP_ABSOLUTE_EXPIRY_TIME, + PROP_CONTENT_ENCODING, + PROP_CONTENT_TYPE, + PROP_CORRELATION_ID, + PROP_GROUP_ID, + PROP_GROUP_SEQUENCE, + PROP_MESSAGE_ID, + PROP_REPLY_TO, + PROP_REPLY_TO_GROUP_ID, + PROP_SUBJECT, + PROP_TO, + PROP_USER_ID, + PROP_CREATION_TIME, ) if TYPE_CHECKING: @@ -39,6 +52,22 @@ # event_data.encoded_size < 255, batch encode overhead is 5, >=256, overhead is 8 each _BATCH_MESSAGE_OVERHEAD_COST = [5, 8] +_SYS_PROP_KEYS_TO_MSG_PROPERTIES = ( + (PROP_MESSAGE_ID, "message_id"), + (PROP_USER_ID, "user_id"), + (PROP_TO, "to"), + (PROP_SUBJECT, "subject"), + (PROP_REPLY_TO, "reply_to"), + (PROP_CORRELATION_ID, "correlation_id"), + (PROP_CONTENT_TYPE, "content_type"), + (PROP_CONTENT_ENCODING, "content_encoding"), + (PROP_ABSOLUTE_EXPIRY_TIME, "absolute_expiry_time"), + (PROP_CREATION_TIME, "creation_time"), + (PROP_GROUP_ID, "group_id"), + (PROP_GROUP_SEQUENCE, "group_sequence"), + (PROP_REPLY_TO_GROUP_ID, "reply_to_group_id"), +) + class EventData(object): """The EventData class is a container for event content. @@ -60,6 +89,7 @@ class EventData(object): def __init__(self, body=None): # type: (Union[str, bytes, List[AnyStr]]) -> None self._last_enqueued_event_properties = {} # type: Dict[str, Any] + self._sys_properties = None # type: Optional[Dict[bytes, Any]] if body and isinstance(body, list): self.message = Message(body[0]) for more in body[1:]: @@ -207,12 +237,42 @@ def properties(self, value): @property def system_properties(self): - # type: () -> Dict[Union[str, bytes], Any] - """Metadata set by the Event Hubs Service associated with the event + # type: () -> Dict[bytes, Any] + """Metadata set by the Event Hubs Service associated with the event. + + An EventData could have some or all of the following meta data depending on the source + of the event data. + + - b"x-opt-sequence-number" (int) + - b"x-opt-offset" (bytes) + - b"x-opt-partition-key" (bytes) + - b"x-opt-enqueued-time" (int) + - b"message-id" (bytes) + - b"user-id" (bytes) + - b"to" (bytes) + - b"subject" (bytes) + - b"reply-to" (bytes) + - b"correlation-id" (bytes) + - b"content-type" (bytes) + - b"content-encoding" (bytes) + - b"absolute-expiry-time" (int) + - b"creation-time" (int) + - b"group-id" (bytes) + - b"group-sequence" (bytes) + - b"reply-to-group-id" (bytes) :rtype: dict """ - return self.message.annotations + + if self._sys_properties is None: + self._sys_properties = {} + if self.message.properties: + for key, prop_name in _SYS_PROP_KEYS_TO_MSG_PROPERTIES: + value = getattr(self.message.properties, prop_name, None) + if value: + self._sys_properties[key] = value + self._sys_properties.update(self.message.annotations) + return self._sys_properties @property def body(self): diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_constants.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_constants.py index b6466b63ef0b..97a614c341b5 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_constants.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_constants.py @@ -17,6 +17,20 @@ PROP_LAST_ENQUEUED_TIME_UTC = b"last_enqueued_time_utc" PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC = b"runtime_info_retrieval_time_utc" +PROP_MESSAGE_ID = b"message-id" +PROP_USER_ID = b"user-id" +PROP_TO = b"to" +PROP_SUBJECT = b"subject" +PROP_REPLY_TO = b"reply-to" +PROP_CORRELATION_ID = b"correlation-id" +PROP_CONTENT_TYPE = b"content-type" +PROP_CONTENT_ENCODING = b"content-encoding" +PROP_ABSOLUTE_EXPIRY_TIME = b"absolute-expiry-time" +PROP_CREATION_TIME = b"creation-time" +PROP_GROUP_ID = b"group-id" +PROP_GROUP_SEQUENCE = b"group-sequence" +PROP_REPLY_TO_GROUP_ID = b"reply-to-group-id" + EPOCH_SYMBOL = b"com.microsoft:epoch" TIMEOUT_SYMBOL = b"com.microsoft:timeout" RECEIVER_RUNTIME_METRIC_SYMBOL = b"com.microsoft:enable-receiver-runtime-metric" diff --git a/sdk/eventhub/azure-eventhub/tests/unittest/test_event_data.py b/sdk/eventhub/azure-eventhub/tests/unittest/test_event_data.py index a797a281b857..50502563681c 100644 --- a/sdk/eventhub/azure-eventhub/tests/unittest/test_event_data.py +++ b/sdk/eventhub/azure-eventhub/tests/unittest/test_event_data.py @@ -1,6 +1,7 @@ import platform import pytest - +import uamqp +from azure.eventhub import _common pytestmark = pytest.mark.skipif(platform.python_implementation() == "PyPy", reason="This is ignored for PyPy") @@ -52,6 +53,41 @@ def test_app_properties(): assert event_data.properties["a"] == "b" +def test_sys_properties(): + properties = uamqp.message.MessageProperties() + properties.message_id = "message_id" + properties.user_id = "user_id" + properties.to = "to" + properties.subject = "subject" + properties.reply_to = "reply_to" + properties.correlation_id = "correlation_id" + properties.content_type = "content_type" + properties.content_encoding = "content_encoding" + properties.absolute_expiry_time = 1 + properties.creation_time = 1 + properties.group_id = "group_id" + properties.group_sequence = 1 + properties.reply_to_group_id = "reply_to_group_id" + message = uamqp.Message(properties=properties) + message.annotations = {_common.PROP_OFFSET: "@latest"} + ed = EventData._from_message(message) # type: EventData + + assert ed.system_properties[_common.PROP_OFFSET] == "@latest" + assert ed.system_properties[_common.PROP_CORRELATION_ID] == properties.correlation_id + assert ed.system_properties[_common.PROP_MESSAGE_ID] == properties.message_id + assert ed.system_properties[_common.PROP_CONTENT_ENCODING] == properties.content_encoding + assert ed.system_properties[_common.PROP_CONTENT_TYPE] == properties.content_type + assert ed.system_properties[_common.PROP_USER_ID] == properties.user_id + assert ed.system_properties[_common.PROP_TO] == properties.to + assert ed.system_properties[_common.PROP_SUBJECT] == properties.subject + assert ed.system_properties[_common.PROP_REPLY_TO] == properties.reply_to + assert ed.system_properties[_common.PROP_ABSOLUTE_EXPIRY_TIME] == properties.absolute_expiry_time + assert ed.system_properties[_common.PROP_CREATION_TIME] == properties.creation_time + assert ed.system_properties[_common.PROP_GROUP_ID] == properties.group_id + assert ed.system_properties[_common.PROP_GROUP_SEQUENCE] == properties.group_sequence + assert ed.system_properties[_common.PROP_REPLY_TO_GROUP_ID] == properties.reply_to_group_id + + def test_event_data_batch(): batch = EventDataBatch(max_size_in_bytes=100, partition_key="par") batch.add(EventData("A"))