Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keyviz: support to merge cold logical range #674

Merged
merged 3 commits into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

const (
Timeout = time.Second * 3
Timeout = time.Second * 10
)

func NewHTTPClientWithConf(lc fx.Lifecycle, config *config.Config) *http.Client {
Expand Down
3 changes: 3 additions & 0 deletions pkg/keyvisual/decorator/decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ type LabelKey struct {
// LabelStrategy requires cross-border determination and key decoration scheme.
type LabelStrategy interface {
ReloadConfig(cfg *config.KeyVisualConfig)
// CrossBorder determines whether two keys not belong to the same logical range.
CrossBorder(startKey, endKey string) bool
// Label returns the Label information of the key.
// Note: When the key is "", need to use LabelGlobalStart or LabelGlobalEnd.
Label(key string) LabelKey
LabelGlobalStart() LabelKey
LabelGlobalEnd() LabelKey
Expand Down
72 changes: 67 additions & 5 deletions pkg/keyvisual/matrix/axis.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (axis *Axis) Focus(strategy Strategy, threshold uint64, ratio int, target i
}

baseChunk := createChunk(axis.Keys, axis.ValuesList[0])
newChunk := baseChunk.Focus(strategy, threshold, ratio, target)
newChunk := baseChunk.Focus(strategy, threshold, ratio, target, MergeColdLogicalRange)
valuesListLen := len(axis.ValuesList)
newValuesList := make([][]uint64, valuesListLen)
newValuesList[0] = newChunk.Values
Expand All @@ -102,7 +102,7 @@ func (axis *Axis) Divide(strategy Strategy, target int) Axis {
}

baseChunk := createChunk(axis.Keys, axis.ValuesList[0])
newChunk := baseChunk.Divide(strategy, target)
newChunk := baseChunk.Divide(strategy, target, MergeColdLogicalRange)
valuesListLen := len(axis.ValuesList)
newValuesList := make([][]uint64, valuesListLen)
newValuesList[0] = newChunk.Values
Expand All @@ -113,6 +113,13 @@ func (axis *Axis) Divide(strategy Strategy, target int) Axis {
return CreateAxis(newChunk.Keys, newValuesList)
}

type FocusMode int

const (
NotMergeLogicalRange FocusMode = iota
MergeColdLogicalRange
)

type chunk struct {
// Keys and ValuesList[i] from Axis
Keys []string
Expand Down Expand Up @@ -209,7 +216,7 @@ func (c *chunk) GetFocusRows(threshold uint64) (count int) {
// Given a `threshold`, merge the rows with less traffic,
// and merge the most `ratio` rows at a time.
// `target` is the estimated final number of rows.
func (c *chunk) Focus(strategy Strategy, threshold uint64, ratio int, target int) chunk {
func (c *chunk) Focus(strategy Strategy, threshold uint64, ratio int, target int, mode FocusMode) chunk {
newKeys := make([]string, 0, target)
newValues := make([]uint64, 0, target)
newKeys = append(newKeys, c.Keys[0])
Expand All @@ -236,11 +243,66 @@ func (c *chunk) Focus(strategy Strategy, threshold uint64, ratio int, target int
}
generateBucket(len(c.Values))

newChunk := createChunk(newKeys, newValues)
if mode == MergeColdLogicalRange && len(newValues) >= target {
newChunk = newChunk.MergeColdLogicalRange(strategy, threshold, target)
}
return newChunk
}

func (c *chunk) MergeColdLogicalRange(strategy Strategy, threshold uint64, target int) chunk {
threshold /= 4 // TODO: This var can be adjusted

newKeys := make([]string, 0, target)
newValues := make([]uint64, 0, target)
newKeys = append(newKeys, c.Keys[0])

coldStart := 0
coldEnd := 0
var coldRangeSum uint64 = 0
mergeColdRange := func() {
if coldEnd <= coldStart {
return
}
newKeys = append(newKeys, c.Keys[coldEnd])
newValues = append(newValues, coldRangeSum)
coldStart = coldEnd
coldRangeSum = 0
}
generateRange := func(end int) {
if end <= coldEnd {
return
}
var rangeSum uint64 = 0
for i := coldEnd; i < end; i++ {
rangeSum += c.Values[i]
}
if coldRangeSum > threshold || rangeSum > threshold {
mergeColdRange()
}
if rangeSum > threshold {
newKeys = append(newKeys, c.Keys[coldEnd+1:end+1]...)
newValues = append(newValues, c.Values[coldEnd:end]...)
coldStart = end
} else {
coldRangeSum += rangeSum
}
coldEnd = end
}

for i := range c.Values {
if strategy.CrossBorder(c.Keys[i], c.Keys[i+1]) {
generateRange(i + 1)
}
}
generateRange(len(c.Values))
mergeColdRange()

return createChunk(newKeys, newValues)
}

// Divide uses binary search to find a suitable threshold, which can reduce the number of buckets of Axis to near the target.
func (c *chunk) Divide(strategy Strategy, target int) chunk {
func (c *chunk) Divide(strategy Strategy, target int, mode FocusMode) chunk {
if target >= len(c.Values) {
return *c
}
Expand All @@ -264,5 +326,5 @@ func (c *chunk) Divide(strategy Strategy, target int) chunk {
threshold := lowerThreshold
focusRows := c.GetFocusRows(threshold)
ratio := len(c.Values)/(target-focusRows) + 1
return c.Focus(strategy, threshold, ratio, target)
return c.Focus(strategy, threshold, ratio, target, mode)
}
2 changes: 1 addition & 1 deletion pkg/keyvisual/matrix/plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (plane *Plane) Pixel(strategy Strategy, target int, displayTags []string) M
chunks[i] = createChunk(axis.Keys, axis.ValuesList[0])
}
compactChunk, helper := compact(strategy, chunks)
baseKeys := compactChunk.Divide(strategy, target).Keys
baseKeys := compactChunk.Divide(strategy, target, NotMergeLogicalRange).Keys
matrix := CreateMatrix(strategy, plane.Times, baseKeys, valuesListLen)

var wg sync.WaitGroup
Expand Down