Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/metric/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ require (
go.opentelemetry.io/otel v1.23.0-rc.1
go.opentelemetry.io/otel/metric v1.23.0-rc.1
go.opentelemetry.io/otel/sdk v1.23.0-rc.1
go.opentelemetry.io/otel/trace v1.23.0-rc.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/trace v1.23.0-rc.1 // indirect
golang.org/x/sys v0.16.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
17 changes: 17 additions & 0 deletions sdk/metric/internal/exemplar/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package exemplar provides an implementation of the OpenTelemetry exemplar
// reservoir to be used in metric collection pipelines.
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
51 changes: 51 additions & 0 deletions sdk/metric/internal/exemplar/reservoir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// Reservoir holds the sampled exemplar of measurements made.
type Reservoir[N int64 | float64] interface {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the Reservoir decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
//
// The time t is the time when the measurement was made. The val and attr
// parameters are the value and dropped (filtered) attributes of the
// measurement respectively.
Offer(ctx context.Context, t time.Time, val N, attr []attribute.KeyValue)

// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call. See Flush to
// copy-and-clear instead.
Collect(dest *[]metricdata.Exemplar[N])

// Flush returns all the held exemplars.
//
// The Reservoir state is reset after this call. See Collect to preserve
// the state instead.
Flush(dest *[]metricdata.Exemplar[N])
}
180 changes: 180 additions & 0 deletions sdk/metric/internal/exemplar/reservoir_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exemplar

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/trace"
)

// Sat Jan 01 2000 00:00:00 GMT+0000.
var staticTime = time.Unix(946684800, 0)

type factory[N int64 | float64] func(requstedCap int) (r Reservoir[N], actualCap int)

func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) {
return func(t *testing.T) {
t.Helper()

ctx := context.Background()

t.Run("CaptureSpanContext", func(t *testing.T) {
t.Helper()

r, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}

tID, sID := trace.TraceID{0x01}, trace.SpanID{0x01}
sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: tID,
SpanID: sID,
TraceFlags: trace.FlagsSampled,
})
ctx := trace.ContextWithSpanContext(ctx, sc)

r.Offer(ctx, staticTime, 10, nil)

var dest []metricdata.Exemplar[N]
r.Collect(&dest)

want := metricdata.Exemplar[N]{
Time: staticTime,
Value: 10,
SpanID: []byte(sID[:]),
TraceID: []byte(tID[:]),
}
require.Len(t, dest, 1, "number of collected exemplars")
assert.Equal(t, want, dest[0])
})

t.Run("FilterAttributes", func(t *testing.T) {
t.Helper()

r, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}

adminTrue := attribute.Bool("admin", true)
r.Offer(ctx, staticTime, 10, []attribute.KeyValue{adminTrue})

var dest []metricdata.Exemplar[N]
r.Collect(&dest)

want := metricdata.Exemplar[N]{
FilteredAttributes: []attribute.KeyValue{adminTrue},
Time: staticTime,
Value: 10,
}
require.Len(t, dest, 1, "number of collected exemplars")
assert.Equal(t, want, dest[0])
})

t.Run("CollectDoesNotFlush", func(t *testing.T) {
t.Helper()

r, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}

r.Offer(ctx, staticTime, 10, nil)

var dest []metricdata.Exemplar[N]
r.Collect(&dest)
require.Len(t, dest, 1, "number of collected exemplars")

dest = dest[:0]
r.Collect(&dest)
assert.Len(t, dest, 1, "Collect flushed reservoir")
})

t.Run("FlushFlushes", func(t *testing.T) {
t.Helper()

r, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}

r.Offer(ctx, staticTime, 10, nil)

var dest []metricdata.Exemplar[N]
r.Flush(&dest)
require.Len(t, dest, 1, "number of flushed exemplars")

r.Flush(&dest)
assert.Len(t, dest, 0, "Flush did not flush reservoir")
})

t.Run("MultipleOffers", func(t *testing.T) {
t.Helper()

r, n := f(3)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}

for i := 0; i < n+1; i++ {
v := N(i)
r.Offer(ctx, staticTime, v, nil)
}

var dest []metricdata.Exemplar[N]
r.Flush(&dest)
assert.Len(t, dest, n, "multiple offers did not fill reservoir")

// Ensure the flush reset also resets any couting state.
for i := 0; i < n+1; i++ {
v := N(2 * i)
r.Offer(ctx, staticTime, v, nil)
}

dest = dest[:0]
r.Flush(&dest)
assert.Len(t, dest, n, "internal count state not reset")
})

t.Run("DropAll", func(t *testing.T) {
t.Helper()

r, n := f(0)
if n > 0 {
t.Skip("skipping, reservoir capacity greater than 0:", n)
}

r.Offer(context.Background(), staticTime, 10, nil)

dest := []metricdata.Exemplar[N]{{}} // Should be reset to empty.
r.Collect(&dest)
assert.Len(t, dest, 0, "no exemplars should be collected")

r.Offer(context.Background(), staticTime, 10, nil)
dest = []metricdata.Exemplar[N]{{}} // Should be reset to empty.
r.Flush(&dest)
assert.Len(t, dest, 0, "no exemplars should be flushed")
})
}
}