
use cache for storing block offsets #1336

Merged (8 commits) on May 28, 2020
Changes shown from 4 commits
17 changes: 16 additions & 1 deletion iterator_test.go
@@ -125,7 +125,7 @@ func TestPickSortTables(t *testing.T) {
}

func TestIteratePrefix(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
testIteratorPrefix := func(t *testing.T, db *DB) {
bkey := func(i int) []byte {
return []byte(fmt.Sprintf("%04d", i))
}
@@ -198,7 +198,22 @@ func TestIteratePrefix(t *testing.T) {
for i := 0; i < n; i++ {
require.Equal(t, 1, countOneKey(bkey(i)))
}
}

t.Run("With Default options", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
testIteratorPrefix(t, db)
})
})

t.Run("With Block Offsets in Cache", func(t *testing.T) {
opts := getTestOptions("")
opts = opts.WithKeepBlockOffsetsInCache(true)
runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
testIteratorPrefix(t, db)
})
})

}

// go test -v -run=XXX -bench=BenchmarkIterate -benchtime=3s
34 changes: 26 additions & 8 deletions options.go
@@ -102,6 +102,9 @@ type Options struct {
// ------------------------------
maxBatchCount int64 // max entries in batch
maxBatchSize int64 // max batch size in bytes

KeepBlockOffsetsInCache bool // if KeepBlockOffsetsInCache is true and the cache is enabled,
// block offsets are kept in the cache.
}

// DefaultOptions sets a list of recommended options for good performance.
@@ -157,19 +160,21 @@ func DefaultOptions(path string) Options {
LogRotatesToFlush: 2,
EncryptionKey: []byte{},
EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days.
KeepBlockOffsetsInCache: false,
}
}

func buildTableOptions(opt Options) table.Options {
return table.Options{
TableSize: uint64(opt.MaxTableSize),
BlockSize: opt.BlockSize,
BloomFalsePositive: opt.BloomFalsePositive,
LoadBloomsOnOpen: opt.LoadBloomsOnOpen,
LoadingMode: opt.TableLoadingMode,
ChkMode: opt.ChecksumVerificationMode,
Compression: opt.Compression,
ZSTDCompressionLevel: opt.ZSTDCompressionLevel,
TableSize: uint64(opt.MaxTableSize),
BlockSize: opt.BlockSize,
BloomFalsePositive: opt.BloomFalsePositive,
LoadBloomsOnOpen: opt.LoadBloomsOnOpen,
LoadingMode: opt.TableLoadingMode,
ChkMode: opt.ChecksumVerificationMode,
Compression: opt.Compression,
ZSTDCompressionLevel: opt.ZSTDCompressionLevel,
KeepBlockOffsetsInCache: opt.KeepBlockOffsetsInCache,
}
}

@@ -631,3 +636,16 @@ func (opt Options) WithLoadBloomsOnOpen(b bool) Options {
opt.LoadBloomsOnOpen = b
return opt
}

// WithKeepBlockOffsetsInCache returns a new Option value with KeepBlockOffsetsInCache set to the
// given value.
//
// Badger uses block offsets to find the blocks in an SST table. Each block offset also contains
// the starting key of its block, which Badger uses to locate the right block for efficient key
// iteration. This option stores the block offsets in the cache to reduce their in-memory usage.
//
// The default value of KeepBlockOffsetsInCache is false.
func (opt Options) WithKeepBlockOffsetsInCache(val bool) Options {
opt.KeepBlockOffsetsInCache = val
return opt
}
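
For context, a minimal usage sketch of the new option (hypothetical, not part of this diff): the path and cache size are placeholders, the v2 import path is assumed, and the existing WithMaxCacheSize setter is used because the offsets can only be cached when the cache itself is enabled.

package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v2" // assumed module path
)

func main() {
	opts := badger.DefaultOptions("/tmp/badger"). // placeholder path
		WithMaxCacheSize(64 << 20).       // cache must be enabled for the option to take effect
		WithKeepBlockOffsetsInCache(true) // keep block offsets in the cache
	db, err := badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}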
4 changes: 2 additions & 2 deletions table/builder_test.go
@@ -118,8 +118,8 @@ func TestTableIndex(t *testing.T) {
}

// Ensure index is built correctly
require.Equal(t, blockCount, len(tbl.blockIndex))
for i, ko := range tbl.blockIndex {
require.Equal(t, blockCount, tbl.noOfBlocks)
for i, ko := range tbl.readTableIndex().Offsets {
require.Equal(t, ko.Key, blockFirstKeys[i])
}
f.Close()
12 changes: 6 additions & 6 deletions table/iterator.go
@@ -195,7 +195,7 @@ func (itr *Iterator) Valid() bool {
}

func (itr *Iterator) seekToFirst() {
numBlocks := len(itr.t.blockIndex)
numBlocks := itr.t.noOfBlocks
if numBlocks == 0 {
itr.err = io.EOF
return
@@ -212,7 +212,7 @@
}

func (itr *Iterator) seekToLast() {
numBlocks := len(itr.t.blockIndex)
numBlocks := itr.t.noOfBlocks
if numBlocks == 0 {
itr.err = io.EOF
return
@@ -249,8 +249,8 @@ func (itr *Iterator) seekFrom(key []byte, whence int) {
case current:
}

idx := sort.Search(len(itr.t.blockIndex), func(idx int) bool {
ko := itr.t.blockIndex[idx]
idx := sort.Search(itr.t.noOfBlocks, func(idx int) bool {
ko := itr.t.blockOffsets()[idx]
return y.CompareKeys(ko.Key, key) > 0
})
if idx == 0 {
@@ -269,7 +269,7 @@ func (itr *Iterator) seekFrom(key []byte, whence int) {
itr.seekHelper(idx-1, key)
if itr.err == io.EOF {
// Case 1. Need to visit block[idx].
if idx == len(itr.t.blockIndex) {
if idx == itr.t.noOfBlocks {
// If idx == itr.t.noOfBlocks, then input key is greater than ANY element of table.
// There's nothing we can do. Valid() should return false as we seek to end of table.
return
@@ -297,7 +297,7 @@ func (itr *Iterator) seekForPrev(key []byte) {
func (itr *Iterator) next() {
itr.err = nil

if itr.bpos >= len(itr.t.blockIndex) {
if itr.bpos >= itr.t.noOfBlocks {
itr.err = io.EOF
return
}
108 changes: 91 additions & 17 deletions table/table.go
@@ -45,6 +45,13 @@ import (
const fileSuffix = ".sst"
const intSize = int(unsafe.Sizeof(int(0)))

// 1 word = 8 bytes
// sizeOfOffsetStruct is the size of pb.BlockOffset
const sizeOfOffsetStruct int64 = 3*8 + // key array takes 3 words
	1*8 + // offset and len take 1 word
	3*8 + // XXX_unrecognized array takes 3 words.
	1*8 // so far 7 words; to round up the slab we add one more word.

Review comment: How safe/reliable is this computation?
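
One way to sanity-check the hand-counted constant is a hypothetical test sketch (not part of this PR), assumed to live in the table package where the pb and testify imports are already available: unsafe.Sizeof reports the struct size including its slice headers but excluding their backing arrays, which calculateOffsetsSize adds per entry, so the rounded-up constant should never be smaller than the compiler-reported size.

package table

import (
	"testing"
	"unsafe"

	"github.com/dgraph-io/badger/v2/pb" // assumed module path for this branch
	"github.com/stretchr/testify/require"
)

// TestSizeOfOffsetStruct checks that the hand-counted constant does not
// undercount the compiler-reported size of pb.BlockOffset.
func TestSizeOfOffsetStruct(t *testing.T) {
	got := int64(unsafe.Sizeof(pb.BlockOffset{}))
	require.GreaterOrEqual(t, sizeOfOffsetStruct, got)
}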

// Options contains configurable options for Table/Builder.
type Options struct {
// Options for Opening/Building Table.
@@ -81,6 +88,10 @@ type Options struct {
// When LoadBloomsOnOpen is set, bloom filters will be read only when they are accessed.
// Otherwise they will be loaded on table open.
LoadBloomsOnOpen bool

// When KeepBlockOffsetsInCache is set to true and the cache is enabled, block offsets are
// kept in the cache.
KeepBlockOffsetsInCache bool
}

// TableInterface is useful for testing.
@@ -116,6 +127,8 @@ type Table struct {

IsInmemory bool // Set to true if the table is on level 0 and opened in memory.
opt *Options

noOfBlocks int // Total number of blocks.
}

// CompressionType returns the compression algorithm used for block compression.
@@ -159,7 +172,7 @@ func (t *Table) DecrRef() error {
return err
}
// Delete all blocks from the cache.
for i := range t.blockIndex {
for i := 0; i < t.noOfBlocks; i++ {
t.opt.Cache.Del(t.blockCacheKey(i))
}
// Delete bloom filter from the cache.
@@ -332,11 +345,13 @@ func OpenInMemoryTable(data []byte, id uint64, opt *Options) (*Table, error) {
}

func (t *Table) initBiggestAndSmallest() error {
if err := t.readIndex(); err != nil {
var err error
var ko *pb.BlockOffset
if ko, err = t.readIndex(); err != nil {
return errors.Wrapf(err, "failed to read index.")
}

t.smallest = t.blockIndex[0].Key
t.smallest = ko.Key

it2 := t.NewIterator(true)
defer it2.Close()
@@ -383,23 +398,25 @@ func (t *Table) readNoFail(off, sz int) []byte {
return res
}

func (t *Table) readIndex() error {
// readIndex reads the index, populates the necessary table fields, and returns the
// first block offset.
func (t *Table) readIndex() (*pb.BlockOffset, error) {
readPos := t.tableSize

// Read checksum len from the last 4 bytes.
readPos -= 4
buf := t.readNoFail(readPos, 4)
checksumLen := int(y.BytesToU32(buf))
if checksumLen < 0 {
return errors.New("checksum length less than zero. Data corrupted")
return nil, errors.New("checksum length less than zero. Data corrupted")
}

// Read checksum.
expectedChk := &pb.Checksum{}
readPos -= checksumLen
buf = t.readNoFail(readPos, checksumLen)
if err := proto.Unmarshal(buf, expectedChk); err != nil {
return err
return nil, err
}

// Read index size from the footer.
@@ -413,39 +430,80 @@
data := t.readNoFail(readPos, t.indexLen)

if err := y.VerifyChecksum(data, expectedChk); err != nil {
return y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename())
return nil, y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename())
}

index := pb.TableIndex{}
// Decrypt the table index if it is encrypted.
if t.shouldDecrypt() {
var err error
if data, err = t.decrypt(data); err != nil {
return y.Wrapf(err,
return nil, y.Wrapf(err,
"Error while decrypting table index for the table %d in Table.readIndex", t.id)
}
}
err := proto.Unmarshal(data, &index)
y.Check(err)

t.estimatedSize = index.EstimatedSize
t.blockIndex = index.Offsets
t.noOfBlocks = len(index.Offsets)

if t.opt.LoadBloomsOnOpen {
t.bfLock.Lock()
t.bf, _ = t.readBloomFilter()
t.bfLock.Unlock()
}

return nil
if t.opt.KeepBlockOffsetsInCache && t.opt.Cache != nil {
t.opt.Cache.Set(
t.blockOffsetsCacheKey(),
index.Offsets,
calculateOffsetsSize(index.Offsets))

return index.Offsets[0], nil
}

t.blockIndex = index.Offsets
return index.Offsets[0], nil
}

// blockOffsets returns block offsets of this table.
func (t *Table) blockOffsets() []*pb.BlockOffset {
if !t.opt.KeepBlockOffsetsInCache || t.opt.Cache == nil {
return t.blockIndex
}

val, ok := t.opt.Cache.Get(t.blockOffsetsCacheKey())
if ok {
return val.([]*pb.BlockOffset)
}

ti := t.readTableIndex()

t.opt.Cache.Set(t.blockOffsetsCacheKey(), ti.Offsets, calculateOffsetsSize(ti.Offsets))
return ti.Offsets
}

// calculateOffsetsSize returns the total size in bytes of a []*pb.BlockOffset slice.
func calculateOffsetsSize(offsets []*pb.BlockOffset) int64 {
totalSize := sizeOfOffsetStruct * int64(len(offsets))

for _, ko := range offsets {
// add key size.
totalSize += int64(cap(ko.Key))
// add XXX_unrecognized size.
totalSize += int64(cap(ko.XXX_unrecognized))
}
// Add three words for the slice header.
return totalSize + 3*8
}
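
As a quick check of the arithmetic, a hypothetical test sketch (same assumed table-package test file as above): a single offset whose Key was allocated with capacity 10 and which has no XXX_unrecognized bytes should cost 64 (struct) + 10 (key backing array) + 24 (outer slice header) = 98 bytes.

// TestCalculateOffsetsSize illustrates the cost arithmetic used above.
func TestCalculateOffsetsSize(t *testing.T) {
	offsets := []*pb.BlockOffset{
		{Key: make([]byte, 10)}, // cap(Key) == 10, no XXX_unrecognized bytes
	}
	// sizeOfOffsetStruct (64) + key capacity (10) + three words for the slice header (24).
	require.Equal(t, int64(64+10+24), calculateOffsetsSize(offsets))
}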

// block returns a new block. Each block holds a ref and the byte
// slice stored in the block will be reused when the ref becomes zero. The
// caller should release the block by calling block.decrRef() on it.
func (t *Table) block(idx int) (*block, error) {
y.AssertTruef(idx >= 0, "idx=%d", idx)
if idx >= len(t.blockIndex) {
if idx >= t.noOfBlocks {
return nil, errors.New("block out of index")
}
if t.opt.Cache != nil {
@@ -460,7 +518,9 @@ func (t *Table) block(idx int) (*block, error) {
}
}
}
ko := t.blockIndex[idx]

// Fetch the block offsets, re-reading the table index if they are not in memory.
ko := t.blockOffsets()[idx]
blk := &block{
offset: int(ko.Offset),
ref: 1,
@@ -553,6 +613,15 @@ func (t *Table) blockCacheKey(idx int) []byte {
return buf
}

// blockOffsetsCacheKey returns the cache key for block offsets.
func (t *Table) blockOffsetsCacheKey() []byte {
y.AssertTrue(t.id < math.MaxUint32)
buf := make([]byte, 4)
binary.BigEndian.PutUint32(buf, uint32(t.id))

return append([]byte("bo"), buf...)
}

// EstimatedSize returns the total size of key-values stored in this table (including the
// disk space occupied on the value log).
func (t *Table) EstimatedSize() uint64 { return t.estimatedSize }
@@ -604,6 +673,14 @@ func (t *Table) DoesNotHave(hash uint64) bool {
// along with the bloom filter.
func (t *Table) readBloomFilter() (*z.Bloom, int) {
// Read bloom filter from the SST.
index := t.readTableIndex()
bf, err := z.JSONUnmarshal(index.BloomFilter)
y.Check(err)
return bf, len(index.BloomFilter)
}

// readTableIndex reads the table index from the SST and returns it in its pb format.
func (t *Table) readTableIndex() *pb.TableIndex {
data := t.readNoFail(t.indexStart, t.indexLen)
index := pb.TableIndex{}
var err error
@@ -613,16 +690,13 @@ func (t *Table) readBloomFilter() (*z.Bloom, int) {
y.Check(err)
}
y.Check(proto.Unmarshal(data, &index))

bf, err := z.JSONUnmarshal(index.BloomFilter)
y.Check(err)
return bf, len(index.BloomFilter)
return &index
}

// VerifyChecksum verifies checksum for all blocks of table. This function is called by
// OpenTable() function. This function is also called inside levelsController.VerifyChecksum().
func (t *Table) VerifyChecksum() error {
for i, os := range t.blockIndex {
for i, os := range t.blockOffsets() {
b, err := t.block(i)
if err != nil {
return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d",