Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

index cleanup fixes while applying retention #4741

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction
* [4744](https://github.com/grafana/loki/pull/4744) **cyriltovena**: Promtail: Adds GELF UDP support.
* [4741](https://github.com/grafana/loki/pull/4741) **sandeepsukhani**: index cleanup fixes while applying retention

# 2.4.1 (2021/11/07)

Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/stores/shipper/compactor/retention/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ func newSeriesCleaner(bucket *bbolt.Bucket, config chunk.PeriodConfig, tableName
}

func (s *seriesCleaner) Cleanup(userID []byte, lbls labels.Labels) error {
// Add the metric name label if it is missing, since series IDs are calculated with that label included.
if lbls.Get(labels.MetricName) == "" {
lbls = append(lbls, labels.Label{
Name: labels.MetricName,
Value: logMetricName,
})
}
_, indexEntries, err := s.schema.GetCacheKeysAndLabelWriteEntries(s.tableInterval.Start, s.tableInterval.End, string(userID), logMetricName, lbls, "")
if err != nil {
return err
Expand Down
16 changes: 11 additions & 5 deletions pkg/storage/stores/shipper/compactor/retention/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func Test_SeriesCleaner(t *testing.T) {

tables := store.indexTables()
require.Len(t, tables, 1)
// remove c2 chunk
// remove the c1 and c2 chunks
err := tables[0].DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(bucketName), tt.config)
require.NoError(t, err)
Expand All @@ -104,22 +104,28 @@ func Test_SeriesCleaner(t *testing.T) {
if err := cleaner.Cleanup(entryFromChunk(c2).UserID, c2.Metric); err != nil {
return err
}
return cleaner.Cleanup(entryFromChunk(c1).UserID, c1.Metric)

// remove series for c1 without __name__ label, which should work just fine
return cleaner.Cleanup(entryFromChunk(c1).UserID, c1.Metric.WithoutLabels(labels.MetricName))
})
require.NoError(t, err)

err = tables[0].DB.View(func(tx *bbolt.Tx) error {
return tx.Bucket(bucketName).ForEach(func(k, _ []byte) error {
expectedDeleteSeries := entryFromChunk(c2).SeriesID
c1SeriesID := entryFromChunk(c1).SeriesID
c2SeriesID := entryFromChunk(c2).SeriesID
series, ok, err := parseLabelIndexSeriesID(decodeKey(k))
if !ok {
return nil
}
if err != nil {
return err
}
if string(expectedDeleteSeries) == string(series) {
require.Fail(t, "series should be deleted", expectedDeleteSeries)

if string(c1SeriesID) == string(series) {
require.Fail(t, "series for c1 should be deleted", c1SeriesID)
} else if string(c2SeriesID) == string(series) {
require.Fail(t, "series for c2 should be deleted", c2SeriesID)
}

return nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/stores/shipper/compactor/retention/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func ExtractIntervalFromTableName(tableName string) model.Interval {
}

interval.Start = model.TimeFromUnix(tableNumber * 86400)
interval.End = interval.Start.Add(24 * time.Hour)
// Subtract a millisecond so that the interval covers only a single table; adding a full 24 hours would also cover the start time of the next table.
interval.End = interval.Start.Add(24*time.Hour) - 1
return interval
}
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func TestExtractIntervalFromTableName(t *testing.T) {

calculateInterval := func(tm model.Time) (m model.Interval) {
m.Start = tm - tm%millisecondsInDay
m.End = m.Start + millisecondsInDay
m.End = m.Start + millisecondsInDay - 1
return
}

Expand Down