Skip to content
Prev Previous commit
Next Next commit
exceptions
  • Loading branch information
lzchen committed Mar 5, 2024
commit e2df1e0832d8dab5b8a570cda9a31c86fbfaec91
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,10 @@
from azure.monitor.opentelemetry.exporter._utils import _ticks_since_dot_net_epoch, PeriodicTask


_APPLICATION_INSIGHTS_METRIC_TEMPORALITIES = {
_QUICKPULSE_METRIC_TEMPORALITIES = {
# Use DELTA temporalities because we want to reset the counts every collection interval
Counter: AggregationTemporality.DELTA,
Histogram: AggregationTemporality.DELTA,
ObservableCounter: AggregationTemporality.DELTA,
ObservableGauge: AggregationTemporality.CUMULATIVE,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
UpDownCounter: AggregationTemporality.CUMULATIVE,
}


Expand Down Expand Up @@ -89,7 +86,7 @@ def __init__(self, connection_string: Optional[str]) -> None:

MetricExporter.__init__(
self,
preferred_temporality=_APPLICATION_INSIGHTS_METRIC_TEMPORALITIES, # type: ignore
preferred_temporality=_QUICKPULSE_METRIC_TEMPORALITIES, # type: ignore
)

def export(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@
import platform
from typing import Any, Optional

from opentelemetry.trace import SpanKind
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind

from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys
from azure.monitor.opentelemetry.exporter._quickpulse._constants import (
_DEPENDENCY_DURATION_NAME,
_DEPENDENCY_FAILURE_RATE_NAME,
_DEPENDENCY_RATE_NAME,
_EXCEPTION_RATE_NAME,
_REQUEST_DURATION_NAME,
_REQUEST_FAILURE_RATE_NAME,
_REQUEST_RATE_NAME,
Expand All @@ -29,6 +32,7 @@
_set_global_quickpulse_state,
)
from azure.monitor.opentelemetry.exporter._quickpulse._utils import (
_get_log_record_document,
_get_span_document,
)
from azure.monitor.opentelemetry.exporter._utils import (
Expand All @@ -39,7 +43,7 @@


def enable_live_metrics(**kwargs: Any) -> None:
"""Azure Monitor base exporter for OpenTelemetry.
"""Live metrics entry point.

:keyword str connection_string: The connection string used for your Application Insights resource.
:keyword Resource resource: The OpenTelemetry Resource used for this Python application.
Expand All @@ -48,12 +52,20 @@ def enable_live_metrics(**kwargs: Any) -> None:
_QuickpulseManager(kwargs.get('connection_string'), kwargs.get('resource'))


# Used by _QuickpulseSpanProcessor to record live metrics on span record
def record_span(span: ReadableSpan) -> None:
qpm = _QuickpulseManager._instance
if qpm:
qpm._record_span(span)


# Used by _QuickpulseLogRecordProcessor to record live metrics on log data record
def record_log_record(log_data: LogData) -> None:
qpm = _QuickpulseManager._instance
if qpm:
qpm._record_log_record(log_data)


class _QuickpulseManager(metaclass=Singleton):

def __init__(self, connection_string: Optional[str], resource: Optional[Resource]) -> None:
Expand Down Expand Up @@ -108,13 +120,21 @@ def __init__(self, connection_string: Optional[str], resource: Optional[Resource
"dep/sec",
"live metrics dependency failure rate per second"
)
self._exception_rate_counter = self._meter.create_counter(
_EXCEPTION_RATE_NAME[0],
"exc/sec",
"live metrics exception rate per second"
)

def _record_span(self, span: ReadableSpan):
def _record_span(self, span: ReadableSpan) -> None:
# Only record if in post state
if _is_post_state():
# TODO: Include DocumentIngress in payload
document = _get_span_document(span)
duration_ms = (span.end_time - span.start_time) / 1e9
status_code = str(span.attributes.get(SpanAttributes.HTTP_STATUS_CODE), "")
success = status_code == "200"
# status_code = str(span.attributes.get(SpanAttributes.HTTP_STATUS_CODE), "")
# success = status_code == "200"
success = span.status.is_ok

if span.kind in (SpanKind.SERVER, SpanKind.CONSUMER):
if success:
Expand All @@ -128,6 +148,15 @@ def _record_span(self, span: ReadableSpan):
else:
self._dependency_failure_rate_counter.add(1)



# def record_span_for_quickpulse()
def _record_log_record(self, log_data: LogData) -> None:
# Only record if in post state
if _is_post_state():
if log_data.log_record:
log_record = log_data.log_record
if log_record.attributes:
# TODO: Include DocumentIngress in payload
document = _get_log_record_document(log_data)
exc_type = log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE)
exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE)
if exc_type is not None or exc_message is not None:
self._exception_rate_counter.add(1)
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from opentelemetry.sdk._logs import LogData, LogRecordProcessor

from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import record_log_record


class _QuickpulseLogRecordProcessor(LogRecordProcessor):

def emit(self, log_data: LogData) -> None:
record_log_record(log_data)
super().emit(log_data)

def shutdown(self):
super().shutdown()

def force_flush(self, timeout_millis: int = 30000):
super().force_flush(timeout_millis=timeout_millis)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor

from azure.monitor.opentelemetry.exporter._quickpulse._live_metrics import record_span


class _QuickpulseSpanProcessor(SpanProcessor):

def on_end(self, span: ReadableSpan) -> None:
record_span(span)
return super().on_end(span)
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,29 @@
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Union

from opentelemetry.trace import SpanKind
from opentelemetry.util.types import Attributes
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk.metrics._internal.point import (
NumberDataPoint,
HistogramDataPoint,
)
from opentelemetry.sdk.metrics.export import MetricsData as OTMetricsData
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind
from opentelemetry.util.types import Attributes

from azure.monitor.opentelemetry.exporter._quickpulse._constants import (
_DocumentIngressDocumentType,
_QUICKPULSE_METRIC_NAME_MAPPINGS,
)
from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import (
DocumentIngress,
Exception as ExceptionDocument,
MetricPoint,
MonitoringDataPoint,
RemoteDependency as RemoteDependencyDocument,
Request as RequestDocument,
Trace as TraceDocument,
)
def _metric_to_quick_pulse_data_points( # pylint: disable=too-many-nested-blocks
metrics_data: OTMetricsData,
Expand Down Expand Up @@ -86,6 +90,22 @@ def _get_span_document(span: ReadableSpan) -> Union[RemoteDependencyDocument, Re
)
return document

def _get_log_record_document(log_data: LogData) -> Union[ExceptionDocument, TraceDocument]:
exc_type = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE)
exc_message = log_data.log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE)
if exc_type is not None or exc_message is not None:
document = ExceptionDocument(
document_type=_DocumentIngressDocumentType.Exception,
exception_type=exc_type,
exception_message=exc_message,
)
else:
document = TraceDocument(
document_type=_DocumentIngressDocumentType.Trace,
message=log_data.log_record.body,
)
return document


def _get_url(span_kind: SpanKind, attributes: Attributes) -> str:
if not attributes:
Expand Down