Skip to content

Commit

Permalink
add node numa metric emitter and support memory bandwidth related metric
Browse files Browse the repository at this point in the history
  • Loading branch information
luomingmeng committed Dec 16, 2024
1 parent c1ff5df commit a799e81
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type MetricEmitterPluginOptions struct {

NodeMetricLabels []string
NodeMetricMapping map[string]string
NUMAMetricMapping map[string]string

MetricSyncers []string
}
Expand Down Expand Up @@ -74,6 +75,8 @@ func (o *MetricEmitterPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) {
"node labels to be added in metric selector lists")
fs.StringToStringVar(&o.NodeMetricMapping, "metric-node-metrics", o.NodeMetricMapping,
"the metric name for node-level to override the default collecting logic")
fs.StringToStringVar(&o.NUMAMetricMapping, "metric-node-numa-metrics", o.NUMAMetricMapping,
"the metric name for node-numa-level to override the default collecting logic")

fs.StringSliceVar(&o.MetricSyncers, "metric-syncers", o.MetricSyncers,
"those syncers that should be enabled")
Expand Down Expand Up @@ -106,6 +109,10 @@ func (o *MetricEmitterPluginOptions) ApplyTo(c *metricemitter.MetricEmitterPlugi
c.MetricEmitterNodeConfiguration.MetricMapping = o.NodeMetricMapping
}

if o.NUMAMetricMapping != nil && len(o.NUMAMetricMapping) != 0 {
c.MetricEmitterNodeConfiguration.NUMAMetricMapping = o.NUMAMetricMapping
}

c.MetricSyncers = o.MetricSyncers
return nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ require (
)

