107 changes: 107 additions & 0 deletions src/sentry/spans/consumers/process_segments/convert.py
@@ -0,0 +1,107 @@
from collections.abc import MutableMapping
from typing import Any

from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
from sentry_protos.snuba.v1.trace_item_pb2 import (
AnyValue,
ArrayValue,
KeyValue,
KeyValueList,
TraceItem,
)

from sentry.spans.consumers.process_segments.types import Span

FIELD_TO_ATTRIBUTE = {
"description": "sentry.raw_description",
"duration_ms": "sentry.duration_ms",
"is_segment": "sentry.is_segment",
"exclusive_time_ms": "sentry.exclusive_time_ms",
@jan-auer (Member) commented on May 21, 2025:

exclusive_time_ms was not part of the Relay implementation, but eap_items_span converted this field. @phacops do we need this or should we actually map exclusive_time?

Reply (Contributor):

I think the product adapted to reading sentry.exclusive_time_ms ("span.self_time": "exclusive_time_ms"), so I'd leave it like this.

"start_timestamp_precise": "sentry.start_timestamp_precise",
"end_timestamp_precise": "sentry.end_timestamp_precise",
"is_remote": "sentry.is_remote",
"parent_span_id": "sentry.parent_span_id",
"profile_id": "sentry.profile_id",
"segment_id": "sentry.segment_id",
"received": "sentry.received",
"origin": "sentry.origin",
"kind": "sentry.kind",
"hash": "sentry.hash",
Comment (Member):

This field is written in enrichment. We use this for performance issue detection and used to persist it.

}


def convert_span_to_item(span: Span) -> TraceItem:
attributes: MutableMapping[str, AnyValue] = {} # TODO

client_sample_rate = 1.0
server_sample_rate = 1.0

for k, v in (span.get("data") or {}).items():
attributes[k] = _anyvalue(v)

for k, v in (span.get("measurements") or {}).items():
if k is not None and v is not None:
if k == "client_sample_rate":
client_sample_rate = v["value"]
elif k == "server_sample_rate":
server_sample_rate = v["value"]
else:
attributes[k] = AnyValue(double_value=float(v["value"]))

for k, v in (span.get("sentry_tags") or {}).items():
if v is not None:
if k == "description":
k = "sentry.normalized_description"
else:
k = f"sentry.{k}"

attributes[k] = AnyValue(string_value=str(v))

for k, v in (span.get("tags") or {}).items():
if v is not None:
attributes[k] = AnyValue(string_value=str(v))

for field_name, attribute_name in FIELD_TO_ATTRIBUTE.items():
if value := span.get(field_name):
attributes[attribute_name] = _anyvalue(value)

return TraceItem(
organization_id=span["organization_id"],
project_id=span["project_id"],
trace_id=span["trace_id"],
item_id=int(span["span_id"], 16).to_bytes(16, "little"),
item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
timestamp=_timestamp(span["start_timestamp_precise"]),
attributes=attributes,
client_sample_rate=client_sample_rate,
server_sample_rate=server_sample_rate,
retention_days=span["retention_days"],
received=_timestamp(span["received"]),
)


def _anyvalue(value: Any) -> AnyValue:
if isinstance(value, str):
return AnyValue(string_value=value)
elif isinstance(value, bool):
return AnyValue(bool_value=value)
elif isinstance(value, int):
return AnyValue(int_value=value)
elif isinstance(value, float):
return AnyValue(double_value=value)
elif isinstance(value, list):
array_values = [_anyvalue(v) for v in value if v is not None]
return AnyValue(array_value=ArrayValue(values=array_values))
elif isinstance(value, dict):
kv_values = [KeyValue(key=k, value=_anyvalue(v)) for k, v in value.items() if v is not None]
return AnyValue(kvlist_value=KeyValueList(values=kv_values))

raise ValueError(f"Unknown value type: {type(value)}")


def _timestamp(value: float) -> Timestamp:
return Timestamp(
seconds=int(value),
nanos=round((value % 1) * 1_000_000) * 1000,
Comment (Member):

We map timestamps with full available precision. EAP will truncate those, but we do not do this here to reduce coupling.

)
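
As a quick illustration (not part of the diff): the item_id encoding and the microsecond-precision timestamp conversion above can be checked against the values asserted in the new test further down in this PR.

from google.protobuf.timestamp_pb2 import Timestamp

# Values taken from the test added in this PR.
span_id = "8873a98879faf06d"
start = 1721319572.616648

# item_id: the 64-bit span ID occupies the low bytes of a 16-byte
# little-endian value, so its hex digits appear byte-reversed at the front.
assert int(span_id, 16).to_bytes(16, "little") == (
    b"\x6d\xf0\xfa\x79\x88\xa9\x73\x88" + b"\x00" * 8
)

# _timestamp keeps microsecond precision: the fractional part is rounded to
# whole microseconds and then scaled to nanoseconds.
assert Timestamp(
    seconds=int(start), nanos=round((start % 1) * 1_000_000) * 1000
) == Timestamp(seconds=1721319572, nanos=616648000)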
28 changes: 19 additions & 9 deletions src/sentry/spans/consumers/process_segments/factory.py
@@ -14,7 +14,9 @@

from sentry import options
from sentry.conf.types.kafka_definition import Topic
from sentry.spans.consumers.process_segments.convert import convert_span_to_item
from sentry.spans.consumers.process_segments.message import process_segment
from sentry.spans.consumers.process_segments.types import Span
from sentry.utils.arroyo import MultiprocessingPool, run_task_with_multiprocessing
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

@@ -48,7 +50,7 @@ def __init__(
self.num_processes = num_processes
self.pool = MultiprocessingPool(num_processes)

topic_definition = get_topic_definition(Topic.SNUBA_SPANS)
topic_definition = get_topic_definition(Topic.SNUBA_ITEMS)
producer_config = get_kafka_producer_cluster_options(topic_definition["cluster"])

# Due to the unfold step that precedes the producer, this pipeline
@@ -99,24 +101,32 @@ def shutdown(self):
self.pool.close()


def _process_message(message: Message[KafkaPayload]) -> list[bytes]:
def _process_message(message: Message[KafkaPayload]) -> list[KafkaPayload]:
if not options.get("standalone-spans.process-segments-consumer.enable"):
return []

try:
value = message.payload.value
segment = orjson.loads(value)
processed = process_segment(segment["spans"])
return [orjson.dumps(span) for span in processed]
return [_serialize_payload(span) for span in processed]
except Exception:
# TODO: revise error handling
sentry_sdk.capture_exception()
return []


def _unfold_segment(spans: list[bytes]):
return [
Value(KafkaPayload(key=None, value=span, headers=[]), {})
for span in spans
if span is not None
]
def _serialize_payload(span: Span) -> KafkaPayload:
item = convert_span_to_item(span)
return KafkaPayload(
key=None,
value=item.SerializeToString(),
headers=[
("item_type", str(item.item_type).encode("ascii")),
("project_id", str(span["project_id"]).encode("ascii")),
],
)


def _unfold_segment(spans: list[KafkaPayload]):
return [Value(span, {}) for span in spans if span is not None]
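
A minimal sketch (not part of the diff) of reading one produced payload back, mirroring the updated factory test below; the helper name read_back is illustrative.

from arroyo.backends.kafka import KafkaPayload
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem


def read_back(payload: KafkaPayload) -> TraceItem:
    # Headers are (key, bytes) tuples: item_type is the numeric TraceItemType
    # (b"1" for spans) and project_id is the span's project, ASCII-encoded.
    headers = dict(payload.headers)
    project_id = int(headers["project_id"].decode("ascii"))

    # The value is the protobuf TraceItem serialized by convert_span_to_item.
    item = TraceItem.FromString(payload.value)
    assert item.project_id == project_id
    return item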
3 changes: 2 additions & 1 deletion src/sentry/spans/consumers/process_segments/types.py
@@ -1,4 +1,4 @@
from typing import NotRequired
from typing import Any, NotRequired

from sentry_kafka_schemas.schema_types.buffered_segments_v1 import SegmentSpan as UnprocessedSpan

@@ -17,6 +17,7 @@ class Span(UnprocessedSpan, total=True):
# Missing in schema
start_timestamp_precise: float
end_timestamp_precise: float
data: NotRequired[dict[str, Any]] # currently unused

# Added in enrichment
exclusive_time: float
148 changes: 148 additions & 0 deletions tests/sentry/spans/consumers/process_segments/test_convert.py
@@ -0,0 +1,148 @@
from typing import cast

from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue

from sentry.spans.consumers.process_segments.convert import convert_span_to_item
from sentry.spans.consumers.process_segments.types import Span

###############################################
# Test ported from Snuba's `eap_items_span`. #
###############################################

SPAN_KAFKA_MESSAGE = {
"description": "/api/0/relays/projectconfigs/",
"duration_ms": 152,
"exclusive_time_ms": 0.228,
"is_segment": True,
"data": {
"sentry.environment": "development",
"sentry.release": "[email protected]+c45b49caed1e5fcbf70097ab3f434b487c359b6b",
"thread.name": "uWSGIWorker1Core0",
"thread.id": "8522009600",
"sentry.segment.name": "/api/0/relays/projectconfigs/",
"sentry.sdk.name": "sentry.python.django",
"sentry.sdk.version": "2.7.0",
"my.float.field": 101.2,
"my.int.field": 2000,
"my.neg.field": -100,
"my.neg.float.field": -101.2,
"my.true.bool.field": True,
"my.false.bool.field": False,
},
"measurements": {
"num_of_spans": {"value": 50.0},
"client_sample_rate": {"value": 0.1},
"server_sample_rate": {"value": 0.2},
},
"profile_id": "56c7d1401ea14ad7b4ac86de46baebae",
"organization_id": 1,
"origin": "auto.http.django",
"project_id": 1,
"received": 1721319572.877828,
"retention_days": 90,
"segment_id": "8873a98879faf06d",
"sentry_tags": {
"description": "normalized_description",
"category": "http",
"environment": "development",
"op": "http.server",
"platform": "python",
"release": "[email protected]+c45b49caed1e5fcbf70097ab3f434b487c359b6b",
"sdk.name": "sentry.python.django",
"sdk.version": "2.7.0",
"status": "ok",
"status_code": "200",
"thread.id": "8522009600",
"thread.name": "uWSGIWorker1Core0",
"trace.status": "ok",
"transaction": "/api/0/relays/projectconfigs/",
"transaction.method": "POST",
"transaction.op": "http.server",
"user": "ip:127.0.0.1",
},
"span_id": "8873a98879faf06d",
"tags": {
"http.status_code": "200",
"relay_endpoint_version": "3",
"relay_id": "88888888-4444-4444-8444-cccccccccccc",
"relay_no_cache": "False",
"relay_protocol_version": "3",
"relay_use_post_or_schedule": "True",
"relay_use_post_or_schedule_rejected": "version",
"server_name": "D23CXQ4GK2.local",
"spans_over_limit": "False",
},
"trace_id": "d099bf9ad5a143cf8f83a98081d0ed3b",
"start_timestamp_ms": 1721319572616,
"start_timestamp_precise": 1721319572.616648,
"end_timestamp_precise": 1721319572.768806,
}


def test_convert_span_to_item():
# Cast since the above payload does not conform to the strict schema
item = convert_span_to_item(cast(Span, SPAN_KAFKA_MESSAGE))

assert item.organization_id == 1
assert item.project_id == 1
assert item.trace_id == "d099bf9ad5a143cf8f83a98081d0ed3b"
assert item.item_id == b"\x6d\xf0\xfa\x79\x88\xa9\x73\x88\x00\x00\x00\x00\x00\x00\x00\x00"
assert item.item_type == TraceItemType.TRACE_ITEM_TYPE_SPAN
assert item.timestamp == Timestamp(seconds=1721319572, nanos=616648000)
assert item.client_sample_rate == 0.1
assert item.server_sample_rate == 0.2
assert item.retention_days == 90
assert item.received == Timestamp(seconds=1721319572, nanos=877828000)

assert item.attributes == {
"my.false.bool.field": AnyValue(bool_value=False),
"my.true.bool.field": AnyValue(bool_value=True),
"sentry.is_segment": AnyValue(bool_value=True),
"my.float.field": AnyValue(double_value=101.2),
"my.neg.float.field": AnyValue(double_value=-101.2),
"sentry.exclusive_time_ms": AnyValue(double_value=0.228),
"sentry.start_timestamp_precise": AnyValue(double_value=1721319572.616648),
"num_of_spans": AnyValue(double_value=50.0),
"sentry.end_timestamp_precise": AnyValue(double_value=1721319572.768806),
"sentry.duration_ms": AnyValue(int_value=152),
"sentry.received": AnyValue(double_value=1721319572.877828),
"my.int.field": AnyValue(int_value=2000),
"my.neg.field": AnyValue(int_value=-100),
"relay_protocol_version": AnyValue(string_value="3"),
"sentry.raw_description": AnyValue(string_value="/api/0/relays/projectconfigs/"),
"sentry.segment_id": AnyValue(string_value="8873a98879faf06d"),
"sentry.transaction.method": AnyValue(string_value="POST"),
"server_name": AnyValue(string_value="D23CXQ4GK2.local"),
"sentry.status": AnyValue(string_value="ok"),
"relay_endpoint_version": AnyValue(string_value="3"),
"relay_no_cache": AnyValue(string_value="False"),
"relay_use_post_or_schedule": AnyValue(string_value="True"),
"spans_over_limit": AnyValue(string_value="False"),
"sentry.segment.name": AnyValue(string_value="/api/0/relays/projectconfigs/"),
"sentry.status_code": AnyValue(string_value="200"),
"sentry.op": AnyValue(string_value="http.server"),
"sentry.origin": AnyValue(string_value="auto.http.django"),
"sentry.transaction": AnyValue(string_value="/api/0/relays/projectconfigs/"),
"sentry.thread.name": AnyValue(string_value="uWSGIWorker1Core0"),
"sentry.profile_id": AnyValue(string_value="56c7d1401ea14ad7b4ac86de46baebae"),
"thread.id": AnyValue(string_value="8522009600"),
"http.status_code": AnyValue(string_value="200"),
"sentry.release": AnyValue(
string_value="[email protected]+c45b49caed1e5fcbf70097ab3f434b487c359b6b"
),
"sentry.sdk.name": AnyValue(string_value="sentry.python.django"),
"sentry.transaction.op": AnyValue(string_value="http.server"),
"relay_id": AnyValue(string_value="88888888-4444-4444-8444-cccccccccccc"),
"sentry.trace.status": AnyValue(string_value="ok"),
"sentry.category": AnyValue(string_value="http"),
"sentry.environment": AnyValue(string_value="development"),
"sentry.thread.id": AnyValue(string_value="8522009600"),
"sentry.sdk.version": AnyValue(string_value="2.7.0"),
"sentry.platform": AnyValue(string_value="python"),
"sentry.user": AnyValue(string_value="ip:127.0.0.1"),
"relay_use_post_or_schedule_rejected": AnyValue(string_value="version"),
"sentry.normalized_description": AnyValue(string_value="normalized_description"),
"thread.name": AnyValue(string_value="uWSGIWorker1Core0"),
}
13 changes: 10 additions & 3 deletions tests/sentry/spans/consumers/process_segments/test_factory.py
@@ -4,8 +4,10 @@
from arroyo.backends.kafka import KafkaPayload
from arroyo.types import BrokerValue, Message, Partition
from arroyo.types import Topic as ArroyoTopic
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem

from sentry.conf.types.kafka_definition import Topic
from sentry.spans.consumers.process_segments.convert import convert_span_to_item
from sentry.spans.consumers.process_segments.factory import DetectPerformanceIssuesStrategyFactory
from sentry.testutils.helpers.options import override_options
from sentry.utils import json
@@ -84,7 +86,12 @@ def test_segment_deserialized_correctly(mock_process_segment):
assert mock_process_segment.call_args.args[0] == segment_data["spans"]

assert mock_producer.produce.call_count == 2
assert mock_producer.produce.call_args.args[0] == ArroyoTopic("snuba-spans")
assert mock_producer.produce.call_args.args[0] == ArroyoTopic("snuba-items")

value = mock_producer.produce.call_args.args[1].value
assert json.loads(value) == span_data
payload = mock_producer.produce.call_args.args[1]
span_item = TraceItem.FromString(payload.value)
assert span_item == convert_span_to_item(span_data)

headers = {k: v for k, v in payload.headers}
assert headers["item_type"] == b"1"
assert headers["project_id"] == b"1"