Skip to content

Commit

Permalink
membuffer: support staging & checkpoint for ART (#1465)
Browse files Browse the repository at this point in the history
ref pingcap/tidb#55287

Signed-off-by: you06 <you1474600@gmail.com>
  • Loading branch information
you06 authored Sep 23, 2024
1 parent 6beede6 commit 271945f
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 38 deletions.
4 changes: 2 additions & 2 deletions internal/unionstore/arena/arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ type KeyFlagsGetter interface {

// VlogMemDB is the interface of the memory buffer which supports vlog to revert node and inspect node.
type VlogMemDB[G KeyFlagsGetter] interface {
RevertNode(hdr *MemdbVlogHdr)
RevertVAddr(hdr *MemdbVlogHdr)
InspectNode(addr MemdbArenaAddr) (G, MemdbArenaAddr)
}

Expand Down Expand Up @@ -351,7 +351,7 @@ func (l *MemdbVlog[G, M]) RevertToCheckpoint(m M, cp *MemDBCheckpoint) {
block := l.blocks[cursor.blocks-1].buf
var hdr MemdbVlogHdr
hdr.load(block[hdrOff:])
m.RevertNode(&hdr)
m.RevertVAddr(&hdr)
l.moveBackCursor(&cursor, &hdr)
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/unionstore/arena/arena_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (

type dummyMemDB struct{}

func (m *dummyMemDB) RevertNode(hdr *MemdbVlogHdr) {}
func (m *dummyMemDB) RevertVAddr(hdr *MemdbVlogHdr) {}
func (m *dummyMemDB) InspectNode(addr MemdbArenaAddr) (KeyFlagsGetter, MemdbArenaAddr) {
return nil, NullAddr
}
Expand Down
100 changes: 75 additions & 25 deletions internal/unionstore/art/art.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package art

import (
"fmt"
"math"

tikverr "github.com/tikv/client-go/v2/error"
Expand Down Expand Up @@ -76,7 +77,7 @@ func (t *ART) GetFlags(key []byte) (kv.KeyFlags, error) {
if leaf.vAddr.IsNull() && leaf.isDeleted() {
return 0, tikverr.ErrNotExist
}
return leaf.getKeyFlags(), nil
return leaf.GetKeyFlags(), nil
}

func (t *ART) Set(key artKey, value []byte, ops ...kv.FlagsOp) error {
Expand Down Expand Up @@ -324,8 +325,8 @@ func (t *ART) newLeaf(key artKey) (artNode, *artLeaf) {
}

func (t *ART) setValue(addr arena.MemdbArenaAddr, l *artLeaf, value []byte, ops []kv.FlagsOp) {
flags := l.getKeyFlags()
if flags == 0 && l.vAddr.IsNull() {
flags := l.GetKeyFlags()
if flags == 0 && l.vAddr.IsNull() || l.isDeleted() {
t.len++
t.size += int(l.klen)
}
Expand Down Expand Up @@ -373,12 +374,12 @@ func (t *ART) trySwapValue(addr arena.MemdbArenaAddr, value []byte) (int, bool)
}

func (t *ART) Dirty() bool {
panic("unimplemented")
return t.dirty
}

// Mem returns the memory usage of MemBuffer.
func (t *ART) Mem() uint64 {
panic("unimplemented")
return t.allocator.nodeAllocator.Capacity() + t.allocator.vlogAllocator.Capacity()
}

// Len returns the count of entries in the MemBuffer.
Expand All @@ -392,51 +393,97 @@ func (t *ART) Size() int {
}

func (t *ART) checkpoint() arena.MemDBCheckpoint {
panic("unimplemented")
return t.allocator.vlogAllocator.Checkpoint()
}

func (t *ART) RevertNode(hdr *arena.MemdbVlogHdr) {
panic("unimplemented")
func (t *ART) RevertVAddr(hdr *arena.MemdbVlogHdr) {
lf := t.allocator.getLeaf(hdr.NodeAddr)
if lf == nil {
panic("revert an invalid node")
}
lf.vAddr = hdr.OldValue
t.size -= int(hdr.ValueLen)
if hdr.OldValue.IsNull() {
keptFlags := lf.GetKeyFlags()
keptFlags = keptFlags.AndPersistent()
if keptFlags == 0 {
lf.markDelete()
t.len--
t.size -= int(lf.klen)
} else {
lf.setKeyFlags(keptFlags)
}
} else {
t.size += len(t.allocator.vlogAllocator.GetValue(hdr.OldValue))
}
}

func (t *ART) InspectNode(addr arena.MemdbArenaAddr) (*artLeaf, arena.MemdbArenaAddr) {
panic("unimplemented")
lf := t.allocator.getLeaf(addr)
return lf, lf.vAddr
}

// Checkpoint returns a checkpoint of ART.
func (t *ART) Checkpoint() *arena.MemDBCheckpoint {
panic("unimplemented")
cp := t.allocator.vlogAllocator.Checkpoint()
return &cp
}

// RevertToCheckpoint reverts the ART to the checkpoint.
func (t *ART) RevertToCheckpoint(cp *arena.MemDBCheckpoint) {
panic("unimplemented")
t.allocator.vlogAllocator.RevertToCheckpoint(t, cp)
t.allocator.vlogAllocator.Truncate(cp)
t.allocator.vlogAllocator.OnMemChange()
}

func (t *ART) Stages() []arena.MemDBCheckpoint {
panic("unimplemented")
return t.stages
}

func (t *ART) Staging() int {
return 0
t.stages = append(t.stages, t.checkpoint())
return len(t.stages)
}

func (t *ART) Release(h int) {
if h == 0 {
// 0 is the invalid and no-effect handle.
return
}
if h != len(t.stages) {
panic("cannot release staging buffer")
}
if h == 1 {
tail := t.checkpoint()
if !t.stages[0].IsSamePosition(&tail) {
t.dirty = true
}
}
t.stages = t.stages[:h-1]
}

func (t *ART) Cleanup(h int) {
}

func (t *ART) revertToCheckpoint(cp *arena.MemDBCheckpoint) {
panic("unimplemented")
}

func (t *ART) moveBackCursor(cursor *arena.MemDBCheckpoint, hdr *arena.MemdbVlogHdr) {
panic("unimplemented")
}
if h == 0 {
// 0 is the invalid and no-effect handle.
return
}
if h > len(t.stages) {
return
}
if h < len(t.stages) {
panic(fmt.Sprintf("cannot cleanup staging buffer, h=%v, len(db.stages)=%v", h, len(t.stages)))
}

func (t *ART) truncate(snap *arena.MemDBCheckpoint) {
panic("unimplemented")
cp := &t.stages[h-1]
if !t.vlogInvalid {
curr := t.checkpoint()
if !curr.IsSamePosition(cp) {
t.allocator.vlogAllocator.RevertToCheckpoint(t, cp)
t.allocator.vlogAllocator.Truncate(cp)
}
}
t.stages = t.stages[:h-1]
t.allocator.vlogAllocator.OnMemChange()
}

// Reset resets the MemBuffer to initial states.
Expand All @@ -459,7 +506,10 @@ func (t *ART) DiscardValues() {

// InspectStage used to inspect the value updates in the given stage.
func (t *ART) InspectStage(handle int, f func([]byte, kv.KeyFlags, []byte)) {
panic("unimplemented")
idx := handle - 1
tail := t.allocator.vlogAllocator.Checkpoint()
head := t.stages[idx]
t.allocator.vlogAllocator.InspectKVInLog(t, &head, &tail, f)
}

// SelectValueHistory select the latest value which makes `predicate` returns true from the modification history.
Expand Down
10 changes: 2 additions & 8 deletions internal/unionstore/art/art_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,6 @@ func (l *artLeaf) getKeyDepth(depth uint32) []byte {
return unsafe.Slice((*byte)(base), int(l.klen)-int(depth))
}

// GetKeyFlags gets the flags of the leaf
func (l *artLeaf) GetKeyFlags() kv.KeyFlags {
panic("unimplemented")
}

func (l *artLeaf) match(depth uint32, key artKey) bool {
return bytes.Equal(l.getKeyDepth(depth), key[depth:])
}
Expand All @@ -278,7 +273,8 @@ func (l *artLeaf) setKeyFlags(flags kv.KeyFlags) {
l.flags = uint16(flags) & flagMask
}

func (l *artLeaf) getKeyFlags() kv.KeyFlags {
// GetKeyFlags gets the flags of the leaf
func (l *artLeaf) GetKeyFlags() kv.KeyFlags {
return kv.KeyFlags(l.flags & flagMask)
}

Expand All @@ -288,8 +284,6 @@ const (
)

// markDelete marks the artLeaf as deleted
//
//nolint:unused
func (l *artLeaf) markDelete() {
l.flags = deleteFlag
}
Expand Down
39 changes: 39 additions & 0 deletions internal/unionstore/memdb_norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
package unionstore

import (
"context"
rand2 "crypto/rand"
"encoding/binary"
"math/rand"
Expand Down Expand Up @@ -166,3 +167,41 @@ func testRandomDeriveRecur(t *testing.T, db *MemDB, golden *leveldb.DB, depth in

return opLog
}

func TestRandomAB(t *testing.T) {
testRandomAB(t, newRbtDBWithContext(), newArtDBWithContext())
}

func testRandomAB(t *testing.T, bufferA, bufferB MemBuffer) {
require := require.New(t)

const cnt = 50000
keys := make([][]byte, cnt)
for i := 0; i < cnt; i++ {
h := bufferA.Staging()
require.Equal(h, bufferB.Staging())

keys[i] = make([]byte, rand.Intn(19)+1)
rand2.Read(keys[i])

bufferA.Set(keys[i], keys[i])
bufferB.Set(keys[i], keys[i])

if i%2 == 0 {
bufferA.Cleanup(h)
bufferB.Cleanup(h)
} else {
bufferA.Release(h)
bufferB.Release(h)
}

require.Equal(bufferA.Dirty(), bufferB.Dirty())
require.Equal(bufferA.Len(), bufferB.Len())
require.Equal(bufferA.Size(), bufferB.Size(), i)
key := keys[rand.Intn(i+1)]
v1, err1 := bufferA.Get(context.Background(), key)
v2, err2 := bufferB.Get(context.Background(), key)
require.Equal(err1, err2)
require.Equal(v1, v2)
}
}
Loading

0 comments on commit 271945f

Please sign in to comment.