Reuse FetcherMetrics from Thanos (#3892)
* Reuse FetcherMetrics from Thanos

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Updated comment

Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci authored Mar 2, 2021
1 parent 3e236eb commit d7d8b9c
Showing 12 changed files with 165 additions and 205 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -52,7 +52,7 @@ require (
github.com/sony/gobreaker v0.4.1
github.com/spf13/afero v1.2.2
github.com/stretchr/testify v1.7.0
github.com/thanos-io/thanos v0.13.1-0.20210226093915-2027fb3098e5
github.com/thanos-io/thanos v0.13.1-0.20210226164558-03dace0a1aa1
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/weaveworks/common v0.0.0-20210112142934-23c8d7fa6120
go.etcd.io/bbolt v1.3.5
4 changes: 2 additions & 2 deletions go.sum
@@ -1287,8 +1287,8 @@ github.com/thanos-io/thanos v0.13.1-0.20210108102609-f85e4003ba51/go.mod h1:kPvI
github.com/thanos-io/thanos v0.13.1-0.20210204123931-82545cdd16fe/go.mod h1:ZLDGYRNkgM+FCwYNOD+6tOV+DE2fpjzfV6iqXyOgFIw=
github.com/thanos-io/thanos v0.13.1-0.20210224074000-659446cab117 h1:+rTDtekRPNMsLgCpReU13zJfBEScxGB+w0N4rHGWnRA=
github.com/thanos-io/thanos v0.13.1-0.20210224074000-659446cab117/go.mod h1:kdqFpzdkveIKpNNECVJd75RPvgsAifQgJymwCdfev1w=
github.com/thanos-io/thanos v0.13.1-0.20210226093915-2027fb3098e5 h1:5/zDK+wXZfFNOkYCKUk33AZNEmU0ev15dcG5G73/3FE=
github.com/thanos-io/thanos v0.13.1-0.20210226093915-2027fb3098e5/go.mod h1:gMCy4oCteKTT7VuXVvXLTPGzzjovX1VPE5p+HgL1hyU=
github.com/thanos-io/thanos v0.13.1-0.20210226164558-03dace0a1aa1 h1:ebr5jjRA6al28bNWhouwHC7hQqC1wexo2uac1+utOus=
github.com/thanos-io/thanos v0.13.1-0.20210226164558-03dace0a1aa1/go.mod h1:gMCy4oCteKTT7VuXVvXLTPGzzjovX1VPE5p+HgL1hyU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
145 changes: 24 additions & 121 deletions pkg/storegateway/bucket_index_metadata_fetcher.go
@@ -9,16 +9,19 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)

const (
corruptedBucketIndex = "corrupted-bucket-index"
noBucketIndex = "no-bucket-index"
)

// BucketIndexMetadataFetcher is a Thanos MetadataFetcher implementation leveraging on the Cortex bucket index.
type BucketIndexMetadataFetcher struct {
userID string
@@ -28,7 +31,7 @@ type BucketIndexMetadataFetcher struct {
logger log.Logger
filters []block.MetadataFilter
modifiers []block.MetadataModifier
metrics *fetcherMetrics
metrics *block.FetcherMetrics
}

func NewBucketIndexMetadataFetcher(
@@ -49,37 +52,37 @@ func NewBucketIndexMetadataFetcher(
logger: logger,
filters: filters,
modifiers: modifiers,
metrics: newFetcherMetrics(reg),
metrics: block.NewFetcherMetrics(reg, [][]string{{corruptedBucketIndex}, {noBucketIndex}}, nil),
}
}

// Fetch implements metadata.MetadataFetcher.
// Fetch implements block.MetadataFetcher. Not goroutine-safe.
func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) {
f.metrics.resetTx()
f.metrics.ResetTx()

// Check whether the user belongs to the shard.
if len(f.strategy.FilterUsers(ctx, []string{f.userID})) != 1 {
f.metrics.submit()
f.metrics.Submit()
return nil, nil, nil
}

// Track duration and sync counters only if wasn't filtered out by the sharding strategy.
start := time.Now()
defer func() {
f.metrics.syncDuration.Observe(time.Since(start).Seconds())
f.metrics.SyncDuration.Observe(time.Since(start).Seconds())
if err != nil {
f.metrics.syncFailures.Inc()
f.metrics.SyncFailures.Inc()
}
}()
f.metrics.syncs.Inc()
f.metrics.Syncs.Inc()

// Fetch the bucket index.
idx, err := bucketindex.ReadIndex(ctx, f.bkt, f.userID, f.cfgProvider, f.logger)
if errors.Is(err, bucketindex.ErrIndexNotFound) {
// This is a legit case happening when the first blocks of a tenant have recently been uploaded by ingesters
// and their bucket index has not been created yet.
f.metrics.synced.WithLabelValues(noBucketIndex).Set(1)
f.metrics.submit()
f.metrics.Synced.WithLabelValues(noBucketIndex).Set(1)
f.metrics.Submit()

return nil, nil, nil
}
Expand All @@ -88,14 +91,14 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.
// because unable to fetch blocks metadata. We'll act as if the tenant has no bucket index, but the query
// will fail anyway in the querier (the querier fails in the querier if bucket index is corrupted).
level.Error(f.logger).Log("msg", "corrupted bucket index found", "user", f.userID, "err", err)
f.metrics.synced.WithLabelValues(corruptedBucketIndex).Set(1)
f.metrics.submit()
f.metrics.Synced.WithLabelValues(corruptedBucketIndex).Set(1)
f.metrics.Submit()

return nil, nil, nil
}
if err != nil {
f.metrics.synced.WithLabelValues(failedMeta).Set(1)
f.metrics.submit()
f.metrics.Synced.WithLabelValues(block.FailedMeta).Set(1)
f.metrics.Submit()

return nil, nil, errors.Wrapf(err, "read bucket index")
}
@@ -111,9 +114,9 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.

// NOTE: filter can update synced metric accordingly to the reason of the exclude.
if customFilter, ok := filter.(MetadataFilterWithBucketIndex); ok {
err = customFilter.FilterWithBucketIndex(ctx, metas, idx, f.metrics.synced)
err = customFilter.FilterWithBucketIndex(ctx, metas, idx, f.metrics.Synced)
} else {
err = filter.Filter(ctx, metas, f.metrics.synced)
err = filter.Filter(ctx, metas, f.metrics.Synced)
}

if err != nil {
@@ -123,13 +126,13 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.

for _, m := range f.modifiers {
// NOTE: modifier can update modified metric accordingly to the reason of the modification.
if err := m.Modify(ctx, metas, f.metrics.modified); err != nil {
if err := m.Modify(ctx, metas, f.metrics.Modified); err != nil {
return nil, nil, errors.Wrap(err, "modify metas")
}
}

f.metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas)))
f.metrics.submit()
f.metrics.Synced.WithLabelValues(block.LoadedMeta).Set(float64(len(metas)))
f.metrics.Submit()

return metas, nil, nil
}
@@ -138,103 +141,3 @@ func (f *BucketIndexMetadataFetcher) UpdateOnChange(callback func([]metadata.Met
// Unused by the store-gateway.
callback(nil, errors.New("UpdateOnChange is unsupported"))
}

const (
fetcherSubSys = "blocks_meta"

corruptedMeta = "corrupted-meta-json"
noMeta = "no-meta-json"
loadedMeta = "loaded"
failedMeta = "failed"
corruptedBucketIndex = "corrupted-bucket-index"
noBucketIndex = "no-bucket-index"

// Synced label values.
labelExcludedMeta = "label-excluded"
timeExcludedMeta = "time-excluded"
tooFreshMeta = "too-fresh"
duplicateMeta = "duplicate"
// Blocks that are marked for deletion can be loaded as well. This is done to make sure that we load blocks that are meant to be deleted,
// but don't have a replacement block yet.
markedForDeletionMeta = "marked-for-deletion"

// MarkedForNoCompactionMeta is label for blocks which are loaded but also marked for no compaction. This label is also counted in `loaded` label metric.
MarkedForNoCompactionMeta = "marked-for-no-compact"

// Modified label values.
replicaRemovedMeta = "replica-label-removed"
)

// fetcherMetrics is a copy of Thanos internal fetcherMetrics. These metrics have been copied from
// Thanos in order to track the same exact metrics in our own custom metadata fetcher implementation.
type fetcherMetrics struct {
syncs prometheus.Counter
syncFailures prometheus.Counter
syncDuration prometheus.Histogram

synced *extprom.TxGaugeVec
modified *extprom.TxGaugeVec
}

func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics {
var m fetcherMetrics

m.syncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Subsystem: fetcherSubSys,
Name: "syncs_total",
Help: "Total blocks metadata synchronization attempts",
})
m.syncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Subsystem: fetcherSubSys,
Name: "sync_failures_total",
Help: "Total blocks metadata synchronization failures",
})
m.syncDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Subsystem: fetcherSubSys,
Name: "sync_duration_seconds",
Help: "Duration of the blocks metadata synchronization in seconds",
Buckets: []float64{0.01, 1, 10, 100, 1000},
})
m.synced = extprom.NewTxGaugeVec(
reg,
prometheus.GaugeOpts{
Subsystem: fetcherSubSys,
Name: "synced",
Help: "Number of block metadata synced",
},
[]string{"state"},
[]string{corruptedMeta},
[]string{corruptedBucketIndex},
[]string{noMeta},
[]string{noBucketIndex},
[]string{loadedMeta},
[]string{tooFreshMeta},
[]string{failedMeta},
[]string{labelExcludedMeta},
[]string{timeExcludedMeta},
[]string{duplicateMeta},
[]string{markedForDeletionMeta},
[]string{MarkedForNoCompactionMeta},
)
m.modified = extprom.NewTxGaugeVec(
reg,
prometheus.GaugeOpts{
Subsystem: fetcherSubSys,
Name: "modified",
Help: "Number of blocks whose metadata changed",
},
[]string{"modified"},
[]string{replicaRemovedMeta},
)
return &m
}

func (s *fetcherMetrics) submit() {
s.synced.Submit()
s.modified.Submit()
}

func (s *fetcherMetrics) resetTx() {
s.synced.ResetTx()
s.modified.ResetTx()
}
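Note: the net effect of the change above is that the store-gateway no longer carries its own copy of the fetcher metrics. It registers Thanos' block.FetcherMetrics and passes the two Cortex-specific synced states (corrupted-bucket-index, no-bucket-index) as extra label values. Below is a minimal sketch of the resulting lifecycle, assuming the block package API at the pinned Thanos revision behaves as the calls in the diff suggest; the loaded-block count is a placeholder value.

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/thanos/pkg/block"
)

const (
	corruptedBucketIndex = "corrupted-bucket-index"
	noBucketIndex        = "no-bucket-index"
)

func main() {
	reg := prometheus.NewRegistry()

	// Register the shared Thanos fetcher metrics once. The two Cortex-specific
	// "synced" states are passed as extra label values so they are initialized
	// on the synced gauge alongside the states Thanos registers by default.
	m := block.NewFetcherMetrics(reg, [][]string{{corruptedBucketIndex}, {noBucketIndex}}, nil)

	// A fetch then follows the same pattern as BucketIndexMetadataFetcher.Fetch:
	m.ResetTx()
	m.Syncs.Inc()

	start := time.Now()
	// ... read the bucket index, build metas, run filters and modifiers ...
	m.Synced.WithLabelValues(block.LoadedMeta).Set(42) // e.g. 42 blocks loaded
	m.SyncDuration.Observe(time.Since(start).Seconds())

	// Submit exposes the gauge values staged since the last ResetTx.
	m.Submit()
}
```

Because the gauge itself now comes from Thanos, new synced states added upstream are picked up automatically, while the bucket-index-specific ones stay local to Cortex.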
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_stores.go
@@ -52,7 +52,7 @@ type BucketStores struct {
indexCache storecache.IndexCache

// Chunks bytes pool shared across all tenants.
chunksPool pool.BytesPool
chunksPool pool.Bytes

// Partitioner shared across all tenants.
partitioner store.Partitioner
9 changes: 3 additions & 6 deletions pkg/storegateway/chunk_bytes_pool.go
@@ -4,22 +4,19 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/pool"
)

const (
maxChunkSize = 16000
"github.com/thanos-io/thanos/pkg/store"
)

type chunkBytesPool struct {
pool *pool.BucketedBytesPool
pool *pool.BucketedBytes

// Metrics.
requestedBytes prometheus.Counter
returnedBytes prometheus.Counter
}

func newChunkBytesPool(maxChunkPoolBytes uint64, reg prometheus.Registerer) (*chunkBytesPool, error) {
upstream, err := pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes)
upstream, err := pool.NewBucketedBytes(store.EstimatedMaxChunkSize, 50e6, 2, maxChunkPoolBytes)
if err != nil {
return nil, err
}
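For context, the constructor renamed upstream (NewBucketedBytesPool → NewBucketedBytes) is the only pool API this file touches, and the hard-coded local maxChunkSize constant is replaced by store.EstimatedMaxChunkSize exported by Thanos. A rough usage sketch follows; the Get/Put signatures are assumed from the Thanos pool.Bytes interface, and the 1 GB cap is chosen only for illustration.

```go
package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/pool"
	"github.com/thanos-io/thanos/pkg/store"
)

func main() {
	// Bucket sizes start at Thanos' estimated max chunk size and double up to
	// 50 MB; at most 1 GB may be pooled in total (illustrative value).
	p, err := pool.NewBucketedBytes(store.EstimatedMaxChunkSize, 50e6, 2, 1e9)
	if err != nil {
		panic(err)
	}

	// Get returns a pooled slice with capacity for at least the requested size,
	// Put hands it back for reuse (signatures assumed from pool.Bytes).
	b, err := p.Get(store.EstimatedMaxChunkSize - 1)
	if err != nil {
		panic(err)
	}
	fmt.Println("got a slice with capacity", cap(*b))
	p.Put(b)
}
```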
7 changes: 4 additions & 3 deletions pkg/storegateway/chunk_bytes_pool_test.go
@@ -9,17 +9,18 @@ import (
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/store"
)

func TestChunkBytesPool_Get(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
p, err := newChunkBytesPool(0, reg)
require.NoError(t, err)

_, err = p.Get(maxChunkSize - 1)
_, err = p.Get(store.EstimatedMaxChunkSize - 1)
require.NoError(t, err)

_, err = p.Get(maxChunkSize + 1)
_, err = p.Get(store.EstimatedMaxChunkSize + 1)
require.NoError(t, err)

assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(`
@@ -30,5 +31,5 @@ func TestChunkBytesPool_Get(t *testing.T) {
# HELP cortex_bucket_store_chunk_pool_returned_bytes_total Total bytes returned by the chunk bytes pool.
# TYPE cortex_bucket_store_chunk_pool_returned_bytes_total counter
cortex_bucket_store_chunk_pool_returned_bytes_total %d
`, maxChunkSize*2, maxChunkSize*3))))
`, store.EstimatedMaxChunkSize*2, store.EstimatedMaxChunkSize*3))))
}
2 changes: 1 addition & 1 deletion pkg/storegateway/metadata_fetcher_filters.go
@@ -69,7 +69,7 @@ func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, meta
}

if time.Since(time.Unix(mark.DeletionTime, 0)).Seconds() > f.delay.Seconds() {
synced.WithLabelValues(markedForDeletionMeta).Inc()
synced.WithLabelValues(block.MarkedForDeletionMeta).Inc()
delete(metas, mark.ID)
}
}
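With the local copies of the synced label values gone, the bucket-index variant of IgnoreDeletionMarkFilter now counts excluded blocks under block.MarkedForDeletionMeta, the constant exported by the Thanos block package, keeping the synced gauge consistent with Thanos' own fetcher. A condensed, illustrative sketch of that accounting is below; filterMarkedForDeletion is a made-up helper, not the actual filter.

```go
package storegateway

import (
	"time"

	"github.com/oklog/ulid"
	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/extprom"
)

// filterMarkedForDeletion drops blocks whose deletion mark is older than the
// configured delay and records each exclusion under the label value exported
// by Thanos (block.MarkedForDeletionMeta). Sketch only; the real filter also
// tracks the deletion marks it has seen.
func filterMarkedForDeletion(
	metas map[ulid.ULID]*metadata.Meta,
	marks map[ulid.ULID]*metadata.DeletionMark,
	delay time.Duration,
	synced *extprom.TxGaugeVec,
) {
	for id, mark := range marks {
		if time.Since(time.Unix(mark.DeletionTime, 0)) > delay {
			synced.WithLabelValues(block.MarkedForDeletionMeta).Inc()
			delete(metas, id)
		}
	}
}
```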
3 changes: 2 additions & 1 deletion pkg/storegateway/metadata_fetcher_filters_test.go
@@ -14,6 +14,7 @@ import (
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
@@ -100,7 +101,7 @@ func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) {
require.NoError(t, f.Filter(ctx, inputMetas, synced))
}

assert.Equal(t, 1.0, promtest.ToFloat64(synced.WithLabelValues(markedForDeletionMeta)))
assert.Equal(t, 1.0, promtest.ToFloat64(synced.WithLabelValues(block.MarkedForDeletionMeta)))
assert.Equal(t, expectedMetas, inputMetas)
assert.Equal(t, expectedDeletionMarks, f.DeletionMarkBlocks())
}
(Diffs for the remaining 4 changed files were not loaded in this view.)
