Skip to content

Commit

Permalink
Log dropped records
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Apr 29, 2024
1 parent 4f3586c commit afb63a1
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 1 deletion.
6 changes: 6 additions & 0 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"sync/atomic"
"time"

"go.opentelemetry.io/otel/internal/global"
)

const (
Expand Down Expand Up @@ -148,6 +150,10 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
return
}

if d := b.q.Dropped(); d > 0 {
global.Warn("dropped log records", "dropped", d)
}

qLen := b.q.TryDequeue(buf, func(r []Record) bool {
ok := b.exporter.EnqueueExport(r)
if ok {
Expand Down
39 changes: 39 additions & 0 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@
package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"bytes"
"context"
stdlog "log"
"slices"
"strconv"
"sync"
"testing"
"time"
"unsafe"

"github.com/go-logr/stdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/log"
)

Expand Down Expand Up @@ -413,6 +417,41 @@ func TestBatchProcessor(t *testing.T) {
})
})

t.Run("DroppedLogs", func(t *testing.T) {
orig := global.GetLogger()
t.Cleanup(func() { global.SetLogger(orig) })
buf := new(bytes.Buffer)
stdr.SetVerbosity(1)
global.SetLogger(stdr.New(stdlog.New(buf, "", 0)))

e := newTestExporter(nil)
e.ExportTrigger = make(chan struct{})

b := NewBatchProcessor(
e,
WithMaxQueueSize(1),
WithExportMaxBatchSize(1),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
var r Record
assert.NoError(t, b.OnEmit(ctx, r), "queued")
assert.NoError(t, b.OnEmit(ctx, r), "dropped")

var n int
require.Eventually(t, func() bool {
n = e.ExportN()
return n > 0
}, 2*time.Second, time.Microsecond, "blocked export not attempted")

got := buf.String()
want := `"level"=1 "msg"="dropped log records" "dropped"=1`
assert.Contains(t, got, want)

close(e.ExportTrigger)
_ = b.Shutdown(ctx)
})

t.Run("ConcurrentSafe", func(t *testing.T) {
const goRoutines = 10

Expand Down
2 changes: 1 addition & 1 deletion sdk/log/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.21

require (
github.com/go-logr/logr v1.4.1
github.com/go-logr/stdr v1.2.2
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel v1.26.0
go.opentelemetry.io/otel/log v0.2.0-alpha
Expand All @@ -13,7 +14,6 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
golang.org/x/sys v0.19.0 // indirect
Expand Down

0 comments on commit afb63a1

Please sign in to comment.