diff --git a/charts/tidb-operator/templates/scheduler-rbac.yaml b/charts/tidb-operator/templates/scheduler-rbac.yaml index 3b740c25f1..eb766fdb6a 100644 --- a/charts/tidb-operator/templates/scheduler-rbac.yaml +++ b/charts/tidb-operator/templates/scheduler-rbac.yaml @@ -50,6 +50,12 @@ rules: - apiGroups: [""] resources: ["persistentvolumes"] verbs: ["get", "list", "watch", "update"] +- apiGroups: ["pingcap.com"] + resources: ["tidbclusters"] + verbs: ["get"] +- apiGroups: [""] + resources: ["persistentvolumeclaims"] + verbs: ["get"] --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1beta1 @@ -103,6 +109,12 @@ rules: - apiGroups: ["storage.k8s.io"] resources: ["storageclasses"] verbs: ["get", "list", "watch"] +- apiGroups: ["pingcap.com"] + resources: ["tidbclusters"] + verbs: ["get"] +- apiGroups: [""] + resources: ["persistentvolumeclaims"] + verbs: ["get"] --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1beta1 diff --git a/cmd/scheduler/main.go b/cmd/scheduler/main.go index f20d4cf4b0..04d9b97862 100644 --- a/cmd/scheduler/main.go +++ b/cmd/scheduler/main.go @@ -21,6 +21,7 @@ import ( "time" "github.com/golang/glog" + "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned" "github.com/pingcap/tidb-operator/pkg/scheduler/server" "github.com/pingcap/tidb-operator/version" "k8s.io/apimachinery/pkg/util/wait" @@ -59,9 +60,13 @@ func main() { if err != nil { glog.Fatalf("failed to get kubernetes Clientset: %v", err) } + cli, err := versioned.NewForConfig(cfg) + if err != nil { + glog.Fatalf("failed to create Clientset: %v", err) + } go wait.Forever(func() { - server.StartServer(kubeCli, port) + server.StartServer(kubeCli, cli, port) }, 5*time.Second) glog.Fatal(http.ListenAndServe(":6060", nil)) } diff --git a/pkg/scheduler/predicates/ha.go b/pkg/scheduler/predicates/ha.go index 3093add244..de2c2b3802 100644 --- a/pkg/scheduler/predicates/ha.go +++ b/pkg/scheduler/predicates/ha.go @@ -15,31 +15,35 @@ package predicates import ( "fmt" + "strings" "github.com/golang/glog" + "github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1" + "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned" "github.com/pingcap/tidb-operator/pkg/label" - "github.com/pingcap/tidb-operator/pkg/util" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" ) -const ( - replicas int32 = 3 -) - type ha struct { kubeCli kubernetes.Interface + cli versioned.Interface podListFn func(ns, instanceName, component string) (*apiv1.PodList, error) + pvcGetFn func(ns, pvcName string) (*apiv1.PersistentVolumeClaim, error) + tcGetFn func(ns, tcName string) (*v1alpha1.TidbCluster, error) } // NewHA returns a Predicate -func NewHA(kubeCli kubernetes.Interface) Predicate { +func NewHA(kubeCli kubernetes.Interface, cli versioned.Interface) Predicate { h := &ha{ kubeCli: kubeCli, + cli: cli, } h.podListFn = h.realPodListFn + h.pvcGetFn = h.realPVCGetFn + h.tcGetFn = h.realTCGetFn return h } @@ -47,85 +51,79 @@ func (h *ha) Name() string { return "HighAvailability" } -// 1. First, we sort all the nodes we get from kube-scheduler by how many same kind of pod it contains, -// find the nodes that have least pods. -// 2. When scheduling the first replicas pods, we must ensure no previous pods on the nodes. -// 3. For later pods, we choose the nodes that have least pods. +// 1. return the node to kube-scheduler if there is only one node and the pod's pvc is bound +// 2. return these nodes that have least pods and its pods count is less than (replicas+1)/2 to kube-scheduler +// 3. let kube-scheduler to make the final decision func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([]apiv1.Node, error) { ns := pod.GetNamespace() podName := pod.GetName() + component := pod.Labels[label.ComponentLabelKey] + tcName := getTCNameFromPod(pod, component) - var component string - var exist bool - if component, exist = pod.Labels[label.ComponentLabelKey]; !exist { - return nodes, fmt.Errorf("can't find component in pod labels: %s/%s", ns, podName) + if len(nodes) == 0 { + return nil, fmt.Errorf("kube nodes is empty") + } + + if len(nodes) == 1 { + pvcName := fmt.Sprintf("%s-%s", component, podName) + pvc, err := h.pvcGetFn(ns, pvcName) + if err != nil { + return nil, err + } + if pvc.Status.Phase == apiv1.ClaimBound { + return nodes, nil + } } + podList, err := h.podListFn(ns, instanceName, component) if err != nil { return nil, err } - - ordinal, err := util.GetOrdinalFromPodName(podName) + tc, err := h.tcGetFn(ns, tcName) if err != nil { return nil, err } - - // when a deleted pod is recreated again, it should be rescheduled to the original node - if len(nodes) == 1 { - nextPodName := util.GetNextOrdinalPodName(podName, ordinal) - for _, pod := range podList.Items { - if pod.GetName() == nextPodName && pod.Spec.NodeName != "" { - return nodes, nil - } - } - } + replicas := getReplicasFrom(tc, component) nodeMap := make(map[string][]string) for _, node := range nodes { nodeMap[node.GetName()] = make([]string, 0) } for _, pod := range podList.Items { - podName1 := pod.GetName() + pName := pod.GetName() nodeName := pod.Spec.NodeName - ordinal1, err := util.GetOrdinalFromPodName(podName1) - if err != nil { - return nil, err - } - - if ordinal1 < ordinal && nodeName == "" { - return nil, fmt.Errorf("waiting for pod: %s/%s to be scheduled", ns, podName1) - } if nodeName == "" || nodeMap[nodeName] == nil { continue } - nodeMap[nodeName] = append(nodeMap[nodeName], podName1) + nodeMap[nodeName] = append(nodeMap[nodeName], pName) } glog.V(4).Infof("nodeMap: %+v", nodeMap) - var min int - var minInitialized bool - for _, podNameArr := range nodeMap { - count := len(podNameArr) - if !minInitialized { - minInitialized = true - min = count + min := -1 + minNodeNames := make([]string, 0) + for nodeName, podNames := range nodeMap { + podsCount := len(podNames) + if podsCount+1 >= int(replicas+1)/2 { + continue } - if count < min { - min = count + if min == -1 { + min = podsCount } - } - if ordinal < replicas && min != 0 { - return nil, fmt.Errorf("the first %d pods can't be scheduled to the same node", replicas) - } - minNodeNames := make([]string, 0) - for nodeName, podNameArr := range nodeMap { - if len(podNameArr) == min { - minNodeNames = append(minNodeNames, nodeName) + if podsCount > min { + continue + } + if podsCount < min { + min = podsCount + minNodeNames = make([]string, 0) } + minNodeNames = append(minNodeNames, nodeName) } + if len(minNodeNames) == 0 { + return nil, fmt.Errorf("can't find a node from: %v, nodeMap: %v", nodes, nodeMap) + } return getNodeFromNames(nodes, minNodeNames), nil } @@ -135,3 +133,23 @@ func (h *ha) realPodListFn(ns, instanceName, component string) (*apiv1.PodList, LabelSelector: labels.SelectorFromSet(selector).String(), }) } + +func (h *ha) realPVCGetFn(ns, pvcName string) (*apiv1.PersistentVolumeClaim, error) { + return h.kubeCli.CoreV1().PersistentVolumeClaims(ns).Get(pvcName, metav1.GetOptions{}) +} + +func (h *ha) realTCGetFn(ns, tcName string) (*v1alpha1.TidbCluster, error) { + return h.cli.PingcapV1alpha1().TidbClusters(ns).Get(tcName, metav1.GetOptions{}) +} + +func getTCNameFromPod(pod *apiv1.Pod, component string) string { + return strings.TrimSuffix(pod.GenerateName, fmt.Sprintf("-%s-", component)) +} + +func getReplicasFrom(tc *v1alpha1.TidbCluster, component string) int32 { + if component == v1alpha1.PDMemberType.String() { + return tc.Spec.PD.Replicas + } + + return tc.Spec.TiKV.Replicas +} diff --git a/pkg/scheduler/predicates/ha_test.go b/pkg/scheduler/predicates/ha_test.go index 9e92cb8c18..1ec3b0d051 100644 --- a/pkg/scheduler/predicates/ha_test.go +++ b/pkg/scheduler/predicates/ha_test.go @@ -21,6 +21,7 @@ import ( "testing" . "github.com/onsi/gomega" + "github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1" "github.com/pingcap/tidb-operator/pkg/controller" "github.com/pingcap/tidb-operator/pkg/label" apiv1 "k8s.io/api/core/v1" @@ -28,7 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func TestMapNil(t *testing.T) { +func TestMapAndIntNil(t *testing.T) { g := NewGomegaWithT(t) m := make(map[string][]string) @@ -45,89 +46,157 @@ func TestHAFilter(t *testing.T) { g := NewGomegaWithT(t) type testcase struct { name string - ordinal int32 - podFn func(string, int32) *apiv1.Pod + podFn func(string, string, int32) *apiv1.Pod nodesFn func() []apiv1.Node podListFn func(string, string, string) (*apiv1.PodList, error) + pvcGetFn func(string, string) (*apiv1.PersistentVolumeClaim, error) + tcGetFn func(string, string) (*v1alpha1.TidbCluster, error) expectFn func([]apiv1.Node, error) } testFn := func(test *testcase, t *testing.T) { t.Log(test.name) - clusterName := "demo" + instanceName := "demo" + clusterName := "cluster-1" - pod := test.podFn(clusterName, test.ordinal) + pod := test.podFn(instanceName, clusterName, 0) nodes := test.nodesFn() - ha := ha{podListFn: test.podListFn} - test.expectFn(ha.Filter(clusterName, pod, nodes)) + ha := ha{podListFn: test.podListFn, pvcGetFn: test.pvcGetFn, tcGetFn: test.tcGetFn} + test.expectFn(ha.Filter(instanceName, pod, nodes)) } tests := []testcase{ { - name: "component key is empty", - ordinal: 0, - podFn: func(clusterName string, ordinal int32) *apiv1.Pod { - pod := newHAPDPod(clusterName, ordinal) - pod.Labels = nil - return pod + name: "one node, one scheduled pod recreated and its pvc is bound, return the node", + podFn: newHAPDPod, + nodesFn: fakeOneNode, + pvcGetFn: func(ns string, pvcName string) (*corev1.PersistentVolumeClaim, error) { + return &corev1.PersistentVolumeClaim{ + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{Name: "pd-cluster-1-pd-0"}, + Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimBound}, + }, nil }, - nodesFn: fakeThreeNodes, - podListFn: podListFn(map[string][]int32{}), expectFn: func(nodes []apiv1.Node, err error) { - g.Expect(strings.Contains(err.Error(), "can't find component in pod labels")).To(Equal(true)) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(len(nodes)).To(Equal(1)) + g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1"})) }, }, { - name: "pod list return error", - ordinal: 0, + name: "already scheduled pod recreated, get pvc failed", + podFn: newHAPDPod, + nodesFn: fakeOneNode, + pvcGetFn: func(ns string, pvcName string) (*corev1.PersistentVolumeClaim, error) { + return nil, fmt.Errorf("get pvc failed") + }, + expectFn: func(nodes []apiv1.Node, err error) { + g.Expect(err).To(HaveOccurred()) + g.Expect(strings.Contains(err.Error(), "get pvc failed")).To(BeTrue()) + }, + }, + { + name: "list pod failed", podFn: newHAPDPod, nodesFn: fakeThreeNodes, podListFn: podListErr(), expectFn: func(nodes []apiv1.Node, err error) { - g.Expect(strings.Contains(err.Error(), "pod list error")).To(Equal(true)) + g.Expect(err).To(HaveOccurred()) + g.Expect(strings.Contains(err.Error(), "list pods failed")).To(BeTrue()) }, }, { - name: "get ordinal from podName error", - ordinal: 0, - podFn: func(clusterName string, ordinal int32) *apiv1.Pod { - pod := newHAPDPod(clusterName, ordinal) - pod.Name = "xxxx" - return pod - }, + name: "get tidbcluster failed", + podFn: newHAPDPod, nodesFn: fakeThreeNodes, podListFn: podListFn(map[string][]int32{}), + tcGetFn: func(ns string, tcName string) (*v1alpha1.TidbCluster, error) { + return nil, fmt.Errorf("get tidbcluster failed") + }, expectFn: func(nodes []apiv1.Node, err error) { - g.Expect(strings.Contains(err.Error(), "strconv.Atoi: parsing")).To(Equal(true)) + g.Expect(err).To(HaveOccurred()) + g.Expect(strings.Contains(err.Error(), "get tidbcluster failed")).To(BeTrue()) }, }, { - name: "one pod, podName is wrong", - ordinal: 0, + name: "zero node, return zero node", podFn: newHAPDPod, - nodesFn: fakeThreeNodes, - podListFn: podNameWrongListFn(), + nodesFn: fakeZeroNode, + podListFn: podListFn(map[string][]int32{}), + tcGetFn: tcGetFn, expectFn: func(nodes []apiv1.Node, err error) { - g.Expect(strings.Contains(err.Error(), "strconv.Atoi: parsing")).To(Equal(true)) + g.Expect(err).To(HaveOccurred()) + g.Expect(strings.Contains(err.Error(), "kube nodes is empty")).To(BeTrue()) }, }, { - name: "the lower oridnal is not scheduled", - ordinal: 1, + name: "one node, no pod scheduled, return the node", + podFn: newHAPDPod, + nodesFn: fakeOneNode, + pvcGetFn: func(ns string, pvcName string) (*corev1.PersistentVolumeClaim, error) { + return &corev1.PersistentVolumeClaim{ + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{Name: "pd-cluster-1-pd-0"}, + Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimPending}, + }, nil + }, + podListFn: podListFn(map[string][]int32{}), + tcGetFn: tcGetFn, + expectFn: func(nodes []apiv1.Node, err error) { + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(len(nodes)).To(Equal(1)) + g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1"})) + }, + }, + { + name: "one node, one pod scheduled, return zero node", + podFn: newHAPDPod, + nodesFn: fakeOneNode, + pvcGetFn: func(ns string, pvcName string) (*corev1.PersistentVolumeClaim, error) { + return &corev1.PersistentVolumeClaim{ + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{Name: "pd-cluster-1-pd-0"}, + Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimPending}, + }, nil + }, + podListFn: podListFn(map[string][]int32{"kube-node-1": {1}}), + tcGetFn: tcGetFn, + expectFn: func(nodes []apiv1.Node, err error) { + g.Expect(err).To(HaveOccurred()) + g.Expect(strings.Contains(err.Error(), "can't find a node from: ")).To(BeTrue()) + }, + }, + { + name: "two nodes, one pod scheduled on the node, return one node", podFn: newHAPDPod, - nodesFn: fakeThreeNodes, - podListFn: podListFn(map[string][]int32{"": {0}}), + nodesFn: fakeTwoNodes, + podListFn: podListFn(map[string][]int32{"kube-node-1": {0}}), + tcGetFn: tcGetFn, expectFn: func(nodes []apiv1.Node, err error) { - g.Expect(strings.Contains(err.Error(), "waiting for pod: default/demo-pd-0")).To(Equal(true)) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(len(nodes)).To(Equal(1)) + g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-2"})) + }, + }, + { + name: "two nodes, two pods scheduled on these two nodes, return zero node", + podFn: newHAPDPod, + nodesFn: fakeTwoNodes, + podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}}), + tcGetFn: tcGetFn, + expectFn: func(nodes []apiv1.Node, err error) { + g.Expect(err).To(HaveOccurred()) + g.Expect(strings.Contains(err.Error(), "can't find a node from: ")).To(BeTrue()) + g.Expect(len(nodes)).To(Equal(0)) }, }, { - name: "no scheduled pods, three nodes, ordinal 0 should be scheduled to all nodes", - ordinal: 0, + name: "three nodes, zero pod scheduled, return all the three nodes", podFn: newHAPDPod, nodesFn: fakeThreeNodes, podListFn: podListFn(map[string][]int32{}), + tcGetFn: tcGetFn, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(3)) @@ -135,11 +204,11 @@ func TestHAFilter(t *testing.T) { }, }, { - name: "ordinal 0 is scheduled to kube-node-1, ordinal 1 should be scheduled to kube-node-2 or kube-node-3", - ordinal: 1, + name: "three nodes, one pod scheduled, return two nodes", podFn: newHAPDPod, nodesFn: fakeThreeNodes, podListFn: podListFn(map[string][]int32{"kube-node-1": {0}}), + tcGetFn: tcGetFn, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(2)) @@ -147,45 +216,55 @@ func TestHAFilter(t *testing.T) { }, }, { - name: "ordinal 0 is scheduled to kube-node-3, get node-3, ordinal 1 should be scheduled to none", - ordinal: 1, + name: "three nodes, two pods scheduled, return one node", podFn: newHAPDPod, - nodesFn: fakeOneNode, - podListFn: podListFn(map[string][]int32{"kube-node-3": {0}}), + nodesFn: fakeThreeNodes, + podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}}), + tcGetFn: tcGetFn, expectFn: func(nodes []apiv1.Node, err error) { - g.Expect(err).To(HaveOccurred()) - g.Expect(strings.Contains(err.Error(), "the first 3 pods can't be scheduled to the same node")).To(Equal(true)) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(len(nodes)).To(Equal(1)) + g.Expect(nodes[0].Name).To(Equal("kube-node-3")) }, }, { - name: "ordinal 0 is scheduled to kube-node-2, ordinal 1 is kube-node-3, ordinal 2 should be scheduled to kube-node-1", - ordinal: 2, + name: "three nodes, one pod not scheduled on these three nodes, return all the three nodes", podFn: newHAPDPod, nodesFn: fakeThreeNodes, - podListFn: podListFn(map[string][]int32{"kube-node-2": {0}, "kube-node-3": {1}}), + podListFn: podListFn(map[string][]int32{"kube-node-4": {4}}), + tcGetFn: tcGetFn, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) - g.Expect(len(nodes)).To(Equal(1)) - g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1"})) + g.Expect(len(nodes)).To(Equal(3)) + g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3"})) }, }, { - name: "ordinal 0 is scheduled to kube-node-1, ordinal 1 is kube-node-3, ordinal 2 should be scheduled to kube-node-1", - ordinal: 2, + name: "three nodes, three pods scheduled on these three nodes, replicas is 4, can't scheduled", podFn: newHAPDPod, - nodesFn: fakeTwoNodes, - podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-3": {1}}), + nodesFn: fakeThreeNodes, + podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}, "kube-node-3": {2}}), + tcGetFn: func(ns string, tcName string) (*v1alpha1.TidbCluster, error) { + tc, _ := tcGetFn(ns, tcName) + tc.Spec.PD.Replicas = 4 + return tc, nil + }, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).To(HaveOccurred()) - g.Expect(strings.Contains(err.Error(), "the first 3 pods can't be scheduled to the same node")).To(Equal(true)) + g.Expect(strings.Contains(err.Error(), "can't find a node from: ")).To(BeTrue()) + g.Expect(len(nodes)).To(Equal(0)) }, }, { - name: "the first three oridnals get to 3 nodes, the ordinal 3 should scheduled to 1,2,3", - ordinal: 3, + name: "three nodes, three pods scheduled on these three nodes, replicas is 5, return three nodes", podFn: newHAPDPod, nodesFn: fakeThreeNodes, - podListFn: podListFn(map[string][]int32{"kube-node-2": {0}, "kube-node-3": {1}, "kube-node-1": {2}}), + podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}, "kube-node-3": {2}}), + tcGetFn: func(ns string, tcName string) (*v1alpha1.TidbCluster, error) { + tc, _ := tcGetFn(ns, tcName) + tc.Spec.PD.Replicas = 5 + return tc, nil + }, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(3)) @@ -193,11 +272,15 @@ func TestHAFilter(t *testing.T) { }, }, { - name: "the first four oridnals get to 3 nodes, the ordinal 4 should scheduled to 2,3", - ordinal: 4, + name: "three nodes, four pods scheduled on these three nodes, replicas is 5, return two nodes", podFn: newHAPDPod, nodesFn: fakeThreeNodes, - podListFn: podListFn(map[string][]int32{"kube-node-2": {0}, "kube-node-3": {1}, "kube-node-1": {2, 3}}), + podListFn: podListFn(map[string][]int32{"kube-node-1": {0, 3}, "kube-node-2": {1}, "kube-node-3": {2}}), + tcGetFn: func(ns string, tcName string) (*v1alpha1.TidbCluster, error) { + tc, _ := tcGetFn(ns, tcName) + tc.Spec.PD.Replicas = 5 + return tc, nil + }, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(2)) @@ -205,63 +288,67 @@ func TestHAFilter(t *testing.T) { }, }, { - name: "the first five oridnals get to 3 nodes, the ordinal 5 should scheduled to 2", - ordinal: 5, + name: "four nodes, three pods scheduled on these three nodes, replicas is 4, return the fourth node", podFn: newHAPDPod, - nodesFn: fakeThreeNodes, - podListFn: podListFn(map[string][]int32{"kube-node-2": {0}, "kube-node-3": {1, 4}, "kube-node-1": {2, 3}}), + nodesFn: fakeFourNodes, + podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}, "kube-node-3": {2}}), + tcGetFn: func(ns string, tcName string) (*v1alpha1.TidbCluster, error) { + tc, _ := tcGetFn(ns, tcName) + tc.Spec.PD.Replicas = 4 + return tc, nil + }, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) - g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-2"})) + g.Expect(nodes[0].Name).To(Equal("kube-node-4")) }, }, { - name: "the first five oridnals get to 3 nodes, got 2 nodes(no node-2), the ordinal 5 should scheduled to 1,3", - ordinal: 5, + name: "four nodes, four pods scheduled on these four nodes, replicas is 5, return these four nodes", podFn: newHAPDPod, - nodesFn: fakeTwoNodes, - podListFn: podListFn(map[string][]int32{"kube-node-2": {0}, "kube-node-3": {1, 4}, "kube-node-1": {2, 3}}), - expectFn: func(nodes []apiv1.Node, err error) { - g.Expect(err).NotTo(HaveOccurred()) - g.Expect(len(nodes)).To(Equal(2)) - g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-3"})) + nodesFn: fakeFourNodes, + podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}, "kube-node-3": {2}, "kube-node-4": {3}}), + tcGetFn: func(ns string, tcName string) (*v1alpha1.TidbCluster, error) { + tc, _ := tcGetFn(ns, tcName) + tc.Spec.PD.Replicas = 5 + return tc, nil }, - }, - { - name: "the first five oridnals get to 3 nodes, got 1 nodes(no node-2 node-1), the ordinal 5 should scheduled to 3", - ordinal: 5, - podFn: newHAPDPod, - nodesFn: fakeOneNode, - podListFn: podListFn(map[string][]int32{"kube-node-2": {0}, "kube-node-3": {1, 4}, "kube-node-1": {2, 3}}), expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) - g.Expect(len(nodes)).To(Equal(1)) - g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-3"})) + g.Expect(len(nodes)).To(Equal(4)) + g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3", "kube-node-4"})) }, }, { - name: "the first six oridnals get to 3 nodes, the ordinal 6 should scheduled to 1,2,3", - ordinal: 6, + name: "four nodes, four pods scheduled on these four nodes, replicas is 6, return these four nodes", podFn: newHAPDPod, - nodesFn: fakeThreeNodes, - podListFn: podListFn(map[string][]int32{"kube-node-2": {0, 5}, "kube-node-3": {1, 4}, "kube-node-1": {2, 3}}), + nodesFn: fakeFourNodes, + podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}, "kube-node-3": {2}, "kube-node-4": {3}}), + tcGetFn: func(ns string, tcName string) (*v1alpha1.TidbCluster, error) { + tc, _ := tcGetFn(ns, tcName) + tc.Spec.PD.Replicas = 6 + return tc, nil + }, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) - g.Expect(len(nodes)).To(Equal(3)) - g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3"})) + g.Expect(len(nodes)).To(Equal(4)) + g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3", "kube-node-4"})) }, }, { - name: "5 pods scheduled to 3 nodes(and then delete the ordinal 1 pod), the created pod ordinal 1 should rescheduled to node 3", - ordinal: 1, + name: "four nodes, five pods scheduled on these four nodes, replicas is 6, return these three nodes", podFn: newHAPDPod, - nodesFn: fakeOneNode, - podListFn: podListFn(map[string][]int32{"kube-node-2": {0}, "kube-node-3": {4}, "kube-node-1": {2, 3}}), + nodesFn: fakeFourNodes, + podListFn: podListFn(map[string][]int32{"kube-node-1": {0, 4}, "kube-node-2": {1}, "kube-node-3": {2}, "kube-node-4": {3}}), + tcGetFn: func(ns string, tcName string) (*v1alpha1.TidbCluster, error) { + tc, _ := tcGetFn(ns, tcName) + tc.Spec.PD.Replicas = 6 + return tc, nil + }, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) - g.Expect(len(nodes)).To(Equal(1)) - g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-3"})) + g.Expect(len(nodes)).To(Equal(3)) + g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-2", "kube-node-3", "kube-node-4"})) }, }, } @@ -271,13 +358,13 @@ func TestHAFilter(t *testing.T) { } } -func newHAPDPod(clusterName string, ordinal int32) *apiv1.Pod { +func newHAPDPod(instanceName, clusterName string, ordinal int32) *apiv1.Pod { return &apiv1.Pod{ TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"}, ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-%d", controller.PDMemberName(clusterName), ordinal), Namespace: corev1.NamespaceDefault, - Labels: label.New().PD().Labels(), + Labels: label.New().Instance(instanceName).PD().Labels(), }, } } @@ -307,32 +394,23 @@ func podListFn(nodePodMap map[string][]int32) func(string, string, string) (*api } } -func podNameWrongListFn() func(string, string, string) (*apiv1.PodList, error) { +func podListErr() func(string, string, string) (*apiv1.PodList, error) { return func(ns, clusterName, component string) (*apiv1.PodList, error) { - podList := &apiv1.PodList{ - TypeMeta: metav1.TypeMeta{Kind: "PodList", APIVersion: "v1"}, - Items: []apiv1.Pod{ - { - TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"}, - ObjectMeta: metav1.ObjectMeta{ - Name: "xxx", - Namespace: corev1.NamespaceDefault, - Labels: label.New().PD().Labels(), - }, - Spec: apiv1.PodSpec{ - NodeName: "kube-node-1", - }, - }, - }, - } - return podList, nil + return nil, errors.New("list pods failed") } } -func podListErr() func(string, string, string) (*apiv1.PodList, error) { - return func(ns, clusterName, component string) (*apiv1.PodList, error) { - return nil, errors.New("pod list error") - } +func tcGetFn(ns string, tcName string) (*v1alpha1.TidbCluster, error) { + return &v1alpha1.TidbCluster{ + TypeMeta: metav1.TypeMeta{Kind: "TidbCluster", APIVersion: "v1alpha1"}, + ObjectMeta: metav1.ObjectMeta{ + Name: tcName, + Namespace: ns, + }, + Spec: v1alpha1.TidbClusterSpec{ + PD: v1alpha1.PDSpec{Replicas: 3}, + }, + }, nil } func getSortedNodeNames(nodes []apiv1.Node) []string { diff --git a/pkg/scheduler/predicates/predicate_test.go b/pkg/scheduler/predicates/predicate_test.go index 2332a72108..581647f4a5 100644 --- a/pkg/scheduler/predicates/predicate_test.go +++ b/pkg/scheduler/predicates/predicate_test.go @@ -62,12 +62,12 @@ func TestGetNodeFromNames(t *testing.T) { { nodes: fakeTwoNodes(), nodeNames: []string{"kube-node-1", "kube-node-2", "kube-node-3"}, - expected: []string{"kube-node-1", "kube-node-3"}, + expected: []string{"kube-node-1", "kube-node-2"}, }, { nodes: fakeTwoNodes(), nodeNames: []string{"kube-node-1", "kube-node-3"}, - expected: []string{"kube-node-1", "kube-node-3"}, + expected: []string{"kube-node-1"}, }, { nodes: fakeTwoNodes(), @@ -77,32 +77,32 @@ func TestGetNodeFromNames(t *testing.T) { { nodes: fakeTwoNodes(), nodeNames: []string{"kube-node-3"}, - expected: []string{"kube-node-3"}, + expected: []string{}, }, { nodes: fakeTwoNodes(), nodeNames: []string{"kube-node-2"}, - expected: []string{}, + expected: []string{"kube-node-2"}, }, { nodes: fakeOneNode(), nodeNames: []string{"kube-node-1", "kube-node-2", "kube-node-3"}, - expected: []string{"kube-node-3"}, + expected: []string{"kube-node-1"}, }, { nodes: fakeOneNode(), nodeNames: []string{"kube-node-2", "kube-node-3"}, - expected: []string{"kube-node-3"}, + expected: []string{}, }, { nodes: fakeOneNode(), nodeNames: []string{"kube-node-1", "kube-node-3"}, - expected: []string{"kube-node-3"}, + expected: []string{"kube-node-1"}, }, { nodes: fakeOneNode(), nodeNames: []string{"kube-node-1", "kube-node-2"}, - expected: []string{}, + expected: []string{"kube-node-1"}, }, { nodes: fakeZeroNode(), diff --git a/pkg/scheduler/predicates/test_helper.go b/pkg/scheduler/predicates/test_helper.go index 787d7a2c71..f8ed2f171d 100644 --- a/pkg/scheduler/predicates/test_helper.go +++ b/pkg/scheduler/predicates/test_helper.go @@ -35,16 +35,37 @@ func fakeThreeNodes() []apiv1.Node { } } -func fakeTwoNodes() []apiv1.Node { +func fakeFourNodes() []apiv1.Node { return []apiv1.Node{ { TypeMeta: metav1.TypeMeta{Kind: "Node", APIVersion: "v1"}, ObjectMeta: metav1.ObjectMeta{Name: "kube-node-1"}, }, + { + TypeMeta: metav1.TypeMeta{Kind: "Node", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{Name: "kube-node-2"}, + }, { TypeMeta: metav1.TypeMeta{Kind: "Node", APIVersion: "v1"}, ObjectMeta: metav1.ObjectMeta{Name: "kube-node-3"}, }, + { + TypeMeta: metav1.TypeMeta{Kind: "Node", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{Name: "kube-node-4"}, + }, + } +} + +func fakeTwoNodes() []apiv1.Node { + return []apiv1.Node{ + { + TypeMeta: metav1.TypeMeta{Kind: "Node", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{Name: "kube-node-1"}, + }, + { + TypeMeta: metav1.TypeMeta{Kind: "Node", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{Name: "kube-node-2"}, + }, } } @@ -52,7 +73,7 @@ func fakeOneNode() []apiv1.Node { return []apiv1.Node{ { TypeMeta: metav1.TypeMeta{Kind: "Node", APIVersion: "v1"}, - ObjectMeta: metav1.ObjectMeta{Name: "kube-node-3"}, + ObjectMeta: metav1.ObjectMeta{Name: "kube-node-1"}, }, } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index e294a98c9c..7fedd7a7bc 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -15,6 +15,7 @@ package scheduler import ( "github.com/golang/glog" + "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned" "github.com/pingcap/tidb-operator/pkg/label" "github.com/pingcap/tidb-operator/pkg/scheduler/predicates" apiv1 "k8s.io/api/core/v1" @@ -37,16 +38,14 @@ type Scheduler interface { } type scheduler struct { - kubeCli kubernetes.Interface predicates []predicates.Predicate } // NewScheduler returns a Scheduler -func NewScheduler(kubeCli kubernetes.Interface) Scheduler { +func NewScheduler(kubeCli kubernetes.Interface, cli versioned.Interface) Scheduler { return &scheduler{ - kubeCli: kubeCli, predicates: []predicates.Predicate{ - predicates.NewHA(kubeCli), + predicates.NewHA(kubeCli, cli), }, } } diff --git a/pkg/scheduler/server/mux.go b/pkg/scheduler/server/mux.go index b31d0723ff..75f0e7836d 100644 --- a/pkg/scheduler/server/mux.go +++ b/pkg/scheduler/server/mux.go @@ -20,6 +20,7 @@ import ( restful "github.com/emicklei/go-restful" "github.com/golang/glog" + "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned" "github.com/pingcap/tidb-operator/pkg/scheduler" "k8s.io/client-go/kubernetes" schedulerapiv1 "k8s.io/kubernetes/pkg/scheduler/api/v1" @@ -36,8 +37,8 @@ type server struct { } // StartServer starts a kubernetes scheduler extender http apiserver -func StartServer(kubeCli kubernetes.Interface, port int) { - s := scheduler.NewScheduler(kubeCli) +func StartServer(kubeCli kubernetes.Interface, cli versioned.Interface, port int) { + s := scheduler.NewScheduler(kubeCli, cli) svr := &server{scheduler: s} ws := new(restful.WebService)