Skip to content

Commit

Permalink
stats: refine updating stats using feedback (#6796)
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx authored Jul 3, 2018
1 parent f6ee36e commit c9cea72
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 88 deletions.
176 changes: 103 additions & 73 deletions statistics/feedback.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ type BucketFeedback struct {
feedback []feedback // All the feedback info in the same bucket.
lower *types.Datum // The lower bound of the new bucket.
upper *types.Datum // The upper bound of the new bucket.
scalar scalar // The scalar info for the boundary.
}

// buildBucketFeedback build the feedback for each bucket from the histogram feedback.
Expand Down Expand Up @@ -300,65 +299,31 @@ func (b *BucketFeedback) getBoundaries(num int) []types.Datum {
return vals[:total]
}

// There are only two types of datum in bucket: one is `Blob`, which is for index; the other one
// is `Int`, which is for primary key.
// bucket is an alias of feedback: both carry a [lower, upper] boundary pair
// plus a count, so the feedback struct doubles as a histogram bucket.
type bucket = feedback

// getFraction returns the fraction of the bucket's scalar range that the
// interval [lowerVal, upperVal] intersects, computed as the difference of the
// two boundary fractions against b.scalar (which must already be filled in).
func (b *BucketFeedback) getFraction(lowerVal, upperVal *types.Datum) float64 {
	var lo, hi float64
	switch b.lower.Kind() {
	case types.KindBytes:
		// Strip the boundaries' common prefix before the bytes-to-scalar conversion.
		lo = convertBytesToScalar(lowerVal.GetBytes()[b.scalar.commonPfxLen:])
		hi = convertBytesToScalar(upperVal.GetBytes()[b.scalar.commonPfxLen:])
	default:
		lo = float64(lowerVal.GetInt64())
		hi = float64(upperVal.GetInt64())
	}
	return calcFraction(b.scalar.lower, b.scalar.upper, hi) - calcFraction(b.scalar.lower, b.scalar.upper, lo)
}

// getBucketCount estimates the row count of this bucket. It first fills in
// b.scalar (the numeric view of the bucket boundaries), then scales the count
// using the feedback interval that covers the largest fraction of the bucket.
// Feedback covering less than minBucketFraction is ignored, in which case the
// passed-in count is returned unchanged (truncated to int64).
func (b *BucketFeedback) getBucketCount(count float64) int64 {
	// Get the scalar info for boundary.
	if b.lower.Kind() == types.KindBytes {
		// Compute the common prefix length once and reuse it for both bounds;
		// previously it was computed unconditionally (even for int datums) and
		// then recomputed inside this branch.
		prefixLen := commonPrefixLength(b.lower.GetBytes(), b.upper.GetBytes())
		b.scalar.commonPfxLen = prefixLen
		b.scalar.lower = convertBytesToScalar(b.lower.GetBytes()[prefixLen:])
		b.scalar.upper = convertBytesToScalar(b.upper.GetBytes()[prefixLen:])
	} else {
		b.scalar.lower = float64(b.lower.GetInt64())
		b.scalar.upper = float64(b.upper.GetInt64())
	}
	// Use the feedback that covers most to update this bucket's count. We only
	// consider feedback that covers at least minBucketFraction.
	maxFraction := minBucketFraction
	for _, fb := range b.feedback {
		fraction := b.getFraction(fb.lower, fb.upper)
		if fraction >= maxFraction {
			maxFraction = fraction
			count = float64(fb.count) / fraction
		}
	}
	return int64(count)
}

// updateBucket split the bucket according to feedback.
func (b *BucketFeedback) splitBucket(newBktNum int, totalCount float64, count float64) []bucket {
// do not split if the count is already too small.
if newBktNum <= 1 || count < minBucketFraction*totalCount {
bkt := bucket{lower: b.lower, upper: b.upper, count: int64(count)}
return []bucket{bkt}
}
// splitBucket firstly splits this "BucketFeedback" to "newNumBkts" new buckets,
// calculates the count for each new bucket, merge the new bucket whose count
// is smaller than "minBucketFraction*totalCount" with the next new bucket
// until the last new bucket.
func (b *BucketFeedback) splitBucket(newNumBkts int, totalCount float64, originBucketCount float64) []bucket {
// Split the bucket.
bounds := b.getBoundaries(newBktNum)
bounds := b.getBoundaries(newNumBkts + 1)
bkts := make([]bucket, 0, len(bounds)-1)
for i := 1; i < len(bounds); i++ {
newCount := int64(count * b.getFraction(&bounds[i-1], &bounds[i]))
newBkt := bucket{&bounds[i-1], bounds[i].Copy(), 0, 0}
// get bucket count
_, ratio := getOverlapFraction(feedback{b.lower, b.upper, int64(originBucketCount), 0}, newBkt)
countInNewBkt := originBucketCount * ratio
countInNewBkt = b.refineBucketCount(newBkt, countInNewBkt)
// do not split if the count of result bucket is too small.
if float64(newCount) < minBucketFraction*totalCount {
if countInNewBkt < minBucketFraction*totalCount {
bounds[i] = bounds[i-1]
continue
}
bkts = append(bkts, bucket{lower: &bounds[i-1], upper: bounds[i].Copy(), count: newCount, repeat: 0})
newBkt.count = int64(countInNewBkt)
bkts = append(bkts, newBkt)
// To guarantee that each bucket's range will not overlap.
if bounds[i].Kind() == types.KindBytes {
bounds[i].SetBytes(kv.Key(bounds[i].GetBytes()).PrefixNext())
Expand All @@ -371,11 +336,82 @@ func (b *BucketFeedback) splitBucket(newBktNum int, totalCount float64, count fl
return bkts
}

// Get the split count for the histogram.
func getSplitCount(count, remainBuckets int) int {
remainBuckets = mathutil.Max(remainBuckets, 10)
// getFraction4PK maps the primary-key values lower and upper to fractions of
// the [minValue, maxValue] range, handling both signed and unsigned keys.
func getFraction4PK(minValue, maxValue, lower, upper *types.Datum) (float64, float64) {
	if minValue.Kind() != types.KindInt64 {
		// Unsigned primary key.
		lo, hi := float64(minValue.GetUint64()), float64(maxValue.GetUint64())
		return calcFraction(lo, hi, float64(lower.GetUint64())), calcFraction(lo, hi, float64(upper.GetUint64()))
	}
	lo, hi := float64(minValue.GetInt64()), float64(maxValue.GetInt64())
	return calcFraction(lo, hi, float64(lower.GetInt64())), calcFraction(lo, hi, float64(upper.GetInt64()))
}

// getFraction4Index maps the index (bytes) values lower and upper to fractions
// of the [minValue, maxValue] range. The first prefixLen bytes — shared by
// minValue and maxValue — are stripped before the bytes-to-scalar conversion.
func getFraction4Index(minValue, maxValue, lower, upper *types.Datum, prefixLen int) (float64, float64) {
	lo := convertBytesToScalar(minValue.GetBytes()[prefixLen:])
	hi := convertBytesToScalar(maxValue.GetBytes()[prefixLen:])
	lowerFrac := calcFraction(lo, hi, convertBytesToScalar(lower.GetBytes()[prefixLen:]))
	upperFrac := calcFraction(lo, hi, convertBytesToScalar(upper.GetBytes()[prefixLen:]))
	return lowerFrac, upperFrac
}

// getOverlapFraction gets the overlap fraction of feedback and bucket range. In order to get the bucket count, it also
// returns the ratio between bucket fraction and feedback fraction.
// Both intervals are first normalized to [0, 1] fractions of the combined
// range [min(all bounds), max(all bounds)] so bytes and int keys share one code path.
func getOverlapFraction(fb feedback, bkt bucket) (float64, float64) {
	// Sort the four bounds to find the global min/max of the combined range.
	datums := make([]types.Datum, 0, 4)
	datums = append(datums, *fb.lower, *fb.upper)
	datums = append(datums, *bkt.lower, *bkt.upper)
	err := types.SortDatums(nil, datums)
	if err != nil {
		return 0, 0
	}
	var fbLower, fbUpper, bktLower, bktUpper float64
	minValue, maxValue := &datums[0], &datums[3]
	if datums[0].Kind() == types.KindBytes {
		prefixLen := commonPrefixLength(minValue.GetBytes(), maxValue.GetBytes())
		fbLower, fbUpper = getFraction4Index(minValue, maxValue, fb.lower, fb.upper, prefixLen)
		bktLower, bktUpper = getFraction4Index(minValue, maxValue, bkt.lower, bkt.upper, prefixLen)
	} else {
		fbLower, fbUpper = getFraction4PK(minValue, maxValue, fb.lower, fb.upper)
		bktLower, bktUpper = getFraction4PK(minValue, maxValue, bkt.lower, bkt.upper)
	}
	// A zero-width feedback range would make the ratio division below yield
	// Inf/NaN (and corrupt the refined count downstream); treat it as carrying
	// no usable overlap information.
	if fbUpper <= fbLower {
		return 0, 0
	}
	ratio := (bktUpper - bktLower) / (fbUpper - fbLower)
	// full overlap
	if fbLower <= bktLower && bktUpper <= fbUpper {
		return bktUpper - bktLower, ratio
	}
	if bktLower <= fbLower && fbUpper <= bktUpper {
		return fbUpper - fbLower, ratio
	}
	// partial overlap
	overlap := math.Min(bktUpper-fbLower, fbUpper-bktLower)
	return overlap, ratio
}

// refineBucketCount refines the newly split bucket's count. Among all feedback
// intervals, the one overlapping the bucket the most wins; feedback whose
// overlap does not exceed minBucketFraction is ignored, in which case
// defaultCount is returned unchanged.
func (b *BucketFeedback) refineBucketCount(bkt bucket, defaultCount float64) float64 {
	refined, best := defaultCount, minBucketFraction
	for _, fb := range b.feedback {
		overlap, ratio := getOverlapFraction(fb, bkt)
		// Keep only the feedback with the maximum overlap fraction so far.
		if overlap <= best {
			continue
		}
		best = overlap
		refined = float64(fb.count) * ratio
	}
	return refined
}

const (
	// defaultSplitCount is the floor for the split budget when few unused buckets remain.
	defaultSplitCount = 10
	// splitPerFeedback throttles splitting: at most one split per this many feedbacks.
	splitPerFeedback = 10
)

// getSplitCount gets the split count for the histogram. It is based on the intuition that:
// 1: If we have more remaining unused buckets, we can split more.
// 2: We cannot split too aggressively, thus we make it split every `splitPerFeedback`.
func getSplitCount(numFeedbacks, remainBuckets int) int {
	// Split more if more buckets are available, but never budget below the default.
	splitCount := mathutil.Max(remainBuckets, defaultSplitCount)
	return mathutil.Min(splitCount, numFeedbacks/splitPerFeedback)
}

type bucketScore struct {
Expand Down Expand Up @@ -461,33 +497,23 @@ func mergeBuckets(bkts []bucket, isNewBuckets []bool, totalCount float64) []buck
return bkts
}

// splitBuckets split the histogram buckets according to the feedback.
func splitBuckets(h *Histogram, feedback *QueryFeedback) ([]bucket, []bool, int64) {
bktID2FB, fbNum := buildBucketFeedback(h, feedback)
counts := make([]int64, 0, h.Len())
for i := 0; i < h.Len(); i++ {
bkt, ok := bktID2FB[i]
if !ok {
counts = append(counts, h.bucketCount(i))
} else {
counts = append(counts, bkt.getBucketCount(float64(h.bucketCount(i))))
}
}
totCount := int64(0)
for _, count := range counts {
totCount += count
}
bktID2FB, numTotalFBs := buildBucketFeedback(h, feedback)
buckets := make([]bucket, 0, h.Len())
isNewBuckets := make([]bool, 0, h.Len())
splitCount := getSplitCount(fbNum, defaultBucketCount-h.Len())
splitCount := getSplitCount(numTotalFBs, defaultBucketCount-h.Len())
for i := 0; i < h.Len(); i++ {
bkt, ok := bktID2FB[i]
bktFB, ok := bktID2FB[i]
// No feedback, just use the original one.
if !ok {
buckets = append(buckets, bucket{h.GetLower(i), h.GetUpper(i), counts[i], h.Buckets[i].Repeat})
buckets = append(buckets, bucket{h.GetLower(i), h.GetUpper(i), h.bucketCount(i), h.Buckets[i].Repeat})
isNewBuckets = append(isNewBuckets, false)
continue
}
bkts := bkt.splitBucket(splitCount*len(bkt.feedback)/fbNum, float64(totCount), float64(counts[i]))
// Distribute the total split count to bucket based on number of bucket feedback.
newBktNums := splitCount * len(bktFB.feedback) / numTotalFBs
bkts := bktFB.splitBucket(newBktNums, h.totalRowCount(), float64(h.bucketCount(i)))
buckets = append(buckets, bkts...)
if len(bkts) == 1 {
isNewBuckets = append(isNewBuckets, false)
Expand All @@ -497,6 +523,10 @@ func splitBuckets(h *Histogram, feedback *QueryFeedback) ([]bucket, []bool, int6
}
}
}
totCount := int64(0)
for _, bkt := range buckets {
totCount += bkt.count
}
return buckets, isNewBuckets, totCount
}

Expand Down
51 changes: 36 additions & 15 deletions statistics/feedback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,17 @@ func (s *testFeedbackSuite) TestUpdateHistogram(c *C) {
q := NewQueryFeedback(0, genHistogram(), 0, false)
q.feedback = feedbacks
originBucketCount := defaultBucketCount
defaultBucketCount = 5
defaultBucketCount = 7
defer func() { defaultBucketCount = originBucketCount }()
c.Assert(UpdateHistogram(q.Hist(), q).ToString(0), Equals,
"column:0 ndv:0 totColSize:0\n"+
"num: 10000\tlower_bound: 0\tupper_bound: 1\trepeats: 0\n"+
"num: 10003\tlower_bound: 2\tupper_bound: 3\trepeats: 0\n"+
"num: 10021\tlower_bound: 4\tupper_bound: 20\trepeats: 0\n"+
"num: 10046\tlower_bound: 21\tupper_bound: 46\trepeats: 0\n"+
"num: 10059\tlower_bound: 47\tupper_bound: 60\trepeats: 0")
"num: 10008\tlower_bound: 2\tupper_bound: 7\trepeats: 0\n"+
"num: 10019\tlower_bound: 8\tupper_bound: 19\trepeats: 0\n"+
"num: 10019\tlower_bound: 20\tupper_bound: 20\trepeats: 0\n"+
"num: 10037\tlower_bound: 21\tupper_bound: 39\trepeats: 0\n"+
"num: 10055\tlower_bound: 40\tupper_bound: 58\trepeats: 0\n"+
"num: 10057\tlower_bound: 59\tupper_bound: 60\trepeats: 0")
}

func (s *testFeedbackSuite) TestSplitBuckets(c *C) {
Expand All @@ -93,10 +95,10 @@ func (s *testFeedbackSuite) TestSplitBuckets(c *C) {
"num: 1\tlower_bound: 2\tupper_bound: 3\trepeats: 0\n"+
"num: 1\tlower_bound: 5\tupper_bound: 7\trepeats: 0\n"+
"num: 6\tlower_bound: 10\tupper_bound: 15\trepeats: 0\n"+
"num: 10\tlower_bound: 16\tupper_bound: 20\trepeats: 0\n"+
"num: 10\tlower_bound: 30\tupper_bound: 50\trepeats: 0")
"num: 6\tlower_bound: 16\tupper_bound: 20\trepeats: 0\n"+
"num: 6\tlower_bound: 30\tupper_bound: 50\trepeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{false, false, false, true, true, false})
c.Assert(totalCount, Equals, int64(11))
c.Assert(totalCount, Equals, int64(6))

// test do not split if the bucket count is too small
feedbacks = []feedback{newFeedback(0, 1, 100000)}
Expand All @@ -111,26 +113,45 @@ func (s *testFeedbackSuite) TestSplitBuckets(c *C) {
"num: 100000\tlower_bound: 0\tupper_bound: 1\trepeats: 0\n"+
"num: 100000\tlower_bound: 2\tupper_bound: 3\trepeats: 0\n"+
"num: 100000\tlower_bound: 5\tupper_bound: 7\trepeats: 0\n"+
"num: 100002\tlower_bound: 10\tupper_bound: 20\trepeats: 0\n"+
"num: 100002\tlower_bound: 30\tupper_bound: 50\trepeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{false, false, false, false, false})
c.Assert(totalCount, Equals, int64(100002))
"num: 100001\tlower_bound: 10\tupper_bound: 15\trepeats: 0\n"+
"num: 100001\tlower_bound: 16\tupper_bound: 20\trepeats: 0\n"+
"num: 100001\tlower_bound: 30\tupper_bound: 50\trepeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{false, false, false, true, true, false})
c.Assert(totalCount, Equals, int64(100001))

// test do not split if the result bucket count is too small
h := NewHistogram(0, 0, 0, 0, types.NewFieldType(mysql.TypeLong), 5, 0)
appendBucket(h, 0, 1000000)
h.Buckets[0].Count = 1000000
feedbacks = feedbacks[:0]
for i := 0; i < 100; i++ {
feedbacks = append(feedbacks, newFeedback(0, 101, 1))
feedbacks = append(feedbacks, newFeedback(0, 10, 1))
}
q = NewQueryFeedback(0, h, 0, false)
q.feedback = feedbacks
buckets, isNewBuckets, totalCount = splitBuckets(q.Hist(), q)
c.Assert(buildNewHistogram(q.Hist(), buckets).ToString(0), Equals,
"column:0 ndv:0 totColSize:0\n"+
"num: 9900\tlower_bound: 0\tupper_bound: 1000000\trepeats: 0")
"num: 1000000\tlower_bound: 0\tupper_bound: 1000000\trepeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{false})
c.Assert(totalCount, Equals, int64(9900))
c.Assert(totalCount, Equals, int64(1000000))

// test split even if the feedback range is too small
h = NewHistogram(0, 0, 0, 0, types.NewFieldType(mysql.TypeLong), 5, 0)
appendBucket(h, 0, 1000000)
feedbacks = feedbacks[:0]
for i := 0; i < 100; i++ {
feedbacks = append(feedbacks, newFeedback(0, 10, 1))
}
q = NewQueryFeedback(0, h, 0, false)
q.feedback = feedbacks
buckets, isNewBuckets, totalCount = splitBuckets(q.Hist(), q)
c.Assert(buildNewHistogram(q.Hist(), buckets).ToString(0), Equals,
"column:0 ndv:0 totColSize:0\n"+
"num: 1\tlower_bound: 0\tupper_bound: 10\trepeats: 0\n"+
"num: 1\tlower_bound: 11\tupper_bound: 1000000\trepeats: 0")
c.Assert(isNewBuckets, DeepEquals, []bool{true, true})
c.Assert(totalCount, Equals, int64(1))
}

func (s *testFeedbackSuite) TestMergeBuckets(c *C) {
Expand Down

0 comments on commit c9cea72

Please sign in to comment.