From 8d3542eafc14a174a12b001a630399d2903490a9 Mon Sep 17 00:00:00 2001 From: weekface Date: Tue, 15 Jan 2019 12:51:02 +0800 Subject: [PATCH 1/4] Serial scheduling --- .../templates/scheduler-rbac.yaml | 4 +- pkg/label/label.go | 2 + pkg/manager/meta/reclaim_policy_manager.go | 3 + .../meta/reclaim_policy_manager_test.go | 55 ++- pkg/scheduler/predicates/ha.go | 95 +++- pkg/scheduler/predicates/ha_test.go | 448 +++++++++++++++--- 6 files changed, 520 insertions(+), 87 deletions(-) diff --git a/charts/tidb-operator/templates/scheduler-rbac.yaml b/charts/tidb-operator/templates/scheduler-rbac.yaml index eb766fdb6a9..6901fe714b1 100644 --- a/charts/tidb-operator/templates/scheduler-rbac.yaml +++ b/charts/tidb-operator/templates/scheduler-rbac.yaml @@ -55,7 +55,7 @@ rules: verbs: ["get"] - apiGroups: [""] resources: ["persistentvolumeclaims"] - verbs: ["get"] + verbs: ["get", "list", "update"] --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1beta1 @@ -114,7 +114,7 @@ rules: verbs: ["get"] - apiGroups: [""] resources: ["persistentvolumeclaims"] - verbs: ["get"] + verbs: ["get", "list", "update"] --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1beta1 diff --git a/pkg/label/label.go b/pkg/label/label.go index 38f49b0a789..cdd529b4ab8 100644 --- a/pkg/label/label.go +++ b/pkg/label/label.go @@ -46,6 +46,8 @@ const ( AnnPodNameKey string = "tidb.pingcap.com/pod-name" // AnnPVCDeferDeleting is pvc defer deletion annotation key used in PVC for defer deleting PVC AnnPVCDeferDeleting = "tidb.pingcap.com/pvc-defer-deleting" + // AnnPVCPodScheduling is pod scheduling annotation key, it represents whether the pod is scheduling + AnnPVCPodScheduling = "tidb.pingcap.com/pod-scheduling" // PDLabelVal is PD label value PDLabelVal string = "pd" diff --git a/pkg/manager/meta/reclaim_policy_manager.go b/pkg/manager/meta/reclaim_policy_manager.go index 421a43f65a7..e2911bc3250 100644 --- a/pkg/manager/meta/reclaim_policy_manager.go +++ b/pkg/manager/meta/reclaim_policy_manager.go @@ -52,6 +52,9 @@ func (rpm *reclaimPolicyManager) Sync(tc *v1alpha1.TidbCluster) error { } for _, pvc := range pvcs { + if pvc.Spec.VolumeName == "" { + continue + } pv, err := rpm.pvLister.Get(pvc.Spec.VolumeName) if err != nil { return err diff --git a/pkg/manager/meta/reclaim_policy_manager_test.go b/pkg/manager/meta/reclaim_policy_manager_test.go index 3878df414b0..4d8c3f20446 100644 --- a/pkg/manager/meta/reclaim_policy_manager_test.go +++ b/pkg/manager/meta/reclaim_policy_manager_test.go @@ -34,11 +34,12 @@ import ( func TestReclaimPolicyManagerSync(t *testing.T) { g := NewGomegaWithT(t) type testcase struct { - name string - pvcHasLabels bool - updateErr bool - err bool - changed bool + name string + pvcHasLabels bool + pvcHasVolumeName bool + updateErr bool + err bool + changed bool } testFn := func(test *testcase, t *testing.T) { @@ -50,6 +51,9 @@ func TestReclaimPolicyManagerSync(t *testing.T) { if !test.pvcHasLabels { pvc1.Labels = nil } + if !test.pvcHasVolumeName { + pvc1.Spec.VolumeName = "" + } rpm, fakePVControl, pvcIndexer, pvIndexer := newFakeReclaimPolicyManager() err := pvcIndexer.Add(pvc1) @@ -82,25 +86,36 @@ func TestReclaimPolicyManagerSync(t *testing.T) { tests := []testcase{ { - name: "normal", - pvcHasLabels: true, - updateErr: false, - err: false, - changed: true, + name: "normal", + pvcHasLabels: true, + pvcHasVolumeName: true, + updateErr: false, + err: false, + changed: true, + }, + { + name: "pvc don't have labels", + pvcHasLabels: false, + pvcHasVolumeName: true, + 
updateErr: false, + err: false, + changed: false, }, { - name: "pvc don't have labels", - pvcHasLabels: false, - updateErr: false, - err: false, - changed: false, + name: "pvc don't have volumeName", + pvcHasLabels: false, + pvcHasVolumeName: false, + updateErr: false, + err: false, + changed: false, }, { - name: "update failed", - pvcHasLabels: true, - updateErr: true, - err: true, - changed: false, + name: "update failed", + pvcHasLabels: true, + pvcHasVolumeName: true, + updateErr: true, + err: true, + changed: false, }, } diff --git a/pkg/scheduler/predicates/ha.go b/pkg/scheduler/predicates/ha.go index de2c2b3802e..080b84fecdd 100644 --- a/pkg/scheduler/predicates/ha.go +++ b/pkg/scheduler/predicates/ha.go @@ -16,6 +16,8 @@ package predicates import ( "fmt" "strings" + "sync" + "time" "github.com/golang/glog" "github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1" @@ -28,11 +30,15 @@ import ( ) type ha struct { - kubeCli kubernetes.Interface - cli versioned.Interface - podListFn func(ns, instanceName, component string) (*apiv1.PodList, error) - pvcGetFn func(ns, pvcName string) (*apiv1.PersistentVolumeClaim, error) - tcGetFn func(ns, tcName string) (*v1alpha1.TidbCluster, error) + lock sync.Mutex + kubeCli kubernetes.Interface + cli versioned.Interface + podListFn func(ns, instanceName, component string) (*apiv1.PodList, error) + pvcGetFn func(ns, pvcName string) (*apiv1.PersistentVolumeClaim, error) + tcGetFn func(ns, tcName string) (*v1alpha1.TidbCluster, error) + pvcListFn func(ns, instanceName, component string) (*apiv1.PersistentVolumeClaimList, error) + updatePVCFn func(*apiv1.PersistentVolumeClaim) error + acquireLockFn func(*apiv1.Pod) (*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error) } // NewHA returns a Predicate @@ -44,6 +50,9 @@ func NewHA(kubeCli kubernetes.Interface, cli versioned.Interface) Predicate { h.podListFn = h.realPodListFn h.pvcGetFn = h.realPVCGetFn h.tcGetFn = h.realTCGetFn + h.pvcListFn = h.realPVCListFn + h.updatePVCFn = h.realUpdatePVCFn + h.acquireLockFn = h.realAcquireLock return h } @@ -55,6 +64,9 @@ func (h *ha) Name() string { // 2. return these nodes that have least pods and its pods count is less than (replicas+1)/2 to kube-scheduler // 3. 
let kube-scheduler to make the final decision func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([]apiv1.Node, error) { + h.lock.Lock() + defer h.lock.Unlock() + ns := pod.GetNamespace() podName := pod.GetName() component := pod.Labels[label.ComponentLabelKey] @@ -63,9 +75,12 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([] if len(nodes) == 0 { return nil, fmt.Errorf("kube nodes is empty") } + if _, _, err := h.acquireLockFn(pod); err != nil { + return nil, err + } if len(nodes) == 1 { - pvcName := fmt.Sprintf("%s-%s", component, podName) + pvcName := pvcName(component, podName) pvc, err := h.pvcGetFn(ns, pvcName) if err != nil { return nil, err @@ -127,6 +142,50 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([] return getNodeFromNames(nodes, minNodeNames), nil } +func (h *ha) realAcquireLock(pod *apiv1.Pod) (*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error) { + ns := pod.GetNamespace() + component := pod.Labels[label.ComponentLabelKey] + instanceName := pod.Labels[label.InstanceLabelKey] + podName := pod.GetName() + pvcList, err := h.pvcListFn(ns, instanceName, component) + if err != nil { + return nil, nil, err + } + + currentPVCName := pvcName(component, podName) + var currentPVC *apiv1.PersistentVolumeClaim + var schedulingPVC *apiv1.PersistentVolumeClaim + for i := 0; i < len(pvcList.Items); i++ { + pvc := pvcList.Items[i] + if pvc.GetName() == currentPVCName && currentPVC == nil { + currentPVC = &pvcList.Items[i] + } + if pvc.Annotations[label.AnnPVCPodScheduling] != "" && schedulingPVC == nil { + schedulingPVC = &pvcList.Items[i] + } + } + + if currentPVC == nil { + return schedulingPVC, currentPVC, fmt.Errorf("can't find current Pod %s/%s's PVC", ns, podName) + } + if schedulingPVC == nil { + return schedulingPVC, currentPVC, h.setCurrentPodScheduling(currentPVC) + } + if schedulingPVC == currentPVC { + return schedulingPVC, currentPVC, nil + } + if schedulingPVC.Status.Phase != apiv1.ClaimBound { + return schedulingPVC, currentPVC, fmt.Errorf("waiting for Pod %s/%s scheduling", ns, strings.TrimPrefix(schedulingPVC.GetName(), component)) + } + + delete(schedulingPVC.Annotations, label.AnnPVCPodScheduling) + err = h.updatePVCFn(schedulingPVC) + if err != nil { + return schedulingPVC, currentPVC, err + } + return schedulingPVC, currentPVC, h.setCurrentPodScheduling(currentPVC) +} + func (h *ha) realPodListFn(ns, instanceName, component string) (*apiv1.PodList, error) { selector := label.New().Instance(instanceName).Component(component).Labels() return h.kubeCli.CoreV1().Pods(ns).List(metav1.ListOptions{ @@ -134,6 +193,18 @@ func (h *ha) realPodListFn(ns, instanceName, component string) (*apiv1.PodList, }) } +func (h *ha) realPVCListFn(ns, instanceName, component string) (*apiv1.PersistentVolumeClaimList, error) { + selector := label.New().Instance(instanceName).Component(component).Labels() + return h.kubeCli.CoreV1().PersistentVolumeClaims(ns).List(metav1.ListOptions{ + LabelSelector: labels.SelectorFromSet(selector).String(), + }) +} + +func (h *ha) realUpdatePVCFn(pvc *apiv1.PersistentVolumeClaim) error { + _, err := h.kubeCli.CoreV1().PersistentVolumeClaims(pvc.GetNamespace()).Update(pvc) + return err +} + func (h *ha) realPVCGetFn(ns, pvcName string) (*apiv1.PersistentVolumeClaim, error) { return h.kubeCli.CoreV1().PersistentVolumeClaims(ns).Get(pvcName, metav1.GetOptions{}) } @@ -142,6 +213,14 @@ func (h *ha) realTCGetFn(ns, tcName string) (*v1alpha1.TidbCluster, 
error) { return h.cli.PingcapV1alpha1().TidbClusters(ns).Get(tcName, metav1.GetOptions{}) } +func (h *ha) setCurrentPodScheduling(pvc *apiv1.PersistentVolumeClaim) error { + if pvc.Annotations == nil { + pvc.Annotations = map[string]string{} + } + pvc.Annotations[label.AnnPVCPodScheduling] = time.Now().Format(time.RFC3339) + return h.updatePVCFn(pvc) +} + func getTCNameFromPod(pod *apiv1.Pod, component string) string { return strings.TrimSuffix(pod.GenerateName, fmt.Sprintf("-%s-", component)) } @@ -153,3 +232,7 @@ func getReplicasFrom(tc *v1alpha1.TidbCluster, component string) int32 { return tc.Spec.TiKV.Replicas } + +func pvcName(component, podName string) string { + return fmt.Sprintf("%s-%s", component, podName) +} diff --git a/pkg/scheduler/predicates/ha_test.go b/pkg/scheduler/predicates/ha_test.go index 1ec3b0d051e..03e78c874e1 100644 --- a/pkg/scheduler/predicates/ha_test.go +++ b/pkg/scheduler/predicates/ha_test.go @@ -42,16 +42,286 @@ func TestMapAndIntNil(t *testing.T) { g.Expect(m["a"] == nil).To(Equal(false)) } +func TestHARealAcquireLockFn(t *testing.T) { + g := NewGomegaWithT(t) + type testcase struct { + name string + podFn func(string, string, int32) *apiv1.Pod + pvcListFn func(ns, instanceName, component string) (*apiv1.PersistentVolumeClaimList, error) + updatePVCFn func(*apiv1.PersistentVolumeClaim) error + expectFn func(*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error) + } + + testFn := func(test *testcase, t *testing.T) { + t.Log(test.name) + instanceName := "demo" + clusterName := "cluster-1" + + ha := ha{ + pvcListFn: test.pvcListFn, + updatePVCFn: test.updatePVCFn, + } + pod := test.podFn(instanceName, clusterName, 0) + + test.expectFn(ha.realAcquireLock(pod)) + } + + tests := []testcase{ + { + name: "pvcListFn failed", + podFn: newHAPDPod, + pvcListFn: func(ns, instanceName, component string) (*corev1.PersistentVolumeClaimList, error) { + return nil, fmt.Errorf("failed to list pvc") + }, + expectFn: func(schedulingPVC, currentPVC *apiv1.PersistentVolumeClaim, err error) { + g.Expect(err).To(HaveOccurred()) + g.Expect(strings.Contains(err.Error(), "failed to list pvc")).To(BeTrue()) + }, + }, + { + name: "can't find current pvc", + podFn: newHAPDPod, + pvcListFn: func(ns, instanceName, component string) (*corev1.PersistentVolumeClaimList, error) { + return &corev1.PersistentVolumeClaimList{ + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaimList", APIVersion: "v1"}, + Items: []corev1.PersistentVolumeClaim{ + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-1", + }, + }, + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-2", + }, + }, + }, + }, nil + }, + expectFn: func(schedulingPVC, currentPVC *apiv1.PersistentVolumeClaim, err error) { + g.Expect(err).To(HaveOccurred()) + g.Expect(strings.Contains(err.Error(), "can't find current Pod")).To(BeTrue()) + }, + }, + { + name: "no scheduling pod, setCurrentPodScheduling success", + podFn: newHAPDPod, + pvcListFn: func(ns, instanceName, component string) (*corev1.PersistentVolumeClaimList, error) { + return &corev1.PersistentVolumeClaimList{ + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaimList", APIVersion: "v1"}, + Items: []corev1.PersistentVolumeClaim{ + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + 
ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-0", + }, + }, + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-1", + }, + }, + }, + }, nil + }, + updatePVCFn: func(claim *corev1.PersistentVolumeClaim) error { + return nil + }, + expectFn: func(schedulingPVC, currentPVC *apiv1.PersistentVolumeClaim, err error) { + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(schedulingPVC).To(BeNil()) + g.Expect(currentPVC.Annotations[label.AnnPVCPodScheduling]).NotTo(BeEmpty()) + }, + }, + { + name: "no scheduling pod, setCurrentPodScheduling failed", + podFn: newHAPDPod, + pvcListFn: func(ns, instanceName, component string) (*corev1.PersistentVolumeClaimList, error) { + return &corev1.PersistentVolumeClaimList{ + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaimList", APIVersion: "v1"}, + Items: []corev1.PersistentVolumeClaim{ + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-0", + }, + }, + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-1", + }, + }, + }, + }, nil + }, + updatePVCFn: func(claim *corev1.PersistentVolumeClaim) error { + return fmt.Errorf("setCurrentPodScheduling failed") + }, + expectFn: func(schedulingPVC, currentPVC *apiv1.PersistentVolumeClaim, err error) { + g.Expect(err).To(HaveOccurred()) + g.Expect(schedulingPVC).To(BeNil()) + g.Expect(strings.Contains(err.Error(), "setCurrentPodScheduling failed")).To(BeTrue()) + }, + }, + { + name: "current pvc is scheduling", + podFn: newHAPDPod, + pvcListFn: func(ns, instanceName, component string) (*corev1.PersistentVolumeClaimList, error) { + return &corev1.PersistentVolumeClaimList{ + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaimList", APIVersion: "v1"}, + Items: []corev1.PersistentVolumeClaim{ + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-0", + Annotations: map[string]string{label.AnnPVCPodScheduling: "true"}, + }, + }, + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-1", + }, + }, + }, + }, nil + }, + expectFn: func(schedulingPVC, currentPVC *apiv1.PersistentVolumeClaim, err error) { + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(schedulingPVC).To(Equal(currentPVC)) + }, + }, + { + name: "scheduling pvc is not bound", + podFn: newHAPDPod, + pvcListFn: func(ns, instanceName, component string) (*corev1.PersistentVolumeClaimList, error) { + return &corev1.PersistentVolumeClaimList{ + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaimList", APIVersion: "v1"}, + Items: []corev1.PersistentVolumeClaim{ + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-0", + }, + }, + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-1", + Annotations: map[string]string{label.AnnPVCPodScheduling: "true"}, + }, + 
Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimPending}, + }, + }, + }, nil + }, + expectFn: func(schedulingPVC, currentPVC *apiv1.PersistentVolumeClaim, err error) { + g.Expect(err).To(HaveOccurred()) + g.Expect(strings.Contains(err.Error(), "waiting for Pod ")).To(BeTrue()) + }, + }, + { + name: "scheduling pvc is bound, update pvc failed", + podFn: newHAPDPod, + pvcListFn: func(ns, instanceName, component string) (*corev1.PersistentVolumeClaimList, error) { + return &corev1.PersistentVolumeClaimList{ + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaimList", APIVersion: "v1"}, + Items: []corev1.PersistentVolumeClaim{ + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-0", + }, + }, + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-1", + Annotations: map[string]string{label.AnnPVCPodScheduling: "true"}, + }, + Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimBound}, + }, + }, + }, nil + }, + updatePVCFn: func(claim *corev1.PersistentVolumeClaim) error { + return fmt.Errorf("failed to update pvc") + }, + expectFn: func(schedulingPVC, currentPVC *apiv1.PersistentVolumeClaim, err error) { + g.Expect(err).To(HaveOccurred()) + g.Expect(schedulingPVC.Annotations[label.AnnPVCPodScheduling]).To(BeEmpty()) + g.Expect(strings.Contains(err.Error(), "failed to update pvc")).To(BeTrue()) + }, + }, + { + name: "scheduling pvc is bound, update success", + podFn: newHAPDPod, + pvcListFn: func(ns, instanceName, component string) (*corev1.PersistentVolumeClaimList, error) { + return &corev1.PersistentVolumeClaimList{ + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaimList", APIVersion: "v1"}, + Items: []corev1.PersistentVolumeClaim{ + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-0", + }, + }, + { + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pd-cluster-1-pd-1", + Annotations: map[string]string{label.AnnPVCPodScheduling: "true"}, + }, + Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimBound}, + }, + }, + }, nil + }, + updatePVCFn: func(claim *corev1.PersistentVolumeClaim) error { + return nil + }, + expectFn: func(schedulingPVC, currentPVC *apiv1.PersistentVolumeClaim, err error) { + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(schedulingPVC.Annotations[label.AnnPVCPodScheduling]).To(BeEmpty()) + g.Expect(currentPVC.Annotations[label.AnnPVCPodScheduling]).NotTo(BeEmpty()) + }, + }, + } + + for i := range tests { + testFn(&tests[i], t) + } +} + func TestHAFilter(t *testing.T) { g := NewGomegaWithT(t) type testcase struct { - name string - podFn func(string, string, int32) *apiv1.Pod - nodesFn func() []apiv1.Node - podListFn func(string, string, string) (*apiv1.PodList, error) - pvcGetFn func(string, string) (*apiv1.PersistentVolumeClaim, error) - tcGetFn func(string, string) (*v1alpha1.TidbCluster, error) - expectFn func([]apiv1.Node, error) + name string + podFn func(string, string, int32) *apiv1.Pod + nodesFn func() []apiv1.Node + podListFn func(string, string, string) (*apiv1.PodList, error) + pvcGetFn func(string, string) (*apiv1.PersistentVolumeClaim, error) + 
tcGetFn func(string, string) (*v1alpha1.TidbCluster, error) + acquireLockFn func(*apiv1.Pod) (*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error) + expectFn func([]apiv1.Node, error) } testFn := func(test *testcase, t *testing.T) { @@ -62,7 +332,7 @@ func TestHAFilter(t *testing.T) { pod := test.podFn(instanceName, clusterName, 0) nodes := test.nodesFn() - ha := ha{podListFn: test.podListFn, pvcGetFn: test.pvcGetFn, tcGetFn: test.tcGetFn} + ha := ha{podListFn: test.podListFn, pvcGetFn: test.pvcGetFn, tcGetFn: test.tcGetFn, acquireLockFn: test.acquireLockFn} test.expectFn(ha.Filter(instanceName, pod, nodes)) } @@ -78,12 +348,32 @@ func TestHAFilter(t *testing.T) { Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimBound}, }, nil }, + acquireLockFn: acquireSuccess, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1"})) }, }, + { + name: "acquired lock failed", + podFn: newHAPDPod, + nodesFn: fakeOneNode, + pvcGetFn: func(ns string, pvcName string) (*corev1.PersistentVolumeClaim, error) { + return &corev1.PersistentVolumeClaim{ + TypeMeta: metav1.TypeMeta{Kind: "PersistentVolumeClaim", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{Name: "pd-cluster-1-pd-0"}, + Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimBound}, + }, nil + }, + acquireLockFn: func(pod *corev1.Pod) (*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error) { + return nil, nil, fmt.Errorf("failed to acquire the lock") + }, + expectFn: func(nodes []apiv1.Node, err error) { + g.Expect(err).To(HaveOccurred()) + g.Expect(strings.Contains(err.Error(), "failed to acquire the lock")).To(BeTrue()) + }, + }, { name: "already scheduled pod recreated, get pvc failed", podFn: newHAPDPod, @@ -91,16 +381,18 @@ func TestHAFilter(t *testing.T) { pvcGetFn: func(ns string, pvcName string) (*corev1.PersistentVolumeClaim, error) { return nil, fmt.Errorf("get pvc failed") }, + acquireLockFn: acquireSuccess, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).To(HaveOccurred()) g.Expect(strings.Contains(err.Error(), "get pvc failed")).To(BeTrue()) }, }, { - name: "list pod failed", - podFn: newHAPDPod, - nodesFn: fakeThreeNodes, - podListFn: podListErr(), + name: "list pod failed", + podFn: newHAPDPod, + nodesFn: fakeThreeNodes, + podListFn: podListErr(), + acquireLockFn: acquireSuccess, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).To(HaveOccurred()) g.Expect(strings.Contains(err.Error(), "list pods failed")).To(BeTrue()) @@ -114,17 +406,19 @@ func TestHAFilter(t *testing.T) { tcGetFn: func(ns string, tcName string) (*v1alpha1.TidbCluster, error) { return nil, fmt.Errorf("get tidbcluster failed") }, + acquireLockFn: acquireSuccess, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).To(HaveOccurred()) g.Expect(strings.Contains(err.Error(), "get tidbcluster failed")).To(BeTrue()) }, }, { - name: "zero node, return zero node", - podFn: newHAPDPod, - nodesFn: fakeZeroNode, - podListFn: podListFn(map[string][]int32{}), - tcGetFn: tcGetFn, + name: "zero node, return zero node", + podFn: newHAPDPod, + nodesFn: fakeZeroNode, + podListFn: podListFn(map[string][]int32{}), + tcGetFn: tcGetFn, + acquireLockFn: acquireSuccess, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).To(HaveOccurred()) g.Expect(strings.Contains(err.Error(), "kube nodes is empty")).To(BeTrue()) @@ -141,8 +435,9 @@ func TestHAFilter(t *testing.T) { 
Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimPending}, }, nil }, - podListFn: podListFn(map[string][]int32{}), - tcGetFn: tcGetFn, + acquireLockFn: acquireSuccess, + podListFn: podListFn(map[string][]int32{}), + tcGetFn: tcGetFn, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) @@ -160,19 +455,21 @@ func TestHAFilter(t *testing.T) { Status: corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimPending}, }, nil }, - podListFn: podListFn(map[string][]int32{"kube-node-1": {1}}), - tcGetFn: tcGetFn, + podListFn: podListFn(map[string][]int32{"kube-node-1": {1}}), + acquireLockFn: acquireSuccess, + tcGetFn: tcGetFn, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).To(HaveOccurred()) g.Expect(strings.Contains(err.Error(), "can't find a node from: ")).To(BeTrue()) }, }, { - name: "two nodes, one pod scheduled on the node, return one node", - podFn: newHAPDPod, - nodesFn: fakeTwoNodes, - podListFn: podListFn(map[string][]int32{"kube-node-1": {0}}), - tcGetFn: tcGetFn, + name: "two nodes, one pod scheduled on the node, return one node", + podFn: newHAPDPod, + nodesFn: fakeTwoNodes, + podListFn: podListFn(map[string][]int32{"kube-node-1": {0}}), + acquireLockFn: acquireSuccess, + tcGetFn: tcGetFn, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) @@ -180,11 +477,12 @@ func TestHAFilter(t *testing.T) { }, }, { - name: "two nodes, two pods scheduled on these two nodes, return zero node", - podFn: newHAPDPod, - nodesFn: fakeTwoNodes, - podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}}), - tcGetFn: tcGetFn, + name: "two nodes, two pods scheduled on these two nodes, return zero node", + podFn: newHAPDPod, + nodesFn: fakeTwoNodes, + podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}}), + acquireLockFn: acquireSuccess, + tcGetFn: tcGetFn, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).To(HaveOccurred()) g.Expect(strings.Contains(err.Error(), "can't find a node from: ")).To(BeTrue()) @@ -192,11 +490,12 @@ func TestHAFilter(t *testing.T) { }, }, { - name: "three nodes, zero pod scheduled, return all the three nodes", - podFn: newHAPDPod, - nodesFn: fakeThreeNodes, - podListFn: podListFn(map[string][]int32{}), - tcGetFn: tcGetFn, + name: "three nodes, zero pod scheduled, return all the three nodes", + podFn: newHAPDPod, + nodesFn: fakeThreeNodes, + podListFn: podListFn(map[string][]int32{}), + acquireLockFn: acquireSuccess, + tcGetFn: tcGetFn, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(3)) @@ -204,11 +503,12 @@ func TestHAFilter(t *testing.T) { }, }, { - name: "three nodes, one pod scheduled, return two nodes", - podFn: newHAPDPod, - nodesFn: fakeThreeNodes, - podListFn: podListFn(map[string][]int32{"kube-node-1": {0}}), - tcGetFn: tcGetFn, + name: "three nodes, one pod scheduled, return two nodes", + podFn: newHAPDPod, + nodesFn: fakeThreeNodes, + podListFn: podListFn(map[string][]int32{"kube-node-1": {0}}), + acquireLockFn: acquireSuccess, + tcGetFn: tcGetFn, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(2)) @@ -216,11 +516,12 @@ func TestHAFilter(t *testing.T) { }, }, { - name: "three nodes, two pods scheduled, return one node", - podFn: newHAPDPod, - nodesFn: fakeThreeNodes, - podListFn: podListFn(map[string][]int32{"kube-node-1": 
{0}, "kube-node-2": {1}}), - tcGetFn: tcGetFn, + name: "three nodes, two pods scheduled, return one node", + podFn: newHAPDPod, + nodesFn: fakeThreeNodes, + podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}}), + acquireLockFn: acquireSuccess, + tcGetFn: tcGetFn, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) @@ -228,11 +529,12 @@ func TestHAFilter(t *testing.T) { }, }, { - name: "three nodes, one pod not scheduled on these three nodes, return all the three nodes", - podFn: newHAPDPod, - nodesFn: fakeThreeNodes, - podListFn: podListFn(map[string][]int32{"kube-node-4": {4}}), - tcGetFn: tcGetFn, + name: "three nodes, one pod not scheduled on these three nodes, return all the three nodes", + podFn: newHAPDPod, + nodesFn: fakeThreeNodes, + podListFn: podListFn(map[string][]int32{"kube-node-4": {4}}), + acquireLockFn: acquireSuccess, + tcGetFn: tcGetFn, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(3)) @@ -240,10 +542,11 @@ func TestHAFilter(t *testing.T) { }, }, { - name: "three nodes, three pods scheduled on these three nodes, replicas is 4, can't scheduled", - podFn: newHAPDPod, - nodesFn: fakeThreeNodes, - podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}, "kube-node-3": {2}}), + name: "three nodes, three pods scheduled on these three nodes, replicas is 4, can't scheduled", + podFn: newHAPDPod, + nodesFn: fakeThreeNodes, + podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {1}, "kube-node-3": {2}}), + acquireLockFn: acquireSuccess, tcGetFn: func(ns string, tcName string) (*v1alpha1.TidbCluster, error) { tc, _ := tcGetFn(ns, tcName) tc.Spec.PD.Replicas = 4 @@ -265,6 +568,7 @@ func TestHAFilter(t *testing.T) { tc.Spec.PD.Replicas = 5 return tc, nil }, + acquireLockFn: acquireSuccess, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(3)) @@ -272,10 +576,11 @@ func TestHAFilter(t *testing.T) { }, }, { - name: "three nodes, four pods scheduled on these three nodes, replicas is 5, return two nodes", - podFn: newHAPDPod, - nodesFn: fakeThreeNodes, - podListFn: podListFn(map[string][]int32{"kube-node-1": {0, 3}, "kube-node-2": {1}, "kube-node-3": {2}}), + name: "three nodes, four pods scheduled on these three nodes, replicas is 5, return two nodes", + podFn: newHAPDPod, + nodesFn: fakeThreeNodes, + podListFn: podListFn(map[string][]int32{"kube-node-1": {0, 3}, "kube-node-2": {1}, "kube-node-3": {2}}), + acquireLockFn: acquireSuccess, tcGetFn: func(ns string, tcName string) (*v1alpha1.TidbCluster, error) { tc, _ := tcGetFn(ns, tcName) tc.Spec.PD.Replicas = 5 @@ -297,6 +602,7 @@ func TestHAFilter(t *testing.T) { tc.Spec.PD.Replicas = 4 return tc, nil }, + acquireLockFn: acquireSuccess, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(1)) @@ -313,6 +619,7 @@ func TestHAFilter(t *testing.T) { tc.Spec.PD.Replicas = 5 return tc, nil }, + acquireLockFn: acquireSuccess, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(4)) @@ -329,6 +636,7 @@ func TestHAFilter(t *testing.T) { tc.Spec.PD.Replicas = 6 return tc, nil }, + acquireLockFn: acquireSuccess, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(4)) @@ -345,12 +653,30 @@ func 
TestHAFilter(t *testing.T) { tc.Spec.PD.Replicas = 6 return tc, nil }, + acquireLockFn: acquireSuccess, expectFn: func(nodes []apiv1.Node, err error) { g.Expect(err).NotTo(HaveOccurred()) g.Expect(len(nodes)).To(Equal(3)) g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-2", "kube-node-3", "kube-node-4"})) }, }, + { + name: "four nodes, five pods scheduled on these four nodes, replicas is 6, return one node", + podFn: newHAPDPod, + nodesFn: fakeFourNodes, + podListFn: podListFn(map[string][]int32{"kube-node-1": {0}, "kube-node-2": {}, "kube-node-3": {1, 4}, "kube-node-4": {2, 3}}), + tcGetFn: func(ns string, tcName string) (*v1alpha1.TidbCluster, error) { + tc, _ := tcGetFn(ns, tcName) + tc.Spec.PD.Replicas = 6 + return tc, nil + }, + acquireLockFn: acquireSuccess, + expectFn: func(nodes []apiv1.Node, err error) { + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(len(nodes)).To(Equal(1)) + g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-2"})) + }, + }, } for i := range tests { @@ -421,3 +747,7 @@ func getSortedNodeNames(nodes []apiv1.Node) []string { sort.Strings(arr) return arr } + +func acquireSuccess(*apiv1.Pod) (*apiv1.PersistentVolumeClaim, *apiv1.PersistentVolumeClaim, error) { + return nil, nil, nil +} From 8a8b61d2144ee1f9b7380639bfa1bd257925ba37 Mon Sep 17 00:00:00 2001 From: weekface Date: Wed, 16 Jan 2019 15:37:58 +0800 Subject: [PATCH 2/4] address comment --- pkg/scheduler/predicates/ha.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduler/predicates/ha.go b/pkg/scheduler/predicates/ha.go index 080b84fecdd..f2c93b407dc 100644 --- a/pkg/scheduler/predicates/ha.go +++ b/pkg/scheduler/predicates/ha.go @@ -157,7 +157,7 @@ func (h *ha) realAcquireLock(pod *apiv1.Pod) (*apiv1.PersistentVolumeClaim, *api var schedulingPVC *apiv1.PersistentVolumeClaim for i := 0; i < len(pvcList.Items); i++ { pvc := pvcList.Items[i] - if pvc.GetName() == currentPVCName && currentPVC == nil { + if pvc.GetName() == currentPVCName { currentPVC = &pvcList.Items[i] } if pvc.Annotations[label.AnnPVCPodScheduling] != "" && schedulingPVC == nil { From 39999cf8846ed3a3197488aa1448b208916d1eaf Mon Sep 17 00:00:00 2001 From: weekface Date: Thu, 17 Jan 2019 16:06:07 +0800 Subject: [PATCH 3/4] address comment --- pkg/scheduler/predicates/ha.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/scheduler/predicates/ha.go b/pkg/scheduler/predicates/ha.go index f2c93b407dc..b87b2a575f9 100644 --- a/pkg/scheduler/predicates/ha.go +++ b/pkg/scheduler/predicates/ha.go @@ -155,13 +155,13 @@ func (h *ha) realAcquireLock(pod *apiv1.Pod) (*apiv1.PersistentVolumeClaim, *api currentPVCName := pvcName(component, podName) var currentPVC *apiv1.PersistentVolumeClaim var schedulingPVC *apiv1.PersistentVolumeClaim - for i := 0; i < len(pvcList.Items); i++ { - pvc := pvcList.Items[i] - if pvc.GetName() == currentPVCName { - currentPVC = &pvcList.Items[i] + items := pvcList.Items + for i, _ := range items { + if items[i].GetName() == currentPVCName { + currentPVC = &items[i] } - if pvc.Annotations[label.AnnPVCPodScheduling] != "" && schedulingPVC == nil { - schedulingPVC = &pvcList.Items[i] + if items[i].Annotations[label.AnnPVCPodScheduling] != "" && schedulingPVC == nil { + schedulingPVC = &items[i] } } From 91a2c4aef70277cac6f013e9c0cdf74e2de260a1 Mon Sep 17 00:00:00 2001 From: weekface Date: Mon, 21 Jan 2019 11:58:32 +0800 Subject: [PATCH 4/4] make check happy --- pkg/scheduler/predicates/ha.go | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/pkg/scheduler/predicates/ha.go b/pkg/scheduler/predicates/ha.go index b87b2a575f9..cc1f9e333de 100644 --- a/pkg/scheduler/predicates/ha.go +++ b/pkg/scheduler/predicates/ha.go @@ -156,7 +156,7 @@ func (h *ha) realAcquireLock(pod *apiv1.Pod) (*apiv1.PersistentVolumeClaim, *api var currentPVC *apiv1.PersistentVolumeClaim var schedulingPVC *apiv1.PersistentVolumeClaim items := pvcList.Items - for i, _ := range items { + for i := range items { if items[i].GetName() == currentPVCName { currentPVC = &items[i] }
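
Illustrative sketch (not part of the patch series): the lock introduced here is a single scheduling annotation kept on at most one PVC per component, so Pods of one TidbCluster component are scheduled one at a time; this is also why the scheduler RBAC gains the list/update verbs on persistentvolumeclaims. The standalone function below condenses the hand-off logic of realAcquireLock for readers of the series; acquireLock and annPodScheduling are illustrative names, and the client-go Update signature follows the pre-context-aware API used in the patch.

package scheduling

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
)

// annPodScheduling mirrors label.AnnPVCPodScheduling from the patch.
const annPodScheduling = "tidb.pingcap.com/pod-scheduling"

// acquireLock condenses realAcquireLock: at most one PVC of a component
// carries the scheduling annotation at any time, so only one Pod of that
// component is in flight in the scheduler.
func acquireLock(cli kubernetes.Interface, ns, currentPVCName string, pvcs []corev1.PersistentVolumeClaim) error {
	var current, scheduling *corev1.PersistentVolumeClaim
	for i := range pvcs {
		if pvcs[i].GetName() == currentPVCName {
			current = &pvcs[i]
		}
		if pvcs[i].Annotations[annPodScheduling] != "" && scheduling == nil {
			scheduling = &pvcs[i]
		}
	}
	if current == nil {
		return fmt.Errorf("can't find current PVC %s/%s", ns, currentPVCName)
	}
	if scheduling != nil {
		if scheduling == current {
			// the current Pod already holds the lock
			return nil
		}
		if scheduling.Status.Phase != corev1.ClaimBound {
			// the previous Pod still holds the lock and is not scheduled yet
			return fmt.Errorf("waiting for PVC %s/%s to be bound", ns, scheduling.GetName())
		}
		// the previous Pod finished scheduling (its PVC is Bound): release its lock
		delete(scheduling.Annotations, annPodScheduling)
		if _, err := cli.CoreV1().PersistentVolumeClaims(ns).Update(scheduling); err != nil {
			return err
		}
	}
	// take the lock for the current Pod
	if current.Annotations == nil {
		current.Annotations = map[string]string{}
	}
	current.Annotations[annPodScheduling] = time.Now().Format(time.RFC3339)
	_, err := cli.CoreV1().PersistentVolumeClaims(ns).Update(current)
	return err
}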