This repository has been archived by the owner on May 25, 2022. It is now read-only.

Source identifier #341

Merged · 8 commits · Jan 20, 2022
2 changes: 2 additions & 0 deletions docs/operators/recombine.md
@@ -16,6 +16,8 @@ The `recombine` operator combines consecutive logs into single logs based on sim
| `max_batch_size` | 1000 | The maximum number of consecutive entries that will be combined into a single entry. |
| `overwrite_with` | `oldest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. |
| `force_flush_period` | `5s` | Flush timeout after which batched entries are flushed, aborting the wait for any remaining parts to be merged with them. |
| `source_identifier` | `$attributes["file.path"]` | The [field](/docs/types/field.md) used to distinguish one source of logs from another when combining them. |
| `max_sources` | 1000 | The maximum number of unique sources that can be tracked concurrently for separate combination. |

Exactly one of `is_first_entry` and `is_last_entry` must be specified.

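As a worked example of the two new parameters, here is a minimal sketch in Go that mirrors this PR's test setup. The import paths are assumed from this repository's layout, and the `custom_source` attribute name is borrowed from the tests below; neither is prescribed by this diff:

```go
package main

import (
	"fmt"

	// Import paths are an assumption based on this repository's layout.
	"github.com/open-telemetry/opentelemetry-log-collection/entry"
	"github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/transformer/recombine"
)

func main() {
	cfg := recombine.NewRecombineOperatorConfig("recombine_per_source")
	cfg.CombineField = entry.NewBodyField()
	cfg.IsLastEntry = "$body == 'end'" // expression style taken from this PR's tests

	// Batch entries separately per value of this attribute.
	// The default is $attributes["file.path"].
	cfg.SourceIdentifier = entry.NewAttributeField("custom_source")

	// Track at most 500 concurrent sources; once exceeded, all batched
	// entries are flushed uncombined (see addToBatch in this PR).
	cfg.MaxSources = 500

	fmt.Printf("%+v\n", cfg)
}
```

Entries with different `custom_source` values are then batched and flushed independently, while entries missing the attribute fall back to a single shared bucket (`DefaultSourceIdentifier`).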
3 changes: 2 additions & 1 deletion go.mod
@@ -25,7 +25,7 @@ require (
)

require (
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/benbjohnson/clock v1.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v4.9.0+incompatible // indirect
github.com/go-logr/logr v0.4.0 // indirect
@@ -34,6 +34,7 @@ (
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
3 changes: 1 addition & 2 deletions go.sum
@@ -77,7 +77,6 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.2/go.mod h1:72H
github.com/aws/aws-sdk-go-v2/service/sso v1.4.2/go.mod h1:NBvT9R1MEF+Ud6ApJKM0G+IkPchKS7p7c2YPKwHmBOk=
github.com/aws/aws-sdk-go-v2/service/sts v1.7.2/go.mod h1:8EzeIqfWt2wWT4rJVu3f21TfrhJ8AEMzVybRNSb/b4g=
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.2.0 h1:9Re3G2TWxkE06LdMWMpcY6KV81GLXMGiYpPYUPkFAws=
github.com/benbjohnson/clock v1.2.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
@@ -317,7 +316,6 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
@@ -432,6 +430,7 @@ github.com/rivo/tview v0.0.0-20200219210816-cd38d7432498/go.mod h1:6lkG1x+13OShE
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rs/cors v1.8.0/go.mod h1:EBwu+T5AvHOcXwvZIkQFjUN6s8Czyqw12GL/Y0tUyRM=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
95 changes: 69 additions & 26 deletions operator/builtin/transformer/recombine/recombine.go
@@ -38,9 +38,11 @@ func NewRecombineOperatorConfig(operatorID string) *RecombineOperatorConfig {
return &RecombineOperatorConfig{
TransformerConfig: helper.NewTransformerConfig(operatorID, "recombine"),
MaxBatchSize: 1000,
MaxSources: 1000,
CombineWith: "\n",
OverwriteWith: "oldest",
ForceFlushTimeout: 5 * time.Second,
SourceIdentifier: entry.NewAttributeField("file.path"),
}
}

@@ -52,8 +54,10 @@ type RecombineOperatorConfig struct {
MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"`
CombineField entry.Field `json:"combine_field" yaml:"combine_field"`
CombineWith string `json:"combine_with" yaml:"combine_with"`
SourceIdentifier entry.Field `json:"source_identifier" yaml:"source_identifier"`
OverwriteWith string `json:"overwrite_with" yaml:"overwrite_with"`
ForceFlushTimeout time.Duration `json:"force_flush_period" yaml:"force_flush_period"`
MaxSources int `json:"max_sources" yaml:"max_sources"`
}

// Build creates a new RecombineOperator from a config
@@ -106,13 +110,15 @@ func (c *RecombineOperatorConfig) Build(bc operator.BuildContext) ([]operator.Op
matchFirstLine: matchesFirst,
prog: prog,
maxBatchSize: c.MaxBatchSize,
maxSources: c.MaxSources,
overwriteWithOldest: overwriteWithOldest,
batch: make([]*entry.Entry, 0, c.MaxBatchSize),
batchMap: make(map[string][]*entry.Entry),
combineField: c.CombineField,
combineWith: c.CombineWith,
forceFlushTimeout: c.ForceFlushTimeout,
ticker: time.NewTicker(c.ForceFlushTimeout),
chClose: make(chan struct{}),
sourceIdentifier: c.SourceIdentifier,
}

return []operator.Operator{recombine}, nil
@@ -125,15 +131,17 @@ type RecombineOperator struct {
matchFirstLine bool
prog *vm.Program
maxBatchSize int
maxSources int
overwriteWithOldest bool
combineField entry.Field
combineWith string
ticker *time.Ticker
forceFlushTimeout time.Duration
chClose chan struct{}
sourceIdentifier entry.Field

sync.Mutex
batch []*entry.Entry
batchMap map[string][]*entry.Entry
}

func (r *RecombineOperator) Start(_ operator.Persister) error {
@@ -147,9 +155,19 @@ func (r *RecombineOperator) flushLoop() {
select {
case <-r.ticker.C:
r.Lock()
if err := r.flushCombined(); err != nil {
r.Errorw("Failed flushing", "error", err)
timeNow := time.Now()
for source, entries := range r.batchMap {
lastEntryTs := entries[len(entries)-1].Timestamp
timeSinceLastEntry := timeNow.Sub(lastEntryTs)
if timeSinceLastEntry < r.forceFlushTimeout {
continue
}
if err := r.flushSource(source); err != nil {
r.Errorf("there was an error flushing combined logs: %s", err)
}
}

r.ticker.Reset(r.forceFlushTimeout)
r.Unlock()
case <-r.chClose:
r.ticker.Stop()
@@ -171,6 +189,8 @@ func (r *RecombineOperator) Stop() error {
return nil
}

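// DefaultSourceIdentifier is the fallback source used to batch entries
// that do not contain the configured source_identifier field.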
const DefaultSourceIdentifier = "DefaultSourceIdentifier"

func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error {
// Lock the recombine operator because process can't run concurrently
r.Lock()
@@ -190,24 +210,34 @@

// this is guaranteed to be a boolean because of expr.AsBool
matches := m.(bool)
var s string
err = e.Read(r.sourceIdentifier, &s)
if err != nil {
r.Warn("entry does not contain the source_identifier, so it may be pooled with other sources")
s = DefaultSourceIdentifier
}

if s == "" {
s = DefaultSourceIdentifier
}

// This is the first entry in the next batch
if matches && r.matchIndicatesFirst() {
// Flush the existing batch
err := r.flushCombined()
err := r.flushSource(s)
if err != nil {
return err
}

// Add the current log to the new batch
r.addToBatch(ctx, e)
r.addToBatch(ctx, e, s)
return nil
}

// This is the last entry in a complete batch
if matches && r.matchIndicatesLast() {
r.addToBatch(ctx, e)
err := r.flushCombined()
r.addToBatch(ctx, e, s)
err := r.flushSource(s)
if err != nil {
return err
}
@@ -216,7 +246,7 @@

// This is neither the first entry of a new log,
// nor the last entry of a log, so just add it to the batch
r.addToBatch(ctx, e)
r.addToBatch(ctx, e, s)
return nil
}

@@ -229,46 +259,59 @@
}

// addToBatch adds the current entry to the current batch of entries that will be combined
func (r *RecombineOperator) addToBatch(_ context.Context, e *entry.Entry) {
if len(r.batch) >= r.maxBatchSize {
r.Error("Batch size exceeds max batch size. Flushing logs that have not been recombined")
r.flushUncombined(context.Background())
func (r *RecombineOperator) addToBatch(_ context.Context, e *entry.Entry, source string) {
if _, ok := r.batchMap[source]; !ok {
r.batchMap[source] = []*entry.Entry{e}
if len(r.batchMap) >= r.maxSources {
r.Error("Batched source exceeds max source size. Flushing all batched logs. Consider increasing max_sources parameter")
r.flushUncombined(context.Background())
}
return
}

r.batch = append(r.batch, e)
r.batchMap[source] = append(r.batchMap[source], e)
if len(r.batchMap[source]) >= r.maxBatchSize {
if err := r.flushSource(source); err != nil {
r.Errorf("there was an error flushing combined logs: %s", err)
}
}
}

// flushUncombined flushes all the logs in the batch individually to the
// next output in the pipeline. This is only used when there is an error
// or at shutdown to avoid dropping the logs.
func (r *RecombineOperator) flushUncombined(ctx context.Context) {
for _, entry := range r.batch {
r.Write(ctx, entry)
for source := range r.batchMap {
for _, entry := range r.batchMap[source] {
r.Write(ctx, entry)
}
Review comment (Member) on lines +284 to +287: It would be nice if we combined entries for each source, but given that this is a fairly atypical bailout condition, I think it's ok to proceed with individual entries. I'll make a ticket to capture this as a future improvement.
}
r.batch = r.batch[:0]
r.batchMap = make(map[string][]*entry.Entry)
r.ticker.Reset(r.forceFlushTimeout)
}
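The review comment above suggests combining entries per source even on the `max_sources` bailout. A hypothetical sketch of that follow-up, reusing `flushSource` from this PR (the method name `flushAllCombined` is invented for illustration and is not part of this diff):

```go
// flushAllCombined emits one combined entry per source by reusing
// flushSource, instead of writing each batched entry individually.
// flushSource removes the flushed source from batchMap, and deleting
// map keys while ranging over the map is safe in Go.
func (r *RecombineOperator) flushAllCombined() {
	for source := range r.batchMap {
		if err := r.flushSource(source); err != nil {
			r.Errorf("there was an error flushing combined logs: %s", err)
		}
	}
	r.ticker.Reset(r.forceFlushTimeout)
}
```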

// flushCombined combines the entries currently in the batch into a single entry,
// flushSource combines the entries currently in the batch into a single entry,
// then forwards them to the next operator in the pipeline
func (r *RecombineOperator) flushCombined() error {
func (r *RecombineOperator) flushSource(source string) error {
// Skip flushing a combined log if the batch is empty
if len(r.batch) == 0 {
if len(r.batchMap[source]) == 0 {
return nil
}

// Choose which entry we want to keep the rest of the fields from
var base *entry.Entry
entries := r.batchMap[source]

if r.overwriteWithOldest {
base = r.batch[0]
base = entries[0]
} else {
base = r.batch[len(r.batch)-1]
base = entries[len(entries)-1]
}

// Combine the combineField of each entry in the batch,
// separated by newlines
var recombined strings.Builder
for i, e := range r.batch {
for i, e := range entries {
var s string
err := e.Read(r.combineField, &s)
if err != nil {
@@ -277,7 +320,7 @@ func (r *RecombineOperator) flushCombined() error {
}

recombined.WriteString(s)
if i != len(r.batch)-1 && len(r.combineWith) > 0 {
if i != len(entries)-1 {
recombined.WriteString(r.combineWith)
}
}
Expand All @@ -289,7 +332,7 @@ func (r *RecombineOperator) flushCombined() error {
}

r.Write(context.Background(), base)
r.batch = r.batch[:0]
r.ticker.Reset(r.forceFlushTimeout)

delete(r.batchMap, source)
return nil
}
91 changes: 91 additions & 0 deletions operator/builtin/transformer/recombine/recombine_test.go
@@ -37,6 +37,14 @@ func TestRecombineOperator(t *testing.T) {
return e
}

entryWithBodyAttr := func(ts time.Time, body interface{}, attrs map[string]string) *entry.Entry {
e := entryWithBody(ts, body)
for k, v := range attrs {
e.AddAttribute(k, v)
}
return e
}

cases := []struct {
name string
config *RecombineOperatorConfig
@@ -141,6 +149,89 @@
},
[]*entry.Entry{entryWithBody(t1, "test1test2")},
},
{
"TestDefaultSourceIdentifier",
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsLastEntry = "$body == 'end'"
cfg.OutputIDs = []string{"fake"}
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file2", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file2"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "file1\nend", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file2\nend", map[string]string{"file.path": "file2"}),
},
},
{
"TestCustomSourceIdentifier",
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsLastEntry = "$body == 'end'"
cfg.OutputIDs = []string{"fake"}
cfg.SourceIdentifier = entry.NewAttributeField("custom_source")
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "file1", map[string]string{"custom_source": "file1"}),
entryWithBodyAttr(t1, "file2", map[string]string{"custom_source": "file2"}),
entryWithBodyAttr(t2, "end", map[string]string{"custom_source": "file1"}),
entryWithBodyAttr(t2, "end", map[string]string{"custom_source": "file2"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "file1\nend", map[string]string{"custom_source": "file1"}),
entryWithBodyAttr(t1, "file2\nend", map[string]string{"custom_source": "file2"}),
},
},
{
"TestMaxSources",
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsLastEntry = "$body == 'end'"
cfg.OutputIDs = []string{"fake"}
cfg.MaxSources = 1
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file1"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file1"}),
},
},
{
"TestMaxBatchSize",
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsLastEntry = "$body == 'end'"
cfg.OutputIDs = []string{"fake"}
cfg.MaxBatchSize = 2
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "file1_event1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file2_event1", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file2_event2", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file2"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "file1_event1\nend", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file2_event1\nfile2_event2", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file2"}),
},
},
}

for _, tc := range cases {