From a81f8e36a1b2898fa142dcca0c1c8d214b43d794 Mon Sep 17 00:00:00 2001 From: Zejun Li Date: Thu, 29 Aug 2019 11:42:06 +0800 Subject: [PATCH] kv: replace memdb with a more memory efficient version (#11807) --- kv/memdb/arena.go | 131 ++++++++++++++ kv/memdb/iterator.go | 102 +++++++++++ kv/memdb/memdb.go | 364 ++++++++++++++++++++++++++++++++++++++ kv/memdb/memdb.s | 12 ++ kv/memdb/memdb_test.go | 389 +++++++++++++++++++++++++++++++++++++++++ kv/memdb_buffer.go | 58 +++--- 6 files changed, 1032 insertions(+), 24 deletions(-) create mode 100644 kv/memdb/arena.go create mode 100644 kv/memdb/iterator.go create mode 100644 kv/memdb/memdb.go create mode 100644 kv/memdb/memdb.s create mode 100644 kv/memdb/memdb_test.go diff --git a/kv/memdb/arena.go b/kv/memdb/arena.go new file mode 100644 index 0000000000000..4494a8ca7e1c6 --- /dev/null +++ b/kv/memdb/arena.go @@ -0,0 +1,131 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package memdb
+
+import "math"
+
+// arenaAddr locates a byte offset inside one of the arena's blocks.
+// blockIdx holds the block index plus one, so the zero value is the null address.
+type arenaAddr struct {
+	blockIdx    uint32
+	blockOffset uint32
+}
+
+// isNull reports whether addr is the zero value, i.e. refers to no allocation.
+func (addr arenaAddr) isNull() bool {
+	return addr.blockIdx == 0 && addr.blockOffset == 0
+}
+
+// newArenaAddr builds the address of offset inside the block at index idx.
+func newArenaAddr(idx int, offset uint32) arenaAddr {
+	return arenaAddr{
+		blockIdx:    uint32(idx) + 1,
+		blockOffset: offset,
+	}
+}
+
+// nullBlockOffset is the offset arenaBlock.alloc returns when a request does
+// not fit. maxBlockSize caps the size of a shared block; alloc gives any
+// request of at least maxBlockSize bytes a block of its own.
+const (
+	nullBlockOffset = math.MaxUint32
+	maxBlockSize    = 128 << 20
+)
+
+// arena is an append-only allocator made of a growing list of byte blocks.
+// blockSize is the size used for the most recently grown block and availIdx is
+// the index of the block that regular allocations are served from.
+type arena struct {
+	blockSize int
+	availIdx  int
+	blocks    []arenaBlock
+}
+
+// newArenaLocator creates an arena whose first block holds initBlockSize bytes.
+func newArenaLocator(initBlockSize int) *arena {
+	return &arena{
+		blockSize: initBlockSize,
+		blocks:    []arenaBlock{newArenaBlock(initBlockSize)},
+	}
+}
+
+// getFrom returns the bytes of addr's block from addr's offset to the end of
+// that block. addr must not be the null address.
+func (a *arena) getFrom(addr arenaAddr) []byte {
+	return a.blocks[addr.blockIdx-1].getFrom(addr.blockOffset)
+}
+
+// alloc reserves size bytes and returns their address together with a slice
+// over exactly those bytes.
+func (a *arena) alloc(size int) (arenaAddr, []byte) {
+	if size >= maxBlockSize {
+		// A request this large gets a dedicated, fully used block. availIdx is
+		// left alone so later allocations keep filling the current block.
+		blk := newArenaBlock(size)
+		blk.length = size
+		a.blocks = append(a.blocks, blk)
+
+		addr := newArenaAddr(len(a.blocks)-1, 0)
+		return addr, blk.buf
+	}
+
+	addr, data := a.allocInBlock(a.availIdx, size)
+	if !addr.isNull() {
+		return addr, data
+	}
+
+	a.enlarge(size)
+	return a.allocInBlock(a.availIdx, size)
+}
+
+// enlarge appends a new block that is larger than size and makes it the block
+// regular allocations are served from. alloc only calls it with a size below
+// maxBlockSize, so the block is still large enough after the cap is applied.
+func (a *arena) enlarge(size int) {
+	if a.blockSize <= 0 {
+		// Shifting zero yields zero, so an arena created with an empty first
+		// block would spin in the loop below forever. Start from one byte.
+		a.blockSize = 1
+	}
+	a.blockSize <<= 1
+	for a.blockSize <= size {
+		a.blockSize <<= 1
+	}
+	if a.blockSize > maxBlockSize {
+		a.blockSize = maxBlockSize
+	}
+	a.blocks = append(a.blocks, newArenaBlock(a.blockSize))
+	a.availIdx = len(a.blocks) - 1
+}
+
+// allocInBlock tries to reserve size bytes in the block at index idx. It
+// returns the null address and a nil slice when that block has no room left.
+func (a *arena) allocInBlock(idx, size int) (arenaAddr, []byte) {
+	offset, data := a.blocks[idx].alloc(size)
+	if offset == nullBlockOffset {
+		return arenaAddr{}, nil
+	}
+	return newArenaAddr(idx, offset), data
+}
+
+// reset drops every block except the first one and marks that block as empty.
+func (a *arena) reset() {
+	a.availIdx = 0
+	a.blockSize = len(a.blocks[0].buf)
+	a.blocks = []arenaBlock{a.blocks[0]}
+	a.blocks[0].reset()
+}
+
+// arenaBlock is one contiguous buffer; length is the number of bytes handed out.
+type arenaBlock struct {
+	buf    []byte
+	length int
+}
+
+// newArenaBlock allocates an empty block backed by blockSize bytes.
+func newArenaBlock(blockSize int) arenaBlock {
+	return arenaBlock{
+		buf: make([]byte, blockSize),
+	}
+}
+
+// getFrom returns the block's bytes from offset to the end of the buffer.
+func (a *arenaBlock) getFrom(offset uint32) []byte {
+	return a.buf[offset:]
+}
+
+// alloc reserves size bytes right after the used region. It returns
+// nullBlockOffset and a nil slice when the block cannot hold them.
+func (a *arenaBlock) alloc(size int) (uint32, []byte) {
+	offset := a.length
+	newLen := offset + size
+	if newLen > len(a.buf) {
+		return nullBlockOffset, nil
+	}
+	a.length = newLen
+	return uint32(offset), a.buf[offset : offset+size]
+}
+
+// reset marks the whole block as unused; the buffer contents are not cleared.
+func (a *arenaBlock) reset() {
+	a.length = 0
+}
diff --git a/kv/memdb/iterator.go b/kv/memdb/iterator.go
new file mode 100644
index 0000000000000..ec50995b02064
--- /dev/null
+++ b/kv/memdb/iterator.go
@@ -0,0 +1,102 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package memdb
+
+import "unsafe"
+
+// Iterator iterates the entries in the DB.
+// curr is the node the iterator is positioned on (nil when the iterator is
+// invalid); key and val are the key and value slices of that node.
+type Iterator struct {
+	db   *DB
+	curr *node
+	key  []byte
+	val  []byte
+}
+
+// NewIterator returns a new, not yet positioned Iterator over the DB.
+func (db *DB) NewIterator() Iterator {
+	return Iterator{
+		db: db,
+	}
+}
+
+// Valid returns true if the iterator is positioned at a valid node.
+func (it *Iterator) Valid() bool { return it.curr != nil }
+
+// Key returns the key at the current position.
+func (it *Iterator) Key() []byte {
+	return it.key
+}
+
+// Value returns the value at the current position.
+func (it *Iterator) Value() []byte {
+	return it.val
+}
+
+// Next moves the iterator to the next entry. It does nothing when the
+// iterator is not positioned on an entry, rather than dereferencing a nil node.
+func (it *Iterator) Next() {
+	if it.curr == nil {
+		return
+	}
+	it.changeToAddr(it.curr.nexts[0])
+}
+
+// Prev moves the iterator to the previous entry. It does nothing when the
+// iterator is not positioned on an entry, rather than dereferencing a nil node.
+func (it *Iterator) Prev() {
+	if it.curr == nil {
+		return
+	}
+	it.changeToAddr(it.curr.prev)
+}
+
+// Seek locates the iterator to the first entry with a key >= seekKey.
+// The iterator becomes invalid when there is no such entry.
+func (it *Iterator) Seek(seekKey []byte) {
+	node, nodeData, _ := it.db.findGreaterEqual(seekKey) // find >=.
+	it.updateState(node, nodeData)
+}
+
+// SeekForPrev locates the iterator to the last entry with key <= target.
+// The iterator becomes invalid when there is no such entry.
+func (it *Iterator) SeekForPrev(target []byte) {
+	node, nodeData, _ := it.db.findLess(target, true) // find <=.
+	it.updateState(node, nodeData)
+}
+
+// SeekForExclusivePrev locates the iterator to the last entry with key < target.
+// The iterator becomes invalid when there is no such entry.
+func (it *Iterator) SeekForExclusivePrev(target []byte) {
+	node, nodeData, _ := it.db.findLess(target, false) // find <.
+	it.updateState(node, nodeData)
+}
+
+// SeekToFirst locates the iterator to the first entry, which is the node that
+// follows the head node on the base level.
+func (it *Iterator) SeekToFirst() {
+	node, nodeData := it.db.getNext(it.db.head.node, 0)
+	it.updateState(node, nodeData)
+}
+
+// SeekToLast locates the iterator to the last entry.
+func (it *Iterator) SeekToLast() {
+	node, nodeData := it.db.findLast()
+	it.updateState(node, nodeData)
+}
+
+// updateState positions the iterator on node, whose arena bytes start at
+// nodeData[0]. A nil node makes the iterator invalid.
+func (it *Iterator) updateState(node *node, nodeData []byte) {
+	it.curr = node
+	if node == nil {
+		// Drop the slices of the previous entry so that an invalid iterator
+		// does not keep exposing stale data through Key and Value.
+		it.key, it.val = nil, nil
+		return
+	}
+	it.key = node.getKey(nodeData)
+	it.val = node.getValue(nodeData)
+}
+
+// changeToAddr positions the iterator on the node stored at addr; the null
+// address makes the iterator invalid.
+func (it *Iterator) changeToAddr(addr arenaAddr) {
+	var data []byte
+	var n *node
+	if !addr.isNull() {
+		data = it.db.arena.getFrom(addr)
+		n = (*node)(unsafe.Pointer(&data[0]))
+	}
+	it.updateState(n, data)
+}
diff --git a/kv/memdb/memdb.go b/kv/memdb/memdb.go
new file mode 100644
index 0000000000000..57c05c9c2fdc3
--- /dev/null
+++ b/kv/memdb/memdb.go
@@ -0,0 +1,364 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package memdb
+
+import (
+	"bytes"
+	"math"
+	"unsafe"
+)
+
+// maxHeight is the largest number of levels a node can take part in.
+// nodeHeaderSize is the byte size of the nodeHeader that starts every node.
+const (
+	maxHeight      = 16
+	nodeHeaderSize = int(unsafe.Sizeof(nodeHeader{}))
+)
+
+// DB is an in-memory key/value database.
+// height is the number of levels currently in use, head is the sentinel node
+// every search starts from, length counts the entries and size sums the key
+// and value lengths of those entries.
+type DB struct {
+	height int
+	head   nodeWithAddr
+	arena  *arena
+
+	length int
+	size   int
+}
+
+// New creates a new initialized in-memory key/value DB.
+// The initBlockSize is the size of the first arena block.
+// This DB is append-only: deleting an entry removes the entry node but does
+// not reclaim the KV buffer.
+func New(initBlockSize int) *DB {
+	return &DB{
+		height: 1,
+		head:   nodeWithAddr{node: new(node)},
+		arena:  newArenaLocator(initBlockSize),
+	}
+}
+
+// Reset resets the DB to its initial empty state.
+// It releases all arena blocks except the initial one.
+func (db *DB) Reset() {
+	db.height = 1
+	db.head.node = new(node)
+	db.length = 0
+	db.size = 0
+	db.arena.reset()
+}
+
+// Get gets the value for the given key. It returns nil if the
+// DB does not contain the key.
+func (db *DB) Get(key []byte) []byte {
+	node, data, match := db.findGreaterEqual(key)
+	if !match {
+		return nil
+	}
+	return node.getValue(data)
+}
+
+// Put sets the value for the given key.
+// It overwrites any previous value for that key by linking a freshly allocated
+// node in place of the old one. It always returns true.
+// NOTE(review): newNode stores len(key) in a uint16 and len(v) in a uint32, so
+// longer inputs would be silently truncated - confirm that callers bound them.
+func (db *DB) Put(key []byte, v []byte) bool {
+	arena := db.arena
+	lsHeight := db.height
+	// prev[i] and next[i] are the nodes between which the key belongs on level i.
+	var prev [maxHeight + 1]nodeWithAddr
+	var next [maxHeight + 1]nodeWithAddr
+	prev[lsHeight] = db.head
+
+	var exists bool
+	for i := lsHeight - 1; i >= 0; i-- {
+		// Use higher level to speed up for current level.
+		prev[i], next[i], exists = db.findSpliceForLevel(db.arena, key, prev[i+1], i)
+	}
+
+	// A new key gets a random height; an overwrite reuses the old node's height.
+	var height int
+	if !exists {
+		height = db.randomHeight()
+	} else {
+		height = db.prepareOverwrite(next[:])
+	}
+
+	x, addr := db.newNode(arena, key, v, height)
+	if height > lsHeight {
+		db.height = height
+	}
+
+	// We always insert from the base level and up. After you add a node in base level, we cannot
+	// create a node in the level above because it would have discovered the node in the base level.
+	for i := 0; i < height; i++ {
+		x.nexts[i] = next[i].addr
+		if prev[i].node == nil {
+			// Levels above the old list height were never searched; they start at head.
+			prev[i] = db.head
+		}
+		prev[i].nexts[i] = addr
+	}
+
+	// Maintain the base-level back links used by Iterator.Prev.
+	x.prev = prev[0].addr
+	if next[0].node != nil {
+		next[0].prev = addr
+	}
+
+	db.length++
+	db.size += len(key) + len(v)
+	return true
+}
+
+// prepareOverwrite is called by Put when the key already exists. The pointers
+// returned by findSpliceForLevel may point to the node that is going to be
+// overwritten; prepareOverwrite advances them to that node's successor so the
+// new node can be linked into the list correctly. It also removes the old
+// entry from the length and size counters and returns the old node's height.
+func (db *DB) prepareOverwrite(next []nodeWithAddr) int {
+	old := next[0]
+
+	// Update necessary states.
+	db.size -= int(old.valLen) + int(old.keyLen)
+	db.length--
+
+	height := int(old.height)
+	for i := 0; i < height; i++ {
+		if next[i].addr == old.addr {
+			next[i].addr = old.nexts[i]
+			// When the successor is null only addr changes; node keeps
+			// pointing at the old node.
+			if !next[i].addr.isNull() {
+				data := db.arena.getFrom(next[i].addr)
+				next[i].node = (*node)(unsafe.Pointer(&data[0]))
+			}
+		}
+	}
+	return height
+}
+
+// Delete deletes the value for the given key.
+// It returns false if the DB does not contain the key.
+// The node is only unlinked; its arena bytes are not reclaimed.
+func (db *DB) Delete(key []byte) bool {
+	listHeight := db.height
+	var prev [maxHeight + 1]nodeWithAddr
+	prev[listHeight] = db.head
+
+	var keyNode nodeWithAddr
+	var match bool
+	for i := listHeight - 1; i >= 0; i-- {
+		prev[i], keyNode, match = db.findSpliceForLevel(db.arena, key, prev[i+1], i)
+	}
+	if !match {
+		return false
+	}
+
+	// Bypass the node on every level it takes part in.
+	for i := int(keyNode.height) - 1; i >= 0; i-- {
+		prev[i].nexts[i] = keyNode.nexts[i]
+	}
+	// Repair the base-level back link of the successor, if there is one.
+	nextAddr := keyNode.nexts[0]
+	if !nextAddr.isNull() {
+		nextData := db.arena.getFrom(nextAddr)
+		next := (*node)(unsafe.Pointer(&nextData[0]))
+		next.prev = prev[0].addr
+	}
+
+	db.length--
+	db.size -= int(keyNode.keyLen) + int(keyNode.valLen)
+	return true
+}
+
+// Len returns the number of entries in the DB.
+func (db *DB) Len() int {
+	return db.length
+}
+
+// Size returns sum of keys and values length. Note that deleted
+// key/value will not be accounted for, but it will still consume
+// the buffer, since the buffer is append only.
+func (db *DB) Size() int {
+	return db.size
+}
+
+// findSpliceForLevel returns (outBefore, outAfter) with outBefore.key < key <= outAfter.key.
+// The input "before" tells us where to start looking.
+// If we found a node with the same key, then we return true.
+// outAfter is the zero nodeWithAddr when the level ends before such a node.
+func (db *DB) findSpliceForLevel(arena *arena, key []byte, before nodeWithAddr, level int) (nodeWithAddr, nodeWithAddr, bool) {
+	for {
+		// Assume before.key < key.
+		nextAddr := before.nexts[level]
+		if nextAddr.isNull() {
+			return before, nodeWithAddr{}, false
+		}
+		data := arena.getFrom(nextAddr)
+		next := nodeWithAddr{(*node)(unsafe.Pointer(&data[0])), nextAddr}
+		nextKey := next.getKey(data)
+		cmp := bytes.Compare(nextKey, key)
+		if cmp >= 0 {
+			// before.key < key <= next.key. We are done for this level.
+			return before, next, cmp == 0
+		}
+		before = next // Keep moving right on this level.
+	}
+}
+
+// findGreaterEqual returns the first node whose key is >= key, the arena bytes
+// that start at that node, and whether its key equals key. The node and the
+// bytes are nil when every key in the DB is smaller than key.
+func (db *DB) findGreaterEqual(key []byte) (*node, []byte, bool) {
+	prev := db.head.node
+	level := db.height - 1
+
+	for {
+		var nextData []byte
+		var next *node
+		addr := prev.nexts[level]
+		if !addr.isNull() {
+			arena := db.arena
+			nextData = arena.getFrom(addr)
+			next = (*node)(unsafe.Pointer(&nextData[0]))
+
+			nextKey := next.getKey(nextData)
+			cmp := bytes.Compare(nextKey, key)
+			if cmp < 0 {
+				// next key is still smaller, keep moving.
+				prev = next
+				continue
+			}
+			if cmp == 0 {
+				// prev.key < key == next.key.
+				return next, nextData, true
+			}
+		}
+		// next is greater than key or next is nil. go to the lower level.
+		if level > 0 {
+			level--
+			continue
+		}
+		return next, nextData, false
+	}
+}
+
+// findLess returns the last node whose key is < key, or <= key when allowEqual
+// is set, together with the arena bytes that start at that node. The bool is
+// true only when a node with exactly key was returned. The node and the bytes
+// are nil when no such node exists.
+func (db *DB) findLess(key []byte, allowEqual bool) (*node, []byte, bool) {
+	var prevData []byte
+	prev := db.head.node
+	level := db.height - 1
+
+	for {
+		next, nextData := db.getNext(prev, level)
+		if next != nil {
+			cmp := bytes.Compare(key, next.getKey(nextData))
+			if cmp > 0 {
+				// prev.key < next.key < key. We can continue to move right.
+				prev = next
+				prevData = nextData
+				continue
+			}
+			if cmp == 0 && allowEqual {
+				// prev.key < key == next.key.
+				return next, nextData, true
+			}
+		}
+		// get closer to the key in the lower level.
+		if level > 0 {
+			level--
+			continue
+		}
+		break
+	}
+
+	// We are not going to return head.
+	if prev == db.head.node {
+		return nil, nil, false
+	}
+	return prev, prevData, false
+}
+
+// findLast returns the last element. If head (empty db), we return nil.
+// All the find functions will NEVER return the head nodes.
+// The returned bytes are the arena bytes that start at the returned node.
+func (db *DB) findLast() (*node, []byte) {
+	var nodeData []byte
+	node := db.head.node
+	level := db.height - 1
+
+	for {
+		// Move right as far as possible, then drop one level and repeat.
+		next, nextData := db.getNext(node, level)
+		if next != nil {
+			node = next
+			nodeData = nextData
+			continue
+		}
+		if level == 0 {
+			if node == db.head.node {
+				return nil, nil
+			}
+			return node, nodeData
+		}
+		level--
+	}
+}
+
+// newNode allocates a node of the given height from arena and copies key and
+// v behind it. It returns the node and its arena address.
+func (db *DB) newNode(arena *arena, key []byte, v []byte, height int) (*node, arenaAddr) {
+	// Layout: nodeHeader, prev (8 bytes), height next addresses (8 bytes each),
+	// then the key bytes followed by the value bytes.
+	nodeSize := nodeHeaderSize + height*8 + 8 + len(key) + len(v)
+	addr, data := arena.alloc(nodeSize)
+	node := (*node)(unsafe.Pointer(&data[0]))
+	// The lengths are narrowed to the header's field widths here.
+	node.keyLen = uint16(len(key))
+	node.height = uint16(height)
+	node.valLen = uint32(len(v))
+	copy(data[node.nodeLen():], key)
+	copy(data[node.nodeLen()+int(node.keyLen):], v)
+	return node, addr
+}
+
+// fastRand is a fast thread local random function.
+//go:linkname fastRand runtime.fastrand
+func fastRand() uint32
+
+// randomHeight picks the height of a new node: it starts at 1 and grows by one
+// level while fastRand returns a value in the lowest quarter of the uint32
+// range, up to maxHeight.
+func (db *DB) randomHeight() int {
+	h := 1
+	for h < maxHeight && fastRand() < uint32(math.MaxUint32)/4 {
+		h++
+	}
+	return h
+}
+
+// nodeHeader is the fixed-size prefix of every node: the number of levels the
+// node takes part in and the lengths of its key and value.
+type nodeHeader struct {
+	height uint16
+	keyLen uint16
+	valLen uint32
+}
+
+// node is the in-arena representation of a skip-list entry. newNode only
+// allocates the first height entries of nexts, so entries at or above a node's
+// height must not be accessed.
+type node struct {
+	nodeHeader
+
+	// Addr of previous node at base level.
+	prev arenaAddr
+	// Addr of the next node on each level, indexed by level.
+	nexts [maxHeight]arenaAddr
+}
+
+// nodeWithAddr pairs a node pointer with the arena address it was read from.
+type nodeWithAddr struct {
+	*node
+	addr arenaAddr
+}
+
+// nodeLen returns the number of bytes in front of the key: the header, prev
+// and one next address per level of the node.
+func (n *node) nodeLen() int {
+	return int(n.height)*8 + 8 + nodeHeaderSize
+}
+
+// getKey returns the key bytes; buf must be the arena bytes starting at n.
+func (n *node) getKey(buf []byte) []byte {
+	nodeLen := n.nodeLen()
+	return buf[nodeLen : nodeLen+int(n.keyLen)]
+}
+
+// getValue returns the value bytes; buf must be the arena bytes starting at n.
+func (n *node) getValue(buf []byte) []byte {
+	nodeLenKeyLen := n.nodeLen() + int(n.keyLen)
+	return buf[nodeLenKeyLen : nodeLenKeyLen+int(n.valLen)]
+}
+
+// getNext returns the node that follows n on the given level together with the
+// arena bytes that start at it, or nil and nil when n is the last node there.
+func (db *DB) getNext(n *node, level int) (*node, []byte) {
+	addr := n.nexts[level]
+	if addr.isNull() {
+		return nil, nil
+	}
+	arena := db.arena
+	data := arena.getFrom(addr)
+	node := (*node)(unsafe.Pointer(&data[0]))
+	return node, data
+}
diff --git a/kv/memdb/memdb.s b/kv/memdb/memdb.s
new file mode 100644
index 0000000000000..d57f14436cecb
--- /dev/null
+++ b/kv/memdb/memdb.s
@@ -0,0 +1,12 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
diff --git a/kv/memdb/memdb_test.go b/kv/memdb/memdb_test.go
new file mode 100644
index 0000000000000..597674a2e096c
--- /dev/null
+++ b/kv/memdb/memdb_test.go
@@ -0,0 +1,389 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package memdb
+
+import (
+	"encoding/binary"
+	"math/rand"
+	"testing"
+
+	. "github.com/pingcap/check"
+	"github.com/pingcap/goleveldb/leveldb/comparer"
+	"github.com/pingcap/goleveldb/leveldb/memdb"
+)
+
+// keySize and valueSize are the key and value lengths used by the benchmarks:
+// every entry is stored under the first keySize bytes of its value.
+const (
+	keySize   = 16
+	valueSize = 128
+)
+
+func TestT(t *testing.T) {
+	TestingT(t)
+}
+
+type testMemDBSuite struct{}
+
+var _ = Suite(testMemDBSuite{})
+
+// TestGetSet checks that every key written by fillDB can be read back.
+func (s testMemDBSuite) TestGetSet(c *C) {
+	const cnt = 10000
+	p := s.fillDB(cnt)
+
+	var buf [4]byte
+	for i := 0; i < cnt; i++ {
+		binary.BigEndian.PutUint32(buf[:], uint32(i))
+		v := p.Get(buf[:])
+		c.Check(v, BytesEquals, buf[:])
+	}
+}
+
+// TestIterator walks the DB forwards and then backwards and checks the order.
+func (s testMemDBSuite) TestIterator(c *C) {
+	const cnt = 10000
+	p := s.fillDB(cnt)
+
+	var buf [4]byte
+	var i int
+	it := p.NewIterator()
+
+	for it.SeekToFirst(); it.Valid(); it.Next() {
+		binary.BigEndian.PutUint32(buf[:], uint32(i))
+		c.Check(it.Key(), BytesEquals, buf[:])
+		c.Check(it.Value(), BytesEquals, buf[:])
+		i++
+	}
+	c.Check(i, Equals, cnt)
+
+	i--
+	for it.SeekToLast(); it.Valid(); it.Prev() {
+		binary.BigEndian.PutUint32(buf[:], uint32(i))
+		c.Check(it.Key(), BytesEquals, buf[:])
+		c.Check(it.Value(), BytesEquals, buf[:])
+		i--
+	}
+	c.Check(i, Equals, -1)
+}
+
+// TestOverwrite rewrites every third key with a same-sized value and checks
+// that Len and Size are unchanged and that lookups and both iteration
+// directions see the new values.
+func (s testMemDBSuite) TestOverwrite(c *C) {
+	const cnt = 10000
+	p := s.fillDB(cnt)
+	var buf [4]byte
+
+	sz := p.Size()
+	for i := 0; i < cnt; i += 3 {
+		var newBuf [4]byte
+		binary.BigEndian.PutUint32(buf[:], uint32(i))
+		binary.BigEndian.PutUint32(newBuf[:], uint32(i*10))
+		p.Put(buf[:], newBuf[:])
+	}
+	c.Check(p.Len(), Equals, cnt)
+	c.Check(p.Size(), Equals, sz)
+
+	for i := 0; i < cnt; i++ {
+		binary.BigEndian.PutUint32(buf[:], uint32(i))
+		v := binary.BigEndian.Uint32(p.Get(buf[:]))
+		if i%3 == 0 {
+			c.Check(v, Equals, uint32(i*10))
+		} else {
+			c.Check(v, Equals, uint32(i))
+		}
+	}
+
+	it := p.NewIterator()
+	var i int
+
+	for it.SeekToFirst(); it.Valid(); it.Next() {
+		binary.BigEndian.PutUint32(buf[:], uint32(i))
+		c.Check(it.Key(), BytesEquals, buf[:])
+		v := binary.BigEndian.Uint32(it.Value())
+		if i%3 == 0 {
+			c.Check(v, Equals, uint32(i*10))
+		} else {
+			c.Check(v, Equals, uint32(i))
+		}
+		i++
+	}
+	c.Check(i, Equals, cnt)
+
+	i--
+	for it.SeekToLast(); it.Valid(); it.Prev() {
+		binary.BigEndian.PutUint32(buf[:], uint32(i))
+		c.Check(it.Key(), BytesEquals, buf[:])
+		v := binary.BigEndian.Uint32(it.Value())
+		if i%3 == 0 {
+			c.Check(v, Equals, uint32(i*10))
+		} else {
+			c.Check(v, Equals, uint32(i))
+		}
+		i--
+	}
+	c.Check(i, Equals, -1)
+}
+
+// TestDelete removes every third key and checks that lookups and both
+// iteration directions skip the removed entries.
+func (s testMemDBSuite) TestDelete(c *C) {
+	const cnt = 10000
+	p := s.fillDB(cnt)
+	var buf [4]byte
+
+	for i := 0; i < cnt; i += 3 {
+		binary.BigEndian.PutUint32(buf[:], uint32(i))
+		p.Delete(buf[:])
+	}
+
+	for i := 0; i < cnt; i++ {
+		binary.BigEndian.PutUint32(buf[:], uint32(i))
+		v := p.Get(buf[:])
+		if i%3 == 0 {
+			c.Check(v, IsNil)
+		} else {
+			c.Check(v, BytesEquals, buf[:])
+		}
+	}
+
+	it := p.NewIterator()
+	var i int
+
+	for it.SeekToFirst(); it.Valid(); it.Next() {
+		if i%3 == 0 {
+			i++
+		}
+		binary.BigEndian.PutUint32(buf[:], uint32(i))
+		c.Check(it.Key(), BytesEquals, buf[:])
+		c.Check(it.Value(), BytesEquals, buf[:])
+		i++
+	}
+
+	i--
+	for it.SeekToLast(); it.Valid(); it.Prev() {
+		if i%3 == 0 {
+			i--
+		}
+		binary.BigEndian.PutUint32(buf[:], uint32(i))
+		c.Check(it.Key(), BytesEquals, buf[:])
+		c.Check(it.Value(), BytesEquals, buf[:])
+		i--
+	}
+}
+
+// TestKVLargeThanBlock stores an entry that does not fit into the first block
+// and checks that exactly one larger block is added and then reused.
+func (s testMemDBSuite) TestKVLargeThanBlock(c *C) {
+	p := New(4 * 1024)
+	p.Put([]byte{1}, make([]byte, 1))
+	p.Put([]byte{2}, make([]byte, 4096))
+	c.Check(len(p.arena.blocks), Equals, 2)
+	p.Put([]byte{3}, make([]byte, 3000))
+	c.Check(len(p.arena.blocks), Equals, 2)
+	c.Check(len(p.Get([]byte{3})), Equals, 3000)
+}
+
+// TestEmptyDB checks that lookups, deletes and every seek variant behave on a
+// DB without entries.
+func (s testMemDBSuite) TestEmptyDB(c *C) {
+	p := New(4 * 1024)
+	c.Check(p.Get([]byte{0}), IsNil)
+	c.Check(p.Delete([]byte{0}), IsFalse)
+	it := p.NewIterator()
+	it.SeekToFirst()
+	c.Check(it.Valid(), IsFalse)
+	it.SeekToLast()
+	c.Check(it.Valid(), IsFalse)
+	it.SeekForPrev([]byte{0})
+	c.Check(it.Valid(), IsFalse)
+	it.SeekForExclusivePrev([]byte{0})
+	c.Check(it.Valid(), IsFalse)
+	it.Seek([]byte{0xff})
+	c.Check(it.Valid(), IsFalse)
+}
+
+// TestReset checks that a reset DB is empty and usable again, and that new
+// entries go into the first arena block.
+func (s testMemDBSuite) TestReset(c *C) {
+	p := s.fillDB(10000)
+	p.Reset()
+	c.Check(p.Get([]byte{0}), IsNil)
+	c.Check(p.Delete([]byte{0}), IsFalse)
+	c.Check(p.Size(), Equals, 0)
+	c.Check(p.Len(), Equals, 0)
+
+	key := []byte{0}
+	p.Put(key, key)
+	c.Check(p.Get(key), BytesEquals, key)
+	c.Check(p.arena.availIdx, Equals, 0)
+
+	it := p.NewIterator()
+	it.SeekToFirst()
+	c.Check(it.Key(), BytesEquals, key)
+	c.Check(it.Value(), BytesEquals, key)
+	it.Next()
+	c.Check(it.Valid(), IsFalse)
+
+	it.SeekToLast()
+	c.Check(it.Key(), BytesEquals, key)
+	c.Check(it.Value(), BytesEquals, key)
+	it.Prev()
+	c.Check(it.Valid(), IsFalse)
+}
+
+// TestRandom applies the same random puts, overwrites and deletes to this DB
+// and to the goleveldb memdb and checks that both end up with the same
+// contents, sizes and seek results.
+func (s testMemDBSuite) TestRandom(c *C) {
+	const cnt = 500000
+	keys := make([][]byte, cnt)
+	for i := range keys {
+		keys[i] = make([]byte, rand.Intn(19)+1)
+		rand.Read(keys[i])
+	}
+
+	p1 := New(4 * 1024)
+	p2 := memdb.New(comparer.DefaultComparer, 4*1024)
+	for _, k := range keys {
+		p1.Put(k, k)
+		_ = p2.Put(k, k)
+	}
+
+	c.Check(p1.Len(), Equals, p2.Len())
+	c.Check(p1.Size(), Equals, p2.Size())
+
+	rand.Shuffle(cnt, func(i, j int) { keys[i], keys[j] = keys[j], keys[i] })
+
+	for _, k := range keys {
+		switch rand.Intn(4) {
+		case 0, 1:
+			newValue := make([]byte, rand.Intn(19)+1)
+			rand.Read(newValue)
+			p1.Put(k, newValue)
+			_ = p2.Put(k, newValue)
+		case 2:
+			p1.Delete(k)
+			_ = p2.Delete(k)
+		}
+	}
+
+	c.Check(p1.Len(), Equals, p2.Len())
+	c.Check(p1.Size(), Equals, p2.Size())
+
+	it1 := p1.NewIterator()
+	it1.SeekToFirst()
+
+	it2 := p2.NewIterator(nil)
+
+	var prevKey, prevVal []byte
+	for it2.First(); it2.Valid(); it2.Next() {
+		c.Check(it1.Key(), BytesEquals, it2.Key())
+		c.Check(it1.Value(), BytesEquals, it2.Value())
+
+		it := p1.NewIterator()
+		it.Seek(it2.Key())
+		c.Check(it.Key(), BytesEquals, it2.Key())
+		c.Check(it.Value(), BytesEquals, it2.Value())
+
+		it.SeekForPrev(it2.Key())
+		c.Check(it.Key(), BytesEquals, it2.Key())
+		c.Check(it.Value(), BytesEquals, it2.Value())
+
+		if prevKey != nil {
+			it.SeekForExclusivePrev(it2.Key())
+			c.Check(it.Key(), BytesEquals, prevKey)
+			c.Check(it.Value(), BytesEquals, prevVal)
+		}
+
+		it1.Next()
+		prevKey = it2.Key()
+		prevVal = it2.Value()
+	}
+
+	it1.SeekToLast()
+	for it2.Last(); it2.Valid(); it2.Prev() {
+		c.Check(it1.Key(), BytesEquals, it2.Key())
+		c.Check(it1.Value(), BytesEquals, it2.Value())
+		it1.Prev()
+	}
+}
+
+// fillDB returns a DB holding cnt entries whose key and value are both the
+// big-endian encoding of 0..cnt-1.
+func (s testMemDBSuite) fillDB(cnt int) *DB {
+	p := New(4 * 1024)
+	var buf [4]byte
+	for i := 0; i < cnt; i++ {
+		binary.BigEndian.PutUint32(buf[:], uint32(i))
+		p.Put(buf[:], buf[:])
+	}
+	return p
+}
+
+// BenchmarkLargeIndex measures filling a DB with a fixed 10,000,000 entries.
+// NOTE: the workload does not scale with b.N.
+func BenchmarkLargeIndex(b *testing.B) {
+	buf := make([][valueSize]byte, 10000000)
+	for i := range buf {
+		binary.LittleEndian.PutUint32(buf[i][:], uint32(i))
+	}
+	p := New(4 * 1024 * 1024)
+	b.ResetTimer()
+
+	for i := range buf {
+		p.Put(buf[i][:keySize], buf[i][:])
+	}
+}
+
+// BenchmarkPut measures b.N inserts of keys built from an increasing counter.
+func BenchmarkPut(b *testing.B) {
+	buf := make([][valueSize]byte, b.N)
+	for i := range buf {
+		binary.LittleEndian.PutUint32(buf[i][:], uint32(i))
+	}
+
+	p := New(4 * 1024 * 1024)
+	b.ResetTimer()
+
+	for i := range buf {
+		p.Put(buf[i][:keySize], buf[i][:])
+	}
+}
+
+// BenchmarkPutRandom measures b.N inserts of keys built from random numbers.
+func BenchmarkPutRandom(b *testing.B) {
+	buf := make([][valueSize]byte, b.N)
+	for i := range buf {
+		binary.LittleEndian.PutUint32(buf[i][:], uint32(rand.Int()))
+	}
+
+	p := New(4 * 1024 * 1024)
+	b.ResetTimer()
+
+	for i := range buf {
+		p.Put(buf[i][:keySize], buf[i][:])
+	}
+}
+
+// BenchmarkGet measures b.N successful lookups in insertion order.
+func BenchmarkGet(b *testing.B) {
+	buf := make([][valueSize]byte, b.N)
+	for i := range buf {
+		binary.LittleEndian.PutUint32(buf[i][:], uint32(i))
+	}
+
+	p := New(4 * 1024 * 1024)
+	for i := range buf {
+		p.Put(buf[i][:keySize], buf[i][:])
+	}
+
+	b.ResetTimer()
+	for i := range buf {
+		// Look up the keySize-byte key the entry was stored under; the full
+		// valueSize-byte buffer never matches a stored key.
+		p.Get(buf[i][:keySize])
+	}
+}
+
+// BenchmarkGetRandom measures b.N successful lookups in random order.
+func BenchmarkGetRandom(b *testing.B) {
+	buf := make([][valueSize]byte, b.N)
+	for i := range buf {
+		binary.LittleEndian.PutUint32(buf[i][:], uint32(i))
+	}
+
+	p := New(4 * 1024 * 1024)
+	for i := range buf {
+		p.Put(buf[i][:keySize], buf[i][:])
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		// Same key prefix as used by Put above, so every lookup hits.
+		p.Get(buf[rand.Int()%b.N][:keySize])
+	}
+}
diff --git a/kv/memdb_buffer.go b/kv/memdb_buffer.go
index 953b7933c9847..b3cabb0598de4 100644
--- a/kv/memdb_buffer.go
+++ b/kv/memdb_buffer.go
@@ -16,16 +16,12 @@
 package kv
 
 import (
+	"bytes"
 	"context"
 	"sync/atomic"
 
 	"github.com/pingcap/errors"
-	"github.com/pingcap/goleveldb/leveldb"
-	"github.com/pingcap/goleveldb/leveldb/comparer"
-	"github.com/pingcap/goleveldb/leveldb/iterator"
-	"github.com/pingcap/goleveldb/leveldb/memdb"
-	"github.com/pingcap/goleveldb/leveldb/util"
-	"github.com/pingcap/parser/terror"
+	"github.com/pingcap/tidb/kv/memdb"
 )
 
 // memDbBuffer implements the MemBuffer interface.
@@ -37,14 +33,16 @@ type memDbBuffer struct {
 }
 
 type memDbIter struct {
-	iter    iterator.Iterator
+	iter    memdb.Iterator
+	start   []byte
+	end     []byte
 	reverse bool
 }
 
 // NewMemDbBuffer creates a new memDbBuffer.
-func NewMemDbBuffer(cap int) MemBuffer {
+func NewMemDbBuffer(initBlockSize int) MemBuffer {
 	return &memDbBuffer{
-		db:              memdb.New(comparer.DefaultComparer, cap),
+		db:              memdb.New(initBlockSize),
 		entrySizeLimit:  TxnEntrySizeLimit,
 		bufferSizeLimit: atomic.LoadUint64(&TxnTotalSizeLimit),
 	}
@@ -52,11 +50,17 @@ func NewMemDbBuffer(cap int) MemBuffer {
 
 // Iter creates an Iterator.
 func (m *memDbBuffer) Iter(k Key, upperBound Key) (Iterator, error) {
-	i := &memDbIter{iter: m.db.NewIterator(&util.Range{Start: []byte(k), Limit: []byte(upperBound)}), reverse: false}
+	i := &memDbIter{
+		iter:    m.db.NewIterator(),
+		start:   k,
+		end:     upperBound,
+		reverse: false,
+	}
 
-	err := i.Next()
-	if err != nil {
-		return nil, err
+	// A nil start key means iteration begins at the very first entry.
+	if k == nil {
+		i.iter.SeekToFirst()
+	} else {
+		i.iter.Seek(k)
 	}
 	return i, nil
 }
@@ -66,20 +70,23 @@ func (m *memDbBuffer) SetCap(cap int) {
 }
 
 func (m *memDbBuffer) IterReverse(k Key) (Iterator, error) {
-	var i *memDbIter
+	i := &memDbIter{
+		iter:    m.db.NewIterator(),
+		end:     k,
+		reverse: true,
+	}
+	// Start on the last key strictly below k, or on the last key when k is nil.
 	if k == nil {
-		i = &memDbIter{iter: m.db.NewIterator(&util.Range{}), reverse: true}
+		i.iter.SeekToLast()
 	} else {
-		i = &memDbIter{iter: m.db.NewIterator(&util.Range{Limit: []byte(k)}), reverse: true}
+		i.iter.SeekForExclusivePrev(k)
 	}
-	i.iter.Last()
 	return i, nil
 }
 
 // Get returns the value associated with key.
 func (m *memDbBuffer) Get(ctx context.Context, k Key) ([]byte, error) {
-	v, err := m.db.Get(k)
-	if terror.ErrorEqual(err, leveldb.ErrNotFound) {
+	// memdb.DB.Get reports a missing key by returning nil.
+	v := m.db.Get(k)
+	if v == nil {
 		return nil, ErrNotExist
 	}
 	return v, nil
@@ -94,17 +101,17 @@ func (m *memDbBuffer) Set(k Key, v []byte) error {
 		return ErrEntryTooLarge.GenWithStackByArgs(m.entrySizeLimit, len(k)+len(v))
 	}
 
-	err := m.db.Put(k, v)
+	m.db.Put(k, v)
 	if m.Size() > int(m.bufferSizeLimit) {
 		return ErrTxnTooLarge.GenWithStack("transaction too large, size:%d", m.Size())
 	}
-	return errors.Trace(err)
+	return nil
 }
 
 // Delete removes the entry from buffer with provided key.
 func (m *memDbBuffer) Delete(k Key) error {
-	err := m.db.Put(k, nil)
-	return errors.Trace(err)
+	// The delete is recorded by storing a nil value under the key.
+	m.db.Put(k, nil)
+	return nil
 }
 
 // Size returns sum of keys and values length.
@@ -134,6 +141,9 @@ func (i *memDbIter) Next() error {
 
 // Valid implements the Iterator Valid.
 func (i *memDbIter) Valid() bool {
+	// A forward iterator stops before the exclusive upper bound i.end; a nil
+	// end means there is no upper bound.
+	if !i.reverse {
+		return i.iter.Valid() && (i.end == nil || bytes.Compare(i.Key(), i.end) < 0)
+	}
 	return i.iter.Valid()
 }
 
@@ -149,7 +159,7 @@ func (i *memDbIter) Value() []byte {
 
 // Close Implements the Iterator Close.
 func (i *memDbIter) Close() {
-	i.iter.Release()
+	// memdb.Iterator has no Release method, so there is nothing to do here.
 }
 
 // WalkMemBuffer iterates all buffered kv pairs in memBuf