Skip to content

Commit

Permalink
Loki: When processing logs during tailing, don't reuse the query pipe…
Browse files Browse the repository at this point in the history
…line (#6152)

* when tailing, don't reuse the query pipeline because it has an unbounded label cache

Signed-off-by: Edward Welch <edward.welch@grafana.com>

* update changelog

Signed-off-by: Edward Welch <edward.welch@grafana.com>
  • Loading branch information
slim-bean authored May 12, 2022
1 parent ee4ab1b commit 4bf1648
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
##### Enhancements

##### Fixes
* [6152](https://github.com/grafana/loki/pull/6152) **slim-bean**: Fixes unbounded ingester memory growth when live tailing under specific circumstances.
* [5685](https://github.com/grafana/loki/pull/5685) **chaudum**: Assert that push values tuples consist of string values
##### Changes
* [6042](https://github.com/grafana/loki/pull/6042) **slim-bean**: Add a new configuration to allow fudging of ingested timestamps to guarantee sort order of duplicate timestamps at query time.
Expand Down
17 changes: 11 additions & 6 deletions pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ type tailer struct {
id uint32
orgID string
matchers []*labels.Matcher
pipeline syntax.Pipeline
expr syntax.Expr
expr syntax.LogSelectorExpr
pipelineMtx sync.Mutex

sendChan chan *logproto.Stream
Expand All @@ -52,7 +51,9 @@ func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*ta
if err != nil {
return nil, err
}
pipeline, err := expr.Pipeline()
// Make sure we can build a pipeline. The stream processing code doesn't have a place to handle
// this error so make sure we handle it here.
_, err = expr.Pipeline()
if err != nil {
return nil, err
}
Expand All @@ -61,7 +62,6 @@ func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*ta
return &tailer{
orgID: orgID,
matchers: matchers,
pipeline: pipeline,
sendChan: make(chan *logproto.Stream, bufferSizeForTailResponse),
conn: conn,
droppedStreams: make([]*logproto.DroppedStream, 0, maxDroppedStreams),
Expand Down Expand Up @@ -135,8 +135,13 @@ func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
}

func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*logproto.Stream {
// Build a new pipeline for each call because the pipeline builds a cache of labels
// and if we don't start with a new pipeline that cache will grow unbounded.
// The error is ignored because it would be handled in the constructor of the tailer.
pipeline, _ := t.expr.Pipeline()

// Optimization: skip filtering entirely, if no filter is set
if log.IsNoopPipeline(t.pipeline) {
if log.IsNoopPipeline(pipeline) {
return []*logproto.Stream{&stream}
}
// pipeline are not thread safe and tailer can process multiple stream at once.
Expand All @@ -145,7 +150,7 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log

streams := map[uint64]*logproto.Stream{}

sp := t.pipeline.ForStream(lbs)
sp := pipeline.ForStream(lbs)
for _, e := range stream.Entries {
newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line)
if !ok {
Expand Down

0 comments on commit 4bf1648

Please sign in to comment.