Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions batch_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package sentry

import (
"context"
"sync"
"time"
)

type BatchMeter struct {
client *Client
metricsCh chan Metric
flushCh chan chan struct{}
cancel context.CancelFunc
wg sync.WaitGroup
startOnce sync.Once
shutdownOnce sync.Once
}

func NewBatchMeter(client *Client) *BatchMeter {
return &BatchMeter{
client: client,
metricsCh: make(chan Metric, batchSize),
flushCh: make(chan chan struct{}),
}
}

func (l *BatchMeter) Start() {
l.startOnce.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
l.cancel = cancel
l.wg.Add(1)
go l.run(ctx)
})
}

func (l *BatchMeter) Flush(timeout <-chan struct{}) {
done := make(chan struct{})
select {
case l.flushCh <- done:
select {
case <-done:
case <-timeout:
}
case <-timeout:
}
}

func (l *BatchMeter) Shutdown() {
l.shutdownOnce.Do(func() {
if l.cancel != nil {
l.cancel()
l.wg.Wait()
}
})
}

func (l *BatchMeter) run(ctx context.Context) {
defer l.wg.Done()
var metrics []Metric
timer := time.NewTimer(batchTimeout)
defer timer.Stop()

for {
select {
case metric := <-l.metricsCh:
metrics = append(metrics, metric)
if len(metrics) >= batchSize {
l.processEvent(metrics)
metrics = nil
if !timer.Stop() {
<-timer.C
}
timer.Reset(batchTimeout)
}
case <-timer.C:
if len(metrics) > 0 {
l.processEvent(metrics)
metrics = nil
}
timer.Reset(batchTimeout)
case done := <-l.flushCh:
flushDrain:
for {
select {
case metric := <-l.metricsCh:
metrics = append(metrics, metric)
default:
break flushDrain
}
}

if len(metrics) > 0 {
l.processEvent(metrics)
metrics = nil
}
if !timer.Stop() {
<-timer.C
}
timer.Reset(batchTimeout)
close(done)
case <-ctx.Done():
drain:
for {
select {
case metric := <-l.metricsCh:
metrics = append(metrics, metric)
default:
break drain
}
}

if len(metrics) > 0 {
l.processEvent(metrics)
}
return
}
}
}

