From 2564a810c5872205c7385cd6d0a787b68477188f Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Thu, 4 Nov 2021 00:00:22 +0530 Subject: [PATCH] Logs deletion fixes (#4625) * check all tables when we have delete requests to process since chunks could span multiple tables * upload modified index when the it is modified without dropping the chunk * fix broken test * lint * Fix modified check Co-authored-by: Cyril Tovena * suggested changes from PR review * add changelog Co-authored-by: Cyril Tovena Co-authored-by: Owen Diehl --- CHANGELOG.md | 1 + .../stores/shipper/compactor/compactor.go | 10 ++-- .../deletion/delete_requests_manager.go | 19 ++----- .../shipper/compactor/retention/expiration.go | 4 +- .../compactor/retention/expiration_test.go | 4 +- .../shipper/compactor/retention/retention.go | 53 ++++++++++--------- .../compactor/retention/retention_test.go | 29 ++++++++-- pkg/storage/stores/shipper/compactor/table.go | 4 +- .../stores/shipper/compactor/table_test.go | 20 +++---- 9 files changed, 80 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 570655d7aa776..8be31abffdae5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ * [4603](https://github.com/grafana/loki/pull/4603) **garrettlish**: Add date time sprig template functions in logql label/line formatter * [4608](https://github.com/grafana/loki/pull/4608) **trevorwhitney**: Change default value of ingester lifecycler's `final_sleep` from `30s` to `0s` * [4629](https://github.com/grafana/loki/pull/4629) **owen-d**: Default the WAL to enabled in the Loki jsonnet library +* [4625](https://github.com/grafana/loki/pull/4625) **sandeepsukhani**: Logs Deletion: Fix issues in processing of delete requests * [4556](https://github.com/grafana/loki/pull/4556) **james-callahan**: Remove `promtail_instance` label that was was added by promtail when scraping `gcplog` target. # 2.3.0 (2021/08/06) diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index a6f5a24ce1f19..a0acdf09cf989 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -392,12 +392,12 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string) error { } interval := retention.ExtractIntervalFromTableName(tableName) - intervalHasExpiredChunks := false + intervalMayHaveExpiredChunks := false if c.cfg.RetentionEnabled { - intervalHasExpiredChunks = c.expirationChecker.IntervalHasExpiredChunks(interval) + intervalMayHaveExpiredChunks = c.expirationChecker.IntervalMayHaveExpiredChunks(interval) } - err = table.compact(intervalHasExpiredChunks) + err = table.compact(intervalMayHaveExpiredChunks) if err != nil { level.Error(util_log.Logger).Log("msg", "failed to compact files", "table", tableName, "err", err) return err @@ -531,8 +531,8 @@ func (e *expirationChecker) MarkPhaseFinished() { e.deletionExpiryChecker.MarkPhaseFinished() } -func (e *expirationChecker) IntervalHasExpiredChunks(interval model.Interval) bool { - return e.retentionExpiryChecker.IntervalHasExpiredChunks(interval) || e.deletionExpiryChecker.IntervalHasExpiredChunks(interval) +func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval) bool { + return e.retentionExpiryChecker.IntervalMayHaveExpiredChunks(interval) || e.deletionExpiryChecker.IntervalMayHaveExpiredChunks(interval) } func (e *expirationChecker) DropFromIndex(ref retention.ChunkEntry, tableEndTime model.Time, now model.Time) bool { diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go index 45109ffe58f52..f6740f199edfe 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go @@ -195,24 +195,13 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() { } } -func (d *DeleteRequestsManager) IntervalHasExpiredChunks(interval model.Interval) bool { +func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval) bool { d.deleteRequestsToProcessMtx.Lock() defer d.deleteRequestsToProcessMtx.Unlock() - if len(d.deleteRequestsToProcess) == 0 { - return false - } - - for _, deleteRequest := range d.deleteRequestsToProcess { - if intervalsOverlap(interval, model.Interval{ - Start: deleteRequest.StartTime, - End: deleteRequest.EndTime, - }) { - return true - } - } - - return false + // If your request includes just today and there are chunks spanning today and yesterday then + // with previous check it won’t process yesterday’s index. + return len(d.deleteRequestsToProcess) != 0 } func (d *DeleteRequestsManager) DropFromIndex(_ retention.ChunkEntry, _ model.Time, _ model.Time) bool { diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration.go b/pkg/storage/stores/shipper/compactor/retention/expiration.go index 9c6e6c85bc366..3eb6c6a89db7d 100644 --- a/pkg/storage/stores/shipper/compactor/retention/expiration.go +++ b/pkg/storage/stores/shipper/compactor/retention/expiration.go @@ -15,7 +15,7 @@ import ( type ExpirationChecker interface { Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) - IntervalHasExpiredChunks(interval model.Interval) bool + IntervalMayHaveExpiredChunks(interval model.Interval) bool MarkPhaseStarted() MarkPhaseFailed() MarkPhaseFinished() @@ -65,7 +65,7 @@ func (e *expirationChecker) MarkPhaseStarted() { func (e *expirationChecker) MarkPhaseFailed() {} func (e *expirationChecker) MarkPhaseFinished() {} -func (e *expirationChecker) IntervalHasExpiredChunks(interval model.Interval) bool { +func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval) bool { return interval.Start.Before(e.latestRetentionStartTime) } diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration_test.go b/pkg/storage/stores/shipper/compactor/retention/expiration_test.go index b1ffaf508a77d..13816e59708cc 100644 --- a/pkg/storage/stores/shipper/compactor/retention/expiration_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/expiration_test.go @@ -211,7 +211,7 @@ func TestFindLatestRetentionStartTime(t *testing.T) { } } -func TestExpirationChecker_IntervalHasExpiredChunks(t *testing.T) { +func TestExpirationChecker_IntervalMayHaveExpiredChunks(t *testing.T) { for _, tc := range []struct { name string expirationChecker expirationChecker @@ -252,7 +252,7 @@ func TestExpirationChecker_IntervalHasExpiredChunks(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.hasExpiredChunks, tc.expirationChecker.IntervalHasExpiredChunks(tc.interval)) + require.Equal(t, tc.hasExpiredChunks, tc.expirationChecker.IntervalMayHaveExpiredChunks(tc.interval)) }) } } diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index 7e6dcbb8168f2..1f4d53bba2cc3 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -30,8 +30,8 @@ const ( ) type TableMarker interface { - // MarkForDelete marks chunks to delete for a given table and returns if it's empty and how many marks were created. - MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) + // MarkForDelete marks chunks to delete for a given table and returns if it's empty or modified. + MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) } type Marker struct { @@ -57,7 +57,7 @@ func NewMarker(workingDirectory string, config storage.SchemaConfig, expiration } // MarkForDelete marks all chunks expired for a given table. -func (t *Marker) MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { +func (t *Marker) MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { start := time.Now() status := statusSuccess defer func() { @@ -66,26 +66,26 @@ func (t *Marker) MarkForDelete(ctx context.Context, tableName string, db *bbolt. }() level.Debug(util_log.Logger).Log("msg", "starting to process table", "table", tableName) - empty, markCount, err := t.markTable(ctx, tableName, db) + empty, modified, err := t.markTable(ctx, tableName, db) if err != nil { status = statusFailure - return false, 0, err + return false, false, err } - return empty, markCount, nil + return empty, modified, nil } -func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { +func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { schemaCfg, ok := schemaPeriodForTable(t.config, tableName) if !ok { - return false, 0, fmt.Errorf("could not find schema for table: %s", tableName) + return false, false, fmt.Errorf("could not find schema for table: %s", tableName) } markerWriter, err := NewMarkerStorageWriter(t.workingDirectory) if err != nil { - return false, 0, fmt.Errorf("failed to create marker writer: %w", err) + return false, false, fmt.Errorf("failed to create marker writer: %w", err) } - var empty bool + var empty, modified bool err = db.Update(func(tx *bbolt.Tx) error { bucket := tx.Bucket(bucketName) if bucket == nil { @@ -104,7 +104,7 @@ func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB) return err } - empty, err = markforDelete(ctx, tableName, markerWriter, chunkIt, newSeriesCleaner(bucket, schemaCfg, tableName), t.expiration, chunkRewriter) + empty, modified, err = markforDelete(ctx, tableName, markerWriter, chunkIt, newSeriesCleaner(bucket, schemaCfg, tableName), t.expiration, chunkRewriter) if err != nil { return err } @@ -115,30 +115,31 @@ func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB) return nil }) if err != nil { - return false, 0, err + return false, false, err } if empty { t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionDeleted).Inc() - return empty, markerWriter.Count(), nil + return empty, true, nil } - if markerWriter.Count() == 0 { + if !modified { t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionNone).Inc() - return empty, markerWriter.Count(), nil + return empty, modified, nil } t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionModified).Inc() - return empty, markerWriter.Count(), nil + return empty, modified, nil } -func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWriter, chunkIt ChunkEntryIterator, seriesCleaner SeriesCleaner, expiration ExpirationChecker, chunkRewriter *chunkRewriter) (bool, error) { +func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWriter, chunkIt ChunkEntryIterator, seriesCleaner SeriesCleaner, expiration ExpirationChecker, chunkRewriter *chunkRewriter) (bool, bool, error) { seriesMap := newUserSeriesMap() // tableInterval holds the interval for which the table is expected to have the chunks indexed tableInterval := ExtractIntervalFromTableName(tableName) empty := true + modified := false now := model.Now() for chunkIt.Next() { if chunkIt.Err() != nil { - return false, chunkIt.Err() + return false, false, chunkIt.Err() } c := chunkIt.Entry() seriesMap.Add(c.SeriesID, c.UserID, c.Labels) @@ -148,7 +149,7 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr if len(nonDeletedIntervals) > 0 { wroteChunks, err := chunkRewriter.rewriteChunk(ctx, c, nonDeletedIntervals) if err != nil { - return false, err + return false, false, err } if wroteChunks { @@ -159,15 +160,16 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr } if err := chunkIt.Delete(); err != nil { - return false, err + return false, false, err } + modified = true // Mark the chunk for deletion only if it is completely deleted, or this is the last table that the chunk is index in. // For a partially deleted chunk, if we delete the source chunk before all the tables which index it are processed then // the retention would fail because it would fail to find it in the storage. if len(nonDeletedIntervals) == 0 || c.Through <= tableInterval.End { if err := marker.Put(c.ChunkID); err != nil { - return false, err + return false, false, err } } continue @@ -180,8 +182,9 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr if c.Through.After(tableInterval.End) { if expiration.DropFromIndex(c, tableInterval.End, now) { if err := chunkIt.Delete(); err != nil { - return false, err + return false, false, err } + modified = true continue } } @@ -190,13 +193,13 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr seriesMap.MarkSeriesNotDeleted(c.SeriesID, c.UserID) } if empty { - return true, nil + return true, true, nil } if ctx.Err() != nil { - return false, ctx.Err() + return false, false, ctx.Err() } - return false, seriesMap.ForEach(func(info userSeriesInfo) error { + return false, modified, seriesMap.ForEach(func(info userSeriesInfo) error { if !info.isDeleted { return nil } diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go index a0741ad6a051e..0e64da614da64 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go @@ -207,7 +207,7 @@ func Test_EmptyTable(t *testing.T) { err := tables[0].DB.Update(func(tx *bbolt.Tx) error { it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config) require.NoError(t, err) - empty, err := markforDelete(context.Background(), tables[0].name, noopWriter{}, it, noopCleaner{}, + empty, _, err := markforDelete(context.Background(), tables[0].name, noopWriter{}, it, noopCleaner{}, NewExpirationChecker(&fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: 0}, "2": {retentionPeriod: 0}}}), nil) require.NoError(t, err) require.True(t, empty) @@ -457,6 +457,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { expiry []chunkExpiry expectedDeletedSeries []map[uint64]struct{} expectedEmpty []bool + expectedModified []bool }{ { name: "no chunk and series deleted", @@ -474,6 +475,9 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { expectedEmpty: []bool{ false, }, + expectedModified: []bool{ + false, + }, }, { name: "only one chunk in store which gets deleted", @@ -491,6 +495,9 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { expectedEmpty: []bool{ true, }, + expectedModified: []bool{ + true, + }, }, { name: "only one chunk in store which gets partially deleted", @@ -512,6 +519,9 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { expectedEmpty: []bool{ false, }, + expectedModified: []bool{ + true, + }, }, { name: "one of two chunks deleted", @@ -533,6 +543,9 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { expectedEmpty: []bool{ false, }, + expectedModified: []bool{ + true, + }, }, { name: "one of two chunks partially deleted", @@ -558,6 +571,9 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { expectedEmpty: []bool{ false, }, + expectedModified: []bool{ + true, + }, }, { name: "one big chunk partially deleted for yesterdays table without rewrite", @@ -579,6 +595,9 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { expectedEmpty: []bool{ true, false, }, + expectedModified: []bool{ + true, true, + }, }, { name: "one big chunk partially deleted for yesterdays table with rewrite", @@ -600,6 +619,9 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { expectedEmpty: []bool{ false, false, }, + expectedModified: []bool{ + true, true, + }, }, } { t.Run(tc.name, func(t *testing.T) { @@ -628,10 +650,11 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { cr, err := newChunkRewriter(chunkClient, schema.config, table.name, tx.Bucket(bucketName)) require.NoError(t, err) - empty, err := markforDelete(context.Background(), table.name, noopWriter{}, it, seriesCleanRecorder, + empty, isModified, err := markforDelete(context.Background(), table.name, noopWriter{}, it, seriesCleanRecorder, expirationChecker, cr) require.NoError(t, err) require.Equal(t, tc.expectedEmpty[i], empty) + require.Equal(t, tc.expectedModified[i], isModified) return nil }) require.NoError(t, err) @@ -671,7 +694,7 @@ func TestMarkForDelete_DropChunkFromIndex(t *testing.T) { err := table.DB.Update(func(tx *bbolt.Tx) error { it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config) require.NoError(t, err) - empty, err := markforDelete(context.Background(), table.name, noopWriter{}, it, noopCleaner{}, + empty, _, err := markforDelete(context.Background(), table.name, noopWriter{}, it, noopCleaner{}, NewExpirationChecker(fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: retentionPeriod}}}), nil) require.NoError(t, err) if i == 7 { diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index 484edbe554ff7..54739f1d1ba06 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -134,7 +134,7 @@ func (t *table) compact(tableHasExpiredStreams bool) error { return nil } - empty, markCount, err := t.tableMarker.MarkForDelete(t.ctx, t.name, t.compactedDB) + empty, modified, err := t.tableMarker.MarkForDelete(t.ctx, t.name, t.compactedDB) if err != nil { return err } @@ -143,7 +143,7 @@ func (t *table) compact(tableHasExpiredStreams bool) error { return t.removeFilesFromStorage(indexFiles) } - if markCount == 0 && !compacted { + if !modified && !compacted { // we didn't make a modification so let's just return return nil } diff --git a/pkg/storage/stores/shipper/compactor/table_test.go b/pkg/storage/stores/shipper/compactor/table_test.go index e7324833399ba..3afc454ee1e92 100644 --- a/pkg/storage/stores/shipper/compactor/table_test.go +++ b/pkg/storage/stores/shipper/compactor/table_test.go @@ -96,9 +96,9 @@ func TestTable_Compaction(t *testing.T) { } } -type TableMarkerFunc func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) +type TableMarkerFunc func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) -func (t TableMarkerFunc) MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { +func (t TableMarkerFunc) MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { return t(ctx, tableName, db) } @@ -114,8 +114,8 @@ func TestTable_CompactionRetention(t *testing.T) { _, err := ioutil.ReadDir(filepath.Join(storagePath, tableName)) require.True(t, os.IsNotExist(err)) }, - tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { - return true, 100, nil + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + return true, true, nil }), }, "marked table": { @@ -127,8 +127,8 @@ func TestTable_CompactionRetention(t *testing.T) { require.True(t, strings.HasSuffix(files[0].Name(), ".gz")) compareCompactedDB(t, filepath.Join(storagePath, tableName, files[0].Name()), filepath.Join(storagePath, "test-copy")) }, - tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { - return false, 100, nil + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + return false, true, nil }), }, "already compacted table": { @@ -140,8 +140,8 @@ func TestTable_CompactionRetention(t *testing.T) { require.True(t, strings.HasSuffix(files[0].Name(), ".gz")) compareCompactedDB(t, filepath.Join(storagePath, tableName, files[0].Name()), filepath.Join(storagePath, "test-copy")) }, - tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { - return false, 100, nil + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + return false, true, nil }), }, "not modified": { @@ -153,8 +153,8 @@ func TestTable_CompactionRetention(t *testing.T) { require.True(t, strings.HasSuffix(files[0].Name(), ".gz")) compareCompactedDB(t, filepath.Join(storagePath, tableName, files[0].Name()), filepath.Join(storagePath, "test-copy")) }, - tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) { - return false, 0, nil + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + return false, false, nil }), }, } {