diff --git a/.chloggen/batchprocessor-metadata.yaml b/.chloggen/batchprocessor-metadata.yaml
new file mode 100644
index 00000000000..8280fbdebdd
--- /dev/null
+++ b/.chloggen/batchprocessor-metadata.yaml
@@ -0,0 +1,16 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: batchprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add support for batching by metadata keys.
+
+# One or more tracking issues or pull requests related to the change
+issues: [4544]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
diff --git a/processor/batchprocessor/README.md b/processor/batchprocessor/README.md
index 378b5c3edd1..14af6b9bd34
--- a/processor/batchprocessor/README.md
+++ b/processor/batchprocessor/README.md
@@ -30,6 +30,15 @@ ignored as data will be sent immediately, subject to only
`send_batch_max_size`. `0` means no upper limit of the batch size. This
property ensures that larger batches are split into smaller units. It must
be greater than or equal to `send_batch_size`.
+- `metadata_keys` (default = empty): When set, this processor will
+  create one batcher instance per distinct combination of values in
+  the `client.Metadata`.
+- `metadata_cardinality_limit` (default = 1000): When `metadata_keys` is
+  not empty, this setting limits the number of unique combinations of
+  metadata key values that will be processed over the lifetime of the
+  process.
+
+See the notes about metadata batching below.

Examples:

@@ -60,6 +69,44 @@ processors:
Refer to [config.yaml](./testdata/config.yaml) for detailed
examples on using the processor.

+## Batching and client metadata
+
+Batching by metadata supports multi-tenant OpenTelemetry Collector
+pipelines: data that carries the same authorization metadata is
+grouped into the same batches. For example:
+
+```yaml
+processors:
+  batch:
+    # batch data by tenant-id
+    metadata_keys:
+    - tenant_id
+
+    # limit to 10 batcher instances before raising errors
+    metadata_cardinality_limit: 10
+```
+
+Receivers should be configured with `include_metadata: true` so that
+metadata keys are available to the processor.
+
+Note that each distinct combination of metadata triggers the
+allocation of a new background task in the Collector that runs for the
+lifetime of the process, and each background task holds one pending
+batch of up to `send_batch_size` records. Batching by metadata can
+therefore substantially increase the amount of memory dedicated to
+batching.
+
+The maximum number of distinct combinations is limited to the
+configured `metadata_cardinality_limit`, which defaults to 1000 to
+limit memory impact.
+
+Users of the batch processor configured with metadata keys should
+consider using an Auth extension to validate the relevant
+metadata-key values.
+
+The number of batcher instances currently in use, one per distinct
+metadata-value combination, is exported as the
+`otelcol_processor_batch_metadata_cardinality` metric.
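+
+For reference, a complete pipeline sketch is shown below; the receiver
+and exporter choices are illustrative, and any receiver that supports
+`include_metadata` can be used in their place:
+
+```yaml
+receivers:
+  otlp:
+    protocols:
+      grpc:
+        # make client.Metadata visible to processors in the pipeline
+        include_metadata: true
+
+processors:
+  batch:
+    metadata_keys:
+    - tenant_id
+    metadata_cardinality_limit: 100
+
+exporters:
+  logging:
+
+service:
+  pipelines:
+    traces:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [logging]
+```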
+
[beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[core]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol
diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go
index 68a6a1b59ad..e31700f7689 100644
--- a/processor/batchprocessor/batch_processor.go
+++ b/processor/batchprocessor/batch_processor.go
@@ -16,21 +16,30 @@ package batchprocessor // import "go.opentelemetry.io/collector/processor/batchp
import (
	"context"
+	"errors"
	"fmt"
	"runtime"
+	"sort"
+	"strings"
	"sync"
	"time"
+	"go.opentelemetry.io/otel/attribute"
	"go.uber.org/zap"
+	"go.opentelemetry.io/collector/client"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/pdata/ptrace"
	"go.opentelemetry.io/collector/processor"
)
+
+// errTooManyBatchers is returned when the MetadataCardinalityLimit has been reached.
+var errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batcher metadata-value combinations"))
+
// batch_processor is a component that accepts spans and metrics, places them
// into batches and sends downstream.
//
@@ -41,21 +50,75 @@ import (
// - cfg.Timeout is elapsed since the timestamp when the previous batch was sent out.
type batchProcessor struct {
	logger *zap.Logger
-	exportCtx context.Context
-	timer *time.Timer
	timeout time.Duration
	sendBatchSize int
	sendBatchMaxSize int
-	newItem chan any
-	batch batch
+	// batchFunc is a factory for new batch objects corresponding
+	// with the appropriate signal.
+	batchFunc func() batch
+
+	// metadataKeys is the configured list of metadata keys. When
+	// empty, the `singleBatcher` is used. When non-empty, each
+	// distinct combination of metadata keys and values triggers a
+	// new batcher, counted in `goroutines`.
+	metadataKeys []string
+
+	// metadataLimit is the limiting size of the batchers map.
+	metadataLimit int
	shutdownC chan struct{}
	goroutines sync.WaitGroup
	telemetry *batchProcessorTelemetry
+
+	// batcherFinder will be either *singleBatcher or *multiBatcher
+	batcherFinder
+}
+
+type batcherFinder interface {
+	findBatcher(ctx context.Context) (*batcher, error)
+	currentMetadataCardinality() int
}
+
+// singleBatcher is used when metadataKeys is empty, to avoid the
+// additional lock and map operations used in multiBatcher.
+type singleBatcher struct {
+	*batcher
+}
+
+// multiBatcher is used when metadataKeys is not empty.
+type multiBatcher struct {
+	*batchProcessor
+
+	lock sync.Mutex
+	batchers map[attribute.Set]*batcher
+}
+
+// batcher is a single instance of the batcher logic. When metadata
+// keys are in use, one of these is created per distinct combination
+// of values.
+type batcher struct {
+	// processor refers to this processor, for access to common
+	// configuration.
+	processor *batchProcessor
+
+	// exportCtx is a context with the metadata key-values
+	// corresponding with this batcher.
+	exportCtx context.Context
+
+	// timer informs the batcher to send a batch.
+	timer *time.Timer
+
+	// newItem is used to receive data items from producers.
+	newItem chan any
+
+	// batch is an in-flight data item containing one of the
+	// underlying data types.
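+	//
+	// Only this batcher's goroutine (started from newBatcher)
+	// reads newItem and mutates batch, so the field needs no
+	// additional locking.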
+	batch batch
+}
+
+// batch is an interface generalizing the individual signal types.
type batch interface {
	// export the current batch
	export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, sentBatchBytes int, err error)
@@ -71,24 +134,57 @@ var _ consumer.Traces = (*batchProcessor)(nil)
var _ consumer.Metrics = (*batchProcessor)(nil)
var _ consumer.Logs = (*batchProcessor)(nil)

-func newBatchProcessor(set processor.CreateSettings, cfg *Config, batch batch, useOtel bool) (*batchProcessor, error) {
-	bpt, err := newBatchProcessorTelemetry(set, useOtel)
-	if err != nil {
-		return nil, fmt.Errorf("error to create batch processor telemetry %w", err)
+// newBatchProcessor returns a new batch processor component.
+func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func() batch, useOtel bool) (*batchProcessor, error) {
+	// use lower-case, to be consistent with http/2 headers.
+	mks := make([]string, len(cfg.MetadataKeys))
+	for i, k := range cfg.MetadataKeys {
+		mks[i] = strings.ToLower(k)
	}
-
-	return &batchProcessor{
-		logger: set.Logger,
-		exportCtx: bpt.exportCtx,
-		telemetry: bpt,
+	sort.Strings(mks)
+	bp := &batchProcessor{
+		logger: set.Logger,
		sendBatchSize: int(cfg.SendBatchSize),
		sendBatchMaxSize: int(cfg.SendBatchMaxSize),
		timeout: cfg.Timeout,
-		newItem: make(chan any, runtime.NumCPU()),
-		batch: batch,
+		batchFunc: batchFunc,
		shutdownC: make(chan struct{}, 1),
-	}, nil
+		metadataKeys: mks,
+		metadataLimit: int(cfg.MetadataCardinalityLimit),
+	}
+	if len(bp.metadataKeys) == 0 {
+		bp.batcherFinder = &singleBatcher{bp.newBatcher(nil)}
+	} else {
+		bp.batcherFinder = &multiBatcher{
+			batchProcessor: bp,
+			batchers: map[attribute.Set]*batcher{},
+		}
+	}
+
+	bpt, err := newBatchProcessorTelemetry(set, bp.currentMetadataCardinality, useOtel)
+	if err != nil {
+		return nil, fmt.Errorf("error creating batch processor telemetry: %w", err)
+	}
+	bp.telemetry = bpt
+
+	return bp, nil
+}
+
+// newBatcher creates a batcher for the given metadata key-values and
+// starts its background goroutine.
+func (bp *batchProcessor) newBatcher(md map[string][]string) *batcher {
+	exportCtx := client.NewContext(context.Background(), client.Info{
+		Metadata: client.NewMetadata(md),
+	})
+	b := &batcher{
+		processor: bp,
+		newItem: make(chan any, runtime.NumCPU()),
+		exportCtx: exportCtx,
+		batch: bp.batchFunc(),
+	}
+	b.processor.goroutines.Add(1)
+	go b.start()
+	return b
}

func (bp *batchProcessor) Capabilities() consumer.Capabilities {
@@ -98,12 +194,14 @@ func (bp *batchProcessor) Capabilities() consumer.Capabilities {
// Start is invoked during service startup.
func (bp *batchProcessor) Start(context.Context, component.Host) error {
	bp.goroutines.Add(1)
-	go bp.startProcessingCycle()
	return nil
}

// Shutdown is invoked during service shutdown.
func (bp *batchProcessor) Shutdown(context.Context) error {
+	// Done corresponds with the initial Add(1) in Start.
+	bp.goroutines.Done()
+
	close(bp.shutdownC)

	// Wait until all goroutines are done.
@@ -111,120 +209,184 @@ func (bp *batchProcessor) Shutdown(context.Context) error {
	return nil
}

-func (bp *batchProcessor) startProcessingCycle() {
-	defer bp.goroutines.Done()
+func (b *batcher) start() {
+	defer b.processor.goroutines.Done()

	// timerCh ensures we only block when there is a
	// timer, since <- from a nil channel is blocking.
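+	// A timer is created only when both the configured timeout
+	// and send_batch_size are non-zero; otherwise processItem
+	// forwards data downstream as soon as it is added.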
var timerCh <-chan time.Time - if bp.timeout != 0 && bp.sendBatchSize != 0 { - bp.timer = time.NewTimer(bp.timeout) - timerCh = bp.timer.C + if b.processor.timeout != 0 && b.processor.sendBatchSize != 0 { + b.timer = time.NewTimer(b.processor.timeout) + timerCh = b.timer.C } for { select { - case <-bp.shutdownC: + case <-b.processor.shutdownC: DONE: for { select { - case item := <-bp.newItem: - bp.processItem(item) + case item := <-b.newItem: + b.processItem(item) default: break DONE } } // This is the close of the channel - if bp.batch.itemCount() > 0 { + if b.batch.itemCount() > 0 { // TODO: Set a timeout on sendTraces or // make it cancellable using the context that Shutdown gets as a parameter - bp.sendItems(triggerTimeout) + b.sendItems(triggerTimeout) } return - case item := <-bp.newItem: + case item := <-b.newItem: if item == nil { continue } - bp.processItem(item) + b.processItem(item) case <-timerCh: - if bp.batch.itemCount() > 0 { - bp.sendItems(triggerTimeout) + if b.batch.itemCount() > 0 { + b.sendItems(triggerTimeout) } - bp.resetTimer() + b.resetTimer() } } } -func (bp *batchProcessor) processItem(item any) { - bp.batch.add(item) +func (b *batcher) processItem(item any) { + b.batch.add(item) sent := false - for bp.batch.itemCount() > 0 && (!bp.hasTimer() || bp.batch.itemCount() >= bp.sendBatchSize) { + for b.batch.itemCount() > 0 && (!b.hasTimer() || b.batch.itemCount() >= b.processor.sendBatchSize) { sent = true - bp.sendItems(triggerBatchSize) + b.sendItems(triggerBatchSize) } if sent { - bp.stopTimer() - bp.resetTimer() + b.stopTimer() + b.resetTimer() } } -func (bp *batchProcessor) hasTimer() bool { - return bp.timer != nil +func (b *batcher) hasTimer() bool { + return b.timer != nil } -func (bp *batchProcessor) stopTimer() { - if bp.hasTimer() && !bp.timer.Stop() { - <-bp.timer.C +func (b *batcher) stopTimer() { + if b.hasTimer() && !b.timer.Stop() { + <-b.timer.C } } -func (bp *batchProcessor) resetTimer() { - if bp.hasTimer() { - bp.timer.Reset(bp.timeout) +func (b *batcher) resetTimer() { + if b.hasTimer() { + b.timer.Reset(b.processor.timeout) } } -func (bp *batchProcessor) sendItems(trigger trigger) { - sent, bytes, err := bp.batch.export(bp.exportCtx, bp.sendBatchMaxSize, bp.telemetry.detailed) +func (b *batcher) sendItems(trigger trigger) { + sent, bytes, err := b.batch.export(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed) if err != nil { - bp.logger.Warn("Sender failed", zap.Error(err)) + b.processor.logger.Warn("Sender failed", zap.Error(err)) } else { - bp.telemetry.record(trigger, int64(sent), int64(bytes)) + b.processor.telemetry.record(trigger, int64(sent), int64(bytes)) + } +} + +func (sb *singleBatcher) findBatcher(_ context.Context) (*batcher, error) { + return sb.batcher, nil +} + +func (mb *multiBatcher) findBatcher(ctx context.Context) (*batcher, error) { + // Get each metadata key value, form the corresponding + // attribute set for use as a map lookup key. + info := client.FromContext(ctx) + md := map[string][]string{} + var attrs []attribute.KeyValue + for _, k := range mb.metadataKeys { + // Lookup the value in the incoming metadata, copy it + // into the outgoing metadata, and create a unique + // value for the attributeSet. + vs := info.Metadata.Get(k) + md[k] = vs + if len(vs) == 1 { + attrs = append(attrs, attribute.String(k, vs[0])) + } else { + attrs = append(attrs, attribute.StringSlice(k, vs)) + } + } + aset := attribute.NewSet(attrs...) 
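+	// attribute.NewSet sorts and deduplicates the key-value
+	// pairs, so equivalent metadata combinations always resolve
+	// to the same map entry below.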
+
+	mb.lock.Lock()
+	defer mb.lock.Unlock()
+
+	b, ok := mb.batchers[aset]
+	if ok {
+		return b, nil
+	}
+
+	if limit := mb.metadataLimit; limit != 0 && len(mb.batchers) >= limit {
+		return nil, errTooManyBatchers
+	}
+
+	// aset.ToSlice() returns the sorted, deduplicated,
+	// and name-downcased list of attributes.
+	b = mb.newBatcher(md)
+	mb.batchers[aset] = b
+	return b, nil
+}
+
+func (sb *singleBatcher) currentMetadataCardinality() int {
+	return 1
+}
+
+func (mb *multiBatcher) currentMetadataCardinality() int {
+	mb.lock.Lock()
+	defer mb.lock.Unlock()
+	return len(mb.batchers)
+}

// ConsumeTraces implements TracesProcessor
-func (bp *batchProcessor) ConsumeTraces(_ context.Context, td ptrace.Traces) error {
-	bp.newItem <- td
+func (bp *batchProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
+	b, err := bp.findBatcher(ctx)
+	if err != nil {
+		return err
+	}
+	b.newItem <- td
	return nil
}

// ConsumeMetrics implements MetricsProcessor
-func (bp *batchProcessor) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error {
-	// First thing is convert into a different internal format
-	bp.newItem <- md
+func (bp *batchProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
+	b, err := bp.findBatcher(ctx)
+	if err != nil {
+		return err
+	}
+	b.newItem <- md
	return nil
}

// ConsumeLogs implements LogsProcessor
-func (bp *batchProcessor) ConsumeLogs(_ context.Context, ld plog.Logs) error {
-	bp.newItem <- ld
+func (bp *batchProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
+	b, err := bp.findBatcher(ctx)
+	if err != nil {
+		return err
+	}
+	b.newItem <- ld
	return nil
}

// newBatchTracesProcessor creates a new batch processor that batches traces by size or with timeout
func newBatchTracesProcessor(set processor.CreateSettings, next consumer.Traces, cfg *Config, useOtel bool) (*batchProcessor, error) {
-	return newBatchProcessor(set, cfg, newBatchTraces(next), useOtel)
+	return newBatchProcessor(set, cfg, func() batch { return newBatchTraces(next) }, useOtel)
}

// newBatchMetricsProcessor creates a new batch processor that batches metrics by size or with timeout
func newBatchMetricsProcessor(set processor.CreateSettings, next consumer.Metrics, cfg *Config, useOtel bool) (*batchProcessor, error) {
-	return newBatchProcessor(set, cfg, newBatchMetrics(next), useOtel)
+	return newBatchProcessor(set, cfg, func() batch { return newBatchMetrics(next) }, useOtel)
}

// newBatchLogsProcessor creates a new batch processor that batches logs by size or with timeout
func newBatchLogsProcessor(set processor.CreateSettings, next consumer.Logs, cfg *Config, useOtel bool) (*batchProcessor, error) {
-	return newBatchProcessor(set, cfg, newBatchLogs(next), useOtel)
+	return newBatchProcessor(set, cfg, func() batch { return newBatchLogs(next) }, useOtel)
}

type batchTraces struct {
diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go
index c7b8e046c77..a21e5523fa7 100644
--- a/processor/batchprocessor/batch_processor_test.go
+++ b/processor/batchprocessor/batch_processor_test.go
@@ -25,9 +25,11 @@ import (
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/client"
	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/config/configtelemetry"
	"go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/pdata/plog" @@ -820,6 +822,166 @@ func TestShutdown(t *testing.T) { processortest.VerifyShutdown(t, factory, factory.CreateDefaultConfig()) } +type metadataTracesSink struct { + *consumertest.TracesSink + + lock sync.Mutex + spanCountByToken12 map[string]int +} + +func formatTwo(first, second []string) string { + return fmt.Sprintf("%s;%s", first, second) +} + +func (mts *metadataTracesSink) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + info := client.FromContext(ctx) + token1 := info.Metadata.Get("token1") + token2 := info.Metadata.Get("token2") + mts.lock.Lock() + defer mts.lock.Unlock() + + mts.spanCountByToken12[formatTwo( + token1, + token2, + )] += td.SpanCount() + return mts.TracesSink.ConsumeTraces(ctx, td) +} + +func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { + sink := &metadataTracesSink{ + TracesSink: &consumertest.TracesSink{}, + spanCountByToken12: map[string]int{}, + } + cfg := createDefaultConfig().(*Config) + cfg.SendBatchSize = 1000 + cfg.Timeout = 10 * time.Minute + cfg.MetadataKeys = []string{"token1", "token2"} + creationSet := processortest.NewNopCreateSettings() + creationSet.MetricsLevel = configtelemetry.LevelDetailed + batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, false) + require.NoError(t, err) + require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + + bg := context.Background() + callCtxs := []context.Context{ + client.NewContext(bg, client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "token1": {"single"}, + "token3": {"n/a"}, + }), + }), + client.NewContext(bg, client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "token1": {"single"}, + "token2": {"one", "two"}, + "token4": {"n/a"}, + }), + }), + client.NewContext(bg, client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "token1": nil, + "token2": {"single"}, + }), + }), + client.NewContext(bg, client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "token1": {"one", "two", "three"}, + "token2": {"single"}, + "token3": {"n/a"}, + "token4": {"n/a", "d/c"}, + }), + }), + } + expectByContext := make([]int, len(callCtxs)) + + requestCount := 1000 + spansPerRequest := 33 + sentResourceSpans := ptrace.NewTraces().ResourceSpans() + for requestNum := 0; requestNum < requestCount; requestNum++ { + td := testdata.GenerateTraces(spansPerRequest) + spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { + spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) + } + td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) + // use round-robin to assign context. + num := requestNum % len(callCtxs) + expectByContext[num] += spansPerRequest + assert.NoError(t, batcher.ConsumeTraces(callCtxs[num], td)) + } + + require.NoError(t, batcher.Shutdown(context.Background())) + + // The following tests are the same as TestBatchProcessorSpansDelivered(). 
+ require.Equal(t, requestCount*spansPerRequest, sink.SpanCount()) + receivedTraces := sink.AllTraces() + spansReceivedByName := spansReceivedByName(receivedTraces) + for requestNum := 0; requestNum < requestCount; requestNum++ { + spans := sentResourceSpans.At(requestNum).ScopeSpans().At(0).Spans() + for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { + require.EqualValues(t, + spans.At(spanIndex), + spansReceivedByName[getTestSpanName(requestNum, spanIndex)]) + } + } + + // This test ensures each context had the expected number of spans. + require.Equal(t, len(callCtxs), len(sink.spanCountByToken12)) + for idx, ctx := range callCtxs { + md := client.FromContext(ctx).Metadata + exp := formatTwo(md.Get("token1"), md.Get("token2")) + require.Equal(t, expectByContext[idx], sink.spanCountByToken12[exp]) + } +} + +func TestBatchProcessorDuplicateMetadataKeys(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.MetadataKeys = []string{"myTOKEN", "mytoken"} + err := cfg.Validate() + require.Error(t, err) + require.Contains(t, err.Error(), "duplicate") + require.Contains(t, err.Error(), "mytoken") +} + +func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { + const cardLimit = 10 + + sink := new(consumertest.TracesSink) + cfg := createDefaultConfig().(*Config) + cfg.MetadataKeys = []string{"token"} + cfg.MetadataCardinalityLimit = cardLimit + creationSet := processortest.NewNopCreateSettings() + batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, false) + require.NoError(t, err) + require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + + bg := context.Background() + for requestNum := 0; requestNum < cardLimit; requestNum++ { + td := testdata.GenerateTraces(1) + ctx := client.NewContext(bg, client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "token": {fmt.Sprint(requestNum)}, + }), + }) + + assert.NoError(t, batcher.ConsumeTraces(ctx, td)) + } + + td := testdata.GenerateTraces(1) + ctx := client.NewContext(bg, client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "token": {"limit_exceeded"}, + }), + }) + err = batcher.ConsumeTraces(ctx, td) + + assert.Error(t, err) + assert.True(t, consumererror.IsPermanent(err)) + assert.Contains(t, err.Error(), "too many") + + require.NoError(t, batcher.Shutdown(context.Background())) +} + func TestBatchZeroConfig(t *testing.T) { // This is a no-op configuration. No need for a timer, no // minimum, no mxaimum, just a pass through. diff --git a/processor/batchprocessor/config.go b/processor/batchprocessor/config.go index d57129728af..7b91e505ee0 100644 --- a/processor/batchprocessor/config.go +++ b/processor/batchprocessor/config.go @@ -16,6 +16,8 @@ package batchprocessor // import "go.opentelemetry.io/collector/processor/batchp import ( "errors" + "fmt" + "strings" "time" "go.opentelemetry.io/collector/component" @@ -36,6 +38,23 @@ type Config struct { // Larger batches are split into smaller units. // Default value is 0, that means no maximum size. SendBatchMaxSize uint32 `mapstructure:"send_batch_max_size"` + + // MetadataKeys is a list of client.Metadata keys that will be + // used to form distinct batchers. If this setting is empty, + // a single batcher instance will be used. When this setting + // is not empty, one batcher will be used per distinct + // combination of values for the listed metadata keys. + // + // Empty value and unset metadata are treated as distinct cases. + // + // Entries are case-insensitive. 
Duplicated entries will + // trigger a validation error. + MetadataKeys []string `mapstructure:"metadata_keys"` + + // MetadataCardinalityLimit indicates the maximum number of + // batcher instances that will be created through a distinct + // combination of MetadataKeys. + MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"` } var _ component.Config = (*Config)(nil) @@ -45,6 +64,14 @@ func (cfg *Config) Validate() error { if cfg.SendBatchMaxSize > 0 && cfg.SendBatchMaxSize < cfg.SendBatchSize { return errors.New("send_batch_max_size must be greater or equal to send_batch_size") } + uniq := map[string]bool{} + for _, k := range cfg.MetadataKeys { + l := strings.ToLower(k) + if _, has := uniq[l]; has { + return fmt.Errorf("duplicate entry in metadata_keys: %q (case-insensitive)", l) + } + uniq[l] = true + } if cfg.Timeout < 0 { return errors.New("timeout must be greater or equal to 0") } diff --git a/processor/batchprocessor/config_test.go b/processor/batchprocessor/config_test.go index 6c3b0dfc565..f02a92bb1da 100644 --- a/processor/batchprocessor/config_test.go +++ b/processor/batchprocessor/config_test.go @@ -42,9 +42,10 @@ func TestUnmarshalConfig(t *testing.T) { assert.NoError(t, component.UnmarshalConfig(cm, cfg)) assert.Equal(t, &Config{ - SendBatchSize: uint32(10000), - SendBatchMaxSize: uint32(11000), - Timeout: time.Second * 10, + SendBatchSize: uint32(10000), + SendBatchMaxSize: uint32(11000), + Timeout: time.Second * 10, + MetadataCardinalityLimit: 1000, }, cfg) } diff --git a/processor/batchprocessor/factory.go b/processor/batchprocessor/factory.go index bde46823a2a..87047d80af7 100644 --- a/processor/batchprocessor/factory.go +++ b/processor/batchprocessor/factory.go @@ -30,6 +30,11 @@ const ( defaultSendBatchSize = uint32(8192) defaultTimeout = 200 * time.Millisecond + + // defaultMetadataCardinalityLimit should be set to the number + // of metadata configurations the user expects to submit to + // the collector. + defaultMetadataCardinalityLimit = 1000 ) // NewFactory returns a new factory for the Batch processor. 
@@ -44,8 +49,9 @@ func NewFactory() processor.Factory { func createDefaultConfig() component.Config { return &Config{ - SendBatchSize: defaultSendBatchSize, - Timeout: defaultTimeout, + SendBatchSize: defaultSendBatchSize, + Timeout: defaultTimeout, + MetadataCardinalityLimit: defaultMetadataCardinalityLimit, } } diff --git a/processor/batchprocessor/metrics.go b/processor/batchprocessor/metrics.go index d128135a278..31df28ad9a3 100644 --- a/processor/batchprocessor/metrics.go +++ b/processor/batchprocessor/metrics.go @@ -106,14 +106,15 @@ type batchProcessorTelemetry struct { exportCtx context.Context - processorAttr []attribute.KeyValue - batchSizeTriggerSend metric.Int64Counter - timeoutTriggerSend metric.Int64Counter - batchSendSize metric.Int64Histogram - batchSendSizeBytes metric.Int64Histogram + processorAttr []attribute.KeyValue + batchSizeTriggerSend metric.Int64Counter + timeoutTriggerSend metric.Int64Counter + batchSendSize metric.Int64Histogram + batchSendSizeBytes metric.Int64Histogram + batchMetadataCardinality metric.Int64ObservableUpDownCounter } -func newBatchProcessorTelemetry(set processor.CreateSettings, useOtel bool) (*batchProcessorTelemetry, error) { +func newBatchProcessorTelemetry(set processor.CreateSettings, currentMetadataCardinality func() int, useOtel bool) (*batchProcessorTelemetry, error) { exportCtx, err := tag.New(context.Background(), tag.Insert(processorTagKey, set.ID.String())) if err != nil { return nil, err @@ -127,7 +128,7 @@ func newBatchProcessorTelemetry(set processor.CreateSettings, useOtel bool) (*ba detailed: set.MetricsLevel == configtelemetry.LevelDetailed, } - err = bpt.createOtelMetrics(set.MeterProvider) + err = bpt.createOtelMetrics(set.MeterProvider, currentMetadataCardinality) if err != nil { return nil, err } @@ -135,7 +136,7 @@ func newBatchProcessorTelemetry(set processor.CreateSettings, useOtel bool) (*ba return bpt, nil } -func (bpt *batchProcessorTelemetry) createOtelMetrics(mp metric.MeterProvider) error { +func (bpt *batchProcessorTelemetry) createOtelMetrics(mp metric.MeterProvider, currentMetadataCardinality func() int) error { if !bpt.useOtel { return nil } @@ -179,6 +180,19 @@ func (bpt *batchProcessorTelemetry) createOtelMetrics(mp metric.MeterProvider) e return err } + bpt.batchMetadataCardinality, err = meter.Int64ObservableUpDownCounter( + obsreport.BuildProcessorCustomMetricName(typeStr, "metadata_cardinality"), + metric.WithDescription("Number of distinct metadata value combinations being processed"), + metric.WithUnit("1"), + metric.WithInt64Callback(func(_ context.Context, obs metric.Int64Observer) error { + obs.Observe(int64(currentMetadataCardinality())) + return nil + }), + ) + if err != nil { + return err + } + return nil } @@ -205,7 +219,7 @@ func (bpt *batchProcessorTelemetry) recordWithOC(trigger trigger, sent, bytes in } } -func (bpt *batchProcessorTelemetry) recordWithOtel(trigger trigger, sent int64, bytes int64) { +func (bpt *batchProcessorTelemetry) recordWithOtel(trigger trigger, sent, bytes int64) { switch trigger { case triggerBatchSize: bpt.batchSizeTriggerSend.Add(bpt.exportCtx, 1, metric.WithAttributes(bpt.processorAttr...))
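
As a usage sketch, a downstream consumer can read the metadata that each
batcher attaches to its export context, mirroring the `metadataTracesSink`
test helper above. The `tenantTracesSink` type and the `tenant_id` key are
illustrative assumptions:

```go
package batchmetadataexample

import (
	"context"
	"sync"

	"go.opentelemetry.io/collector/client"
	"go.opentelemetry.io/collector/consumer/consumertest"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// tenantTracesSink counts spans per tenant_id using the client
// metadata that the batch processor copies onto each batch's
// export context.
type tenantTracesSink struct {
	consumertest.TracesSink

	lock          sync.Mutex
	spansByTenant map[string]int
}

func newTenantTracesSink() *tenantTracesSink {
	return &tenantTracesSink{spansByTenant: map[string]int{}}
}

func (s *tenantTracesSink) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
	// client.FromContext returns the metadata that was recorded when
	// the batcher for this metadata_keys combination was created.
	tenants := client.FromContext(ctx).Metadata.Get("tenant_id")

	s.lock.Lock()
	for _, tenant := range tenants {
		s.spansByTenant[tenant] += td.SpanCount()
	}
	s.lock.Unlock()

	return s.TracesSink.ConsumeTraces(ctx, td)
}
```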