Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
add unit tests and reslove comments
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed Oct 30, 2020
1 parent 09d958f commit fcd453d
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 42 deletions.
103 changes: 61 additions & 42 deletions lightning/backend/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@ import (
"sync/atomic"
"time"

"github.com/google/btree"

"github.com/pingcap/tidb/util/hack"

"github.com/cockroachdb/pebble"
"github.com/coreos/go-semver/semver"
"github.com/google/btree"
split "github.com/pingcap/br/pkg/restore"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand All @@ -45,6 +42,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/hack"
uuid "github.com/satori/go.uuid"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -125,7 +123,7 @@ func (e *LocalFile) Cleanup(dataDir string) error {
func (e *LocalFile) getSizeProperties() (*sizeProperties, error) {
sstables, err := e.db.SSTables(pebble.WithProperties())
if err != nil {
log.L().Warn("get table properties failed", zap.Stringer("engine", e.Uuid), zap.Error(err))
log.L().Warn("get table properties failed", zap.Stringer("engine", e.Uuid), log.ShortError(err))
return nil, errors.Trace(err)
}

Expand All @@ -137,21 +135,11 @@ func (e *LocalFile) getSizeProperties() (*sizeProperties, error) {
rangeProps, err := decodeRangeProperties(data)
if err != nil {
log.L().Warn("decodeRangeProperties failed", zap.Stringer("engine", e.Uuid),
zap.Stringer("fileNum", info.FileNum), zap.Error(err))
zap.Stringer("fileNum", info.FileNum), log.ShortError(err))
return nil, errors.Trace(err)
}

prevRange := rangeOffsets{}
for _, r := range rangeProps {
sizeProps.add(&rangeProperty{
Key: r.Key,
rangeOffsets: rangeOffsets{Keys: r.Keys - prevRange.Keys, Size: r.Size - prevRange.Size},
})
prevRange = r.rangeOffsets
}
if len(rangeProps) > 0 {
sizeProps.totalSize = rangeProps[len(rangeProps)-1].Size
}
sizeProps.addAll(rangeProps)
}
}
}
Expand Down Expand Up @@ -695,6 +683,31 @@ func (local *local) Ingest(ctx context.Context, meta *sst.SSTMeta, region *split
return resp, nil
}

func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range {
ranges := make([]Range, 0, sizeProps.totalSize/uint64(sizeLimit))
curSize := uint64(0)
curKeys := uint64(0)
curKey := fullRange.start
sizeProps.iter(func(p *rangeProperty) bool {
curSize += p.Size
curKeys += p.Keys
if int64(curSize) >= sizeLimit || int64(curKeys) >= keysLimit {
ranges = append(ranges, Range{start: curKey, end: p.Key})
curKey = p.Key
curSize = 0
curKeys = 0
}
return true
})

if curKeys > 0 {
ranges = append(ranges, Range{start: curKey, end: fullRange.end})
} else {
ranges[len(ranges)-1].end = fullRange.end
}
return ranges
}

func (local *local) readAndSplitIntoRange(engineFile *LocalFile) ([]Range, error) {
iter := engineFile.db.NewIter(nil)
defer iter.Close()
Expand Down Expand Up @@ -723,27 +736,8 @@ func (local *local) readAndSplitIntoRange(engineFile *LocalFile) ([]Range, error
return nil, errors.Trace(err)
}

ranges := make([]Range, 0, engineFile.TotalSize/local.regionSplitSize)
curSize := uint64(0)
curKeys := uint64(0)
curKey := firstKey
sizeProps.iter(func(p *rangeProperty) bool {
curSize += p.Size
curKeys += p.Keys
if int64(curSize) >= local.regionSplitSize || curKeys >= regionMaxKeyCount*2/3 {
ranges = append(ranges, Range{start: curKey, end: p.Key})
curKey = p.Key
curSize = 0
curKeys = 0
}
return true
})

if curKeys > 0 {
ranges = append(ranges, Range{start: curKey, end: endKey})
} else {
ranges[len(ranges)-1].end = endKey
}
ranges := splitRangeBySizeProps(Range{start: firstKey, end: endKey}, sizeProps,
local.regionSplitSize, regionMaxKeyCount*2/3)

log.L().Info("split engine key ranges", zap.Stringer("engine", engineFile.Uuid),
zap.Int64("totalSize", engineFile.TotalSize), zap.Int64("totalCount", engineFile.Length),
Expand Down Expand Up @@ -1108,7 +1102,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro

lf := engineFile.(*LocalFile)
if lf.TotalSize == 0 {
log.L().Info("engine contains not kv, skip import", zap.Stringer("engine", engineUUID))
log.L().Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID))
return nil
}

Expand Down Expand Up @@ -1397,6 +1391,13 @@ func (r rangeProperties) Encode() []byte {
return b
}

