diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
index b651001c8f60a..caea010e9c98a 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
@@ -17,10 +17,13 @@ limitations under the License.
 package priorities
 
 import (
+	"sync"
+
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/labels"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -79,8 +82,6 @@ func getZoneKey(node *api.Node) string {
 // pods which match the same service selectors or RC selectors as the pod being scheduled.
 // Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
 func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
-	var nsPods []*api.Pod
-
 	selectors := make([]labels.Selector, 0)
 	services, err := s.serviceLister.GetPodServices(pod)
 	if err == nil {
@@ -103,19 +104,6 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
 		}
 	}
-	if len(selectors) > 0 {
-		pods, err := s.podLister.List(labels.Everything())
-		if err != nil {
-			return nil, err
-		}
-		// consider only the pods that belong to the same namespace
-		for _, nsPod := range pods {
-			if nsPod.Namespace == pod.Namespace {
-				nsPods = append(nsPods, nsPod)
-			}
-		}
-	}
-
 	nodes, err := nodeLister.List()
 	if err != nil {
 		return nil, err
 	}
@@ -123,26 +111,63 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
 	// Count similar pods by node
 	countsByNodeName := map[string]int{}
-	for _, pod := range nsPods {
-		// When we are replacing a failed pod, we often see the previous deleted version
-		// while scheduling the replacement. Ignore the previous deleted version for spreading
-		// purposes (it can still be considered for resource restrictions etc.)
-		if pod.DeletionTimestamp != nil {
-			glog.V(2).Infof("skipping pending-deleted pod: %s/%s", pod.Namespace, pod.Name)
-			continue
-		}
-		matches := false
-		for _, selector := range selectors {
-			if selector.Matches(labels.Set(pod.ObjectMeta.Labels)) {
-				matches = true
-				break
-			}
+	countsByNodeNameLock := sync.Mutex{}
+
+	if len(selectors) > 0 {
+		// Create a number of go-routines that will be computing number
+		// of "similar" pods for given nodes.
+		workers := 16
+		toProcess := make(chan string, len(nodes.Items))
+		for i := range nodes.Items {
+			toProcess <- nodes.Items[i].Name
 		}
-		if !matches {
-			continue
+		close(toProcess)
+
+		wg := sync.WaitGroup{}
+		wg.Add(workers)
+		for i := 0; i < workers; i++ {
+			go func() {
+				defer utilruntime.HandleCrash()
+				defer wg.Done()
+				for {
+					nodeName, ok := <-toProcess
+					if !ok {
+						return
+					}
+					count := 0
+					for _, nodePod := range nodeNameToInfo[nodeName].Pods() {
+						if pod.Namespace != nodePod.Namespace {
+							continue
+						}
+						// When we are replacing a failed pod, we often see the previous
+						// deleted version while scheduling the replacement.
+						// Ignore the previous deleted version for spreading purposes
+						// (it can still be considered for resource restrictions etc.)
+						if nodePod.DeletionTimestamp != nil {
+							glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
+							continue
+						}
+						matches := false
+						for _, selector := range selectors {
+							if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
+								matches = true
+								break
+							}
+						}
+						if matches {
+							count++
+						}
+					}
+
+					func() {
+						countsByNodeNameLock.Lock()
+						defer countsByNodeNameLock.Unlock()
+						countsByNodeName[nodeName] = count
+					}()
+				}
+			}()
 		}
-		countsByNodeName[pod.Spec.NodeName]++
+		wg.Wait()
 	}
 
 	// Aggregate by-node information
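
For reference, the pattern the patch introduces is a bounded worker pool: node names are fanned out over a buffered channel that is then closed, a fixed number of goroutines drain it, and writes to the shared countsByNodeName map are serialized with a mutex. The following is a minimal, self-contained sketch of that pattern outside the scheduler, not part of the patch; the names countMatching and nodeNames are illustrative placeholders for the per-node selector-matching work shown in the diff.

// Illustrative sketch only -- not code from the patch above.
package main

import (
	"fmt"
	"sync"
)

// countMatching stands in for the per-node work in the patch
// (matching the node's pods against the collected selectors).
func countMatching(nodeName string) int {
	return len(nodeName)
}

func main() {
	nodeNames := []string{"node-a", "node-b", "node-c"}

	const workers = 16
	toProcess := make(chan string, len(nodeNames))
	for _, name := range nodeNames {
		toProcess <- name
	}
	close(toProcess) // workers exit once the channel is drained

	countsByNodeName := map[string]int{}
	var lock sync.Mutex

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for nodeName := range toProcess {
				count := countMatching(nodeName)
				lock.Lock()
				countsByNodeName[nodeName] = count
				lock.Unlock()
			}
		}()
	}
	wg.Wait()

	fmt.Println(countsByNodeName)
}

Sizing the channel to the number of nodes lets the producer enqueue every name without blocking before closing the channel, and ranging over the closed channel gives each worker a natural exit once it is drained, which is what the explicit <-toProcess / !ok check in the patch accomplishes.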