Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
tests
  • Loading branch information
lzchen committed Feb 20, 2024
commit 750e63d42ec9491cce2b76d05bd6fbaa3e6037ed
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
# license information.
# -------------------------------------------------------------------------

from azure.monitor.opentelemetry.exporter._quickpulse._exporter import _QuickpulseExporter
from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import enable_live_metrics

__all__ = [
"_QuickpulseExporter",
"enable_live_metrics",
]
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
_POST_CANCEL_INTERVAL_SECONDS = 20


class Response:
class _Response:
"""Response that encapsulates pipeline response and response headers from
QuickPulse client.
"""
Expand All @@ -68,7 +68,7 @@ def __init__(self, pipeline_response, deserialized, response_headers):
self._response_headers = response_headers


class UnsuccessfulQuickPulsePostError(Exception):
class _UnsuccessfulQuickPulsePostError(Exception):
"""Exception raised to indicate unsuccessful QuickPulse post for backoff logic."""


Expand Down Expand Up @@ -112,7 +112,7 @@ def export(
result = MetricExportResult.SUCCESS
base_monitoring_data_point = kwargs.get("base_monitoring_data_point")
if base_monitoring_data_point is None:
return result
return MetricExportResult.FAILURE
data_points = _metric_to_quick_pulse_data_points(
metrics_data,
base_monitoring_data_point=base_monitoring_data_point,
Expand All @@ -125,15 +125,16 @@ def export(
monitoring_data_points=data_points,
ikey=self._instrumentation_key,
x_ms_qps_transmission_time=_ticks_since_dot_net_epoch(),
cls=Response,
cls=_Response,
)
if not post_response:
# If no response, assume unsuccessful
result = MetricExportResult.FAILURE
header = post_response._response_headers.get("x-ms-qps-subscribed") # pylint: disable=protected-access
if header != "true":
# User leaving the live metrics page will be treated as an unsuccessful
result = MetricExportResult.FAILURE
else:
header = post_response._response_headers.get("x-ms-qps-subscribed") # pylint: disable=protected-access
if header != "true":
# User leaving the live metrics page will be treated as an unsuccessful
result = MetricExportResult.FAILURE
except Exception: # pylint: disable=broad-except,invalid-name
# Errors are not reported and assumed as unsuccessful
result = MetricExportResult.FAILURE
Expand Down Expand Up @@ -171,15 +172,15 @@ def shutdown(
"""


def _ping(self, monitoring_data_point) -> Optional[Response]:
def _ping(self, monitoring_data_point) -> Optional[_Response]:
ping_response = None
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
ping_response = self._client.ping( # type: ignore
monitoring_data_point=monitoring_data_point,
ikey=self._instrumentation_key,
x_ms_qps_transmission_time=_ticks_since_dot_net_epoch(),
cls=Response,
cls=_Response,
)
return ping_response # type: ignore
except HttpResponseError:
Expand Down Expand Up @@ -255,7 +256,7 @@ def _ticker(self) -> None:
print("posting...")
try:
self.collect()
except UnsuccessfulQuickPulsePostError:
except _UnsuccessfulQuickPulsePostError:
# Unsuccessful posts instigate backoff logic
# Backoff after _POST_CANCEL_INTERVAL_SECONDS (20s) of no successful requests
# And resume pinging
Expand All @@ -280,9 +281,9 @@ def _receive_metrics(
)
if result is MetricExportResult.FAILURE:
# There is currently no way to propagate unsuccessful metric post so
# we raise an UnsuccessfulQuickPulsePostError exception. MUST handle
# we raise an _UnsuccessfulQuickPulsePostError exception. MUST handle
# this exception whenever `collect()` is called
raise UnsuccessfulQuickPulsePostError()
raise _UnsuccessfulQuickPulsePostError()

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
self._worker.cancel()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import unittest
from unittest import mock

from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
Histogram,
MetricExporter,
Metric,
MetricExportResult,
MetricsData as OTMetricsData,
MetricReader,
NumberDataPoint,
ResourceMetrics,
ScopeMetrics,
Sum,
)
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.sdk.resources import Resource, ResourceAttributes
from azure.monitor.opentelemetry.exporter._quickpulse._generated._client import QuickpulseClient
from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import MonitoringDataPoint
from azure.monitor.opentelemetry.exporter._quickpulse._exporter import (
_metric_to_quick_pulse_data_points,
_QuickpulseExporter,
_QuickpulseMetricReader,
_Response,
)


def throw(exc_type, *args, **kwargs):
def func(*_args, **_kwargs):
raise exc_type(*args, **kwargs)

return func


class TestQuickpulseExporter(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls._resource = Resource.create(
{
ResourceAttributes.SERVICE_INSTANCE_ID: "test_instance",
ResourceAttributes.SERVICE_NAME: "test_service",
}
)
cls._metrics_data = OTMetricsData(
resource_metrics=ResourceMetrics(
resource=cls._resource,
scope_metrics=ScopeMetrics(
scope=InstrumentationScope("test_scope"),
metrics=[
Metric(
name="azureMonitor.memoryCommittedBytes",
description="test_desc",
unit="test_unit",
data=Sum(
data_points=[
NumberDataPoint(
attributes={},
start_time_unix_nano=0,
time_unix_nano=0,
value=5,
)
],
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=True,
)
)
],
schema_url="test_url",
),
schema_url="test_url",
)
)
cls._data_point = MonitoringDataPoint(
version="test_version",
invariant_version=1,
instance="test_instance",
role_name="test_role_name",
machine_name="test_machine_name",
stream_id="test_stream_id",
)
cls._exporter = _QuickpulseExporter(
"InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ac;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/"
)

# @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient.__new__")
# def test_init(self, client_mock):
# client_inst_mock = mock.Mock()
# client_mock.return_value = client_inst_mock
# exporter = _QuickpulseExporter(
# connection_string="InstrumentationKey=4321abcd-5678-4efa-8abc-1234567890ab;LiveEndpoint=https://eastus.livediagnostics.monitor.azure.com/"
# )

# self.assertEqual(exporter._live_endpoint, "https://eastus.livediagnostics.monitor.azure.com/")
# self.assertEqual(exporter._instrumentation_key, "4321abcd-5678-4efa-8abc-1234567890ab")
# self.assertEqual(exporter._client, client_inst_mock)
# client_mock.assert_called_with(
# QuickpulseClient,
# host="https://eastus.livediagnostics.monitor.azure.com/"
# )


# def test_export_missing_data_point(self):
# result = self._exporter.export(OTMetricsData(resource_metrics=[]))
# self.assertEqual(result, MetricExportResult.FAILURE)


@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient.post")
@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._metric_to_quick_pulse_data_points")
def test_export_subscribed_false(self, convert_mock, post_mock):
post_response = _Response(
mock.Mock(),
None,
{
"x-ms-qps-subscribed": "false",
}
)
convert_mock.return_value = [self._data_point]
post_mock.return_value = post_response
result = self._exporter.export(
self._metrics_data,
base_monitoring_data_point=self._data_point
)
self.assertEqual(result, MetricExportResult.FAILURE)


@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient.post")
@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._metric_to_quick_pulse_data_points")
def test_export_subscribed_none(self, convert_mock, post_mock):
post_response = None
convert_mock.return_value = [self._data_point]
post_mock.return_value = post_response
result = self._exporter.export(
self._metrics_data,
base_monitoring_data_point=self._data_point
)
self.assertEqual(result, MetricExportResult.FAILURE)

# @mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._metric_to_quick_pulse_data_points")
# def test_export_exception(self, convert_mock):
# post_response = _Response(
# mock.Mock(),
# None,
# {},
# )
# convert_mock.return_value = [self._data_point]
# with mock.patch(
# "azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient.post",
# throw(Exception),
# ): # noqa: E501
# result = self._exporter.export(
# self._metrics_data,
# base_monitoring_data_point=self._data_point
# )
# self.assertEqual(result, MetricExportResult.FAILURE)

@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._generated._client.QuickpulseClient.post")
@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._metric_to_quick_pulse_data_points")
def test_export_subscribed_true(self, convert_mock, post_mock):
post_response = _Response(
mock.Mock(),
None,
{
"x-ms-qps-subscribed": "true",
}
)
convert_mock.return_value = [self._data_point]
post_mock.return_value = post_response
result = self._exporter.export(
self._metrics_data,
base_monitoring_data_point=self._data_point
)
self.assertEqual(result, MetricExportResult.SUCCESS)
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import platform
import unittest
from unittest import mock

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource, ResourceAttributes

from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys
from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import MonitoringDataPoint
from azure.monitor.opentelemetry.exporter._quickpulse._exporter import (
_QuickpulseExporter,
_QuickpulseMetricReader,
)
from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import (
enable_live_metrics,
_QuickpulseManager,
)
from azure.monitor.opentelemetry.exporter._utils import (
_get_sdk_version,
_populate_part_a_fields,
)


class TestLiveMetrics(unittest.TestCase):

@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._live_metrics._QuickpulseManager")
def test_enable_live_metrics(self, manager_mock):
mock_resource = mock.Mock()
enable_live_metrics(
connection_string="test_cs",
resource=mock_resource,
)
manager_mock.assert_called_with("test_cs", mock_resource)


class TestQuickpulseManager(unittest.TestCase):

@mock.patch("opentelemetry.sdk.metrics.MeterProvider.__new__")
@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._QuickpulseMetricReader.__new__")
@mock.patch("opentelemetry.sdk.trace.id_generator.RandomIdGenerator.__new__")
@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._QuickpulseExporter.__new__")
@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._generated.models.MonitoringDataPoint.__new__")
def test_init(self, point_mock, exporter_mock, generator_mock, reader_mock, provider_mock):
point_inst_mock = mock.Mock()
point_mock.return_value = point_inst_mock
exporter_inst_mock = mock.Mock()
exporter_mock.return_value = exporter_inst_mock
reader_inst_mock = mock.Mock()
reader_mock.return_value = reader_inst_mock
provider_inst_mock = mock.Mock()
provider_mock.return_value = provider_inst_mock
generator_inst_mock = mock.Mock()
generator_mock.return_value = generator_inst_mock
generator_inst_mock.generate_trace_id.return_value = "test_trace_id"
resource = Resource.create(
{
ResourceAttributes.SERVICE_INSTANCE_ID: "test_instance",
ResourceAttributes.SERVICE_NAME: "test_service",
}
)
part_a_fields = _populate_part_a_fields(resource)
qpm = _QuickpulseManager(
connection_string="test_cs",
resource=resource,
)
self.assertEqual(qpm._base_monitoring_data_point, point_inst_mock)
self.assertEqual(qpm._exporter, exporter_inst_mock)
self.assertEqual(qpm._reader, reader_inst_mock)
self.assertEqual(qpm._meter_provider, provider_inst_mock)
point_mock.assert_called_with(
MonitoringDataPoint,
version=_get_sdk_version(),
invariant_version=1,
instance=part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE_INSTANCE, ""),
role_name=part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE, ""),
machine_name=platform.node(),
stream_id="test_trace_id",
)
exporter_mock.assert_called_with(_QuickpulseExporter, "test_cs")
reader_mock.assert_called_with(_QuickpulseMetricReader, exporter_inst_mock, point_inst_mock)
provider_mock.assert_called_with(MeterProvider, [reader_inst_mock])