This repository has been archived by the owner on May 25, 2022. It is now read-only.

recombine - add sourceIdentifier, update doc
rockb1017 committed Jan 11, 2022
1 parent 888d113 commit 8802b65
Showing 3 changed files with 99 additions and 22 deletions.
1 change: 1 addition & 0 deletions docs/operators/recombine.md
@@ -16,6 +16,7 @@ The `recombine` operator combines consecutive logs into single logs based on sim
| `max_batch_size` | 1000 | The maximum number of consecutive entries that will be combined into a single entry. |
| `overwrite_with` | `oldest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. |
| `force_flush_period` | `5s` | Flush timeout after which entries will be flushed, aborting the wait for their remaining sub-parts to be merged. |
+ | `source_identifier` | | The [field](/docs/types/field.md) to use to separate one source of logs from others when combining them (e.g., `$attributes["file.path"]`). |

Exactly one of `is_first_entry` and `is_last_entry` must be specified.
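
A minimal configuration sketch using the new option (the `is_first_entry` expression and the attribute name are illustrative assumptions, not taken from this diff):

```yaml
- type: recombine
  combine_field: $body
  is_first_entry: '$body matches "^[^\s]"'
  source_identifier: $attributes["file.path"]
```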

91 changes: 69 additions & 22 deletions operator/builtin/transformer/recombine/recombine.go
@@ -52,6 +52,7 @@ type RecombineOperatorConfig struct {
MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"`
CombineField entry.Field `json:"combine_field" yaml:"combine_field"`
CombineWith string `json:"combine_with" yaml:"combine_with"`
+ SourceIdentifier entry.Field `json:"source_identifier" yaml:"source_identifier"`
OverwriteWith string `json:"overwrite_with" yaml:"overwrite_with"`
ForceFlushTimeout time.Duration `json:"force_flush_period" yaml:"force_flush_period"`
}
@@ -106,13 +107,15 @@ func (c *RecombineOperatorConfig) Build(bc operator.BuildContext) ([]operator.Op
matchFirstLine: matchesFirst,
prog: prog,
maxBatchSize: c.MaxBatchSize,
+ batchSize: 0,
overwriteWithOldest: overwriteWithOldest,
- batch: make([]*entry.Entry, 0, c.MaxBatchSize),
+ batchMap: make(map[string][]*entry.Entry),
combineField: c.CombineField,
combineWith: c.CombineWith,
forceFlushTimeout: c.ForceFlushTimeout,
ticker: time.NewTicker(c.ForceFlushTimeout),
chClose: make(chan struct{}),
+ sourceIdentifier: c.SourceIdentifier,
}

return []operator.Operator{recombine}, nil
@@ -125,15 +128,17 @@ type RecombineOperator struct {
matchFirstLine bool
prog *vm.Program
maxBatchSize int
+ batchSize int
overwriteWithOldest bool
combineField entry.Field
combineWith string
ticker *time.Ticker
forceFlushTimeout time.Duration
chClose chan struct{}
+ sourceIdentifier entry.Field

sync.Mutex
- batch []*entry.Entry
+ batchMap map[string][]*entry.Entry
}

func (r *RecombineOperator) Start(_ operator.Persister) error {
@@ -147,9 +152,19 @@ func (r *RecombineOperator) flushLoop() {
select {
case <-r.ticker.C:
r.Lock()
- if err := r.flushCombined(); err != nil {
- 	r.Errorw("Failed flushing", "error", err)
- }
+ timeNow := time.Now()
+ for source, entries := range r.batchMap {
+ 	lastEntryTs := entries[len(entries)-1].Timestamp
+ 	timeSinceLastEntry := timeNow.Sub(lastEntryTs)
+ 	if timeSinceLastEntry > r.forceFlushTimeout {
+ 		err := r.flushSource(source)
+ 		if err != nil {
+ 			r.Errorf("there was an error flushing combined logs: %s", err)
+ 		}
+ 	}
+ }
+
+ r.ticker.Reset(r.forceFlushTimeout)
r.Unlock()
case <-r.chClose:
r.ticker.Stop()
@@ -190,11 +205,20 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error {

// this is guaranteed to be a boolean because of expr.AsBool
matches := m.(bool)

+ var s string
+ if r.sourceIdentifier.FieldInterface != nil {
+ 	err := e.Read(r.sourceIdentifier, &s)
+ 	if err != nil {
+ 		r.Warn("entry does not contain the source_identifier, so it may be pooled with other sources")
+ 		s = "DefaultSourceIdentifier"
+ 	}
+ } else {
+ 	s = "DefaultSourceIdentifier"
+ }
// This is the first entry in the next batch
if matches && r.matchIndicatesFirst() {
// Flush the existing batch
- err := r.flushCombined()
+ err := r.flushSource(s)
if err != nil {
return err
}
@@ -207,7 +231,7 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error {
// This is the last entry in a complete batch
if matches && r.matchIndicatesLast() {
r.addToBatch(ctx, e)
- err := r.flushCombined()
+ err := r.flushSource(s)
if err != nil {
return err
}
@@ -230,45 +254,66 @@ func (r *RecombineOperator) matchIndicatesLast() bool {

// addToBatch adds the current entry to the current batch of entries that will be combined
func (r *RecombineOperator) addToBatch(_ context.Context, e *entry.Entry) {
- if len(r.batch) >= r.maxBatchSize {
+ var s string
+
+ if r.sourceIdentifier.FieldInterface != nil {
+ 	err := e.Read(r.sourceIdentifier, &s)
+ 	if err != nil {
+ 		r.Warn("entry does not contain the source_identifier, so it may be pooled with other sources")
+ 		s = "DefaultSourceIdentifier"
+ 	}
+ } else {
+ 	s = "DefaultSourceIdentifier"
+ }
+ if r.batchSize >= r.maxBatchSize {
r.Error("Batch size exceeds max batch size. Flushing logs that have not been recombined")
r.flushUncombined(context.Background())
- }
+ } else {
+ 	r.batchSize += 1
+ 	if _, ok := r.batchMap[s]; !ok {
+ 		r.batchMap[s] = []*entry.Entry{e}
+ 	} else {
+ 		r.batchMap[s] = append(r.batchMap[s], e)
+ 	}
+ }

- r.batch = append(r.batch, e)
}

// flushUncombined flushes all the logs in the batch individually to the
// next output in the pipeline. This is only used when there is an error
// or at shutdown to avoid dropping the logs.
func (r *RecombineOperator) flushUncombined(ctx context.Context) {
- for _, entry := range r.batch {
- 	r.Write(ctx, entry)
+ for _, entries := range r.batchMap {
+ 	for _, entry := range entries {
+ 		r.Write(ctx, entry)
+ 	}
}
- r.batch = r.batch[:0]
+ r.batchMap = make(map[string][]*entry.Entry)
+ r.batchSize = 0
r.ticker.Reset(r.forceFlushTimeout)
}

- // flushCombined combines the entries currently in the batch into a single entry,
+ // flushSource combines the entries currently in the batch into a single entry,
// then forwards them to the next operator in the pipeline
- func (r *RecombineOperator) flushCombined() error {
+ func (r *RecombineOperator) flushSource(source string) error {
// Skip flushing a combined log if the batch is empty
- if len(r.batch) == 0 {
+ if len(r.batchMap[source]) == 0 {
return nil
}

// Choose which entry we want to keep the rest of the fields from
var base *entry.Entry
+ entries := r.batchMap[source]

if r.overwriteWithOldest {
- base = r.batch[0]
+ base = entries[0]
} else {
- base = r.batch[len(r.batch)-1]
+ base = entries[len(entries)-1]
}

// Combine the combineField of each entry in the batch,
// separated by newlines
var recombined strings.Builder
- for i, e := range r.batch {
+ for i, e := range entries {
var s string
err := e.Read(r.combineField, &s)
if err != nil {
@@ -277,7 +322,7 @@ func (r *RecombineOperator) flushCombined() error {
}

recombined.WriteString(s)
- if i != len(r.batch)-1 && len(r.combineWith) > 0 {
+ if i != len(entries)-1 {
recombined.WriteString(r.combineWith)
}
}
@@ -289,7 +334,9 @@ }
}

r.Write(context.Background(), base)
- r.batch = r.batch[:0]
r.ticker.Reset(r.forceFlushTimeout)

+ r.batchSize -= len(r.batchMap[source])
+ r.batchMap[source] = []*entry.Entry{}

return nil
}
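
The core of this change is replacing the single shared `batch` slice with a map keyed by a per-source string, so interleaved logs from different files never get combined with each other. A standalone sketch of that bookkeeping (the `entry` and `batcher` types here are simplified stand-ins, not the operator's actual API):

```go
package main

import "fmt"

// entry is a stand-in for the operator's *entry.Entry, reduced to the
// fields needed to illustrate per-source batching.
type entry struct {
	source string
	body   string
}

// batcher groups pending entries by source, mirroring the operator's
// batchMap/batchSize fields after this commit.
type batcher struct {
	batchMap  map[string][]entry
	batchSize int // total entries across all sources
}

// add appends an entry to its source's batch and updates the global count.
func (b *batcher) add(e entry) {
	b.batchSize++
	b.batchMap[e.source] = append(b.batchMap[e.source], e)
}

// flushSource drains one source's batch only, leaving other sources'
// pending entries untouched.
func (b *batcher) flushSource(source string) []entry {
	entries := b.batchMap[source]
	b.batchSize -= len(entries)
	b.batchMap[source] = nil
	return entries
}

func main() {
	b := &batcher{batchMap: make(map[string][]entry)}
	b.add(entry{source: "file1", body: "start"})
	b.add(entry{source: "file2", body: "start"})
	b.add(entry{source: "file1", body: "end"})
	fmt.Println(b.flushSource("file1")) // [{file1 start} {file1 end}]
	fmt.Println(b.batchSize)            // 1 (file2's entry is still pending)
}
```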
29 changes: 29 additions & 0 deletions operator/builtin/transformer/recombine/recombine_test.go
@@ -37,6 +37,14 @@ func TestRecombineOperator(t *testing.T) {
return e
}

+ entryWithBodyAttr := func(ts time.Time, body interface{}, Attr map[string]string) *entry.Entry {
+ 	e := entryWithBody(ts, body)
+ 	for k, v := range Attr {
+ 		e.AddAttribute(k, v)
+ 	}
+ 	return e
+ }

cases := []struct {
name string
config *RecombineOperatorConfig
@@ -141,6 +149,27 @@ func TestRecombineOperator(t *testing.T) {
},
[]*entry.Entry{entryWithBody(t1, "test1test2")},
},
+ {
+ 	"fourEntriesTwoSourceIdentifier",
+ 	func() *RecombineOperatorConfig {
+ 		cfg := NewRecombineOperatorConfig("")
+ 		cfg.CombineField = entry.NewBodyField()
+ 		cfg.IsLastEntry = "$body == 'end'"
+ 		cfg.OutputIDs = []string{"fake"}
+ 		cfg.SourceIdentifier = entry.NewAttributeField("file.name")
+ 		return cfg
+ 	}(),
+ 	[]*entry.Entry{
+ 		entryWithBodyAttr(t1, "file1", map[string]string{"file.name": "file1"}),
+ 		entryWithBodyAttr(t1, "file2", map[string]string{"file.name": "file2"}),
+ 		entryWithBodyAttr(t2, "end", map[string]string{"file.name": "file1"}),
+ 		entryWithBodyAttr(t2, "end", map[string]string{"file.name": "file2"}),
+ 	},
+ 	[]*entry.Entry{
+ 		entryWithBodyAttr(t1, "file1\nend", map[string]string{"file.name": "file1"}),
+ 		entryWithBodyAttr(t1, "file2\nend", map[string]string{"file.name": "file2"}),
+ 	},
+ },
}

for _, tc := range cases {