diff --git a/eng/tox/install_depend_packages.py b/eng/tox/install_depend_packages.py index 6c785bba33ec..85522869de04 100644 --- a/eng/tox/install_depend_packages.py +++ b/eng/tox/install_depend_packages.py @@ -49,7 +49,7 @@ MAXIMUM_VERSION_GENERIC_OVERRIDES = {} -# SPECIFIC OVERRIDES provide additional filtering of upper and lower bound by +# SPECIFIC OVERRIDES provide additional filtering of upper and lower bound by # binding an override to the specific package being processed. As an example, when # processing the latest or minimum deps for "azure-eventhub", the minimum version of "azure-core" # will be overridden to 1.25.0. @@ -59,6 +59,7 @@ "azure-eventhub-checkpointstoreblob": {"azure-core": "1.25.0", "azure-eventhub": "5.11.0"}, "azure-eventhub-checkpointstoretable": {"azure-core": "1.25.0", "azure-eventhub": "5.11.0"}, "azure-identity": {"msal": "1.23.0"}, + "azure-core-tracing-opentelemetry": {"azure-core": "1.28.0"}, } MAXIMUM_VERSION_SPECIFIC_OVERRIDES = {} @@ -212,7 +213,7 @@ def process_bounded_versions(originating_pkg_name: str, pkg_name: str, versions: v for v in versions if parse_version(v) <= parse_version(restrictions[pkg_name]) ] - # upper bound package-specific + # upper bound package-specific if ( originating_pkg_name in MAXIMUM_VERSION_SPECIFIC_OVERRIDES and pkg_name in MAXIMUM_VERSION_SPECIFIC_OVERRIDES[originating_pkg_name] diff --git a/sdk/core/azure-core-tracing-opentelemetry/dev_requirements.txt b/sdk/core/azure-core-tracing-opentelemetry/dev_requirements.txt index 29d610b6098d..04afdf475ee4 100644 --- a/sdk/core/azure-core-tracing-opentelemetry/dev_requirements.txt +++ b/sdk/core/azure-core-tracing-opentelemetry/dev_requirements.txt @@ -5,3 +5,4 @@ opentelemetry-sdk<2.0.0,>=1.12.0 opentelemetry-instrumentation-requests>=0.32b0 requests azure-storage-blob +-e ../../servicebus/azure-servicebus diff --git a/sdk/core/azure-core-tracing-opentelemetry/test-resources.bicep b/sdk/core/azure-core-tracing-opentelemetry/test-resources.bicep index 2a8f89b9c6dd..3ebea8269269 100644 --- a/sdk/core/azure-core-tracing-opentelemetry/test-resources.bicep +++ b/sdk/core/azure-core-tracing-opentelemetry/test-resources.bicep @@ -1,3 +1,5 @@ +@minLength(6) +@maxLength(21) @description('The base resource name.') param baseName string = resourceGroup().name @@ -7,8 +9,10 @@ param location string = resourceGroup().location @description('The client OID to grant access to test resources.') param testApplicationOid string +var sbVersion = '2017-04-01' + resource storageAccount 'Microsoft.Storage/storageAccounts@2022-09-01' = { - name: '${baseName}storage' + name: '${baseName}sa' location: location kind: 'StorageV2' sku: { @@ -19,10 +23,69 @@ resource storageAccount 'Microsoft.Storage/storageAccounts@2022-09-01' = { } } +resource serviceBusNamespace 'Microsoft.ServiceBus/namespaces@2017-04-01' = { + name: '${baseName}sbnamespace' + location: location + sku: { + name: 'Standard' + } + properties: {} +} + +resource serviceBusQueue 'Microsoft.ServiceBus/namespaces/queues@2017-04-01' = { + parent: serviceBusNamespace + name: '${baseName}sbqueue' + properties: { + lockDuration: 'PT5M' + maxSizeInMegabytes: 4096 + requiresDuplicateDetection: false + requiresSession: false + defaultMessageTimeToLive: 'P10675199DT2H48M5.4775807S' + deadLetteringOnMessageExpiration: false + duplicateDetectionHistoryTimeWindow: 'PT10M' + maxDeliveryCount: 10 + autoDeleteOnIdle: 'P10675199DT2H48M5.4775807S' + enablePartitioning: false + enableExpress: false + } +} + +resource serviceBusTopic 'Microsoft.ServiceBus/namespaces/topics@2017-04-01' = { + parent: serviceBusNamespace + name: '${baseName}sbtopic' + properties: { + autoDeleteOnIdle: 'P10675199DT2H48M5.4775807S' + defaultMessageTimeToLive: 'P10675199DT2H48M5.4775807S' + duplicateDetectionHistoryTimeWindow: 'PT10M' + enableBatchedOperations: true + enableExpress: false + enablePartitioning: false + maxSizeInMegabytes: 4096 + requiresDuplicateDetection: false + status: 'Active' + supportOrdering: true + } +} + +resource serviceBusSubscription 'Microsoft.ServiceBus/namespaces/topics/subscriptions@2017-04-01' = { + parent: serviceBusTopic + name: '${baseName}sbtopic' + properties: {} +} + + var name = storageAccount.name var key = storageAccount.listKeys().keys[0].value -var connectionString = 'DefaultEndpointsProtocol=https;AccountName=${name};AccountKey=${key}' +var storageConnectionString = 'DefaultEndpointsProtocol=https;AccountName=${name};AccountKey=${key}' +var serviceBusConnectionString = listkeys(authRuleResourceId, sbVersion).primaryConnectionString + + +var authRuleResourceId = resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', serviceBusNamespace.name, 'RootManageSharedAccessKey') output AZURE_STORAGE_ACCOUNT_NAME string = name output AZURE_STORAGE_ACCOUNT_KEY string = key -output AZURE_STORAGE_CONNECTION_STRING string = connectionString +output AZURE_STORAGE_CONNECTION_STRING string = storageConnectionString +output AZURE_SERVICEBUS_CONNECTION_STRING string = serviceBusConnectionString +output AZURE_SERVICEBUS_QUEUE_NAME string = serviceBusQueue.name +output AZURE_SERVICEBUS_TOPIC_NAME string = serviceBusTopic.name +output AZURE_SERVICEBUS_SUBSCRIPTION_NAME string = serviceBusSubscription.name diff --git a/sdk/core/azure-core-tracing-opentelemetry/tests/conftest.py b/sdk/core/azure-core-tracing-opentelemetry/tests/conftest.py index d8c7ae758e6e..9a141ee6d633 100644 --- a/sdk/core/azure-core-tracing-opentelemetry/tests/conftest.py +++ b/sdk/core/azure-core-tracing-opentelemetry/tests/conftest.py @@ -34,7 +34,11 @@ def exporter(): @pytest.fixture(scope="session") def config(): return { - "storage_account_name": os.environ["AZURE_STORAGE_ACCOUNT_NAME"], - "storage_account_key": os.environ["AZURE_STORAGE_ACCOUNT_KEY"], - "storage_connection_string": os.environ["AZURE_STORAGE_CONNECTION_STRING"], + "storage_account_name": os.environ.get("AZURE_STORAGE_ACCOUNT_NAME"), + "storage_account_key": os.environ.get("AZURE_STORAGE_ACCOUNT_KEY"), + "storage_connection_string": os.environ.get("AZURE_STORAGE_CONNECTION_STRING"), + "servicebus_connection_string": os.environ.get("AZURE_SERVICEBUS_CONNECTION_STRING"), + "servicebus_queue_name": os.environ.get("AZURE_SERVICEBUS_QUEUE_NAME"), + "servicebus_topic_name": os.environ.get("AZURE_SERVICEBUS_TOPIC_NAME"), + "servicebus_subscription_name": os.environ.get("AZURE_SERVICEBUS_SUBSCRIPTION_NAME"), } diff --git a/sdk/core/azure-core-tracing-opentelemetry/tests/test_servicebus_live.py b/sdk/core/azure-core-tracing-opentelemetry/tests/test_servicebus_live.py new file mode 100644 index 000000000000..47d1771580d5 --- /dev/null +++ b/sdk/core/azure-core-tracing-opentelemetry/tests/test_servicebus_live.py @@ -0,0 +1,210 @@ +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +import pytest + +from azure.servicebus import ServiceBusClient, ServiceBusMessage +from opentelemetry.trace import SpanKind + + +class TestServiceBusTracing: + def _verify_span_attributes(self, *, span): + # Ensure all attributes are set and have a value. + for attr in span.attributes: + assert span.attributes[attr] is not None and span.attributes[attr] != "" + + def _verify_message(self, *, span, dest, server_address): + assert span.name == "ServiceBus.message" + assert span.kind == SpanKind.PRODUCER + self._verify_span_attributes(span=span) + assert span.attributes["az.namespace"] == "Microsoft.ServiceBus" + assert span.attributes["messaging.system"] == "servicebus" + assert span.attributes["messaging.destination.name"] == dest + assert span.attributes["server.address"] == server_address + + def _verify_send(self, *, span, dest, server_address, message_count): + assert span.name == "ServiceBus.send" + assert span.kind == SpanKind.CLIENT + self._verify_span_attributes(span=span) + assert span.attributes["az.namespace"] == "Microsoft.ServiceBus" + assert span.attributes["messaging.system"] == "servicebus" + assert span.attributes["messaging.destination.name"] == dest + assert span.attributes["messaging.operation"] == "publish" + assert span.attributes["server.address"] == server_address + if message_count > 1: + assert span.attributes["messaging.batch.message_count"] == message_count + + def _verify_receive(self, *, span, dest, server_address, message_count): + assert span.name == "ServiceBus.receive" + assert span.kind == SpanKind.CLIENT + self._verify_span_attributes(span=span) + assert span.attributes["az.namespace"] == "Microsoft.ServiceBus" + assert span.attributes["messaging.system"] == "servicebus" + assert span.attributes["messaging.destination.name"] == dest + assert span.attributes["messaging.operation"] == "receive" + assert span.attributes["server.address"] == server_address + for link in span.links: + assert "enqueuedTime" in link.attributes + if message_count > 1: + assert span.attributes["messaging.batch.message_count"] == message_count + + def _verify_complete(self, *, span, dest, server_address): + assert span.name == "ServiceBus.complete" + assert span.kind == SpanKind.CLIENT + self._verify_span_attributes(span=span) + assert span.attributes["az.namespace"] == "Microsoft.ServiceBus" + assert span.attributes["messaging.system"] == "servicebus" + assert span.attributes["messaging.operation"] == "settle" + assert span.attributes["server.address"] == server_address + assert span.attributes["messaging.destination.name"] == dest + + @pytest.mark.live_test_only + def test_servicebus_client_tracing_queue(self, config, exporter, tracer): + connection_string = config["servicebus_connection_string"] + queue_name = config["servicebus_queue_name"] + client = ServiceBusClient.from_connection_string(connection_string) + + with tracer.start_as_current_span(name="root"): + with client.get_queue_sender(queue_name) as sender: + + # Sending a single message + sender.send_messages(ServiceBusMessage("Test foo message")) + + # Sending a batch of messages + message_batch = sender.create_message_batch() + message_batch.add_message(ServiceBusMessage("First batch foo message")) + message_batch.add_message(ServiceBusMessage("Second batch foo message")) + sender.send_messages(message_batch) + + send_spans = exporter.get_finished_spans() + server_address = sender.fully_qualified_namespace + + # We expect 5 spans to have finished: 2 send spans, and 3 message spans. + assert len(send_spans) == 5 + + # Verify the spans from the first send. + self._verify_message(span=send_spans[0], dest=queue_name, server_address=server_address) + self._verify_send(span=send_spans[1], dest=queue_name, server_address=server_address, message_count=1) + + # Verify span links from single send. + link = send_spans[1].links[0] + assert link.context.span_id == send_spans[0].context.span_id + assert link.context.trace_id == send_spans[0].context.trace_id + + # Verify the spans from the second send. + self._verify_message(span=send_spans[2], dest=queue_name, server_address=server_address) + self._verify_message(span=send_spans[3], dest=queue_name, server_address=server_address) + self._verify_send(span=send_spans[4], dest=queue_name, server_address=server_address, message_count=2) + + # Verify span links from batch send. + assert len(send_spans[4].links) == 2 + link = send_spans[4].links[0] + assert link.context.span_id == send_spans[2].context.span_id + assert link.context.trace_id == send_spans[2].context.trace_id + + link = send_spans[4].links[1] + assert link.context.span_id == send_spans[3].context.span_id + assert link.context.trace_id == send_spans[3].context.trace_id + + exporter.clear() + + # Receive all the sent spans. + receiver = client.get_queue_receiver(queue_name=queue_name) + with receiver: + received_msgs = receiver.receive_messages(max_message_count=3, max_wait_time=10) + for msg in received_msgs: + assert "foo" in str(msg) + receiver.complete_message(msg) + + receive_spans = exporter.get_finished_spans() + + # We expect 4 spans to have finished: 1 receive span, and 3 settlement spans. + assert len(receive_spans) == 4 + self._verify_receive(span=receive_spans[0], dest=queue_name, server_address=server_address, message_count=3) + + # Verify span links from receive. + assert len(receive_spans[0].links) == 3 + assert receive_spans[0].links[0].context.span_id == send_spans[0].context.span_id + assert receive_spans[0].links[1].context.span_id == send_spans[2].context.span_id + assert receive_spans[0].links[2].context.span_id == send_spans[3].context.span_id + + # Verify settlement spans. + self._verify_complete(span=receive_spans[1], dest=queue_name, server_address=server_address) + self._verify_complete(span=receive_spans[2], dest=queue_name, server_address=server_address) + self._verify_complete(span=receive_spans[3], dest=queue_name, server_address=server_address) + + @pytest.mark.live_test_only + def test_servicebus_client_tracing_topic(self, config, exporter, tracer): + connection_string = config["servicebus_connection_string"] + topic_name = config["servicebus_topic_name"] + subscription_name = config["servicebus_subscription_name"] + client = ServiceBusClient.from_connection_string(connection_string) + + with tracer.start_as_current_span(name="root"): + with client.get_topic_sender(topic_name) as sender: + + # Sending a single message + sender.send_messages(ServiceBusMessage("Test foo message")) + + # Sending a batch of messages + message_batch = sender.create_message_batch() + message_batch.add_message(ServiceBusMessage("First batch foo message")) + message_batch.add_message(ServiceBusMessage("Second batch foo message")) + sender.send_messages(message_batch) + + send_spans = exporter.get_finished_spans() + server_address = sender.fully_qualified_namespace + + # We expect 5 spans to have finished: 2 send spans, and 3 message spans. + assert len(send_spans) == 5 + + # Verify the spans from the first send. + self._verify_message(span=send_spans[0], dest=topic_name, server_address=server_address) + self._verify_send(span=send_spans[1], dest=topic_name, server_address=server_address, message_count=1) + + # Verify span links from single send. + link = send_spans[1].links[0] + assert link.context.span_id == send_spans[0].context.span_id + assert link.context.trace_id == send_spans[0].context.trace_id + + # Verify the spans from the second send. + self._verify_message(span=send_spans[2], dest=topic_name, server_address=server_address) + self._verify_message(span=send_spans[3], dest=topic_name, server_address=server_address) + self._verify_send(span=send_spans[4], dest=topic_name, server_address=server_address, message_count=2) + + # Verify span links from batch send. + assert len(send_spans[4].links) == 2 + link = send_spans[4].links[0] + assert link.context.span_id == send_spans[2].context.span_id + assert link.context.trace_id == send_spans[2].context.trace_id + + link = send_spans[4].links[1] + assert link.context.span_id == send_spans[3].context.span_id + assert link.context.trace_id == send_spans[3].context.trace_id + + exporter.clear() + + # Receive all the sent spans. + receiver = client.get_subscription_receiver(topic_name, subscription_name) + with receiver: + received_msgs = receiver.receive_messages(max_message_count=3, max_wait_time=10) + for msg in received_msgs: + assert "foo" in str(msg) + receiver.complete_message(msg) + + receive_spans = exporter.get_finished_spans() + + # We expect 4 spans to have finished: 1 receive span, and 3 settlement spans. + assert len(receive_spans) == 4 + self._verify_receive(span=receive_spans[0], dest=topic_name, server_address=server_address, message_count=3) + + assert len(receive_spans[0].links) == 3 + assert receive_spans[0].links[0].context.span_id == send_spans[0].context.span_id + assert receive_spans[0].links[1].context.span_id == send_spans[2].context.span_id + assert receive_spans[0].links[2].context.span_id == send_spans[3].context.span_id + + # Verify settlement spans. + self._verify_complete(span=receive_spans[1], dest=topic_name, server_address=server_address) + self._verify_complete(span=receive_spans[2], dest=topic_name, server_address=server_address) + self._verify_complete(span=receive_spans[3], dest=topic_name, server_address=server_address) diff --git a/sdk/core/azure-core-tracing-opentelemetry/tests/test_storage_live.py b/sdk/core/azure-core-tracing-opentelemetry/tests/test_storage_live.py index ea5deb7293fa..0e52d7a75b69 100644 --- a/sdk/core/azure-core-tracing-opentelemetry/tests/test_storage_live.py +++ b/sdk/core/azure-core-tracing-opentelemetry/tests/test_storage_live.py @@ -27,8 +27,18 @@ def test_blob_service_client_tracing(self, config, exporter, tracer): http_span: ReadableSpan = spans[0] assert http_span.kind == SpanKind.CLIENT + assert http_span.parent assert http_span.parent.span_id == spans[1].context.span_id + assert http_span.attributes + assert http_span.attributes["http.request.method"] == "GET" + assert http_span.attributes["url.full"] + assert http_span.attributes["server.address"] + assert http_span.attributes["http.response.status_code"] == 200 + assert http_span.attributes["az.client_request_id"] + assert http_span.attributes["az.service_request_id"] + method_span: ReadableSpan = spans[1] assert method_span.kind == SpanKind.INTERNAL + assert method_span.parent assert method_span.parent.span_id == spans[2].context.span_id