Skip to content

Commit 86dfa71

Browse files
committed
Log dropped records
1 parent d5f859a commit 86dfa71

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

sdk/log/batch.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"sync"
1111
"sync/atomic"
1212
"time"
13+
14+
"go.opentelemetry.io/otel/internal/global"
1315
)
1416

1517
const (
@@ -148,6 +150,10 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
148150
return
149151
}
150152

153+
if d := b.q.Dropped(); d > 0 {
154+
global.Warn("dropped log records", "dropped", d)
155+
}
156+
151157
qLen := b.q.TryDequeue(buf, func(r []Record) bool {
152158
ok := b.exporter.EnqueueExport(r)
153159
if ok {

sdk/log/batch_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,22 @@
44
package log // import "go.opentelemetry.io/otel/sdk/log"
55

66
import (
7+
"bytes"
78
"context"
9+
stdlog "log"
810
"slices"
911
"strconv"
1012
"sync"
1113
"testing"
1214
"time"
1315
"unsafe"
1416

17+
"github.com/go-logr/stdr"
1518
"github.com/stretchr/testify/assert"
1619
"github.com/stretchr/testify/require"
1720

1821
"go.opentelemetry.io/otel"
22+
"go.opentelemetry.io/otel/internal/global"
1923
"go.opentelemetry.io/otel/log"
2024
)
2125

@@ -413,6 +417,41 @@ func TestBatchProcessor(t *testing.T) {
413417
})
414418
})
415419

420+
t.Run("DroppedLogs", func(t *testing.T) {
421+
orig := global.GetLogger()
422+
t.Cleanup(func() { global.SetLogger(orig) })
423+
buf := new(bytes.Buffer)
424+
stdr.SetVerbosity(1)
425+
global.SetLogger(stdr.New(stdlog.New(buf, "", 0)))
426+
427+
e := newTestExporter(nil)
428+
e.ExportTrigger = make(chan struct{})
429+
430+
b := NewBatchProcessor(
431+
e,
432+
WithMaxQueueSize(1),
433+
WithExportMaxBatchSize(1),
434+
WithExportInterval(time.Hour),
435+
WithExportTimeout(time.Hour),
436+
)
437+
var r Record
438+
assert.NoError(t, b.OnEmit(ctx, r), "queued")
439+
assert.NoError(t, b.OnEmit(ctx, r), "dropped")
440+
441+
var n int
442+
require.Eventually(t, func() bool {
443+
n = e.ExportN()
444+
return n > 0
445+
}, 2*time.Second, time.Microsecond, "blocked export not attempted")
446+
447+
got := buf.String()
448+
want := `"level"=1 "msg"="dropped log records" "dropped"=1`
449+
assert.Contains(t, got, want)
450+
451+
close(e.ExportTrigger)
452+
_ = b.Shutdown(ctx)
453+
})
454+
416455
t.Run("ConcurrentSafe", func(t *testing.T) {
417456
const goRoutines = 10
418457

0 commit comments

Comments
 (0)