diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index 476993dfbf33..ef92eac283c5 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -6,6 +6,8 @@ - Add device.* to part A fields ([#34229](https://github.com/Azure/azure-sdk-for-python/pull/34229)) +- Add live metrics exporting functionality + ([#34141](https://github.com/Azure/azure-sdk-for-python/pull/34141)) - Add application.ver to part A fields ([#34401](https://github.com/Azure/azure-sdk-for-python/pull/34401)) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/__init__.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/__init__.py index 0cee259a4da9..39d410a7ffb8 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/__init__.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/__init__.py @@ -4,8 +4,8 @@ # license information. # ------------------------------------------------------------------------- -from azure.monitor.opentelemetry.exporter._quickpulse._exporter import QuickpulseExporter +from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import enable_live_metrics __all__ = [ - "QuickpulseExporter", + "enable_live_metrics", ] diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py new file mode 100644 index 000000000000..b591258f2ee0 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py @@ -0,0 +1,36 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. 
+# cSpell:disable
+
+# (OpenTelemetry metric name, Quickpulse metric name)
+# Memory
+_COMMITTED_BYTES_NAME = ("azuremonitor.memorycommittedbytes", "\\Memory\\Committed Bytes")
+# CPU
+_PROCESSOR_TIME_NAME = ("azuremonitor.processortotalprocessortime", "\\Processor(_Total)\\% Processor Time")
+# Request
+_REQUEST_RATE_NAME = ("azuremonitor.requestssec", "\\ApplicationInsights\\Requests/Sec")
+_REQUEST_FAILURE_RATE_NAME = ("azuremonitor.requestsfailedsec", "\\ApplicationInsights\\Requests Failed/Sec")
+_REQUEST_DURATION_NAME = ("azuremonitor.requestduration", "\\ApplicationInsights\\Request Duration")
+# Dependency
+_DEPENDENCY_RATE_NAME = ("azuremonitor.dependencycallssec", "\\ApplicationInsights\\Dependency Calls/Sec")
+_DEPENDENCY_FAILURE_RATE_NAME = ("azuremonitor.dependencycallsfailedsec", "\\ApplicationInsights\\Dependency Calls Failed/Sec") # pylint: disable=line-too-long
+_DEPENDENCY_DURATION_NAME = ("azuremonitor.dependencycallduration", "\\ApplicationInsights\\Dependency Call Duration")
+# Exception
+_EXCEPTION_RATE_NAME = ("azuremonitor.exceptionssec", "\\ApplicationInsights\\Exceptions/Sec")
+
+_QUICKPULSE_METRIC_NAME_MAPPINGS = dict(
+    [
+        _COMMITTED_BYTES_NAME,
+        _PROCESSOR_TIME_NAME,
+        _REQUEST_RATE_NAME,
+        _REQUEST_FAILURE_RATE_NAME,
+        _REQUEST_DURATION_NAME,
+        _DEPENDENCY_RATE_NAME,
+        _DEPENDENCY_FAILURE_RATE_NAME,
+        _DEPENDENCY_DURATION_NAME,
+        _EXCEPTION_RATE_NAME,
+    ]
+)
+
+# cSpell:enable
diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py
index 1ebe4b481e38..ffcfb33312d0 100644
--- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py
+++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py
@@ -1,9 +1,15 @@
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # Licensed under the MIT License.
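(Editorial aside, before the exporter changes below.) The new `_constants.py` above pairs lower-cased OpenTelemetry instrument names with the performance-counter names the Quickpulse service expects. A minimal illustration of how that mapping is consumed; the actual lookup appears later in this diff in `_metric_to_quick_pulse_data_points`, and the import path simply mirrors the file added above:

```python
from azure.monitor.opentelemetry.exporter._quickpulse._constants import (
    _QUICKPULSE_METRIC_NAME_MAPPINGS,
)

# Keys are lower-cased OpenTelemetry instrument names; values are the
# Quickpulse performance-counter names sent to the live metrics service.
assert _QUICKPULSE_METRIC_NAME_MAPPINGS["azuremonitor.requestssec"] == "\\ApplicationInsights\\Requests/Sec"
assert _QUICKPULSE_METRIC_NAME_MAPPINGS["azuremonitor.memorycommittedbytes"] == "\\Memory\\Committed Bytes"
```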
-import logging - -from typing import Any +from datetime import datetime, timezone +from enum import Enum +from typing import Any, List, Optional +from opentelemetry.context import ( + _SUPPRESS_INSTRUMENTATION_KEY, + attach, + detach, + set_value, +) from opentelemetry.sdk.metrics import ( Counter, Histogram, @@ -12,21 +18,32 @@ ObservableUpDownCounter, UpDownCounter, ) +from opentelemetry.sdk.metrics._internal.point import ( + NumberDataPoint, + HistogramDataPoint, + MetricsData, +) from opentelemetry.sdk.metrics.export import ( AggregationTemporality, MetricExporter, MetricExportResult, MetricsData as OTMetricsData, + MetricReader, ) + +from azure.core.exceptions import HttpResponseError +from azure.monitor.opentelemetry.exporter._quickpulse._constants import _QUICKPULSE_METRIC_NAME_MAPPINGS from azure.monitor.opentelemetry.exporter._quickpulse._generated._client import QuickpulseClient +from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import ( + DocumentIngress, + MetricPoint, + MonitoringDataPoint, +) from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser +from azure.monitor.opentelemetry.exporter._utils import _ticks_since_dot_net_epoch, PeriodicTask -_logger = logging.getLogger(__name__) -__all__ = ["QuickpulseExporter"] - - -APPLICATION_INSIGHTS_METRIC_TEMPORALITIES = { +_APPLICATION_INSIGHTS_METRIC_TEMPORALITIES = { Counter: AggregationTemporality.DELTA, Histogram: AggregationTemporality.DELTA, ObservableCounter: AggregationTemporality.DELTA, @@ -35,27 +52,46 @@ UpDownCounter: AggregationTemporality.CUMULATIVE, } +_SHORT_PING_INTERVAL_SECONDS = 5 +_POST_INTERVAL_SECONDS = 1 +_LONG_PING_INTERVAL_SECONDS = 60 +_POST_CANCEL_INTERVAL_SECONDS = 20 + + +class _Response: + """Response that encapsulates pipeline response and response headers from + QuickPulse client. + """ + def __init__(self, pipeline_response, deserialized, response_headers): + self._pipeline_response = pipeline_response + self._deserialized = deserialized + self._response_headers = response_headers + -class QuickpulseExporter(MetricExporter): +class _UnsuccessfulQuickPulsePostError(Exception): + """Exception raised to indicate unsuccessful QuickPulse post for backoff logic.""" - def __init__(self, **kwargs: Any) -> None: + +class _QuickpulseExporter(MetricExporter): + + def __init__(self, connection_string: Optional[str]) -> None: """Metric exporter for Quickpulse. - :keyword str connection_string: The connection string used for your Application Insights resource. + :param str connection_string: The connection string used for your Application Insights resource. 
:rtype: None
         """
-        parsed_connection_string = ConnectionStringParser(kwargs.get('connection_string'))
+        parsed_connection_string = ConnectionStringParser(connection_string)
 
-        self._endpoint = parsed_connection_string.endpoint
+        self._live_endpoint = parsed_connection_string.live_endpoint
+        self._instrumentation_key = parsed_connection_string.instrumentation_key
         # TODO: Support AADaudience (scope)/credentials
 
-        self.client = QuickpulseClient(host=self._endpoint, **kwargs)
+        self._client = QuickpulseClient(host=self._live_endpoint)
         # TODO: Support redirect
 
         MetricExporter.__init__(
             self,
-            preferred_temporality=APPLICATION_INSIGHTS_METRIC_TEMPORALITIES, # type: ignore
-            preferred_aggregation=kwargs.get("preferred_aggregation"), # type: ignore
+            preferred_temporality=_APPLICATION_INSIGHTS_METRIC_TEMPORALITIES, # type: ignore
         )
 
     def export(
@@ -67,14 +103,44 @@ def export(
         """Exports a batch of metric data
 
         :param metrics_data: OpenTelemetry Metric(s) to export.
-        :type metrics_data: Sequence[~opentelemetry.sdk.metrics._internal.point.MetricsData]
+        :type metrics_data: ~opentelemetry.sdk.metrics._internal.point.MetricsData
         :param timeout_millis: The maximum amount of time to wait for each export. Not currently used.
        :type timeout_millis: float
        :return: The result of the export.
        :rtype: ~opentelemetry.sdk.metrics.export.MetricExportResult
        """
-        # TODO
-        return MetricExportResult.SUCCESS
+        result = MetricExportResult.SUCCESS
+        base_monitoring_data_point = kwargs.get("base_monitoring_data_point")
+        if base_monitoring_data_point is None:
+            return MetricExportResult.FAILURE
+        data_points = _metric_to_quick_pulse_data_points(
+            metrics_data,
+            base_monitoring_data_point=base_monitoring_data_point,
+            documents=kwargs.get("documents"),
+        )
+
+        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+        try:
+            post_response = self._client.post( # type: ignore
+                monitoring_data_points=data_points,
+                ikey=self._instrumentation_key,
+                x_ms_qps_transmission_time=_ticks_since_dot_net_epoch(),
+                cls=_Response,
+            )
+            if not post_response:
+                # If no response, assume unsuccessful
+                result = MetricExportResult.FAILURE
+            else:
+                header = post_response._response_headers.get("x-ms-qps-subscribed") # pylint: disable=protected-access
+                if header != "true":
+                    # The user leaving the live metrics page is treated as an unsuccessful export
+                    result = MetricExportResult.FAILURE
+        except Exception: # pylint: disable=broad-except,invalid-name
+            # Errors are not reported and are assumed to be unsuccessful
+            result = MetricExportResult.FAILURE
+        finally:
+            detach(token)
+        return result
 
     def force_flush(
         self,
@@ -104,3 +170,159 @@ def shutdown(
         :param timeout_millis: The maximum amount of time to wait for shutdown. Not currently used.
         :type timeout_millis: float
         """
+
+
+    def _ping(self, monitoring_data_point) -> Optional[_Response]:
+        ping_response = None
+        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+        try:
+            ping_response = self._client.ping( # type: ignore
+                monitoring_data_point=monitoring_data_point,
+                ikey=self._instrumentation_key,
+                x_ms_qps_transmission_time=_ticks_since_dot_net_epoch(),
+                cls=_Response,
+            )
+        except HttpResponseError:
+            # Errors are not reported
+            pass
+        finally:
+            # Always detach the context token, including after a successful ping
+            detach(token)
+        return ping_response
+
+
+class _QuickpulseState(Enum):
+    """Current state of quickpulse service.
+    The numerical value represents the ping/post interval in seconds for those states.
+    """
+
+    PING_SHORT = _SHORT_PING_INTERVAL_SECONDS
+    PING_LONG = _LONG_PING_INTERVAL_SECONDS
+    POST_SHORT = _POST_INTERVAL_SECONDS
+
+
+class _QuickpulseMetricReader(MetricReader):
+
+    def __init__(
+        self,
+        exporter: _QuickpulseExporter,
+        base_monitoring_data_point: MonitoringDataPoint,
+    ) -> None:
+        self._exporter = exporter
+        self._quick_pulse_state = _QuickpulseState.PING_SHORT
+        self._base_monitoring_data_point = base_monitoring_data_point
+        self._elapsed_num_seconds = 0
+        self._worker = PeriodicTask(
+            interval=_POST_INTERVAL_SECONDS,
+            function=self._ticker,
+            name="QuickpulseMetricReader",
+        )
+        self._worker.daemon = True
+        super().__init__(
+            preferred_temporality=self._exporter._preferred_temporality,
+            preferred_aggregation=self._exporter._preferred_aggregation,
+        )
+        self._worker.start()
+
+    def _ticker(self) -> None:
+        if self._is_ping_state():
+            # Send a ping if the elapsed number of seconds meets the ping interval
+            if self._elapsed_num_seconds % int(self._quick_pulse_state.value) == 0:
+                ping_response = self._exporter._ping(  # pylint: disable=protected-access
+                    self._base_monitoring_data_point,
+                )
+                if ping_response:
+                    header = ping_response._response_headers.get("x-ms-qps-subscribed")  # pylint: disable=protected-access
+                    if header and header == "true":
+                        # Switch state to post if subscribed
+                        self._quick_pulse_state = _QuickpulseState.POST_SHORT
+                        self._elapsed_num_seconds = 0
+                    else:
+                        # Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests
+                        if self._quick_pulse_state is _QuickpulseState.PING_SHORT and \
+                                self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS:
+                            self._quick_pulse_state = _QuickpulseState.PING_LONG
+                    # TODO: Implement redirect
+                else:
+                    # Erroneous ping responses instigate backoff logic
+                    # Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests
+                    if self._quick_pulse_state is _QuickpulseState.PING_SHORT and \
+                            self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS:
+                        self._quick_pulse_state = _QuickpulseState.PING_LONG
+        else:
+            try:
+                self.collect()
+            except _UnsuccessfulQuickPulsePostError:
+                # Unsuccessful posts instigate backoff logic
+                # Backoff after _POST_CANCEL_INTERVAL_SECONDS (20s) of no successful requests
+                # and resume pinging
+                if self._elapsed_num_seconds >= _POST_CANCEL_INTERVAL_SECONDS:
+                    self._quick_pulse_state = _QuickpulseState.PING_SHORT
+                    self._elapsed_num_seconds = 0
+
+        self._elapsed_num_seconds += 1
+
+    def _receive_metrics(
+        self,
+        metrics_data: MetricsData,
+        timeout_millis: float = 10_000,
+        **kwargs,
+    ) -> None:
+        result = self._exporter.export(
+            metrics_data,
+            timeout_millis=timeout_millis,
+            base_monitoring_data_point=self._base_monitoring_data_point,
+            documents=[],
+        )
+        if result is MetricExportResult.FAILURE:
+            # There is currently no way to propagate an unsuccessful metric post, so
+            # we raise an _UnsuccessfulQuickPulsePostError exception. Callers MUST handle
+            # this exception whenever `collect()` is called
+            raise _UnsuccessfulQuickPulsePostError()
+
+    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
+        self._worker.cancel()
+        self._worker.join()
+
+    def _is_ping_state(self):
+        return self._quick_pulse_state in (_QuickpulseState.PING_SHORT, _QuickpulseState.PING_LONG)
+
+def _metric_to_quick_pulse_data_points(  # pylint: disable=too-many-nested-blocks
+    metrics_data: OTMetricsData,
+    base_monitoring_data_point: MonitoringDataPoint,
+    documents: Optional[List[DocumentIngress]],
+) -> List[MonitoringDataPoint]:
+    metric_points = []
+    for resource_metric in metrics_data.resource_metrics:
+        for scope_metric in resource_metric.scope_metrics:
+            for metric in scope_metric.metrics:
+                for point in metric.data.data_points:
+                    if point is not None:
+                        metric_point = MetricPoint(
+                            name=_QUICKPULSE_METRIC_NAME_MAPPINGS[metric.name.lower()],
+                            weight=1,
+                        )
+                        if isinstance(point, HistogramDataPoint):
+                            metric_point.value = point.sum
+                        elif isinstance(point, NumberDataPoint):
+                            metric_point.value = point.value
+                        else:
+                            metric_point.value = 0
+                        metric_points.append(metric_point)
+    return [
+        MonitoringDataPoint(
+            version=base_monitoring_data_point.version,
+            instance=base_monitoring_data_point.instance,
+            role_name=base_monitoring_data_point.role_name,
+            machine_name=base_monitoring_data_point.machine_name,
+            stream_id=base_monitoring_data_point.stream_id,
+            timestamp=datetime.now(tz=timezone.utc),
+            metrics=metric_points,
+            documents=documents,
+        )
+    ]
diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py
index 24a9e0d80a24..83bb3e073e3d 100644
--- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py
+++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py
@@ -1,18 +1,49 @@
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # Licensed under the MIT License.
+import platform
+from typing import Any, Optional
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
+from opentelemetry.sdk.resources import Resource
+from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys
+from azure.monitor.opentelemetry.exporter._quickpulse._exporter import (
+    _QuickpulseExporter,
+    _QuickpulseMetricReader,
+)
+from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import MonitoringDataPoint
+from azure.monitor.opentelemetry.exporter._utils import (
+    _get_sdk_version,
+    _populate_part_a_fields,
+    Singleton,
+)
 
-def enable_live_metrics(connection_string: str) -> None:
-    QuickpulseStateManager(connection_string)
+def enable_live_metrics(**kwargs: Any) -> None:
+    """Enable live metrics (Quickpulse) collection for Azure Monitor.
 
-class QuickpulseStateManager:
+    :keyword str connection_string: The connection string used for your Application Insights resource.
+    :keyword Resource resource: The OpenTelemetry Resource used for this Python application.
+ :rtype: None + """ + _QuickpulseManager(kwargs.get('connection_string'), kwargs.get('resource')) - def __new__(cls, *args, **kwargs): - if not hasattr(cls, 'instance'): - cls._instance = super(QuickpulseStateManager, cls).__new__(cls, *args, **kwargs) - return cls._instance - def __init__(self, connection_string): - self._connection_string = connection_string - # TODO +class _QuickpulseManager(metaclass=Singleton): + + def __init__(self, connection_string: Optional[str], resource: Optional[Resource]) -> None: + self._exporter = _QuickpulseExporter(connection_string) + part_a_fields = {} + if resource: + part_a_fields = _populate_part_a_fields(resource) + id_generator = RandomIdGenerator() + self._base_monitoring_data_point = MonitoringDataPoint( + version=_get_sdk_version(), + invariant_version=1, + instance=part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE_INSTANCE, ""), + role_name=part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE, ""), + machine_name=platform.node(), + stream_id=str(id_generator.generate_trace_id()), + ) + self._reader = _QuickpulseMetricReader(self._exporter, self._base_monitoring_data_point) + self._meter_provider = MeterProvider([self._reader]) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py index 57398ddcc3d6..a1510d736290 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py @@ -1,6 +1,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +import datetime import locale from os import environ from os.path import isdir @@ -86,6 +87,12 @@ def _get_sdk_version_prefix(): return sdk_version_prefix +def _get_sdk_version(): + return "{}py{}:otel{}:ext{}".format( + _get_sdk_version_prefix(), platform.python_version(), opentelemetry_version, ext_version + ) + + def _getlocale(): try: with warnings.catch_warnings(): @@ -105,9 +112,7 @@ def _getlocale(): ContextTagKeys.AI_DEVICE_LOCALE: _getlocale(), ContextTagKeys.AI_DEVICE_OS_VERSION: platform.version(), ContextTagKeys.AI_DEVICE_TYPE: "Other", - ContextTagKeys.AI_INTERNAL_SDK_VERSION: "{}py{}:otel{}:ext{}".format( - _get_sdk_version_prefix(), platform.python_version(), opentelemetry_version, ext_version - ), + ContextTagKeys.AI_INTERNAL_SDK_VERSION: _get_sdk_version(), } @@ -121,6 +126,23 @@ def ns_to_duration(nanoseconds: int): days, hours, minutes, seconds, microseconds ) + +# Replicate .netDateTime.Ticks(), which is the UTC time, expressed as the number +# of 100-nanosecond intervals that have elapsed since 12:00:00 midnight on +# January 1, 0001. 
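+# For reference, that offset is a fixed constant: 719,162 days * 86,400 s/day =
+# 62,135,596,800 seconds, i.e. 621,355,968,000,000,000 ticks at the Unix epoch
+# (the value of .NET's DateTime.UnixEpoch.Ticks).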
+def _ticks_since_dot_net_epoch(): + # Since time.time() is the elapsed time since UTC January 1, 1970, we have + # to shift this start time, and then multiply by 10^7 to get the number of + # 100-nanosecond intervals + shift_time = int( + ( + datetime.datetime(1970, 1, 1, 0, 0, 0) - + datetime.datetime(1, 1, 1, 0, 0, 0)).total_seconds() + ) * (10 ** 7) + # Add shift time to 100-ns intervals since time.time() + return int(time.time() * (10**7)) + shift_time + + _INSTRUMENTATIONS_BIT_MASK = 0 _INSTRUMENTATIONS_BIT_MASK_LOCK = threading.Lock() @@ -233,3 +255,11 @@ def _filter_custom_properties(properties: Attributes, filter=None) -> Dict[str, continue truncated_properties[key] = str(val)[:8192] return truncated_properties + + +class Singleton(type): + _instance = None + def __call__(cls, *args, **kwargs): + if not cls._instance: + cls._instance = super(Singleton, cls).__call__(*args, **kwargs) + return cls._instance diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/setup.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/setup.py index 833da2774115..56f8392cef1c 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/setup.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/setup.py @@ -81,7 +81,7 @@ }, python_requires=">=3.8", install_requires=[ - "azure-core<2.0.0,>=1.23.0", + "azure-core<2.0.0,>=1.28.0", "fixedint==0.1.6", "msrest>=0.6.10", "opentelemetry-api~=1.21", diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/__init__.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/__init__.py new file mode 100644 index 000000000000..5b7f7a925cc0 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py new file mode 100644 index 000000000000..d053b3da4e59 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py @@ -0,0 +1,293 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. 
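(Editorial aside, ahead of the new unit tests.) The `_QuickpulseMetricReader._ticker` loop added above is easiest to read as a small state machine driven by the interval constants. The following self-contained sketch is a condensed restatement of that logic for review purposes only; names and structure are simplified and it is not the shipped implementation:

```python
from enum import Enum


class _State(Enum):
    PING_SHORT = 5   # ping every 5s until the service reports a subscriber
    PING_LONG = 60   # back-off ping interval after 60s with no subscriber
    POST_SHORT = 1   # post metrics every second while subscribed


def tick(state: _State, elapsed: int, subscribed: bool, post_ok: bool):
    """One iteration of the once-per-second ticker (condensed restatement)."""
    if state in (_State.PING_SHORT, _State.PING_LONG):
        if elapsed % state.value == 0:              # time to ping?
            if subscribed:                          # x-ms-qps-subscribed == "true"
                state, elapsed = _State.POST_SHORT, 0
            elif state is _State.PING_SHORT and elapsed >= 60:
                state = _State.PING_LONG            # back off to 60s pings
    elif not post_ok and elapsed >= 20:             # 20s of failed posts
        state, elapsed = _State.PING_SHORT, 0       # resume pinging
    return state, elapsed + 1
```

With those defaults, a healthy session settles into one post per second, while an unsubscribed or failing session degrades to one ping per minute.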
+ +import unittest +from unittest import mock + +from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, + Metric, + MetricExportResult, + MetricsData as OTMetricsData, + NumberDataPoint, + ResourceMetrics, + ScopeMetrics, + Sum, +) +from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.sdk.resources import Resource, ResourceAttributes +from azure.core.exceptions import HttpResponseError +from azure.monitor.opentelemetry.exporter._quickpulse._generated._client import QuickpulseClient +from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import MonitoringDataPoint +from azure.monitor.opentelemetry.exporter._quickpulse._exporter import ( + _POST_INTERVAL_SECONDS, + _QuickpulseExporter, + _QuickpulseMetricReader, + _QuickpulseState, + _Response, + _UnsuccessfulQuickPulsePostError, +) + + +def throw(exc_type, *args, **kwargs): + def func(*_args, **_kwargs): + raise exc_type(*args, **kwargs) + + return func + + +class TestQuickpulse(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls._resource = Resource.create( + { + ResourceAttributes.SERVICE_INSTANCE_ID: "test_instance", + ResourceAttributes.SERVICE_NAME: "test_service", + } + ) + cls._metrics_data = OTMetricsData( + resource_metrics=ResourceMetrics( + resource=cls._resource, + scope_metrics=ScopeMetrics( + scope=InstrumentationScope("test_scope"), + metrics=[ + Metric( + name="azureMonitor.memoryCommittedBytes", + description="test_desc", + unit="test_unit", + data=Sum( + data_points=[ + NumberDataPoint( + attributes={}, + start_time_unix_nano=0, + time_unix_nano=0, + value=5, + ) + ], + aggregation_temporality=AggregationTemporality.DELTA, + is_monotonic=True, + ) + ) + ], + schema_url="test_url", + ), + schema_url="test_url", + ) + ) + cls._data_point = MonitoringDataPoint( + version="test_version", + invariant_version=1, + instance="test_instance", + role_name="test_role_name", + machine_name="test_machine_name", + stream_id="test_stream_id", + ) + cls._exporter = _QuickpulseExporter( + "InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/" + ) + cls._reader = _QuickpulseMetricReader( + cls._exporter, + cls._data_point, + ) + + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient") + def test_init(self, client_mock): + exporter = _QuickpulseExporter( + connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ab;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/" + ) + self.assertEqual(exporter._live_endpoint, "https://eastus.livediagnostics.monitor.azure.com/") + self.assertEqual(exporter._instrumentation_key, "4321abcd-5678-4efa-8abc-1234567890ab") + self.assertTrue(isinstance(exporter._client, QuickpulseClient)) + self.assertEqual(exporter._client._config.host, "https://eastus.livediagnostics.monitor.azure.com/") + + + def test_export_missing_data_point(self): + result = self._exporter.export(OTMetricsData(resource_metrics=[])) + self.assertEqual(result, MetricExportResult.FAILURE) + + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient.post") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._metric_to_quick_pulse_data_points") + def test_export_subscribed_false(self, convert_mock, post_mock): + post_response = _Response( + mock.Mock(), + None, + { + "x-ms-qps-subscribed": "false", + } + ) + convert_mock.return_value = [self._data_point] + 
post_mock.return_value = post_response + result = self._exporter.export( + self._metrics_data, + base_monitoring_data_point=self._data_point + ) + self.assertEqual(result, MetricExportResult.FAILURE) + + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient.post") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._metric_to_quick_pulse_data_points") + def test_export_subscribed_none(self, convert_mock, post_mock): + post_response = None + convert_mock.return_value = [self._data_point] + post_mock.return_value = post_response + result = self._exporter.export( + self._metrics_data, + base_monitoring_data_point=self._data_point + ) + self.assertEqual(result, MetricExportResult.FAILURE) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._metric_to_quick_pulse_data_points") + def test_export_exception(self, convert_mock): + post_response = _Response( + mock.Mock(), + None, + {}, + ) + convert_mock.return_value = [self._data_point] + with mock.patch( + "azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient.post", + throw(Exception), + ): # noqa: E501 + result = self._exporter.export( + self._metrics_data, + base_monitoring_data_point=self._data_point + ) + self.assertEqual(result, MetricExportResult.FAILURE) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient.post") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._metric_to_quick_pulse_data_points") + def test_export_subscribed_true(self, convert_mock, post_mock): + post_response = _Response( + mock.Mock(), + None, + { + "x-ms-qps-subscribed": "true", + } + ) + convert_mock.return_value = [self._data_point] + post_mock.return_value = post_response + result = self._exporter.export( + self._metrics_data, + base_monitoring_data_point=self._data_point + ) + self.assertEqual(result, MetricExportResult.SUCCESS) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient.ping") + def test_ping(self, ping_mock): + ping_response = _Response( + mock.Mock(), + None, + { + "x-ms-qps-subscribed": "false", + } + ) + ping_mock.return_value = ping_response + response = self._exporter._ping( + monitoring_data_point=self._data_point + ) + self.assertEqual(response, ping_response) + + def test_ping_exception(self): + with mock.patch( + "azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient.ping", + throw(HttpResponseError), + ): # noqa: E501 + response = self._exporter._ping( + monitoring_data_point=self._data_point + ) + self.assertIsNone(response) + + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter.PeriodicTask") + def test_quickpulsereader_init(self, task_mock): + task_inst_mock = mock.Mock() + task_mock.return_value = task_inst_mock + reader = _QuickpulseMetricReader( + self._exporter, + self._data_point, + ) + self.assertEqual(reader._exporter, self._exporter) + self.assertEqual(reader._quick_pulse_state, _QuickpulseState.PING_SHORT) + self.assertEqual(reader._base_monitoring_data_point, self._data_point) + self.assertEqual(reader._elapsed_num_seconds, 0) + self.assertEqual(reader._elapsed_num_seconds, 0) + self.assertEqual(reader._worker, task_inst_mock) + task_mock.assert_called_with( + interval=_POST_INTERVAL_SECONDS, + function=reader._ticker, + name="QuickpulseMetricReader", + ) + self.assertTrue(reader._worker.daemon) + 
task_inst_mock.start.assert_called_once() + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._QuickpulseExporter._ping") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter.PeriodicTask") + def test_quickpulsereader_ticker_ping_true(self, task_mock, ping_mock): + task_inst_mock = mock.Mock() + task_mock.return_value = task_inst_mock + reader = _QuickpulseMetricReader( + self._exporter, + self._data_point, + ) + reader._quick_pulse_state = _QuickpulseState.PING_SHORT + reader._elapsed_num_seconds = _QuickpulseState.PING_SHORT.value + ping_mock.return_value = _Response( + None, + None, + { + "x-ms-qps-subscribed": "true" + } + ) + reader._ticker() + ping_mock.assert_called_once_with( + self._data_point, + ) + self.assertEqual(reader._quick_pulse_state, _QuickpulseState.POST_SHORT) + self.assertEqual(reader._elapsed_num_seconds, 1) + + # TODO: Other ticker cases + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._QuickpulseExporter.export") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter.PeriodicTask") + def test_quickpulsereader_receive_metrics(self, task_mock, export_mock): + task_inst_mock = mock.Mock() + task_mock.return_value = task_inst_mock + reader = _QuickpulseMetricReader( + self._exporter, + self._data_point, + ) + export_mock.return_value = MetricExportResult.SUCCESS + reader._receive_metrics( + self._metrics_data, + 20_000, + ) + export_mock.assert_called_once_with( + self._metrics_data, + timeout_millis=20_000, + base_monitoring_data_point=self._data_point, + documents=[], + ) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._QuickpulseExporter.export") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter.PeriodicTask") + def test_quickpulsereader_receive_metrics_exception(self, task_mock, export_mock): + task_inst_mock = mock.Mock() + task_mock.return_value = task_inst_mock + reader = _QuickpulseMetricReader( + self._exporter, + self._data_point, + ) + export_mock.return_value = MetricExportResult.FAILURE + with self.assertRaises(_UnsuccessfulQuickPulsePostError): + reader._receive_metrics( + self._metrics_data, + 20_000, + ) + export_mock.assert_called_once_with( + self._metrics_data, + timeout_millis=20_000, + base_monitoring_data_point=self._data_point, + documents=[], + ) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py new file mode 100644 index 000000000000..0b0f512865bf --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py @@ -0,0 +1,120 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. 
+ +import platform +import unittest +from unittest import mock + +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.resources import Resource, ResourceAttributes + +from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys +from azure.monitor.opentelemetry.exporter._quickpulse._exporter import ( + _QuickpulseExporter, + _QuickpulseMetricReader, +) +from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import ( + enable_live_metrics, + _QuickpulseManager, +) +from azure.monitor.opentelemetry.exporter._utils import ( + _get_sdk_version, + _populate_part_a_fields, +) + + +class TestLiveMetrics(unittest.TestCase): + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._QuickpulseManager") + def test_enable_live_metrics(self, manager_mock): + mock_resource = mock.Mock() + enable_live_metrics( + connection_string="test_cs", + resource=mock_resource, + ) + manager_mock.assert_called_with("test_cs", mock_resource) + + +class TestQuickpulseManager(unittest.TestCase): + + @mock.patch("opentelemetry.sdk.trace.id_generator.RandomIdGenerator.generate_trace_id") + def test_init(self, generator_mock): + generator_mock.return_value = "test_trace_id" + resource = Resource.create( + { + ResourceAttributes.SERVICE_INSTANCE_ID: "test_instance", + ResourceAttributes.SERVICE_NAME: "test_service", + } + ) + part_a_fields = _populate_part_a_fields(resource) + qpm = _QuickpulseManager( + connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", + resource=resource, + ) + self.assertTrue(isinstance(qpm._exporter, _QuickpulseExporter)) + self.assertEqual( + qpm._exporter._live_endpoint, + "https://eastus.livediagnostics.monitor.azure.com/", + ) + self.assertEqual( + qpm._exporter._instrumentation_key, + "4321abcd-5678-4efa-8abc-1234567890ac", + ) + self.assertEqual(qpm._base_monitoring_data_point.version, _get_sdk_version()) + self.assertEqual(qpm._base_monitoring_data_point.invariant_version, 1) + self.assertEqual( + qpm._base_monitoring_data_point.instance, + part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE_INSTANCE, "") + ) + self.assertEqual( + qpm._base_monitoring_data_point.role_name, + part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE, "") + ) + self.assertEqual(qpm._base_monitoring_data_point.machine_name, platform.node()) + self.assertEqual(qpm._base_monitoring_data_point.stream_id, "test_trace_id") + self.assertTrue(isinstance(qpm._reader, _QuickpulseMetricReader)) + self.assertEqual(qpm._reader._exporter, qpm._exporter) + self.assertEqual(qpm._reader._base_monitoring_data_point, qpm._base_monitoring_data_point) + self.assertTrue(isinstance(qpm._meter_provider, MeterProvider)) + self.assertEqual(qpm._meter_provider._sdk_config.metric_readers, [qpm._reader]) + + + def test_singleton(self): + resource = Resource.create( + { + ResourceAttributes.SERVICE_INSTANCE_ID: "test_instance", + ResourceAttributes.SERVICE_NAME: "test_service", + } + ) + part_a_fields = _populate_part_a_fields(resource) + resource2 = Resource.create( + { + ResourceAttributes.SERVICE_INSTANCE_ID: "test_instance2", + ResourceAttributes.SERVICE_NAME: "test_service2", + } + ) + qpm = _QuickpulseManager( + connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", + resource=resource, + ) + qpm2 = _QuickpulseManager( + 
connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", + resource=resource2, + ) + self.assertEqual(qpm, qpm2) + self.assertEqual( + qpm._base_monitoring_data_point.instance, + part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE_INSTANCE, "") + ) + self.assertEqual( + qpm._base_monitoring_data_point.role_name, + part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE, "") + ) + self.assertEqual( + qpm2._base_monitoring_data_point.instance, + part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE_INSTANCE, "") + ) + self.assertEqual( + qpm2._base_monitoring_data_point.role_name, + part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE, "") + ) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_utils.py index f0b630453306..fc08963ffbec 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_utils.py @@ -1,8 +1,10 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +import datetime import os import platform +import time import unittest from azure.monitor.opentelemetry.exporter import _utils @@ -44,6 +46,21 @@ def test_nanoseconds_to_duration(self): self.assertEqual(ns_to_duration(3600 * 1000000000), "0.01:00:00.000") self.assertEqual(ns_to_duration(86400 * 1000000000), "1.00:00:00.000") + + @patch("time.time") + def test_ticks_since_dot_net_epoch(self, time_mock): + current_time = time.time() + shift_time = int( + ( + datetime.datetime(1970, 1, 1, 0, 0, 0) - + datetime.datetime(1, 1, 1, 0, 0, 0)).total_seconds() + ) * (10 ** 7) + time_mock.return_value = current_time + ticks = _utils._ticks_since_dot_net_epoch() + expected_ticks = int(current_time * (10**7)) + shift_time + self.assertEqual(ticks, expected_ticks) + + def test_populate_part_a_fields(self): resource = Resource( {"service.name": "testServiceName", diff --git a/sdk/monitor/azure-monitor-opentelemetry/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry/CHANGELOG.md index fbde3c774e4b..f065dbcbd772 100644 --- a/sdk/monitor/azure-monitor-opentelemetry/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry/CHANGELOG.md @@ -50,6 +50,8 @@ ([#32195](https://github.com/Azure/azure-sdk-for-python/pull/32195)) - Allow OTEL_PYTHON_DISABLED_INSTRUMENTATIONS functionality for Azure Core Tracing in Auto-instrumentation ([#32331](https://github.com/Azure/azure-sdk-for-python/pull/32331)) +- Add instrumentation_options + ([#31793](https://github.com/Azure/azure-sdk-for-python/pull/31793)) ### Bugs Fixed @@ -62,8 +64,6 @@ - Add Azure resource detectors ([#32087](https://github.com/Azure/azure-sdk-for-python/pull/32087)) -- Add instrumentation_options - ([#31793](https://github.com/Azure/azure-sdk-for-python/pull/31793)) ### Other Changes
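Finally, for reviewers who want to exercise the new surface end to end, a minimal usage sketch of the API introduced in this PR. The connection string and resource values are placeholders, and note that the entry point still lives in the private `_quickpulse` package:

```python
from opentelemetry.sdk.resources import Resource

from azure.monitor.opentelemetry.exporter._quickpulse import enable_live_metrics

# Placeholder connection string; use the one from your Application Insights resource.
enable_live_metrics(
    connection_string=(
        "InstrumentationKey=00000000-0000-0000-0000-000000000000;"
        "LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/"
    ),
    resource=Resource.create({"service.name": "my-service"}),
)
```

Once a ping succeeds and the service subscribes, the reader switches to posting the request, dependency, exception, CPU, and memory counters defined in `_constants.py` once per second.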