From 8802b65678aaa2fd7a0935bda0aea070c2c7fb96 Mon Sep 17 00:00:00 2001
From: rockb1017
Date: Tue, 11 Jan 2022 11:00:11 -0800
Subject: [PATCH] recombine: add source_identifier, update docs

---
 docs/operators/recombine.md                  |  1 +
 .../transformer/recombine/recombine.go       | 91 ++++++++++++++-----
 .../transformer/recombine/recombine_test.go  | 29 ++++++
 3 files changed, 99 insertions(+), 22 deletions(-)

diff --git a/docs/operators/recombine.md b/docs/operators/recombine.md
index 6560ef90..f61383e1 100644
--- a/docs/operators/recombine.md
+++ b/docs/operators/recombine.md
@@ -16,6 +16,7 @@ The `recombine` operator combines consecutive logs into single logs based on sim
 | `max_batch_size`     | 1000     | The maximum number of consecutive entries that will be combined into a single entry. |
 | `overwrite_with`     | `oldest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. |
 | `force_flush_period` | `5s`     | Flush timeout after which entries will be flushed aborting the wait for their sub parts to be merged with. |
+| `source_identifier`  |          | The [field](/docs/types/field.md) used to separate one source of logs from others when combining them (e.g. `$attributes["file.path"]`). |
 
 Exactly one of `is_first_entry` and `is_last_entry` must be specified.

diff --git a/operator/builtin/transformer/recombine/recombine.go b/operator/builtin/transformer/recombine/recombine.go
index ddbd7721..cacfea05 100644
--- a/operator/builtin/transformer/recombine/recombine.go
+++ b/operator/builtin/transformer/recombine/recombine.go
@@ -52,6 +52,7 @@ type RecombineOperatorConfig struct {
 	MaxBatchSize      int           `json:"max_batch_size" yaml:"max_batch_size"`
 	CombineField      entry.Field   `json:"combine_field" yaml:"combine_field"`
 	CombineWith       string        `json:"combine_with" yaml:"combine_with"`
+	SourceIdentifier  entry.Field   `json:"source_identifier" yaml:"source_identifier"`
 	OverwriteWith     string        `json:"overwrite_with" yaml:"overwrite_with"`
 	ForceFlushTimeout time.Duration `json:"force_flush_period" yaml:"force_flush_period"`
 }
@@ -106,13 +107,15 @@ func (c *RecombineOperatorConfig) Build(bc operator.BuildContext) ([]operator.Op
 		matchFirstLine:      matchesFirst,
 		prog:                prog,
 		maxBatchSize:        c.MaxBatchSize,
+		batchSize:           0,
 		overwriteWithOldest: overwriteWithOldest,
-		batch:               make([]*entry.Entry, 0, c.MaxBatchSize),
+		batchMap:            make(map[string][]*entry.Entry),
 		combineField:        c.CombineField,
 		combineWith:         c.CombineWith,
 		forceFlushTimeout:   c.ForceFlushTimeout,
 		ticker:              time.NewTicker(c.ForceFlushTimeout),
 		chClose:             make(chan struct{}),
+		sourceIdentifier:    c.SourceIdentifier,
 	}
 
 	return []operator.Operator{recombine}, nil
@@ -125,15 +128,17 @@ type RecombineOperator struct {
 	matchFirstLine      bool
 	prog                *vm.Program
 	maxBatchSize        int
+	batchSize           int
 	overwriteWithOldest bool
 	combineField        entry.Field
 	combineWith         string
 	ticker              *time.Ticker
 	forceFlushTimeout   time.Duration
 	chClose             chan struct{}
+	sourceIdentifier    entry.Field
 
 	sync.Mutex
-	batch []*entry.Entry
+	batchMap map[string][]*entry.Entry
 }
 
 func (r *RecombineOperator) Start(_ operator.Persister) error {
@@ -147,9 +152,19 @@ func (r *RecombineOperator) flushLoop() {
 		select {
 		case <-r.ticker.C:
 			r.Lock()
-			if err := r.flushCombined(); err != nil {
-				r.Errorw("Failed flushing", "error", err)
+			timeNow := time.Now()
+			for source, entries := range r.batchMap {
+				lastEntryTs := entries[len(entries)-1].Timestamp
+				timeSinceLastEntry := timeNow.Sub(lastEntryTs)
+				if timeSinceLastEntry > r.forceFlushTimeout {
+					err := r.flushSource(source)
+					if err != nil {
+						r.Errorf("there was an error flushing combined logs: %s", err)
+					}
+				}
+			}
+
+			r.ticker.Reset(r.forceFlushTimeout)
 			r.Unlock()
 		case <-r.chClose:
 			r.ticker.Stop()
@@ -190,11 +205,20 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error {
 
 	// this is guaranteed to be a boolean because of expr.AsBool
 	matches := m.(bool)
-
+	var s string
+	if r.sourceIdentifier.FieldInterface != nil {
+		err := e.Read(r.sourceIdentifier, &s)
+		if err != nil {
+			r.Warn("entry does not contain the source_identifier, so it may be pooled with other sources")
+			s = "DefaultSourceIdentifier"
+		}
+	} else {
+		s = "DefaultSourceIdentifier"
+	}
+
 	// This is the first entry in the next batch
 	if matches && r.matchIndicatesFirst() {
 		// Flush the existing batch
-		err := r.flushCombined()
+		err := r.flushSource(s)
 		if err != nil {
 			return err
 		}
@@ -207,7 +231,7 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error {
 	// This is the last entry in a complete batch
 	if matches && r.matchIndicatesLast() {
 		r.addToBatch(ctx, e)
-		err := r.flushCombined()
+		err := r.flushSource(s)
 		if err != nil {
 			return err
 		}
@@ -230,45 +254,66 @@ func (r *RecombineOperator) matchIndicatesLast() bool {
 
 // addToBatch adds the current entry to the current batch of entries that will be combined
 func (r *RecombineOperator) addToBatch(_ context.Context, e *entry.Entry) {
-	if len(r.batch) >= r.maxBatchSize {
+	var s string
+	if r.sourceIdentifier.FieldInterface != nil {
+		err := e.Read(r.sourceIdentifier, &s)
+		if err != nil {
+			r.Warn("entry does not contain the source_identifier, so it may be pooled with other sources")
+			s = "DefaultSourceIdentifier"
+		}
+	} else {
+		s = "DefaultSourceIdentifier"
+	}
+
+	if r.batchSize >= r.maxBatchSize {
 		r.Error("Batch size exceeds max batch size. Flushing logs that have not been recombined")
 		r.flushUncombined(context.Background())
 	}
 
-	r.batch = append(r.batch, e)
+	// Appending to a nil slice allocates it, so the entry is always kept,
+	// including the one that triggered an overflow flush above.
+	r.batchSize++
+	r.batchMap[s] = append(r.batchMap[s], e)
 }
 
 // flushUncombined flushes all the logs in the batch individually to the
 // next output in the pipeline. This is only used when there is an error
 // or at shutdown to avoid dropping the logs.
 func (r *RecombineOperator) flushUncombined(ctx context.Context) {
-	for _, entry := range r.batch {
-		r.Write(ctx, entry)
+	for _, entries := range r.batchMap {
+		for _, entry := range entries {
+			r.Write(ctx, entry)
+		}
 	}
-	r.batch = r.batch[:0]
+	r.batchMap = make(map[string][]*entry.Entry)
+	r.batchSize = 0
 	r.ticker.Reset(r.forceFlushTimeout)
 }
 
-// flushCombined combines the entries currently in the batch into a single entry,
-// then forwards them to the next operator in the pipeline
-func (r *RecombineOperator) flushCombined() error {
+// flushSource combines the entries buffered for the given source into a single
+// entry, then forwards it to the next operator in the pipeline
+func (r *RecombineOperator) flushSource(source string) error {
 	// Skip flushing a combined log if the batch is empty
-	if len(r.batch) == 0 {
+	if len(r.batchMap[source]) == 0 {
 		return nil
 	}
 
 	// Choose which entry we want to keep the rest of the fields from
 	var base *entry.Entry
+	entries := r.batchMap[source]
+
 	if r.overwriteWithOldest {
-		base = r.batch[0]
+		base = entries[0]
 	} else {
-		base = r.batch[len(r.batch)-1]
+		base = entries[len(entries)-1]
 	}
 
 	// Combine the combineField of each entry in the batch,
 	// separated by newlines
 	var recombined strings.Builder
-	for i, e := range r.batch {
+	for i, e := range entries {
 		var s string
 		err := e.Read(r.combineField, &s)
 		if err != nil {
@@ -277,7 +322,7 @@ func (r *RecombineOperator) flushCombined() error {
 		}
 
 		recombined.WriteString(s)
-		if i != len(r.batch)-1 && len(r.combineWith) > 0 {
+		if i != len(entries)-1 {
 			recombined.WriteString(r.combineWith)
 		}
 	}
@@ -289,7 +334,9 @@ func (r *RecombineOperator) flushCombined() error {
 	}
 
 	r.Write(context.Background(), base)
-	r.batch = r.batch[:0]
-	r.ticker.Reset(r.forceFlushTimeout)
+
+	r.batchSize -= len(entries)
+	// Drop the flushed source entirely so the map does not grow without bound.
+	delete(r.batchMap, source)
+
 	return nil
 }

diff --git a/operator/builtin/transformer/recombine/recombine_test.go b/operator/builtin/transformer/recombine/recombine_test.go
index 6250dfef..00ea3e87 100644
--- a/operator/builtin/transformer/recombine/recombine_test.go
+++ b/operator/builtin/transformer/recombine/recombine_test.go
@@ -37,6 +37,14 @@ func TestRecombineOperator(t *testing.T) {
 		return e
 	}
 
+	entryWithBodyAttr := func(ts time.Time, body interface{}, attrs map[string]string) *entry.Entry {
+		e := entryWithBody(ts, body)
+		for k, v := range attrs {
+			e.AddAttribute(k, v)
+		}
+		return e
+	}
+
 	cases := []struct {
 		name           string
 		config         *RecombineOperatorConfig
@@ -141,6 +149,27 @@ func TestRecombineOperator(t *testing.T) {
 			},
 			[]*entry.Entry{entryWithBody(t1, "test1test2")},
 		},
+		{
+			"fourEntriesTwoSourceIdentifier",
+			func() *RecombineOperatorConfig {
+				cfg := NewRecombineOperatorConfig("")
+				cfg.CombineField = entry.NewBodyField()
+				cfg.IsLastEntry = "$body == 'end'"
+				cfg.OutputIDs = []string{"fake"}
+				cfg.SourceIdentifier = entry.NewAttributeField("file.name")
+				return cfg
+			}(),
+			[]*entry.Entry{
+				entryWithBodyAttr(t1, "file1", map[string]string{"file.name": "file1"}),
+				entryWithBodyAttr(t1, "file2", map[string]string{"file.name": "file2"}),
+				entryWithBodyAttr(t2, "end", map[string]string{"file.name": "file1"}),
+				entryWithBodyAttr(t2, "end", map[string]string{"file.name": "file2"}),
+			},
+			[]*entry.Entry{
+				entryWithBodyAttr(t1, "file1\nend", map[string]string{"file.name": "file1"}),
+				entryWithBodyAttr(t1, "file2\nend", map[string]string{"file.name": "file2"}),
+			},
+		},
 	}
 
 	for _, tc := range cases {
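
---

Usage note for reviewers: the sketch below shows how the new option would fit into a pipeline config. It is a minimal, hypothetical example and not part of the patch: the glob path and log layout are made up, and the `file.path` attribute follows the example in the docs table above, assuming `file_input`'s `include_file_path` option is used to record it.

```yaml
pipeline:
  - type: file_input
    include:
      - /var/log/app/*.log    # hypothetical path
    # record each entry's source file as $attributes["file.path"]
    include_file_path: true
  - type: recombine
    combine_field: $body
    # a new log starts at a non-indented line; continuation lines are indented
    is_first_entry: '$body matches "^[^\s]"'
    # keep each file's lines in their own batch while combining
    source_identifier: $attributes["file.path"]
  - type: stdout
```

With `source_identifier` set, interleaved entries from different files are batched per source in `batchMap` rather than recombined into one entry, and `force_flush_period` is checked per source against that source's newest entry timestamp, so a quiet file flushes on its own timer without flushing the others.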