Skip to content

Commit

Permalink
chore(qrm): network plugin support clear residual state
Browse files Browse the repository at this point in the history
  • Loading branch information
luomingmeng committed Feb 13, 2025
1 parent 6c2e614 commit ec5174f
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 11 deletions.
7 changes: 7 additions & 0 deletions cmd/katalyst-agent/app/agent/qrm/network_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,23 @@ package qrm

import (
"fmt"
"strings"
"sync"

"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent"
phconsts "github.com/kubewharf/katalyst-core/pkg/agent/utilcomponent/periodicalhandler/consts"
"github.com/kubewharf/katalyst-core/pkg/config"
)

const (
QRMPluginNameNetwork = "qrm_network_plugin"
)

var QRMNetworkPluginPeriodicalHandlerGroupName = strings.Join([]string{
QRMPluginNameNetwork,
phconsts.PeriodicalHandlersGroupNameSuffix,
}, phconsts.GroupNameSeparator)

// networkPolicyInitializers is used to store the initializing function for network resource plugin policies
var networkPolicyInitializers sync.Map

Expand Down
35 changes: 35 additions & 0 deletions pkg/agent/qrm-plugins/network/consts/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package consts

import (
"time"

"github.com/kubewharf/katalyst-api/pkg/consts"
)

const (
// NetworkResourcePluginPolicyNameDynamic is the name of the dynamic policy.
NetworkResourcePluginPolicyNameDynamic = string(consts.ResourcePluginPolicyNameDynamic)

NetworkPluginDynamicPolicyName = "qrm_network_plugin_" + NetworkResourcePluginPolicyNameDynamic
ClearResidualState = NetworkPluginDynamicPolicyName + "_clear_residual_state"

StateCheckPeriod = 30 * time.Second
StateCheckTolerationTimes = 3
MaxResidualTime = 5 * time.Minute
)
126 changes: 115 additions & 11 deletions pkg/agent/qrm-plugins/network/staticpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"
maputil "k8s.io/kubernetes/pkg/util/maps"
Expand All @@ -33,13 +35,17 @@ import (
apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-api/pkg/plugins/skeleton"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent"
appqrm "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent/qrm"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/state"
networkreactor "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/staticpolicy/reactor"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/reactor"
"github.com/kubewharf/katalyst-core/pkg/agent/utilcomponent/periodicalhandler"
"github.com/kubewharf/katalyst-core/pkg/config"
agentconfig "github.com/kubewharf/katalyst-core/pkg/config/agent"
dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
"github.com/kubewharf/katalyst-core/pkg/config/agent/qrm"
"github.com/kubewharf/katalyst-core/pkg/config/generic"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
Expand Down Expand Up @@ -73,16 +79,17 @@ const (
type StaticPolicy struct {
sync.Mutex

name string
stopCh chan struct{}
started bool
qosConfig *generic.QoSConfiguration
qrmConfig *qrm.QRMPluginsConfiguration
emitter metrics.MetricEmitter
metaServer *metaserver.MetaServer
agentCtx *agent.GenericContext
nics []machine.InterfaceInfo
state state.State
name string
stopCh chan struct{}
started bool
qosConfig *generic.QoSConfiguration
qrmConfig *qrm.QRMPluginsConfiguration
emitter metrics.MetricEmitter
metaServer *metaserver.MetaServer
agentCtx *agent.GenericContext
nics []machine.InterfaceInfo
state state.State
residualHitMap map[string]int64

CgroupV2Env bool
qosLevelToNetClassMap map[string]uint32
Expand Down Expand Up @@ -250,11 +257,105 @@ func (p *StaticPolicy) Start() (err error) {
go wait.Until(func() {
_ = p.emitter.StoreInt64(util.MetricNameHeartBeat, 1, metrics.MetricTypeNameRaw)
}, time.Second*30, p.stopCh)

err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(consts.ClearResidualState, general.HealthzCheckStateNotReady,
appqrm.QRMNetworkPluginPeriodicalHandlerGroupName, p.clearResidualState, consts.StateCheckPeriod, consts.StateCheckTolerationTimes)
if err != nil {
general.Errorf("start %v failed, err: %v", consts.ClearResidualState, err)
}

go wait.Until(p.applyNetClass, 5*time.Second, p.stopCh)

return nil
}

// clearResidualState is used to clean residual pods in local state
func (p *StaticPolicy) clearResidualState(_ *config.Configuration,
_ interface{},
_ *dynamicconfig.DynamicAgentConfiguration,
_ metrics.MetricEmitter,
_ *metaserver.MetaServer,
) {
general.Infof("exec")
var (
err error
podList []*v1.Pod
)
residualSet := make(map[string]bool)

defer func() {
_ = general.UpdateHealthzStateByError(consts.ClearResidualState, err)
}()

if p.metaServer == nil {
general.Errorf("nil metaServer")
return
}

ctx := context.Background()
podList, err = p.metaServer.GetPodList(ctx, nil)
if err != nil {
general.Errorf("get pod list failed: %v", err)
return
}

podSet := sets.NewString()
for _, pod := range podList {
podSet.Insert(fmt.Sprintf("%v", pod.UID))
}

p.Lock()
defer p.Unlock()

podEntries := p.state.GetPodEntries()
for podUID := range podEntries {
if !podSet.Has(podUID) {
residualSet[podUID] = true
p.residualHitMap[podUID] += 1
general.Infof("found pod: %s with state but doesn't show up in pod watcher, hit count: %d", podUID, p.residualHitMap[podUID])
}
}

podsToDelete := sets.NewString()
for podUID, hitCount := range p.residualHitMap {
if !residualSet[podUID] {
general.Infof("already found pod: %s in pod watcher or its state is cleared, delete it from residualHitMap", podUID)
delete(p.residualHitMap, podUID)
continue
}

if time.Duration(hitCount)*consts.StateCheckPeriod >= consts.MaxResidualTime {
podsToDelete.Insert(podUID)
}
}

if podsToDelete.Len() > 0 {
for {
podUID, found := podsToDelete.PopAny()
if !found {
break
}

general.Infof("clear residual pod: %s in state", podUID)
delete(podEntries, podUID)
}

machineState, err := state.GenerateMachineStateFromPodEntries(p.qrmConfig, p.nics, podEntries, p.state.GetReservedBandwidth())
if err != nil {
general.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
return
}

p.state.SetPodEntries(podEntries)
p.state.SetMachineState(machineState)
}

err = p.generateAndApplyGroups()
if err != nil {
general.Errorf("generateAndApplyGroups failed with error: %v", err)
}
}

// Stop stops this plugin
func (p *StaticPolicy) Stop() error {
p.Lock()
Expand Down Expand Up @@ -1057,7 +1158,10 @@ func (p *StaticPolicy) removePod(podUID string) error {
p.state.SetPodEntries(podEntries)
p.state.SetMachineState(machineState)

p.generateAndApplyGroups()
err = p.generateAndApplyGroups()
if err != nil {
return err
}

return nil
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/agent/qrm-plugins/network/staticpolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,19 @@ func TestResourceName(t *testing.T) {
assert.Equal(t, string(consts.ResourceNetBandwidth), policy.ResourceName())
}

func TestClearResidualState(t *testing.T) {
t.Parallel()

policy := makeStaticPolicy(t, true)

policy.metaServer = &metaserver.MetaServer{
MetaAgent: &metaserveragent.MetaAgent{
PodFetcher: &pod.PodFetcherStub{},
},
}
policy.clearResidualState(nil, nil, nil, nil, nil)
}

func TestGetTopologyHints(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit ec5174f

Please sign in to comment.