Use TSDB's index of series, remove RefCache
prometheus/prometheus#8600 adds a method to `TSDB.Appender` which lets us
avoid building a parallel cache of series references, reducing ingester
heap by about 20%.

We depend on values from GetRef() remaining valid while v2Push() uses
them. Currently the only way a ref can be invalidated is by a head
compaction, which cannot happen while v2Push() holds the append lock.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
bboreham committed Mar 26, 2021
1 parent 0f8367f commit 21432f9
Showing 9 changed files with 29 additions and 527 deletions.
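
For orientation before the per-file diffs: below is a minimal sketch of the append path this commit switches to, reconstructed from the `pkg/ingester/ingester_v2.go` changes further down. The `extendedAppender` interface, the single-value `GetRef()` signature and the `Append()` calls come straight from the diff; the `sketch` package, the `appendSample` helper and the `pkg/labels` import path are assumptions for illustration, not Cortex code.

```go
package sketch

import (
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

// extendedAppender mirrors the interface added in this commit: a TSDB Appender
// that also exposes GetRef() (the method added by prometheus/prometheus#8600).
type extendedAppender interface {
	storage.Appender
	storage.GetRef
}

// appendSample shows the two paths v2Push now takes per sample (illustrative
// helper, not the actual ingester code).
func appendSample(app extendedAppender, lset labels.Labels, ts int64, val float64) error {
	// Ask TSDB for an existing reference to this series; 0 means it has none.
	// The ref can only be invalidated by a head compaction, which cannot run
	// while the ingester holds its append lock for this push.
	if ref := app.GetRef(lset); ref != 0 {
		// Known series: append by reference, no label copy needed.
		_, err := app.Append(ref, lset, ts, val)
		return err
	}
	// New series: append with ref 0 so TSDB creates it. The real code copies the
	// label set first, because TSDB and the active-series tracker may retain it.
	_, err := app.Append(0, lset, ts, val)
	return err
}
```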
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@
* [ENHANCEMENT] Querier / ruler: some optimizations to PromQL query engine. #3934 #3989
* [ENHANCEMENT] Ingester: reduce CPU and memory when a high number of errors are returned by the ingester on the write path with the blocks storage. #3969 #3971 #3973
* [ENHANCEMENT] Distributor: reduce CPU and memory when a high number of errors are returned by the distributor on the write path. #3990
* [ENHANCEMENT] Blocks storage: reduce ingester memory by eliminating series reference cache. #3951
* [BUGFIX] Ruler-API: fix bug where `/api/v1/rules/<namespace>/<group_name>` endpoint returned `400` instead of `404`. #4013
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
2 changes: 1 addition & 1 deletion go.mod
@@ -47,7 +47,7 @@ require (
github.com/prometheus/client_golang v1.9.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.18.0
github.com/prometheus/prometheus v1.8.2-0.20210321183757-31a518faab18
github.com/prometheus/prometheus v1.8.2-0.20210319192855-d614ae9ecf1c
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/sony/gobreaker v0.4.1
github.com/spf13/afero v1.2.2
4 changes: 2 additions & 2 deletions go.sum
@@ -1100,8 +1100,8 @@ github.com/prometheus/prometheus v1.8.2-0.20201029103703-63be30dceed9/go.mod h1:
github.com/prometheus/prometheus v1.8.2-0.20201119142752-3ad25a6dc3d9/go.mod h1:1MDE/bXgu4gqd5w/otko6WQpXZX9vu8QX4KbitCmaPg=
github.com/prometheus/prometheus v1.8.2-0.20201119181812-c8f810083d3f/go.mod h1:1MDE/bXgu4gqd5w/otko6WQpXZX9vu8QX4KbitCmaPg=
github.com/prometheus/prometheus v1.8.2-0.20210215121130-6f488061dfb4/go.mod h1:NAYujktP0dmSSpeV155mtnwX2pndLpVVK/Ps68R01TA=
github.com/prometheus/prometheus v1.8.2-0.20210321183757-31a518faab18 h1:8chKJNOWv10FApdXgQ8Td8oYFrfFTbiBp/QpBaxEMRA=
github.com/prometheus/prometheus v1.8.2-0.20210321183757-31a518faab18/go.mod h1:MS/bpdil77lPbfQeKk6OqVQ9OLnpN3Rszd0hka0EOWE=
github.com/prometheus/prometheus v1.8.2-0.20210319192855-d614ae9ecf1c h1:qGkhJcR4jPjldlEtiRe2NHOtByRD39Y507miU1HoHD0=
github.com/prometheus/prometheus v1.8.2-0.20210319192855-d614ae9ecf1c/go.mod h1:MS/bpdil77lPbfQeKk6OqVQ9OLnpN3Rszd0hka0EOWE=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
67 changes: 23 additions & 44 deletions pkg/ingester/ingester_v2.go
@@ -101,7 +101,6 @@ const (
type userTSDB struct {
db *tsdb.DB
userID string
refCache *cortex_tsdb.RefCache
activeSeries *ActiveSeries
seriesInMetric *metricCounter
limiter *Limiter
@@ -185,7 +184,8 @@ func (u *userTSDB) compactHead(blockDuration int64) error {

defer u.casState(forceCompacting, active)

// Ingestion of samples in parallel with forced compaction can lead to overlapping blocks.
// Ingestion of samples in parallel with forced compaction can lead to overlapping blocks,
// and possible invalidation of the references returned from Appender.GetRef().
// So we wait for existing in-flight requests to finish. Future push requests would fail until compaction is over.
u.pushesInFlight.Wait()

@@ -383,7 +383,6 @@ type TSDBState struct {
walReplayTime prometheus.Histogram
appenderAddDuration prometheus.Histogram
appenderCommitDuration prometheus.Histogram
refCachePurgeDuration prometheus.Histogram
idleTsdbChecks *prometheus.CounterVec
}

@@ -435,11 +434,6 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer
Help: "The total time it takes for a push request to commit samples appended to TSDB.",
Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
}),
refCachePurgeDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingester_tsdb_refcache_purge_duration_seconds",
Help: "The total time it takes to purge the TSDB series reference cache for a single tenant.",
Buckets: prometheus.DefBuckets,
}),

idleTsdbChecks: idleTsdbChecks,
}
@@ -619,11 +613,6 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
rateUpdateTicker := time.NewTicker(i.cfg.RateUpdatePeriod)
defer rateUpdateTicker.Stop()

// We use an hardcoded value for this ticker because there should be no
// real value in customizing it.
refCachePurgeTicker := time.NewTicker(5 * time.Minute)
defer refCachePurgeTicker.Stop()

var activeSeriesTickerChan <-chan time.Time
if i.cfg.ActiveSeriesMetricsEnabled {
t := time.NewTicker(i.cfg.ActiveSeriesMetricsUpdatePeriod)
@@ -646,17 +635,6 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
db.ingestedRuleSamples.tick()
}
i.userStatesMtx.RUnlock()
case <-refCachePurgeTicker.C:
for _, userID := range i.getTSDBUsers() {
userDB := i.getTSDB(userID)
if userDB == nil {
continue
}

startTime := time.Now()
userDB.refCache.Purge(startTime.Add(-cortex_tsdb.DefaultRefCacheTTL))
i.TSDBState.refCachePurgeDuration.Observe(time.Since(startTime).Seconds())
}

case <-activeSeriesTickerChan:
i.v2UpdateActiveSeries()
@@ -683,6 +661,12 @@ func (i *Ingester) v2UpdateActiveSeries() {
}
}

// GetRef() is an extra method added to TSDB's Appender that lets Cortex look up an existing series reference before calling Append()
type extendedAppender interface {
storage.Appender
storage.GetRef
}

// v2Push adds metrics to a block
func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
var firstPartialErr error
@@ -738,13 +722,17 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
)

// Walk the samples, appending them to the user's database
app := db.Appender(ctx)
app := db.Appender(ctx).(extendedAppender)
for _, ts := range req.Timeseries {
// Check if we already have a cached reference for this series. Be aware
// that even if we have a reference it's not guaranteed to be still valid.
// copiedLabels keeps a reference to the copied label set, if one was needed. This avoids making the
// copy twice: once for TSDB and a second time for the activeSeries map.
var copiedLabels []labels.Label

// The labels must be sorted (in our case, it's guaranteed a write request
// has sorted labels once hit the ingester).
cachedRef, copiedLabels, cachedRefExists := db.refCache.Ref(startAppend, cortexpb.FromLabelAdaptersToLabels(ts.Labels))

// Look up a reference for this series. Holding the appendLock ensures that no compaction will happen while we use it.
ref := app.GetRef(cortexpb.FromLabelAdaptersToLabels(ts.Labels))

// To find out if any sample was added to this series, we keep old value.
oldSucceededSamplesCount := succeededSamplesCount
@@ -753,30 +741,19 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
var err error

// If the cached reference exists, we try to use it.
if cachedRefExists {
var ref uint64
if ref, err = app.Append(cachedRef, copiedLabels, s.TimestampMs, s.Value); err == nil {
if ref != 0 {
labels := cortexpb.FromLabelAdaptersToLabels(ts.Labels)
if _, err = app.Append(ref, labels, s.TimestampMs, s.Value); err == nil {
succeededSamplesCount++
// This means the reference changes which means we need to update our cache.
if ref != cachedRef {
db.refCache.SetRef(startAppend, copiedLabels, ref)
}
continue
}

} else {
var ref uint64

// Copy the label set because both TSDB and the cache may retain it.
copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels)

// Retain the reference in case there are multiple samples for the series.
if ref, err = app.Append(0, copiedLabels, s.TimestampMs, s.Value); err == nil {
db.refCache.SetRef(startAppend, copiedLabels, ref)

// Set these in case there are multiple samples for the series.
cachedRef = ref
cachedRefExists = true

succeededSamplesCount++
continue
}
@@ -812,6 +789,9 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
case errMaxSeriesPerMetricLimitExceeded:
perMetricSeriesLimitCount++
updateFirstPartial(func() error {
if copiedLabels == nil {
copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels)
}
return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
})
continue
@@ -1435,7 +1415,6 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {

userDB := &userTSDB{
userID: userID,
refCache: cortex_tsdb.NewRefCache(),
activeSeries: NewActiveSeries(),
seriesInMetric: newMetricCounter(i.limiter),
ingestedAPISamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod),
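
The `pushesInFlight.Wait()` in the `compactHead()` hunk above is what makes the commit-message invariant hold: a reference from `GetRef()` can only be invalidated by a head compaction, and a forced head compaction first waits for every in-flight push. The model below illustrates that exclusion with an `RWMutex`; this is only an analogy — the real code uses the `casState`/`pushesInFlight` machinery shown above, and it rejects new pushes during a forced compaction rather than blocking them.

```go
package sketch

import "sync"

// appendGuard models the push / forced-compaction exclusion this commit relies on.
type appendGuard struct {
	mu sync.RWMutex
}

// withPush runs a push under the read lock: many pushes may run concurrently, and
// any series reference obtained via GetRef() stays valid until the push returns.
func (g *appendGuard) withPush(push func() error) error {
	g.mu.RLock()
	defer g.mu.RUnlock()
	return push()
}

// withForcedCompaction takes the write lock: it waits for all in-flight pushes and
// excludes new ones, so compacting the head can never invalidate a reference that
// a push is still using.
func (g *appendGuard) withForcedCompaction(compact func() error) error {
	g.mu.Lock()
	defer g.mu.Unlock()
	return compact()
}
```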
46 changes: 0 additions & 46 deletions pkg/ingester/ingester_v2_test.go
@@ -2997,52 +2997,6 @@ func TestHeadCompactionOnStartup(t *testing.T) {
require.Equal(t, 11, len(db.Blocks()))
}

func TestIngesterCacheUpdatesOnRefChange(t *testing.T) {
cfg := defaultIngesterTestConfig()
cfg.LifecyclerConfig.JoinAfter = 0

// Create ingester
i, err := prepareIngesterWithBlocksStorage(t, cfg, nil)
require.NoError(t, err)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
t.Cleanup(func() {
_ = services.StopAndAwaitTerminated(context.Background(), i)
})

// Wait until it's ACTIVE
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

// Push a sample, verify that the labels are in ref-cache.
// Compact the head to remove the labels from HEAD but they will still exist in ref-cache.
// Push again to make the ref change and verify the refCache is updated in this case.

pushSingleSampleAtTime(t, i, 10)

db := i.getTSDB(userID)
require.NotNil(t, db)

startAppend := time.Now()
l := labels.Labels{{Name: labels.MetricName, Value: "test"}}
cachedRef, _, cachedRefExists := db.refCache.Ref(startAppend, l)
require.True(t, cachedRefExists)
require.Equal(t, uint64(1), cachedRef)

// Compact to remove the series from HEAD.
i.compactBlocks(context.Background(), true)
cachedRef, _, cachedRefExists = db.refCache.Ref(startAppend, l)
require.True(t, cachedRefExists)
require.Equal(t, uint64(1), cachedRef)

// New sample to create a new ref.
pushSingleSampleAtTime(t, i, 11)
cachedRef, _, cachedRefExists = db.refCache.Ref(startAppend, l)
require.True(t, cachedRefExists)
require.Equal(t, uint64(2), cachedRef)
}

func TestIngester_CloseTSDBsOnShutdown(t *testing.T) {
cfg := defaultIngesterTestConfig()
cfg.LifecyclerConfig.JoinAfter = 0
161 changes: 0 additions & 161 deletions pkg/storage/tsdb/ref_cache.go

This file was deleted.
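
Since the file is gone from the tree, here is a rough sketch of the surface the rest of this diff stops calling. The names (`NewRefCache`, `Ref`, `SetRef`, `Purge`, `DefaultRefCacheTTL`) are taken from the call sites removed above; the flat map, the locking and the 10-minute TTL are illustrative assumptions, not the deleted implementation.

```go
package tsdb

import (
	"sync"
	"time"

	"github.com/prometheus/prometheus/pkg/labels"
)

// DefaultRefCacheTTL is how long an entry may go unused before the ingester's
// purge ticker drops it (the 10-minute value here is an assumption).
const DefaultRefCacheTTL = 10 * time.Minute

// RefCache maps a series' label set to its TSDB head reference, remembering when
// each entry was last touched so stale entries can be purged.
type RefCache struct {
	mu      sync.Mutex
	entries map[string]refEntry
}

type refEntry struct {
	ref       uint64
	lbls      labels.Labels
	touchedAt time.Time
}

func NewRefCache() *RefCache {
	return &RefCache{entries: map[string]refEntry{}}
}

// Ref returns the cached reference and the cached copy of the labels, if present,
// refreshing the entry's last-touched time.
func (c *RefCache) Ref(now time.Time, series labels.Labels) (uint64, labels.Labels, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.entries[series.String()]
	if !ok {
		return 0, nil, false
	}
	e.touchedAt = now
	c.entries[series.String()] = e
	return e.ref, e.lbls, true
}

// SetRef stores or updates the reference for the given (already copied) label set.
func (c *RefCache) SetRef(now time.Time, series labels.Labels, ref uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[series.String()] = refEntry{ref: ref, lbls: series, touchedAt: now}
}

// Purge drops every entry that has not been touched since the given deadline.
func (c *RefCache) Purge(keepUntouchedSince time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for k, e := range c.entries {
		if e.touchedAt.Before(keepUntouchedSince) {
			delete(c.entries, k)
		}
	}
}
```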
