Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Serialize complex log bodies to json and support gen_ai.system
  • Loading branch information
Liudmila Molkova committed Oct 29, 2024
commit ba71e604a36e8b08689f25825c1453916d28e6ea
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
import logging
from typing import Optional, Sequence, Any

Expand Down Expand Up @@ -139,12 +140,10 @@ def _convert_log_to_envelope(log_data: LogData) -> TelemetryItem:

# Event telemetry
if _log_data_is_event(log_data):
if not log_record.body:
log_record.body = "n/a"
_set_statsbeat_custom_events_feature()
envelope.name = 'Microsoft.ApplicationInsights.Event'
data = TelemetryEventData(
name=str(log_record.body)[:32768],
name=_body_to_string(log_record.body),
properties=properties,
)
envelope.data = MonitorBase(base_data=data, base_type="EventData")
Expand All @@ -156,7 +155,7 @@ def _convert_log_to_envelope(log_data: LogData) -> TelemetryItem:
exc_type = "Exception"
# Log body takes priority for message
if log_record.body:
message = str(log_record.body)
message = _body_to_string(log_record.body)
elif exc_message:
message = exc_message # type: ignore
else:
Expand All @@ -175,13 +174,11 @@ def _convert_log_to_envelope(log_data: LogData) -> TelemetryItem:
# pylint: disable=line-too-long
envelope.data = MonitorBase(base_data=data, base_type="ExceptionData")
else: # Message telemetry
if not log_record.body:
log_record.body = "n/a"
envelope.name = _MESSAGE_ENVELOPE_NAME
# pylint: disable=line-too-long
# Severity number: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#field-severitynumber
data = MessageData( # type: ignore
message=str(log_record.body)[:32768],
message=_body_to_string(log_record.body),
severity_level=severity_level,
properties=properties,
)
Expand All @@ -204,6 +201,17 @@ def _get_severity_level(severity_number: Optional[SeverityNumber]):
return 0
return int((severity_number.value - 1) / 4 - 1)

def _body_to_string(log_body: Any) -> str:
if not log_body:
return "n/a"

if isinstance(log_body, str):
return log_body[:32768]

if isinstance(log_body, Exception):
return str(log_body)[:32768]

return json.dumps(log_body)[:32768]

def _is_ignored_attribute(key: str) -> bool:
    """Report whether ``key`` is a member of ``_IGNORED_ATTRS``.

    :param key: The attribute key to look up.
    :return: ``True`` when the key is in ``_IGNORED_ATTRS``, else ``False``.
    """
    if key in _IGNORED_ATTRS:
        return True
    return False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from urllib.parse import urlparse

