Skip to content

Commit

Permalink
Metrics for compaction and downsampling process (#4801)
Browse files Browse the repository at this point in the history
  • Loading branch information
metonymic-smokey authored Nov 6, 2021
1 parent 885b3bb commit eca25c4
Show file tree
Hide file tree
Showing 6 changed files with 526 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4710](https://github.com/thanos-io/thanos/pull/4710) Store: add metric to capture timestamp of the last loaded block.
- [#4736](https://github.com/thanos-io/thanos/pull/4736) S3: Add capability to use custom AWS STS Endpoint.
- [#4764](https://github.com/thanos-io/thanos/pull/4764) Compactor: add `block-viewer.global.sync-block-timeout` flag to set the timeout of synchronization block metas.
- [#4801](https://github.com/thanos-io/thanos/pull/4801) Compactor: added Prometheus metrics for tracking the progress of compaction and downsampling.

### Fixed

Expand Down
47 changes: 46 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,9 @@ func runCompact(
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason),
metadata.HashFunc(conf.hashFunc),
)
tsdbPlanner := compact.NewPlanner(logger, levels, noCompactMarkerFilter)
planner := compact.WithLargeTotalIndexSizeFilter(
compact.NewPlanner(logger, levels, noCompactMarkerFilter),
tsdbPlanner,
bkt,
int64(conf.maxBlockIndexSize),
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
Expand Down Expand Up @@ -458,6 +459,47 @@ func runCompact(
return cleanPartialMarked()
}

if conf.compactionProgressMetrics {
g.Add(func() error {
ps := compact.NewCompactionProgressCalculator(reg, tsdbPlanner)
var ds *compact.DownsampleProgressCalculator
if !conf.disableDownsampling {
ds = compact.NewDownsampleProgressCalculator(reg)
}

return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {

if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrapf(err, "could not sync metas")
}

metas := sy.Metas()
groups, err := grouper.Groups(metas)
if err != nil {
return errors.Wrapf(err, "could not group metadata")
}

if err = ps.ProgressCalculate(ctx, groups); err != nil {
return errors.Wrapf(err, "could not calculate compaction progress")
}

if !conf.disableDownsampling {
groups, err = grouper.Groups(metas)
if err != nil {
return errors.Wrapf(err, "could not group metadata into downsample groups")
}
if err := ds.ProgressCalculate(ctx, groups); err != nil {
return errors.Wrapf(err, "could not calculate downsampling progress")
}
}

return nil
})
}, func(err error) {
cancel()
})
}

g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

Expand Down Expand Up @@ -590,9 +632,12 @@ type compactConfig struct {
enableVerticalCompaction bool
dedupFunc string
skipBlockWithOutOfOrderChunks bool
compactionProgressMetrics bool
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("progress-metrics", "Enables the progress metrics, indicating the progress of compaction and downsampling").Default("true").BoolVar(&cc.compactionProgressMetrics)

cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected.").
Hidden().Default("true").BoolVar(&cc.haltOnError)
cmd.Flag("debug.accept-malformed-index",
Expand Down
2 changes: 2 additions & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ Flags:
Path to YAML file that contains object store
configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--progress-metrics Enables the progress metrics, indicating the
progress of compaction and downsampling
--retention.resolution-1h=0d
How long to retain samples of resolution 2 (1
hour) in bucket. Setting this to 0d will retain
Expand Down
197 changes: 197 additions & 0 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,19 @@ func (cg *Group) Key() string {
return cg.key
}

func (cg *Group) deleteFromGroup(target map[ulid.ULID]struct{}) {
cg.mtx.Lock()
defer cg.mtx.Unlock()
var newGroupMeta []*metadata.Meta
for _, meta := range cg.metasByMinTime {
if _, found := target[meta.BlockMeta.ULID]; !found {
newGroupMeta = append(newGroupMeta, meta)
}
}

cg.metasByMinTime = newGroupMeta
}

// AppendMeta the block with the given meta to the group.
func (cg *Group) AppendMeta(meta *metadata.Meta) error {
cg.mtx.Lock()
Expand Down Expand Up @@ -470,6 +483,190 @@ func (cg *Group) Resolution() int64 {
return cg.resolution
}

// CompactProgressMetrics contains Prometheus metrics related to compaction progress.
type CompactProgressMetrics struct {
NumberOfCompactionRuns *prometheus.GaugeVec
NumberOfCompactionBlocks *prometheus.GaugeVec
}

// ProgressCalculator calculates the progress of the compaction process for a given slice of Groups.
type ProgressCalculator interface {
ProgressCalculate(ctx context.Context, groups []*Group) error
}

// CompactionProgressCalculator contains a planner and ProgressMetrics, which are updated during the compaction simulation process.
type CompactionProgressCalculator struct {
planner Planner
*CompactProgressMetrics
}

// NewCompactProgressCalculator creates a new CompactionProgressCalculator.
func NewCompactionProgressCalculator(reg prometheus.Registerer, planner *tsdbBasedPlanner) *CompactionProgressCalculator {
return &CompactionProgressCalculator{
planner: planner,
CompactProgressMetrics: &CompactProgressMetrics{
NumberOfCompactionRuns: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "thanos_compact_todo_compactions",
Help: "number of compactions to be done",
}, []string{"group"}),
NumberOfCompactionBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "thanos_compact_todo_compaction_blocks",
Help: "number of blocks planned to be compacted",
}, []string{"group"}),
},
}
}

// ProgressCalculate calculates the number of blocks and compaction runs in the planning process of the given groups.
func (ps *CompactionProgressCalculator) ProgressCalculate(ctx context.Context, groups []*Group) error {
groupCompactions := make(map[string]int, len(groups))
groupBlocks := make(map[string]int, len(groups))

for len(groups) > 0 {
tmpGroups := make([]*Group, 0, len(groups))
for _, g := range groups {
if len(g.IDs()) == 1 {
continue
}
plan, err := ps.planner.Plan(ctx, g.metasByMinTime)
if err != nil {
return errors.Wrapf(err, "could not plan")
}
if len(plan) == 0 {
continue
}
groupCompactions[g.key]++

toRemove := make(map[ulid.ULID]struct{}, len(plan))
metas := make([]*tsdb.BlockMeta, 0, len(plan))
for _, p := range plan {
metas = append(metas, &p.BlockMeta)
toRemove[p.BlockMeta.ULID] = struct{}{}
}
g.deleteFromGroup(toRemove)

groupBlocks[g.key] += len(plan)

if len(g.metasByMinTime) == 0 {
continue
}

newMeta := tsdb.CompactBlockMetas(ulid.MustNew(uint64(time.Now().Unix()), nil), metas...)
if err := g.AppendMeta(&metadata.Meta{BlockMeta: *newMeta, Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: g.Resolution()}, Labels: g.Labels().Map()}}); err != nil {
return errors.Wrapf(err, "append meta")
}
tmpGroups = append(tmpGroups, g)
}

groups = tmpGroups
}