func (r rangeProperties) get(key []byte) rangeOffsets {
idx := sort.Search(len(r), func(i int) bool {
return bytes.Compare(r[i].Key, key) >= 0
})
return r[idx].rangeOffsets
}

type RangePropertiesCollector struct {
props rangeProperties
lastOffsets rangeOffsets
Expand Down Expand Up @@ -1445,7 +1446,7 @@ func (c *RangePropertiesCollector) Finish(userProps map[string]string) error {
c.insertNewPoint(c.lastKey)
}

userProps[PROP_RANGE_INDEX] = string(hack.String(c.props.Encode()))
userProps[PROP_RANGE_INDEX] = string(c.props.Encode())
return nil
}

Expand All @@ -1464,10 +1465,28 @@ func newSizeProperties() *sizeProperties {
}

func (s *sizeProperties) add(item *rangeProperty) {
s.indexHandles.ReplaceOrInsert(item)
if old := s.indexHandles.ReplaceOrInsert(item); old != nil {
o := old.(*rangeProperty)
item.Keys += o.Keys
item.Size += o.Size
}
}

func (s *sizeProperties) addAll(props rangeProperties) {
prevRange := rangeOffsets{}
for _, r := range props {
s.add(&rangeProperty{
Key: r.Key,
rangeOffsets: rangeOffsets{Keys: r.Keys - prevRange.Keys, Size: r.Size - prevRange.Size},
})
prevRange = r.rangeOffsets
}
if len(props) > 0 {
s.totalSize = props[len(props)-1].Size
}
}