replace (
github.com/kubewharf/katalyst-api => github.com/luomingmeng/katalyst-api v0.0.0-20241211184538-e538b25e3c9e
k8s.io/api => k8s.io/api v0.24.6
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6
k8s.io/apimachinery => k8s.io/apimachinery v0.24.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.5.2-0.20241210135216-5785b7552c05 h1:oV/CsCzr3T3WnWz91aZebSpbKiYFQSS564CgsCrD/QQ=
github.com/kubewharf/katalyst-api v0.5.2-0.20241210135216-5785b7552c05/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9 h1:jOTYZt7h/J7I8xQMKMUcJjKf5UFBv37jHWvNp5VRFGc=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand All @@ -583,6 +581,8 @@ github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0U
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/lpabon/godbc v0.1.1/go.mod h1:Jo9QV0cf3U6jZABgiJ2skINAXb9j8m51r07g4KI92ZA=
github.com/luomingmeng/katalyst-api v0.0.0-20241211184538-e538b25e3c9e h1:Lh1j8V/PQeGDvPHI7OG0KyArQXK3N5sJCYoIL38O22U=
github.com/luomingmeng/katalyst-api v0.0.0-20241211184538-e538b25e3c9e/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down
64 changes: 60 additions & 4 deletions pkg/agent/sysadvisor/plugin/metric-emitter/syncer/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package node
import (
"context"
"fmt"
"strconv"
"time"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -50,11 +51,16 @@ var nodeRawMetricNameMapping = map[string]string{
consts.MetricMemAvailableSystem: apimetricnode.CustomMetricNodeMemoryAvailable,
}

// nodeNUMARawMetricNameMapping maps the raw NUMA-level metricName (collected from agent.MetricsFetcher)
// to the standard metricName (used by custom-metric-api-server)
var nodeNUMARawMetricNameMapping = map[string]string{
consts.MetricMemBandwidthReadNuma: apimetricnode.CustomMetricNUMAMemoryBandwidthRead,
consts.MetricMemBandwidthWriteNuma: apimetricnode.CustomMetricNUMAMemoryBandwidthWrite,
}

type MetricSyncerNode struct {
metricMapping map[string]string
metricMapping map[string]string
numaMetricMapping map[string]string

conf *metricemitter.MetricEmitterPluginConfiguration
node *v1.Node
Expand All @@ -79,9 +85,11 @@ func NewMetricSyncerNode(conf *config.Configuration, _ interface{},
}

metricMapping := general.MergeMap(nodeRawMetricNameMapping, conf.MetricEmitterNodeConfiguration.MetricMapping)
numaMetricMapping := general.MergeMap(nodeNUMARawMetricNameMapping, conf.MetricEmitterNodeConfiguration.NUMAMetricMapping)

return &MetricSyncerNode{
metricMapping: metricMapping,
metricMapping: metricMapping,
numaMetricMapping: numaMetricMapping,

conf: conf.AgentConfiguration.MetricEmitterPluginConfiguration,

Expand All @@ -98,16 +106,29 @@ func (n *MetricSyncerNode) Name() string {

// Run starts the background workers of the node syncer and subscribes them to
// the raw metrics they translate.
//
// It launches receiveRawNode and receiveRawNodeNUMA, each reading from its own
// buffered channel, schedules advisorMetric every three seconds until ctx is
// done, and then registers one notifier per key of n.metricMapping (node scope)
// and one notifier per (metric name, NUMA ID) pair for n.numaMetricMapping
// (NUMA scope).
func (n *MetricSyncerNode) Run(ctx context.Context) {
	rChan := make(chan metrictypes.NotifiedResponse, 20)
	rNUMAChan := make(chan metrictypes.NotifiedResponse, 20)
	go n.receiveRawNode(ctx, rChan)
	go n.receiveRawNodeNUMA(ctx, rNUMAChan)

	// advisorMetric is scheduled exactly once; a second wait.Until loop would
	// run it twice per period.
	go wait.Until(func() { n.advisorMetric(ctx) }, time.Second*3, ctx.Done())

	// there is no need to deRegister for node-related metric
	for rawMetricName := range n.metricMapping {
		klog.Infof("register raw node metric: %v", rawMetricName)
		n.metaServer.MetricsFetcher.RegisterNotifier(metrictypes.MetricsScopeNode, metrictypes.NotifiedRequest{
			MetricName: rawMetricName,
		}, rChan)
	}

	// there is no need to deRegister for numa-related metric
	for rawNUMAMetricName := range n.numaMetricMapping {
		for _, numaID := range n.metaServer.CPUDetails.NUMANodes().ToSliceNoSortInt() {
			klog.Infof("register raw node numa metric: %v, numa: %d", rawNUMAMetricName, numaID)
			n.metaServer.MetricsFetcher.RegisterNotifier(metrictypes.MetricsScopeNuma, metrictypes.NotifiedRequest{
				MetricName: rawNUMAMetricName,
				NumaID:     numaID,
			}, rNUMAChan)
		}
	}
}

// receiveRawNode receives notified response from raw data source
Expand Down Expand Up @@ -142,6 +163,41 @@ func (n *MetricSyncerNode) receiveRawNode(ctx context.Context, rChan chan metric
}
}

// receiveRawNodeNUMA consumes NUMA-scoped notifications from rChan and re-emits
// each one through n.dataEmitter under the name configured in n.numaMetricMapping.
//
// Responses without a metric name or without a timestamp are dropped, as are
// responses whose raw metric name has no entry in n.numaMetricMapping. Each
// emitted sample carries the tags returned by generateMetricTag plus a timestamp
// tag (Unix milliseconds) and a NUMA selector tag holding the request's NumaID;
// nothing is emitted when generateMetricTag returns no tags.
//
// The loop returns when ctx is done or when rChan is closed.
func (n *MetricSyncerNode) receiveRawNodeNUMA(ctx context.Context, rChan chan metrictypes.NotifiedResponse) {
	for {
		select {
		case response, open := <-rChan:
			if !open {
				// A closed channel yields zero values forever; return instead of
				// busy-looping on them until ctx is cancelled.
				klog.Infof("metric emitter for node numa has been stopped: notify channel closed")
				return
			}
			if response.Req.MetricName == "" || response.Time == nil {
				continue
			}

			targetMetricName, found := n.numaMetricMapping[response.Req.MetricName]
			if !found {
				klog.Warningf("invalid node numa raw metric name: %v", response.Req.MetricName)
				continue
			}

			// NumaID is the field set at registration time and used for the tag below.
			klog.V(4).Infof("get metric %v for node, numa %v", response.Req.MetricName, response.Req.NumaID)

			tags := n.generateMetricTag(ctx)
			if len(tags) == 0 {
				continue
			}

			tags = append(tags,
				metrics.MetricTag{
					Key: fmt.Sprintf("%s", data.CustomMetricLabelKeyTimestamp),
					Val: strconv.FormatInt(response.Time.UnixMilli(), 10),
				},
				metrics.MetricTag{
					Key: fmt.Sprintf("%snuma", data.CustomMetricLabelSelectorPrefixKey),
					Val: strconv.Itoa(response.Req.NumaID),
				})
			if err := n.dataEmitter.StoreFloat64(targetMetricName, response.Value, metrics.MetricTypeNameRaw, tags...); err != nil {
				// Emission is best-effort: report the failure and keep consuming.
				klog.Errorf("store node numa metric %v for numa %v failed: %v", targetMetricName, response.Req.NumaID, err)
			}
		case <-ctx.Done():
			klog.Infof("metric emitter for node numa has been stopped")
			return
		}
	}
}

// generateMetricTag generates tags that are bounded to current Node object
func (n *MetricSyncerNode) generateMetricTag(ctx context.Context) (tags []metrics.MetricTag) {
if n.node == nil && n.metaServer != nil && n.metaServer.NodeFetcher != nil {
Expand Down
127 changes: 127 additions & 0 deletions pkg/agent/sysadvisor/plugin/metric-emitter/syncer/node/node_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package node

import (
"context"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"

"github.com/kubewharf/katalyst-core/pkg/consts"
metrictypes "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/metric"
)

// testResp captures one StoreFloat64 call made on testMetrics: the tags and
// the float64 value that were passed in.
type testResp struct {
tags []metrics.MetricTag
res float64
}

// testMetrics is a test double for the metrics emitter: it embeds
// metrics.DummyMetrics and carries the channel on which its StoreFloat64
// publishes every call it receives.
type testMetrics struct {
res chan testResp
metrics.DummyMetrics
}

// StoreFloat64 publishes the received value and tags on t.res as a testResp.
// The metric name and metric type arguments are ignored, and the send blocks
// until a reader takes the value (or buffer space is available). It always
// returns nil.
func (t testMetrics) StoreFloat64(_ string, data float64, _ metrics.MetricTypeName, tags ...metrics.MetricTag) error {
	captured := testResp{tags: tags, res: data}
	t.res <- captured
	return nil
}

func TestMetricSyncerNode_receiveRawNodeNUMA(t *testing.T) {

Check failure on line 52 in pkg/agent/sysadvisor/plugin/metric-emitter/syncer/node/node_test.go

View workflow job for this annotation

GitHub Actions / Parallel

Function TestMetricSyncerNode_receiveRawNodeNUMA missing the call to method parallel
testTimestamp := time.Date(2024, 11, 12, 0, 0, 0, 0, time.Local)
testUnixTime := testTimestamp.UnixMilli()

type fields struct {
numaMetricMapping map[string]string
}
type args struct {
resp metrictypes.NotifiedResponse
}
tests := []struct {
name string
fields fields
args args
want testResp
}{
{
name: "test",
fields: fields{
numaMetricMapping: nodeNUMARawMetricNameMapping,
},
args: args{
resp: metrictypes.NotifiedResponse{
MetricData: metric.MetricData{
Value: 100,
Time: &testTimestamp,
},
Req: metrictypes.NotifiedRequest{
MetricName: consts.MetricMemBandwidthReadNuma,
NumaID: 0,
},
},
},
want: testResp{
tags: []metrics.MetricTag{
{
Key: "object",
Val: "nodes",
},
{
Key: "object_name",
Val: "",
},
{
Key: "timestamp",
Val: strconv.Itoa(int(testUnixTime)),
},
{
Key: "selector_numa",
Val: "0",
},
},
res: 100,
},
},
}
for _, tt := range tests {

Check failure on line 108 in pkg/agent/sysadvisor/plugin/metric-emitter/syncer/node/node_test.go

View workflow job for this annotation

GitHub Actions / Parallel

Range statement for test TestMetricSyncerNode_receiveRawNodeNUMA missing the call to method parallel in test Run
t.Run(tt.name, func(t *testing.T) {
res := make(chan testResp)
defer close(res)
n := &MetricSyncerNode{
numaMetricMapping: tt.fields.numaMetricMapping,
dataEmitter: testMetrics{res: res},
node: &v1.Node{},
}
rChan := make(chan metrictypes.NotifiedResponse)
defer close(rChan)
go n.receiveRawNodeNUMA(context.TODO(), rChan)
go func() {
rChan <- tt.args.resp
}()
result := <-res
assert.Equal(t, tt.want, result)
})
}
}
6 changes: 4 additions & 2 deletions pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ var podRawMetricNameMapping = map[string]string{
consts.MetricCPUUsageRatioContainer: apimetricpod.CustomMetricPodCPUUsageRatio,
consts.MetricCPUCPIContainer: apimetricpod.CustomMetricPodCPUCPI,

consts.MetricMemRssContainer: apimetricpod.CustomMetricPodMemoryRSS,
consts.MetricMemUsageContainer: apimetricpod.CustomMetricPodMemoryUsage,
consts.MetricMemRssContainer: apimetricpod.CustomMetricPodMemoryRSS,
consts.MetricMemUsageContainer: apimetricpod.CustomMetricPodMemoryUsage,
consts.MetricMemBandwidthReadContainer: apimetricpod.CustomMetricPodMemoryReadBandwidth,
consts.MetricMemBandwidthWriteContainer: apimetricpod.CustomMetricPodMemoryWriteBandwidth,
}

type podRawChanel struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type MetricEmitterNodeConfiguration struct {

// MetricMapping will override the default to-collected metrics in node-level
MetricMapping map[string]string

// NUMAMetricMapping will override the default to-collected metrics in numa-level
NUMAMetricMapping map[string]string
}

type MetricEmitterExternalConfiguration struct{}
Expand Down

0 comments on commit a799e81

Please sign in to comment.