from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
from opentelemetry.semconv._incubating.attributes import gen_ai_attributes
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
Expand Down Expand Up @@ -425,6 +426,8 @@ def _convert_span_to_envelope(span: ReadableSpan) -> TelemetryItem:
target, # type: ignore
span.attributes,
)
elif gen_ai_attributes.GEN_AI_SYSTEM in span.attributes: # GenAI
data.type = span.attributes[gen_ai_attributes.GEN_AI_SYSTEM]
else:
data.type = "N/A"
elif span.kind is SpanKind.PRODUCER: # Messaging
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
import os
import platform
import shutil
Expand Down Expand Up @@ -109,6 +110,24 @@ def setUpClass(cls):
),
InstrumentationScope("test_name"),
)
cls._log_data_complex_body = _logs.LogData(
_logs.LogRecord(
timestamp = 1646865018558419456,
trace_id = 125960616039069540489478540494783893221,
span_id = 2909973987304607650,
severity_text = "WARNING",
trace_flags = None,
severity_number = SeverityNumber.WARN,
body = {"foo": {"bar" : "baz", "qux": 42}},
resource = Resource.create(
attributes={"asd":"test_resource"}
),
attributes={
"test": "attribute"
},
),
InstrumentationScope("test_name"),
)
cls._log_data_event = _logs.LogData(
_logs.LogRecord(
timestamp = 1646865018558419456,
Expand All @@ -126,6 +145,23 @@ def setUpClass(cls):
),
InstrumentationScope("test_name"),
)
cls._log_data_event_complex_body = _logs.LogData(
_logs.LogRecord(
timestamp = 1646865018558419456,
trace_id = 125960616039069540489478540494783893221,
span_id = 2909973987304607650,
severity_text = "INFO",
trace_flags = None,
severity_number = SeverityNumber.INFO,
body = {"foo": {"bar" : "baz", "qux": 42}},
resource = Resource.create(attributes={"asd":"test_resource"}),
attributes={
"event_key": "event_attribute",
_APPLICATION_INSIGHTS_EVENT_MARKER_ATTRIBUTE: True,
},
),
InstrumentationScope("test_name"),
)
cls._exc_data = _logs.LogData(
_logs.LogRecord(
timestamp = 1646865018558419456,
Expand Down Expand Up @@ -350,6 +386,13 @@ def test_log_to_envelope_log_empty(self):
self.assertEqual(envelope.data.base_type, 'MessageData')
self.assertEqual(envelope.data.base_data.message, "n/a")

def test_log_to_envelope_log_complex_body(self):
    """A dict log body is exported as MessageData carrying the JSON-serialized body."""
    log_data = self._log_data_complex_body
    exporter = self._exporter

    envelope = exporter._log_to_envelope(log_data)

    self.assertEqual(envelope.name, 'Microsoft.ApplicationInsights.Message')
    self.assertEqual(envelope.data.base_type, 'MessageData')
    # The expected message is the JSON form of the fixture's structured body.
    expected_message = json.dumps(log_data.log_record.body)
    self.assertEqual(envelope.data.base_data.message, expected_message)

def test_log_to_envelope_exception_with_string_message(self):
exporter = self._exporter
envelope = exporter._log_to_envelope(self._exc_data)
Expand Down Expand Up @@ -420,6 +463,16 @@ def test_log_to_envelope_event(self):
self.assertEqual(envelope.data.base_data.name, record.body)
self.assertEqual(envelope.data.base_data.properties["event_key"], "event_attribute")

def test_log_to_envelope_event_complex_body(self):
    """A dict body on the event fixture yields EventData named by the JSON-serialized body."""
    log_data = self._log_data_event_complex_body
    exporter = self._exporter

    envelope = exporter._log_to_envelope(log_data)
    record = log_data.log_record

    self.assertEqual(envelope.name, 'Microsoft.ApplicationInsights.Event')
    self.assertEqual(envelope.time, ns_to_iso_str(record.timestamp))
    self.assertEqual(envelope.data.base_type, 'EventData')

    event = envelope.data.base_data
    # The event name is the JSON form of the structured body.
    self.assertEqual(event.name, json.dumps(record.body))
    self.assertEqual(event.properties["event_key"], "event_attribute")

def test_log_to_envelope_timestamp(self):
exporter = self._exporter
old_record = self._log_data.log_record
Expand All @@ -429,15 +482,15 @@ def test_log_to_envelope_timestamp(self):
record = self._log_data.log_record
self.assertEqual(envelope.time, ns_to_iso_str(record.observed_timestamp))
self._log_data.log_record = old_record


class TestAzureLogExporterWithDisabledStorage(TestAzureLogExporter):
_exporter_class = partial(AzureMonitorLogExporter, disable_offline_storage=True)

@classmethod
def tearDownClass(cls):
    """Do nothing at class teardown.

    NOTE(review): presumably this replaces the parent test class's teardown
    because this variant runs with offline storage disabled -- confirm
    against the parent class.
    """

def test_constructor(self):
"""Test the constructor."""
exporter = AzureMonitorLogExporter(
Expand All @@ -449,12 +502,12 @@ def test_constructor(self):
"4321abcd-5678-4efa-8abc-1234567890ab",
)
self.assertEqual(exporter.storage, None)

def test_shutdown(self):
    """After ``shutdown()`` the exporter's ``storage`` compares equal to ``None``."""
    exporter_under_test = self._exporter

    exporter_under_test.shutdown()

    self.assertEqual(exporter_under_test.storage, None)

def test_export_failure(self):
exporter = self._exporter
with mock.patch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def test_span_to_envelope_client_http(self):
span.end(end_time=end_time)
span._status = Status(status_code=StatusCode.OK)
envelope = exporter._span_to_envelope(span)

self.assertEqual(
envelope.name, "Microsoft.ApplicationInsights.RemoteDependency"
)
Expand Down Expand Up @@ -489,7 +489,7 @@ def test_span_to_envelope_client_db(self):
span.end(end_time=end_time)
span._status = Status(status_code=StatusCode.OK)
envelope = exporter._span_to_envelope(span)

self.assertEqual(
envelope.name, "Microsoft.ApplicationInsights.RemoteDependency"
)
Expand Down Expand Up @@ -609,7 +609,7 @@ def test_span_to_envelope_client_rpc(self):
span.end(end_time=end_time)
span._status = Status(status_code=StatusCode.OK)
envelope = exporter._span_to_envelope(span)

self.assertEqual(
envelope.name, "Microsoft.ApplicationInsights.RemoteDependency"
)
Expand All @@ -624,7 +624,7 @@ def test_span_to_envelope_client_rpc(self):
self.assertEqual(envelope.data.base_data.type, "rpc.system")
self.assertEqual(envelope.data.base_data.target, "service")
self.assertEqual(len(envelope.data.base_data.properties), 0)

# target
span._attributes = {
"rpc.system": "rpc",
Expand Down Expand Up @@ -656,7 +656,7 @@ def test_span_to_envelope_client_messaging(self):
span.end(end_time=end_time)
span._status = Status(status_code=StatusCode.OK)
envelope = exporter._span_to_envelope(span)

self.assertEqual(
envelope.name, "Microsoft.ApplicationInsights.RemoteDependency"
)
Expand All @@ -671,14 +671,51 @@ def test_span_to_envelope_client_messaging(self):
self.assertEqual(envelope.data.base_data.type, "messaging")
self.assertEqual(envelope.data.base_data.target, "celery")
self.assertEqual(len(envelope.data.base_data.properties), 0)

# target
span._attributes = {
"messaging.system": "messaging",
}
envelope = exporter._span_to_envelope(span)
self.assertEqual(envelope.data.base_data.target, "messaging")

def test_span_to_envelope_client_gen_ai(self):
    """A CLIENT span with ``gen_ai.system`` becomes a RemoteDependency typed by that value."""
    exporter = self._exporter
    started_at = 1575494316027613500
    ended_at = started_at + 1001000000

    # SpanKind.CLIENT gen_ai
    span_context = SpanContext(
        trace_id=36873507687745823477771305566750195431,
        span_id=12030755672171557337,
        is_remote=False,
    )
    span = trace._Span(
        name="test",
        context=span_context,
        attributes={"gen_ai.system": "az.ai.inference"},
        kind=SpanKind.CLIENT,
    )
    span.start(start_time=started_at)
    span.end(end_time=ended_at)
    span._status = Status(status_code=StatusCode.UNSET)
    envelope = exporter._span_to_envelope(span)

    # Envelope-level fields.
    self.assertEqual(envelope.name, "Microsoft.ApplicationInsights.RemoteDependency")
    self.assertEqual(envelope.time, "2019-12-04T21:18:36.027613Z")

    # Dependency payload fields.
    dependency = envelope.data.base_data
    self.assertEqual(dependency.name, "test")
    self.assertEqual(dependency.id, "a6f5d48acb4d31d9")
    self.assertEqual(dependency.duration, "0.00:00:01.001")
    self.assertTrue(dependency.success)
    self.assertEqual(dependency.result_code, "0")

    self.assertEqual(envelope.data.base_type, "RemoteDependencyData")
    # The dependency type is taken from the span's gen_ai.system attribute.
    self.assertEqual(dependency.type, "az.ai.inference")
    self.assertEqual(len(dependency.properties), 1)

def test_span_to_envelope_client_azure(self):
exporter = self._exporter
start_time = 1575494316027613500
Expand All @@ -703,7 +740,7 @@ def test_span_to_envelope_client_azure(self):
span.end(end_time=end_time)
span._status = Status(status_code=StatusCode.OK)
envelope = exporter._span_to_envelope(span)

self.assertEqual(
envelope.name, "Microsoft.ApplicationInsights.RemoteDependency"
)
Expand All @@ -718,7 +755,7 @@ def test_span_to_envelope_client_azure(self):
self.assertEqual(envelope.data.base_data.type, "Microsoft.EventHub")
self.assertEqual(envelope.data.base_data.target, "test_address/test_destination")
self.assertEqual(len(envelope.data.base_data.properties), 2)

# target
span._attributes = {
"messaging.system": "messaging",
Expand Down Expand Up @@ -749,7 +786,7 @@ def test_span_to_envelope_producer_messaging(self):
span.end(end_time=end_time)
span._status = Status(status_code=StatusCode.OK)
envelope = exporter._span_to_envelope(span)

self.assertEqual(
envelope.name, "Microsoft.ApplicationInsights.RemoteDependency"
)
Expand Down Expand Up @@ -806,7 +843,7 @@ def test_span_to_envelope_internal(self):
span.end(end_time=end_time)
span._status = Status(status_code=StatusCode.OK)
envelope = exporter._span_to_envelope(span)

self.assertEqual(
envelope.name, "Microsoft.ApplicationInsights.RemoteDependency"
)
Expand Down Expand Up @@ -967,7 +1004,7 @@ def test_span_envelope_server_http(self):
self.assertEqual(envelope.tags[ContextTagKeys.AI_LOCATION_IP], "client_ip")
self.assertEqual(envelope.data.base_data.url, "https://www.wikipedia.org/wiki/Rabbit")
self.assertEqual(len(envelope.data.base_data.properties), 0)

# success
span._attributes = {
"http.method": "GET",
Expand Down Expand Up @@ -1080,7 +1117,7 @@ def test_span_envelope_server_messaging(self):
self.assertEqual(envelope.data.base_data.id, "a6f5d48acb4d31d9")
self.assertEqual(envelope.data.base_data.duration, "0.00:00:01.001")
self.assertTrue(envelope.data.base_data.success)

self.assertEqual(envelope.tags[ContextTagKeys.AI_LOCATION_IP], "127.0.0.1")
self.assertEqual(envelope.data.base_data.source, "test name/celery")
self.assertEqual(len(envelope.data.base_data.properties), 0)
Expand Down Expand Up @@ -1281,7 +1318,7 @@ def test_span_events_to_envelopes_exception(self):
span.end()
span._status = Status(status_code=StatusCode.OK)
envelopes = exporter._span_events_to_envelopes(span)

self.assertEqual(len(envelopes), 1)
envelope = envelopes[0]
self.assertEqual(
Expand Down Expand Up @@ -1332,7 +1369,7 @@ def test_span_events_to_envelopes_message(self):
span.end()
span._status = Status(status_code=StatusCode.OK)
envelopes = exporter._span_events_to_envelopes(span)

self.assertEqual(len(envelopes), 1)
envelope = envelopes[0]
self.assertEqual(
Expand Down Expand Up @@ -1384,7 +1421,7 @@ def test_span_events_to_envelopes_sample_rate(self):
span.end()
span._status = Status(status_code=StatusCode.OK)
envelopes = exporter._span_events_to_envelopes(span)

self.assertEqual(len(envelopes), 1)
envelope = envelopes[0]
self.assertEqual(
Expand Down Expand Up @@ -1468,7 +1505,7 @@ def test_get_otel_resource_envelope(self):
self.assertEqual(metric_name, "Microsoft.ApplicationInsights.Metric")
instrumentation_key = envelope.instrumentation_key
self.assertEqual(instrumentation_key, exporter._instrumentation_key)

monitor_base = envelope.data
self.assertEqual(monitor_base.base_type, "MetricData")
metrics_data = monitor_base.base_data
Expand Down