Skip to content

Commit

Permalink
fix(blooms): embeddedcache size panic (grafana#12130)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored and rhnasc committed Apr 12, 2024
1 parent 4a4ab25 commit dc318b4
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 9 deletions.
3 changes: 2 additions & 1 deletion pkg/storage/chunk/cache/embeddedcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,12 @@ func (c *EmbeddedCache[K, V]) GetCacheType() stats.CacheType {

func (c *EmbeddedCache[K, V]) remove(key K, element *list.Element, reason string) {
entry := c.lru.Remove(element).(*Entry[K, V])
sz := c.cacheEntrySizeCalculator(entry)
delete(c.entries, key)
if c.onEntryRemoved != nil {
c.onEntryRemoved(entry.Key, entry.Value)
}
c.currSizeBytes -= c.cacheEntrySizeCalculator(entry)
c.currSizeBytes -= sz
c.entriesCurrent.Dec()
c.entriesEvicted.WithLabelValues(reason).Inc()
}
Expand Down
32 changes: 25 additions & 7 deletions pkg/storage/stores/shipper/bloomshipper/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"fmt"
"io/fs"
"os"
"path"
"path/filepath"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"

Expand Down Expand Up @@ -93,15 +93,20 @@ func calculateBlockDirectorySize(entry *cache.Entry[string, BlockDirectory]) uin
return uint64(entry.Value.Size())
}

// NewBlockDirectory creates a new BlockDirectory. Must exist on disk.
func NewBlockDirectory(ref BlockRef, path string, logger log.Logger) BlockDirectory {
return BlockDirectory{
bd := BlockDirectory{
BlockRef: ref,
Path: path,
refCount: atomic.NewInt32(0),
removeDirectoryTimeout: time.Minute,
logger: logger,
activeQueriersCheckInterval: defaultActiveQueriersCheckInterval,
}
if err := bd.resolveSize(); err != nil {
panic(err)
}
return bd
}

// A BlockDirectory is a local file path that contains a bloom block.
Expand All @@ -113,12 +118,17 @@ type BlockDirectory struct {
refCount *atomic.Int32
logger log.Logger
activeQueriersCheckInterval time.Duration
size int64
}

func (b BlockDirectory) Block() *v1.Block {
return v1.NewBlock(v1.NewDirectoryBlockReader(b.Path))
}

func (b BlockDirectory) Size() int64 {
return b.size
}

func (b BlockDirectory) Acquire() {
_ = b.refCount.Inc()
}
Expand All @@ -128,11 +138,19 @@ func (b BlockDirectory) Release() error {
return nil
}

func (b BlockDirectory) Size() int64 {
// TODO(chaudum): Reduce syscalls by storing the size on the block directory struct
bloomFileStats, _ := os.Lstat(path.Join(b.Path, v1.BloomFileName))
seriesFileStats, _ := os.Lstat(path.Join(b.Path, v1.SeriesFileName))
return bloomFileStats.Size() + seriesFileStats.Size()
func (b *BlockDirectory) resolveSize() error {
bloomPath := filepath.Join(b.Path, v1.BloomFileName)
bloomFileStats, err := os.Lstat(bloomPath)
if err != nil {
return errors.Wrapf(err, "failed to stat bloom file (%s)", bloomPath)
}
seriesPath := filepath.Join(b.Path, v1.SeriesFileName)
seriesFileStats, err := os.Lstat(seriesPath)
if err != nil {
return errors.Wrapf(err, "failed to stat series file (%s)", seriesPath)
}
b.size = (bloomFileStats.Size() + seriesFileStats.Size())
return nil
}

// BlockQuerier returns a new block querier from the directory.
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/stores/shipper/bloomshipper/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/logqlmodel/stats"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

type mockCache[K comparable, V any] struct {
Expand Down Expand Up @@ -106,7 +107,11 @@ func TestBlockDirectory_Cleanup(t *testing.T) {
}

func Test_ClosableBlockQuerier(t *testing.T) {
blockDir := NewBlockDirectory(BlockRef{}, t.TempDir(), log.NewNopLogger())
tmpDir := t.TempDir()
// create the expected files so size initialization doesn't panic
require.NoError(t, os.WriteFile(filepath.Join(tmpDir, v1.BloomFileName), []byte("bloom"), 0o644))
require.NoError(t, os.WriteFile(filepath.Join(tmpDir, v1.SeriesFileName), []byte("series"), 0o644))
blockDir := NewBlockDirectory(BlockRef{}, tmpDir, log.NewNopLogger())

querier := blockDir.BlockQuerier()
require.Equal(t, int32(1), blockDir.refCount.Load())
Expand Down

0 comments on commit dc318b4

Please sign in to comment.