diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go
index 57f6f067cc4..1fd9dd72758 100644
--- a/pkg/storegateway/bucket.go
+++ b/pkg/storegateway/bucket.go
@@ -383,7 +383,7 @@ func (s *BucketStore) InitialSync(ctx context.Context) error {
 	return nil
 }
 
-func (s *BucketStore) tryRestoreLoadedBlocksSet() map[ulid.ULID]int64 {
+func (s *BucketStore) tryRestoreLoadedBlocksSet() map[ulid.ULID]struct{} {
 	previouslyLoadedBlocks, err := indexheader.RestoreLoadedBlocks(s.dir)
 	if err != nil {
 		level.Warn(s.logger).Log(
@@ -397,7 +397,7 @@ func (s *BucketStore) tryRestoreLoadedBlocksSet() map[ulid.ULID]int64 {
 	return previouslyLoadedBlocks
 }
 
-func (s *BucketStore) loadBlocks(ctx context.Context, blocks map[ulid.ULID]int64) {
+func (s *BucketStore) loadBlocks(ctx context.Context, blocks map[ulid.ULID]struct{}) {
 	// This is not happening during a request so we can ignore the stats.
 	ignoredStats := newSafeQueryStats()
 	// We ignore the time the block was used because it can only be in the map if it was still loaded before the shutdown
diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go
index 88a1a106aa9..10baf529412 100644
--- a/pkg/storegateway/bucket_e2e_test.go
+++ b/pkg/storegateway/bucket_e2e_test.go
@@ -687,15 +687,15 @@ func TestBucketStore_EagerLoading(t *testing.T) {
 	testCases := map[string]struct {
 		eagerLoadReaderEnabled       bool
 		expectedEagerLoadedBlocks    int
-		createLoadedBlocksSnapshotFn func([]ulid.ULID) map[ulid.ULID]int64
+		createLoadedBlocksSnapshotFn func([]ulid.ULID) map[ulid.ULID]struct{}
 	}{
 		"block is present in pre-shutdown loaded blocks and eager-loading is disabled": {
 			eagerLoadReaderEnabled:    false,
 			expectedEagerLoadedBlocks: 0,
-			createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]int64 {
-				snapshot := make(map[ulid.ULID]int64)
+			createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]struct{} {
+				snapshot := make(map[ulid.ULID]struct{})
 				for _, blockID := range blockIDs {
-					snapshot[blockID] = time.Now().UnixMilli()
+					snapshot[blockID] = struct{}{}
 				}
 				return snapshot
 			},
@@ -703,10 +703,10 @@ func TestBucketStore_EagerLoading(t *testing.T) {
 		"block is present in pre-shutdown loaded blocks and eager-loading is enabled, loading index header during initial sync": {
 			eagerLoadReaderEnabled:    true,
 			expectedEagerLoadedBlocks: 6,
-			createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]int64 {
-				snapshot := make(map[ulid.ULID]int64)
+			createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]struct{} {
+				snapshot := make(map[ulid.ULID]struct{})
 				for _, blockID := range blockIDs {
-					snapshot[blockID] = time.Now().UnixMilli()
+					snapshot[blockID] = struct{}{}
 				}
 				return snapshot
 			},
@@ -714,10 +714,10 @@ func TestBucketStore_EagerLoading(t *testing.T) {
 		"block is present in pre-shutdown loaded blocks and eager-loading is enabled, loading index header after initial sync": {
 			eagerLoadReaderEnabled:    true,
 			expectedEagerLoadedBlocks: 6,
-			createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]int64 {
-				snapshot := make(map[ulid.ULID]int64)
+			createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]struct{} {
+				snapshot := make(map[ulid.ULID]struct{})
 				for _, blockID := range blockIDs {
-					snapshot[blockID] = time.Now().UnixMilli()
+					snapshot[blockID] = struct{}{}
 				}
 				return snapshot
 			},
@@ -725,11 +725,11 @@ func TestBucketStore_EagerLoading(t *testing.T) {
 		"block is not present in pre-shutdown loaded blocks snapshot and eager-loading is enabled": {
 			eagerLoadReaderEnabled:    true,
 			expectedEagerLoadedBlocks: 0, // although eager loading is enabled, this test will not do eager loading because the block ID is not in the lazy loaded file.
-			createLoadedBlocksSnapshotFn: func(_ []ulid.ULID) map[ulid.ULID]int64 {
+			createLoadedBlocksSnapshotFn: func(_ []ulid.ULID) map[ulid.ULID]struct{} {
 				// let's create a random fake blockID to be stored in lazy loaded headers file
 				fakeBlockID := ulid.MustNew(ulid.Now(), nil)
 				// this snapshot will refer to fake block, hence eager load wouldn't be executed for the real block that we test
-				return map[ulid.ULID]int64{fakeBlockID: time.Now().UnixMilli()}
+				return map[ulid.ULID]struct{}{fakeBlockID: {}}
 			},
 		},
 		"pre-shutdown loaded blocks snapshot doesn't exist and eager-loading is enabled": {
@@ -794,7 +794,7 @@ func TestBucketStore_PersistsLazyLoadedBlocks(t *testing.T) {
 	cfg.bucketStoreConfig.IndexHeader.EagerLoadingStartupEnabled = true
 	cfg.bucketStoreConfig.IndexHeader.LazyLoadingIdleTimeout = persistInterval * 3
 	ctx := context.Background()
-	readBlocksInSnapshot := func() map[ulid.ULID]int64 {
+	readBlocksInSnapshot := func() map[ulid.ULID]struct{} {
 		blocks, err := indexheader.RestoreLoadedBlocks(cfg.tempDir)
 		assert.NoError(t, err)
 		return blocks
@@ -825,9 +825,9 @@ func TestBucketStore_PersistsLazyLoadedBlocks(t *testing.T) {
 	}, persistInterval*5, persistInterval/2)
 }
 
-type staticLoadedBlocks map[ulid.ULID]int64
+type staticLoadedBlocks map[ulid.ULID]struct{}
 
-func (b staticLoadedBlocks) LoadedBlocks() map[ulid.ULID]int64 {
+func (b staticLoadedBlocks) LoadedBlocks() map[ulid.ULID]struct{} {
 	return b
 }
 
diff --git a/pkg/storegateway/indexheader/reader_pool.go b/pkg/storegateway/indexheader/reader_pool.go
index ea2cdd05eca..4e00ec7cb93 100644
--- a/pkg/storegateway/indexheader/reader_pool.go
+++ b/pkg/storegateway/indexheader/reader_pool.go
@@ -152,16 +152,15 @@ func (p *ReaderPool) onLazyReaderClosed(r *LazyBinaryReader) {
 	delete(p.lazyReaders, r)
 }
 
-// LoadedBlocks returns a new map of lazy-loaded block IDs and the last time they were used in milliseconds.
-func (p *ReaderPool) LoadedBlocks() map[ulid.ULID]int64 {
+func (p *ReaderPool) LoadedBlocks() map[ulid.ULID]struct{} {
 	p.lazyReadersMx.Lock()
 	defer p.lazyReadersMx.Unlock()
 
-	blocks := make(map[ulid.ULID]int64, len(p.lazyReaders))
+	blocks := make(map[ulid.ULID]struct{}, len(p.lazyReaders))
 	for r := range p.lazyReaders {
 		usedAt := r.usedAt.Load()
 		if usedAt != 0 {
-			blocks[r.blockID] = usedAt / int64(time.Millisecond)
+			blocks[r.blockID] = struct{}{}
 		}
 	}
 
diff --git a/pkg/storegateway/indexheader/reader_pool_test.go b/pkg/storegateway/indexheader/reader_pool_test.go
index 1411588e65b..52d2f8c89ec 100644
--- a/pkg/storegateway/indexheader/reader_pool_test.go
+++ b/pkg/storegateway/indexheader/reader_pool_test.go
@@ -138,7 +138,7 @@ func TestReaderPool_LoadedBlocks(t *testing.T) {
 		lazyReaderEnabled: true,
 		lazyReaders:       map[*LazyBinaryReader]struct{}{&lb: {}},
 	}
-	require.Equal(t, map[ulid.ULID]int64{id: usedAt.UnixMilli()}, rp.LoadedBlocks())
+	require.Equal(t, map[ulid.ULID]struct{}{id: {}}, rp.LoadedBlocks())
 }
 
 func prepareReaderPool(t *testing.T) (context.Context, string, *filesystem.Bucket, ulid.ULID, *ReaderPoolMetrics) {
diff --git a/pkg/storegateway/indexheader/snapshotter.go b/pkg/storegateway/indexheader/snapshotter.go
index d3efc3ffd89..24b7c4eb6c5 100644
--- a/pkg/storegateway/indexheader/snapshotter.go
+++ b/pkg/storegateway/indexheader/snapshotter.go
@@ -5,6 +5,7 @@ package indexheader
 import (
 	"bytes"
 	"context"
+	"crypto/sha256"
 	"encoding/json"
 	"fmt"
 	"os"
@@ -37,6 +38,11 @@ type Snapshotter struct {
 
 	conf SnapshotterConfig
 	bl   BlocksLoader
+
+	// lastChecksum stores the checksum of the last persisted JSON data
+	// to avoid writing the same data repeatedly, reducing IOPS.
+	// This is useful when running with many tenants on low-performance disks.
+	lastChecksum [sha256.Size]byte
 }
 
 func NewSnapshotter(logger log.Logger, conf SnapshotterConfig, bl BlocksLoader) *Snapshotter {
@@ -50,7 +56,7 @@ func NewSnapshotter(logger log.Logger, conf SnapshotterConfig, bl BlocksLoader)
 }
 
 type BlocksLoader interface {
-	LoadedBlocks() map[ulid.ULID]int64
+	LoadedBlocks() map[ulid.ULID]struct{}
 }
 
 func (s *Snapshotter) persist(context.Context) error {
@@ -64,20 +70,42 @@ func (s *Snapshotter) persist(context.Context) error {
 
 func (s *Snapshotter) PersistLoadedBlocks() error {
+	loadedBlocks := s.bl.LoadedBlocks()
+
+	// Convert to the format we store on disk for backward compatibility.
+	indexHeaderLastUsedTime := make(map[ulid.ULID]int64, len(loadedBlocks))
+	for blockID := range loadedBlocks {
+		// We use a constant timestamp since we no longer care about the actual timestamp.
+		indexHeaderLastUsedTime[blockID] = 1
+	}
+
 	snapshot := &indexHeadersSnapshot{
-		IndexHeaderLastUsedTime: s.bl.LoadedBlocks(),
+		IndexHeaderLastUsedTime: indexHeaderLastUsedTime,
 		UserID:                  s.conf.UserID,
 	}
+
 	data, err := json.Marshal(snapshot)
 	if err != nil {
 		return err
 	}
 
+	// The JSON marshalling is deterministic, so the checksum will be the same for the same map contents.
+	checksum := sha256.Sum256(data)
+	if checksum == s.lastChecksum {
+		level.Debug(s.logger).Log("msg", "skipping persistence of index headers snapshot as data hasn't changed", "user", s.conf.UserID)
+		return nil
+	}
+
 	finalPath := filepath.Join(s.conf.Path, lazyLoadedHeadersListFileName)
-	return atomicfs.CreateFile(finalPath, bytes.NewReader(data))
+	err = atomicfs.CreateFile(finalPath, bytes.NewReader(data))
+	if err == nil {
+		// Only update the checksum if the write was successful.
+		s.lastChecksum = checksum
+	}
+	return err
 }
 
-func RestoreLoadedBlocks(directory string) (map[ulid.ULID]int64, error) {
+func RestoreLoadedBlocks(directory string) (map[ulid.ULID]struct{}, error) {
 	var (
 		snapshot indexHeadersSnapshot
 		multiErr = multierror.MultiError{}
 	)
@@ -99,11 +127,20 @@ func RestoreLoadedBlocks(directory string) (map[ulid.ULID]int64, error) {
 			multiErr.Add(fmt.Errorf("removing the lazy-loaded index-header snapshot: %w", err))
 		}
 	}
-	return snapshot.IndexHeaderLastUsedTime, multiErr.Err()
+
+	// Snapshots used to be stored with their last-used timestamps, but those weren't used and led to constant file churn, so we removed them.
+	result := make(map[ulid.ULID]struct{}, len(snapshot.IndexHeaderLastUsedTime))
+	for blockID := range snapshot.IndexHeaderLastUsedTime {
+		result[blockID] = struct{}{}
+	}
+
+	return result, multiErr.Err()
 }
 
 type indexHeadersSnapshot struct {
-	// IndexHeaderLastUsedTime is map of index header ulid.ULID to timestamp in millisecond.
+	// IndexHeaderLastUsedTime is a map of index header ulid.ULID to the number 1.
+	// The number used to be the last-used timestamp of each block.
+	// We keep this format for backward compatibility, but we no longer care about the timestamps.
 	IndexHeaderLastUsedTime map[ulid.ULID]int64 `json:"index_header_last_used_time"`
 	UserID                  string              `json:"user_id"`
 }
diff --git a/pkg/storegateway/indexheader/snapshotter_test.go b/pkg/storegateway/indexheader/snapshotter_test.go
index 826c48dbbe5..37070c8e6f7 100644
--- a/pkg/storegateway/indexheader/snapshotter_test.go
+++ b/pkg/storegateway/indexheader/snapshotter_test.go
@@ -20,13 +20,11 @@ import (
 func TestSnapshotter_PersistAndRestoreLoadedBlocks(t *testing.T) {
 	tmpDir := t.TempDir()
-	usedAt := time.Now()
 	testBlockID := ulid.MustNew(ulid.Now(), rand.Reader)
-	origBlocks := map[ulid.ULID]int64{
-		testBlockID: usedAt.UnixMilli(),
-	}
-	testBlocksLoader := testBlocksLoaderFunc(func() map[ulid.ULID]int64 { return origBlocks })
+	origBlocks := map[ulid.ULID]struct{}{testBlockID: {}}
+
+	testBlocksLoader := testBlocksLoaderFunc(func() map[ulid.ULID]struct{} { return origBlocks })
 
 	config := SnapshotterConfig{
 		Path:   tmpDir,
 		UserID: "anonymous",
 	}
@@ -42,7 +40,7 @@ func TestSnapshotter_PersistAndRestoreLoadedBlocks(t *testing.T) {
 
 	data, err := os.ReadFile(persistedFile)
 	require.NoError(t, err)
-	expected := fmt.Sprintf(`{"index_header_last_used_time":{"%s":%d},"user_id":"anonymous"}`, testBlockID, usedAt.UnixMilli())
+	expected := fmt.Sprintf(`{"index_header_last_used_time":{"%s":1},"user_id":"anonymous"}`, testBlockID)
 	require.JSONEq(t, expected, string(data))
 
 	restoredBlocks, err := RestoreLoadedBlocks(config.Path)
@@ -50,14 +48,98 @@ func TestSnapshotter_PersistAndRestoreLoadedBlocks(t *testing.T) {
 	require.NoError(t, err)
 }
 
+func TestSnapshotter_ChecksumOptimization(t *testing.T) {
+	tmpDir := t.TempDir()
+
+	firstBlockID := ulid.MustNew(ulid.Now(), rand.Reader)
+
+	origBlocks := map[ulid.ULID]struct{}{
+		firstBlockID: {},
+	}
+	testBlocksLoader := testBlocksLoaderFunc(func() map[ulid.ULID]struct{} { return origBlocks })
+
+	config := SnapshotterConfig{
+		Path:   tmpDir,
+		UserID: "anonymous",
+	}
+
+	// Create the snapshotter and persist data.
+	s := NewSnapshotter(log.NewNopLogger(), config, testBlocksLoader)
+
+	err := s.PersistLoadedBlocks()
+	require.NoError(t, err)
+
+	// Verify the content of the file using RestoreLoadedBlocks.
+	restoredBlocks, err := RestoreLoadedBlocks(config.Path)
+	require.NoError(t, err)
+	require.Equal(t, origBlocks, restoredBlocks, "Restored blocks should match original blocks")
+
+	// Get file info after the first write.
+	persistedFile := filepath.Join(tmpDir, lazyLoadedHeadersListFileName)
+	firstStat, err := os.Stat(persistedFile)
+	require.NoError(t, err)
+	firstModTime := firstStat.ModTime()
+
+	// Wait a moment to ensure the modification time would be different if the file is written.
+	time.Sleep(10 * time.Millisecond)
+
+	// Call persist again with the same data.
+	err = s.PersistLoadedBlocks()
+	require.NoError(t, err)
+
+	// Get file info after the second write attempt.
+	secondStat, err := os.Stat(persistedFile)
+	require.NoError(t, err)
+	secondModTime := secondStat.ModTime()
+
+	// The file should not have been modified since the data hasn't changed.
+	require.Equal(t, firstModTime, secondModTime, "File was modified even though data hasn't changed")
+
+	// Verify the content has not changed using RestoreLoadedBlocks.
+	restoredBlocksAfterSecondPersist, err := RestoreLoadedBlocks(config.Path)
+	require.NoError(t, err)
+	require.Equal(t, origBlocks, restoredBlocksAfterSecondPersist, "Restored blocks should match original blocks")
+
+	// Now change the data and persist again.
+	secondBlockID := ulid.MustNew(ulid.Now(), rand.Reader)
+	newBlocks := map[ulid.ULID]struct{}{
+		firstBlockID:  {},
+		secondBlockID: {},
+	}
+
+	// Create a new loader with the updated data.
+	updatedBlocksLoader := testBlocksLoaderFunc(func() map[ulid.ULID]struct{} { return newBlocks })
+	s.bl = updatedBlocksLoader
+
+	// Wait a moment to ensure the modification time would be different if the file is written.
+	time.Sleep(10 * time.Millisecond)
+
+	// Persist the new data.
+	err = s.PersistLoadedBlocks()
+	require.NoError(t, err)
+
+	// Get file info after the third write.
+	thirdStat, err := os.Stat(persistedFile)
+	require.NoError(t, err)
+	thirdModTime := thirdStat.ModTime()
+
+	// The file should have been modified since the data has changed.
+	require.NotEqual(t, secondModTime, thirdModTime, "File was not modified even though data has changed")
+
+	// Verify the content has changed using RestoreLoadedBlocks.
+	restoredBlocksAfterThirdPersist, err := RestoreLoadedBlocks(config.Path)
+	require.NoError(t, err)
+	require.Equal(t, newBlocks, restoredBlocksAfterThirdPersist, "Restored blocks should match new blocks")
+}
+
 func TestSnapshotter_StartStop(t *testing.T) {
 	t.Run("stop after start", func(t *testing.T) {
 		tmpDir := t.TempDir()
 
-		testBlocksLoader := testBlocksLoaderFunc(func() map[ulid.ULID]int64 {
+		testBlocksLoader := testBlocksLoaderFunc(func() map[ulid.ULID]struct{} {
 			// We don't care about the content of the index header in this test.
-			return map[ulid.ULID]int64{
-				ulid.MustNew(ulid.Now(), rand.Reader): time.Now().UnixMilli(),
+			return map[ulid.ULID]struct{}{
+				ulid.MustNew(ulid.Now(), rand.Reader): {},
 			}
 		})
 
@@ -78,8 +160,8 @@ func TestSnapshotter_StartStop(t *testing.T) {
 	})
 }
 
-type testBlocksLoaderFunc func() map[ulid.ULID]int64
+type testBlocksLoaderFunc func() map[ulid.ULID]struct{}
 
-func (f testBlocksLoaderFunc) LoadedBlocks() map[ulid.ULID]int64 {
+func (f testBlocksLoaderFunc) LoadedBlocks() map[ulid.ULID]struct{} {
 	return f()
 }
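
Note on the pattern introduced above: PersistLoadedBlocks still marshals the snapshot into the legacy index_header_last_used_time JSON shape (with a constant 1 standing in for the old timestamps), then compares the SHA-256 of the marshalled bytes against the checksum of the last successful write and skips the file write when nothing has changed. The following is a minimal, standalone Go sketch of that checksum-gated write, assuming plain string block IDs and an illustrative persister type; it is not the Mimir Snapshotter API.

package main

import (
	"crypto/sha256"
	"encoding/json"
	"fmt"
)

// snapshot mirrors the backward-compatible on-disk shape: block IDs map to
// the constant 1 where a last-used timestamp used to be stored.
// The type and field names here are illustrative, not the Mimir types.
type snapshot struct {
	IndexHeaderLastUsedTime map[string]int64 `json:"index_header_last_used_time"`
	UserID                  string           `json:"user_id"`
}

// persister skips a write when the marshalled payload is identical to the one
// written last time, trading a cheap in-memory hash comparison for disk IOPS.
type persister struct {
	lastChecksum [sha256.Size]byte
}

// persist reports whether it would actually write the file.
func (p *persister) persist(loaded map[string]struct{}, userID string) (bool, error) {
	snap := snapshot{
		IndexHeaderLastUsedTime: make(map[string]int64, len(loaded)),
		UserID:                  userID,
	}
	for id := range loaded {
		snap.IndexHeaderLastUsedTime[id] = 1 // constant stand-in for the legacy timestamp
	}

	// encoding/json sorts map keys, so the output is deterministic for the same contents.
	data, err := json.Marshal(snap)
	if err != nil {
		return false, err
	}

	checksum := sha256.Sum256(data)
	if checksum == p.lastChecksum {
		return false, nil // same payload as last time: skip the write
	}

	// A real implementation would atomically write `data` to disk here and only
	// record the checksum if that write succeeded.
	p.lastChecksum = checksum
	return true, nil
}

func main() {
	p := &persister{}
	blocks := map[string]struct{}{"01JC0000000000000000000000": {}}

	wrote, _ := p.persist(blocks, "anonymous")
	fmt.Println("first persist writes:", wrote) // true

	wrote, _ = p.persist(blocks, "anonymous")
	fmt.Println("second persist writes:", wrote) // false: checksum unchanged
}

Comparing checksums instead of retaining the previous payload keeps the per-tenant memory overhead fixed at 32 bytes regardless of how many blocks are loaded.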