// iter the tree unit f return false
// iter the tree until f return false
func (s *sizeProperties) iter(f func(p *rangeProperty) bool) {
s.indexHandles.Ascend(func(i btree.Item) bool {
prop := i.(*rangeProperty)
Expand Down
126 changes: 126 additions & 0 deletions lightning/backend/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package backend
import (
"bytes"

"github.com/cockroachdb/pebble"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/hack"
)

type localSuite struct{}
Expand Down Expand Up @@ -31,3 +33,127 @@ func (s *localSuite) TestNextKey(c *C) {
next = nextKey([]byte{1, 255})
c.Assert(bytes.Compare(next, []byte{1, 255, 0, 1, 2}), Equals, -1)
}

func (s *localSuite) TestRangeProperties(c *C) {
type testCase struct {
key []byte
vLen int
count int
}
cases := []testCase{
// handle "a": size(size = 1, offset = 1),keys(1,1)
{[]byte("a"), 0, 1},
{[]byte("b"), defaultPropSizeIndexDistance / 8, 1},
{[]byte("c"), defaultPropSizeIndexDistance / 4, 1},
{[]byte("d"), defaultPropSizeIndexDistance / 2, 1},
{[]byte("e"), defaultPropSizeIndexDistance / 8, 1},
// handle "e": size(size = DISTANCE + 4, offset = DISTANCE + 5),keys(4,5)
{[]byte("f"), defaultPropSizeIndexDistance / 4, 1},
{[]byte("g"), defaultPropSizeIndexDistance / 2, 1},
{[]byte("h"), defaultPropSizeIndexDistance / 8, 1},
{[]byte("i"), defaultPropSizeIndexDistance / 4, 1},
// handle "i": size(size = DISTANCE / 8 * 9 + 4, offset = DISTANCE / 8 * 17 + 9),keys(4,5)
{[]byte("j"), defaultPropSizeIndexDistance / 2, 1},
{[]byte("k"), defaultPropSizeIndexDistance / 2, 1},
// handle "k": size(size = DISTANCE + 2, offset = DISTANCE / 8 * 25 + 11),keys(2,11)
{[]byte("l"), 0, defaultPropKeysIndexDistance / 2},
{[]byte("m"), 0, defaultPropKeysIndexDistance / 2},
//handle "m": keys = DEFAULT_PROP_KEYS_INDEX_DISTANCE,offset = 11+DEFAULT_PROP_KEYS_INDEX_DISTANCE
{[]byte("n"), 1, defaultPropKeysIndexDistance},
//handle "n": keys = DEFAULT_PROP_KEYS_INDEX_DISTANCE, offset = 11+2*DEFAULT_PROP_KEYS_INDEX_DISTANCE
{[]byte("o"), 1, 1},
// handle "o": keys = 1, offset = 12 + 2*DEFAULT_PROP_KEYS_INDEX_DISTANCE
}

collector := newRangePropertiesCollector()
for _, p := range cases {
v := make([]byte, p.vLen)
for i := 0; i < p.count; i++ {
_ = collector.Add(pebble.InternalKey{UserKey: p.key}, v)
}
}

userProperties := make(map[string]string, 1)
_ = collector.Finish(userProperties)

props, err := decodeRangeProperties(hack.Slice(userProperties[PROP_RANGE_INDEX]))
c.Assert(err, IsNil)

c.Assert(props[0].Key, DeepEquals, cases[0].key)
c.Assert(props[len(props)-1].Key, DeepEquals, cases[len(cases)-1].key)
c.Assert(len(props), Equals, 7)

a := props.get([]byte("a"))
c.Assert(a.Size, Equals, uint64(1))
e := props.get([]byte("e"))
c.Assert(e.Size, Equals, uint64(defaultPropSizeIndexDistance+5))
i := props.get([]byte("i"))
c.Assert(i.Size, Equals, uint64(defaultPropSizeIndexDistance/8*17+9))
k := props.get([]byte("k"))
c.Assert(k.Size, Equals, uint64(defaultPropSizeIndexDistance/8*25+11))
m := props.get([]byte("m"))
c.Assert(m.Keys, Equals, uint64(defaultPropKeysIndexDistance+11))
n := props.get([]byte("n"))
c.Assert(n.Keys, Equals, uint64(defaultPropKeysIndexDistance*2+11))
o := props.get([]byte("o"))
c.Assert(o.Keys, Equals, uint64(defaultPropKeysIndexDistance*2+12))

props2 := rangeProperties([]rangeProperty{
{[]byte("b"), rangeOffsets{defaultPropSizeIndexDistance + 10, defaultPropKeysIndexDistance / 2}},
{[]byte("h"), rangeOffsets{defaultPropSizeIndexDistance * 3 / 2, defaultPropKeysIndexDistance * 3 / 2}},
{[]byte("k"), rangeOffsets{defaultPropSizeIndexDistance * 3, defaultPropKeysIndexDistance * 7 / 4}},
{[]byte("mm"), rangeOffsets{defaultPropSizeIndexDistance * 5, defaultPropKeysIndexDistance * 2}},
{[]byte("q"), rangeOffsets{defaultPropSizeIndexDistance * 7, defaultPropKeysIndexDistance*9/4 + 10}},
{[]byte("y"), rangeOffsets{defaultPropSizeIndexDistance*7 + 100, defaultPropKeysIndexDistance*9/4 + 1010}},
})

sizeProps := newSizeProperties()
sizeProps.addAll(props)
sizeProps.addAll(props2)

res := []*rangeProperty{
{[]byte("a"), rangeOffsets{1, 1}},
{[]byte("b"), rangeOffsets{defaultPropSizeIndexDistance + 10, defaultPropKeysIndexDistance / 2}},
{[]byte("e"), rangeOffsets{defaultPropSizeIndexDistance + 4, 4}},
{[]byte("h"), rangeOffsets{defaultPropSizeIndexDistance/2 - 10, defaultPropKeysIndexDistance}},
{[]byte("i"), rangeOffsets{defaultPropSizeIndexDistance*9/8 + 4, 4}},
{[]byte("k"), rangeOffsets{defaultPropSizeIndexDistance*5/2 + 2, defaultPropKeysIndexDistance/4 + 2}},
{[]byte("m"), rangeOffsets{defaultPropKeysIndexDistance, defaultPropKeysIndexDistance}},
{[]byte("mm"), rangeOffsets{defaultPropSizeIndexDistance * 2, defaultPropKeysIndexDistance / 4}},
{[]byte("n"), rangeOffsets{defaultPropKeysIndexDistance * 2, defaultPropKeysIndexDistance}},
{[]byte("o"), rangeOffsets{2, 1}},
{[]byte("q"), rangeOffsets{defaultPropSizeIndexDistance * 2, defaultPropKeysIndexDistance/4 + 10}},
{[]byte("y"), rangeOffsets{100, 1000}},
}

c.Assert(sizeProps.indexHandles.Len(), Equals, 12)
idx := 0
sizeProps.iter(func(p *rangeProperty) bool {
c.Assert(p, DeepEquals, res[idx])
idx++
return true
})

fullRange := Range{start: []byte("a"), end: []byte("z")}
ranges := splitRangeBySizeProps(fullRange, sizeProps, 2*defaultPropSizeIndexDistance, defaultPropKeysIndexDistance*5/2)

c.Assert(ranges, DeepEquals, []Range{
{start: []byte("a"), end: []byte("e")},
{start: []byte("e"), end: []byte("k")},
{start: []byte("k"), end: []byte("mm")},
{start: []byte("mm"), end: []byte("q")},
{start: []byte("q"), end: []byte("z")},
})

ranges = splitRangeBySizeProps(fullRange, sizeProps, 2*defaultPropSizeIndexDistance, defaultPropKeysIndexDistance)
c.Assert(ranges, DeepEquals, []Range{
{start: []byte("a"), end: []byte("e")},
{start: []byte("e"), end: []byte("h")},
{start: []byte("h"), end: []byte("k")},
{start: []byte("k"), end: []byte("m")},
{start: []byte("m"), end: []byte("mm")},
{start: []byte("mm"), end: []byte("n")},
{start: []byte("n"), end: []byte("q")},
{start: []byte("q"), end: []byte("z")},
})
}

0 comments on commit fcd453d

Please sign in to comment.