Revert "Revert "Buffer pool for decompression (#1308)" (#1408)"
This reverts commit 800305e.
manishrjain committed Aug 11, 2020
1 parent 1291ce8 commit f2e9b48
Showing 4 changed files with 78 additions and 20 deletions.
3 changes: 3 additions & 0 deletions db.go
@@ -312,6 +312,9 @@ func Open(opt Options) (db *DB, err error) {
 		MaxCost:     int64(float64(opt.MaxCacheSize) * 0.95),
 		BufferItems: 64,
 		Metrics:     true,
+		OnEvict: func(_, _ uint64, value interface{}, _ int64) {
+			table.BlockEvictHandler(value)
+		},
 	}
 	db.blockCache, err = ristretto.NewCache(&config)
 	if err != nil {
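The OnEvict hook is what ties the block cache to the buffer pool: when ristretto evicts a block, BlockEvictHandler drops the cache's reference so the block's underlying buffer can be recycled. Below is a minimal sketch of that pattern, assuming the ristretto v0.0.x API whose OnEvict signature appears in this diff; bufPool and payload are hypothetical stand-ins for badger's blockPool and block.

package main

import (
	"fmt"
	"sync"

	"github.com/dgraph-io/ristretto"
)

// bufPool recycles fixed-size buffers, playing the role of table's blockPool.
var bufPool = sync.Pool{
	New: func() interface{} { b := make([]byte, 5<<10); return &b },
}

// payload is a hypothetical stand-in for table's block type.
type payload struct{ buf *[]byte }

func main() {
	cache, err := ristretto.NewCache(&ristretto.Config{
		NumCounters: 1e4,
		MaxCost:     1 << 20,
		BufferItems: 64,
		// Evicted values hand their buffers back to the pool, the same
		// job BlockEvictHandler does for blocks via decrRef.
		OnEvict: func(_, _ uint64, value interface{}, _ int64) {
			if p, ok := value.(*payload); ok {
				bufPool.Put(p.buf)
			}
		},
	})
	if err != nil {
		panic(err)
	}
	p := &payload{buf: bufPool.Get().(*[]byte)}
	cache.Set("block-0", p, int64(cap(*p.buf)))
	fmt.Println("cached a buffer of", cap(*p.buf), "bytes")
}
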
15 changes: 8 additions & 7 deletions table/builder.go
@@ -118,10 +118,13 @@ func NewTableBuilder(opts Options) *Builder {
 	return b
 }
 
-var slicePool = sync.Pool{
+var blockPool = &sync.Pool{
 	New: func() interface{} {
-		// Make 4 KB blocks for reuse.
-		b := make([]byte, 0, 4<<10)
+		// Create 5 KB blocks even though the default block size is 4 KB. The
+		// ZSTD decompression library doubles the buffer if it's not big
+		// enough. Using a 5 KB block instead of a 4 KB one avoids the
+		// unnecessary 2X allocation by the decompression library.
+		b := make([]byte, 5<<10)
 		return &b
 	},
 }
@@ -135,9 +138,7 @@ func (b *Builder) handleBlock() {
 		// Compress the block.
 		if b.opt.Compression != options.None {
 			var err error
-
-			dst = slicePool.Get().(*[]byte)
-			*dst = (*dst)[:0]
+			dst = blockPool.Get().(*[]byte)
 
 			blockBuf, err = b.compressData(*dst, blockBuf)
 			y.Check(err)
@@ -167,7 +168,7 @@ func (b *Builder) handleBlock() {
 		item.end = item.start + uint32(len(blockBuf))
 
 		if dst != nil {
-			slicePool.Put(dst)
+			blockPool.Put(dst)
 		}
 	}
 }
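handleBlock now borrows its compression destination from blockPool instead of allocating one per block. This pays off because an encoder writes into a caller-supplied buffer whenever it is large enough. Here is a minimal sketch of that contract, with snappy.Encode standing in for Builder.compressData and a hypothetical compressBlock helper; the explicit copy before Put mirrors how handleBlock returns dst to the pool only after the compressed bytes have been copied out.

package main

import (
	"fmt"
	"sync"

	"github.com/golang/snappy"
)

var blockPool = &sync.Pool{
	New: func() interface{} {
		b := make([]byte, 5<<10) // sized above the 4 KB default block
		return &b
	},
}

// compressBlock is a hypothetical helper, not badger's API.
func compressBlock(src []byte) []byte {
	dst := blockPool.Get().(*[]byte)
	// snappy.Encode writes into *dst and returns a sub-slice of it when
	// the capacity suffices; otherwise it allocates a fresh buffer.
	out := snappy.Encode(*dst, src)
	res := append([]byte(nil), out...) // copy out before recycling dst
	blockPool.Put(dst)
	return res
}

func main() {
	fmt.Println("compressed to", len(compressBlock([]byte("aaaaaaaaaaaaaaaaaaaaaaaa"))), "bytes")
}
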
13 changes: 12 additions & 1 deletion table/iterator.go
@@ -33,13 +33,21 @@ type blockIterator struct {
 	key          []byte
 	val          []byte
 	entryOffsets []uint32
+	block        *block
 
 	// prevOverlap stores the overlap of the previous key with the base key.
 	// This avoids unnecessary copy of base key when the overlap is same for multiple keys.
 	prevOverlap uint16
 }
 
 func (itr *blockIterator) setBlock(b *block) {
+	// Decrement the ref for the old block. If the old block was compressed, we
+	// might be able to reuse it.
+	itr.block.decrRef()
+	// Increment the ref for the new block.
+	b.incrRef()
+
+	itr.block = b
 	itr.err = nil
 	itr.idx = 0
 	itr.baseKey = itr.baseKey[:0]
@@ -102,7 +110,9 @@ func (itr *blockIterator) Error() error {
 	return itr.err
 }
 
-func (itr *blockIterator) Close() {}
+func (itr *blockIterator) Close() {
+	itr.block.decrRef()
+}

 var (
 	origin = 0
@@ -172,6 +182,7 @@ func (t *Table) NewIterator(reversed bool) *Iterator {

 // Close closes the iterator (and it must be called).
 func (itr *Iterator) Close() error {
+	itr.bi.Close()
 	return itr.t.DecrRef()
 }

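The iterator changes implement hand-over-hand reference counting: setBlock releases the block it currently holds and retains the new one, and Close drops the final reference so the block's buffer can return to the pool. A self-contained sketch of the protocol, using simplified stand-in types rather than badger's own:

package main

import (
	"fmt"
	"sync/atomic"
)

type block struct {
	ref  int32
	data []byte
}

func (b *block) incrRef() { atomic.AddInt32(&b.ref, 1) }

func (b *block) decrRef() {
	if b == nil { // a nil receiver makes the first setBlock call safe
		return
	}
	if atomic.AddInt32(&b.ref, -1) == 0 {
		// Last reference dropped: the buffer could go back to a pool here.
		fmt.Println("block released")
	}
}

type blockIterator struct{ block *block }

func (itr *blockIterator) setBlock(b *block) {
	itr.block.decrRef() // release the previous block, if any
	b.incrRef()         // retain the new one
	itr.block = b
}

func (itr *blockIterator) Close() { itr.block.decrRef() }

func main() {
	itr := &blockIterator{}
	b1 := &block{ref: 1} // creator's reference
	itr.setBlock(b1)
	b1.decrRef() // creator lets go; the iterator still holds one reference
	itr.Close()  // prints "block released"
}
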
67 changes: 55 additions & 12 deletions table/table.go
@@ -185,15 +185,44 @@ func (t *Table) DecrRef() error {
 	return nil
 }
 
+// BlockEvictHandler is used to reuse the byte slice stored in the block on cache eviction.
+func BlockEvictHandler(value interface{}) {
+	if b, ok := value.(*block); ok {
+		b.decrRef()
+	}
+}
+
 type block struct {
 	offset            int
 	data              []byte
 	checksum          []byte
-	entriesIndexStart int // start index of entryOffsets list
-	entryOffsets      []uint32
-	chkLen            int // checksum length
+	entriesIndexStart int      // start index of entryOffsets list.
+	entryOffsets      []uint32 // used to binary search an entry in the block.
+	chkLen            int      // checksum length.
+	isReusable        bool     // used to determine if the block should be reused.
+	ref               int32
 }
 
+func (b *block) incrRef() {
+	atomic.AddInt32(&b.ref, 1)
+}
+func (b *block) decrRef() {
+	if b == nil {
+		return
+	}
+
+	p := atomic.AddInt32(&b.ref, -1)
+	// Insert the []byte into pool only if the block is reusable. When a block
+	// is reusable a new []byte is used for decompression and this []byte can
+	// be reused.
+	// In case of an uncompressed block, the []byte is a reference to the
+	// table.mmap []byte slice. Any attempt to write data to the mmap []byte
+	// will lead to SEGFAULT.
+	if p == 0 && b.isReusable {
+		blockPool.Put(&b.data)
+	}
+	y.AssertTrue(p >= 0)
+}
 func (b *block) size() int64 {
 	return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
 		cap(b.data) + cap(b.checksum) + cap(b.entryOffsets)*4)
@@ -482,8 +511,7 @@ func (t *Table) block(idx int) (*block, error) {
 		}
 	}
 
-	blk.data, err = t.decompressData(blk.data)
-	if err != nil {
+	if err = t.decompress(blk); err != nil {
 		return nil, errors.Wrapf(err,
 			"failed to decode compressed data in file: %s at offset: %d, len: %d",
 			t.fd.Name(), blk.offset, ko.Len)
@@ -525,6 +553,7 @@ func (t *Table) block(idx int) (*block, error) {
 	}
 	if t.opt.Cache != nil && t.opt.KeepBlocksInCache {
 		key := t.blockCacheKey(idx)
+		blk.incrRef()
 		t.opt.Cache.Set(key, blk, blk.size())
 	}
 	return blk, nil
@@ -647,7 +676,8 @@ func (t *Table) VerifyChecksum() error {
 			return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d",
 				t.Filename(), i, os.Offset)
 		}
-
+		b.incrRef()
+		defer b.decrRef()
 		// OnBlockRead or OnTableAndBlockRead, we don't need to call verify checksum
 		// on block, verification would be done while reading block itself.
 		if !(t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead) {
@@ -713,15 +743,28 @@ func NewFilename(id uint64, dir string) string {
 	return filepath.Join(dir, IDToFilename(id))
 }
 
-// decompressData decompresses the given data.
-func (t *Table) decompressData(data []byte) ([]byte, error) {
+// decompress decompresses the data stored in a block.
+func (t *Table) decompress(b *block) error {
+	var err error
 	switch t.opt.Compression {
 	case options.None:
-		return data, nil
+		// Nothing to be done here.
 	case options.Snappy:
-		return snappy.Decode(nil, data)
+		dst := blockPool.Get().(*[]byte)
+		b.data, err = snappy.Decode(*dst, b.data)
+		if err != nil {
+			return errors.Wrap(err, "failed to decompress")
+		}
+		b.isReusable = true
 	case options.ZSTD:
-		return y.ZSTDDecompress(nil, data)
+		dst := blockPool.Get().(*[]byte)
+		b.data, err = y.ZSTDDecompress(*dst, b.data)
+		if err != nil {
+			return errors.Wrap(err, "failed to decompress")
+		}
+		b.isReusable = true
+	default:
+		return errors.New("Unsupported compression type")
 	}
-	return nil, errors.New("Unsupported compression type")
+	return nil
 }
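The reason decompress hands *dst to the decoder instead of nil is that snappy.Decode (and likewise y.ZSTDDecompress) writes into a caller-supplied buffer when it has enough capacity, so a 5 KB pooled buffer absorbs a typical 4 KB block with no allocation. A minimal sketch of the same round trip using snappy alone; decompressBlock is a hypothetical helper with error handling trimmed to the essentials.

package main

import (
	"fmt"
	"sync"

	"github.com/golang/snappy"
)

var blockPool = &sync.Pool{
	New: func() interface{} { b := make([]byte, 5<<10); return &b },
}

// decompressBlock is a hypothetical helper mirroring the Snappy arm above.
func decompressBlock(compressed []byte) ([]byte, error) {
	dst := blockPool.Get().(*[]byte)
	// Decode reuses *dst when it can hold the decoded length; only then
	// is the result pool-backed (which is what isReusable records above).
	out, err := snappy.Decode(*dst, compressed)
	if err != nil {
		blockPool.Put(dst)
		return nil, err
	}
	// The caller owns out; once the block's refcount hits zero, decrRef
	// returns the buffer to the pool via blockPool.Put(&b.data).
	return out, nil
}

func main() {
	enc := snappy.Encode(nil, []byte("a 4 KB block sits comfortably in a 5 KB pooled buffer"))
	dec, err := decompressBlock(enc)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(dec))
}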
