Skip to content
Prev Previous commit
Next Next commit
feedback
  • Loading branch information
lzchen committed Mar 8, 2024
commit d4ebbfd72d88e62ddfde004baa8489d0b214a832
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
### Features Added

- Add live metrics collection of requests/dependencies/exceptions
([#34141](https://github.com/Azure/azure-sdk-for-python/pull/34141))
([#34673](https://github.com/Azure/azure-sdk-for-python/pull/34673))

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,7 @@ def enable_live_metrics(**kwargs: Any) -> None:
_QuickpulseManager(kwargs.get('connection_string'), kwargs.get('resource'))


# Used by _QuickpulseSpanProcessor to record live metrics on span record
def record_span(span: ReadableSpan) -> None:
qpm = _QuickpulseManager._instance
if qpm:
qpm._record_span(span)


# Used by _QuickpulseLogRecordProcessor to record live metrics on log data record
def record_log_record(log_data: LogData) -> None:
qpm = _QuickpulseManager._instance
if qpm:
qpm._record_log_record(log_data)


# pylint: disable=protected-access,too-many-instance-attributes
class _QuickpulseManager(metaclass=Singleton):

def __init__(self, connection_string: Optional[str], resource: Optional[Resource]) -> None:
Expand Down Expand Up @@ -132,7 +119,9 @@ def _record_span(self, span: ReadableSpan) -> None:
if _is_post_state():
document = _get_span_document(span)
_append_quickpulse_document(document)
duration_ms = (span.end_time - span.start_time) / 1e9
duration_ms = 0
if span.end_time and span.start_time:
duration_ms = (span.end_time - span.start_time) / 1e9
# TODO: Spec out what "success" is
success = span.status.is_ok

Expand All @@ -147,6 +136,7 @@ def _record_span(self, span: ReadableSpan) -> None:
self._dependency_rate_counter.add(1)
else:
self._dependency_failure_rate_counter.add(1)
self._dependency_duration.record(duration_ms)

def _record_log_record(self, log_data: LogData) -> None:
# Only record if in post state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@

from opentelemetry.sdk._logs import LogData, LogRecordProcessor

from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import record_log_record
from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import _QuickpulseManager


class _QuickpulseLogRecordProcessor(LogRecordProcessor):

def emit(self, log_data: LogData) -> None:
record_log_record(log_data)
qpm = _QuickpulseManager._instance
if qpm:
qpm._record_log_record(log_data)
super().emit(log_data)

def shutdown(self):
super().shutdown()
pass

def force_flush(self, timeout_millis: int = 30000):
super().force_flush(timeout_millis=timeout_millis)
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@

from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor

from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import record_span
from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import _QuickpulseManager


class _QuickpulseSpanProcessor(SpanProcessor):

def on_end(self, span: ReadableSpan) -> None:
record_span(span)
qpm = _QuickpulseManager._instance
if qpm:
qpm._record_span(span)
return super().on_end(span)
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ class _QuickpulseState(Enum):


_GLOBAL_QUICKPULSE_STATE = _QuickpulseState.OFFLINE
_QUICKPULSE_DOCUMENTS = []
_QUICKPULSE_DOCUMENTS: List[DocumentIngress] = []

def _set_global_quickpulse_state(state: _QuickpulseState):
# pylint: disable=global-statement
global _GLOBAL_QUICKPULSE_STATE
_GLOBAL_QUICKPULSE_STATE = state

Expand All @@ -49,6 +50,7 @@ def _is_post_state():


def _append_quickpulse_document(document: DocumentIngress):
# pylint: disable=global-statement,global-variable-not-assigned
global _QUICKPULSE_DOCUMENTS
# Limit risk of memory leak by limiting doc length to something manageable
if len(_QUICKPULSE_DOCUMENTS) > 20:
Expand All @@ -60,6 +62,7 @@ def _append_quickpulse_document(document: DocumentIngress):


def _get_and_clear_quickpulse_documents() -> List[DocumentIngress]:
# pylint: disable=global-statement
global _QUICKPULSE_DOCUMENTS
documents = list(_QUICKPULSE_DOCUMENTS)
_QUICKPULSE_DOCUMENTS = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,21 @@ def _metric_to_quick_pulse_data_points( # pylint: disable=too-many-nested-block
)
]

# mypy: disable-error-code="assignment, union-attr"
def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, RequestDocument]:
duration = span.end_time - span.start_time
status_code = span.attributes.get(SpanAttributes.HTTP_STATUS_CODE)
grpc_status_code = span.attributes.get(SpanAttributes.RPC_GRPC_STATUS_CODE)
duration = 0
if span.end_time and span.start_time:
duration = span.end_time - span.start_time
status_code = span.attributes.get(SpanAttributes.HTTP_STATUS_CODE, "")
grpc_status_code = span.attributes.get(SpanAttributes.RPC_GRPC_STATUS_CODE, "")
span_kind = span.kind
url = _get_url(span_kind, span.attributes)
if span_kind in (SpanKind.CLIENT, SpanKind.PRODUCER, SpanKind.INTERNAL):
document = RemoteDependencyDocument(
document_type=_DocumentIngressDocumentType.RemoteDependency,
name=span.name,
command_name=url,
result_code=status_code,
result_code=str(status_code),
duration=_ns_to_iso8601_string(duration),
)
else:
Expand All @@ -93,14 +96,15 @@ def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, Re
)
return document

# mypy: disable-error-code="assignment"
def _get_log_record_document(log_data: LogData) -> Union[ExceptionDocument, TraceDocument]:
exc_type = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE)
exc_message = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE)
exc_type = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE, "")
exc_message = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE, "")
if exc_type is not None or exc_message is not None:
document = ExceptionDocument(
document_type=_DocumentIngressDocumentType.Exception,
exception_type=exc_type,
exception_message=exc_message,
exception_type=str(exc_type),
exception_message=str(exc_message),
)
else:
document = TraceDocument(
Expand All @@ -114,22 +118,24 @@ def _get_url(span_kind: SpanKind, attributes: Attributes) -> str:
if not attributes:
return ""
http_method = attributes.get(SpanAttributes.HTTP_METHOD)
if not http_method:
return ""
if http_method:
http_scheme = attributes.get(SpanAttributes.HTTP_SCHEME)
# Client
if span_kind in (SpanKind.CLIENT, SpanKind.PRODUCER):
http_url = attributes.get(SpanAttributes.HTTP_URL)
if http_url:
return str(http_url)
else:
host = attributes.get(SpanAttributes.NET_PEER_NAME)
port = attributes.get(SpanAttributes.NET_PEER_PORT, "")
ip = attributes.get(SpanAttributes.NET_PEER_IP)
if http_scheme:
if host:
return f"{http_scheme}://{host}:{port}"
else:
return f"{http_scheme}://{ip}:{port}"

host = attributes.get(SpanAttributes.NET_PEER_NAME)
port = attributes.get(SpanAttributes.NET_PEER_PORT, "")
ip = attributes.get(SpanAttributes.NET_PEER_IP)
if http_scheme:
if host:
return f"{http_scheme}://{host}:{port}"
else:
return f"{http_scheme}://{ip}:{port}"
else: # Server
host = attributes.get(SpanAttributes.NET_HOST_NAME)
port = attributes.get(SpanAttributes.NET_HOST_PORT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ def test_quickpulsereader_receive_metrics(self, task_mock, export_mock):
self._metrics_data,
timeout_millis=20_000,
base_monitoring_data_point=self._data_point,
documents=[],
)

@mock.patch("azure.monitor.opentelemetry.exporter._quickpulse._exporter._QuickpulseExporter.export")
Expand Down