Skip to content

Commit

Permalink
use [32]byte keys in the filesystem cache (#13885)
Browse files Browse the repository at this point in the history
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
  • Loading branch information
kasey and kasey authored Apr 25, 2024
1 parent a9862f3 commit 8d9024f
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 34 deletions.
2 changes: 2 additions & 0 deletions beacon-chain/db/filesystem/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ go_library(
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library",
"//encoding/bytesutil:go_default_library",
"//io/file:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/logging:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
Expand Down
11 changes: 11 additions & 0 deletions beacon-chain/db/filesystem/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"strings"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/io/file"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/logging"
Expand All @@ -27,6 +29,7 @@ var (
errEmptyBlobWritten = errors.New("zero bytes written to disk when saving blob sidecar")
errSidecarEmptySSZData = errors.New("sidecar marshalled to an empty ssz byte slice")
errNoBasePath = errors.New("BlobStorage base path not specified in init")
errInvalidRootString = errors.New("Could not parse hex string as a [32]byte")
)

const (
Expand Down Expand Up @@ -333,3 +336,11 @@ func (p blobNamer) path() string {
// rootString renders root as a lowercase hex string with a leading "0x".
// stringToRoot performs the reverse conversion.
func rootString(root [32]byte) string {
	digits := fmt.Sprintf("%x", root[:])
	return "0x" + digits
}

// stringToRoot parses a 0x-prefixed hex string (the format produced by rootString) into a
// [32]byte root. It returns errInvalidRootString, wrapped with the offending input, when the
// string cannot be hex-decoded or does not decode to exactly 32 bytes.
func stringToRoot(str string) ([32]byte, error) {
	slice, err := hexutil.Decode(str)
	if err != nil {
		return [32]byte{}, errors.Wrapf(errInvalidRootString, "input=%s", str)
	}
	// A successful decode does not guarantee a 32 byte value: inputs such as "0x" or a
	// 31/33 byte hex string decode without error. Reject them here rather than letting the
	// conversion below coerce them into a root that was never written to disk.
	// NOTE(review): bytesutil.ToBytes32 presumably pads/truncates instead of failing — confirm.
	if len(slice) != 32 {
		return [32]byte{}, errors.Wrapf(errInvalidRootString, "decoded length=%d, input=%s", len(slice), str)
	}
	return bytesutil.ToBytes32(slice), nil
}
13 changes: 6 additions & 7 deletions beacon-chain/db/filesystem/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,26 @@ type BlobStorageSummarizer interface {
type blobStorageCache struct {
mu sync.RWMutex
nBlobs float64
cache map[string]BlobStorageSummary
cache map[[32]byte]BlobStorageSummary
}

var _ BlobStorageSummarizer = &blobStorageCache{}

func newBlobStorageCache() *blobStorageCache {
return &blobStorageCache{
cache: make(map[string]BlobStorageSummary, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest*fieldparams.SlotsPerEpoch),
cache: make(map[[32]byte]BlobStorageSummary, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest*fieldparams.SlotsPerEpoch),
}
}

// Summary returns the BlobStorageSummary for `root`. The BlobStorageSummary can be used to check for the presence of
// BlobSidecars based on Index.
func (s *blobStorageCache) Summary(root [32]byte) BlobStorageSummary {
k := rootString(root)
s.mu.RLock()
defer s.mu.RUnlock()
return s.cache[k]
return s.cache[root]
}

func (s *blobStorageCache) ensure(key string, slot primitives.Slot, idx uint64) error {
func (s *blobStorageCache) ensure(key [32]byte, slot primitives.Slot, idx uint64) error {
if idx >= fieldparams.MaxBlobsPerBlock {
return errIndexOutOfBounds
}
Expand All @@ -84,7 +83,7 @@ func (s *blobStorageCache) ensure(key string, slot primitives.Slot, idx uint64)
return nil
}

func (s *blobStorageCache) slot(key string) (primitives.Slot, bool) {
func (s *blobStorageCache) slot(key [32]byte) (primitives.Slot, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
v, ok := s.cache[key]
Expand All @@ -94,7 +93,7 @@ func (s *blobStorageCache) slot(key string) (primitives.Slot, bool) {
return v.slot, ok
}

func (s *blobStorageCache) evict(key string) {
func (s *blobStorageCache) evict(key [32]byte) {
var deleted float64
s.mu.Lock()
v, ok := s.cache[key]
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/db/filesystem/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestSlotByRoot_Summary(t *testing.T) {
sc := newBlobStorageCache()
for _, c := range cases {
if c.expected != nil {
key := rootString(bytesutil.ToBytes32([]byte(c.name)))
key := bytesutil.ToBytes32([]byte(c.name))
sc.cache[key] = BlobStorageSummary{slot: 0, mask: *c.expected}
}
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/db/filesystem/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewMockBlobStorageSummarizer(t *testing.T, set map[[32]byte][]int) BlobStor
c := newBlobStorageCache()
for k, v := range set {
for i := range v {
if err := c.ensure(rootString(k), 0, uint64(v[i])); err != nil {
if err := c.ensure(k, 0, uint64(v[i])); err != nil {
t.Fatal(err)
}
}
Expand Down
18 changes: 13 additions & 5 deletions beacon-chain/db/filesystem/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newBlobPruner(fs afero.Fs, retain primitives.Epoch, opts ...prunerOpt) (*bl
// notify updates the pruner's view of root->blob mappings. This allows the pruner to build a cache
// of root->slot mappings and decide when to evict old blobs based on the age of present blobs.
func (p *blobPruner) notify(root [32]byte, latest primitives.Slot, idx uint64) error {
if err := p.cache.ensure(rootString(root), latest, idx); err != nil {
if err := p.cache.ensure(root, latest, idx); err != nil {
return err
}
pruned := uint64(windowMin(latest, p.windowSize))
Expand Down Expand Up @@ -160,7 +160,10 @@ func shouldRetain(slot, pruneBefore primitives.Slot) bool {
}

func (p *blobPruner) tryPruneDir(dir string, pruneBefore primitives.Slot) (int, error) {
root := rootFromDir(dir)
root, err := rootFromDir(dir)
if err != nil {
return 0, errors.Wrapf(err, "invalid directory, could not parse subdir as root %s", dir)
}
slot, slotCached := p.cache.slot(root)
// Return early if the slot is cached and doesn't need pruning.
if slotCached && shouldRetain(slot, pruneBefore) {
Expand Down Expand Up @@ -218,7 +221,7 @@ func (p *blobPruner) tryPruneDir(dir string, pruneBefore primitives.Slot) (int,
return removed, errors.Wrapf(err, "unable to remove blob directory %s", dir)
}

p.cache.evict(rootFromDir(dir))
p.cache.evict(root)
return len(scFiles), nil
}

Expand All @@ -235,8 +238,13 @@ func idxFromPath(fname string) (uint64, error) {
return strconv.ParseUint(parts[0], 10, 64)
}

func rootFromDir(dir string) string {
return filepath.Base(dir) // end of the path should be the blob directory, named by hex encoding of root
// rootFromDir extracts the block root from a blob directory path. The final path element is
// expected to be the hex encoding of the root; if it cannot be parsed, the error from
// stringToRoot is returned wrapped with the full directory path.
func rootFromDir(dir string) ([32]byte, error) {
	root, err := stringToRoot(filepath.Base(dir))
	if err == nil {
		return root, nil
	}
	return root, errors.Wrapf(err, "invalid directory, could not parse subdir as root %s", dir)
}

// Read slot from marshaled BlobSidecar data in the given file. See slotFromBlob for details.
Expand Down
85 changes: 65 additions & 20 deletions beacon-chain/db/filesystem/pruner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ func TestTryPruneDir_CachedNotExpired(t *testing.T) {
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, fieldparams.MaxBlobsPerBlock)
sc, err := verification.BlobSidecarNoop(sidecars[0])
require.NoError(t, err)
root := fmt.Sprintf("%#x", sc.BlockRoot())
rootStr := rootString(sc.BlockRoot())
// This slot is right on the edge of what would need to be pruned, so by adding it to the cache and
// skipping any other test setup, we can be certain the hot cache path never touches the filesystem.
require.NoError(t, pr.cache.ensure(root, sc.Slot(), 0))
pruned, err := pr.tryPruneDir(root, pr.windowSize)
require.NoError(t, pr.cache.ensure(sc.BlockRoot(), sc.Slot(), 0))
pruned, err := pr.tryPruneDir(rootStr, pr.windowSize)
require.NoError(t, err)
require.Equal(t, 0, pruned)
}
Expand All @@ -43,10 +43,10 @@ func TestTryPruneDir_CachedExpired(t *testing.T) {
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, 1)
sc, err := verification.BlobSidecarNoop(sidecars[0])
require.NoError(t, err)
root := fmt.Sprintf("%#x", sc.BlockRoot())
require.NoError(t, fs.Mkdir(root, directoryPermissions)) // make empty directory
require.NoError(t, pr.cache.ensure(root, sc.Slot(), 0))
pruned, err := pr.tryPruneDir(root, slot+1)
rootStr := rootString(sc.BlockRoot())
require.NoError(t, fs.Mkdir(rootStr, directoryPermissions)) // make empty directory
require.NoError(t, pr.cache.ensure(sc.BlockRoot(), sc.Slot(), 0))
pruned, err := pr.tryPruneDir(rootStr, slot+1)
require.NoError(t, err)
require.Equal(t, 0, pruned)
})
Expand All @@ -61,20 +61,21 @@ func TestTryPruneDir_CachedExpired(t *testing.T) {
require.NoError(t, bs.Save(scs[1]))

// check that the root->slot is cached
root := fmt.Sprintf("%#x", scs[0].BlockRoot())
cs, cok := bs.pruner.cache.slot(root)
root := scs[0].BlockRoot()
rootStr := rootString(root)
cs, cok := bs.pruner.cache.slot(scs[0].BlockRoot())
require.Equal(t, true, cok)
require.Equal(t, slot, cs)

// ensure that we see the saved files in the filesystem
files, err := listDir(fs, root)
files, err := listDir(fs, rootStr)
require.NoError(t, err)
require.Equal(t, 2, len(files))

pruned, err := bs.pruner.tryPruneDir(root, slot+1)
pruned, err := bs.pruner.tryPruneDir(rootStr, slot+1)
require.NoError(t, err)
require.Equal(t, 2, pruned)
files, err = listDir(fs, root)
files, err = listDir(fs, rootStr)
require.ErrorIs(t, err, os.ErrNotExist)
require.Equal(t, 0, len(files))
})
Expand All @@ -92,7 +93,8 @@ func TestTryPruneDir_SlotFromFile(t *testing.T) {
require.NoError(t, bs.Save(scs[1]))

// check that the root->slot is cached
root := fmt.Sprintf("%#x", scs[0].BlockRoot())
root := scs[0].BlockRoot()
rootStr := rootString(root)
cs, ok := bs.pruner.cache.slot(root)
require.Equal(t, true, ok)
require.Equal(t, slot, cs)
Expand All @@ -102,14 +104,14 @@ func TestTryPruneDir_SlotFromFile(t *testing.T) {
require.Equal(t, false, ok)

// ensure that we see the saved files in the filesystem
files, err := listDir(fs, root)
files, err := listDir(fs, rootStr)
require.NoError(t, err)
require.Equal(t, 2, len(files))

pruned, err := bs.pruner.tryPruneDir(root, slot+1)
pruned, err := bs.pruner.tryPruneDir(rootStr, slot+1)
require.NoError(t, err)
require.Equal(t, 2, pruned)
files, err = listDir(fs, root)
files, err = listDir(fs, rootStr)
require.ErrorIs(t, err, os.ErrNotExist)
require.Equal(t, 0, len(files))
})
Expand All @@ -125,24 +127,25 @@ func TestTryPruneDir_SlotFromFile(t *testing.T) {
require.NoError(t, bs.Save(scs[1]))

// Evict slot mapping from the cache so that we trigger the file read path.
root := fmt.Sprintf("%#x", scs[0].BlockRoot())
root := scs[0].BlockRoot()
rootStr := rootString(root)
bs.pruner.cache.evict(root)
_, ok := bs.pruner.cache.slot(root)
require.Equal(t, false, ok)

// Ensure that we see the saved files in the filesystem.
files, err := listDir(fs, root)
files, err := listDir(fs, rootStr)
require.NoError(t, err)
require.Equal(t, 2, len(files))

// This should use the slotFromFile code (simulating restart).
// Setting pruneBefore == slot, so that the slot will be outside the window (at the boundary).
pruned, err := bs.pruner.tryPruneDir(root, slot)
pruned, err := bs.pruner.tryPruneDir(rootStr, slot)
require.NoError(t, err)
require.Equal(t, 0, pruned)

// Ensure files are still present.
files, err = listDir(fs, root)
files, err = listDir(fs, rootStr)
require.NoError(t, err)
require.Equal(t, 2, len(files))
})
Expand Down Expand Up @@ -316,3 +319,45 @@ func TestListDir(t *testing.T) {
})
}
}

// TestRootFromDir checks the parsing of blob directory names into block roots. It drives
// stringToRoot, the parser rootFromDir applies to the final path element, with one valid
// directory name and several malformed ones that must yield errInvalidRootString.
func TestRootFromDir(t *testing.T) {
	cases := []struct {
		name string
		dir  string
		err  error
		root [32]byte
	}{
		{
			// 64 hex digits after the prefix: exactly 32 bytes.
			name: "happy path",
			dir:  "0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb",
			root: [32]byte{255, 255, 135, 94, 29, 152, 92, 92, 203, 33, 72, 148, 152, 63, 36, 40,
				237, 178, 113, 240, 248, 123, 104, 186, 112, 16, 228, 169, 157, 243, 181, 203},
		},
		{
			// One hex digit fewer than the happy path (63 digits).
			name: "too short",
			dir:  "0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5c",
			err:  errInvalidRootString,
		},
		{
			// One hex digit more than the happy path (65 digits).
			name: "too long",
			dir:  "0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cbb",
			err:  errInvalidRootString,
		},
		{
			// Same digits as the happy path but without the leading "0x".
			name: "missing prefix",
			dir:  "ffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb",
			err:  errInvalidRootString,
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			root, err := stringToRoot(c.dir)
			if c.err != nil {
				require.ErrorIs(t, err, c.err)
				return
			}
			require.NoError(t, err)
			require.Equal(t, c.root, root)
		})
	}
}

0 comments on commit 8d9024f

Please sign in to comment.