Skip to content

Commit

Permalink
Merge pull request kubernetes#23274 from wojtek-t/speedup_scheduler_spreading
Browse files Browse the repository at this point in the history

Auto commit by PR queue bot
  • Loading branch information
k8s-merge-robot committed Mar 22, 2016
2 parents d124dee + ebcc8f7 commit c262c81
Showing 1 changed file with 58 additions and 33 deletions.
91 changes: 58 additions & 33 deletions plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ limitations under the License.
package priorities

import (
"sync"

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/labels"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
Expand Down Expand Up @@ -79,8 +82,6 @@ func getZoneKey(node *api.Node) string {
// pods which match the same service selectors or RC selectors as the pod being scheduled.
// Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
var nsPods []*api.Pod

selectors := make([]labels.Selector, 0)
services, err := s.serviceLister.GetPodServices(pod)
if err == nil {
Expand All @@ -103,46 +104,70 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
}
}

if len(selectors) > 0 {
pods, err := s.podLister.List(labels.Everything())
if err != nil {
return nil, err
}
// consider only the pods that belong to the same namespace
for _, nsPod := range pods {
if nsPod.Namespace == pod.Namespace {
nsPods = append(nsPods, nsPod)
}
}
}

nodes, err := nodeLister.List()
if err != nil {
return nil, err
}

// Count similar pods by node
countsByNodeName := map[string]int{}
for _, pod := range nsPods {
// When we are replacing a failed pod, we often see the previous deleted version
// while scheduling the replacement. Ignore the previous deleted version for spreading
// purposes (it can still be considered for resource restrictions etc.)
if pod.DeletionTimestamp != nil {
glog.V(2).Infof("skipping pending-deleted pod: %s/%s", pod.Namespace, pod.Name)
continue
}
matches := false
for _, selector := range selectors {
if selector.Matches(labels.Set(pod.ObjectMeta.Labels)) {
matches = true
break
}
countsByNodeNameLock := sync.Mutex{}

if len(selectors) > 0 {
// Create a number of go-routines that will be computing number
// of "similar" pods for given nodes.
workers := 16
toProcess := make(chan string, len(nodes.Items))
for i := range nodes.Items {
toProcess <- nodes.Items[i].Name
}
if !matches {
continue
close(toProcess)

wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer utilruntime.HandleCrash()
defer wg.Done()
for {
nodeName, ok := <-toProcess
if !ok {
return
}
count := 0
for _, nodePod := range nodeNameToInfo[nodeName].Pods() {
if pod.Namespace != nodePod.Namespace {
continue
}
// When we are replacing a failed pod, we often see the previous
// deleted version while scheduling the replacement.
// Ignore the previous deleted version for spreading purposes
// (it can still be considered for resource restrictions etc.)
if nodePod.DeletionTimestamp != nil {
glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
continue
}
matches := false
for _, selector := range selectors {
if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
matches = true
break
}
}
if matches {
count++
}
}

func() {
countsByNodeNameLock.Lock()
defer countsByNodeNameLock.Unlock()
countsByNodeName[nodeName] = count
}()
}
}()
}

countsByNodeName[pod.Spec.NodeName]++
wg.Wait()
}

// Aggregate by-node information
Expand Down

0 comments on commit c262c81

Please sign in to comment.