From 2087cd67d84c906b63ee01685733a13eafa41d4e Mon Sep 17 00:00:00 2001 From: weekface Date: Thu, 19 Dec 2019 15:16:44 +0800 Subject: [PATCH 1/9] scheduler: refine scheduler error message --- pkg/scheduler/predicates/ha.go | 118 +++++++++--------- pkg/scheduler/predicates/ha_test.go | 94 +++++--------- pkg/scheduler/predicates/stable_scheduling.go | 16 +-- .../predicates/stable_scheduling_test.go | 11 +- pkg/scheduler/predicates/test_helper.go | 14 +++ pkg/scheduler/scheduler.go | 15 ++- pkg/scheduler/scheduler_test.go | 14 ++- 7 files changed, 141 insertions(+), 141 deletions(-) diff --git a/pkg/scheduler/predicates/ha.go b/pkg/scheduler/predicates/ha.go index a023fc0625e..a704831da4b 100644 --- a/pkg/scheduler/predicates/ha.go +++ b/pkg/scheduler/predicates/ha.go @@ -31,7 +31,6 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" glog "k8s.io/klog" ) @@ -47,15 +46,13 @@ type ha struct { pvcListFn func(ns, instanceName, component string) (*apiv1.PersistentVolumeClaimList, error) updatePVCFn func(*apiv1.PersistentVolumeClaim) error acquireLockFn func(*apiv1.Pod) (*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error) - recorder record.EventRecorder } // NewHA returns a Predicate -func NewHA(kubeCli kubernetes.Interface, cli versioned.Interface, recorder record.EventRecorder) Predicate { +func NewHA(kubeCli kubernetes.Interface, cli versioned.Interface) Predicate { h := &ha{ - kubeCli: kubeCli, - cli: cli, - recorder: recorder, + kubeCli: kubeCli, + cli: cli, } h.podListFn = h.realPodListFn h.podGetFn = h.realPodGetFn @@ -68,7 +65,7 @@ func NewHA(kubeCli kubernetes.Interface, cli versioned.Interface, recorder recor } func (h *ha) Name() string { - return "HighAvailability" + return "HAScheduling" } // 1. return the node to kube-scheduler if there is only one feasible node and the pod's pvc is bound @@ -95,7 +92,7 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([] } if len(nodes) == 0 { - return nil, fmt.Errorf("kube nodes is empty") + return nil, fmt.Errorf("no nodes available to schedule pods") } if _, _, err := h.acquireLockFn(pod); err != nil { return nil, err @@ -144,53 +141,54 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([] min := -1 minNodeNames := make([]string, 0) - for nodeName, podNames := range nodeMap { - podsCount := len(podNames) - maxPodsPerNode := 0 - - if component == label.PDLabelVal { + maxPodsPerNode := 0 + + if component == label.PDLabelVal { + /** + * replicas maxPodsPerNode + * --------------------------- + * 1 1 + * 2 1 + * 3 1 + * 4 1 + * 5 2 + * ... + */ + maxPodsPerNode = int((replicas+1)/2) - 1 + if maxPodsPerNode <= 0 { + maxPodsPerNode = 1 + } + } else { + // 1. TiKV instances must run on at least 3 nodes, otherwise HA is not possible + if allNodes.Len() < 3 { + maxPodsPerNode = 1 + } else { /** - * replicas maxPodsPerNode - * --------------------------- - * 1 1 - * 2 1 - * 3 1 - * 4 1 - * 5 2 + * 2. we requires TiKV instances to run on at least 3 nodes, so max + * allowed pods on each node is ceil(replicas / 3) + * + * replicas maxPodsPerNode best HA on three nodes + * --------------------------------------------------- + * 3 1 1, 1, 1 + * 4 2 1, 1, 2 + * 5 2 1, 2, 2 + * 6 2 2, 2, 2 + * 7 3 2, 2, 3 + * 8 3 2, 3, 3 * ... 
*/ - maxPodsPerNode = int((replicas+1)/2) - 1 - if maxPodsPerNode <= 0 { - maxPodsPerNode = 1 - } - } else { - // replicas less than 3 cannot achieve high availability - if replicas < 3 { - minNodeNames = append(minNodeNames, nodeName) - glog.Infof("replicas is %d, add node %s to minNodeNames", replicas, nodeName) - continue - } - - // 1. TiKV instances must run on at least 3 nodes, otherwise HA is not possible - if allNodes.Len() < 3 { - maxPodsPerNode = 1 - } else { - /** - * 2. we requires TiKV instances to run on at least 3 nodes, so max - * allowed pods on each node is ceil(replicas / 3) - * - * replicas maxPodsPerNode best HA on three nodes - * --------------------------------------------------- - * 3 1 1, 1, 1 - * 4 2 1, 1, 2 - * 5 2 1, 2, 2 - * 6 2 2, 2, 2 - * 7 3 2, 2, 3 - * 8 3 2, 3, 3 - * ... - */ - maxPodsPerNode = int(math.Ceil(float64(replicas) / 3)) - } + maxPodsPerNode = int(math.Ceil(float64(replicas) / 3)) + } + } + + for nodeName, podNames := range nodeMap { + podsCount := len(podNames) + + // tikv replicas less than 3 cannot achieve high availability + if component == label.TiKVLabelVal && replicas < 3 { + minNodeNames = append(minNodeNames, nodeName) + glog.Infof("replicas is %d, add node %s to minNodeNames", replicas, nodeName) + continue } if podsCount+1 > maxPodsPerNode { @@ -216,10 +214,18 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([] } if len(minNodeNames) == 0 { - msg := fmt.Sprintf("can't schedule to nodes: %v, because these pods had been scheduled to nodes: %v", GetNodeNames(nodes), nodeMap) - glog.Info(msg) - h.recorder.Event(pod, apiv1.EventTypeWarning, "FailedScheduling", msg) - return nil, errors.New(msg) + nodesStrArr := []string{} + for nodeName, podNameArr := range nodeMap { + s := fmt.Sprintf("%s (%d %s pods)", + nodeName, len(podNameArr), strings.ToLower(component)) + nodesStrArr = append(nodesStrArr, s) + } + sort.Strings(nodesStrArr) + + // example: unable to schedule to nodes: kube-node-1 (1 pd pods), kube-node-2 (1 pd pods), max pods per node: 1 + errMsg := fmt.Sprintf("unable to schedule to nodes: %s, max pods per node: %d", + strings.Join(nodesStrArr, ", "), maxPodsPerNode) + return nil, errors.New(errMsg) } return getNodeFromNames(nodes, minNodeNames), nil } diff --git a/pkg/scheduler/predicates/ha_test.go b/pkg/scheduler/predicates/ha_test.go index 1cbfc98b79a..ff9e3064628 100644 --- a/pkg/scheduler/predicates/ha_test.go +++ b/pkg/scheduler/predicates/ha_test.go @@ -27,7 +27,6 @@ import ( apiv1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/record" ) func TestMapAndIntNil(t *testing.T) { @@ -436,7 +435,7 @@ func TestHAFilter(t *testing.T) { pvcGetFn func(string, string) (*apiv1.PersistentVolumeClaim, error) tcGetFn func(string, string) (*v1alpha1.TidbCluster, error) acquireLockFn func(*apiv1.Pod) (*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error) - expectFn func([]apiv1.Node, error, record.FakeRecorder) + expectFn func([]apiv1.Node, error) } testFn := func(test *testcase, t *testing.T) { @@ -446,17 +445,15 @@ func TestHAFilter(t *testing.T) { pod := test.podFn(instanceName, clusterName, 0) nodes := test.nodesFn() - recorder := record.NewFakeRecorder(10) ha := ha{ podListFn: test.podListFn, pvcGetFn: test.pvcGetFn, tcGetFn: test.tcGetFn, acquireLockFn: test.acquireLockFn, - recorder: recorder, } n, err := ha.Filter(instanceName, pod, nodes) - test.expectFn(n, err, *recorder) + test.expectFn(n, err) } tests := 
[]testcase{ @@ -472,7 +469,7 @@ func TestHAFilter(t *testing.T) { }, nil }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1"})) @@ -492,7 +489,7 @@ func TestHAFilter(t *testing.T) { acquireLockFn: func(pod *corev1.Pod) (*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error) { return nil, nil, fmt.Errorf("failed to acquire the lock") }, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).To(HaveOccurred()) g.Expect(strings.Contains(err.Error(), "failed to acquire the lock")).To(BeTrue()) }, @@ -505,7 +502,7 @@ func TestHAFilter(t *testing.T) { return nil, fmt.Errorf("get pvc failed") }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).To(HaveOccurred()) g.Expect(strings.Contains(err.Error(), "get pvc failed")).To(BeTrue()) }, @@ -516,7 +513,7 @@ func TestHAFilter(t *testing.T) { nodesFn: fakeThreeNodes, podListFn: podListErr(), acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).To(HaveOccurred()) g.Expect(strings.Contains(err.Error(), "list pods failed")).To(BeTrue()) }, @@ -530,7 +527,7 @@ func TestHAFilter(t *testing.T) { return nil, fmt.Errorf("get tidbcluster failed") }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).To(HaveOccurred()) g.Expect(strings.Contains(err.Error(), "get tidbcluster failed")).To(BeTrue()) }, @@ -542,9 +539,9 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{}), tcGetFn: tcGetFn, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).To(HaveOccurred()) - g.Expect(strings.Contains(err.Error(), "kube nodes is empty")).To(BeTrue()) + g.Expect(strings.Contains(err.Error(), "no nodes available to schedule pods")).To(BeTrue()) }, }, { @@ -561,7 +558,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{}), tcGetFn: tcGetOneReplicasFn, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1"})) @@ -581,7 +578,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{}), tcGetFn: tcGetOneReplicasFn, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(2)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2"})) @@ -601,7 +598,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{}), tcGetFn: tcGetTwoReplicasFn, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err 
error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1"})) @@ -621,7 +618,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{}), tcGetFn: tcGetTwoReplicasFn, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(2)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2"})) @@ -641,7 +638,7 @@ func TestHAFilter(t *testing.T) { acquireLockFn: acquireSuccess, podListFn: podListFn(map[string][]int32{}), tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1"})) @@ -661,12 +658,9 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{"kube-node-1": {1}}), acquireLockFn: acquireSuccess, tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err error, recorder record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).To(HaveOccurred()) - events := collectEvents(recorder.Events) - g.Expect(events).To(HaveLen(1)) - g.Expect(events[0]).To(ContainSubstring("FailedScheduling")) - g.Expect(strings.Contains(err.Error(), "can't schedule to nodes:")).To(BeTrue()) + g.Expect(err.Error()).To(ContainSubstring("unable to schedule to nodes: kube-node-1 (1 pd pods), max pods per node: 1")) g.Expect(len(nodes)).To(Equal(0)) }, }, @@ -677,7 +671,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{"kube-node-1": {0}}), acquireLockFn: acquireSuccess, tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-2"})) @@ -690,12 +684,9 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}}), acquireLockFn: acquireSuccess, tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err error, recorder record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).To(HaveOccurred()) - events := collectEvents(recorder.Events) - g.Expect(events).To(HaveLen(1)) - g.Expect(events[0]).To(ContainSubstring("FailedScheduling")) - g.Expect(strings.Contains(err.Error(), "can't schedule to nodes:")).To(BeTrue()) + g.Expect(err.Error()).To(ContainSubstring("unable to schedule to nodes: kube-node-1 (1 pd pods), kube-node-2 (1 pd pods), max pods per node: 1")) g.Expect(len(nodes)).To(Equal(0)) }, }, @@ -706,7 +697,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{}), acquireLockFn: acquireSuccess, tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(3)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3"})) @@ -719,7 +710,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{"kube-node-1": {0}}), acquireLockFn: acquireSuccess, tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err 
error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(2)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-2", "kube-node-3"})) @@ -732,7 +723,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}}), acquireLockFn: acquireSuccess, tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(nodes[0].Name).To(Equal("kube-node-3")) @@ -745,7 +736,7 @@ func TestHAFilter(t *testing.T) { podListFn: podListFn(map[string][]int32{"kube-node-4": {4}}), acquireLockFn: acquireSuccess, tcGetFn: tcGetFn, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(3)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3"})) @@ -762,7 +753,7 @@ func TestHAFilter(t *testing.T) { tc.Spec.TiKV.Replicas = 4 return tc, nil }, - expectFn: func(nodes []apiv1.Node, err error, recorder record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(3)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3"})) @@ -772,19 +763,16 @@ func TestHAFilter(t *testing.T) { name: "two nodes, 2,2 pods scheduled on these two nodes, replicas is 5, can't schedule", podFn: newHATiKVPod, nodesFn: fakeTwoNodes, - podListFn: podListFn(map[string][]int32{"kube-node-1": {2}, "kube-node-2": {2}}), + podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}}), tcGetFn: func(ns string, tcName string) (*v1alpha1.TidbCluster, error) { tc, _ := tcGetFn(ns, tcName) tc.Spec.TiKV.Replicas = 5 return tc, nil }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error, recorder record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).To(HaveOccurred()) - events := collectEvents(recorder.Events) - g.Expect(events).To(HaveLen(1)) - g.Expect(events[0]).To(ContainSubstring("FailedScheduling")) - g.Expect(strings.Contains(err.Error(), "can't schedule to nodes:")).To(BeTrue()) + g.Expect(err.Error()).To(ContainSubstring("unable to schedule to nodes: kube-node-1 (1 tikv pods), kube-node-2 (1 tikv pods), max pods per node: 1")) g.Expect(len(nodes)).To(Equal(0)) }, }, @@ -799,7 +787,7 @@ func TestHAFilter(t *testing.T) { return tc, nil }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(3)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3"})) @@ -816,7 +804,7 @@ func TestHAFilter(t *testing.T) { tc.Spec.PD.Replicas = 5 return tc, nil }, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(2)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-2", "kube-node-3"})) @@ -833,7 +821,7 @@ func TestHAFilter(t *testing.T) { return tc, nil }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err 
error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(nodes[0].Name).To(Equal("kube-node-4")) @@ -850,7 +838,7 @@ func TestHAFilter(t *testing.T) { return tc, nil }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(4)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3", "kube-node-4"})) @@ -867,7 +855,7 @@ func TestHAFilter(t *testing.T) { return tc, nil }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(4)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3", "kube-node-4"})) @@ -884,7 +872,7 @@ func TestHAFilter(t *testing.T) { return tc, nil }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(3)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-2", "kube-node-3", "kube-node-4"})) @@ -901,7 +889,7 @@ func TestHAFilter(t *testing.T) { return tc, nil }, acquireLockFn: acquireSuccess, - expectFn: func(nodes []apiv1.Node, err error, _ record.FakeRecorder) { + expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-2"})) @@ -1045,17 +1033,3 @@ func getSortedNodeNames(nodes []apiv1.Node) []string { func acquireSuccess(*apiv1.Pod) (*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error) { return nil, nil, nil } - -func collectEvents(source <-chan string) []string { - done := false - events := make([]string, 0) - for !done { - select { - case event := <-source: - events = append(events, event) - default: - done = true - } - } - return events -} diff --git a/pkg/scheduler/predicates/stable_scheduling.go b/pkg/scheduler/predicates/stable_scheduling.go index ab1e226b10d..eaaaf025602 100644 --- a/pkg/scheduler/predicates/stable_scheduling.go +++ b/pkg/scheduler/predicates/stable_scheduling.go @@ -24,7 +24,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/record" glog "k8s.io/klog" ) @@ -40,17 +39,15 @@ var ( ) type stableScheduling struct { - kubeCli kubernetes.Interface - cli versioned.Interface - recorder record.EventRecorder + kubeCli kubernetes.Interface + cli versioned.Interface } // NewStableScheduling returns a Predicate -func NewStableScheduling(kubeCli kubernetes.Interface, cli versioned.Interface, recorder record.EventRecorder) Predicate { +func NewStableScheduling(kubeCli kubernetes.Interface, cli versioned.Interface) Predicate { p := &stableScheduling{ - kubeCli: kubeCli, - cli: cli, - recorder: recorder, + kubeCli: kubeCli, + cli: cli, } return p } @@ -102,8 +99,7 @@ func (p *stableScheduling) Filter(instanceName string, pod *apiv1.Pod, nodes []a return []apiv1.Node{node}, nil } } - msg := fmt.Sprintf("cannot run on its previous node %q", nodeName) - p.recorder.Event(pod, apiv1.EventTypeWarning, UnableToRunOnPreviousNodeReason, msg) + 
return nodes, fmt.Errorf("cannot run on its previous node %q", nodeName) } else { glog.V(2).Infof("no previous node exists for pod %q in TiDB cluster %s/%q", podName, ns, tcName) } diff --git a/pkg/scheduler/predicates/stable_scheduling_test.go b/pkg/scheduler/predicates/stable_scheduling_test.go index 8a29cdd0d72..c63fb49404b 100644 --- a/pkg/scheduler/predicates/stable_scheduling_test.go +++ b/pkg/scheduler/predicates/stable_scheduling_test.go @@ -135,10 +135,8 @@ func TestStableSchedulingFilter(t *testing.T) { makeNode("node-3"), }, expectFn: func(nodes []v1.Node, err error, recorder *record.FakeRecorder) { - g.Expect(err).NotTo(HaveOccurred()) - events := collectEvents(recorder.Events) - g.Expect(events).To(HaveLen(1)) - g.Expect(events[0]).To(ContainSubstring(UnableToRunOnPreviousNodeReason)) + g.Expect(err).To(HaveOccurred()) + g.Expect(err.Error()).To(ContainSubstring("cannot run on its previous node \"node-4\"")) g.Expect(len(nodes)).To(Equal(3)) }, }, @@ -174,9 +172,8 @@ func TestStableSchedulingFilter(t *testing.T) { g.Expect(err).NotTo(HaveOccurred()) } p := stableScheduling{ - kubeCli: kubeCli, - cli: cli, - recorder: recorder, + kubeCli: kubeCli, + cli: cli, } nodes, err := p.Filter(tc.instanceName, tc.pod, tc.candicateNodes) tc.expectFn(nodes, err, recorder) diff --git a/pkg/scheduler/predicates/test_helper.go b/pkg/scheduler/predicates/test_helper.go index f8ed2f171df..55b09667c1a 100644 --- a/pkg/scheduler/predicates/test_helper.go +++ b/pkg/scheduler/predicates/test_helper.go @@ -81,3 +81,17 @@ func fakeOneNode() []apiv1.Node { func fakeZeroNode() []apiv1.Node { return []apiv1.Node{} } + +func CollectEvents(source <-chan string) []string { + done := false + events := make([]string, 0) + for !done { + select { + case event := <-source: + events = append(events, event) + default: + done = true + } + } + return events +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 23fb3cc5a9e..86dcbd567b4 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -15,6 +15,7 @@ package scheduler import ( "fmt" + "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned" "github.com/pingcap/tidb-operator/pkg/features" "github.com/pingcap/tidb-operator/pkg/label" @@ -45,6 +46,8 @@ type Scheduler interface { type scheduler struct { // component => predicates predicates map[string][]predicates.Predicate + + recorder record.EventRecorder } // NewScheduler returns a Scheduler @@ -56,19 +59,20 @@ func NewScheduler(kubeCli kubernetes.Interface, cli versioned.Interface) Schedul recorder := eventBroadcaster.NewRecorder(kubescheme.Scheme, apiv1.EventSource{Component: "tidb-scheduler"}) predicatesByComponent := map[string][]predicates.Predicate{ label.PDLabelVal: { - predicates.NewHA(kubeCli, cli, recorder), + predicates.NewHA(kubeCli, cli), }, label.TiKVLabelVal: { - predicates.NewHA(kubeCli, cli, recorder), + predicates.NewHA(kubeCli, cli), }, } if features.DefaultFeatureGate.Enabled(features.StableScheduling) { predicatesByComponent[label.TiDBLabelVal] = []predicates.Predicate{ - predicates.NewStableScheduling(kubeCli, cli, recorder), + predicates.NewStableScheduling(kubeCli, cli), } } return &scheduler{ predicates: predicatesByComponent, + recorder: recorder, } } @@ -115,7 +119,10 @@ func (s *scheduler) Filter(args *schedulerapiv1.ExtenderArgs) (*schedulerapiv1.E glog.Infof("entering predicate: %s, nodes: %v", predicate.Name(), predicates.GetNodeNames(kubeNodes)) kubeNodes, err = predicate.Filter(instanceName, pod, kubeNodes) if err != nil { - 
return nil, err + s.recorder.Event(pod, apiv1.EventTypeWarning, predicate.Name(), err.Error()) + return &schedulerapiv1.ExtenderFilterResult{ + Nodes: &apiv1.NodeList{Items: kubeNodes}, + }, nil } glog.Infof("leaving predicate: %s, nodes: %v", predicate.Name(), predicates.GetNodeNames(kubeNodes)) } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 1885d6c21c9..15ef5e5eb16 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -15,7 +15,6 @@ package scheduler import ( "fmt" - "strings" "testing" . "github.com/onsi/gomega" @@ -24,6 +23,7 @@ import ( apiv1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" schedulerapiv1 "k8s.io/kubernetes/pkg/scheduler/api/v1" ) @@ -36,6 +36,8 @@ func TestSchedulerFilter(t *testing.T) { expectFn func(*GomegaWithT, *schedulerapiv1.ExtenderFilterResult, error) } + recorder := record.NewFakeRecorder(10) + testFn := func(test *testcase, t *testing.T) { t.Log(test.name) @@ -51,6 +53,8 @@ func TestSchedulerFilter(t *testing.T) { newFakeErrPredicate(), }, }, + + recorder: recorder, } if test.predicateError { for _, predicatesByComponent := range s.predicates { @@ -136,9 +140,11 @@ func TestSchedulerFilter(t *testing.T) { }, predicateError: true, expectFn: func(g *GomegaWithT, result *schedulerapiv1.ExtenderFilterResult, err error) { - g.Expect(err).To(HaveOccurred()) - g.Expect(strings.Contains(err.Error(), "predicate error")).To(BeTrue()) - g.Expect(result).To(BeNil()) + g.Expect(err).NotTo(HaveOccurred()) + events := predicates.CollectEvents(recorder.Events) + g.Expect(events).To(HaveLen(1)) + g.Expect(events[0]).To(ContainSubstring("predicate error")) + g.Expect(result.Nodes.Items).To(BeNil()) }, }, { From 9f59411b5aefa3ed595384d5f4327366a4cd58a5 Mon Sep 17 00:00:00 2001 From: weekface Date: Thu, 19 Dec 2019 16:47:07 +0800 Subject: [PATCH 2/9] fix make check error --- pkg/scheduler/predicates/stable_scheduling.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/scheduler/predicates/stable_scheduling.go b/pkg/scheduler/predicates/stable_scheduling.go index eaaaf025602..23b4c4ccc0c 100644 --- a/pkg/scheduler/predicates/stable_scheduling.go +++ b/pkg/scheduler/predicates/stable_scheduling.go @@ -100,9 +100,8 @@ func (p *stableScheduling) Filter(instanceName string, pod *apiv1.Pod, nodes []a } } return nodes, fmt.Errorf("cannot run on its previous node %q", nodeName) - } else { - glog.V(2).Infof("no previous node exists for pod %q in TiDB cluster %s/%q", podName, ns, tcName) } + glog.V(2).Infof("no previous node exists for pod %q in TiDB cluster %s/%q", podName, ns, tcName) return nodes, nil } From f365ee729e9414b349ae227c04a63d13b6188c7d Mon Sep 17 00:00:00 2001 From: weekface Date: Thu, 19 Dec 2019 17:35:24 +0800 Subject: [PATCH 3/9] Update pkg/scheduler/predicates/ha.go Co-Authored-By: onlymellb --- pkg/scheduler/predicates/ha.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduler/predicates/ha.go b/pkg/scheduler/predicates/ha.go index a704831da4b..991c807dcb2 100644 --- a/pkg/scheduler/predicates/ha.go +++ b/pkg/scheduler/predicates/ha.go @@ -92,7 +92,7 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([] } if len(nodes) == 0 { - return nil, fmt.Errorf("no nodes available to schedule pods") + return nil, fmt.Errorf("no nodes available to schedule pods %s/%s", ns, podName) } if _, _, err := h.acquireLockFn(pod); err != nil { return nil, err 
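
For reference, a minimal standalone Go sketch of the per-node pod cap that the reworked ha.go above now computes before iterating over nodes; the maxPodsPerNode function name and the component strings are illustrative stand-ins rather than the operator's real identifiers, and the worked values in main match the replicas tables in the patch comments.

    package main

    import (
    	"fmt"
    	"math"
    )

    // maxPodsPerNode reproduces the rule documented in the ha.go comments:
    // PD tolerates floor((replicas+1)/2)-1 pods per node (at least 1), while
    // TiKV tolerates 1 pod per node when fewer than three schedulable nodes
    // exist and ceil(replicas/3) otherwise.
    func maxPodsPerNode(component string, replicas int32, schedulableNodes int) int {
    	if component == "pd" {
    		n := int((replicas+1)/2) - 1
    		if n <= 0 {
    			n = 1
    		}
    		return n
    	}
    	if schedulableNodes < 3 {
    		return 1
    	}
    	return int(math.Ceil(float64(replicas) / 3))
    }

    func main() {
    	// 5 PD replicas => 2 per node; 7 TiKV replicas on 3 nodes => 3 per node.
    	fmt.Println(maxPodsPerNode("pd", 5, 3))   // 2
    	fmt.Println(maxPodsPerNode("tikv", 7, 3)) // 3
    }
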
From 98a626e70bd810895a95a1c39522caebd0ca2e2a Mon Sep 17 00:00:00 2001 From: weekface Date: Thu, 19 Dec 2019 17:41:42 +0800 Subject: [PATCH 4/9] fix typo --- pkg/scheduler/predicates/ha.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduler/predicates/ha.go b/pkg/scheduler/predicates/ha.go index 991c807dcb2..7d52c300f91 100644 --- a/pkg/scheduler/predicates/ha.go +++ b/pkg/scheduler/predicates/ha.go @@ -92,7 +92,7 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([] } if len(nodes) == 0 { - return nil, fmt.Errorf("no nodes available to schedule pods %s/%s", ns, podName) + return nil, fmt.Errorf("no nodes available to schedule pods %s/%s", ns, podName) } if _, _, err := h.acquireLockFn(pod); err != nil { return nil, err From c7c3f2f8f4ba2d414cc20bc05b05fe4b2007e2eb Mon Sep 17 00:00:00 2001 From: weekface Date: Thu, 19 Dec 2019 19:47:42 +0800 Subject: [PATCH 5/9] address comment --- pkg/scheduler/scheduler.go | 9 +++++---- pkg/scheduler/scheduler_test.go | 4 +++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 86dcbd567b4..a106b29078f 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -118,13 +118,14 @@ func (s *scheduler) Filter(args *schedulerapiv1.ExtenderArgs) (*schedulerapiv1.E for _, predicate := range predicatesByComponent { glog.Infof("entering predicate: %s, nodes: %v", predicate.Name(), predicates.GetNodeNames(kubeNodes)) kubeNodes, err = predicate.Filter(instanceName, pod, kubeNodes) + glog.Infof("leaving predicate: %s, nodes: %v", predicate.Name(), predicates.GetNodeNames(kubeNodes)) if err != nil { s.recorder.Event(pod, apiv1.EventTypeWarning, predicate.Name(), err.Error()) - return &schedulerapiv1.ExtenderFilterResult{ - Nodes: &apiv1.NodeList{Items: kubeNodes}, - }, nil + if len(kubeNodes) == 0 { + // do not return error to k8s: https://github.com/pingcap/tidb-operator/issues/1353 + return nil, nil + } } - glog.Infof("leaving predicate: %s, nodes: %v", predicate.Name(), predicates.GetNodeNames(kubeNodes)) } return &schedulerapiv1.ExtenderFilterResult{ diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 15ef5e5eb16..bdeecc00f86 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -144,7 +144,9 @@ func TestSchedulerFilter(t *testing.T) { events := predicates.CollectEvents(recorder.Events) g.Expect(events).To(HaveLen(1)) g.Expect(events[0]).To(ContainSubstring("predicate error")) - g.Expect(result.Nodes.Items).To(BeNil()) + if result != nil { + g.Expect(result.Nodes.Items).To(BeNil()) + } }, }, { From 7840d22d0a5d8ec65e0c66333ca6a239e0bee555 Mon Sep 17 00:00:00 2001 From: weekface Date: Thu, 19 Dec 2019 19:55:02 +0800 Subject: [PATCH 6/9] address comment --- pkg/scheduler/predicates/stable_scheduling.go | 2 +- pkg/scheduler/predicates/stable_scheduling_test.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/scheduler/predicates/stable_scheduling.go b/pkg/scheduler/predicates/stable_scheduling.go index 23b4c4ccc0c..5855c669451 100644 --- a/pkg/scheduler/predicates/stable_scheduling.go +++ b/pkg/scheduler/predicates/stable_scheduling.go @@ -99,7 +99,7 @@ func (p *stableScheduling) Filter(instanceName string, pod *apiv1.Pod, nodes []a return []apiv1.Node{node}, nil } } - return nodes, fmt.Errorf("cannot run on its previous node %q", nodeName) + return nodes, fmt.Errorf("cannot run %s/%s on its previous node %q", ns, podName, 
nodeName) } glog.V(2).Infof("no previous node exists for pod %q in TiDB cluster %s/%q", podName, ns, tcName) diff --git a/pkg/scheduler/predicates/stable_scheduling_test.go b/pkg/scheduler/predicates/stable_scheduling_test.go index c63fb49404b..74cf6362317 100644 --- a/pkg/scheduler/predicates/stable_scheduling_test.go +++ b/pkg/scheduler/predicates/stable_scheduling_test.go @@ -51,7 +51,8 @@ func makeTidbCluster(name, node string) *v1alpha1.TidbCluster { func makePod(name string, component string) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: name, + Namespace: v1.NamespaceDefault, + Name: name, Labels: map[string]string{ label.ComponentLabelKey: component, }, @@ -136,7 +137,7 @@ func TestStableSchedulingFilter(t *testing.T) { }, expectFn: func(nodes []v1.Node, err error, recorder *record.FakeRecorder) { g.Expect(err).To(HaveOccurred()) - g.Expect(err.Error()).To(ContainSubstring("cannot run on its previous node \"node-4\"")) + g.Expect(err.Error()).To(ContainSubstring("cannot run default/demo-tidb-0 on its previous node \"node-4\"")) g.Expect(len(nodes)).To(Equal(3)) }, }, From 182874eed5cc6061922fd307c13193ffa719ca08 Mon Sep 17 00:00:00 2001 From: weekface Date: Mon, 23 Dec 2019 16:18:19 +0800 Subject: [PATCH 7/9] address comment --- pkg/scheduler/predicates/stable_scheduling.go | 7 +++++-- pkg/scheduler/predicates/stable_scheduling_test.go | 4 +++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/scheduler/predicates/stable_scheduling.go b/pkg/scheduler/predicates/stable_scheduling.go index 5855c669451..1ab7bea2acd 100644 --- a/pkg/scheduler/predicates/stable_scheduling.go +++ b/pkg/scheduler/predicates/stable_scheduling.go @@ -14,6 +14,7 @@ package predicates import ( + "errors" "fmt" "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" @@ -101,7 +102,9 @@ func (p *stableScheduling) Filter(instanceName string, pod *apiv1.Pod, nodes []a } return nodes, fmt.Errorf("cannot run %s/%s on its previous node %q", ns, podName, nodeName) } - glog.V(2).Infof("no previous node exists for pod %q in TiDB cluster %s/%q", podName, ns, tcName) - return nodes, nil + msg := fmt.Sprintf("no previous node exists for pod %q in TiDB cluster %s/%s", podName, ns, tcName) + glog.Warning(msg) + + return nodes, errors.New(msg) } diff --git a/pkg/scheduler/predicates/stable_scheduling_test.go b/pkg/scheduler/predicates/stable_scheduling_test.go index 74cf6362317..e6ff273a006 100644 --- a/pkg/scheduler/predicates/stable_scheduling_test.go +++ b/pkg/scheduler/predicates/stable_scheduling_test.go @@ -56,6 +56,7 @@ func makePod(name string, component string) *v1.Pod { Labels: map[string]string{ label.ComponentLabelKey: component, }, + GenerateName: name[0 : len(name)-1], }, } } @@ -121,7 +122,8 @@ func TestStableSchedulingFilter(t *testing.T) { makeNode("node-3"), }, expectFn: func(nodes []v1.Node, err error, recorder *record.FakeRecorder) { - g.Expect(err).NotTo(HaveOccurred()) + g.Expect(err).To(HaveOccurred()) + g.Expect(err.Error()).To(ContainSubstring("no previous node exists for pod \"demo-tidb-0\" in TiDB cluster default/demo")) g.Expect(len(nodes)).To(Equal(3)) }, }, From bc7f6aa7e4dd364f560af107d052521c06af4a8f Mon Sep 17 00:00:00 2001 From: weekface Date: Mon, 23 Dec 2019 16:42:14 +0800 Subject: [PATCH 8/9] adress comment --- pkg/scheduler/scheduler.go | 3 ++- pkg/scheduler/scheduler_test.go | 4 +--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index a106b29078f..7f85951f641 100644 --- 
a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -122,8 +122,9 @@ func (s *scheduler) Filter(args *schedulerapiv1.ExtenderArgs) (*schedulerapiv1.E if err != nil { s.recorder.Event(pod, apiv1.EventTypeWarning, predicate.Name(), err.Error()) if len(kubeNodes) == 0 { + break // do not return error to k8s: https://github.com/pingcap/tidb-operator/issues/1353 - return nil, nil + // return nil, nil } } } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index bdeecc00f86..15ef5e5eb16 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -144,9 +144,7 @@ func TestSchedulerFilter(t *testing.T) { events := predicates.CollectEvents(recorder.Events) g.Expect(events).To(HaveLen(1)) g.Expect(events[0]).To(ContainSubstring("predicate error")) - if result != nil { - g.Expect(result.Nodes.Items).To(BeNil()) - } + g.Expect(result.Nodes.Items).To(BeNil()) }, }, { From 2dfc38ff93755188884dc660087bbc3f6fcdb37d Mon Sep 17 00:00:00 2001 From: weekface Date: Mon, 23 Dec 2019 16:56:36 +0800 Subject: [PATCH 9/9] remove comment --- pkg/scheduler/scheduler.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 7f85951f641..e627dc2837e 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -123,8 +123,6 @@ func (s *scheduler) Filter(args *schedulerapiv1.ExtenderArgs) (*schedulerapiv1.E s.recorder.Event(pod, apiv1.EventTypeWarning, predicate.Name(), err.Error()) if len(kubeNodes) == 0 { break - // do not return error to k8s: https://github.com/pingcap/tidb-operator/issues/1353 - // return nil, nil } } }
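
For reference, a minimal standalone Go sketch of the filtering loop's final shape after patches 5-9: a predicate failure is surfaced as a Warning event on the pod instead of being returned to kube-scheduler, and the loop stops early once no candidate nodes remain. The predicate interface, the filterNodes helper, and the fake recorder below are simplified stand-ins, not the operator's real types.

    package main

    import (
    	"fmt"

    	apiv1 "k8s.io/api/core/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/client-go/tools/record"
    )

    // predicate is a simplified stand-in for the scheduler's Predicate interface.
    type predicate interface {
    	Name() string
    	Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([]apiv1.Node, error)
    }

    // filterNodes mirrors the loop's final shape: record a Warning event on the
    // pod (reason = predicate name) when a predicate fails, and stop early once
    // no candidate nodes remain, never returning the error upstream.
    func filterNodes(recorder record.EventRecorder, preds []predicate, instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) []apiv1.Node {
    	var err error
    	for _, p := range preds {
    		nodes, err = p.Filter(instanceName, pod, nodes)
    		if err != nil {
    			recorder.Event(pod, apiv1.EventTypeWarning, p.Name(), err.Error())
    			if len(nodes) == 0 {
    				break
    			}
    		}
    	}
    	return nodes
    }

    // rejectAll is a toy predicate that filters out every node with an HA-style error.
    type rejectAll struct{}

    func (rejectAll) Name() string { return "HAScheduling" }

    func (rejectAll) Filter(string, *apiv1.Pod, []apiv1.Node) ([]apiv1.Node, error) {
    	return nil, fmt.Errorf("unable to schedule to nodes: kube-node-1 (1 pd pods), max pods per node: 1")
    }

    func main() {
    	recorder := record.NewFakeRecorder(10)
    	pod := &apiv1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "demo-pd-1"}}
    	nodes := filterNodes(recorder, []predicate{rejectAll{}}, "demo",
    		pod, []apiv1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "kube-node-1"}}})
    	fmt.Println(len(nodes))        // 0
    	fmt.Println(<-recorder.Events) // Warning HAScheduling unable to schedule to nodes: ...
    }
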