store-gateway: avoid unnecessarily writing lazy-loaded blocks snapshot #10740

Open · wants to merge 3 commits into base: main
4 changes: 2 additions & 2 deletions pkg/storegateway/bucket.go
@@ -383,7 +383,7 @@ func (s *BucketStore) InitialSync(ctx context.Context) error {
return nil
}

func (s *BucketStore) tryRestoreLoadedBlocksSet() map[ulid.ULID]int64 {
func (s *BucketStore) tryRestoreLoadedBlocksSet() map[ulid.ULID]struct{} {
previouslyLoadedBlocks, err := indexheader.RestoreLoadedBlocks(s.dir)
if err != nil {
level.Warn(s.logger).Log(
@@ -397,7 +397,7 @@ func (s *BucketStore) tryRestoreLoadedBlocksSet() map[ulid.ULID]int64 {
return previouslyLoadedBlocks
}

func (s *BucketStore) loadBlocks(ctx context.Context, blocks map[ulid.ULID]int64) {
func (s *BucketStore) loadBlocks(ctx context.Context, blocks map[ulid.ULID]struct{}) {
// This is not happening during a request so we can ignore the stats.
ignoredStats := newSafeQueryStats()
// We ignore the time the block was used because it can only be in the map if it was still loaded before the shutdown
30 changes: 15 additions & 15 deletions pkg/storegateway/bucket_e2e_test.go
@@ -687,49 +687,49 @@ func TestBucketStore_EagerLoading(t *testing.T) {
testCases := map[string]struct {
eagerLoadReaderEnabled bool
expectedEagerLoadedBlocks int
createLoadedBlocksSnapshotFn func([]ulid.ULID) map[ulid.ULID]int64
createLoadedBlocksSnapshotFn func([]ulid.ULID) map[ulid.ULID]struct{}
}{
"block is present in pre-shutdown loaded blocks and eager-loading is disabled": {
eagerLoadReaderEnabled: false,
expectedEagerLoadedBlocks: 0,
createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]int64 {
snapshot := make(map[ulid.ULID]int64)
createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]struct{} {
snapshot := make(map[ulid.ULID]struct{})
for _, blockID := range blockIDs {
snapshot[blockID] = time.Now().UnixMilli()
snapshot[blockID] = struct{}{}
}
return snapshot
},
},
"block is present in pre-shutdown loaded blocks and eager-loading is enabled, loading index header during initial sync": {
eagerLoadReaderEnabled: true,
expectedEagerLoadedBlocks: 6,
createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]int64 {
snapshot := make(map[ulid.ULID]int64)
createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]struct{} {
snapshot := make(map[ulid.ULID]struct{})
for _, blockID := range blockIDs {
snapshot[blockID] = time.Now().UnixMilli()
snapshot[blockID] = struct{}{}
}
return snapshot
},
},
"block is present in pre-shutdown loaded blocks and eager-loading is enabled, loading index header after initial sync": {
eagerLoadReaderEnabled: true,
expectedEagerLoadedBlocks: 6,
createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]int64 {
snapshot := make(map[ulid.ULID]int64)
createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]struct{} {
snapshot := make(map[ulid.ULID]struct{})
for _, blockID := range blockIDs {
snapshot[blockID] = time.Now().UnixMilli()
snapshot[blockID] = struct{}{}
}
return snapshot
},
},
"block is not present in pre-shutdown loaded blocks snapshot and eager-loading is enabled": {
eagerLoadReaderEnabled: true,
expectedEagerLoadedBlocks: 0, // although eager loading is enabled, this test will not do eager loading because the block ID is not in the lazy loaded file.
createLoadedBlocksSnapshotFn: func(_ []ulid.ULID) map[ulid.ULID]int64 {
createLoadedBlocksSnapshotFn: func(_ []ulid.ULID) map[ulid.ULID]struct{} {
// let's create a random fake blockID to be stored in lazy loaded headers file
fakeBlockID := ulid.MustNew(ulid.Now(), nil)
// this snapshot will refer to fake block, hence eager load wouldn't be executed for the real block that we test
return map[ulid.ULID]int64{fakeBlockID: time.Now().UnixMilli()}
return map[ulid.ULID]struct{}{fakeBlockID: {}}
},
},
"pre-shutdown loaded blocks snapshot doesn't exist and eager-loading is enabled": {
@@ -794,7 +794,7 @@ func TestBucketStore_PersistsLazyLoadedBlocks(t *testing.T) {
cfg.bucketStoreConfig.IndexHeader.EagerLoadingStartupEnabled = true
cfg.bucketStoreConfig.IndexHeader.LazyLoadingIdleTimeout = persistInterval * 3
ctx := context.Background()
readBlocksInSnapshot := func() map[ulid.ULID]int64 {
readBlocksInSnapshot := func() map[ulid.ULID]struct{} {
blocks, err := indexheader.RestoreLoadedBlocks(cfg.tempDir)
assert.NoError(t, err)
return blocks
@@ -825,9 +825,9 @@
}, persistInterval*5, persistInterval/2)
}

type staticLoadedBlocks map[ulid.ULID]int64
type staticLoadedBlocks map[ulid.ULID]struct{}

func (b staticLoadedBlocks) LoadedBlocks() map[ulid.ULID]int64 {
func (b staticLoadedBlocks) LoadedBlocks() map[ulid.ULID]struct{} {
return b
}

7 changes: 3 additions & 4 deletions pkg/storegateway/indexheader/reader_pool.go
@@ -152,16 +152,15 @@ func (p *ReaderPool) onLazyReaderClosed(r *LazyBinaryReader) {
delete(p.lazyReaders, r)
}

// LoadedBlocks returns a new map of lazy-loaded block IDs and the last time they were used in milliseconds.
func (p *ReaderPool) LoadedBlocks() map[ulid.ULID]int64 {
func (p *ReaderPool) LoadedBlocks() map[ulid.ULID]struct{} {
Review comment from a Contributor:
Should we just return a slice here? The only place that calls this is Snapshotter.PersistLoadedBlocks, and it doesn't need a map.
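For illustration, a slice-returning variant might look like the following sketch. It is hypothetical and not part of this PR; it reuses the ReaderPool fields shown in the surrounding diff, and the method name LoadedBlockIDs is made up:

```go
// Hypothetical alternative: return the loaded block IDs as a slice,
// since the only caller (Snapshotter.PersistLoadedBlocks) just iterates over them.
func (p *ReaderPool) LoadedBlockIDs() []ulid.ULID {
	p.lazyReadersMx.Lock()
	defer p.lazyReadersMx.Unlock()

	ids := make([]ulid.ULID, 0, len(p.lazyReaders))
	for r := range p.lazyReaders {
		// Skip readers that were never used, mirroring the existing map-based logic.
		if r.usedAt.Load() != 0 {
			ids = append(ids, r.blockID)
		}
	}
	return ids
}
```

Either way is a small change; the PR keeps the map, which mirrors what RestoreLoadedBlocks returns.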

p.lazyReadersMx.Lock()
defer p.lazyReadersMx.Unlock()

blocks := make(map[ulid.ULID]int64, len(p.lazyReaders))
blocks := make(map[ulid.ULID]struct{}, len(p.lazyReaders))
for r := range p.lazyReaders {
usedAt := r.usedAt.Load()
if usedAt != 0 {
blocks[r.blockID] = usedAt / int64(time.Millisecond)
blocks[r.blockID] = struct{}{}
}
}

2 changes: 1 addition & 1 deletion pkg/storegateway/indexheader/reader_pool_test.go
@@ -138,7 +138,7 @@ func TestReaderPool_LoadedBlocks(t *testing.T) {
lazyReaderEnabled: true,
lazyReaders: map[*LazyBinaryReader]struct{}{&lb: {}},
}
require.Equal(t, map[ulid.ULID]int64{id: usedAt.UnixMilli()}, rp.LoadedBlocks())
require.Equal(t, map[ulid.ULID]struct{}{id: {}}, rp.LoadedBlocks())
}

func prepareReaderPool(t *testing.T) (context.Context, string, *filesystem.Bucket, ulid.ULID, *ReaderPoolMetrics) {
49 changes: 43 additions & 6 deletions pkg/storegateway/indexheader/snapshotter.go
@@ -5,6 +5,7 @@ package indexheader
import (
"bytes"
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"os"
@@ -37,6 +38,11 @@ type Snapshotter struct {
conf SnapshotterConfig

bl BlocksLoader

// lastChecksum stores the checksum of the last persisted JSON data
// to avoid writing the same data repeatedly, reducing IOPS.
// This is useful when running with many tenants on low-performance disks.
lastChecksum [sha256.Size]byte
}

func NewSnapshotter(logger log.Logger, conf SnapshotterConfig, bl BlocksLoader) *Snapshotter {
@@ -50,7 +56,7 @@ func NewSnapshotter(logger log.Logger, conf SnapshotterConfig, bl BlocksLoader)
}

type BlocksLoader interface {
LoadedBlocks() map[ulid.ULID]int64
LoadedBlocks() map[ulid.ULID]struct{}
}

func (s *Snapshotter) persist(context.Context) error {
@@ -64,20 +70,42 @@
}

func (s *Snapshotter) PersistLoadedBlocks() error {
loadedBlocks := s.bl.LoadedBlocks()

// Convert to the format we store on disk for backward compatibility
indexHeaderLastUsedTime := make(map[ulid.ULID]int64, len(loadedBlocks))
for blockID := range loadedBlocks {
// We use a constant timestamp since we no longer care about the actual timestamp
indexHeaderLastUsedTime[blockID] = 1
}

snapshot := &indexHeadersSnapshot{
IndexHeaderLastUsedTime: s.bl.LoadedBlocks(),
IndexHeaderLastUsedTime: indexHeaderLastUsedTime,
UserID: s.conf.UserID,
}

data, err := json.Marshal(snapshot)
if err != nil {
return err
}

// The json marshalling is deterministic, so the checksum will be the same for the same map contents.
checksum := sha256.Sum256(data)
if checksum == s.lastChecksum {
level.Debug(s.logger).Log("msg", "skipping persistence of index headers snapshot as data hasn't changed", "user", s.conf.UserID)
return nil
}

finalPath := filepath.Join(s.conf.Path, lazyLoadedHeadersListFileName)
return atomicfs.CreateFile(finalPath, bytes.NewReader(data))
err = atomicfs.CreateFile(finalPath, bytes.NewReader(data))
if err == nil {
// Only update the checksum if the write was successful
s.lastChecksum = checksum
}
return err
}

func RestoreLoadedBlocks(directory string) (map[ulid.ULID]int64, error) {
func RestoreLoadedBlocks(directory string) (map[ulid.ULID]struct{}, error) {
var (
snapshot indexHeadersSnapshot
multiErr = multierror.MultiError{}
@@ -99,11 +127,20 @@ func RestoreLoadedBlocks(directory string) (map[ulid.ULID]int64, error) {
multiErr.Add(fmt.Errorf("removing the lazy-loaded index-header snapshot: %w", err))
}
}
return snapshot.IndexHeaderLastUsedTime, multiErr.Err()

// Snapshots used to be stored with their last-used timestamp. But that wasn't used and led to constant file churn, so we removed it.
result := make(map[ulid.ULID]struct{}, len(snapshot.IndexHeaderLastUsedTime))
for blockID := range snapshot.IndexHeaderLastUsedTime {
result[blockID] = struct{}{}
}

return result, multiErr.Err()
}

type indexHeadersSnapshot struct {
// IndexHeaderLastUsedTime is map of index header ulid.ULID to timestamp in millisecond.
// IndexHeaderLastUsedTime is a map of index header ulid.ULID to the number 1.
// The number used to be the last-used timestamp of each block.
// We keep this format for backward compatibility, but we no longer care about the timestamps.
IndexHeaderLastUsedTime map[ulid.ULID]int64 `json:"index_header_last_used_time"`
UserID string `json:"user_id"`
}
104 changes: 93 additions & 11 deletions pkg/storegateway/indexheader/snapshotter_test.go
@@ -20,13 +20,11 @@ import (
func TestSnapshotter_PersistAndRestoreLoadedBlocks(t *testing.T) {
tmpDir := t.TempDir()

usedAt := time.Now()
testBlockID := ulid.MustNew(ulid.Now(), rand.Reader)

origBlocks := map[ulid.ULID]int64{
testBlockID: usedAt.UnixMilli(),
}
testBlocksLoader := testBlocksLoaderFunc(func() map[ulid.ULID]int64 { return origBlocks })
origBlocks := map[ulid.ULID]struct{}{testBlockID: {}}

testBlocksLoader := testBlocksLoaderFunc(func() map[ulid.ULID]struct{} { return origBlocks })

config := SnapshotterConfig{
Path: tmpDir,
@@ -42,22 +40,106 @@
data, err := os.ReadFile(persistedFile)
require.NoError(t, err)

expected := fmt.Sprintf(`{"index_header_last_used_time":{"%s":%d},"user_id":"anonymous"}`, testBlockID, usedAt.UnixMilli())
expected := fmt.Sprintf(`{"index_header_last_used_time":{"%s":1},"user_id":"anonymous"}`, testBlockID)
require.JSONEq(t, expected, string(data))

restoredBlocks, err := RestoreLoadedBlocks(config.Path)
require.Equal(t, origBlocks, restoredBlocks)
require.NoError(t, err)
}

func TestSnapshotter_ChecksumOptimization(t *testing.T) {
tmpDir := t.TempDir()

firstBlockID := ulid.MustNew(ulid.Now(), rand.Reader)

origBlocks := map[ulid.ULID]struct{}{
firstBlockID: {},
}
testBlocksLoader := testBlocksLoaderFunc(func() map[ulid.ULID]struct{} { return origBlocks })

config := SnapshotterConfig{
Path: tmpDir,
UserID: "anonymous",
}

// Create snapshotter and persist data
s := NewSnapshotter(log.NewNopLogger(), config, testBlocksLoader)

err := s.PersistLoadedBlocks()
require.NoError(t, err)

// Verify the content of the file using RestoreLoadedBlocks
restoredBlocks, err := RestoreLoadedBlocks(config.Path)
require.NoError(t, err)
require.Equal(t, origBlocks, restoredBlocks, "Restored blocks should match original blocks")

// Get file info after first write
persistedFile := filepath.Join(tmpDir, lazyLoadedHeadersListFileName)
firstStat, err := os.Stat(persistedFile)
require.NoError(t, err)
firstModTime := firstStat.ModTime()

// Wait a moment to ensure modification time would be different if file is written
time.Sleep(10 * time.Millisecond)

// Call persist again with the same data
err = s.PersistLoadedBlocks()
require.NoError(t, err)

// Get file info after second write attempt
secondStat, err := os.Stat(persistedFile)
require.NoError(t, err)
secondModTime := secondStat.ModTime()

// File should not have been modified since the data hasn't changed
require.Equal(t, firstModTime, secondModTime, "File was modified even though data hasn't changed")

// Verify the content has not changed using RestoreLoadedBlocks
restoredBlocksAfterSecondPersist, err := RestoreLoadedBlocks(config.Path)
require.NoError(t, err)
require.Equal(t, origBlocks, restoredBlocksAfterSecondPersist, "Restored blocks should match original blocks")

// Now change the data and persist again
secondBlockID := ulid.MustNew(ulid.Now(), rand.Reader)
newBlocks := map[ulid.ULID]struct{}{
firstBlockID: {},
secondBlockID: {},
}

// Create a new loader with updated data
updatedBlocksLoader := testBlocksLoaderFunc(func() map[ulid.ULID]struct{} { return newBlocks })
s.bl = updatedBlocksLoader

// Wait a moment to ensure modification time would be different if file is written
time.Sleep(10 * time.Millisecond)

// Persist the new data
err = s.PersistLoadedBlocks()
require.NoError(t, err)

// Get file info after third write
thirdStat, err := os.Stat(persistedFile)
require.NoError(t, err)
thirdModTime := thirdStat.ModTime()

// File should have been modified since the data has changed
require.NotEqual(t, secondModTime, thirdModTime, "File was not modified even though data has changed")

// Verify the content has changed using RestoreLoadedBlocks
restoredBlocksAfterThirdPersist, err := RestoreLoadedBlocks(config.Path)
require.NoError(t, err)
require.Equal(t, newBlocks, restoredBlocksAfterThirdPersist, "Restored blocks should match new blocks")
}

func TestSnapshotter_StartStop(t *testing.T) {
t.Run("stop after start", func(t *testing.T) {
tmpDir := t.TempDir()

testBlocksLoader := testBlocksLoaderFunc(func() map[ulid.ULID]int64 {
testBlocksLoader := testBlocksLoaderFunc(func() map[ulid.ULID]struct{} {
// We don't care about the content of the index header in this test.
return map[ulid.ULID]int64{
ulid.MustNew(ulid.Now(), rand.Reader): time.Now().UnixMilli(),
return map[ulid.ULID]struct{}{
ulid.MustNew(ulid.Now(), rand.Reader): {},
}
})

@@ -78,8 +160,8 @@ func TestSnapshotter_StartStop(t *testing.T) {
})
}

type testBlocksLoaderFunc func() map[ulid.ULID]int64
type testBlocksLoaderFunc func() map[ulid.ULID]struct{}

func (f testBlocksLoaderFunc) LoadedBlocks() map[ulid.ULID]int64 {
func (f testBlocksLoaderFunc) LoadedBlocks() map[ulid.ULID]struct{} {
return f()
}