Skip to content

Commit

Permalink
keyviz: support to merge cold logical range (#674)
Browse files Browse the repository at this point in the history
  • Loading branch information
HunDunDM authored Jul 15, 2020
1 parent e507c79 commit 5c8f9d9
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pkg/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

const (
Timeout = time.Second * 3
Timeout = time.Second * 10
)

func NewHTTPClientWithConf(lc fx.Lifecycle, config *config.Config) *http.Client {
Expand Down
3 changes: 3 additions & 0 deletions pkg/keyvisual/decorator/decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ type LabelKey struct {
// LabelStrategy requires cross-border determination and key decoration scheme.
type LabelStrategy interface {
ReloadConfig(cfg *config.KeyVisualConfig)
// CrossBorder determines whether two keys not belong to the same logical range.
CrossBorder(startKey, endKey string) bool
// Label returns the Label information of the key.
// Note: When the key is "", need to use LabelGlobalStart or LabelGlobalEnd.
Label(key string) LabelKey
LabelGlobalStart() LabelKey
LabelGlobalEnd() LabelKey
Expand Down
72 changes: 67 additions & 5 deletions pkg/keyvisual/matrix/axis.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (axis *Axis) Focus(strategy Strategy, threshold uint64, ratio int, target i
}

baseChunk := createChunk(axis.Keys, axis.ValuesList[0])
newChunk := baseChunk.Focus(strategy, threshold, ratio, target)
newChunk := baseChunk.Focus(strategy, threshold, ratio, target, MergeColdLogicalRange)
valuesListLen := len(axis.ValuesList)
newValuesList := make([][]uint64, valuesListLen)
newValuesList[0] = newChunk.Values
Expand All @@ -102,7 +102,7 @@ func (axis *Axis) Divide(strategy Strategy, target int) Axis {
}

baseChunk := createChunk(axis.Keys, axis.ValuesList[0])
newChunk := baseChunk.Divide(strategy, target)
newChunk := baseChunk.Divide(strategy, target, MergeColdLogicalRange)
valuesListLen := len(axis.ValuesList)
newValuesList := make([][]uint64, valuesListLen)
newValuesList[0] = newChunk.Values
Expand All @@ -113,6 +113,13 @@ func (axis *Axis) Divide(strategy Strategy, target int) Axis {
return CreateAxis(newChunk.Keys, newValuesList)
}

type FocusMode int

const (
NotMergeLogicalRange FocusMode = iota
MergeColdLogicalRange
)

type chunk struct {
// Keys and ValuesList[i] from Axis
Keys []string
Expand Down Expand Up @@ -209,7 +216,7 @@ func (c *chunk) GetFocusRows(threshold uint64) (count int) {
// Given a `threshold`, merge the rows with less traffic,
// and merge the most `ratio` rows at a time.
// `target` is the estimated final number of rows.
func (c *chunk) Focus(strategy Strategy, threshold uint64, ratio int, target int) chunk {
func (c *chunk) Focus(strategy Strategy, threshold uint64, ratio int, target int, mode FocusMode) chunk {
newKeys := make([]string, 0, target)
newValues := make([]uint64, 0, target)
newKeys = append(newKeys, c.Keys[0])
Expand All @@ -236,11 +243,66 @@ func (c *chunk) Focus(strategy Strategy, threshold uint64, ratio int, target int
}
generateBucket(len(c.Values))

newChunk := createChunk(newKeys, newValues)
if mode == MergeColdLogicalRange && len(newValues) >= target {
newChunk = newChunk.MergeColdLogicalRange(strategy, threshold, target)
}
return newChunk
}

func (c *chunk) MergeColdLogicalRange(strategy Strategy, threshold uint64, target int) chunk {
threshold /= 4 // TODO: This var can be adjusted

newKeys := make([]string, 0, target)
newValues := make([]uint64, 0, target)
newKeys = append(newKeys, c.Keys[0])

coldStart := 0
coldEnd := 0
var coldRangeSum uint64 = 0
mergeColdRange := func() {
if coldEnd <= coldStart {
return
}
newKeys = append(newKeys, c.Keys[coldEnd])
newValues = append(newValues, coldRangeSum)
coldStart = coldEnd
coldRangeSum = 0
}
generateRange := func(end int) {
if end <= coldEnd {
return
}
var rangeSum uint64 = 0
for i := coldEnd; i < end; i++ {
rangeSum += c.Values[i]
}
if coldRangeSum > threshold || rangeSum > threshold {
mergeColdRange()
}
if rangeSum > threshold {
newKeys = append(newKeys, c.Keys[coldEnd+1:end+1]...)
newValues = append(newValues, c.Values[coldEnd:end]...)
coldStart = end
} else {
coldRangeSum += rangeSum
}
coldEnd = end
}

for i := range c.Values {
if strategy.CrossBorder(c.Keys[i], c.Keys[i+1]) {
generateRange(i + 1)
}
}
generateRange(len(c.Values))
mergeColdRange()

return createChunk(newKeys, newValues)
}

// Divide uses binary search to find a suitable threshold, which can reduce the number of buckets of Axis to near the target.
func (c *chunk) Divide(strategy Strategy, target int) chunk {
func (c *chunk) Divide(strategy Strategy, target int, mode FocusMode) chunk {
if target >= len(c.Values) {
return *c
}
Expand All @@ -264,5 +326,5 @@ func (c *chunk) Divide(strategy Strategy, target int) chunk {
threshold := lowerThreshold
focusRows := c.GetFocusRows(threshold)
ratio := len(c.Values)/(target-focusRows) + 1
return c.Focus(strategy, threshold, ratio, target)
return c.Focus(strategy, threshold, ratio, target, mode)
}
2 changes: 1 addition & 1 deletion pkg/keyvisual/matrix/plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (plane *Plane) Pixel(strategy Strategy, target int, displayTags []string) M
chunks[i] = createChunk(axis.Keys, axis.ValuesList[0])
}
compactChunk, helper := compact(strategy, chunks)
baseKeys := compactChunk.Divide(strategy, target).Keys
baseKeys := compactChunk.Divide(strategy, target, NotMergeLogicalRange).Keys
matrix := CreateMatrix(strategy, plane.Times, baseKeys, valuesListLen)

var wg sync.WaitGroup
Expand Down

0 comments on commit 5c8f9d9

Please sign in to comment.