From df93a32cce871b3101f0cc5fbf769e2a6f8b91da Mon Sep 17 00:00:00 2001 From: scalalang2 Date: Tue, 14 Feb 2023 14:05:03 +0900 Subject: [PATCH] Log when a trace is too large to compact --- CHANGELOG.md | 1 + modules/compactor/compactor.go | 21 +++++++++++------ modules/compactor/compactor_test.go | 12 +++++----- tempodb/compactor.go | 4 ++-- tempodb/compactor_test.go | 2 +- tempodb/encoding/common/interfaces.go | 2 +- tempodb/encoding/vparquet/compactor.go | 18 +++++++++++---- tempodb/encoding/vparquet/compactor_test.go | 25 ++++++++++++++++++++- tempodb/tempodb.go | 2 +- 9 files changed, 65 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c4d02db112c..f19634308a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * [BUGFIX] Apply `rate()` to bytes/s panel in tenant's dashboard. [#2081](https://github.com/grafana/tempo/pull/2081) (@mapno) * [BUGFIX] Correctly coalesce trace level data when combining Parquet traces. [#2095](https://github.com/grafana/tempo/pull/2095) (@joe-elliott) * [CHANGE] Update Go to 1.20 [#2079](https://github.com/grafana/tempo/pull/2079) (@scalalang2) +* [ENHANCEMENT] Log when a trace is too large to compact [#2105](https://github.com/grafana/tempo/pull/2105) (@scalalang2) ## v2.0.0 / 2023-01-31 diff --git a/modules/compactor/compactor.go b/modules/compactor/compactor.go index cdd363b2dde..dc7059e2aa4 100644 --- a/modules/compactor/compactor.go +++ b/modules/compactor/compactor.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/dskit/kv" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" + tempoUtil "github.com/grafana/tempo/pkg/util" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -236,12 +237,13 @@ func (c *Compactor) Combine(dataEncoding string, tenantID string, objs ...[]byte return objs[0], wasCombined, nil } - spansDiscarded := countSpans(dataEncoding, objs[1:]...) - overrides.RecordDiscardedSpans(spansDiscarded, reasonCompactorDiscardedSpans, tenantID) + totalDiscarded := countSpans(dataEncoding, objs[1:]...) + overrides.RecordDiscardedSpans(totalDiscarded, reasonCompactorDiscardedSpans, tenantID) return objs[0], wasCombined, nil } -func (c *Compactor) RecordDiscardedSpans(count int, tenantID string) { +func (c *Compactor) RecordDiscardedSpans(count int, tenantID string, traceID string) { + level.Warn(log.Logger).Log("msg", "max size of trace exceeded", "tenant", tenantID, "traceId", traceID, "discarded_span_count", count) overrides.RecordDiscardedSpans(count, reasonCompactorDiscardedSpans, tenantID) } @@ -293,12 +295,12 @@ func (c *Compactor) OnRingInstanceStopping(lifecycler *ring.BasicLifecycler) {} func (c *Compactor) OnRingInstanceHeartbeat(lifecycler *ring.BasicLifecycler, ringDesc *ring.Desc, instanceDesc *ring.InstanceDesc) { } -func countSpans(dataEncoding string, objs ...[]byte) int { +func countSpans(dataEncoding string, objs ...[]byte) (total int) { + var traceID string decoder, err := model.NewObjectDecoder(dataEncoding) if err != nil { return 0 } - spans := 0 for _, o := range objs { t, err := decoder.PrepareForRead(o) @@ -308,10 +310,15 @@ func countSpans(dataEncoding string, objs ...[]byte) int { for _, b := range t.Batches { for _, ilm := range b.ScopeSpans { - spans += len(ilm.Spans) + if len(ilm.Spans) > 0 && traceID == "" { + traceID = tempoUtil.TraceIDToHexString(ilm.Spans[0].TraceId) + } + total += len(ilm.Spans) } } } - return spans + level.Debug(log.Logger).Log("msg", "max size of trace exceeded", "traceId", traceID, "discarded_span_count", total) + + return } diff --git a/modules/compactor/compactor_test.go b/modules/compactor/compactor_test.go index 70a9738abec..af76d83e19b 100644 --- a/modules/compactor/compactor_test.go +++ b/modules/compactor/compactor_test.go @@ -113,11 +113,13 @@ func TestCountSpans(t *testing.T) { b1 := encode(t, t1) b2 := encode(t, t2) - assert.Equal(t, t1ExpectedSpans, countSpans(model.CurrentEncoding, b1)) - assert.Equal(t, t2ExpectedSpans, countSpans(model.CurrentEncoding, b2)) - assert.Equal(t, - t1ExpectedSpans+t2ExpectedSpans, - countSpans(model.CurrentEncoding, b1, b2)) + b1Total := countSpans(model.CurrentEncoding, b1) + b2Total := countSpans(model.CurrentEncoding, b2) + total := countSpans(model.CurrentEncoding, b1, b2) + + assert.Equal(t, t1ExpectedSpans, b1Total) + assert.Equal(t, t2ExpectedSpans, b2Total) + assert.Equal(t, t1ExpectedSpans+t2ExpectedSpans, total) } func encode(t *testing.T, tr *tempopb.Trace) []byte { diff --git a/tempodb/compactor.go b/tempodb/compactor.go index f4ff6aebe89..8bf97cc6bee 100644 --- a/tempodb/compactor.go +++ b/tempodb/compactor.go @@ -194,8 +194,8 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string ObjectsWritten: func(compactionLevel, objs int) { metricCompactionObjectsWritten.WithLabelValues(strconv.Itoa(compactionLevel)).Add(float64(objs)) }, - SpansDiscarded: func(spans int) { - rw.compactorSharder.RecordDiscardedSpans(spans, tenantID) + SpansDiscarded: func(traceId string, spans int) { + rw.compactorSharder.RecordDiscardedSpans(spans, tenantID, traceId) }, } diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index bb6d5cd355d..f2eab6f8b1d 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -42,7 +42,7 @@ func (m *mockSharder) Combine(dataEncoding string, tenantID string, objs ...[]by return model.StaticCombiner.Combine(dataEncoding, objs...) } -func (m *mockSharder) RecordDiscardedSpans(count int, tenantID string) {} +func (m *mockSharder) RecordDiscardedSpans(count int, tenantID string, traceID string) {} type mockJobSharder struct{} diff --git a/tempodb/encoding/common/interfaces.go b/tempodb/encoding/common/interfaces.go index 5621b639b29..4c8dffe5b49 100644 --- a/tempodb/encoding/common/interfaces.go +++ b/tempodb/encoding/common/interfaces.go @@ -74,7 +74,7 @@ type CompactionOptions struct { ObjectsCombined func(compactionLevel, objects int) ObjectsWritten func(compactionLevel, objects int) BytesWritten func(compactionLevel, bytes int) - SpansDiscarded func(spans int) + SpansDiscarded func(traceID string, spans int) } type Iterator interface { diff --git a/tempodb/encoding/vparquet/compactor.go b/tempodb/encoding/vparquet/compactor.go index dd73c8ac9cb..a6919c69f86 100644 --- a/tempodb/encoding/vparquet/compactor.go +++ b/tempodb/encoding/vparquet/compactor.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/google/uuid" + tempoUtil "github.com/grafana/tempo/pkg/util" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/segmentio/parquet-go" @@ -315,18 +316,27 @@ func estimateMarshalledSizeFromParquetRow(row parquet.Row) (size int) { } // countSpans counts the number of spans in the given trace in deconstructed -// parquet row format. It simply counts the number of values for span ID, which -// is always present. -func countSpans(schema *parquet.Schema, row parquet.Row) (spans int) { +// parquet row format and returns traceId. +// It simply counts the number of values for span ID, which is always present. +func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans int) { + traceIDColumn, found := schema.Lookup(TraceIDColumnName) + if !found { + return "", 0 + } + spanID, found := schema.Lookup("rs", "ils", "Spans", "ID") if !found { - return 0 + return "", 0 } for _, v := range row { if v.Column() == spanID.ColumnIndex { spans++ } + + if v.Column() == traceIDColumn.ColumnIndex { + traceID = tempoUtil.TraceIDToHexString(v.ByteArray()) + } } return diff --git a/tempodb/encoding/vparquet/compactor_test.go b/tempodb/encoding/vparquet/compactor_test.go index d9a1d200022..ea41b5f0808 100644 --- a/tempodb/encoding/vparquet/compactor_test.go +++ b/tempodb/encoding/vparquet/compactor_test.go @@ -2,13 +2,16 @@ package vparquet import ( "context" + crand "crypto/rand" "encoding/binary" + "math/rand" "time" "testing" "github.com/go-kit/log" "github.com/google/uuid" + tempoUtil "github.com/grafana/tempo/pkg/util" "github.com/segmentio/parquet-go" tempo_io "github.com/grafana/tempo/pkg/io" @@ -97,7 +100,7 @@ func BenchmarkCompactorDupes(b *testing.B) { FlushSizeBytes: 30_000_000, MaxBytesPerTrace: 50_000_000, ObjectsCombined: func(compactionLevel, objects int) {}, - SpansDiscarded: func(spans int) {}, + SpansDiscarded: func(traceID string, spans int) {}, }) _, err = c.Compact(ctx, l, r, func(*backend.BlockMeta, time.Time) backend.Writer { return w }, inputs) @@ -142,3 +145,23 @@ func createTestBlock(t testing.TB, ctx context.Context, cfg *common.BlockConfig, func TestValueAlloc(t *testing.T) { _ = make([]parquet.Value, 1_000_000) } + +func TestCountSpans(t *testing.T) { + batchSize := rand.Intn(1000) + 1 + spansEach := rand.Intn(1000) + 1 + + sch := parquet.SchemaOf(new(Trace)) + traceID := make([]byte, 16) + _, err := crand.Read(traceID) + require.NoError(t, err) + + // make Trace and convert to parquet.Row + tr := test.MakeTraceWithSpanCount(batchSize, spansEach, traceID) + trp := traceToParquet(traceID, tr, nil) + row := sch.Deconstruct(nil, trp) + + // count spans for generated rows. + tID, spans := countSpans(sch, row) + require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID)) + require.Equal(t, spans, batchSize*spansEach) +} diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 89ac2c52059..0ad09c01db3 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -93,7 +93,7 @@ type Compactor interface { type CompactorSharder interface { Combine(dataEncoding string, tenantID string, objs ...[]byte) ([]byte, bool, error) Owns(hash string) bool - RecordDiscardedSpans(count int, tenantID string) + RecordDiscardedSpans(count int, tenantID string, traceID string) } type CompactorOverrides interface {