diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py index e95b91a045f2..6c7f237d2f66 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py @@ -1,6 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from enum import Enum +import sys # cSpell:disable @@ -47,16 +47,6 @@ _LONG_PING_INTERVAL_SECONDS = 60 _POST_CANCEL_INTERVAL_SECONDS = 20 - -# Live metrics data types -class _DocumentIngressDocumentType(Enum): - Request = "Request" - RemoteDependency = "RemoteDependency" - Exception = "Exception" - Event = "Event" - Trace = "Trace" - - # Response Headers _QUICKPULSE_ETAG_HEADER_NAME = "x-ms-qps-configuration-etag" @@ -64,4 +54,13 @@ class _DocumentIngressDocumentType(Enum): _QUICKPULSE_REDIRECT_HEADER_NAME = "x-ms-qps-service-endpoint-redirect-v2" _QUICKPULSE_SUBSCRIBED_HEADER_NAME = "x-ms-qps-subscribed" +# Projections (filtering) + +_QUICKPULSE_PROJECTION_COUNT = "Count()" +_QUICKPULSE_PROJECTION_DURATION = "Duration" +_QUICKPULSE_PROJECTION_CUSTOM = "CustomDimensions." 
+ +_QUICKPULSE_PROJECTION_MAX_VALUE = sys.maxsize +_QUICKPULSE_PROJECTION_MIN_VALUE = -sys.maxsize - 1 + # cSpell:enable diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py index 2b0bbe5b7464..ac1209573bda 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py @@ -23,12 +23,12 @@ MetricReader, ) -from azure.core.exceptions import HttpResponseError from azure.core.pipeline.policies import ContentDecodePolicy from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( _LONG_PING_INTERVAL_SECONDS, _POST_CANCEL_INTERVAL_SECONDS, _POST_INTERVAL_SECONDS, + _QUICKPULSE_ETAG_HEADER_NAME, _QUICKPULSE_SUBSCRIBED_HEADER_NAME, ) from azure.monitor.opentelemetry.exporter._quickpulse._generated._configuration import QuickpulseClientConfiguration @@ -36,14 +36,17 @@ from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import MonitoringDataPoint from azure.monitor.opentelemetry.exporter._quickpulse._policy import _QuickpulseRedirectPolicy from azure.monitor.opentelemetry.exporter._quickpulse._state import ( + _get_and_clear_quickpulse_documents, _get_global_quickpulse_state, + _get_quickpulse_etag, _is_ping_state, _set_global_quickpulse_state, - _get_and_clear_quickpulse_documents, + _set_quickpulse_etag, _QuickpulseState, ) from azure.monitor.opentelemetry.exporter._quickpulse._utils import ( _metric_to_quick_pulse_data_points, + _update_filter_configuration, ) from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser from azure.monitor.opentelemetry.exporter._utils import ( @@ -143,13 +146,14 @@ def export( 
base_monitoring_data_point=base_monitoring_data_point, documents=_get_and_clear_quickpulse_documents(), ) - + configuration_etag = _get_quickpulse_etag() or "" token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) try: post_response = self._client.publish( # type: ignore endpoint=self._live_endpoint, monitoring_data_points=data_points, - ikey=self._instrumentation_key, + ikey=self._instrumentation_key, # type: ignore + configuration_etag=configuration_etag, transmission_time=_ticks_since_dot_net_epoch(), cls=_Response, ) @@ -163,6 +167,19 @@ def export( if header != "true": # User leaving the live metrics page will be treated as an unsuccessful result = MetricExportResult.FAILURE + else: + # Check if etag has changed + etag = post_response._response_headers.get( # pylint: disable=protected-access + _QUICKPULSE_ETAG_HEADER_NAME # pylint: disable=protected-access + ) + if etag and etag != configuration_etag: + config = ( + post_response._pipeline_response.http_response.content # pylint: disable=protected-access + ) + # Content will only be populated if configuration has changed (etag is different) + if config: + # Update and apply configuration changes + _update_filter_configuration(etag, config) except Exception: # pylint: disable=broad-except,invalid-name _logger.exception("Exception occurred while publishing live metrics.") result = MetricExportResult.FAILURE @@ -201,21 +218,23 @@ def shutdown( def _ping(self, monitoring_data_point: MonitoringDataPoint) -> Optional[_Response]: ping_response = None token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + etag = _get_quickpulse_etag() or "" try: ping_response = self._client.is_subscribed( # type: ignore endpoint=self._live_endpoint, monitoring_data_point=monitoring_data_point, - ikey=self._instrumentation_key, + ikey=self._instrumentation_key, # type: ignore transmission_time=_ticks_since_dot_net_epoch(), machine_name=monitoring_data_point.machine_name, instance_name=monitoring_data_point.instance, 
stream_id=monitoring_data_point.stream_id, role_name=monitoring_data_point.role_name, - invariant_version=monitoring_data_point.invariant_version, + invariant_version=monitoring_data_point.invariant_version, # type: ignore + configuration_etag=etag, cls=_Response, ) return ping_response # type: ignore - except HttpResponseError: + except Exception: # pylint: disable=broad-except,invalid-name _logger.exception("Exception occurred while pinging live metrics.") detach(token) return ping_response @@ -243,28 +262,42 @@ def __init__( ) self._worker.start() + # pylint: disable=protected-access + # pylint: disable=too-many-nested-blocks def _ticker(self) -> None: if _is_ping_state(): # Send a ping if elapsed number of request meets the threshold if self._elapsed_num_seconds % _get_global_quickpulse_state().value == 0: - ping_response = self._exporter._ping( # pylint: disable=protected-access + ping_response = self._exporter._ping( self._base_monitoring_data_point, ) if ping_response: - header = ping_response._response_headers.get( # pylint: disable=protected-access - _QUICKPULSE_SUBSCRIBED_HEADER_NAME - ) - if header and header == "true": - # Switch state to post if subscribed - _set_global_quickpulse_state(_QuickpulseState.POST_SHORT) - self._elapsed_num_seconds = 0 - else: - # Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests - if ( - _get_global_quickpulse_state() is _QuickpulseState.PING_SHORT - and self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS - ): - _set_global_quickpulse_state(_QuickpulseState.PING_LONG) + try: + subscribed = ping_response._response_headers.get(_QUICKPULSE_SUBSCRIBED_HEADER_NAME) + if subscribed and subscribed == "true": + # Switch state to post if subscribed + _set_global_quickpulse_state(_QuickpulseState.POST_SHORT) + self._elapsed_num_seconds = 0 + # Update config etag + etag = ping_response._response_headers.get(_QUICKPULSE_ETAG_HEADER_NAME) + if etag is None: + etag = "" + if _get_quickpulse_etag() != etag: + 
_set_quickpulse_etag(etag) + # TODO: Set default document filter config from response body + # config = ping_response._pipeline_response.http_response.content + else: + # Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests + if ( + _get_global_quickpulse_state() is _QuickpulseState.PING_SHORT + and self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS + ): + _set_global_quickpulse_state(_QuickpulseState.PING_LONG) + # Reset etag to default if not subscribed + _set_quickpulse_etag("") + except Exception: # pylint: disable=broad-except,invalid-name + _logger.exception("Exception occurred while pinging live metrics.") + _set_quickpulse_etag("") # TODO: Implement redirect else: # Erroneous ping responses instigate backoff logic @@ -274,6 +307,8 @@ def _ticker(self) -> None: and self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS ): _set_global_quickpulse_state(_QuickpulseState.PING_LONG) + # Reset etag to default if error + _set_quickpulse_etag("") else: try: self.collect() @@ -283,6 +318,8 @@ def _ticker(self) -> None: # And resume pinging if self._elapsed_num_seconds >= _POST_CANCEL_INTERVAL_SECONDS: _set_global_quickpulse_state(_QuickpulseState.PING_SHORT) + # Reset etag to default + _set_quickpulse_etag("") self._elapsed_num_seconds = 0 self._elapsed_num_seconds += 1 diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py index 08be036d8c29..4174d9718fb2 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py @@ -4,6 +4,7 @@ from datetime import datetime from typing import Any, Iterable +import logging import platform import psutil @@ -39,6 +40,7 @@ 
_QuickpulseState, _is_post_state, _append_quickpulse_document, + _get_quickpulse_derived_metric_infos, _get_quickpulse_last_process_cpu, _get_quickpulse_last_process_time, _get_quickpulse_process_elapsed_time, @@ -47,7 +49,9 @@ _set_quickpulse_last_process_time, _set_quickpulse_process_elapsed_time, ) +from azure.monitor.opentelemetry.exporter._quickpulse._types import _TelemetryData from azure.monitor.opentelemetry.exporter._quickpulse._utils import ( + _derive_metrics_from_telemetry_data, _get_log_record_document, _get_span_document, ) @@ -61,6 +65,8 @@ Singleton, ) +_logger = logging.getLogger(__name__) + PROCESS = psutil.Process() NUM_CPUS = psutil.cpu_count() @@ -93,7 +99,8 @@ def __init__(self, **kwargs: Any) -> None: id_generator = RandomIdGenerator() self._base_monitoring_data_point = MonitoringDataPoint( version=_get_sdk_version(), - invariant_version=1, + # Invariant version 5 indicates filtering is supported + invariant_version=5, instance=part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE_INSTANCE, ""), role_name=part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE, ""), machine_name=platform.node(), @@ -152,39 +159,60 @@ def __init__(self, **kwargs: Any) -> None: def _record_span(self, span: ReadableSpan) -> None: # Only record if in post state if _is_post_state(): - document = _get_span_document(span) - _append_quickpulse_document(document) - duration_ms = 0 - if span.end_time and span.start_time: - duration_ms = (span.end_time - span.start_time) / 1e9 # type: ignore - # TODO: Spec out what "success" is - success = span.status.is_ok - - if span.kind in (SpanKind.SERVER, SpanKind.CONSUMER): - if success: - self._request_rate_counter.add(1) - else: - self._request_failed_rate_counter.add(1) - self._request_duration.record(duration_ms) - else: - if success: - self._dependency_rate_counter.add(1) + try: + document = _get_span_document(span) + _append_quickpulse_document(document) + duration_ms = 0 + if span.end_time and span.start_time: + duration_ms = 
(span.end_time - span.start_time) / 1e9 # type: ignore + # TODO: Spec out what "success" is + success = span.status.is_ok + + if span.kind in (SpanKind.SERVER, SpanKind.CONSUMER): + if success: + self._request_rate_counter.add(1) + else: + self._request_failed_rate_counter.add(1) + self._request_duration.record(duration_ms) else: - self._dependency_failure_rate_counter.add(1) - self._dependency_duration.record(duration_ms) + if success: + self._dependency_rate_counter.add(1) + else: + self._dependency_failure_rate_counter.add(1) + self._dependency_duration.record(duration_ms) + + metric_infos_dict = _get_quickpulse_derived_metric_infos() + # check if filtering is enabled + if metric_infos_dict: + # Derive metrics for quickpulse filtering + data = _TelemetryData._from_span(span) + _derive_metrics_from_telemetry_data(data) + # TODO: derive exception metrics from span events + except Exception: # pylint: disable=broad-except + _logger.exception("Exception occurred while recording span.") def _record_log_record(self, log_data: LogData) -> None: # Only record if in post state if _is_post_state(): - if log_data.log_record: - log_record = log_data.log_record - if log_record.attributes: - document = _get_log_record_document(log_data) - _append_quickpulse_document(document) - exc_type = log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE) - exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE) - if exc_type is not None or exc_message is not None: - self._exception_rate_counter.add(1) + try: + if log_data.log_record: + log_record = log_data.log_record + if log_record.attributes: + document = _get_log_record_document(log_data) + _append_quickpulse_document(document) + exc_type = log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE) + exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE) + if exc_type is not None or exc_message is not None: + self._exception_rate_counter.add(1) + + metric_infos_dict = 
_get_quickpulse_derived_metric_infos() + # check if filtering is enabled + if metric_infos_dict: + # Derive metrics for quickpulse filtering + data = _TelemetryData._from_log_record(log_record) + _derive_metrics_from_telemetry_data(data) + except Exception: # pylint: disable=broad-except + _logger.exception("Exception occurred while recording log record.") # pylint: disable=unused-argument diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py index e31b0be77169..0bcc69ec1375 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py @@ -10,7 +10,7 @@ # pylint: disable=protected-access class _QuickpulseLogRecordProcessor(LogRecordProcessor): - def emit(self, log_data: LogData) -> None: + def emit(self, log_data: LogData) -> None: # type: ignore qpm = _QuickpulseManager._instance if qpm: qpm._record_log_record(log_data) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py index fddf58af19f9..a98e51f8f851 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_state.py @@ -2,14 +2,21 @@ # Licensed under the MIT License. 
from datetime import datetime from enum import Enum -from typing import List +from typing import Dict, List, Tuple from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( _LONG_PING_INTERVAL_SECONDS, _POST_INTERVAL_SECONDS, + _QUICKPULSE_PROJECTION_MAX_VALUE, + _QUICKPULSE_PROJECTION_MIN_VALUE, _SHORT_PING_INTERVAL_SECONDS, ) -from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import DocumentIngress +from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import ( + AggregationType, + DerivedMetricInfo, + DocumentIngress, + TelemetryType, +) class _QuickpulseState(Enum): @@ -28,6 +35,10 @@ class _QuickpulseState(Enum): _QUICKPULSE_LAST_PROCESS_TIME = 0.0 _QUICKPULSE_PROCESS_ELAPSED_TIME = datetime.now() _QUICKPULSE_LAST_PROCESS_CPU = 0.0 +# Filtering +_QUICKPULSE_ETAG = "" +_QUICKPULSE_DERIVED_METRIC_INFOS: Dict[TelemetryType, List[DerivedMetricInfo]] = {} +_QUICKPULSE_PROJECTION_MAP: Dict[str, Tuple[AggregationType, float, int]] = {} def _set_global_quickpulse_state(state: _QuickpulseState) -> None: @@ -100,3 +111,67 @@ def _get_and_clear_quickpulse_documents() -> List[DocumentIngress]: documents = list(_QUICKPULSE_DOCUMENTS) _QUICKPULSE_DOCUMENTS = [] return documents + + +# Filtering + + +# Used for etag configuration +def _set_quickpulse_etag(etag: str) -> None: + # pylint: disable=global-statement + global _QUICKPULSE_ETAG + _QUICKPULSE_ETAG = etag + + +def _get_quickpulse_etag() -> str: + return _QUICKPULSE_ETAG + + +# Used for updating filter configuration when etag has changed +# Contains filter and projection to apply for each telemetry type if exists +def _set_quickpulse_derived_metric_infos(filters: Dict[TelemetryType, List[DerivedMetricInfo]]) -> None: + # pylint: disable=global-statement + global _QUICKPULSE_DERIVED_METRIC_INFOS + _QUICKPULSE_DERIVED_METRIC_INFOS = filters + + +def _get_quickpulse_derived_metric_infos() -> Dict[TelemetryType, List[DerivedMetricInfo]]: + return 
_QUICKPULSE_DERIVED_METRIC_INFOS + + +# Used for initializing and setting projections when span/logs are recorded +def _set_quickpulse_projection_map(metric_id: str, aggregation_type: AggregationType, value: float, count: int): + # pylint: disable=global-statement + # pylint: disable=global-variable-not-assigned + global _QUICKPULSE_PROJECTION_MAP + _QUICKPULSE_PROJECTION_MAP[metric_id] = (aggregation_type, value, count) + + +def _get_quickpulse_projection_map() -> Dict[str, Tuple[AggregationType, float, int]]: + return _QUICKPULSE_PROJECTION_MAP + + +# Resets projections per derived metric info for next quickpulse interval +# Called processing of previous quickpulse projections are finished/exported +def _reset_quickpulse_projection_map(): + # pylint: disable=global-statement + global _QUICKPULSE_PROJECTION_MAP + new_map = {} + if _QUICKPULSE_PROJECTION_MAP: + for id, projection in _QUICKPULSE_PROJECTION_MAP.items(): + value = 0 + if projection[0] == AggregationType.MIN: + value = _QUICKPULSE_PROJECTION_MAX_VALUE + elif projection[0] == AggregationType.MAX: + value = _QUICKPULSE_PROJECTION_MIN_VALUE + new_map[id] = (projection[0], value, 0) + _QUICKPULSE_PROJECTION_MAP.clear() + _QUICKPULSE_PROJECTION_MAP = new_map + + +# clears the projection map, usually called when config changes +def _clear_quickpulse_projection_map(): + # pylint: disable=global-statement + # pylint: disable=global-variable-not-assigned + global _QUICKPULSE_PROJECTION_MAP + _QUICKPULSE_PROJECTION_MAP.clear() diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_types.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_types.py new file mode 100644 index 000000000000..abff161cd113 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_types.py @@ -0,0 +1,177 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# Licensed under the MIT License. +# pylint: disable=protected-access +from dataclasses import dataclass +from typing import Dict, no_type_check + +from opentelemetry.sdk._logs import LogRecord +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import SpanKind + +from azure.monitor.opentelemetry.exporter.export.trace import _utils as trace_utils + + +@dataclass +class _TelemetryData: + custom_dimensions: Dict[str, str] + + @staticmethod + def _from_span(span: ReadableSpan): + if span.kind in (SpanKind.SERVER, SpanKind.CONSUMER): + return _RequestData._from_span(span) + return _DependencyData._from_span(span) + + @staticmethod + @no_type_check + def _from_log_record(log_record: LogRecord): + exc_type = log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE) + exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE) + if exc_type is not None or exc_message is not None: + return _ExceptionData._from_log_record(log_record) + return _TraceData._from_log_record(log_record) + + +@dataclass +class _RequestData(_TelemetryData): + duration: float + success: bool + name: str + response_code: int + url: str + + @staticmethod + @no_type_check + def _from_span(span: ReadableSpan): + # Logic should match that of exporter to Breeze + url = "" + duration_ms = 0 + response_code = 0 + success = True + attributes = {} # type: ignore + if span.end_time and span.start_time: + duration_ms = (span.end_time - span.start_time) / 1e9 # type: ignore + if span.attributes: + attributes = span.attributes # type: ignore + url = span.attributes.get(SpanAttributes.HTTP_URL, "") # type: ignore + # TODO: get url for http requests + status_code = span.attributes.get(SpanAttributes.HTTP_STATUS_CODE) + if status_code: + try: + status_code = int(status_code) # type: ignore + except ValueError: + status_code = 0 + else: + status_code = 0 + success = span.status.is_ok and status_code and status_code not in 
range(400, 500)  # type: ignore
+        response_code = status_code
+        return _RequestData(
+            duration=duration_ms,
+            success=success,
+            name=span.name,
+            response_code=response_code,
+            url=str(url),
+            custom_dimensions=attributes,
+        )
+
+
+@dataclass
+class _DependencyData(_TelemetryData):
+    duration: float
+    success: bool
+    name: str
+    result_code: int
+    target: str
+    type: str
+    data: str
+
+    @staticmethod
+    @no_type_check
+    def _from_span(span: ReadableSpan):
+        # Logic should match that of exporter to Breeze
+        url = ""
+        duration_ms = (span.end_time - span.start_time) / 1e9 if span.end_time and span.start_time else 0  # type: ignore
+        result_code = 0
+        attributes = span.attributes or {}  # type: ignore
+        dependency_type = ""
+        data = ""
+        target = trace_utils._get_target_for_dependency_from_peer(span.attributes)
+        if SpanAttributes.HTTP_METHOD in span.attributes:  # type: ignore
+            dependency_type = "HTTP"
+            url = trace_utils._get_url_for_http_dependency(span.attributes)  # type: ignore
+            target, _ = trace_utils._get_target_and_path_for_http_dependency(
+                span.attributes,
+                target,
+                url,
+            )
+            data = url
+        elif SpanAttributes.DB_SYSTEM in span.attributes:  # type: ignore
+            db_system = span.attributes[SpanAttributes.DB_SYSTEM]  # type: ignore
+            dependency_type = db_system  # type: ignore
+            target = trace_utils._get_target_for_db_dependency(
+                target,  # type: ignore
+                db_system,  # type: ignore
+                span.attributes,
+            )
+            if SpanAttributes.DB_STATEMENT in span.attributes:  # type: ignore
+                data = span.attributes[SpanAttributes.DB_STATEMENT]  # type: ignore
+            elif SpanAttributes.DB_OPERATION in span.attributes:  # type: ignore
+                data = span.attributes[SpanAttributes.DB_OPERATION]  # type: ignore
+        elif SpanAttributes.MESSAGING_SYSTEM in span.attributes:  # type: ignore
+            dependency_type = span.attributes[SpanAttributes.MESSAGING_SYSTEM]  # type: ignore
+            target = trace_utils._get_target_for_messaging_dependency(
+                target,  # type: ignore
+                span.attributes,
+            )
+        elif SpanAttributes.RPC_SYSTEM in span.attributes:
+            dependency_type = span.attributes[SpanAttributes.RPC_SYSTEM]
+            target = 
trace_utils._get_target_for_rpc_dependency(
+                target,  # type: ignore
+                span.attributes,
+            )
+        elif span.kind is SpanKind.PRODUCER:
+            dependency_type = "Queue Message"
+            msg_system = span.attributes.get(SpanAttributes.MESSAGING_SYSTEM)
+            if msg_system:
+                dependency_type += " | {}".format(msg_system)
+        else:
+            dependency_type = "InProc"
+
+        return _DependencyData(
+            duration=duration_ms,
+            success=span.status.is_ok,
+            name=span.name,
+            result_code=result_code,
+            target=target,
+            type=str(dependency_type),
+            data=data,
+            custom_dimensions=attributes,
+        )
+
+
+@dataclass
+class _ExceptionData(_TelemetryData):
+    message: str
+    stack_trace: str
+
+    @staticmethod
+    @no_type_check
+    def _from_log_record(log_record: LogRecord):
+        return _ExceptionData(
+            message=str(log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE, "")),
+            stack_trace=str(log_record.attributes.get(SpanAttributes.EXCEPTION_STACKTRACE, "")),
+            custom_dimensions=log_record.attributes,
+        )
+
+
+@dataclass
+class _TraceData(_TelemetryData):
+    message: str
+
+    @staticmethod
+    @no_type_check
+    def _from_log_record(log_record: LogRecord):
+        return _TraceData(
+            message=str(log_record.body),
+            custom_dimensions=log_record.attributes,
+        )
diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py
index dca763a7f260..f909aa21561b 100644
--- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py
+++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_utils.py
@@ -1,7 +1,8 @@
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # Licensed under the MIT License.
 
