Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add `RecordFactory` in `go.opentelemetry.io/otel/log/logtest` to facilitate testing the bridge implementations. (#5263)
- Add `RecordFactory` in `go.opentelemetry.io/otel/sdk/log/logtest` to facilitate testing the exporter and processor implementations. (#5258)
- Add example for `go.opentelemetry.io/otel/exporters/stdout/stdoutlog`. (#5242)
- The count of dropped records from the `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` is logged. (#5276)

### Changed

Expand Down
14 changes: 14 additions & 0 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"sync/atomic"
"time"

"go.opentelemetry.io/otel/internal/global"
)

const (
Expand Down Expand Up @@ -148,6 +150,10 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
return
}

if d := b.q.Dropped(); d > 0 {
global.Warn("dropped log records", "dropped", d)
}

qLen := b.q.TryDequeue(buf, func(r []Record) bool {
ok := b.exporter.EnqueueExport(r)
if ok {
Expand Down Expand Up @@ -253,6 +259,7 @@ func (b *BatchProcessor) ForceFlush(ctx context.Context) error {
type queue struct {
sync.Mutex

dropped atomic.Uint64
cap, len int
read, write *ring
}
Expand All @@ -266,6 +273,12 @@ func newQueue(size int) *queue {
}
}

// Dropped returns the number of Records dropped during enqueueing since the
// last time Dropped was called.
func (q *queue) Dropped() uint64 {
return q.dropped.Swap(0)
}

// Enqueue adds r to the queue. The queue size, including the addition of r, is
// returned.
//
Expand All @@ -283,6 +296,7 @@ func (q *queue) Enqueue(r Record) int {
// Overflow. Advance read to be the new "oldest".
q.len = q.cap
q.read = q.read.Next()
q.dropped.Add(1)
}
return q.len
}
Expand Down
51 changes: 51 additions & 0 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@
package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"bytes"
"context"
stdlog "log"
"slices"
"strconv"
"sync"
"testing"
"time"
"unsafe"

"github.com/go-logr/stdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/log"
)

Expand Down Expand Up @@ -413,6 +417,41 @@ func TestBatchProcessor(t *testing.T) {
})
})

t.Run("DroppedLogs", func(t *testing.T) {
orig := global.GetLogger()
t.Cleanup(func() { global.SetLogger(orig) })
buf := new(bytes.Buffer)
stdr.SetVerbosity(1)
global.SetLogger(stdr.New(stdlog.New(buf, "", 0)))

e := newTestExporter(nil)
e.ExportTrigger = make(chan struct{})

b := NewBatchProcessor(
e,
WithMaxQueueSize(1),
WithExportMaxBatchSize(1),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
var r Record
assert.NoError(t, b.OnEmit(ctx, r), "queued")
assert.NoError(t, b.OnEmit(ctx, r), "dropped")

var n int
require.Eventually(t, func() bool {
n = e.ExportN()
return n > 0
}, 2*time.Second, time.Microsecond, "blocked export not attempted")

got := buf.String()
want := `"level"=1 "msg"="dropped log records" "dropped"=1`
assert.Contains(t, got, want)

close(e.ExportTrigger)
_ = b.Shutdown(ctx)
})

t.Run("ConcurrentSafe", func(t *testing.T) {
const goRoutines = 10

Expand Down Expand Up @@ -488,6 +527,18 @@ func TestQueue(t *testing.T) {
assert.Equal(t, []Record{r, r}, q.Flush(), "flushed Records")
})

t.Run("Dropped", func(t *testing.T) {
q := newQueue(1)

_ = q.Enqueue(r)
_ = q.Enqueue(r)
assert.Equal(t, uint64(1), q.Dropped(), "fist")

_ = q.Enqueue(r)
_ = q.Enqueue(r)
assert.Equal(t, uint64(2), q.Dropped(), "second")
})

t.Run("Flush", func(t *testing.T) {
const size = 2
q := newQueue(size)
Expand Down
2 changes: 1 addition & 1 deletion sdk/log/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.21

require (
github.com/go-logr/logr v1.4.1
github.com/go-logr/stdr v1.2.2
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel v1.26.0
go.opentelemetry.io/otel/log v0.2.0-alpha
Expand All @@ -13,7 +14,6 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
golang.org/x/sys v0.20.0 // indirect
Expand Down