func (l *BatchMeter) processEvent(metrics []Metric) {
event := NewEvent()
event.Timestamp = time.Now()
event.EventID = EventID(uuid())
event.Type = logEvent.Type
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Trace Metrics Misclassified as Logs

The processEvent method sets event.Type to logEvent.Type. Since this method processes trace metrics, the event type should be traceMetricEvent.Type. This miscategorization causes trace metrics to be processed as log events, which can lead to incorrect backend processing or issues with the metrics pipeline.

Fix in Cursor Fix in Web

event.Metrics = metrics
l.client.Transport.SendEvent(event)
}
14 changes: 13 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ type ClientOptions struct {
Tags map[string]string
// EnableLogs controls when logs should be emitted.
EnableLogs bool
// ExperimentalEnableTraceMetric controls when trace metrics should be emitted.
// This is an experimental feature that is subject to change.
ExperimentalEnableTraceMetric bool
// TraceIgnoreStatusCodes is a list of HTTP status codes that should not be traced.
// Each element can be either:
// - A single-element slice [code] for a specific status code
Expand Down Expand Up @@ -265,6 +268,7 @@ type Client struct {
// not supported, create a new client instead.
Transport Transport
batchLogger *BatchLogger
batchMeter *BatchMeter
}

// NewClient creates and returns an instance of Client configured using
Expand Down Expand Up @@ -369,6 +373,11 @@ func NewClient(options ClientOptions) (*Client, error) {
client.batchLogger.Start()
}

if options.ExperimentalEnableTraceMetric {
client.batchMeter = NewBatchMeter(&client)
client.batchMeter.Start()
}

client.setupTransport()
client.setupIntegrations()

Expand Down Expand Up @@ -531,7 +540,7 @@ func (client *Client) RecoverWithContext(
// the network synchronously, configure it to use the HTTPSyncTransport in the
// call to Init.
func (client *Client) Flush(timeout time.Duration) bool {
if client.batchLogger != nil {
if client.batchLogger != nil || client.batchMeter != nil {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return client.FlushWithContext(ctx)
Expand All @@ -555,6 +564,9 @@ func (client *Client) FlushWithContext(ctx context.Context) bool {
if client.batchLogger != nil {
client.batchLogger.Flush(ctx.Done())
}
if client.batchMeter != nil {
client.batchMeter.Flush(ctx.Done())
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Goroutine Leak in Client Close

The Client.Close() method doesn't call Shutdown() on batchMeter (or batchLogger), causing goroutine leaks. The BatchMeter.run() goroutine will continue running indefinitely even after the client is closed, as it only stops when the context is cancelled. The Shutdown() method should be called to properly cancel the context and wait for the goroutine to finish, similar to how Flush() calls both batchLogger.Flush() and batchMeter.Flush().

Fix in Cursor Fix in Web

return client.Transport.FlushWithContext(ctx)
}

Expand Down
52 changes: 52 additions & 0 deletions interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@
"application/vnd.sentry.items.log+json",
}

var traceMetricEvent = struct {
Type string
ContentType string
}{
"trace_metric",
"application/vnd.sentry.items.trace_metric+json",
}

// Level marks the severity of the event.
type Level string

Expand Down Expand Up @@ -141,6 +149,26 @@
Emitf(format string, args ...interface{})
}

type MeterOptions struct {
// Attributes are key/value pairs that will be added to the metric.
// The attributes set here will take precedence over the attributes
// set from the Meter.
Attributes []attribute.Builder
// The unit of measurements, for "gauge" and "distribution" metrics.
Unit string
}

type Meter interface {
// GetCtx returns the [context.Context] set on the meter.
GetCtx() context.Context
// SetAttributes allows attaching parameters to the meter using the attribute API.
SetAttributes(...attribute.Builder)
Count(name string, count int64, options MeterOptions)
Gauge(name string, value int64, options MeterOptions)
FGauge(name string, value float64, options MeterOptions)
Distribution(name string, sample float64, options MeterOptions)
}

// Attachment allows associating files with your events to aid in investigation.
// An event may contain one or more attachments.
type Attachment struct {
Expand Down Expand Up @@ -398,6 +426,9 @@
// The fields below are only relevant for logs
Logs []Log `json:"items,omitempty"`

// The fields below are only relevant for metrics
Metrics []Metric `json:"items,omitempty"`

Check failure on line 430 in interfaces.go

View workflow job for this annotation

GitHub Actions / Module Mode (1.23, windows)

struct field Metrics repeats json tag "items" also at interfaces.go:427

Check failure on line 430 in interfaces.go

View workflow job for this annotation

GitHub Actions / Module Mode (1.23, ubuntu)

struct field Metrics repeats json tag "items" also at interfaces.go:427

Check failure on line 430 in interfaces.go

View workflow job for this annotation

GitHub Actions / Module Mode (1.24, windows)

struct field Metrics repeats json tag "items" also at interfaces.go:427

Check failure on line 430 in interfaces.go

View workflow job for this annotation

GitHub Actions / Module Mode (1.25, ubuntu)

struct field Metrics repeats json tag "items" also at interfaces.go:427

Check failure on line 430 in interfaces.go

View workflow job for this annotation

GitHub Actions / Module Mode (1.25, windows)

struct field Metrics repeats json tag "items" also at interfaces.go:427

Check failure on line 430 in interfaces.go

View workflow job for this annotation

GitHub Actions / Module Mode (1.24, ubuntu)

struct field Metrics repeats json tag "items" also at interfaces.go:427

Check failure on line 430 in interfaces.go

View workflow job for this annotation

GitHub Actions / Module Mode (1.25, macos)

struct field Metrics repeats json tag "items" also at interfaces.go:427

Check failure on line 430 in interfaces.go

View workflow job for this annotation

GitHub Actions / Module Mode (1.23, macos)

struct field Metrics repeats json tag "items" also at interfaces.go:427

Check failure on line 430 in interfaces.go

View workflow job for this annotation

GitHub Actions / Lint

structtag: struct field Metrics repeats json tag "items" also at interfaces.go:427 (govet)

Check failure on line 430 in interfaces.go

View workflow job for this annotation

GitHub Actions / Module Mode (1.24, macos)

struct field Metrics repeats json tag "items" also at interfaces.go:427

// The fields below are not part of the final JSON payload.

sdkMetaData SDKMetaData
Expand Down Expand Up @@ -623,6 +654,8 @@
return ratelimit.CategoryLog
case checkInType:
return ratelimit.CategoryMonitor
case traceMetricEvent.Type:
return ratelimit.CategoryTraceMetric
default:
return ratelimit.CategoryUnknown
}
Expand Down Expand Up @@ -681,3 +714,22 @@
Value any `json:"value"`
Type AttrType `json:"type"`
}

type MetricType string

const (
MetricTypeInvalid MetricType = ""
MetricTypeCounter MetricType = "counter"
MetricTypeGauge MetricType = "gauge"
MetricTypeDistribution MetricType = "distribution"
)

type Metric struct {
Timestamp time.Time `json:"timestamp"`
TraceID TraceID `json:"trace_id,omitempty"`
Type MetricType `json:"type"`
Name string `json:"name,omitempty"`
Value float64 `json:"value"`
Unit string `json:"unit,omitempty"`
Attributes map[string]Attribute `json:"attributes,omitempty"`
}
24 changes: 19 additions & 5 deletions internal/protocol/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ type EnvelopeItemType string

// Constants for envelope item types as defined in the Sentry documentation.
const (
EnvelopeItemTypeEvent EnvelopeItemType = "event"
EnvelopeItemTypeTransaction EnvelopeItemType = "transaction"
EnvelopeItemTypeCheckIn EnvelopeItemType = "check_in"
EnvelopeItemTypeAttachment EnvelopeItemType = "attachment"
EnvelopeItemTypeLog EnvelopeItemType = "log"
EnvelopeItemTypeEvent EnvelopeItemType = "event"
EnvelopeItemTypeTransaction EnvelopeItemType = "transaction"
EnvelopeItemTypeCheckIn EnvelopeItemType = "check_in"
EnvelopeItemTypeAttachment EnvelopeItemType = "attachment"
EnvelopeItemTypeLog EnvelopeItemType = "log"
EnvelopeItemTypeTraceMetrics EnvelopeItemType = "trace_metric"
)

// EnvelopeItemHeader represents the header of an envelope item.
Expand Down Expand Up @@ -211,3 +212,16 @@ func NewLogItem(itemCount int, payload []byte) *EnvelopeItem {
Payload: payload,
}
}

func NewTraceMetricsItem(itemCount int, payload []byte) *EnvelopeItem {
length := len(payload)
return &EnvelopeItem{
Header: &EnvelopeItemHeader{
Type: EnvelopeItemTypeTraceMetrics,
Length: &length,
ItemCount: &itemCount,
ContentType: "application/vnd.sentry.items.trace_metric+json",
},
Payload: payload,
}
}
5 changes: 4 additions & 1 deletion internal/ratelimit/category.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

// Reference:
// https://github.com/getsentry/relay/blob/0424a2e017d193a93918053c90cdae9472d164bf/relay-common/src/constants.rs#L116-L127
// https://github.com/getsentry/relay/blob/46dfaa850b8717a6e22c3e9a275ba17fe673b9da/relay-base-schema/src/data_category.rs#L231-L271

// Category classifies supported payload types that can be ingested by Sentry
// and, therefore, rate limited.
Expand All @@ -22,6 +22,7 @@ const (
CategoryTransaction Category = "transaction"
CategoryLog Category = "log_item"
CategoryMonitor Category = "monitor"
CategoryTraceMetric Category = "trace_metric"
)

// knownCategories is the set of currently known categories. Other categories
Expand All @@ -47,6 +48,8 @@ func (c Category) String() string {
return "CategoryLog"
case CategoryMonitor:
return "CategoryMonitor"
case CategoryTraceMetric:
return "CategoryTraceMetric"
default:
// For unknown categories, use the original formatting logic
caser := cases.Title(language.English)
Expand Down
Loading
Loading