from datetime import datetime, timedelta, timezone -from typing import List, Optional, Union +import json +from typing import Dict, List, Optional, Tuple, Union from opentelemetry.sdk._logs import LogData from opentelemetry.sdk.metrics._internal.point import ( @@ -15,18 +16,43 @@ from opentelemetry.util.types import Attributes from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( - _DocumentIngressDocumentType, _QUICKPULSE_METRIC_NAME_MAPPINGS, + _QUICKPULSE_PROJECTION_COUNT, + _QUICKPULSE_PROJECTION_CUSTOM, + _QUICKPULSE_PROJECTION_DURATION, + _QUICKPULSE_PROJECTION_MAX_VALUE, + _QUICKPULSE_PROJECTION_MIN_VALUE, ) from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import ( + AggregationType, + DerivedMetricInfo, DocumentIngress, + DocumentType, Exception as ExceptionDocument, + FilterInfo, MetricPoint, MonitoringDataPoint, RemoteDependency as RemoteDependencyDocument, Request as RequestDocument, + TelemetryType, Trace as TraceDocument, ) +from azure.monitor.opentelemetry.exporter._quickpulse._state import ( + _clear_quickpulse_projection_map, + _get_quickpulse_derived_metric_infos, + _get_quickpulse_projection_map, + _reset_quickpulse_projection_map, + _set_quickpulse_derived_metric_infos, + _set_quickpulse_etag, + _set_quickpulse_projection_map, +) +from azure.monitor.opentelemetry.exporter._quickpulse._types import ( + _DependencyData, + _ExceptionData, + _RequestData, + _TelemetryData, + _TraceData, +) def _metric_to_quick_pulse_data_points( # pylint: disable=too-many-nested-blocks @@ -47,9 +73,23 @@ def _metric_to_quick_pulse_data_points( # pylint: disable=too-many-nested-block elif isinstance(point, NumberDataPoint): value = point.value metric_point = MetricPoint( - name=_QUICKPULSE_METRIC_NAME_MAPPINGS[metric.name.lower()], weight=1, value=value + name=_QUICKPULSE_METRIC_NAME_MAPPINGS[metric.name.lower()], # type: ignore + weight=1, + value=value, ) metric_points.append(metric_point) + # Process filtered metrics + for 
metric in _get_metrics_from_projections(): + metric_point = MetricPoint( + name=metric[0], # type: ignore + weight=1, + value=metric[1], # type: ignore + ) + metric_points.append(metric_point) + + # Reset projection map for next collection cycle + _reset_quickpulse_projection_map() + return [ MonitoringDataPoint( version=base_monitoring_data_point.version, @@ -78,7 +118,7 @@ def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, Re url = _get_url(span_kind, span.attributes) if span_kind in (SpanKind.CLIENT, SpanKind.PRODUCER, SpanKind.INTERNAL): document = RemoteDependencyDocument( - document_type=_DocumentIngressDocumentType.RemoteDependency.value, + document_type=DocumentType.REMOTE_DEPENDENCY, name=span.name, command_name=url, result_code=str(status_code), @@ -90,7 +130,7 @@ def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, Re else: code = str(grpc_status_code) document = RequestDocument( - document_type=_DocumentIngressDocumentType.Request.value, + document_type=DocumentType.REQUEST, name=span.name, url=url, response_code=code, @@ -105,13 +145,13 @@ def _get_log_record_document(log_data: LogData) -> Union[ExceptionDocument, Trac exc_message = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE) # type: ignore if exc_type is not None or exc_message is not None: document = ExceptionDocument( - document_type=_DocumentIngressDocumentType.Exception.value, + document_type=DocumentType.EXCEPTION, exception_type=str(exc_type), exception_message=str(exc_message), ) else: document = TraceDocument( - document_type=_DocumentIngressDocumentType.Trace.value, + document_type=DocumentType.TRACE, message=log_data.log_record.body, ) return document @@ -157,3 +197,164 @@ def _ns_to_iso8601_string(nanoseconds: int) -> str: dt_microseconds = timedelta(microseconds=microseconds) dt_with_microseconds = dt + dt_microseconds return dt_with_microseconds.isoformat() + + +# Filtering + + +def 
_update_filter_configuration(etag: str, config_bytes: bytes): + # Clear projection map + _clear_quickpulse_projection_map() + seen_ids = set() + # config is a byte string that when decoded is a json + config = json.loads(config_bytes.decode("utf-8")) + metric_infos: Dict[TelemetryType, List[DerivedMetricInfo]] = {} + for metric_info_dict in config.get("Metrics", []): + metric_info = DerivedMetricInfo.from_dict(metric_info_dict) + # Skip duplicate ids + if metric_info.id in seen_ids: + continue + telemetry_type: TelemetryType = TelemetryType(metric_info.telemetry_type) + # TODO: Filter out invalid configs: telemetry type, operand + # TODO: Rename exception fields + metric_info_list = metric_infos.get(telemetry_type, []) + metric_info_list.append(metric_info) + metric_infos[telemetry_type] = metric_info_list + seen_ids.add(metric_info.id) + # Initialize projections from this derived metric info + _init_derived_metric_projection(metric_info) + _set_quickpulse_derived_metric_infos(metric_infos) + # Update new etag + _set_quickpulse_etag(etag) + + +# Called by record_span/record_log when processing a span/log_record +# Derives metrics from projections if applicable to current filters in config +def _derive_metrics_from_telemetry_data(data: _TelemetryData): + metric_infos_dict = _get_quickpulse_derived_metric_infos() + metric_infos = [] # type: ignore + if isinstance(data, _RequestData): + metric_infos = metric_infos_dict.get(TelemetryType.REQUEST) + elif isinstance(data, _DependencyData): + metric_infos = metric_infos_dict.get(TelemetryType.DEPENDENCY) + elif isinstance(data, _ExceptionData): + metric_infos = metric_infos_dict.get(TelemetryType.EXCEPTION) + elif isinstance(data, _TraceData): + metric_infos = metric_infos_dict.get(TelemetryType.TRACE) + if metric_infos and _check_metric_filters(metric_infos, data): + # Since this data matches the filter, create projections used to + # generate filtered metrics + _create_projections(metric_infos, data) + # TODO: 
Configuration error handling + + +def _check_metric_filters(metric_infos: List[DerivedMetricInfo], data: _TelemetryData) -> bool: + match = False + for metric_info in metric_infos: + # Should only be a single `FilterConjunctionGroupInfo` in `filter_groups` + # but we use a logical OR to match if there is more than one + for group in metric_info.filter_groups: + match = match or _check_filters(group.filters, data) + return match + + +# pylint: disable=unused-argument +def _check_filters(filters: List[FilterInfo], data: _TelemetryData) -> bool: + # # All of the filters need to match for this to return true (and operation). + # for filter in filters: + # # TODO: apply filter logic + # pass + return True + + +# Projections + + +# Initialize metric projections per DerivedMetricInfo +def _init_derived_metric_projection(filter_info: DerivedMetricInfo): + derived_metric_agg_value = 0 + if filter_info.aggregation == AggregationType.MIN: + derived_metric_agg_value = _QUICKPULSE_PROJECTION_MAX_VALUE + elif filter_info.aggregation == AggregationType.MAX: + derived_metric_agg_value = _QUICKPULSE_PROJECTION_MIN_VALUE + elif filter_info.aggregation == AggregationType.SUM: + derived_metric_agg_value = 0 + elif filter_info.aggregation == AggregationType.AVG: + derived_metric_agg_value = 0 + _set_quickpulse_projection_map( + filter_info.id, + AggregationType(filter_info.aggregation), + derived_metric_agg_value, + 0, + ) + + +# Create projections based off of DerivedMetricInfos and current data being processed +def _create_projections(metric_infos: List[DerivedMetricInfo], data: _TelemetryData): + for metric_info in metric_infos: + value = 0 + if metric_info.projection == _QUICKPULSE_PROJECTION_COUNT: + value = 1 + elif metric_info.projection == _QUICKPULSE_PROJECTION_DURATION: + if isinstance(data, (_DependencyData, _RequestData)): + value = data.duration + else: + continue + elif metric_info.projection.startswith(_QUICKPULSE_PROJECTION_CUSTOM): + key = 
metric_info.projection.split(_QUICKPULSE_PROJECTION_CUSTOM, 1)[1].strip() + dim_value = data.custom_dimensions.get(key, 0) + try: + value = float(dim_value) + except ValueError: + continue + + aggregate: Optional[Tuple[float, int]] = _calculate_aggregation( + AggregationType(metric_info.aggregation), + metric_info.id, + value, + ) + if aggregate: + _set_quickpulse_projection_map( + metric_info.id, + AggregationType(metric_info.aggregation), + aggregate[0], + aggregate[1], + ) + + +# Calculate aggregation based off of previous projection value, aggregation type of a specific metric filter +# Return type is a Tuple of (value, count) +def _calculate_aggregation(aggregation: AggregationType, id: str, value: float) -> Optional[Tuple[float, int]]: + projection: Optional[Tuple[AggregationType, float, int]] = _get_quickpulse_projection_map().get(id) + if projection: + prev_value = projection[1] + prev_count = projection[2] + if aggregation == AggregationType.SUM: + return (prev_value + value, prev_count + 1) + elif aggregation == AggregationType.MIN: + return (min(prev_value, value), prev_count + 1) + elif aggregation == AggregationType.MAX: + return (max(prev_value, value), prev_count + 1) + elif aggregation == AggregationType.AVG: + return (prev_value + value, prev_count + 1) + return None + + +# Gets filtered metrics from projections to be exported +# Called every second on export +def _get_metrics_from_projections() -> List[Tuple[str, float]]: + metrics = [] + projection_map = _get_quickpulse_projection_map() + for id, projection in projection_map.items(): + metric_value = 0 + aggregation_type = projection[0] + if aggregation_type == AggregationType.MIN: + metric_value = 0 if projection[1] == _QUICKPULSE_PROJECTION_MAX_VALUE else projection[1] + elif aggregation_type == AggregationType.MAX: + metric_value = 0 if projection[1] == _QUICKPULSE_PROJECTION_MIN_VALUE else projection[1] + elif aggregation_type == AggregationType.AVG: + metric_value = 0 if projection[2] == 0 
else projection[1] / float(projection[2]) + elif aggregation_type == AggregationType.SUM: + metric_value = projection[1] + metrics.append((id, metric_value)) + return metrics # type: ignore diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/logs/_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/logs/_exporter.py index 0b701bfdbafd..9f945ce25b18 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/logs/_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/logs/_exporter.py @@ -163,7 +163,7 @@ def _convert_log_to_envelope(log_data: LogData) -> TelemetryItem: stack=str(stack_trace)[:32768], ) data = TelemetryExceptionData( # type: ignore - severity_level=severity_level, + severity_level=severity_level, # type: ignore properties=properties, exceptions=[exc_details], ) @@ -175,7 +175,7 @@ def _convert_log_to_envelope(log_data: LogData) -> TelemetryItem: # Severity number: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#field-severitynumber data = MessageData( # type: ignore message=_map_body_to_message(log_record.body), - severity_level=severity_level, + severity_level=severity_level, # type: ignore properties=properties, ) envelope.data = MonitorBase(base_data=data, base_type="MessageData") diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_exporter.py index 89df5220caa6..de05c433d5d2 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_exporter.py +++ 
b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_exporter.py @@ -334,23 +334,7 @@ def _convert_span_to_envelope(span: ReadableSpan) -> TelemetryItem: properties={}, ) envelope.data = MonitorBase(base_data=data, base_type="RemoteDependencyData") - target = None - if SpanAttributes.PEER_SERVICE in span.attributes: - target = span.attributes[SpanAttributes.PEER_SERVICE] - else: - if SpanAttributes.NET_PEER_NAME in span.attributes: - target = span.attributes[SpanAttributes.NET_PEER_NAME] - elif SpanAttributes.NET_PEER_IP in span.attributes: - target = span.attributes[SpanAttributes.NET_PEER_IP] - if SpanAttributes.NET_PEER_PORT in span.attributes: - port = span.attributes[SpanAttributes.NET_PEER_PORT] - # TODO: check default port for rpc - # This logic assumes default ports never conflict across dependency types - # type: ignore - if port != trace_utils._get_default_port_http( - str(span.attributes.get(SpanAttributes.HTTP_SCHEME)) - ) and port != trace_utils._get_default_port_db(str(span.attributes.get(SpanAttributes.DB_SYSTEM))): - target = "{}:{}".format(target, port) + target = trace_utils._get_target_for_dependency_from_peer(span.attributes) if span.kind is SpanKind.CLIENT: if _AZURE_SDK_NAMESPACE_NAME in span.attributes: # Azure specific resources # Currently only eventhub and servicebus are supported @@ -362,16 +346,14 @@ def _convert_span_to_envelope(span: ReadableSpan) -> TelemetryItem: if SpanAttributes.HTTP_USER_AGENT in span.attributes: # TODO: Not exposed in Swagger, need to update def envelope.tags["ai.user.userAgent"] = span.attributes[SpanAttributes.HTTP_USER_AGENT] - scheme = trace_utils._get_scheme_for_http_dependency(span.attributes) - url = trace_utils._get_url_for_http_dependency(scheme, span.attributes) + url = trace_utils._get_url_for_http_dependency(span.attributes) # data if url: data.data = url target, path = trace_utils._get_target_and_path_for_http_dependency( + span.attributes, target, # 
type: ignore url, - scheme, - span.attributes, ) # http specific logic for name if path: diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_utils.py index c0949bbbaf4a..f31b578af4d5 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_utils.py @@ -1,7 +1,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from typing import Optional, Tuple +from typing import no_type_check, Optional, Tuple from urllib.parse import urlparse from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes @@ -67,7 +67,7 @@ def _get_azure_sdk_target_source(attributes: Attributes) -> Optional[str]: return None -def _get_scheme_for_http_dependency(attributes: Attributes) -> Optional[str]: +def _get_http_scheme(attributes: Attributes) -> Optional[str]: if attributes: scheme = attributes.get(SpanAttributes.HTTP_SCHEME) if scheme: @@ -75,9 +75,12 @@ def _get_scheme_for_http_dependency(attributes: Attributes) -> Optional[str]: return None -def _get_url_for_http_dependency(scheme: Optional[str], attributes: Attributes) -> Optional[str]: +@no_type_check +def _get_url_for_http_dependency(attributes: Attributes, scheme: Optional[str] = None) -> Optional[str]: url = None if attributes: + if not scheme: + scheme = _get_http_scheme(attributes) if SpanAttributes.HTTP_URL in attributes: url = attributes[SpanAttributes.HTTP_URL] elif scheme and SpanAttributes.HTTP_TARGET in attributes: @@ -106,18 +109,44 @@ def _get_url_for_http_dependency(scheme: Optional[str], attributes: Attributes) peer_port, http_target, ) - return str(url) + return url # type: ignore +@no_type_check +def 
_get_target_for_dependency_from_peer(attributes: Attributes) -> Optional[str]: + target = "" + if attributes: + if SpanAttributes.PEER_SERVICE in attributes: + target = attributes[SpanAttributes.PEER_SERVICE] # type: ignore + else: + if SpanAttributes.NET_PEER_NAME in attributes: + target = attributes[SpanAttributes.NET_PEER_NAME] # type: ignore + elif SpanAttributes.NET_PEER_IP in attributes: + target = attributes[SpanAttributes.NET_PEER_IP] # type: ignore + if SpanAttributes.NET_PEER_PORT in attributes: + port = attributes[SpanAttributes.NET_PEER_PORT] + # TODO: check default port for rpc + # This logic assumes default ports never conflict across dependency types + # type: ignore + if port != _get_default_port_http( + str(attributes.get(SpanAttributes.HTTP_SCHEME)) + ) and port != _get_default_port_db(str(attributes.get(SpanAttributes.DB_SYSTEM))): + target = "{}:{}".format(target, port) + return target # type: ignore + + +@no_type_check def _get_target_and_path_for_http_dependency( + attributes: Attributes, target: Optional[str], url: Optional[str], - scheme: Optional[str], - attributes: Attributes, + scheme: Optional[str] = None, ) -> Tuple[Optional[str], str]: target_from_url = None path = "" if attributes: + if not scheme: + scheme = _get_http_scheme(attributes) if url: try: parse_url = urlparse(url) @@ -143,11 +172,12 @@ def _get_target_and_path_for_http_dependency( target = str(host) except Exception: # pylint: disable=broad-except pass - elif target_from_url: + elif target_from_url and not target: target = target_from_url return (target, path) +@no_type_check def _get_target_for_db_dependency( target: Optional[str], db_system: Optional[str], @@ -156,18 +186,19 @@ def _get_target_for_db_dependency( if attributes: db_name = attributes.get(SpanAttributes.DB_NAME) if db_name: - if target is None: + if not target: target = str(db_name) else: target = "{}|{}".format(target, db_name) - elif target is None: + elif not target: target = db_system return target 
+@no_type_check def _get_target_for_messaging_dependency(target: Optional[str], attributes: Attributes) -> Optional[str]: if attributes: - if target is None: + if not target: if SpanAttributes.MESSAGING_DESTINATION in attributes: target = str(attributes[SpanAttributes.MESSAGING_DESTINATION]) elif SpanAttributes.MESSAGING_SYSTEM in attributes: @@ -175,9 +206,10 @@ def _get_target_for_messaging_dependency(target: Optional[str], attributes: Attr return target +@no_type_check def _get_target_for_rpc_dependency(target: Optional[str], attributes: Attributes) -> Optional[str]: if attributes: - if target is None: + if not target: if SpanAttributes.RPC_SYSTEM in attributes: target = str(attributes[SpanAttributes.RPC_SYSTEM]) return target diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py index 2f5df7b29a24..76231aa35c14 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_exporter.py @@ -29,7 +29,9 @@ ) from azure.monitor.opentelemetry.exporter._quickpulse._state import ( _get_global_quickpulse_state, + _get_quickpulse_etag, _set_global_quickpulse_state, + _set_quickpulse_etag, _QuickpulseState, ) @@ -165,6 +167,52 @@ def test_export_subscribed_true(self, convert_mock, post_mock): result = self._exporter.export(self._metrics_data, base_monitoring_data_point=self._data_point) self.assertEqual(result, MetricExportResult.SUCCESS) + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._update_filter_configuration") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient.publish") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._metric_to_quick_pulse_data_points") + 
@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._get_quickpulse_etag") + def test_export_subscribed_true_etag_changed(self, etag_mock, convert_mock, post_mock, update_filter_mock): + pipeline_response = mock.Mock() + config = {"test-config": "test-config-value"} + pipeline_response.http_response.content = config + post_response = _Response( + pipeline_response, + None, + { + "x-ms-qps-subscribed": "true", + "x-ms-qps-configuration-etag": "new-etag", + }, + ) + convert_mock.return_value = [self._data_point] + post_mock.return_value = post_response + etag_mock.return_value = "old-etag" + result = self._exporter.export(self._metrics_data, base_monitoring_data_point=self._data_point) + self.assertEqual(result, MetricExportResult.SUCCESS) + update_filter_mock.assert_called_once_with("new-etag", config) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._update_filter_configuration") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient.publish") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._metric_to_quick_pulse_data_points") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._get_quickpulse_etag") + def test_export_subscribed_true_etag_same(self, etag_mock, convert_mock, post_mock, update_filter_mock): + pipeline_response = mock.Mock() + config = {"test-config": "test-config-value"} + pipeline_response.http_response.content = config + post_response = _Response( + pipeline_response, + None, + { + "x-ms-qps-subscribed": "true", + "x-ms-qps-configuration-etag": "old-etag", + }, + ) + convert_mock.return_value = [self._data_point] + post_mock.return_value = post_response + etag_mock.return_value = "old-etag" + result = self._exporter.export(self._metrics_data, base_monitoring_data_point=self._data_point) + self.assertEqual(result, MetricExportResult.SUCCESS) + update_filter_mock.assert_not_called() + 
@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient.is_subscribed") def test_ping(self, ping_mock): ping_response = _Response( @@ -186,6 +234,21 @@ def test_ping_exception(self): response = self._exporter._ping(monitoring_data_point=self._data_point) self.assertIsNone(response) + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient.is_subscribed") + def test_ping_etag_response(self, ping_mock): + ping_response = _Response( + mock.Mock(), + None, + { + "x-ms-qps-subscribed": "false", + "x-ms-qps-configuration-etag": "new-etag", + }, + ) + ping_mock.return_value = ping_response + response = self._exporter._ping(monitoring_data_point=self._data_point) + self.assertEqual(response, ping_response) + self.assertEqual(response._response_headers["x-ms-qps-configuration-etag"], "new-etag") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter.PeriodicTask") def test_quickpulsereader_init(self, task_mock): task_inst_mock = mock.Mock() @@ -226,7 +289,77 @@ def test_quickpulsereader_ticker_ping_true(self, task_mock, ping_mock): self.assertEqual(_get_global_quickpulse_state(), _QuickpulseState.POST_SHORT) self.assertEqual(reader._elapsed_num_seconds, 1) - # TODO: Other ticker cases + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._QuickpulseExporter._ping") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter.PeriodicTask") + def test_quickpulsereader_ticker_ping_false_backoff(self, task_mock, ping_mock): + task_inst_mock = mock.Mock() + task_mock.return_value = task_inst_mock + reader = _QuickpulseMetricReader( + self._exporter, + self._data_point, + ) + _set_global_quickpulse_state(_QuickpulseState.PING_SHORT) + reader._elapsed_num_seconds = 60 + ping_mock.return_value = _Response(None, None, {"x-ms-qps-subscribed": "false"}) + reader._ticker() + ping_mock.assert_called_once_with( + self._data_point, + ) + 
self.assertEqual(_get_global_quickpulse_state(), _QuickpulseState.PING_LONG) + self.assertEqual(reader._elapsed_num_seconds, _QuickpulseState.PING_LONG.value + 1) + self.assertEqual(_get_quickpulse_etag(), "") + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._QuickpulseExporter._ping") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter.PeriodicTask") + def test_ticker_erroneous_no_response_backoff(self, task_mock, ping_mock): + _set_global_quickpulse_state(_QuickpulseState.PING_SHORT) + _set_quickpulse_etag("old-etag") + task_inst_mock = mock.Mock() + task_mock.return_value = task_inst_mock + reader = _QuickpulseMetricReader( + self._exporter, + self._data_point, + ) + reader._elapsed_num_seconds = 60 + ping_mock.return_value = None + reader._ticker() + self.assertEqual(_get_global_quickpulse_state(), _QuickpulseState.PING_LONG) + self.assertEqual(_get_quickpulse_etag(), "") + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._QuickpulseMetricReader.collect") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter.PeriodicTask") + def test_ticker_post_state(self, task_mock, collect_mock): + _set_global_quickpulse_state(_QuickpulseState.POST_SHORT) + task_inst_mock = mock.Mock() + task_mock.return_value = task_inst_mock + reader = _QuickpulseMetricReader( + self._exporter, + self._data_point, + ) + reader._elapsed_num_seconds = 0 + reader._ticker() + collect_mock.assert_called_once() + self.assertEqual(_get_global_quickpulse_state(), _QuickpulseState.POST_SHORT) + self.assertEqual(reader._elapsed_num_seconds, 1) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._QuickpulseMetricReader.collect") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter.PeriodicTask") + def test_ticker_post_unsuccessful_backoff(self, task_mock, collect_mock): + _set_global_quickpulse_state(_QuickpulseState.POST_SHORT) + task_inst_mock = mock.Mock() + 
task_mock.return_value = task_inst_mock + reader = _QuickpulseMetricReader( + self._exporter, + self._data_point, + ) + reader._elapsed_num_seconds = 20 + with mock.patch( + "azure.monitor.opentelemetry.exporter._quickpulse._exporter._QuickpulseMetricReader.collect", + throw(_UnsuccessfulQuickPulsePostError), + ): # noqa: E501 + reader._ticker() + self.assertEqual(_get_global_quickpulse_state(), _QuickpulseState.PING_SHORT) + self.assertEqual(reader._elapsed_num_seconds, 1) @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._QuickpulseExporter.export") @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter.PeriodicTask") diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py index 9f7df9a54b00..e4dcc4b4493a 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_live_metrics.py @@ -102,7 +102,7 @@ def test_init(self, generator_mock): "4321abcd-5678-4efa-8abc-1234567890ac", ) self.assertEqual(qpm._base_monitoring_data_point.version, _get_sdk_version()) - self.assertEqual(qpm._base_monitoring_data_point.invariant_version, 1) + self.assertEqual(qpm._base_monitoring_data_point.invariant_version, 5) self.assertEqual( qpm._base_monitoring_data_point.instance, part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE_INSTANCE, "") ) @@ -260,10 +260,40 @@ def test_record_span_dep_failure(self, post_state_mock, span_doc_mock, append_do qpm._dependency_failure_rate_counter = mock.Mock() qpm._dependency_duration = mock.Mock() qpm._record_span(span_mock) + span_doc_mock.assert_called_once_with(span_mock) append_doc_mock.assert_called_once_with(span_doc) qpm._dependency_failure_rate_counter.add.assert_called_once_with(1) qpm._dependency_duration.record.assert_called_once_with(5 / 1e9) + 
@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._derive_metrics_from_telemetry_data") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._TelemetryData") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_quickpulse_derived_metric_infos") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._append_quickpulse_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_span_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._is_post_state") + def test_record_span_derive_filter_metrics( + self, post_state_mock, span_doc_mock, append_doc_mock, info_mock, data_mock, derive_mock + ): + post_state_mock.return_value = True + span_doc = mock.Mock() + span_doc_mock.return_value = span_doc + span_mock = mock.Mock() + span_mock.end_time = 10 + span_mock.start_time = 5 + qpm = _QuickpulseManager( + connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", + resource=Resource.create(), + ) + info_mock.return_value = {"test": "value"} + data = mock.Mock() + data_mock._from_span.return_value = data + qpm._record_span(span_mock) + info_mock.assert_called_once() + data_mock._from_span.assert_called_once_with(span_mock) + derive_mock.assert_called_once_with(data) + span_doc_mock.assert_called_once_with(span_mock) + append_doc_mock.assert_called_once_with(span_doc) + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._append_quickpulse_document") @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_log_record_document") @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._is_post_state") @@ -283,9 +313,39 @@ def test_record_log_exception(self, post_state_mock, log_doc_mock, append_doc_mo ) qpm._exception_rate_counter = mock.Mock() 
qpm._record_log_record(log_data_mock) + log_doc_mock.assert_called_once_with(log_data_mock) append_doc_mock.assert_called_once_with(log_record_doc) qpm._exception_rate_counter.add.assert_called_once_with(1) + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._derive_metrics_from_telemetry_data") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._TelemetryData") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_quickpulse_derived_metric_infos") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._append_quickpulse_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._get_log_record_document") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._is_post_state") + def test_record_log_derive_filter_metrics( + self, post_state_mock, log_doc_mock, append_doc_mock, info_mock, data_mock, derive_mock + ): + post_state_mock.return_value = True + log_record_doc = mock.Mock() + log_record_mock = mock.Mock() + log_doc_mock.return_value = log_record_doc + log_data_mock = mock.Mock() + log_data_mock.log_record = log_record_mock + qpm = _QuickpulseManager( + connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/", + resource=Resource.create(), + ) + info_mock.return_value = {"test": "value"} + data = mock.Mock() + data_mock._from_log_record.return_value = data + qpm._record_log_record(log_data_mock) + info_mock.assert_called_once() + data_mock._from_log_record.assert_called_once_with(log_record_mock) + derive_mock.assert_called_once_with(data) + log_doc_mock.assert_called_once_with(log_data_mock) + append_doc_mock.assert_called_once_with(log_record_doc) + def test_process_memory(self): with mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics.PROCESS") as process_mock: memory = 
collections.namedtuple("memory", "rss") diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py index 33d0c2e6990a..03b3dff71d51 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_utils.py @@ -1,6 +1,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. from datetime import datetime +import json import unittest from unittest import mock @@ -10,20 +11,34 @@ from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( _COMMITTED_BYTES_NAME, - _DocumentIngressDocumentType, + _QUICKPULSE_PROJECTION_MAX_VALUE, ) -from azure.monitor.opentelemetry.exporter._quickpulse._generated.models._models import ( +from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import ( + AggregationType, + DocumentType, Exception, MetricPoint, MonitoringDataPoint, RemoteDependency, Request, + TelemetryType, Trace, ) +from azure.monitor.opentelemetry.exporter._quickpulse._types import ( + _DependencyData, + _RequestData, +) from azure.monitor.opentelemetry.exporter._quickpulse._utils import ( + _calculate_aggregation, + _check_metric_filters, + _create_projections, + _derive_metrics_from_telemetry_data, + _get_metrics_from_projections, _get_span_document, _get_log_record_document, + _init_derived_metric_projection, _metric_to_quick_pulse_data_points, + _update_filter_configuration, ) @@ -108,6 +123,16 @@ def test_metric_to_qp_data_point_num(self, datetime_mock): self.assertEqual(mdp.metrics, [metric_point]) self.assertEqual(mdp.documents, documents) + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_metrics_from_projections") + def test_metric_to_qp_data_point_process_filtered_metrics(self, projection_mock): + metric_data = mock.Mock() + 
metric_data.resource_metrics = [] + projections = [("ID:1234", 5.0)] + projection_mock.return_value = projections + mdp = _metric_to_quick_pulse_data_points(metric_data, self.base_mdp, [])[0] + self.assertEqual(mdp.metrics[0].name, "ID:1234") + self.assertEqual(mdp.metrics[0].value, 5.0) + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._ns_to_iso8601_string") @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_url") def test_get_span_document_client(self, url_mock, iso_mock): @@ -124,7 +149,7 @@ def test_get_span_document_client(self, url_mock, iso_mock): iso_mock.return_value = "1000" doc = _get_span_document(span_mock) self.assertTrue(isinstance(doc, RemoteDependency)) - self.assertEqual(doc.document_type, _DocumentIngressDocumentType.RemoteDependency.value) + self.assertEqual(doc.document_type, DocumentType.REMOTE_DEPENDENCY) self.assertEqual(doc.name, "test_span") self.assertEqual(doc.command_name, "test_url") self.assertEqual(doc.result_code, "200") @@ -146,7 +171,7 @@ def test_get_span_document_server(self, url_mock, iso_mock): iso_mock.return_value = "1000" doc = _get_span_document(span_mock) self.assertTrue(isinstance(doc, Request)) - self.assertEqual(doc.document_type, _DocumentIngressDocumentType.Request.value) + self.assertEqual(doc.document_type, DocumentType.REQUEST) self.assertEqual(doc.name, "test_span") self.assertEqual(doc.url, "test_url") self.assertEqual(doc.response_code, "200") @@ -167,7 +192,7 @@ def test_get_span_document_server_grpc_status(self, url_mock, iso_mock): iso_mock.return_value = "1000" doc = _get_span_document(span_mock) self.assertTrue(isinstance(doc, Request)) - self.assertEqual(doc.document_type, _DocumentIngressDocumentType.Request.value) + self.assertEqual(doc.document_type, DocumentType.REQUEST) self.assertEqual(doc.name, "test_span") self.assertEqual(doc.url, "test_url") self.assertEqual(doc.response_code, "400") @@ -183,7 +208,7 @@ def 
test_get_log_record_document_server_exc(self): log_data.log_record = log_record doc = _get_log_record_document(log_data) self.assertTrue(isinstance(doc, Exception)) - self.assertEqual(doc.document_type, _DocumentIngressDocumentType.Exception.value) + self.assertEqual(doc.document_type, DocumentType.EXCEPTION) self.assertEqual(doc.exception_type, "exc_type") self.assertEqual(doc.exception_message, "exc_message") @@ -195,5 +220,208 @@ def test_get_log_record_document_server_exc(self): log_data.log_record = log_record doc = _get_log_record_document(log_data) self.assertTrue(isinstance(doc, Trace)) - self.assertEqual(doc.document_type, _DocumentIngressDocumentType.Trace.value) + self.assertEqual(doc.document_type, DocumentType.TRACE) self.assertEqual(doc.message, "body") + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._set_quickpulse_etag") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._set_quickpulse_derived_metric_infos") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._init_derived_metric_projection") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils.DerivedMetricInfo") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._clear_quickpulse_projection_map") + def test_update_filter_configuration( + self, clear_mock, dict_mock, init_projection_mock, set_metric_info_mock, etag_mock + ): + etag = "new-etag" + test_config_bytes = '{"Metrics":[{"Id":"94.e4b85108","TelemetryType":"Request","FilterGroups":[{"Filters":[]}],"Projection":"Count()","Aggregation":"Sum","BackEndAggregation":"Sum"}]}'.encode() + test_config_dict = json.loads(test_config_bytes.decode()).get("Metrics")[0] + metric_info_mock = mock.Mock() + metric_info_mock.telemetry_type = TelemetryType.REQUEST + dict_mock.from_dict.return_value = metric_info_mock + _update_filter_configuration(etag, test_config_bytes) + clear_mock.assert_called_once() + 
dict_mock.from_dict.assert_called_once_with(test_config_dict) + init_projection_mock.assert_called_once_with(metric_info_mock) + metric_infos = {} + metric_infos[TelemetryType.REQUEST] = [metric_info_mock] + set_metric_info_mock.assert_called_once_with(metric_infos) + etag_mock.assert_called_once_with(etag) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._create_projections") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._check_metric_filters") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_quickpulse_derived_metric_infos") + def test_derive_metrics_from_telemetry_data(self, get_derived_mock, filter_mock, projection_mock): + metric_infos = [mock.Mock] + get_derived_mock.return_value = { + TelemetryType.DEPENDENCY: metric_infos, + } + data = _DependencyData( + duration=0, + success=True, + name="test", + result_code=200, + target="", + type="", + data="", + custom_dimensions={}, + ) + filter_mock.return_value = True + _derive_metrics_from_telemetry_data(data) + get_derived_mock.assert_called_once() + filter_mock.assert_called_once_with(metric_infos, data) + projection_mock.assert_called_once_with(metric_infos, data) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._create_projections") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._check_metric_filters") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_quickpulse_derived_metric_infos") + def test_derive_metrics_from_telemetry_data_filter_false(self, get_derived_mock, filter_mock, projection_mock): + metric_infos = [mock.Mock] + get_derived_mock.return_value = { + TelemetryType.DEPENDENCY: metric_infos, + } + data = _DependencyData( + duration=0, + success=True, + name="test", + result_code=200, + target="", + type="", + data="", + custom_dimensions={}, + ) + filter_mock.return_value = False + _derive_metrics_from_telemetry_data(data) + 
get_derived_mock.assert_called_once() + filter_mock.assert_called_once_with(metric_infos, data) + projection_mock.assert_not_called() + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._check_filters") + def test_check_metric_filters(self, filter_mock): + metric_info = mock.Mock() + group_mock = mock.Mock() + filters_mock = mock.Mock() + group_mock.filters = filters_mock + metric_info.filter_groups = [group_mock] + filter_mock.return_value = True + data = mock.Mock() + match = _check_metric_filters([metric_info], data) + filter_mock.assert_called_once_with(filters_mock, data) + self.assertTrue(match) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._check_filters") + def test_check_metric_filters_no_match(self, filter_mock): + metric_info = mock.Mock() + group_mock = mock.Mock() + filters_mock = mock.Mock() + group_mock.filters = filters_mock + metric_info.filter_groups = [group_mock] + filter_mock.return_value = False + data = mock.Mock() + match = _check_metric_filters([metric_info], data) + filter_mock.assert_called_once_with(filters_mock, data) + self.assertFalse(match) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._set_quickpulse_projection_map") + def test_init_derived_metric_projection(self, set_map_mock): + filter_mock = mock.Mock() + filter_mock.aggregation = AggregationType.MIN + filter_mock.id = "mock_id" + _init_derived_metric_projection(filter_mock) + set_map_mock.assert_called_once_with("mock_id", AggregationType.MIN, _QUICKPULSE_PROJECTION_MAX_VALUE, 0) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._set_quickpulse_projection_map") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._calculate_aggregation") + def test_create_projections_count(self, aggregation_mock, set_map_mock): + data_mock = mock.Mock() + metric_info = mock.Mock() + metric_info.id = "mock_id" + metric_info.projection = "Count()" + metric_info.aggregation = 
AggregationType.SUM + aggregation_mock.return_value = (1, 2) + _create_projections([metric_info], data_mock) + aggregation_mock.assert_called_once_with(AggregationType.SUM, "mock_id", 1) + set_map_mock.assert_called_once_with("mock_id", AggregationType.SUM, 1, 2) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._set_quickpulse_projection_map") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._calculate_aggregation") + def test_create_projections_duration(self, aggregation_mock, set_map_mock): + data_mock = _RequestData( + duration=5.0, + success=True, + name="test", + response_code=200, + url="", + custom_dimensions={}, + ) + metric_info = mock.Mock() + metric_info.id = "mock_id" + metric_info.projection = "Duration" + metric_info.aggregation = AggregationType.SUM + aggregation_mock.return_value = (6.0, 2) + _create_projections([metric_info], data_mock) + aggregation_mock.assert_called_once_with(AggregationType.SUM, "mock_id", 5.0) + set_map_mock.assert_called_once_with("mock_id", AggregationType.SUM, 6.0, 2) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._set_quickpulse_projection_map") + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._calculate_aggregation") + def test_create_projections_dimensions(self, aggregation_mock, set_map_mock): + data_mock = mock.Mock() + data_mock.custom_dimensions = { + "test-key": "6.7", + } + metric_info = mock.Mock() + metric_info.id = "mock_id" + metric_info.projection = "CustomDimensions.test-key" + metric_info.aggregation = AggregationType.SUM + aggregation_mock.return_value = (8.2, 2) + _create_projections([metric_info], data_mock) + aggregation_mock.assert_called_once_with(AggregationType.SUM, "mock_id", 6.7) + set_map_mock.assert_called_once_with("mock_id", AggregationType.SUM, 8.2, 2) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_quickpulse_projection_map") + def test_calculate_aggregation_sum(self, 
projection_map_mock): + projection_map_mock.return_value = {"test-id": (AggregationType.SUM, 3.0, 6)} + agg_tuple = _calculate_aggregation(AggregationType.SUM, "test-id", 4.0) + self.assertEqual(agg_tuple, (7.0, 7)) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_quickpulse_projection_map") + def test_calculate_aggregation_min(self, projection_map_mock): + projection_map_mock.return_value = {"test-id": (AggregationType.MIN, 3.0, 6)} + agg_tuple = _calculate_aggregation(AggregationType.MIN, "test-id", 4.0) + self.assertEqual(agg_tuple, (3.0, 7)) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_quickpulse_projection_map") + def test_calculate_aggregation_max(self, projection_map_mock): + projection_map_mock.return_value = {"test-id": (AggregationType.MAX, 3.0, 6)} + agg_tuple = _calculate_aggregation(AggregationType.MAX, "test-id", 4.0) + self.assertEqual(agg_tuple, (4.0, 7)) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_quickpulse_projection_map") + def test_calculate_aggregation_avg(self, projection_map_mock): + projection_map_mock.return_value = {"test-id": (AggregationType.AVG, 3.0, 3)} + agg_tuple = _calculate_aggregation(AggregationType.AVG, "test-id", 5.0) + self.assertEqual(agg_tuple, (8.0, 4)) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_quickpulse_projection_map") + def test_calculate_aggregation_none(self, projection_map_mock): + projection_map_mock.return_value = {"test-id": (AggregationType.AVG, 3.0, 3)} + agg_tuple = _calculate_aggregation(AggregationType.AVG, "test-id2", 5.0) + self.assertIsNone(agg_tuple) + + @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._utils._get_quickpulse_projection_map") + def test_get_metrics_from_projections(self, projection_map_mock): + projection_map_mock.return_value = { + "test-id": (AggregationType.MIN, 3.0, 3), + "test-id2": (AggregationType.MAX, 5.0, 4), + "test-id3": 
(AggregationType.SUM, 2.0, 2), + "test-id4": (AggregationType.AVG, 12.0, 3), + } + metric_tuples = _get_metrics_from_projections() + self.assertEqual( + metric_tuples, + [ + ("test-id", 3.0), + ("test-id2", 5.0), + ("test-id3", 2.0), + ("test-id4", 4.0), + ], + )