-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit dc70f49
Showing
11 changed files
with
736 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
# simpledb | ||
A simple key-value database, implemented step by step, just for fun. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
package simpledb | ||
|
||
import ( | ||
"encoding/binary" | ||
) | ||
|
||
// Layout constants for a hash-table bucket.
const (
	// slotsCountPerBucket is the fixed number of slots held by one bucket.
	slotsCountPerBucket = 31

	// bucketSize is the length in bytes of the buffer a bucket is encoded
	// into. It is slightly larger than the encoded content, which is
	// approximately 31*(4+2+2+4+4)+1+8=505 bytes.
	bucketSize = 512
)
|
||
// slot is one entry of the hash table. It identifies a single key/value
// record by the key's hash and tells where the record is stored.
type slot struct {
	// hash is the hash of the key.
	hash uint32
	// segmentID selects the segment that holds the record.
	segmentID uint16
	// keySize is the key length in bytes.
	keySize uint16
	// valueSize is the value length in bytes.
	valueSize uint32
	// offset is the position of the record inside its segment.
	offset uint32
}
|
||
type bucket struct { | ||
slots [slotsCountPerBucket]slot | ||
next int64 // Offset of overflow bucket | ||
} | ||
|
||
// bucketHandle is used to link the bucket in disk with bucket in memory | ||
type bucketHandle struct { | ||
*bucket | ||
*MmapFile | ||
offset int64 | ||
} | ||
|
||
func (b *bucket) MarshalBinary() ([]byte, error) { | ||
buf := make([]byte, bucketSize) | ||
data := buf | ||
for i := 0; i < slotsCountPerBucket; i++ { | ||
sl := b.slots[i] | ||
binary.LittleEndian.PutUint32(buf[:4], sl.hash) | ||
binary.LittleEndian.PutUint16(buf[4:6], sl.segmentID) | ||
binary.LittleEndian.PutUint16(buf[6:8], sl.keySize) | ||
binary.LittleEndian.PutUint32(buf[8:12], sl.valueSize) | ||
binary.LittleEndian.PutUint32(buf[12:16], sl.offset) | ||
buf = buf[16:] | ||
} | ||
binary.LittleEndian.PutUint64(buf[:8], uint64(b.next)) | ||
return data, nil | ||
} | ||
|
||
func (b *bucket) UnmarshalBinary(data []byte) error { | ||
for i := 0; i < slotsCountPerBucket; i++ { | ||
_ = data[16] // bounds check hint to compiler; see golang.org/issue/14808 | ||
b.slots[i].hash = binary.LittleEndian.Uint32(data[:4]) | ||
b.slots[i].segmentID = binary.LittleEndian.Uint16(data[4:6]) | ||
b.slots[i].keySize = binary.LittleEndian.Uint16(data[6:8]) | ||
b.slots[i].valueSize = binary.LittleEndian.Uint32(data[8:12]) | ||
b.slots[i].offset = binary.LittleEndian.Uint32(data[12:16]) | ||
data = data[16:] | ||
} | ||
b.next = int64(binary.LittleEndian.Uint64(data[:8])) | ||
return nil | ||
} | ||
|
||
// read a bucket data from disk to memory | ||
func (b *bucketHandle) read() error { | ||
buf, err := b.MmapFile.ReadRandom(b.offset, bucketSize) | ||
if err != nil { | ||
return err | ||
} | ||
if buf == nil { | ||
return nil | ||
} | ||
return b.UnmarshalBinary(buf) | ||
} | ||
|
||
// write a bucket data from memory to disk | ||
func (b *bucketHandle) write() error { | ||
buf, err := b.MarshalBinary() | ||
if err != nil { | ||
return err | ||
} | ||
b.MmapFile.WriteAt(b.offset, buf) | ||
return nil | ||
} | ||
|
||
// Iterate the slots in a bucket for a matched hash. It will stop at | ||
// 1) the position of a matched slot (found); 2) the next position of the last slot (not found) | ||
func (b * bucket) iterateSlots(hash uint32) (*slot, int) { | ||
i := 0 | ||
for ; i< slotsCountPerBucket; i++ { | ||
if b.slots[i].hash == hash { // found | ||
return &b.slots[i], i | ||
} else if b.slots[i].offset == 0 { | ||
break | ||
} | ||
} | ||
return nil, i | ||
} | ||
|
||
func (sl *slot) kvSize() uint32 { | ||
return uint32(sl.keySize) + sl.valueSize | ||
} | ||
|
||
func (b *bucket) insert(sl *slot, pos int) { | ||
b.slots[pos] = *sl | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
package simpledb | ||
|
||
import (
	"errors"
	"math"
)
|
||
// maxSegments is the number of entries in the datalog's segment table.
const maxSegments = math.MaxInt16
|
||
type datalog struct { | ||
curSeg *segment | ||
segments [maxSegments]*segment | ||
} | ||
|
||
func (dl *datalog) readKeyValue(sl *slot) ([]byte, []byte, error) { | ||
seg := dl.segments[sl.segmentID] | ||
off := sl.offset | ||
keyStart := off+2 // 2 is the length of keySize | ||
key, err := seg.Read(int64(keyStart), int64(sl.keySize)) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
valueStart := keyStart+uint32(sl.keySize)+4 // 4 is the length of valueSize | ||
value, err := seg.Read(int64(valueStart), int64(sl.valueSize)) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
return key, value, nil | ||
} | ||
|
||
// If the current segment is full, create a new segment and set it as the current segment | ||
func (dl * datalog) swapSegment() error { | ||
curSegId := dl.curSeg.id | ||
newSegId := curSegId + 1 | ||
newMmapFile, err := OpenMmapFile(segmentName(newSegId), 1) | ||
if err != nil { | ||
return err | ||
} | ||
dl.segments[newSegId] = &segment{ | ||
MmapFile: newMmapFile, | ||
id: newSegId, | ||
} | ||
dl.curSeg = dl.segments[newSegId] | ||
return nil | ||
} | ||
|
||
// @return: segmentID, offset | ||
func (dl *datalog) writeRecord(data []byte) (uint16, int64, error) { | ||
curSize := int(dl.curSeg.FileSize()) | ||
if curSize + len(data) > maxSegmentSize { | ||
dl.curSeg.full = true | ||
} | ||
if dl.curSeg.full { | ||
err := dl.swapSegment() | ||
if err!=nil { | ||
return 0, -1, err | ||
} | ||
} | ||
|
||
newFileSize := dl.curSeg.FileSize() + int64(len(data)) | ||
err := dl.curSeg.MmapFile.Grow(newFileSize) | ||
if err != nil { | ||
return 0, -1, err | ||
} | ||
|
||
dl.curSeg.Append(data) | ||
|
||
return dl.curSeg.id, int64(curSize), nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
package simpledb | ||
|
||
import ( | ||
"bytes" | ||
"errors" | ||
"sync" | ||
) | ||
|
||
type DB struct { | ||
mu sync.RWMutex | ||
index *index | ||
datalog *datalog | ||
} | ||
|
||
// Get returns the value for the given key stored in the DB or nil if the key doesn't exist. | ||
func (db *DB) Get(key []byte) ([]byte, error) { | ||
hash := hash(key) | ||
slot, err := db.index.get(hash) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if slot == nil { | ||
return nil, nil | ||
} | ||
keyRead, value, err := db.datalog.readKeyValue(slot) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if bytes.Equal(key, keyRead) { | ||
return value, nil | ||
} else { | ||
return nil, errors.New("key stored in segment is not consistent "+ | ||
"with the index") | ||
} | ||
} | ||
|
||
// Put sets the value for the given key. It updates the value for the existing key. | ||
func (db *DB) Put(key []byte, value []byte) error { | ||
hash := hash(key) | ||
|
||
// write the record to the segment | ||
record := record{ | ||
key: key, | ||
value: value, | ||
} | ||
segmentID, offset, err := db.datalog.writeRecord(record.encode()) | ||
|
||
if err != nil { | ||
return err | ||
} | ||
|
||
// update the index | ||
slot := &slot{ | ||
hash: hash, | ||
segmentID: segmentID, | ||
keySize: uint16(len(key)), | ||
valueSize: uint32(len(value)), | ||
offset: uint32(offset), | ||
} | ||
db.index.put(slot) | ||
|
||
return nil | ||
} | ||
|
||
// Delete deletes the given key from the DB. | ||
func (db *DB) Delete(key []byte) error { | ||
return nil | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
package simpledb | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"testing" | ||
"time" | ||
) | ||
|
||
// initialize DB | ||
func initializeDB() (*DB, error) { | ||
newMmapFile, err := OpenMmapFile(segmentName(0), 1) | ||
if err != nil { | ||
return nil, err | ||
} | ||
indexMmapFile, err := OpenMmapFile(indexName, 512 * (1 << 10)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
db := &DB{ | ||
index: &index{ | ||
MmapFile: indexMmapFile, | ||
numBucket: BucketLen, | ||
}, | ||
datalog: &datalog{ | ||
curSeg: &segment{ | ||
MmapFile: newMmapFile, | ||
id: 0, | ||
}, | ||
}, | ||
} | ||
|
||
db.datalog.segments[0] = db.datalog.curSeg | ||
|
||
return db, nil | ||
} | ||
|
||
// Test if the query results matches the data inserted | ||
func TestSimple(t *testing.T) { | ||
// initialize DB | ||
db, err := initializeDB() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
db.Put([]byte("1"), []byte("simple1")) | ||
db.Put([]byte("2"), []byte("这是simple2")) | ||
db.Put([]byte("3"), []byte("玩的愉快")) | ||
|
||
getFirst, err := db.Get([]byte("1")) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
if !bytes.Equal(getFirst, []byte("simple1")) { | ||
t.Fatal("Test for '1'") | ||
} | ||
|
||
getSecond, _ := db.Get([]byte("2")) | ||
if !bytes.Equal(getSecond, []byte("这是simple2")) { | ||
t.Fatal("Test for '2'") | ||
} | ||
|
||
getThird, _ := db.Get([]byte("3")) | ||
if !bytes.Equal(getThird, []byte("玩的愉快")) { | ||
t.Fatal("Test for '3'") | ||
} | ||
|
||
} | ||
|
||
// Test if the query for an inexistent data works as expected | ||
func TestInexistence(t *testing.T) { | ||
// initialize DB | ||
db, err := initializeDB() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
getData, _ := db.Get([]byte("0")) | ||
if getData != nil { | ||
t.Fatal("get an inexistent value, but it exists") | ||
} | ||
|
||
} | ||
|
||
// Test if the segments and buckets are extended correctly, | ||
// when the data is massive | ||
// Watch if the 'Count of buckets' and 'Count of segments' increase as expected | ||
func TestMassiveData(t *testing.T) { | ||
// initialize DB | ||
db, err := initializeDB() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
// suffix is used to enlarge the value, | ||
// so as to test the creation of new segment more quickly | ||
suffix := "" | ||
for i:=0; i<10; i++ { | ||
suffix += "fdhsjkhfjkshfjksahjkfshajkfhaskjfhkjsafjksahjkfajkfjnvnnasm" | ||
} | ||
|
||
for j:=0; j<1000; j++ { | ||
for i := 0; i < 5000; i++ { | ||
key := []byte(string(j*5000+i)) | ||
value := []byte(string(j*5000+i)+suffix) | ||
db.Put(key, value) | ||
} | ||
|
||
fmt.Printf("Count of buckets: %d\n", db.index.numBucket) | ||
fmt.Printf("Count of segments: %d\n", db.datalog.curSeg.id+1) | ||
fmt.Printf("Size of curSeg: %d\n\n", db.datalog.curSeg.FileSize()) | ||
|
||
} | ||
|
||
start := time.Now() | ||
getData, _ := db.Get([]byte(string(10000))) | ||
fmt.Printf("Elapsed time: %v\n", time.Now().Sub(start).String()) | ||
if !bytes.Equal(getData, []byte(string(10000)+suffix)) { | ||
t.Fatal("Test for '10000'") | ||
} | ||
|
||
} | ||
|
||
// Test if the data is updated correctly | ||
func TestUpdateData(t *testing.T) { | ||
// initialize DB | ||
db, err := initializeDB() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
db.Put([]byte("1"), []byte("祝你早安")) | ||
db.Put([]byte("1"), []byte("祝你晚安")) | ||
|
||
getFirst, _ := db.Get([]byte("1")) | ||
if bytes.Equal(getFirst, []byte("祝你早安")) { | ||
t.Fatal("Update data") | ||
} | ||
if !bytes.Equal(getFirst, []byte("祝你晚安")) { | ||
t.Fatal("Update data") | ||
} | ||
|
||
} |
Oops, something went wrong.