diff --git a/internal/stanza/config.go b/internal/stanza/config.go index 8dd810526562..4c08f334564e 100644 --- a/internal/stanza/config.go +++ b/internal/stanza/config.go @@ -43,6 +43,10 @@ type ConverterConfig struct { // FlushInterval defines how often to flush the converted and accumulated // log entries. FlushInterval time.Duration `mapstructure:"flush_interval"` + // WorkerCount defines how many worker goroutines should be spawned to + // translate entry.Entry into log records. + // By default, math.Max(1, runtime.NumCPU()/4) workers are spawned. + WorkerCount int `mapstructure:"worker_count"` } // InputConfig is an alias that allows unmarshaling outside of mapstructure diff --git a/internal/stanza/converter.go b/internal/stanza/converter.go index 7c688c4ea415..c789579805a7 100644 --- a/internal/stanza/converter.go +++ b/internal/stanza/converter.go @@ -15,9 +15,13 @@ package stanza import ( + "bytes" "context" "encoding/json" + "errors" "fmt" + "math" + "runtime" "sync" "time" @@ -36,43 +40,42 @@ const ( // Logs are being sent out based on the flush interval and/or the maximum // batch size. // -// The diagram below illustrates the internal communication inside the Converter. +// The diagram below illustrates the internal communication inside the Converter: // -// ┌─────────────────────────────────┐ -// │ Batch() │ -// │ Ingests and converts log │ -// │ entries and then spawns │ -// ┌───┼─ go queueForFlush() │ -// │ │ if maxFlushCount was reached │ -// │ └─────────────────────────────────┘ -// │ -// │ ┌──────────────────────────────────────┐ -// ├──► queueForFlush goroutine(s) │ -// │ │ Spawned whenever a batch of logs │ -// │ │ ┌─is queued to be flushed. │ -// │ │ │ Sends received logs onto flushChan │ -// │ └─┼────────────────────────────────────┘ -// │ │ -// │ │ -// │ ┌─┼───────────────────────────────────┐ -// │ │ │ Start() │ -// │ │ │ Starts a goroutine listening on: │ -// │ │ │ │ -// │ │ └► * flushChan ───────────────────┼──┐ -// │ │ │ │ -// │ │ * ticker.C │ │ -// └──┼───── calls go queueForFlush() if │ │ -// │ there's anything in the buffer │ │ -// └─────────────────────────────────────┘ │ -// │ -// │ -// ┌──────────────────────────────────────┐ │ -// │ flush() ◄──────────────────────────┼─┘ -// │ Flushes converted and aggregated │ -// │ logs onto pLogsChan which is │ -// │ consumed by downstream consumers │ -// │ viaoOutChannel() │ -// └──────────────────────────────────────┘ +// ┌─────────────────────────────────┐ +// │ Batch() │ +// ┌─────────┤ Ingests log entries and sends │ +// │ │ them onto a workerChan │ +// │ └─────────────────────────────────┘ +// │ +// │ ┌───────────────────────────────────────────────────┐ +// ├─► workerLoop() │ +// │ │ ┌─────────────────────────────────────────────────┴─┐ +// ├─┼─► workerLoop() │ +// │ │ │ ┌─────────────────────────────────────────────────┴─┐ +// └─┼─┼─► workerLoop() │ +// └─┤ │ consumes sent log entries from workerChan, │ +// │ │ translates received entries to pdata.LogRecords,│ +// └─┤ marshals them to JSON and sends them onto │ +// │ batchChan │ +// └─────────────────────────┬─────────────────────────┘ +// │ +// ▼ +// 
┌─────────────────────────────────────────────────────┐ +// │ flushLoop() │ +// │ receives log records from flushChan and sends │ +// │ them onto pLogsChan which is consumed by │ +// │ downstream consumers via OutChannel() │ +// └─────────────────────────────────────────────────────┘ // type Converter struct { // pLogsChan is a channel on which batched logs will be sent to. @@ -81,20 +84,30 @@ type Converter struct { stopOnce sync.Once stopChan chan struct{} + // workerChan is an internal communication channel that carries log + // entries from Batch() calls to the workerLoop() goroutines. + workerChan chan *entry.Entry + // workerCount configures the number of workers started. + workerCount int + // batchChan receives log entries converted by the pool of workers, + // in the form of log records grouped by Resource; batchLoop aggregates + // them and decides, based on maxFlushCount, whether a flush should be triggered. + // It also serves the ticker flushes configured by flushInterval. + batchChan chan *workerItem + // flushInterval defines how often we flush the aggregated log entries. flushInterval time.Duration // maxFlushCount defines what's the amount of entries in the buffer that // will trigger a flush of log entries. maxFlushCount uint // flushChan is an internal channel used for transporting batched pdata.Logs. - flushChan chan []pdata.Logs + flushChan chan pdata.Logs - // data is the internal cache which is flushed regularly, either when - // flushInterval ticker ticks or when max number of entries for a - // particular Resource is reached. - data map[string][]*entry.Entry - dataMutex sync.RWMutex - dataCount uint + // data holds currently converted and aggregated log entries, grouped by Resource. + data map[string]pdata.Logs + // logRecordCount holds the number of translated and accumulated log records + // and is compared against maxFlushCount to decide whether to flush. + logRecordCount uint // wg is a WaitGroup that makes sure that we wait for spun up goroutines exit // when Stop() is called. 
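The converter internals above boil down to a fan-out/fan-in pipeline: Batch() feeds workerChan, a configurable number of workerLoop() goroutines convert entries in parallel, a single batchLoop() aggregates the converted records per Resource and flushes either when maxFlushCount is reached or when the ticker fires, and flushLoop() forwards the flushed batches downstream. The sketch below is a minimal, self-contained illustration of that pattern only — it is not the collector's code; logRec and all channel and helper names here are simplified stand-ins for entry.Entry, pdata.LogRecord and the real struct fields.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// logRec is a simplified stand-in for entry.Entry / pdata.LogRecord.
type logRec struct{ resource, body string }

func main() {
	const workerCount = 2
	const maxFlushCount = 3

	workerChan := make(chan logRec)  // Batch() -> workers
	batchChan := make(chan logRec)   // workers -> aggregator
	flushChan := make(chan []logRec) // aggregator -> flusher
	out := make(chan []logRec, 4)    // flusher -> downstream (buffered to keep the demo simple)
	stop := make(chan struct{})
	var wg sync.WaitGroup

	// workerLoop: several goroutines "convert" entries concurrently.
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for e := range workerChan {
				select {
				case batchChan <- logRec{resource: e.resource, body: "converted:" + e.body}:
				case <-stop:
					return
				}
			}
		}()
	}

	// batchLoop: a single goroutine groups records by resource and flushes
	// when maxFlushCount is reached or when the ticker fires.
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		buf, count := map[string][]logRec{}, 0
		flush := func() {
			for k, batch := range buf {
				select {
				case flushChan <- batch:
				case <-stop:
				}
				delete(buf, k)
			}
			count = 0
		}
		for {
			select {
			case r := <-batchChan:
				buf[r.resource] = append(buf[r.resource], r)
				if count++; count >= maxFlushCount {
					flush()
				}
			case <-ticker.C:
				flush()
			case <-stop:
				return
			}
		}
	}()

	// flushLoop: forwards aggregated batches to the output channel.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case batch := <-flushChan:
				out <- batch
			case <-stop:
				return
			}
		}
	}()

	// The Batch() equivalent: feed a few entries in.
	go func() {
		for i := 0; i < maxFlushCount; i++ {
			workerChan <- logRec{resource: "host-a", body: fmt.Sprintf("msg-%d", i)}
		}
	}()

	fmt.Println(<-out) // first flushed batch, e.g. [{host-a converted:msg-0} ...]
	close(stop)
	close(workerChan)
	wg.Wait()
}
```

The design property mirrored here is that only the single batchLoop() goroutine touches the aggregation map, which is why the real Converter can drop dataMutex in this change.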
@@ -131,16 +144,24 @@ func WithLogger(logger *zap.Logger) ConverterOption { }) } +func WithWorkerCount(workerCount int) ConverterOption { + return optionFunc(func(c *Converter) { + c.workerCount = workerCount + }) +} + func NewConverter(opts ...ConverterOption) *Converter { c := &Converter{ + workerChan: make(chan *entry.Entry), + workerCount: int(math.Max(1, float64(runtime.NumCPU()/4))), + batchChan: make(chan *workerItem), + data: make(map[string]pdata.Logs), pLogsChan: make(chan pdata.Logs), stopChan: make(chan struct{}), logger: zap.NewNop(), - flushChan: make(chan []pdata.Logs), + flushChan: make(chan pdata.Logs), flushInterval: DefaultFlushInterval, maxFlushCount: DefaultMaxFlushCount, - data: make(map[string][]*entry.Entry), - wg: sync.WaitGroup{}, } for _, opt := range opts { @@ -151,41 +172,18 @@ func NewConverter(opts ...ConverterOption) *Converter { } func (c *Converter) Start() { - c.logger.Debug("Starting log converter") + c.logger.Debug("Starting log converter", zap.Int("worker_count", c.workerCount)) - c.wg.Add(1) - go func(c *Converter) { - defer c.wg.Done() - - ticker := time.NewTicker(c.flushInterval) - defer ticker.Stop() + for i := 0; i < c.workerCount; i++ { + c.wg.Add(1) + go c.workerLoop() + } - for { - select { - case <-c.stopChan: - return + c.wg.Add(1) + go c.batchLoop() - case pLogs := <-c.flushChan: - if err := c.flush(context.Background(), pLogs); err != nil { - c.logger.Debug("Problem sending log entries", - zap.Error(err), - ) - } - // NOTE: - // Since we've received a flush signal independently of flush - // ticker do we want to reset the flush ticker? - - case <-ticker.C: - c.dataMutex.Lock() - count := c.dataCount - if count > 0 { - pLogs := c.convertBuffer() - go c.queueForFlush(pLogs) - } - c.dataMutex.Unlock() - } - } - }(c) + c.wg.Add(1) + go c.flushLoop() } func (c *Converter) Stop() { @@ -201,115 +199,165 @@ func (c *Converter) OutChannel() <-chan pdata.Logs { return c.pLogsChan } -// flush flushes provided pdata.Logs entries onto a channel. -func (c *Converter) flush(ctx context.Context, pLogs []pdata.Logs) error { - doneChan := ctx.Done() +type workerItem struct { + Resource map[string]string + LogRecord pdata.LogRecord + ResourceString string +} - for _, pLog := range pLogs { - select { - case <-doneChan: - return fmt.Errorf("flushing log entries interrupted, err: %v", ctx.Err()) +// workerLoop is responsible for obtaining log entries from Batch() calls, +// converting them to pdata.LogRecords and sending them together with the +// associated Resource through the batchChan for aggregation. +func (c *Converter) workerLoop() { + defer c.wg.Done() + + var ( + buff = bytes.Buffer{} + encoder = json.NewEncoder(&buff) + ) - case c.pLogsChan <- pLog: + for { - // The converter has been stopped so bail the flush. + select { case <-c.stopChan: - return nil + return + + case e, ok := <-c.workerChan: + if !ok { + return + } + + buff.Reset() + lr := convert(e) + + if err := encoder.Encode(e.Resource); err != nil { + c.logger.Debug("Failed marshaling entry.Resource to JSON", + zap.Any("resource", e.Resource), + ) + continue + } + + select { + case c.batchChan <- &workerItem{ + Resource: e.Resource, + ResourceString: buff.String(), + LogRecord: lr, + }: + case <-c.stopChan: + } } } - - return nil } -// Batch takes in an entry.Entry and aggregates it with other entries -// that came from the same Resource. -// If the maxFlushCount has been reached then trigger a flush via the flushChan. 
-func (c *Converter) Batch(e *entry.Entry) error { - b, err := json.Marshal(e.Resource) - if err != nil { - return err - } +// batchLoop is responsible for receiving the converted log entries and aggregating +// them by Resource. +// Whenever maxFlushCount is reached or the ticker ticks a flush is triggered. +func (c *Converter) batchLoop() { + defer c.wg.Done() - resource := string(b) + ticker := time.NewTicker(c.flushInterval) + defer ticker.Stop() - // This is locked also for the possible conversion so that no entries are - // added in the meantime so that the expected maximum batch size is not - // exceeded. - c.dataMutex.Lock() + for { + select { + case wi, ok := <-c.batchChan: + if !ok { + return + } - resourceEntries, ok := c.data[resource] - if !ok { - // If we don't have any log entries for this Resource then create - // the provider entry in the cache for it. - resourceEntries = make([]*entry.Entry, 0, 1) - } + pLogs, ok := c.data[wi.ResourceString] + if ok { + pLogs.ResourceLogs(). + At(0).InstrumentationLibraryLogs(). + At(0).Logs().Append(wi.LogRecord) + } else { + pLogs = pdata.NewLogs() + logs := pLogs.ResourceLogs() + logs.Resize(1) + rls := logs.At(0) + + resource := rls.Resource() + resourceAtts := resource.Attributes() + resourceAtts.EnsureCapacity(len(wi.Resource)) + for k, v := range wi.Resource { + resourceAtts.InsertString(k, v) + } - c.data[resource] = append(resourceEntries, e) - c.dataCount++ + ills := rls.InstrumentationLibraryLogs() + ills.Resize(1) + ills.At(0).Logs().Append(wi.LogRecord) + } - needToFlush := c.dataCount >= c.maxFlushCount + c.data[wi.ResourceString] = pLogs + c.logRecordCount++ - if needToFlush { - // Flush max size has been reached: schedule a log flush. - pLogs := c.convertBuffer() - go c.queueForFlush(pLogs) - } - c.dataMutex.Unlock() + if c.logRecordCount >= c.maxFlushCount { + for r, pLogs := range c.data { + c.flushChan <- pLogs + delete(c.data, r) + } + c.logRecordCount = 0 + } - return nil -} + case <-ticker.C: + for r, pLogs := range c.data { + c.flushChan <- pLogs + delete(c.data, r) + } + c.logRecordCount = 0 -// convertBuffer converts the accumulated entries in the buffer and empties it. -// -// NOTE: The caller needs to ensure that c.dataMutex is locked when this is called. -func (c *Converter) convertBuffer() []pdata.Logs { - pLogs := make([]pdata.Logs, 0, len(c.data)) - for h, entries := range c.data { - pLogs = append(pLogs, convertEntries(entries)) - delete(c.data, h) + case <-c.stopChan: + return + } } - c.dataCount = 0 - - return pLogs } -// queueForFlush queues the provided slice of pdata.Logs for flushing. -func (c *Converter) queueForFlush(pLogs []pdata.Logs) { - select { - case c.flushChan <- pLogs: - case <-c.stopChan: +func (c *Converter) flushLoop() { + defer c.wg.Done() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + for { + select { + case <-c.stopChan: + return + + case pLogs := <-c.flushChan: + if err := c.flush(ctx, pLogs); err != nil { + c.logger.Debug("Problem sending log entries", + zap.Error(err), + ) + } + } } } -// convertEntries converts takes in a slice of entries coming from the same -// Resource and converts them into a pdata.Logs. -func convertEntries(entries []*entry.Entry) pdata.Logs { - out := pdata.NewLogs() - if len(entries) == 0 { - return out - } +// flush flushes provided pdata.Logs entries onto a channel. 
+func (c *Converter) flush(ctx context.Context, pLogs pdata.Logs) error { + doneChan := ctx.Done() - logs := out.ResourceLogs() - rls := logs.AppendEmpty() + select { + case <-doneChan: + return fmt.Errorf("flushing log entries interrupted, err: %w", ctx.Err()) - // NOTE: This assumes that passed in entries all come from the same Resource. - if len(entries[0].Resource) > 0 { - resource := rls.Resource() - resourceAtts := resource.Attributes() - resourceAtts.EnsureCapacity(len(entries[0].Resource)) - for k, v := range entries[0].Resource { - resourceAtts.InsertString(k, v) - } - } + case c.pLogsChan <- pLogs: - ills := rls.InstrumentationLibraryLogs().AppendEmpty() - ills.Logs().Resize(len(entries)) - for i := 0; i < len(entries); i++ { - ent := entries[i] - convertInto(ent, ills.Logs().At(i)) + // The converter has been stopped so bail the flush. + case <-c.stopChan: + return errors.New("Logs converter has been stopped") } - return out + return nil +} + +// Batch takes in an entry.Entry and sends it to an available worker for processing. +func (c *Converter) Batch(e *entry.Entry) error { + select { + case c.workerChan <- e: + return nil + case <-c.stopChan: + return errors.New("Logs converter has been stopped") + } } // convert converts one entry.Entry into pdata.LogRecord allocating it. @@ -319,6 +367,28 @@ func convert(ent *entry.Entry) pdata.LogRecord { return dest } +// Convert converts one entry.Entry into pdata.Logs. +// To be used in a stateless setting like tests where ease of use is more +// important than performance or throughput. +func Convert(ent *entry.Entry) pdata.Logs { + pLogs := pdata.NewLogs() + logs := pLogs.ResourceLogs() + + rls := logs.AppendEmpty() + + resource := rls.Resource() + resourceAtts := resource.Attributes() + resourceAtts.EnsureCapacity(len(ent.Resource)) + for k, v := range ent.Resource { + resourceAtts.InsertString(k, v) + } + + ills := rls.InstrumentationLibraryLogs().AppendEmpty() + lr := ills.Logs().AppendEmpty() + convertInto(ent, lr) + return pLogs +} + // convertInto converts entry.Entry into provided pdata.LogRecord. 
func convertInto(ent *entry.Entry, dest pdata.LogRecord) { dest.SetTimestamp(pdata.TimestampFromTime(ent.Timestamp)) @@ -327,8 +397,9 @@ func convertInto(ent *entry.Entry, dest pdata.LogRecord) { dest.SetSeverityText(sevText) dest.SetSeverityNumber(sevNum) - if len(ent.Attributes) > 0 { + if l := len(ent.Attributes); l > 0 { attributes := dest.Attributes() + attributes.EnsureCapacity(l) for k, v := range ent.Attributes { attributes.InsertString(k, v) } @@ -449,10 +520,9 @@ func toAttributeMap(obsMap map[string]interface{}) pdata.AttributeValue { func toAttributeArray(obsArr []interface{}) pdata.AttributeValue { arrVal := pdata.NewAttributeValueArray() arr := arrVal.ArrayVal() - for _, v := range obsArr { - attVal := pdata.NewAttributeValueNull() - insertToAttributeVal(v, attVal) - arr.Append(attVal) + arr.Resize(len(obsArr)) + for i, v := range obsArr { + insertToAttributeVal(v, arr.At(i)) } return arrVal } diff --git a/internal/stanza/converter_test.go b/internal/stanza/converter_test.go index 0e78518b05c9..357895239acb 100644 --- a/internal/stanza/converter_test.go +++ b/internal/stanza/converter_test.go @@ -49,13 +49,17 @@ func BenchmarkConvertComplex(b *testing.B) { } func complexEntries(count int) []*entry.Entry { + return complexEntriesForNDifferentHosts(count, 1) +} + +func complexEntriesForNDifferentHosts(count int, n int) []*entry.Entry { ret := make([]*entry.Entry, count) - for i := int64(0); i < int64(count); i++ { + for i := 0; i < count; i++ { e := entry.New() e.Severity = entry.Error e.AddResourceKey("type", "global") e.Resource = map[string]string{ - "host": "host", + "host": fmt.Sprintf("host-%d", i%n), } e.Body = map[string]interface{}{ "bool": true, @@ -94,25 +98,90 @@ func complexEntry() *entry.Entry { "int": 123, "double": 12.34, "string": "hello", - "bytes": []byte("asdf"), + // "bytes": []byte("asdf"), "object": map[string]interface{}{ "bool": true, "int": 123, "double": 12.34, "string": "hello", - "bytes": []byte("asdf"), + // "bytes": []byte("asdf"), "object": map[string]interface{}{ - "bool": true, - "int": 123, - "double": 12.34, + "bool": true, + "int": 123, + // "double": 12.34, "string": "hello", - "bytes": []byte("asdf"), + // "bytes": []byte("asdf"), }, }, } return e } +func TestConvert(t *testing.T) { + ent := func() *entry.Entry { + e := entry.New() + e.Severity = entry.Error + e.AddResourceKey("type", "global") + e.AddAttribute("one", "two") + e.AddAttribute("two", "three") + e.Body = map[string]interface{}{ + "bool": true, + "int": 123, + "double": 12.34, + "string": "hello", + "bytes": []byte("asdf"), + } + return e + }() + + pLogs := Convert(ent) + require.Equal(t, 1, pLogs.ResourceLogs().Len()) + rls := pLogs.ResourceLogs().At(0) + require.Equal(t, 1, rls.Resource().Attributes().Len()) + { + att, ok := rls.Resource().Attributes().Get("type") + if assert.True(t, ok) { + if assert.Equal(t, att.Type(), pdata.AttributeValueSTRING) { + assert.Equal(t, att.StringVal(), "global") + } + } + } + + ills := rls.InstrumentationLibraryLogs() + require.Equal(t, 1, ills.Len()) + + logs := ills.At(0).Logs() + require.Equal(t, 1, logs.Len()) + + lr := logs.At(0) + + assert.Equal(t, pdata.SeverityNumberERROR, lr.SeverityNumber()) + assert.Equal(t, "Error", lr.SeverityText()) + + if atts := lr.Attributes(); assert.Equal(t, 2, atts.Len()) { + m := pdata.NewAttributeMap() + m.InitFromMap(map[string]pdata.AttributeValue{ + "one": pdata.NewAttributeValueString("two"), + "two": pdata.NewAttributeValueString("three"), + }) + assert.EqualValues(t, m.Sort(), atts.Sort()) + } + + 
if assert.Equal(t, pdata.AttributeValueMAP, lr.Body().Type()) { + m := pdata.NewAttributeMap() + m.InitFromMap(map[string]pdata.AttributeValue{ + "bool": pdata.NewAttributeValueBool(true), + "int": pdata.NewAttributeValueInt(123), + "double": pdata.NewAttributeValueDouble(12.34), + "string": pdata.NewAttributeValueString("hello"), + "bytes": pdata.NewAttributeValueString("asdf"), + // Don't include a nested object because AttributeValueMap sorting + // doesn't sort recursively. + }) + assert.EqualValues(t, m.Sort(), lr.Body().MapVal().Sort()) + } +} + func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) { t.Parallel() @@ -141,6 +210,7 @@ func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) { t.Parallel() converter := NewConverter( + WithWorkerCount(1), WithMaxFlushCount(tc.maxFlushCount), WithFlushInterval(10*time.Millisecond), // To minimize time spent in test ) @@ -154,10 +224,9 @@ func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) { }() var ( - actualCount int - actualFlushCount int - timeoutTimer = time.NewTimer(10 * time.Second) - ch = converter.OutChannel() + actualCount int + timeoutTimer = time.NewTimer(10 * time.Second) + ch = converter.OutChannel() ) defer timeoutTimer.Stop() @@ -173,8 +242,6 @@ func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) { break forLoop } - actualFlushCount++ - rLogs := pLogs.ResourceLogs() require.Equal(t, 1, rLogs.Len()) @@ -207,29 +274,40 @@ func TestAllConvertedEntriesAreSentAndReceivedWithinAnExpectedTimeDuration(t *te testcases := []struct { entries int + hostsCount int maxFlushCount uint flushInterval time.Duration }{ { entries: 10, + hostsCount: 1, maxFlushCount: 20, flushInterval: 100 * time.Millisecond, }, { entries: 50, + hostsCount: 1, maxFlushCount: 51, flushInterval: 100 * time.Millisecond, }, { entries: 500, + hostsCount: 1, maxFlushCount: 501, flushInterval: 100 * time.Millisecond, }, { entries: 500, + hostsCount: 1, maxFlushCount: 100, flushInterval: 100 * time.Millisecond, }, + { + entries: 500, + hostsCount: 4, + maxFlushCount: 501, + flushInterval: 100 * time.Millisecond, + }, } for i, tc := range testcases { @@ -239,6 +317,7 @@ func TestAllConvertedEntriesAreSentAndReceivedWithinAnExpectedTimeDuration(t *te t.Parallel() converter := NewConverter( + WithWorkerCount(1), WithMaxFlushCount(tc.maxFlushCount), WithFlushInterval(tc.flushInterval), ) @@ -246,7 +325,7 @@ func TestAllConvertedEntriesAreSentAndReceivedWithinAnExpectedTimeDuration(t *te defer converter.Stop() go func() { - for _, ent := range complexEntries(tc.entries) { + for _, ent := range complexEntriesForNDifferentHosts(tc.entries, tc.hostsCount) { assert.NoError(t, converter.Batch(ent)) } }() @@ -321,8 +400,17 @@ func TestConverterCancelledContextCancellsTheFlush(t *testing.T) { go func() { defer wg.Done() - logs := convertEntries(complexEntries(1)) - assert.Error(t, converter.flush(ctx, []pdata.Logs{logs})) + pLogs := pdata.NewLogs() + logs := pLogs.ResourceLogs() + logs.Resize(1) + rls := logs.At(0) + rls.InstrumentationLibraryLogs().Resize(1) + ills := rls.InstrumentationLibraryLogs().At(0) + + lr := convert(complexEntry()) + ills.Logs().Append(lr) + + assert.Error(t, converter.flush(ctx, pLogs)) }() wg.Wait() } @@ -574,3 +662,76 @@ func TestConvertTrace(t *testing.T) { }), record.SpanID()) require.Equal(t, uint32(0x01), record.Flags()) } + +func BenchmarkConverter(b *testing.B) { + const ( + entryCount = 1_000_000 + hostsCount = 4 + ) + + var ( + workerCounts = []int{1, 2, 4, 6, 8} + entries = 
complexEntriesForNDifferentHosts(entryCount, hostsCount) + ) + + for _, wc := range workerCounts { + b.Run(fmt.Sprintf("worker_count=%d", wc), func(b *testing.B) { + for i := 0; i < b.N; i++ { + + converter := NewConverter( + WithWorkerCount(wc), + WithMaxFlushCount(1_000), + WithFlushInterval(250*time.Millisecond), + ) + converter.Start() + defer converter.Stop() + b.ResetTimer() + + go func() { + for _, ent := range entries { + assert.NoError(b, converter.Batch(ent)) + } + }() + + var ( + timeoutTimer = time.NewTimer(10 * time.Second) + ch = converter.OutChannel() + ) + defer timeoutTimer.Stop() + + var n int + forLoop: + for { + if n == entryCount { + break + } + + select { + case pLogs, ok := <-ch: + if !ok { + break forLoop + } + + rLogs := pLogs.ResourceLogs() + require.Equal(b, 1, rLogs.Len()) + + rLog := rLogs.At(0) + ills := rLog.InstrumentationLibraryLogs() + require.Equal(b, 1, ills.Len()) + + ill := ills.At(0) + + n += ill.Logs().Len() + + case <-timeoutTimer.C: + break forLoop + } + } + + assert.Equal(b, entryCount, n, + "didn't receive expected number of entries after conversion", + ) + } + }) + } +} diff --git a/internal/stanza/factory.go b/internal/stanza/factory.go index 4bce00849957..ff68a83d27e7 100644 --- a/internal/stanza/factory.go +++ b/internal/stanza/factory.go @@ -80,6 +80,9 @@ func createLogsReceiver(logReceiverType LogReceiverType) receiverhelper.CreateLo if baseCfg.Converter.FlushInterval > 0 { opts = append(opts, WithFlushInterval(baseCfg.Converter.FlushInterval)) } + if baseCfg.Converter.WorkerCount > 0 { + opts = append(opts, WithWorkerCount(baseCfg.Converter.WorkerCount)) + } converter := NewConverter(opts...) return &receiver{ diff --git a/receiver/syslogreceiver/syslog_test.go b/receiver/syslogreceiver/syslog_test.go index 4d0bb43e4526..6713503d212a 100644 --- a/receiver/syslogreceiver/syslog_test.go +++ b/receiver/syslogreceiver/syslog_test.go @@ -110,6 +110,7 @@ func testdataConfigYamlAsMap() *SysLogConfig { Operators: stanza.OperatorConfigs{}, Converter: stanza.ConverterConfig{ FlushInterval: 100 * time.Millisecond, + WorkerCount: 1, }, }, Input: stanza.InputConfig{ @@ -128,6 +129,7 @@ func testdataUDPConfig() *SysLogConfig { Operators: stanza.OperatorConfigs{}, Converter: stanza.ConverterConfig{ FlushInterval: 100 * time.Millisecond, + WorkerCount: 1, }, }, Input: stanza.InputConfig{ diff --git a/receiver/syslogreceiver/testdata/config.yaml b/receiver/syslogreceiver/testdata/config.yaml index 3a4838206f40..238280a96313 100644 --- a/receiver/syslogreceiver/testdata/config.yaml +++ b/receiver/syslogreceiver/testdata/config.yaml @@ -5,6 +5,7 @@ receivers: protocol: rfc5424 converter: flush_interval: 100ms + worker_count: 1 processors: nop: diff --git a/receiver/tcplogreceiver/tcp_test.go b/receiver/tcplogreceiver/tcp_test.go index 807d6114c143..52ee2d8899d5 100644 --- a/receiver/tcplogreceiver/tcp_test.go +++ b/receiver/tcplogreceiver/tcp_test.go @@ -96,6 +96,9 @@ func testdataConfigYamlAsMap() *TCPLogConfig { BaseConfig: stanza.BaseConfig{ ReceiverSettings: config.NewReceiverSettings(config.NewID(typeStr)), Operators: stanza.OperatorConfigs{}, + Converter: stanza.ConverterConfig{ + WorkerCount: 1, + }, }, Input: stanza.InputConfig{ "listen_address": "0.0.0.0:29018", diff --git a/receiver/tcplogreceiver/testdata/config.yaml b/receiver/tcplogreceiver/testdata/config.yaml index 5eec0642a6b7..37c3eaa0efe5 100644 --- a/receiver/tcplogreceiver/testdata/config.yaml +++ b/receiver/tcplogreceiver/testdata/config.yaml @@ -1,6 +1,8 @@ receivers: tcplog: 
listen_address: "0.0.0.0:29018" + converter: + worker_count: 1 processors: nop:
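For completeness, here is how the new worker_count setting relates to the default. This is a standalone sketch, not collector code: converterConfig, defaultWorkerCount and effectiveWorkerCount are made-up names for illustration; the guard (only override when the configured value is positive) mirrors the factory.go change above, and the default matches the math.Max(1, runtime.NumCPU()/4) formula documented in ConverterConfig.

```go
package main

import (
	"fmt"
	"math"
	"runtime"
)

// converterConfig mirrors the shape of ConverterConfig for this sketch:
// a zero WorkerCount means "not set, use the default".
type converterConfig struct {
	WorkerCount int `mapstructure:"worker_count"`
}

// defaultWorkerCount reproduces the documented default:
// math.Max(1, runtime.NumCPU()/4).
func defaultWorkerCount() int {
	return int(math.Max(1, float64(runtime.NumCPU()/4)))
}

// effectiveWorkerCount follows the same guard the factory uses:
// only honor the configured value when it is positive.
func effectiveWorkerCount(cfg converterConfig) int {
	if cfg.WorkerCount > 0 {
		return cfg.WorkerCount
	}
	return defaultWorkerCount()
}

func main() {
	fmt.Println(effectiveWorkerCount(converterConfig{}))               // default, depends on the host's CPU count
	fmt.Println(effectiveWorkerCount(converterConfig{WorkerCount: 1})) // 1, as in the updated test configs
}
```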