Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit

Permalink
revert flushUncombined
Browse files Browse the repository at this point in the history
  • Loading branch information
rockb1017 committed Jan 20, 2022
1 parent 019ccb6 commit 7a7bcb7
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions operator/builtin/transformer/recombine/recombine.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (r *RecombineOperator) flushLoop() {
if timeSinceLastEntry < r.forceFlushTimeout {
continue
}
if err := r.flushSource(context.Background(), source); err != nil {
if err := r.flushSource(source); err != nil {
r.Errorf("there was error flushing combined logs %s", err)
}
}
Expand Down Expand Up @@ -224,7 +224,7 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error {
// This is the first entry in the next batch
if matches && r.matchIndicatesFirst() {
// Flush the existing batch
err := r.flushSource(ctx, s)
err := r.flushSource(s)
if err != nil {
return err
}
Expand All @@ -237,7 +237,7 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error {
// This is the last entry in a complete batch
if matches && r.matchIndicatesLast() {
r.addToBatch(ctx, e, s)
err := r.flushSource(ctx, s)
err := r.flushSource(s)
if err != nil {
return err
}
Expand All @@ -259,39 +259,40 @@ func (r *RecombineOperator) matchIndicatesLast() bool {
}

// addToBatch adds the current entry to the current batch of entries that will be combined
func (r *RecombineOperator) addToBatch(ctx context.Context, e *entry.Entry, source string) {
func (r *RecombineOperator) addToBatch(_ context.Context, e *entry.Entry, source string) {
if _, ok := r.batchMap[source]; !ok {
r.batchMap[source] = []*entry.Entry{e}
if len(r.batchMap) >= r.maxSources {
r.Error("Batched source exceeds max source size. Flushing all batched logs. Consider increasing max_sources parameter")
r.flushUncombined(ctx)
r.flushUncombined(context.Background())
}
return
}

r.batchMap[source] = append(r.batchMap[source], e)
if len(r.batchMap[source]) >= r.maxBatchSize {
if err := r.flushSource(ctx, source); err != nil {
if err := r.flushSource(source); err != nil {
r.Errorf("there was error flushing combined logs %s", err)
}
}
}

// flushUncombined flushes all the logs in the batchMap by source to the
// flushUncombined flushes all the logs in the batch individually to the
// next output in the pipeline. This is only used when there is an error
// or at shutdown to avoid dropping the logs.
func (r *RecombineOperator) flushUncombined(ctx context.Context) {
for source := range r.batchMap {
if err := r.flushSource(ctx, source); err != nil {
r.Errorf("there was error flushing combined logs %s", err)
for _, entry := range r.batchMap[source] {
r.Write(ctx, entry)
}
}
r.batchMap = make(map[string][]*entry.Entry)
r.ticker.Reset(r.forceFlushTimeout)
}

// flushSource combines the entries currently in the batch into a single entry,
// then forwards them to the next operator in the pipeline
func (r *RecombineOperator) flushSource(ctx context.Context, source string) error {
func (r *RecombineOperator) flushSource(source string) error {
// Skip flushing a combined log if the batch is empty
if len(r.batchMap[source]) == 0 {
return nil
Expand Down Expand Up @@ -330,7 +331,7 @@ func (r *RecombineOperator) flushSource(ctx context.Context, source string) erro
return err
}

r.Write(ctx, base)
r.Write(context.Background(), base)

delete(r.batchMap, source)
return nil
Expand Down

0 comments on commit 7a7bcb7

Please sign in to comment.