ps.CompactProgressMetrics.NumberOfCompactionRuns.Reset()
ps.CompactProgressMetrics.NumberOfCompactionBlocks.Reset()

for key, iters := range groupCompactions {
ps.CompactProgressMetrics.NumberOfCompactionRuns.WithLabelValues(key).Add(float64(iters))
ps.CompactProgressMetrics.NumberOfCompactionBlocks.WithLabelValues(key).Add(float64(groupBlocks[key]))
}

return nil
}

// DownsampleProgressMetrics contains Prometheus metrics related to downsampling progress.
type DownsampleProgressMetrics struct {
NumberOfBlocksDownsampled *prometheus.GaugeVec
}

// DownsampleProgressCalculator contains DownsampleMetrics, which are updated during the downsampling simulation process.
type DownsampleProgressCalculator struct {
*DownsampleProgressMetrics
}

// NewDownsampleProgressCalculator creates a new DownsampleProgressCalculator.
func NewDownsampleProgressCalculator(reg prometheus.Registerer) *DownsampleProgressCalculator {
return &DownsampleProgressCalculator{
DownsampleProgressMetrics: &DownsampleProgressMetrics{
NumberOfBlocksDownsampled: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "thanos_compact_todo_downsample_blocks",
Help: "number of blocks to be downsampled",
}, []string{"group"}),
},
}
}

// ProgressCalculate calculates the number of blocks to be downsampled for the given groups.
func (ds *DownsampleProgressCalculator) ProgressCalculate(ctx context.Context, groups []*Group) error {
sources5m := map[ulid.ULID]struct{}{}
sources1h := map[ulid.ULID]struct{}{}
groupBlocks := make(map[string]int, len(groups))

for _, group := range groups {
for _, m := range group.metasByMinTime {
switch m.Thanos.Downsample.Resolution {
case downsample.ResLevel0:
continue
case downsample.ResLevel1:
for _, id := range m.Compaction.Sources {
sources5m[id] = struct{}{}
}
case downsample.ResLevel2:
for _, id := range m.Compaction.Sources {
sources1h[id] = struct{}{}
}
default:
return errors.Errorf("unexpected downsampling resolution %d", m.Thanos.Downsample.Resolution)
}

}
}

for _, group := range groups {
for _, m := range group.metasByMinTime {
switch m.Thanos.Downsample.Resolution {
case downsample.ResLevel0:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources5m[id]; !ok {
missing = true
break
}
}
if !missing {
continue
}

if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
continue
}
groupBlocks[group.key]++
case downsample.ResLevel1:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources1h[id]; !ok {
missing = true
break
}
}
if !missing {
continue
}

if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
continue
}
groupBlocks[group.key]++
}
}
}

ds.DownsampleProgressMetrics.NumberOfBlocksDownsampled.Reset()
for key, blocks := range groupBlocks {
ds.DownsampleProgressMetrics.NumberOfBlocksDownsampled.WithLabelValues(key).Add(float64(blocks))
}

return nil
}

// Planner returns blocks to compact.
type Planner interface {
// Plan returns a list of blocks that should be compacted into single one.
Expand Down
Loading

0 comments on commit eca25c4

Please sign in to comment.