Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
ExemplarProvider takes attributes as an argument
  • Loading branch information
dashpole committed Oct 15, 2024
commit 849611f5b676284eb1ef3dfc3812c25872a57047
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added

- Add `go.opentelemetry.io/otel/sdk/metric/exemplar` package which includes `Exemplar`, `Filter`, `TraceBasedFilter`, `AlwaysOnFilter`, `HistogramReservoir`, `FixedSizeReservoir`, `Reservoir`, `Value` and `ValueType` types. These will be used for configuring the exemplar reservoir for the metrics sdk. (#5747, #5862)
- Add `WithExportBufferSize` option to log batch processor.(#5877)

### Changed

Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/exemplar/fixed_size_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

// FixedSizeReservoirProvider returns a provider of [FixedSizeReservoir].
func FixedSizeReservoirProvider(k int) ReservoirProvider {
return func() Reservoir {
return func(_ attribute.Set) Reservoir {
return NewFixedSizeReservoir(k)
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/exemplar/histogram_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func HistogramReservoirProvider(bounds []float64) ReservoirProvider {
cp := slices.Clone(bounds)
slices.Sort(cp)
return func() Reservoir {
return func(_ attribute.Set) Reservoir {
return NewHistogramReservoir(cp)
}
}
Expand Down
7 changes: 6 additions & 1 deletion sdk/metric/exemplar/reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,9 @@ type Reservoir interface {
}

// ReservoirProvider creates new [Reservoir]s.
type ReservoirProvider func() Reservoir
//
// The attributes provided are attributes which are kept by the aggregation, and
// are exclusive with attributes passed to Offer. The combination of these
// attributes and the attributes passed to Offer is the complete set of
// attributes a measurement was made with.
type ReservoirProvider func(attr attribute.Set) Reservoir
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Builder[N int64 | float64] struct {
//
// If this is not provided a default factory function that returns an
// dropReservoir reservoir will be used.
ReservoirFunc func() FilteredExemplarReservoir[N]
ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
Expand All @@ -49,7 +49,7 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}

func (b Builder[N]) resFunc() func() FilteredExemplarReservoir[N] {
func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func (c *clock) Register() (unregister func()) {
return func() { now = orig }
}

func dropExemplars[N int64 | float64]() FilteredExemplarReservoir[N] {
return dropReservoir[N]()
func dropExemplars[N int64 | float64](attr attribute.Set) FilteredExemplarReservoir[N] {
return dropReservoir[N](attr)
}

func TestBuilderFilter(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion sdk/metric/internal/aggregate/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
)

// dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
func dropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} }
func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] {
return &dropRes[N]{}
}

type dropRes[N int64 | float64] struct{}

Expand Down
3 changes: 2 additions & 1 deletion sdk/metric/internal/aggregate/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

Expand All @@ -17,7 +18,7 @@ func TestDrop(t *testing.T) {
}

func testDropFiltered[N int64 | float64](t *testing.T) {
r := dropReservoir[N]()
r := dropReservoir[N](*attribute.EmptySet())

var dest []exemplar.Exemplar
r.Collect(&dest)
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
Expand All @@ -306,7 +306,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int32

newRes func() FilteredExemplarReservoir[N]
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
Expand All @@ -327,7 +327,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib
v, ok := e.values[attr.Equivalent()]
if !ok {
v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum)
v.res = e.newRes()
v.res = e.newRes(attr)

e.values[attr.Equivalent()] = v
}
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64

newRes func() FilteredExemplarReservoir[N]
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}

func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
Expand Down Expand Up @@ -93,7 +93,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
//
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
b = newBuckets[N](attr, len(s.bounds)+1)
b.res = s.newRes()
b.res = s.newRes(attr)

// Ensure min and max are recorded values (not zero), for new buckets.
b.min, b.max = value, value
Expand All @@ -108,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute

// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histogram[N] {
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum, limit, r),
noMinMax: noMinMax,
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/internal/aggregate/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type datapoint[N int64 | float64] struct {
res FilteredExemplarReservoir[N]
}

func newLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *lastValue[N] {
func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] {
return &lastValue[N]{
newRes: r,
limit: newLimiter[datapoint[N]](limit),
Expand All @@ -32,7 +32,7 @@ func newLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservo
type lastValue[N int64 | float64] struct {
sync.Mutex

newRes func() FilteredExemplarReservoir[N]
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[datapoint[N]]
values map[attribute.Distinct]datapoint[N]
start time.Time
Expand All @@ -45,7 +45,7 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.
attr := s.limit.Attributes(fltrAttr, s.values)
d, ok := s.values[attr.Equivalent()]
if !ok {
d.res = s.newRes()
d.res = s.newRes(attr)
}

d.attrs = attr
Expand Down Expand Up @@ -114,7 +114,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in

// newPrecomputedLastValue returns an aggregator that summarizes a set of
// observations as the last one made.
func newPrecomputedLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *precomputedLastValue[N] {
func newPrecomputedLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedLastValue[N] {
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
}

Expand Down
10 changes: 5 additions & 5 deletions sdk/metric/internal/aggregate/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ type sumValue[N int64 | float64] struct {
// valueMap is the storage for sums.
type valueMap[N int64 | float64] struct {
sync.Mutex
newRes func() FilteredExemplarReservoir[N]
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[sumValue[N]]
values map[attribute.Distinct]sumValue[N]
}

func newValueMap[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *valueMap[N] {
func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] {
return &valueMap[N]{
newRes: r,
limit: newLimiter[sumValue[N]](limit),
Expand All @@ -41,7 +41,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S
attr := s.limit.Attributes(fltrAttr, s.values)
v, ok := s.values[attr.Equivalent()]
if !ok {
v.res = s.newRes()
v.res = s.newRes(attr)
}

v.attrs = attr
Expand All @@ -54,7 +54,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S
// newSum returns an aggregator that summarizes a set of measurements as their
// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
// the measurements were made in.
func newSum[N int64 | float64](monotonic bool, limit int, r func() FilteredExemplarReservoir[N]) *sum[N] {
func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *sum[N] {
return &sum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,
Expand Down Expand Up @@ -143,7 +143,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
// newPrecomputedSum returns an aggregator that summarizes a set of
// observations as their arithmetic sum. Each sum is scoped by attributes and
// the aggregation cycle the measurements were made in.
func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() FilteredExemplarReservoir[N]) *precomputedSum[N] {
func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedSum[N] {
return &precomputedSum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,
Expand Down