Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions eng/tox/install_depend_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

MAXIMUM_VERSION_GENERIC_OVERRIDES = {}

# SPECIFIC OVERRIDES provide additional filtering of upper and lower bound by
# SPECIFIC OVERRIDES provide additional filtering of upper and lower bound by
# binding an override to the specific package being processed. As an example, when
# processing the latest or minimum deps for "azure-eventhub", the minimum version of "azure-core"
# will be overridden to 1.25.0.
Expand All @@ -59,6 +59,7 @@
"azure-eventhub-checkpointstoreblob": {"azure-core": "1.25.0", "azure-eventhub": "5.11.0"},
"azure-eventhub-checkpointstoretable": {"azure-core": "1.25.0", "azure-eventhub": "5.11.0"},
"azure-identity": {"msal": "1.23.0"},
"azure-core-tracing-opentelemetry": {"azure-core": "1.28.0"},
}

MAXIMUM_VERSION_SPECIFIC_OVERRIDES = {}
Expand Down Expand Up @@ -212,7 +213,7 @@ def process_bounded_versions(originating_pkg_name: str, pkg_name: str, versions:
v for v in versions if parse_version(v) <= parse_version(restrictions[pkg_name])
]

# upper bound package-specific
# upper bound package-specific
if (
originating_pkg_name in MAXIMUM_VERSION_SPECIFIC_OVERRIDES
and pkg_name in MAXIMUM_VERSION_SPECIFIC_OVERRIDES[originating_pkg_name]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ opentelemetry-sdk<2.0.0,>=1.12.0
opentelemetry-instrumentation-requests>=0.32b0
requests
azure-storage-blob
-e ../../servicebus/azure-servicebus
69 changes: 66 additions & 3 deletions sdk/core/azure-core-tracing-opentelemetry/test-resources.bicep
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
@minLength(6)
@maxLength(21)
@description('The base resource name.')
param baseName string = resourceGroup().name

Expand All @@ -7,8 +9,10 @@ param location string = resourceGroup().location
@description('The client OID to grant access to test resources.')
param testApplicationOid string

var sbVersion = '2017-04-01'

resource storageAccount 'Microsoft.Storage/storageAccounts@2022-09-01' = {
name: '${baseName}storage'
name: '${baseName}sa'
location: location
kind: 'StorageV2'
sku: {
Expand All @@ -19,10 +23,69 @@ resource storageAccount 'Microsoft.Storage/storageAccounts@2022-09-01' = {
}
}

resource serviceBusNamespace 'Microsoft.ServiceBus/namespaces@2017-04-01' = {
name: '${baseName}sbnamespace'
location: location
sku: {
name: 'Standard'
}
properties: {}
}

resource serviceBusQueue 'Microsoft.ServiceBus/namespaces/queues@2017-04-01' = {
parent: serviceBusNamespace
name: '${baseName}sbqueue'
properties: {
lockDuration: 'PT5M'
maxSizeInMegabytes: 4096
requiresDuplicateDetection: false
requiresSession: false
defaultMessageTimeToLive: 'P10675199DT2H48M5.4775807S'
deadLetteringOnMessageExpiration: false
duplicateDetectionHistoryTimeWindow: 'PT10M'
maxDeliveryCount: 10
autoDeleteOnIdle: 'P10675199DT2H48M5.4775807S'
enablePartitioning: false
enableExpress: false
}
}

resource serviceBusTopic 'Microsoft.ServiceBus/namespaces/topics@2017-04-01' = {
parent: serviceBusNamespace
name: '${baseName}sbtopic'
properties: {
autoDeleteOnIdle: 'P10675199DT2H48M5.4775807S'
defaultMessageTimeToLive: 'P10675199DT2H48M5.4775807S'
duplicateDetectionHistoryTimeWindow: 'PT10M'
enableBatchedOperations: true
enableExpress: false
enablePartitioning: false
maxSizeInMegabytes: 4096
requiresDuplicateDetection: false
status: 'Active'
supportOrdering: true
}
}

resource serviceBusSubscription 'Microsoft.ServiceBus/namespaces/topics/subscriptions@2017-04-01' = {
parent: serviceBusTopic
name: '${baseName}sbtopic'
properties: {}
}


var name = storageAccount.name
var key = storageAccount.listKeys().keys[0].value
var connectionString = 'DefaultEndpointsProtocol=https;AccountName=${name};AccountKey=${key}'
var storageConnectionString = 'DefaultEndpointsProtocol=https;AccountName=${name};AccountKey=${key}'
var serviceBusConnectionString = listkeys(authRuleResourceId, sbVersion).primaryConnectionString


var authRuleResourceId = resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', serviceBusNamespace.name, 'RootManageSharedAccessKey')

output AZURE_STORAGE_ACCOUNT_NAME string = name
output AZURE_STORAGE_ACCOUNT_KEY string = key
output AZURE_STORAGE_CONNECTION_STRING string = connectionString
output AZURE_STORAGE_CONNECTION_STRING string = storageConnectionString
output AZURE_SERVICEBUS_CONNECTION_STRING string = serviceBusConnectionString
output AZURE_SERVICEBUS_QUEUE_NAME string = serviceBusQueue.name
output AZURE_SERVICEBUS_TOPIC_NAME string = serviceBusTopic.name
output AZURE_SERVICEBUS_SUBSCRIPTION_NAME string = serviceBusSubscription.name
10 changes: 7 additions & 3 deletions sdk/core/azure-core-tracing-opentelemetry/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ def exporter():
@pytest.fixture(scope="session")
def config():
return {
"storage_account_name": os.environ["AZURE_STORAGE_ACCOUNT_NAME"],
"storage_account_key": os.environ["AZURE_STORAGE_ACCOUNT_KEY"],
"storage_connection_string": os.environ["AZURE_STORAGE_CONNECTION_STRING"],
"storage_account_name": os.environ.get("AZURE_STORAGE_ACCOUNT_NAME"),
"storage_account_key": os.environ.get("AZURE_STORAGE_ACCOUNT_KEY"),
"storage_connection_string": os.environ.get("AZURE_STORAGE_CONNECTION_STRING"),
"servicebus_connection_string": os.environ.get("AZURE_SERVICEBUS_CONNECTION_STRING"),
"servicebus_queue_name": os.environ.get("AZURE_SERVICEBUS_QUEUE_NAME"),
"servicebus_topic_name": os.environ.get("AZURE_SERVICEBUS_TOPIC_NAME"),
"servicebus_subscription_name": os.environ.get("AZURE_SERVICEBUS_SUBSCRIPTION_NAME"),
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import pytest

from azure.servicebus import ServiceBusClient, ServiceBusMessage
from opentelemetry.trace import SpanKind


class TestServiceBusTracing:
def _verify_span_attributes(self, *, span):
# Ensure all attributes are set and have a value.
for attr in span.attributes:
assert span.attributes[attr] is not None and span.attributes[attr] != ""

def _verify_message(self, *, span, dest, server_address):
assert span.name == "ServiceBus.message"
assert span.kind == SpanKind.PRODUCER
self._verify_span_attributes(span=span)
assert span.attributes["az.namespace"] == "Microsoft.ServiceBus"
assert span.attributes["messaging.system"] == "servicebus"
assert span.attributes["messaging.destination.name"] == dest
assert span.attributes["server.address"] == server_address

def _verify_send(self, *, span, dest, server_address, message_count):
assert span.name == "ServiceBus.send"
assert span.kind == SpanKind.CLIENT
self._verify_span_attributes(span=span)
assert span.attributes["az.namespace"] == "Microsoft.ServiceBus"
assert span.attributes["messaging.system"] == "servicebus"
assert span.attributes["messaging.destination.name"] == dest
assert span.attributes["messaging.operation"] == "publish"
assert span.attributes["server.address"] == server_address
if message_count > 1:
assert span.attributes["messaging.batch.message_count"] == message_count

def _verify_receive(self, *, span, dest, server_address, message_count):
assert span.name == "ServiceBus.receive"
assert span.kind == SpanKind.CLIENT
self._verify_span_attributes(span=span)
assert span.attributes["az.namespace"] == "Microsoft.ServiceBus"
assert span.attributes["messaging.system"] == "servicebus"
assert span.attributes["messaging.destination.name"] == dest
assert span.attributes["messaging.operation"] == "receive"
assert span.attributes["server.address"] == server_address
for link in span.links:
assert "enqueuedTime" in link.attributes
if message_count > 1:
assert span.attributes["messaging.batch.message_count"] == message_count

def _verify_complete(self, *, span, dest, server_address):
assert span.name == "ServiceBus.complete"
assert span.kind == SpanKind.CLIENT
self._verify_span_attributes(span=span)
assert span.attributes["az.namespace"] == "Microsoft.ServiceBus"
assert span.attributes["messaging.system"] == "servicebus"
assert span.attributes["messaging.operation"] == "settle"
assert span.attributes["server.address"] == server_address
assert span.attributes["messaging.destination.name"] == dest

@pytest.mark.live_test_only
def test_servicebus_client_tracing_queue(self, config, exporter, tracer):
connection_string = config["servicebus_connection_string"]
queue_name = config["servicebus_queue_name"]
client = ServiceBusClient.from_connection_string(connection_string)

with tracer.start_as_current_span(name="root"):
with client.get_queue_sender(queue_name) as sender:

# Sending a single message
sender.send_messages(ServiceBusMessage("Test foo message"))

# Sending a batch of messages
message_batch = sender.create_message_batch()
message_batch.add_message(ServiceBusMessage("First batch foo message"))
message_batch.add_message(ServiceBusMessage("Second batch foo message"))
sender.send_messages(message_batch)

send_spans = exporter.get_finished_spans()
server_address = sender.fully_qualified_namespace

# We expect 5 spans to have finished: 2 send spans, and 3 message spans.
assert len(send_spans) == 5

# Verify the spans from the first send.
self._verify_message(span=send_spans[0], dest=queue_name, server_address=server_address)
self._verify_send(span=send_spans[1], dest=queue_name, server_address=server_address, message_count=1)

# Verify span links from single send.
link = send_spans[1].links[0]
assert link.context.span_id == send_spans[0].context.span_id
assert link.context.trace_id == send_spans[0].context.trace_id

# Verify the spans from the second send.
self._verify_message(span=send_spans[2], dest=queue_name, server_address=server_address)
self._verify_message(span=send_spans[3], dest=queue_name, server_address=server_address)
self._verify_send(span=send_spans[4], dest=queue_name, server_address=server_address, message_count=2)

# Verify span links from batch send.
assert len(send_spans[4].links) == 2
link = send_spans[4].links[0]
assert link.context.span_id == send_spans[2].context.span_id
assert link.context.trace_id == send_spans[2].context.trace_id

link = send_spans[4].links[1]
assert link.context.span_id == send_spans[3].context.span_id
assert link.context.trace_id == send_spans[3].context.trace_id

exporter.clear()

# Receive all the sent spans.
receiver = client.get_queue_receiver(queue_name=queue_name)
with receiver:
received_msgs = receiver.receive_messages(max_message_count=3, max_wait_time=10)
for msg in received_msgs:
assert "foo" in str(msg)
receiver.complete_message(msg)

receive_spans = exporter.get_finished_spans()

# We expect 4 spans to have finished: 1 receive span, and 3 settlement spans.
assert len(receive_spans) == 4
self._verify_receive(span=receive_spans[0], dest=queue_name, server_address=server_address, message_count=3)

# Verify span links from receive.
assert len(receive_spans[0].links) == 3
assert receive_spans[0].links[0].context.span_id == send_spans[0].context.span_id
assert receive_spans[0].links[1].context.span_id == send_spans[2].context.span_id
assert receive_spans[0].links[2].context.span_id == send_spans[3].context.span_id

# Verify settlement spans.
self._verify_complete(span=receive_spans[1], dest=queue_name, server_address=server_address)
self._verify_complete(span=receive_spans[2], dest=queue_name, server_address=server_address)
self._verify_complete(span=receive_spans[3], dest=queue_name, server_address=server_address)

@pytest.mark.live_test_only
def test_servicebus_client_tracing_topic(self, config, exporter, tracer):
connection_string = config["servicebus_connection_string"]
topic_name = config["servicebus_topic_name"]
subscription_name = config["servicebus_subscription_name"]
client = ServiceBusClient.from_connection_string(connection_string)

with tracer.start_as_current_span(name="root"):
with client.get_topic_sender(topic_name) as sender:

# Sending a single message
sender.send_messages(ServiceBusMessage("Test foo message"))

# Sending a batch of messages
message_batch = sender.create_message_batch()
message_batch.add_message(ServiceBusMessage("First batch foo message"))
message_batch.add_message(ServiceBusMessage("Second batch foo message"))
sender.send_messages(message_batch)

send_spans = exporter.get_finished_spans()
server_address = sender.fully_qualified_namespace

# We expect 5 spans to have finished: 2 send spans, and 3 message spans.
assert len(send_spans) == 5

# Verify the spans from the first send.
self._verify_message(span=send_spans[0], dest=topic_name, server_address=server_address)
self._verify_send(span=send_spans[1], dest=topic_name, server_address=server_address, message_count=1)

# Verify span links from single send.
link = send_spans[1].links[0]
assert link.context.span_id == send_spans[0].context.span_id
assert link.context.trace_id == send_spans[0].context.trace_id

# Verify the spans from the second send.
self._verify_message(span=send_spans[2], dest=topic_name, server_address=server_address)
self._verify_message(span=send_spans[3], dest=topic_name, server_address=server_address)
self._verify_send(span=send_spans[4], dest=topic_name, server_address=server_address, message_count=2)

# Verify span links from batch send.
assert len(send_spans[4].links) == 2
link = send_spans[4].links[0]
assert link.context.span_id == send_spans[2].context.span_id
assert link.context.trace_id == send_spans[2].context.trace_id

link = send_spans[4].links[1]
assert link.context.span_id == send_spans[3].context.span_id
assert link.context.trace_id == send_spans[3].context.trace_id

exporter.clear()

# Receive all the sent spans.
receiver = client.get_subscription_receiver(topic_name, subscription_name)
with receiver:
received_msgs = receiver.receive_messages(max_message_count=3, max_wait_time=10)
for msg in received_msgs:
assert "foo" in str(msg)
receiver.complete_message(msg)

receive_spans = exporter.get_finished_spans()

# We expect 4 spans to have finished: 1 receive span, and 3 settlement spans.
assert len(receive_spans) == 4
self._verify_receive(span=receive_spans[0], dest=topic_name, server_address=server_address, message_count=3)

assert len(receive_spans[0].links) == 3
assert receive_spans[0].links[0].context.span_id == send_spans[0].context.span_id
assert receive_spans[0].links[1].context.span_id == send_spans[2].context.span_id
assert receive_spans[0].links[2].context.span_id == send_spans[3].context.span_id

# Verify settlement spans.
self._verify_complete(span=receive_spans[1], dest=topic_name, server_address=server_address)
self._verify_complete(span=receive_spans[2], dest=topic_name, server_address=server_address)
self._verify_complete(span=receive_spans[3], dest=topic_name, server_address=server_address)
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,18 @@ def test_blob_service_client_tracing(self, config, exporter, tracer):

http_span: ReadableSpan = spans[0]
assert http_span.kind == SpanKind.CLIENT
assert http_span.parent
assert http_span.parent.span_id == spans[1].context.span_id

assert http_span.attributes
assert http_span.attributes["http.request.method"] == "GET"
assert http_span.attributes["url.full"]
assert http_span.attributes["server.address"]
assert http_span.attributes["http.response.status_code"] == 200
assert http_span.attributes["az.client_request_id"]
assert http_span.attributes["az.service_request_id"]

method_span: ReadableSpan = spans[1]
assert method_span.kind == SpanKind.INTERNAL
assert method_span.parent
assert method_span.parent.span_id == spans[2].context.span_id