Skip to content

Commit

Permalink
Improve context cancellation handling in PostingsForMatchersCache (#644)
Browse files Browse the repository at this point in the history
* Improve context cancellation handling in PostingsForMatchersCache

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Added more comments

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Move done by an hat in postingsForMatcherPromise

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Update comment

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Added TestContextsTracker_Concurrency

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Improved TestPostingsForMatchersCache

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Removed addressed TODO comment

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fix flaky test

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Gracefully handle the race condition

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Remove time.Sleep() from TestPostingsForMatchersCache

Signed-off-by: Marco Pracucci <marco@pracucci.com>

---------

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci authored Jun 11, 2024
1 parent 6df5495 commit 171cafc
Show file tree
Hide file tree
Showing 2 changed files with 731 additions and 29 deletions.
210 changes: 193 additions & 17 deletions tsdb/postings_for_matchers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tsdb
import (
"container/list"
"context"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -82,9 +83,14 @@ type PostingsForMatchersCache struct {

// timeNow is the time.Now that can be replaced for testing purposes
timeNow func() time.Time

// postingsForMatchers can be replaced for testing purposes
postingsForMatchers func(ctx context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error)

// onPromiseExecutionDoneBeforeHook is used for testing purposes. It allows to hook at the
// beginning of onPromiseExecutionDone() execution.
onPromiseExecutionDoneBeforeHook func()

tracer trace.Tracer
// Preallocated for performance
ttlAttrib attribute.KeyValue
Expand Down Expand Up @@ -121,8 +127,13 @@ func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix I
}

type postingsForMatcherPromise struct {
done chan struct{}
// Keep track of all callers contexts in order to cancel the execution context if all
// callers contexts get canceled.
callersCtxTracker *contextsTracker

// The result of the promise is stored either in cloner or err (only of the two is valued).
// Do not access these fields until the done channel is closed.
done chan struct{}
cloner *index.PostingsCloner
err error
}
Expand All @@ -147,39 +158,80 @@ func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings,
func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) {
span := trace.SpanFromContext(ctx)

promiseCallersCtxTracker, promiseExecCtx := newContextsTracker()
promise := &postingsForMatcherPromise{
done: make(chan struct{}),
done: make(chan struct{}),
callersCtxTracker: promiseCallersCtxTracker,
}

// Add the caller context to the ones tracked by the new promise.
//
// It's important to do it here so that if the promise will be stored, then the caller's context is already
// tracked. Otherwise, if the promise will not be stored (because there's another in-flight promise for the
// same label matchers) then it's not a problem, and resources will be released.
//
// Skipping the error checking because it can't happen here.
_ = promise.callersCtxTracker.add(ctx)

key := matchersKey(ms)
oldPromise, loaded := c.calls.LoadOrStore(key, promise)
if loaded {
// promise was not stored, we return a previously stored promise, that's possibly being fulfilled in another goroutine

if oldPromiseValue, loaded := c.calls.LoadOrStore(key, promise); loaded {
// The new promise hasn't been stored because there's already an in-flight promise
// for the same label matchers. We should just wait for it.

// Release the resources created by the new promise, that will not be used.
close(promise.done)
promise.callersCtxTracker.close()

oldPromise := oldPromiseValue.(*postingsForMatcherPromise)

// Add the caller context to the ones tracked by the old promise (currently in-flight).
if err := oldPromise.callersCtxTracker.add(ctx); err != nil && errors.Is(err, errContextsTrackerCanceled{}) {
// We've hit a race condition happening when the "loaded" promise execution was just canceled,
// but it hasn't been removed from map of calls yet, so the old promise was loaded anyway.
//
// We expect this race condition to be infrequent. In this case we simply skip the cache and
// pass through the execution to the underlying postingsForMatchers().
span.AddEvent("looked up in-flight postingsForMatchers promise, but the promise was just canceled due to a race condition: skipping the cache", trace.WithAttributes(
attribute.String("cache_key", key),
))

return func(ctx context.Context) (index.Postings, error) {
return c.postingsForMatchers(ctx, ix, ms...)
}
}

span.AddEvent("using cached postingsForMatchers promise", trace.WithAttributes(
attribute.String("cache_key", key),
))
close(promise.done)
return oldPromise.(*postingsForMatcherPromise).result

return oldPromise.result
}

span.AddEvent("no postingsForMatchers promise in cache, executing query", trace.WithAttributes(attribute.String("cache_key", key)))

// promise was stored, close its channel after fulfilment
defer close(promise.done)

// Don't let context cancellation fail the promise, since it may be used by multiple goroutines, each with
// its own context. Also, keep the call independent of this particular context, since the promise will be reused.
// FIXME: do we need to cancel the call to postingsForMatchers if all the callers waiting for the result have
// cancelled their context?
if postings, err := c.postingsForMatchers(context.Background(), ix, ms...); err != nil {
// The execution context will be canceled only once all callers contexts will be canceled. This way we:
// 1. Do not cancel postingsForMatchers() the input ctx is cancelled, but another goroutine is waiting
// for the promise result.
// 2. Cancel postingsForMatchers() once all callers contexts have been canceled, so that we don't waist
// resources computing postingsForMatchers() is all requests have been canceled (this is particularly
// important if the postingsForMatchers() is very slow due to expensive regexp matchers).
if postings, err := c.postingsForMatchers(promiseExecCtx, ix, ms...); err != nil {
promise.err = err
} else {
promise.cloner = index.NewPostingsCloner(postings)
}

// The execution terminated (or has been canceled). We have to close the tracker to release resources.
// It's important to close it before computing the promise size, so that the actual size is smaller.
promise.callersCtxTracker.close()

sizeBytes := int64(len(key) + size.Of(promise))

c.created(ctx, key, c.timeNow(), sizeBytes)
c.onPromiseExecutionDone(ctx, key, c.timeNow(), sizeBytes, promise.err)
return promise.result
}

Expand Down Expand Up @@ -236,13 +288,28 @@ func (c *PostingsForMatchersCache) evictHead() {
c.cachedBytes -= oldest.sizeBytes
}

// created has to be called when returning from the PostingsForMatchers call that creates the promise.
// the ts provided should be the call time.
func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts time.Time, sizeBytes int64) {
// onPromiseExecutionDone must be called once the execution of PostingsForMatchers promise has done.
// The input err contains details about any error that could have occurred when executing it.
// The input ts is the function call time.
func (c *PostingsForMatchersCache) onPromiseExecutionDone(ctx context.Context, key string, ts time.Time, sizeBytes int64, err error) {
span := trace.SpanFromContext(ctx)

// Call the registered hook, if any. It's used only for testing purposes.
if c.onPromiseExecutionDoneBeforeHook != nil {
c.onPromiseExecutionDoneBeforeHook()
}

// Do not cache if cache is disabled.
if c.ttl <= 0 {
span.AddEvent("deleting cached promise since c.ttl <= 0")
span.AddEvent("not caching promise result because configured TTL is <= 0")
c.calls.Delete(key)
return
}

// Do not cache if the promise execution was canceled (it gets cancelled once all the callers contexts have
// been canceled).
if errors.Is(err, context.Canceled) {
span.AddEvent("not caching promise result because execution has been canceled")
c.calls.Delete(key)
return
}
Expand Down Expand Up @@ -298,3 +365,112 @@ func (ir indexReaderWithPostingsForMatchers) PostingsForMatchers(ctx context.Con
}

var _ IndexReader = indexReaderWithPostingsForMatchers{}

// errContextsTrackerClosed is the reason to identify contextsTracker has been explicitly closed by calling close().
//
// This error is a struct instead of a globally generic error so that postingsForMatcherPromise computed size is smaller
// (this error is referenced by contextsTracker, which is referenced by postingsForMatcherPromise).
type errContextsTrackerClosed struct{}

func (e errContextsTrackerClosed) Error() string {
return "contexts tracker is closed"
}

// errContextsTrackerCanceled is the reason to identify contextsTracker has been automatically closed because
// all tracked contexts have been canceled.
//
// This error is a struct instead of a globally generic error so that postingsForMatcherPromise computed size is smaller
// (this error is referenced by contextsTracker, which is referenced by postingsForMatcherPromise).
type errContextsTrackerCanceled struct{}

func (e errContextsTrackerCanceled) Error() string {
return "contexts tracker has been canceled"
}

// contextsTracker is responsible to monitor multiple context.Context and provides an execution
// that gets canceled once all monitored context.Context have done.
type contextsTracker struct {
cancelExecCtx context.CancelFunc

mx sync.Mutex
closedWithReason error // Track whether the tracker is closed and why. The tracker is not closed if this is nil.
trackedCount int // Number of tracked contexts.
trackedStopFuncs []func() bool // The stop watching functions for all tracked contexts.
}

func newContextsTracker() (*contextsTracker, context.Context) {
t := &contextsTracker{}

// Create a new execution context that will be canceled only once all tracked contexts have done.
var execCtx context.Context
execCtx, t.cancelExecCtx = context.WithCancel(context.Background())

return t, execCtx
}

// add the input ctx to the group of monitored context.Context.
// Returns false if the input context couldn't be added to the tracker because the tracker is already closed.
func (t *contextsTracker) add(ctx context.Context) error {
t.mx.Lock()
defer t.mx.Unlock()

// Check if we've already done.
if t.closedWithReason != nil {
return t.closedWithReason
}

// Register a function that will be called once the tracked context has done.
t.trackedCount++
t.trackedStopFuncs = append(t.trackedStopFuncs, context.AfterFunc(ctx, t.onTrackedContextDone))

return nil
}

// close the tracker. When the tracker is closed, the execution context is canceled
// and resources releases.
//
// This function must be called once done to not leak resources.
func (t *contextsTracker) close() {
t.mx.Lock()
defer t.mx.Unlock()

t.unsafeClose(errContextsTrackerClosed{})
}

// unsafeClose must be called with the t.mx lock hold.
func (t *contextsTracker) unsafeClose(reason error) {
if t.closedWithReason != nil {
return
}

t.cancelExecCtx()

// Stop watching the tracked contexts. It's safe to call the stop function on a context
// for which was already done.
for _, fn := range t.trackedStopFuncs {
fn()
}

t.trackedCount = 0
t.trackedStopFuncs = nil
t.closedWithReason = reason
}

func (t *contextsTracker) onTrackedContextDone() {
t.mx.Lock()
defer t.mx.Unlock()

t.trackedCount--

// If this was the last context to be tracked, we can close the tracker and cancel the execution context.
if t.trackedCount == 0 {
t.unsafeClose(errContextsTrackerCanceled{})
}
}

func (t *contextsTracker) trackedContextsCount() int {
t.mx.Lock()
defer t.mx.Unlock()

return t.trackedCount
}
Loading

0 comments on commit 171cafc

Please sign in to comment.