Skip to content

Commit

Permalink
membuffer: implement cache for ART (#1470)
Browse files Browse the repository at this point in the history
ref pingcap/tidb#55287

Signed-off-by: you06 <you1474600@gmail.com>
  • Loading branch information
you06 authored Oct 8, 2024
1 parent 58f3322 commit c3e10ae
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 14 deletions.
2 changes: 1 addition & 1 deletion examples/gcworker/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 // indirect
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/rawkv/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 // indirect
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/1pc_txn/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 // indirect
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/async_commit/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 // indirect
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/delete_range/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 // indirect
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 // indirect
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/pessimistic_txn/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 // indirect
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion examples/txnkv/unsafedestoryrange/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 // indirect
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
Expand Down
75 changes: 69 additions & 6 deletions internal/unionstore/art/art.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
package art

import (
"bytes"
"fmt"
"math"
"sync/atomic"

tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/unionstore/arena"
Expand All @@ -44,6 +46,12 @@ type ART struct {
bufferSizeLimit uint64
len int
size int

// The lastTraversedNode stores the addr (as a uint64) of the last traversed node, including both search and recursiveInsert.
// Compared to atomic.Pointer, atomic.Uint64 avoids a heap allocation, so it's more efficient.
lastTraversedNode atomic.Uint64
hitCount atomic.Uint64
missCount atomic.Uint64
}

func New() *ART {
Expand All @@ -55,6 +63,7 @@ func New() *ART {
t.allocator.nodeAllocator.freeNode4 = make([]arena.MemdbArenaAddr, 0, 1<<4)
t.allocator.nodeAllocator.freeNode16 = make([]arena.MemdbArenaAddr, 0, 1<<3)
t.allocator.nodeAllocator.freeNode48 = make([]arena.MemdbArenaAddr, 0, 1<<2)
t.lastTraversedNode.Store(arena.NullU64Addr)
return &t
}

Expand Down Expand Up @@ -102,10 +111,26 @@ func (t *ART) Set(key artKey, value []byte, ops ...kv.FlagsOp) error {
return nil
}

// search returns the arena address and leaf for key, using the last-traversed
// leaf as a one-entry cache in front of searchImpl.
// A cache hit bumps hitCount and returns immediately. Otherwise missCount is
// bumped, searchImpl is called, and a non-null result becomes the new cached leaf.
func (t *ART) search(key artKey) (arena.MemdbArenaAddr, *artLeaf) {
	if cachedAddr, cachedLeaf, ok := t.checkKeyInCache(key); ok {
		t.hitCount.Add(1)
		return cachedAddr, cachedLeaf
	}

	t.missCount.Add(1)
	addr, leaf := t.searchImpl(key)
	if addr.IsNull() {
		// nothing found: keep whatever leaf is currently cached
		return addr, leaf
	}
	t.updateLastTraversed(addr)
	return addr, leaf
}

// searchImpl looks up the leaf with the given key.
// It returns the memory arena address and leaf itself if there is a matching leaf,
// returns arena.NullAddr and nil if the key is not found.
func (t *ART) search(key artKey) (arena.MemdbArenaAddr, *artLeaf) {
func (t *ART) searchImpl(key artKey) (arena.MemdbArenaAddr, *artLeaf) {
current := t.root
if current == nullArtNode {
return arena.NullAddr, nil
Expand Down Expand Up @@ -154,9 +179,25 @@ func (t *ART) search(key artKey) (arena.MemdbArenaAddr, *artLeaf) {
}
}

// recursiveInsert returns the arena address and leaf for key, using the
// last-traversed leaf as a one-entry cache in front of recursiveInsertImpl.
// A cache hit bumps hitCount and returns the cached leaf. Otherwise missCount is
// bumped, recursiveInsertImpl is called, and a non-null result becomes the new
// cached leaf.
func (t *ART) recursiveInsert(key artKey) (arena.MemdbArenaAddr, *artLeaf) {
	if cachedAddr, cachedLeaf, ok := t.checkKeyInCache(key); ok {
		t.hitCount.Add(1)
		return cachedAddr, cachedLeaf
	}

	t.missCount.Add(1)
	addr, leaf := t.recursiveInsertImpl(key)
	if addr.IsNull() {
		// no usable address came back: keep the cache as it is
		return addr, leaf
	}
	t.updateLastTraversed(addr)
	return addr, leaf
}

// recursiveInsertImpl returns the node address of the key.
// It will insert the key if not exists, returns the newly inserted or existing leaf.
func (t *ART) recursiveInsertImpl(key artKey) (arena.MemdbArenaAddr, *artLeaf) {

// lazy init root node and allocator.
// this saves memory for read only txns.
if t.root.addr.IsNull() {
Expand Down Expand Up @@ -501,6 +542,7 @@ func (t *ART) Reset() {
t.len = 0
t.allocator.nodeAllocator.Reset()
t.allocator.vlogAllocator.Reset()
t.lastTraversedNode.Store(arena.NullU64Addr)
}

// DiscardValues releases the memory used by all values.
Expand Down Expand Up @@ -583,10 +625,31 @@ func (t *ART) RemoveFromBuffer(key []byte) {
panic("unimplemented")
}

// updateLastTraversed atomically records addr as the last traversed node.
// The address is stored in its uint64 form; callers must pass a valid leaf address.
func (t *ART) updateLastTraversed(addr arena.MemdbArenaAddr) {
	encoded := addr.AsU64()
	t.lastTraversedNode.Store(encoded)
}

// checkKeyInCache reports whether the last traversed leaf holds exactly key.
// On a match it returns that leaf's address, the leaf, and true.
// If nothing is cached or the keys differ it returns arena.NullAddr, nil, false.
func (t *ART) checkKeyInCache(key []byte) (arena.MemdbArenaAddr, *artLeaf, bool) {
	cached := t.lastTraversedNode.Load()
	if cached != arena.NullU64Addr {
		addr := arena.U64ToAddr(cached)
		if leaf := t.allocator.getLeaf(addr); bytes.Equal(leaf.GetKey(), key) {
			return addr, leaf, true
		}
	}
	return arena.NullAddr, nil, false
}

// GetCacheHitCount returns how many lookups were answered from the
// last-traversed-leaf cache, as accumulated in hitCount.
func (t *ART) GetCacheHitCount() uint64 {
	return t.hitCount.Load()
}

// GetCacheMissCount returns how many lookups missed the
// last-traversed-leaf cache, as accumulated in missCount.
func (t *ART) GetCacheMissCount() uint64 {
	return t.missCount.Load()
}
21 changes: 21 additions & 0 deletions internal/unionstore/memdb_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,24 @@ func benchIterator(b *testing.B, buffer MemBuffer) {
iter.Close()
}
}

// BenchmarkMemBufferCache times repeated reads of the same key on the RBT and
// ART buffers. Setup writes b.N distinct keys; the timed section then reads each
// key 11 times in a row.
func BenchmarkMemBufferCache(b *testing.B) {
	run := func(b *testing.B, buffer MemBuffer) {
		keys := make([][keySize]byte, b.N)
		for idx := range keys {
			binary.LittleEndian.PutUint32(keys[idx][:], uint32(idx))
			buffer.Set(keys[idx][:], keys[idx][:])
		}

		ctx := context.Background()
		b.ResetTimer() // exclude the writes above from the measurement
		for idx := range keys {
			k := keys[idx][:]
			// one initial read followed by 10 repeats of the same key;
			// the repeats are the reads expected to hit the cache
			for rep := 0; rep <= 10; rep++ {
				buffer.Get(ctx, k)
			}
		}
	}

	b.Run("RBT", func(b *testing.B) { run(b, newRbtDBWithContext()) })
	b.Run("ART", func(b *testing.B) { run(b, newArtDBWithContext()) })
}
66 changes: 66 additions & 0 deletions internal/unionstore/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,3 +1106,69 @@ func testIterNoResult(t *testing.T, buffer MemBuffer) {
// Test lower bound > upper bound
checkFn([]byte{1, 0, 1}, []byte{1, 0, 0})
}

func TestMemBufferCache(t *testing.T) {
testMemBufferCache(t, newRbtDBWithContext())
testMemBufferCache(t, newArtDBWithContext())
}

// testMemBufferCache drives a fixed sequence of Set/Get calls against buffer
// and checks, through the buffer's hit/miss counters, that every call
// registers exactly one hit or exactly one miss as expected. A call is expected
// to hit when it touches the same key as the previous call.
func testMemBufferCache(t *testing.T, buffer MemBuffer) {
	assert := assert.New(t)

	type CacheStats interface {
		GetCacheHitCount() uint64
		GetCacheMissCount() uint64
	}
	// Resolve the counters once and fail the test (instead of panicking) if the
	// buffer does not expose them.
	stats, ok := buffer.(CacheStats)
	if !ok {
		t.Fatalf("%T does not expose cache hit/miss counters", buffer)
	}

	// cacheCheck runs fn and asserts that it moved the counters by exactly one
	// hit (hit == true) or exactly one miss (hit == false).
	cacheCheck := func(hit bool, fn func()) {
		beforeHit, beforeMiss := stats.GetCacheHitCount(), stats.GetCacheMissCount()
		fn()
		hitCnt := stats.GetCacheHitCount() - beforeHit
		missCnt := stats.GetCacheMissCount() - beforeMiss
		if hit {
			assert.Equal(uint64(1), hitCnt)
			assert.Equal(uint64(0), missCnt)
			return
		}
		assert.Equal(uint64(0), hitCnt)
		assert.Equal(uint64(1), missCnt)
	}

	ctx := context.Background()
	// set writes key=value and checks the expected cache outcome.
	set := func(hit bool, key, value []byte) {
		cacheCheck(hit, func() {
			assert.NoError(buffer.Set(key, value))
		})
	}
	// get reads key, checks the expected cache outcome and the returned value.
	get := func(hit bool, key, want []byte) {
		cacheCheck(hit, func() {
			v, err := buffer.Get(ctx, key)
			assert.NoError(err)
			assert.Equal(want, v)
		})
	}

	set(false, []byte{1}, []byte{0})   // first touch of key 1
	set(true, []byte{1}, []byte{1})    // same key again
	set(false, []byte{2}, []byte{2})   // switch to key 2
	get(true, []byte{2}, []byte{2})    // read of the key just written
	get(false, []byte{1}, []byte{1})   // switch back to key 1
	get(true, []byte{1}, []byte{1})    // repeated read
	get(false, []byte{2}, []byte{2})   // switch to key 2
	set(true, []byte{2}, []byte{2, 2}) // overwrite of the key just read
	get(true, []byte{2}, []byte{2, 2}) // read sees the overwritten value
}

0 comments on commit c3e10ae

Please sign in to comment.