diff --git a/batch_metrics.go b/batch_metrics.go new file mode 100644 index 000000000..8c2902f3d --- /dev/null +++ b/batch_metrics.go @@ -0,0 +1,127 @@ +package sentry + +import ( + "context" + "sync" + "time" +) + +type BatchMeter struct { + client *Client + metricsCh chan Metric + flushCh chan chan struct{} + cancel context.CancelFunc + wg sync.WaitGroup + startOnce sync.Once + shutdownOnce sync.Once +} + +func NewBatchMeter(client *Client) *BatchMeter { + return &BatchMeter{ + client: client, + metricsCh: make(chan Metric, batchSize), + flushCh: make(chan chan struct{}), + } +} + +func (m *BatchMeter) Start() { + m.startOnce.Do(func() { + ctx, cancel := context.WithCancel(context.Background()) + m.cancel = cancel + m.wg.Add(1) + go m.run(ctx) + }) +} + +func (m *BatchMeter) Flush(timeout <-chan struct{}) { + done := make(chan struct{}) + select { + case m.flushCh <- done: + select { + case <-done: + case <-timeout: + } + case <-timeout: + } +} + +func (m *BatchMeter) Shutdown() { + m.shutdownOnce.Do(func() { + if m.cancel != nil { + m.cancel() + m.wg.Wait() + } + }) +} + +func (m *BatchMeter) run(ctx context.Context) { + defer m.wg.Done() + var metrics []Metric + timer := time.NewTimer(batchTimeout) + defer timer.Stop() + + for { + select { + case metric := <-m.metricsCh: + metrics = append(metrics, metric) + if len(metrics) >= batchSize { + m.processEvent(metrics) + metrics = nil + if !timer.Stop() { + <-timer.C + } + timer.Reset(batchTimeout) + } + case <-timer.C: + if len(metrics) > 0 { + m.processEvent(metrics) + metrics = nil + } + timer.Reset(batchTimeout) + case done := <-m.flushCh: + flushDrain: + for { + select { + case metric := <-m.metricsCh: + metrics = append(metrics, metric) + default: + break flushDrain + } + } + + if len(metrics) > 0 { + m.processEvent(metrics) + metrics = nil + } + if !timer.Stop() { + <-timer.C + } + timer.Reset(batchTimeout) + close(done) + case <-ctx.Done(): + drain: + for { + select { + case metric := <-m.metricsCh: + metrics = append(metrics, metric) + default: + break drain + } + } + + if len(metrics) > 0 { + m.processEvent(metrics) + } + return + } + } +} + +func (m *BatchMeter) processEvent(metrics []Metric) { + event := NewEvent() + event.Timestamp = time.Now() + event.EventID = EventID(uuid()) + event.Type = traceMetricEvent.Type + event.Metrics = metrics + m.client.Transport.SendEvent(event) +} diff --git a/client.go b/client.go index 346230223..c26b44a00 100644 --- a/client.go +++ b/client.go @@ -234,6 +234,9 @@ type ClientOptions struct { Tags map[string]string // EnableLogs controls when logs should be emitted. EnableLogs bool + // ExperimentalEnableTraceMetric controls when trace metrics should be emitted. + // This is an experimental feature that is subject to change. + ExperimentalEnableTraceMetric bool // TraceIgnoreStatusCodes is a list of HTTP status codes that should not be traced. // Each element can be either: // - A single-element slice [code] for a specific status code @@ -265,6 +268,7 @@ type Client struct { // not supported, create a new client instead. Transport Transport batchLogger *BatchLogger + batchMeter *BatchMeter } // NewClient creates and returns an instance of Client configured using @@ -369,6 +373,11 @@ func NewClient(options ClientOptions) (*Client, error) { client.batchLogger.Start() } + if options.ExperimentalEnableTraceMetric { + client.batchMeter = NewBatchMeter(&client) + client.batchMeter.Start() + } + client.setupTransport() client.setupIntegrations() @@ -531,7 +540,7 @@ func (client *Client) RecoverWithContext( // the network synchronously, configure it to use the HTTPSyncTransport in the // call to Init. func (client *Client) Flush(timeout time.Duration) bool { - if client.batchLogger != nil { + if client.batchLogger != nil || client.batchMeter != nil { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() return client.FlushWithContext(ctx) @@ -555,6 +564,9 @@ func (client *Client) FlushWithContext(ctx context.Context) bool { if client.batchLogger != nil { client.batchLogger.Flush(ctx.Done()) } + if client.batchMeter != nil { + client.batchMeter.Flush(ctx.Done()) + } return client.Transport.FlushWithContext(ctx) } diff --git a/interfaces.go b/interfaces.go index ea2f8d883..906333be4 100644 --- a/interfaces.go +++ b/interfaces.go @@ -26,6 +26,14 @@ var logEvent = struct { "application/vnd.sentry.items.log+json", } +var traceMetricEvent = struct { + Type string + ContentType string +}{ + "trace_metric", + "application/vnd.sentry.items.trace-metric+json", +} + // Level marks the severity of the event. type Level string @@ -141,6 +149,26 @@ type LogEntry interface { Emitf(format string, args ...interface{}) } +type MeterOptions struct { + // Attributes are key/value pairs that will be added to the metric. + // The attributes set here will take precedence over the attributes + // set from the Meter. + Attributes []attribute.Builder + // The unit of measurements, for "gauge" and "distribution" metrics. + Unit string +} + +type Meter interface { + // GetCtx returns the [context.Context] set on the meter. + GetCtx() context.Context + // SetAttributes allows attaching parameters to the meter using the attribute API. + SetAttributes(...attribute.Builder) + Count(name string, count int64, options MeterOptions) + Gauge(name string, value int64, options MeterOptions) + FGauge(name string, value float64, options MeterOptions) + Distribution(name string, sample float64, options MeterOptions) +} + // Attachment allows associating files with your events to aid in investigation. // An event may contain one or more attachments. type Attachment struct { @@ -395,14 +423,42 @@ type Event struct { CheckIn *CheckIn `json:"check_in,omitempty"` MonitorConfig *MonitorConfig `json:"monitor_config,omitempty"` + Items *sentryItems `json:"items,omitempty"` // The fields below are only relevant for logs - Logs []Log `json:"items,omitempty"` + Logs []Log `json:"-"` + + // The fields below are only relevant for metrics + Metrics []Metric `json:"-"` // The fields below are not part of the final JSON payload. sdkMetaData SDKMetaData } +// sentryItems is a wrapper for the Items field of the Event struct. +// It is used to prevent the empty interface from being marshaled. +type sentryItems struct { + valid bool + t string + metrics []Metric + logs []Log +} + +func (items *sentryItems) MarshalJSON() ([]byte, error) { + if !items.valid { + return nil, nil + } + + switch items.t { + case traceMetricEvent.Type: + return json.Marshal(items.metrics) + case logEvent.Type: + return json.Marshal(items.logs) + default: + return nil, nil + } +} + // SetException appends the unwrapped errors to the event's exception list. // // maxErrorDepth is the maximum depth of the error chain we will look @@ -480,6 +536,8 @@ func (e *Event) ToEnvelopeWithTime(dsn *protocol.Dsn, sentAt time.Time) (*protoc mainItem = protocol.NewEnvelopeItem(protocol.EnvelopeItemTypeCheckIn, eventBody) case logEvent.Type: mainItem = protocol.NewLogItem(len(e.Logs), eventBody) + case traceMetricEvent.Type: + mainItem = protocol.NewTraceMetricsItem(len(e.Metrics), eventBody) default: mainItem = protocol.NewEnvelopeItem(protocol.EnvelopeItemTypeEvent, eventBody) } @@ -515,6 +573,25 @@ func (e *Event) MarshalJSON() ([]byte, error) { if e.Type == checkInType { return e.checkInMarshalJSON() } + + // HACK: Logs & metrics uses the same JSON key. This is not possible in Go. + // Since metrics is experimental, we'll try to prioritize logs. + if e.Logs != nil { + e.Items = &sentryItems{ + valid: true, + t: logEvent.Type, + metrics: nil, + logs: e.Logs, + } + } else if e.Metrics != nil { + e.Items = &sentryItems{ + valid: true, + t: traceMetricEvent.Type, + metrics: e.Metrics, + logs: nil, + } + } + return e.defaultMarshalJSON() } @@ -623,6 +700,8 @@ func (e *Event) toCategory() ratelimit.Category { return ratelimit.CategoryLog case checkInType: return ratelimit.CategoryMonitor + case traceMetricEvent.Type: + return ratelimit.CategoryTraceMetric default: return ratelimit.CategoryUnknown } @@ -681,3 +760,22 @@ type Attribute struct { Value any `json:"value"` Type AttrType `json:"type"` } + +type MetricType string + +const ( + MetricTypeInvalid MetricType = "" + MetricTypeCounter MetricType = "counter" + MetricTypeGauge MetricType = "gauge" + MetricTypeDistribution MetricType = "distribution" +) + +type Metric struct { + Timestamp time.Time `json:"timestamp"` + TraceID TraceID `json:"trace_id,omitempty"` + Type MetricType `json:"type"` + Name string `json:"name,omitempty"` + Value float64 `json:"value"` + Unit string `json:"unit,omitempty"` + Attributes map[string]Attribute `json:"attributes,omitempty"` +} diff --git a/internal/protocol/envelope.go b/internal/protocol/envelope.go index 65e305caf..94f817513 100644 --- a/internal/protocol/envelope.go +++ b/internal/protocol/envelope.go @@ -41,11 +41,12 @@ type EnvelopeItemType string // Constants for envelope item types as defined in the Sentry documentation. const ( - EnvelopeItemTypeEvent EnvelopeItemType = "event" - EnvelopeItemTypeTransaction EnvelopeItemType = "transaction" - EnvelopeItemTypeCheckIn EnvelopeItemType = "check_in" - EnvelopeItemTypeAttachment EnvelopeItemType = "attachment" - EnvelopeItemTypeLog EnvelopeItemType = "log" + EnvelopeItemTypeEvent EnvelopeItemType = "event" + EnvelopeItemTypeTransaction EnvelopeItemType = "transaction" + EnvelopeItemTypeCheckIn EnvelopeItemType = "check_in" + EnvelopeItemTypeAttachment EnvelopeItemType = "attachment" + EnvelopeItemTypeLog EnvelopeItemType = "log" + EnvelopeItemTypeTraceMetrics EnvelopeItemType = "trace_metric" ) // EnvelopeItemHeader represents the header of an envelope item. @@ -211,3 +212,16 @@ func NewLogItem(itemCount int, payload []byte) *EnvelopeItem { Payload: payload, } } + +func NewTraceMetricsItem(itemCount int, payload []byte) *EnvelopeItem { + length := len(payload) + return &EnvelopeItem{ + Header: &EnvelopeItemHeader{ + Type: EnvelopeItemTypeTraceMetrics, + Length: &length, + ItemCount: &itemCount, + ContentType: "application/vnd.sentry.items.trace-metric+json", + }, + Payload: payload, + } +} diff --git a/internal/ratelimit/category.go b/internal/ratelimit/category.go index 971cba738..d0143c857 100644 --- a/internal/ratelimit/category.go +++ b/internal/ratelimit/category.go @@ -8,7 +8,7 @@ import ( ) // Reference: -// https://github.com/getsentry/relay/blob/0424a2e017d193a93918053c90cdae9472d164bf/relay-common/src/constants.rs#L116-L127 +// https://github.com/getsentry/relay/blob/46dfaa850b8717a6e22c3e9a275ba17fe673b9da/relay-base-schema/src/data_category.rs#L231-L271 // Category classifies supported payload types that can be ingested by Sentry // and, therefore, rate limited. @@ -22,6 +22,7 @@ const ( CategoryTransaction Category = "transaction" CategoryLog Category = "log_item" CategoryMonitor Category = "monitor" + CategoryTraceMetric Category = "trace_metric" ) // knownCategories is the set of currently known categories. Other categories @@ -32,6 +33,7 @@ var knownCategories = map[Category]struct{}{ CategoryTransaction: {}, CategoryLog: {}, CategoryMonitor: {}, + CategoryTraceMetric: {}, } // String returns the category formatted for debugging. @@ -47,6 +49,8 @@ func (c Category) String() string { return "CategoryLog" case CategoryMonitor: return "CategoryMonitor" + case CategoryTraceMetric: + return "CategoryTraceMetric" default: // For unknown categories, use the original formatting logic caser := cases.Title(language.English) @@ -95,6 +99,8 @@ func (c Category) GetPriority() Priority { return PriorityHigh case CategoryLog: return PriorityMedium + case CategoryTraceMetric: + return PriorityMedium case CategoryTransaction: return PriorityLow default: diff --git a/internal/ratelimit/category_test.go b/internal/ratelimit/category_test.go index 8d43765f6..1d964b03f 100644 --- a/internal/ratelimit/category_test.go +++ b/internal/ratelimit/category_test.go @@ -14,6 +14,7 @@ func TestCategory_String(t *testing.T) { {CategoryTransaction, "CategoryTransaction"}, {CategoryMonitor, "CategoryMonitor"}, {CategoryLog, "CategoryLog"}, + {CategoryTraceMetric, "CategoryTraceMetric"}, {Category("custom type"), "CategoryCustomType"}, {Category("multi word type"), "CategoryMultiWordType"}, } @@ -35,6 +36,7 @@ func TestKnownCategories(t *testing.T) { CategoryTransaction, CategoryMonitor, CategoryLog, + CategoryTraceMetric, } for _, category := range expectedCategories { diff --git a/metrics.go b/metrics.go new file mode 100644 index 000000000..31392b595 --- /dev/null +++ b/metrics.go @@ -0,0 +1,268 @@ +package sentry + +import ( + "context" + "os" + "sync" + "time" + + "github.com/getsentry/sentry-go/attribute" + "github.com/getsentry/sentry-go/internal/debuglog" +) + +func NewMeter(ctx context.Context) Meter { + var hub *Hub + hub = GetHubFromContext(ctx) + if hub == nil { + hub = CurrentHub() + } + + client := hub.Client() + if client != nil && client.batchMeter != nil { + return &sentryMeter{ + ctx: ctx, + client: client, + attributes: make(map[string]Attribute), + mu: sync.RWMutex{}, + } + } + + return &noopMeter{} +} + +type sentryMeter struct { + ctx context.Context + client *Client + attributes map[string]Attribute + mu sync.RWMutex +} + +var _ Meter = (*sentryMeter)(nil) + +func (s *sentryMeter) emit(ctx context.Context, metricType MetricType, name string, value float64, unit string, attributes map[string]Attribute) { + if name == "" { + return + } + + hub := GetHubFromContext(ctx) + if hub == nil { + hub = CurrentHub() + } + + var traceID TraceID + var spanID SpanID + var span *Span + var user User + + scope := hub.Scope() + if scope != nil { + scope.mu.Lock() + span = scope.span + if span != nil { + traceID = span.TraceID + spanID = span.SpanID + } else { + traceID = scope.propagationContext.TraceID + } + user = scope.user + scope.mu.Unlock() + } + + attrs := map[string]Attribute{} + s.mu.RLock() + for k, v := range s.attributes { + attrs[k] = v + } + s.mu.RUnlock() + + for k, v := range attributes { + attrs[k] = v + } + + // Set default attributes + if release := s.client.options.Release; release != "" { + attrs["sentry.release"] = Attribute{Value: release, Type: AttributeString} + } + if environment := s.client.options.Environment; environment != "" { + attrs["sentry.environment"] = Attribute{Value: environment, Type: AttributeString} + } + if serverName := s.client.options.ServerName; serverName != "" { + attrs["sentry.server.address"] = Attribute{Value: serverName, Type: AttributeString} + } else if serverAddr, err := os.Hostname(); err == nil { + attrs["sentry.server.address"] = Attribute{Value: serverAddr, Type: AttributeString} + } + + if !user.IsEmpty() { + if user.ID != "" { + attrs["user.id"] = Attribute{Value: user.ID, Type: AttributeString} + } + if user.Name != "" { + attrs["user.name"] = Attribute{Value: user.Name, Type: AttributeString} + } + if user.Email != "" { + attrs["user.email"] = Attribute{Value: user.Email, Type: AttributeString} + } + } + if span != nil { + attrs["sentry.trace.parent_span_id"] = Attribute{Value: spanID.String(), Type: AttributeString} + } + if sdkIdentifier := s.client.sdkIdentifier; sdkIdentifier != "" { + attrs["sentry.sdk.name"] = Attribute{Value: sdkIdentifier, Type: AttributeString} + } + if sdkVersion := s.client.sdkVersion; sdkVersion != "" { + attrs["sentry.sdk.version"] = Attribute{Value: sdkVersion, Type: AttributeString} + } + + metric := &Metric{ + Timestamp: time.Now(), + TraceID: traceID, + Type: metricType, + Name: name, + Value: value, + Unit: unit, + Attributes: attrs, + } + s.client.batchMeter.metricsCh <- *metric + + if s.client.options.Debug { + debuglog.Printf("Metric %s [%s]: %f %s", metricType, name, value, unit) + } +} + +// Count implements Meter. +func (s *sentryMeter) Count(name string, count int64, options MeterOptions) { + // count can be negative, but if it's 0, then don't send anything + if count == 0 { + return + } + + attrs := make(map[string]Attribute) + if options.Attributes != nil { + for _, attr := range options.Attributes { + t, ok := mapTypesToStr[attr.Value.Type()] + if !ok || t == "" { + debuglog.Printf("invalid attribute type set: %v", t) + continue + } + attrs[attr.Key] = Attribute{Value: attr.Value.AsInterface(), Type: t} + } + } + + s.emit(s.ctx, MetricTypeCounter, name, float64(count), "", attrs) +} + +// Distribution implements Meter. +func (s *sentryMeter) Distribution(name string, sample float64, options MeterOptions) { + if sample == 0 { + return + } + + attrs := make(map[string]Attribute) + if options.Attributes != nil { + for _, attr := range options.Attributes { + t, ok := mapTypesToStr[attr.Value.Type()] + if !ok || t == "" { + debuglog.Printf("invalid attribute type set: %v", t) + continue + } + attrs[attr.Key] = Attribute{Value: attr.Value.AsInterface(), Type: t} + } + } + + s.emit(s.ctx, MetricTypeDistribution, name, sample, options.Unit, attrs) +} + +// FGauge implements Meter. +func (s *sentryMeter) FGauge(name string, value float64, options MeterOptions) { + if value == 0 { + return + } + + attrs := make(map[string]Attribute) + if options.Attributes != nil { + for _, attr := range options.Attributes { + t, ok := mapTypesToStr[attr.Value.Type()] + if !ok || t == "" { + debuglog.Printf("invalid attribute type set: %v", t) + continue + } + attrs[attr.Key] = Attribute{Value: attr.Value.AsInterface(), Type: t} + } + } + + s.emit(s.ctx, MetricTypeGauge, name, value, options.Unit, attrs) +} + +// Gauge implements Meter. +func (s *sentryMeter) Gauge(name string, value int64, options MeterOptions) { + if value == 0 { + return + } + + attrs := make(map[string]Attribute) + if options.Attributes != nil { + for _, attr := range options.Attributes { + t, ok := mapTypesToStr[attr.Value.Type()] + if !ok || t == "" { + debuglog.Printf("invalid attribute type set: %v", t) + continue + } + attrs[attr.Key] = Attribute{Value: attr.Value.AsInterface(), Type: t} + } + } + + s.emit(s.ctx, MetricTypeGauge, name, float64(value), options.Unit, attrs) +} + +// GetCtx implements Meter. +func (s *sentryMeter) GetCtx() context.Context { + return s.ctx +} + +// SetAttributes implements Meter. +func (s *sentryMeter) SetAttributes(attrs ...attribute.Builder) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, v := range attrs { + t, ok := mapTypesToStr[v.Value.Type()] + if !ok || t == "" { + debuglog.Printf("invalid attribute type set: %v", t) + continue + } + + s.attributes[v.Key] = Attribute{ + Value: v.Value.AsInterface(), + Type: t, + } + } +} + +type noopMeter struct{} + +var _ Meter = (*noopMeter)(nil) + +// Count implements Meter. +func (n *noopMeter) Count(_ string, _ int64, _ MeterOptions) { +} + +// Distribution implements Meter. +func (n *noopMeter) Distribution(_ string, _ float64, _ MeterOptions) { +} + +// FGauge implements Meter. +func (n *noopMeter) FGauge(_ string, _ float64, _ MeterOptions) { +} + +// Gauge implements Meter. +func (n *noopMeter) Gauge(_ string, _ int64, _ MeterOptions) { +} + +// GetCtx implements Meter. +func (n *noopMeter) GetCtx() context.Context { + return context.Background() +} + +// SetAttributes implements Meter. +func (n *noopMeter) SetAttributes(...attribute.Builder) { +} diff --git a/transport.go b/transport.go index d57e15517..d4eb398b8 100644 --- a/transport.go +++ b/transport.go @@ -167,6 +167,23 @@ func encodeEnvelopeLogs(enc *json.Encoder, itemsLength int, body json.RawMessage return err } +func encodeEnvelopeTraceMetrics(enc *json.Encoder, itemsLength int, body json.RawMessage) error { + err := enc.Encode( + struct { + Type string `json:"type"` + ItemCount int `json:"item_count"` + ContentType string `json:"content_type"` + }{ + Type: traceMetricEvent.Type, + ItemCount: itemsLength, + ContentType: traceMetricEvent.ContentType, + }) + if err == nil { + err = enc.Encode(body) + } + return err +} + func envelopeFromBody(event *Event, dsn *Dsn, sentAt time.Time, body json.RawMessage) (*bytes.Buffer, error) { var b bytes.Buffer enc := json.NewEncoder(&b) @@ -205,6 +222,8 @@ func envelopeFromBody(event *Event, dsn *Dsn, sentAt time.Time, body json.RawMes err = encodeEnvelopeItem(enc, event.Type, body) case logEvent.Type: err = encodeEnvelopeLogs(enc, len(event.Logs), body) + case traceMetricEvent.Type: + err = encodeEnvelopeTraceMetrics(enc, len(event.Metrics), body) default: err = encodeEnvelopeItem(enc, eventType, body) }