Skip to content

Commit

Permalink
chore(qrm): network plugin support clear residual state
Browse files Browse the repository at this point in the history
  • Loading branch information
luomingmeng committed Feb 13, 2025
1 parent 51795f9 commit 742555a
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 11 deletions.
7 changes: 7 additions & 0 deletions cmd/katalyst-agent/app/agent/qrm/network_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,23 @@ package qrm

import (
"fmt"
"strings"
"sync"

"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent"
phconsts "github.com/kubewharf/katalyst-core/pkg/agent/utilcomponent/periodicalhandler/consts"
"github.com/kubewharf/katalyst-core/pkg/config"
)

const (
QRMPluginNameNetwork = "qrm_network_plugin"
)

var QRMNetworkPluginPeriodicalHandlerGroupName = strings.Join([]string{
QRMPluginNameNetwork,
phconsts.PeriodicalHandlersGroupNameSuffix,
}, phconsts.GroupNameSeparator)

// networkPolicyInitializers is used to store the initializing function for network resource plugin policies
var networkPolicyInitializers sync.Map

Expand Down
19 changes: 19 additions & 0 deletions pkg/agent/qrm-plugins/network/consts/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package consts

import (
"time"

"github.com/kubewharf/katalyst-api/pkg/consts"
)

const (
// NetworkResourcePluginPolicyNameDynamic is the name of the dynamic policy.
NetworkResourcePluginPolicyNameDynamic = string(consts.ResourcePluginPolicyNameDynamic)

NetworkPluginDynamicPolicyName = "qrm_network_plugin_" + NetworkResourcePluginPolicyNameDynamic
ClearResidualState = NetworkPluginDynamicPolicyName + "_clear_residual_state"

StateCheckPeriod = 30 * time.Second
StateCheckTolerationTimes = 3
MaxResidualTime = 5 * time.Minute
)
126 changes: 115 additions & 11 deletions pkg/agent/qrm-plugins/network/staticpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"
maputil "k8s.io/kubernetes/pkg/util/maps"
Expand All @@ -33,13 +35,17 @@ import (
apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-api/pkg/plugins/skeleton"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent"
appqrm "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent/qrm"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/state"
networkreactor "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/staticpolicy/reactor"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/reactor"
"github.com/kubewharf/katalyst-core/pkg/agent/utilcomponent/periodicalhandler"
"github.com/kubewharf/katalyst-core/pkg/config"
agentconfig "github.com/kubewharf/katalyst-core/pkg/config/agent"
dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
"github.com/kubewharf/katalyst-core/pkg/config/agent/qrm"
"github.com/kubewharf/katalyst-core/pkg/config/generic"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
Expand Down Expand Up @@ -73,16 +79,17 @@ const (
type StaticPolicy struct {
sync.Mutex

name string
stopCh chan struct{}
started bool
qosConfig *generic.QoSConfiguration
qrmConfig *qrm.QRMPluginsConfiguration
emitter metrics.MetricEmitter
metaServer *metaserver.MetaServer
agentCtx *agent.GenericContext
nics []machine.InterfaceInfo
state state.State
name string
stopCh chan struct{}
started bool
qosConfig *generic.QoSConfiguration
qrmConfig *qrm.QRMPluginsConfiguration
emitter metrics.MetricEmitter
metaServer *metaserver.MetaServer
agentCtx *agent.GenericContext
nics []machine.InterfaceInfo
state state.State
residualHitMap map[string]int64

CgroupV2Env bool
qosLevelToNetClassMap map[string]uint32
Expand Down Expand Up @@ -250,11 +257,105 @@ func (p *StaticPolicy) Start() (err error) {
go wait.Until(func() {
_ = p.emitter.StoreInt64(util.MetricNameHeartBeat, 1, metrics.MetricTypeNameRaw)
}, time.Second*30, p.stopCh)

err = periodicalhandler.RegisterPeriodicalHandlerWithHealthz(consts.ClearResidualState, general.HealthzCheckStateNotReady,
appqrm.QRMNetworkPluginPeriodicalHandlerGroupName, p.clearResidualState, consts.StateCheckPeriod, consts.StateCheckTolerationTimes)
if err != nil {
general.Errorf("start %v failed, err: %v", consts.ClearResidualState, err)
}

go wait.Until(p.applyNetClass, 5*time.Second, p.stopCh)

return nil
}

// clearResidualState is used to clean residual pods in local state
func (p *StaticPolicy) clearResidualState(_ *config.Configuration,
_ interface{},
_ *dynamicconfig.DynamicAgentConfiguration,
_ metrics.MetricEmitter,
_ *metaserver.MetaServer,
) {
general.Infof("exec")
var (
err error
podList []*v1.Pod
)
residualSet := make(map[string]bool)

defer func() {
_ = general.UpdateHealthzStateByError(consts.ClearResidualState, err)
}()

if p.metaServer == nil {
general.Errorf("nil metaServer")
return
}

ctx := context.Background()
podList, err = p.metaServer.GetPodList(ctx, nil)
if err != nil {
general.Errorf("get pod list failed: %v", err)
return
}

podSet := sets.NewString()
for _, pod := range podList {
podSet.Insert(fmt.Sprintf("%v", pod.UID))
}

p.Lock()
defer p.Unlock()

podEntries := p.state.GetPodEntries()
for podUID := range podEntries {
if !podSet.Has(podUID) {
residualSet[podUID] = true
p.residualHitMap[podUID] += 1
general.Infof("found pod: %s with state but doesn't show up in pod watcher, hit count: %d", podUID, p.residualHitMap[podUID])
}
}

podsToDelete := sets.NewString()
for podUID, hitCount := range p.residualHitMap {
if !residualSet[podUID] {
general.Infof("already found pod: %s in pod watcher or its state is cleared, delete it from residualHitMap", podUID)
delete(p.residualHitMap, podUID)
continue
}

if time.Duration(hitCount)*consts.StateCheckPeriod >= consts.MaxResidualTime {
podsToDelete.Insert(podUID)
}
}

if podsToDelete.Len() > 0 {
for {
podUID, found := podsToDelete.PopAny()
if !found {
break
}

general.Infof("clear residual pod: %s in state", podUID)
delete(podEntries, podUID)
}

machineState, err := state.GenerateMachineStateFromPodEntries(p.qrmConfig, p.nics, podEntries, p.state.GetReservedBandwidth())
if err != nil {
general.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
return
}

p.state.SetPodEntries(podEntries)
p.state.SetMachineState(machineState)
}

err = p.generateAndApplyGroups()
if err != nil {
general.Errorf("generateAndApplyGroups failed with error: %v", err)
}
}

// Stop stops this plugin
func (p *StaticPolicy) Stop() error {
p.Lock()
Expand Down Expand Up @@ -1057,7 +1158,10 @@ func (p *StaticPolicy) removePod(podUID string) error {
p.state.SetPodEntries(podEntries)
p.state.SetMachineState(machineState)

p.generateAndApplyGroups()
err = p.generateAndApplyGroups()
if err != nil {
return err
}

return nil
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/agent/qrm-plugins/network/staticpolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,19 @@ func TestResourceName(t *testing.T) {
assert.Equal(t, string(consts.ResourceNetBandwidth), policy.ResourceName())
}

func TestClearResidualState(t *testing.T) {
t.Parallel()

policy := makeStaticPolicy(t, true)

policy.metaServer = &metaserver.MetaServer{
MetaAgent: &metaserveragent.MetaAgent{
PodFetcher: &pod.PodFetcherStub{},
},
}
policy.clearResidualState(nil, nil, nil, nil, nil)
}

func TestGetTopologyHints(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 742555a

Please sign in to comment.