diff --git a/pkg/http/http.go b/pkg/http/http.go index ab8022cc2f..b47d52937d 100644 --- a/pkg/http/http.go +++ b/pkg/http/http.go @@ -26,7 +26,7 @@ import ( ) const ( - Timeout = time.Second * 3 + Timeout = time.Second * 10 ) func NewHTTPClientWithConf(lc fx.Lifecycle, config *config.Config) *http.Client { diff --git a/pkg/keyvisual/decorator/decorator.go b/pkg/keyvisual/decorator/decorator.go index a277f82118..b5b9dc3530 100644 --- a/pkg/keyvisual/decorator/decorator.go +++ b/pkg/keyvisual/decorator/decorator.go @@ -29,7 +29,10 @@ type LabelKey struct { // LabelStrategy requires cross-border determination and key decoration scheme. type LabelStrategy interface { ReloadConfig(cfg *config.KeyVisualConfig) + // CrossBorder determines whether two keys not belong to the same logical range. CrossBorder(startKey, endKey string) bool + // Label returns the Label information of the key. + // Note: When the key is "", need to use LabelGlobalStart or LabelGlobalEnd. Label(key string) LabelKey LabelGlobalStart() LabelKey LabelGlobalEnd() LabelKey diff --git a/pkg/keyvisual/matrix/axis.go b/pkg/keyvisual/matrix/axis.go index 8346ac6fab..4f5f134fa7 100644 --- a/pkg/keyvisual/matrix/axis.go +++ b/pkg/keyvisual/matrix/axis.go @@ -83,7 +83,7 @@ func (axis *Axis) Focus(strategy Strategy, threshold uint64, ratio int, target i } baseChunk := createChunk(axis.Keys, axis.ValuesList[0]) - newChunk := baseChunk.Focus(strategy, threshold, ratio, target) + newChunk := baseChunk.Focus(strategy, threshold, ratio, target, MergeColdLogicalRange) valuesListLen := len(axis.ValuesList) newValuesList := make([][]uint64, valuesListLen) newValuesList[0] = newChunk.Values @@ -102,7 +102,7 @@ func (axis *Axis) Divide(strategy Strategy, target int) Axis { } baseChunk := createChunk(axis.Keys, axis.ValuesList[0]) - newChunk := baseChunk.Divide(strategy, target) + newChunk := baseChunk.Divide(strategy, target, MergeColdLogicalRange) valuesListLen := len(axis.ValuesList) newValuesList := make([][]uint64, valuesListLen) newValuesList[0] = newChunk.Values @@ -113,6 +113,13 @@ func (axis *Axis) Divide(strategy Strategy, target int) Axis { return CreateAxis(newChunk.Keys, newValuesList) } +type FocusMode int + +const ( + NotMergeLogicalRange FocusMode = iota + MergeColdLogicalRange +) + type chunk struct { // Keys and ValuesList[i] from Axis Keys []string @@ -209,7 +216,7 @@ func (c *chunk) GetFocusRows(threshold uint64) (count int) { // Given a `threshold`, merge the rows with less traffic, // and merge the most `ratio` rows at a time. // `target` is the estimated final number of rows. -func (c *chunk) Focus(strategy Strategy, threshold uint64, ratio int, target int) chunk { +func (c *chunk) Focus(strategy Strategy, threshold uint64, ratio int, target int, mode FocusMode) chunk { newKeys := make([]string, 0, target) newValues := make([]uint64, 0, target) newKeys = append(newKeys, c.Keys[0]) @@ -236,11 +243,66 @@ func (c *chunk) Focus(strategy Strategy, threshold uint64, ratio int, target int } generateBucket(len(c.Values)) + newChunk := createChunk(newKeys, newValues) + if mode == MergeColdLogicalRange && len(newValues) >= target { + newChunk = newChunk.MergeColdLogicalRange(strategy, threshold, target) + } + return newChunk +} + +func (c *chunk) MergeColdLogicalRange(strategy Strategy, threshold uint64, target int) chunk { + threshold /= 4 // TODO: This var can be adjusted + + newKeys := make([]string, 0, target) + newValues := make([]uint64, 0, target) + newKeys = append(newKeys, c.Keys[0]) + + coldStart := 0 + coldEnd := 0 + var coldRangeSum uint64 = 0 + mergeColdRange := func() { + if coldEnd <= coldStart { + return + } + newKeys = append(newKeys, c.Keys[coldEnd]) + newValues = append(newValues, coldRangeSum) + coldStart = coldEnd + coldRangeSum = 0 + } + generateRange := func(end int) { + if end <= coldEnd { + return + } + var rangeSum uint64 = 0 + for i := coldEnd; i < end; i++ { + rangeSum += c.Values[i] + } + if coldRangeSum > threshold || rangeSum > threshold { + mergeColdRange() + } + if rangeSum > threshold { + newKeys = append(newKeys, c.Keys[coldEnd+1:end+1]...) + newValues = append(newValues, c.Values[coldEnd:end]...) + coldStart = end + } else { + coldRangeSum += rangeSum + } + coldEnd = end + } + + for i := range c.Values { + if strategy.CrossBorder(c.Keys[i], c.Keys[i+1]) { + generateRange(i + 1) + } + } + generateRange(len(c.Values)) + mergeColdRange() + return createChunk(newKeys, newValues) } // Divide uses binary search to find a suitable threshold, which can reduce the number of buckets of Axis to near the target. -func (c *chunk) Divide(strategy Strategy, target int) chunk { +func (c *chunk) Divide(strategy Strategy, target int, mode FocusMode) chunk { if target >= len(c.Values) { return *c } @@ -264,5 +326,5 @@ func (c *chunk) Divide(strategy Strategy, target int) chunk { threshold := lowerThreshold focusRows := c.GetFocusRows(threshold) ratio := len(c.Values)/(target-focusRows) + 1 - return c.Focus(strategy, threshold, ratio, target) + return c.Focus(strategy, threshold, ratio, target, mode) } diff --git a/pkg/keyvisual/matrix/plane.go b/pkg/keyvisual/matrix/plane.go index 1e77694352..3e3f2e91cb 100644 --- a/pkg/keyvisual/matrix/plane.go +++ b/pkg/keyvisual/matrix/plane.go @@ -75,7 +75,7 @@ func (plane *Plane) Pixel(strategy Strategy, target int, displayTags []string) M chunks[i] = createChunk(axis.Keys, axis.ValuesList[0]) } compactChunk, helper := compact(strategy, chunks) - baseKeys := compactChunk.Divide(strategy, target).Keys + baseKeys := compactChunk.Divide(strategy, target, NotMergeLogicalRange).Keys matrix := CreateMatrix(strategy, plane.Times, baseKeys, valuesListLen) var wg sync.WaitGroup