diff --git a/Makefile b/Makefile
index e0584e5fe2..900763701a 100644
--- a/Makefile
+++ b/Makefile
@@ -35,8 +35,8 @@ e2e-docker-push: e2e-docker
 e2e-docker: e2e-build
 	mkdir -p images/tidb-operator-e2e/bin
 	mv tests/e2e/e2e.test images/tidb-operator-e2e/bin/
-	[[ -d images/tidb-operator-e2e/tidb-operator ]] && rm -r images/tidb-operator-e2e/tidb-operator
-	[[ -d images/tidb-operator-e2e/tidb-cluster ]] && rm -r images/tidb-operator-e2e/tidb-cluster
+	[[ -d images/tidb-operator-e2e/tidb-operator ]] && rm -r images/tidb-operator-e2e/tidb-operator || true
+	[[ -d images/tidb-operator-e2e/tidb-cluster ]] && rm -r images/tidb-operator-e2e/tidb-cluster || true
 	cp -r charts/tidb-operator images/tidb-operator-e2e/
 	cp -r charts/tidb-cluster images/tidb-operator-e2e/
 	docker build -t "${DOCKER_REGISTRY}/pingcap/tidb-operator-e2e:latest" images/tidb-operator-e2e
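Note: make aborts the build when a recipe line exits non-zero. On a clean checkout the `[[ -d ... ]]` test fails, the `&&` chain propagates the non-zero status, and `make e2e-docker` dies before the `cp` steps ever run; appending `|| true` makes each line exit 0 whether or not the directory existed. (An equivalent fix would be an unconditional `rm -rf` of the two directories.)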
diff --git a/pkg/controller/tidbcluster/tidb_cluster_control.go b/pkg/controller/tidbcluster/tidb_cluster_control.go
index de4e3b5e35..92777ca092 100644
--- a/pkg/controller/tidbcluster/tidb_cluster_control.go
+++ b/pkg/controller/tidbcluster/tidb_cluster_control.go
@@ -14,7 +14,6 @@
 package tidbcluster
 
 import (
-	"github.com/golang/glog"
 	"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
 	"github.com/pingcap/tidb-operator/pkg/controller"
 	"github.com/pingcap/tidb-operator/pkg/manager"
@@ -24,11 +23,6 @@ import (
 	"k8s.io/client-go/tools/record"
 )
 
-const (
-// unused
-// pdConnTimeout = 2 * time.Second
-)
-
 // ControlInterface implements the control logic for updating TidbClusters and their children StatefulSets.
 // It is implemented as an interface to allow for extensions that provide different semantics.
 // Currently, there is only one implementation.
@@ -90,15 +84,23 @@ func (tcc *defaultTidbClusterControl) UpdateTidbCluster(tc *v1alpha1.TidbCluster
 }
 
 func (tcc *defaultTidbClusterControl) updateTidbCluster(tc *v1alpha1.TidbCluster) error {
+	ns := tc.GetNamespace()
+	tcName := tc.GetName()
+
+	// ReclaimPolicyManager
+	err := tcc.reclaimPolicyManager.Sync(tc)
+	if err != nil {
+		return err
+	}
+
 	// PD
-	err := tcc.pdMemberManager.Sync(tc)
+	err = tcc.pdMemberManager.Sync(tc)
 	if err != nil {
 		return err
 	}
 
 	if !tcc.IsPDAvailable(tc) {
-		glog.Infof("tidbcluster: [%s/%s]'s pd cluster is not running.", tc.GetNamespace(), tc.GetName())
-		return nil
+		return controller.RequeueErrorf("TidbCluster: [%s/%s], waiting for PD cluster running", ns, tcName)
 	}
 
 	// TiKV
@@ -109,8 +111,7 @@ func (tcc *defaultTidbClusterControl) updateTidbCluster(tc *v1alpha1.TidbCluster
 
 	// Wait tikv status sync
 	if !tcc.IsTiKVAvailable(tc) {
-		glog.Infof("tidbcluster: [%s/%s]'s tikv cluster is not running.", tc.GetNamespace(), tc.GetName())
-		return nil
+		return controller.RequeueErrorf("TidbCluster: [%s/%s], waiting for TiKV cluster running", ns, tcName)
 	}
 
 	// TiDB
@@ -119,12 +120,6 @@ func (tcc *defaultTidbClusterControl) updateTidbCluster(tc *v1alpha1.TidbCluster
 		return err
 	}
 
-	// ReclaimPolicyManager
-	err = tcc.reclaimPolicyManager.Sync(tc)
-	if err != nil {
-		return err
-	}
-
 	// MetaManager
 	err = tcc.metaManager.Sync(tc)
 	if err != nil {
@@ -151,7 +146,7 @@ func (tcc *defaultTidbClusterControl) IsPDAvailable(tc *v1alpha1.TidbCluster) bo
 		return false
 	}
 
-	if tc.Status.PD.StatefulSet.ReadyReplicas < lowerLimit {
+	if tc.Status.PD.StatefulSet == nil || tc.Status.PD.StatefulSet.ReadyReplicas < lowerLimit {
 		return false
 	}
 
@@ -175,7 +170,7 @@ func (tcc *defaultTidbClusterControl) IsTiKVAvailable(tc *v1alpha1.TidbCluster)
 		return false
 	}
 
-	if tc.Status.TiKV.StatefulSet.ReadyReplicas < lowerLimit {
+	if tc.Status.TiKV.StatefulSet == nil || tc.Status.TiKV.StatefulSet.ReadyReplicas < lowerLimit {
 		return false
 	}
 
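Note: the behavioral change here is that an unavailable PD or TiKV cluster no longer logs and returns nil (which silently ended the reconcile until the next resync); it now returns a requeue error, so the work queue retries the key with backoff. Two smaller fixes ride along: reclaimPolicyManager.Sync runs first, so PV reclaim policies get set even while the cluster is still coming up, and the nil checks on Status.{PD,TiKV}.StatefulSet guard the very first reconcile, before any StatefulSet status has been recorded. RequeueErrorf and IsRequeueError live in the controller package and are not part of this diff; a minimal sketch of the pattern they presumably follow:

    // Illustrative only -- the real definitions are in pkg/controller.
    package controller

    import "fmt"

    // requeueError marks "not ready yet": put the key back on the work
    // queue instead of treating the sync as a hard failure.
    type requeueError struct {
        s string
    }

    func (e *requeueError) Error() string { return e.s }

    // RequeueErrorf returns an error that IsRequeueError recognizes.
    func RequeueErrorf(format string, a ...interface{}) error {
        return &requeueError{fmt.Sprintf(format, a...)}
    }

    // IsRequeueError reports whether err asks for a requeue.
    func IsRequeueError(err error) bool {
        _, ok := err.(*requeueError)
        return ok
    }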
diff --git a/pkg/controller/tidbcluster/tidb_cluster_control_test.go b/pkg/controller/tidbcluster/tidb_cluster_control_test.go
index 71c916a2ff..96f5f7b72a 100644
--- a/pkg/controller/tidbcluster/tidb_cluster_control_test.go
+++ b/pkg/controller/tidbcluster/tidb_cluster_control_test.go
@@ -14,6 +14,8 @@
 package tidbcluster
 
 import (
+	"fmt"
+	"strings"
 	"testing"
 
 	. "github.com/onsi/gomega"
@@ -23,36 +25,316 @@ import (
 	"github.com/pingcap/tidb-operator/pkg/controller"
 	mm "github.com/pingcap/tidb-operator/pkg/manager/member"
 	"github.com/pingcap/tidb-operator/pkg/manager/meta"
+	perrors "github.com/pkg/errors"
+	apps "k8s.io/api/apps/v1beta1"
+	corev1 "k8s.io/api/core/v1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
-	kubeinformers "k8s.io/client-go/informers"
-	kubefake "k8s.io/client-go/kubernetes/fake"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/record"
 )
 
-func TestTidbClusterControl(t *testing.T) {
+func TestTidbClusterControlUpdateTidbCluster(t *testing.T) {
 	g := NewGomegaWithT(t)
-	tc := newTidbCluster()
-	pdClient := controller.NewFakePDClient()
-	pdClient.AddReaction(controller.GetHealthActionType, func(action *controller.Action) (interface{}, error) {
-		return &controller.HealthInfo{
-			Healths: []controller.MemberHealth{
-				{Name: "pd1", MemberID: 1, ClientUrls: []string{"http://pd1:2379"}, Health: true},
-				{Name: "pd2", MemberID: 2, ClientUrls: []string{"http://pd2:2379"}, Health: true},
-				{Name: "pd3", MemberID: 3, ClientUrls: []string{"http://pd3:2379"}, Health: false},
-			},
-		}, nil
-	})
+	type testcase struct {
+		name                     string
+		update                   func(cluster *v1alpha1.TidbCluster)
+		syncReclaimPolicyErr     bool
+		syncPDMemberManagerErr   bool
+		syncTiKVMemberManagerErr bool
+		syncTiDBMemberManagerErr bool
+		syncMetaManagerErr       bool
+		errExpectFn              func(*GomegaWithT, error)
+	}
+	testFn := func(test *testcase, t *testing.T) {
+		t.Log(test.name)
 
-	control, setControl, pdControl := newFakeTidbClusterControl()
-	pdControl.SetPDClient(tc, pdClient)
+		tc := newTidbClusterForTidbClusterControl()
+		if test.update != nil {
+			test.update(tc)
+		}
+		control, reclaimPolicyManager, pdMemberManager, tikvMemberManager, tidbMemberManager, metaManager := newFakeTidbClusterControl()
 
-	err := syncTidbClusterControl(tc, setControl, control)
-	g.Expect(err).NotTo(HaveOccurred())
+		if test.syncReclaimPolicyErr {
+			reclaimPolicyManager.SetSyncError(fmt.Errorf("reclaim policy sync error"))
+		}
+		if test.syncPDMemberManagerErr {
+			pdMemberManager.SetSyncError(fmt.Errorf("pd member manager sync error"))
+		}
+		if test.syncTiKVMemberManagerErr {
+			tikvMemberManager.SetSyncError(fmt.Errorf("tikv member manager sync error"))
+		}
+		if test.syncTiDBMemberManagerErr {
+			tidbMemberManager.SetSyncError(fmt.Errorf("tidb member manager sync error"))
+		}
+		if test.syncMetaManagerErr {
+			metaManager.SetSyncError(fmt.Errorf("meta manager sync error"))
+		}
 
-	newTC, err := setControl.TcLister.TidbClusters(tc.Namespace).Get(tc.Name)
-	g.Expect(err).NotTo(HaveOccurred())
-	g.Expect(newTC.Status.PD.StatefulSet).NotTo(Equal(nil))
+		err := control.UpdateTidbCluster(tc)
+		if test.errExpectFn != nil {
+			test.errExpectFn(g, err)
+		}
+	}
+	tests := []testcase{
+		{
+			name:                     "reclaim policy sync error",
+			update:                   nil,
+			syncReclaimPolicyErr:     true,
+			syncPDMemberManagerErr:   false,
+			syncTiKVMemberManagerErr: false,
+			syncTiDBMemberManagerErr: false,
+			syncMetaManagerErr:       false,
+			errExpectFn: func(g *GomegaWithT, err error) {
+				g.Expect(err).To(HaveOccurred())
+				g.Expect(strings.Contains(err.Error(), "reclaim policy sync error")).To(Equal(true))
+			},
+		},
+		{
+			name:                     "pd member manager sync error",
+			update:                   nil,
+			syncReclaimPolicyErr:     false,
+			syncPDMemberManagerErr:   true,
+			syncTiKVMemberManagerErr: false,
+			syncTiDBMemberManagerErr: false,
+			syncMetaManagerErr:       false,
+			errExpectFn: func(g *GomegaWithT, err error) {
+				g.Expect(err).To(HaveOccurred())
+				g.Expect(strings.Contains(err.Error(), "pd member manager sync error")).To(Equal(true))
+			},
+		},
+		{
+			name: "pd members count is 1",
+			update: func(cluster *v1alpha1.TidbCluster) {
+				cluster.Status.PD.Members = map[string]v1alpha1.PDMember{
+					"pd-0": {Name: "pd-0", Health: true},
+				}
+			},
+			syncReclaimPolicyErr:     false,
+			syncPDMemberManagerErr:   false,
+			syncTiKVMemberManagerErr: false,
+			syncTiDBMemberManagerErr: false,
+			syncMetaManagerErr:       false,
+			errExpectFn: func(g *GomegaWithT, err error) {
+				g.Expect(err).To(HaveOccurred())
+				g.Expect(perrors.Find(err, controller.IsRequeueError)).NotTo(BeNil())
+				g.Expect(strings.Contains(err.Error(), "waiting for PD cluster running")).To(Equal(true))
+			},
+		},
+		{
+			name: "pd members count is 2, but health count is 1",
+			update: func(cluster *v1alpha1.TidbCluster) {
+				cluster.Status.PD.Members = map[string]v1alpha1.PDMember{
+					"pd-0": {Name: "pd-0", Health: true},
+					"pd-1": {Name: "pd-1", Health: false},
+				}
+			},
+			syncReclaimPolicyErr:     false,
+			syncPDMemberManagerErr:   false,
+			syncTiKVMemberManagerErr: false,
+			syncTiDBMemberManagerErr: false,
+			syncMetaManagerErr:       false,
+			errExpectFn: func(g *GomegaWithT, err error) {
+				g.Expect(err).To(HaveOccurred())
+				g.Expect(perrors.Find(err, controller.IsRequeueError)).NotTo(BeNil())
+				g.Expect(strings.Contains(err.Error(), "waiting for PD cluster running")).To(Equal(true))
+			},
+		},
+		{
+			name: "pd members count is 3, health count is 3, but ready replicas is 1",
+			update: func(cluster *v1alpha1.TidbCluster) {
+				cluster.Status.PD.Members = map[string]v1alpha1.PDMember{
+					"pd-0": {Name: "pd-0", Health: true},
+					"pd-1": {Name: "pd-1", Health: true},
+					"pd-2": {Name: "pd-2", Health: true},
+				}
+				cluster.Status.PD.StatefulSet = &apps.StatefulSetStatus{ReadyReplicas: 1}
+			},
+			syncReclaimPolicyErr:     false,
+			syncPDMemberManagerErr:   false,
+			syncTiKVMemberManagerErr: false,
+			syncTiDBMemberManagerErr: false,
+			syncMetaManagerErr:       false,
+			errExpectFn: func(g *GomegaWithT, err error) {
+				g.Expect(err).To(HaveOccurred())
+				g.Expect(perrors.Find(err, controller.IsRequeueError)).NotTo(BeNil())
+				g.Expect(strings.Contains(err.Error(), "waiting for PD cluster running")).To(Equal(true))
+			},
+		},
+		{
+			name: "tikv member manager sync error",
+			update: func(cluster *v1alpha1.TidbCluster) {
+				cluster.Status.PD.Members = map[string]v1alpha1.PDMember{
+					"pd-0": {Name: "pd-0", Health: true},
+					"pd-1": {Name: "pd-1", Health: true},
+					"pd-2": {Name: "pd-2", Health: true},
+				}
+				cluster.Status.PD.StatefulSet = &apps.StatefulSetStatus{ReadyReplicas: 3}
+			},
+			syncReclaimPolicyErr:     false,
+			syncPDMemberManagerErr:   false,
+			syncTiKVMemberManagerErr: true,
+			syncTiDBMemberManagerErr: false,
+			syncMetaManagerErr:       false,
+			errExpectFn: func(g *GomegaWithT, err error) {
+				g.Expect(err).To(HaveOccurred())
+				g.Expect(strings.Contains(err.Error(), "tikv member manager sync error")).To(Equal(true))
+			},
+		},
+		{
+			name: "pd is ready, tikv stores count is 0",
+			update: func(cluster *v1alpha1.TidbCluster) {
+				cluster.Status.PD.Members = map[string]v1alpha1.PDMember{
+					"pd-0": {Name: "pd-0", Health: true},
+					"pd-1": {Name: "pd-1", Health: true},
+					"pd-2": {Name: "pd-2", Health: true},
+				}
+				cluster.Status.PD.StatefulSet = &apps.StatefulSetStatus{ReadyReplicas: 3}
+				cluster.Status.TiKV.Stores = map[string]v1alpha1.TiKVStore{}
+			},
+			syncReclaimPolicyErr:     false,
+			syncPDMemberManagerErr:   false,
+			syncTiKVMemberManagerErr: false,
+			syncTiDBMemberManagerErr: false,
+			syncMetaManagerErr:       false,
+			errExpectFn: func(g *GomegaWithT, err error) {
+				g.Expect(err).To(HaveOccurred())
+				g.Expect(perrors.Find(err, controller.IsRequeueError)).NotTo(BeNil())
+				g.Expect(strings.Contains(err.Error(), "waiting for TiKV cluster running")).To(Equal(true))
+			},
+		},
+		{
+			name: "pd is ready, tikv stores count is 1, but available count is 0",
+			update: func(cluster *v1alpha1.TidbCluster) {
+				cluster.Status.PD.Members = map[string]v1alpha1.PDMember{
+					"pd-0": {Name: "pd-0", Health: true},
+					"pd-1": {Name: "pd-1", Health: true},
+					"pd-2": {Name: "pd-2", Health: true},
+				}
+				cluster.Status.PD.StatefulSet = &apps.StatefulSetStatus{ReadyReplicas: 3}
+				cluster.Status.TiKV.Stores = map[string]v1alpha1.TiKVStore{
+					"tikv-0": {PodName: "tikv-0", State: v1alpha1.TiKVStateDown},
+				}
+			},
+			syncReclaimPolicyErr:     false,
+			syncPDMemberManagerErr:   false,
+			syncTiKVMemberManagerErr: false,
+			syncTiDBMemberManagerErr: false,
+			syncMetaManagerErr:       false,
+			errExpectFn: func(g *GomegaWithT, err error) {
+				g.Expect(err).To(HaveOccurred())
+				g.Expect(perrors.Find(err, controller.IsRequeueError)).NotTo(BeNil())
+				g.Expect(strings.Contains(err.Error(), "waiting for TiKV cluster running")).To(Equal(true))
+			},
+		},
+		{
+			name: "pd is ready, tikv stores count is 1, available count is 1, ready replicas is 0",
+			update: func(cluster *v1alpha1.TidbCluster) {
+				cluster.Status.PD.Members = map[string]v1alpha1.PDMember{
+					"pd-0": {Name: "pd-0", Health: true},
+					"pd-1": {Name: "pd-1", Health: true},
+					"pd-2": {Name: "pd-2", Health: true},
+				}
+				cluster.Status.PD.StatefulSet = &apps.StatefulSetStatus{ReadyReplicas: 3}
+				cluster.Status.TiKV.Stores = map[string]v1alpha1.TiKVStore{
+					"tikv-0": {PodName: "tikv-0", State: v1alpha1.TiKVStateUp},
+				}
+				cluster.Status.TiKV.StatefulSet = &apps.StatefulSetStatus{ReadyReplicas: 0}
+			},
+			syncReclaimPolicyErr:     false,
+			syncPDMemberManagerErr:   false,
+			syncTiKVMemberManagerErr: false,
+			syncTiDBMemberManagerErr: false,
+			syncMetaManagerErr:       false,
+			errExpectFn: func(g *GomegaWithT, err error) {
+				g.Expect(err).To(HaveOccurred())
+				g.Expect(perrors.Find(err, controller.IsRequeueError)).NotTo(BeNil())
+				g.Expect(strings.Contains(err.Error(), "waiting for TiKV cluster running")).To(Equal(true))
+			},
+		},
+		{
+			name: "tidb member manager sync error",
+			update: func(cluster *v1alpha1.TidbCluster) {
+				cluster.Status.PD.Members = map[string]v1alpha1.PDMember{
+					"pd-0": {Name: "pd-0", Health: true},
+					"pd-1": {Name: "pd-1", Health: true},
+					"pd-2": {Name: "pd-2", Health: true},
+				}
+				cluster.Status.PD.StatefulSet = &apps.StatefulSetStatus{ReadyReplicas: 3}
+				cluster.Status.TiKV.Stores = map[string]v1alpha1.TiKVStore{
+					"tikv-0": {PodName: "tikv-0", State: v1alpha1.TiKVStateUp},
+					"tikv-1": {PodName: "tikv-1", State: v1alpha1.TiKVStateUp},
+					"tikv-2": {PodName: "tikv-2", State: v1alpha1.TiKVStateUp},
+				}
+				cluster.Status.TiKV.StatefulSet = &apps.StatefulSetStatus{ReadyReplicas: 3}
+			},
+			syncReclaimPolicyErr:     false,
+			syncPDMemberManagerErr:   false,
+			syncTiKVMemberManagerErr: false,
+			syncTiDBMemberManagerErr: true,
+			syncMetaManagerErr:       false,
+			errExpectFn: func(g *GomegaWithT, err error) {
+				g.Expect(err).To(HaveOccurred())
+				g.Expect(strings.Contains(err.Error(), "tidb member manager sync error")).To(Equal(true))
+			},
+		},
+		{
+			name: "meta manager sync error",
+			update: func(cluster *v1alpha1.TidbCluster) {
+				cluster.Status.PD.Members = map[string]v1alpha1.PDMember{
+					"pd-0": {Name: "pd-0", Health: true},
+					"pd-1": {Name: "pd-1", Health: true},
+					"pd-2": {Name: "pd-2", Health: true},
+				}
+				cluster.Status.PD.StatefulSet = &apps.StatefulSetStatus{ReadyReplicas: 3}
+				cluster.Status.TiKV.Stores = map[string]v1alpha1.TiKVStore{
+					"tikv-0": {PodName: "tikv-0", State: v1alpha1.TiKVStateUp},
+					"tikv-1": {PodName: "tikv-1", State: v1alpha1.TiKVStateUp},
+					"tikv-2": {PodName: "tikv-2", State: v1alpha1.TiKVStateUp},
+				}
+				cluster.Status.TiKV.StatefulSet = &apps.StatefulSetStatus{ReadyReplicas: 3}
+			},
+			syncReclaimPolicyErr:     false,
+			syncPDMemberManagerErr:   false,
+			syncTiKVMemberManagerErr: false,
+			syncTiDBMemberManagerErr: false,
+			syncMetaManagerErr:       true,
+			errExpectFn: func(g *GomegaWithT, err error) {
+				g.Expect(err).To(HaveOccurred())
+				g.Expect(strings.Contains(err.Error(), "meta manager sync error")).To(Equal(true))
+			},
+		},
+		{
+			name: "normal",
+			update: func(cluster *v1alpha1.TidbCluster) {
+				cluster.Status.PD.Members = map[string]v1alpha1.PDMember{
+					"pd-0": {Name: "pd-0", Health: true},
+					"pd-1": {Name: "pd-1", Health: true},
+					"pd-2": {Name: "pd-2", Health: true},
+				}
+				cluster.Status.PD.StatefulSet = &apps.StatefulSetStatus{ReadyReplicas: 3}
+				cluster.Status.TiKV.Stores = map[string]v1alpha1.TiKVStore{
+					"tikv-0": {PodName: "tikv-0", State: v1alpha1.TiKVStateUp},
+					"tikv-1": {PodName: "tikv-1", State: v1alpha1.TiKVStateUp},
+					"tikv-2": {PodName: "tikv-2", State: v1alpha1.TiKVStateUp},
+				}
+				cluster.Status.TiKV.StatefulSet = &apps.StatefulSetStatus{ReadyReplicas: 3}
+			},
+			syncReclaimPolicyErr:     false,
+			syncPDMemberManagerErr:   false,
+			syncTiKVMemberManagerErr: false,
+			syncTiDBMemberManagerErr: false,
+			syncMetaManagerErr:       false,
+			errExpectFn: func(g *GomegaWithT, err error) {
+				g.Expect(err).NotTo(HaveOccurred())
+			},
+		},
+	}
+
+	for i := range tests {
+		testFn(&tests[i], t)
+	}
 }
 
 func TestTidbClusterStatusEquality(t *testing.T) {
@@ -68,44 +350,20 @@ func TestTidbClusterStatusEquality(t *testing.T) {
 	g.Expect(apiequality.Semantic.DeepEqual(&tcStatus, tcStatusCopy)).To(Equal(false))
 }
 
-func newFakeTidbClusterControl() (ControlInterface, *controller.FakeStatefulSetControl, *controller.FakePDControl) {
+func newFakeTidbClusterControl() (ControlInterface, *meta.FakeReclaimPolicyManager, *mm.FakePDMemberManager, *mm.FakeTiKVMemberManager, *mm.FakeTiDBMemberManager, *meta.FakeMetaManager) {
 	cli := fake.NewSimpleClientset()
-	kubeCli := kubefake.NewSimpleClientset()
-	setInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Apps().V1beta1().StatefulSets()
-	svcInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Services()
 	tcInformer := informers.NewSharedInformerFactory(cli, 0).Pingcap().V1alpha1().TidbClusters()
-	pvcInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().PersistentVolumeClaims()
-	pvInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().PersistentVolumes()
 	recorder := record.NewFakeRecorder(10)
-	podInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Pods()
-	nodeInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Nodes()
-
-	pdControl := controller.NewFakePDControl()
-	tidbControl := controller.NewFakeTiDBControl()
-	setControl := controller.NewFakeStatefulSetControl(setInformer, tcInformer)
-	svcControl := controller.NewFakeServiceControl(svcInformer, tcInformer)
-	pvControl := controller.NewRealPVControl(kubeCli, pvcInformer.Lister(), pvInformer.Lister(), recorder)
-	pvcControl := controller.NewRealPVCControl(kubeCli, recorder, pvcInformer.Lister())
-	podControl := controller.NewRealPodControl(kubeCli, pdControl, podInformer.Lister(), recorder)
+	tcControl := controller.NewFakeTidbClusterControl(tcInformer)
 
-	pdScaler := mm.NewFakePDScaler()
-	tikvScaler := mm.NewFakeTiKVScaler()
-	autoFailover := true
-	pdFailover := mm.NewFakePDFailover()
-	pdUpgrader := mm.NewFakePDUpgrader()
-	tikvFailover := mm.NewFakeTiKVFailover()
-	tikvUpgrader := mm.NewFakeTiKVUpgrader()
-	tidbUpgrader := mm.NewFakeTiDBUpgrader()
-	tidbFailover := mm.NewFakeTiDBFailover()
-
-	pdMemberManager := mm.NewPDMemberManager(pdControl, setControl, svcControl, setInformer.Lister(), svcInformer.Lister(), podInformer.Lister(), podControl, pvcInformer.Lister(), pdScaler, pdUpgrader, autoFailover, pdFailover)
-	tikvMemberManager := mm.NewTiKVMemberManager(pdControl, setControl, svcControl, setInformer.Lister(), svcInformer.Lister(), podInformer.Lister(), nodeInformer.Lister(), autoFailover, tikvFailover, tikvScaler, tikvUpgrader)
-	tidbMemberManager := mm.NewTiDBMemberManager(setControl, svcControl, tidbControl, setInformer.Lister(), svcInformer.Lister(), podInformer.Lister(), tidbUpgrader, autoFailover, tidbFailover)
-	reclaimPolicyManager := meta.NewReclaimPolicyManager(pvcInformer.Lister(), pvInformer.Lister(), pvControl)
-	metaManager := meta.NewMetaManager(pvcInformer.Lister(), pvcControl, pvInformer.Lister(), pvControl, podInformer.Lister(), podControl)
+	pdMemberManager := mm.NewFakePDMemberManager()
+	tikvMemberManager := mm.NewFakeTiKVMemberManager()
+	tidbMemberManager := mm.NewFakeTiDBMemberManager()
+	reclaimPolicyManager := meta.NewFakeReclaimPolicyManager()
+	metaManager := meta.NewFakeMetaManager()
 	control := NewDefaultTidbClusterControl(tcControl, pdMemberManager, tikvMemberManager, tidbMemberManager, reclaimPolicyManager, metaManager, recorder)
 
-	return control, setControl, pdControl
+	return control, reclaimPolicyManager, pdMemberManager, tikvMemberManager, tidbMemberManager, metaManager
 }
 
 func syncTidbClusterControl(tc *v1alpha1.TidbCluster, _ *controller.FakeStatefulSetControl, control ControlInterface) error {
@@ -118,3 +376,28 @@ func syncTidbClusterControl(tc *v1alpha1.TidbCluster, _ *controller.FakeStateful
 	}
 
 	return nil
 }
+
+func newTidbClusterForTidbClusterControl() *v1alpha1.TidbCluster {
+	return &v1alpha1.TidbCluster{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "TidbCluster",
+			APIVersion: "pingcap.com/v1alpha1",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-pd",
+			Namespace: corev1.NamespaceDefault,
+			UID:       types.UID("test"),
+		},
+		Spec: v1alpha1.TidbClusterSpec{
+			PD: v1alpha1.PDSpec{
+				Replicas: 3,
+			},
+			TiKV: v1alpha1.TiKVSpec{
+				Replicas: 3,
+			},
+			TiDB: v1alpha1.TiDBSpec{
+				Replicas: 1,
+			},
+		},
+	}
+}
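Note: the PD cases encode a majority check. With Spec.PD.Replicas = 3, the cluster is only treated as available once at least two members are healthy and ReadyReplicas reaches two, which matches a lower limit of replicas/2+1; the TiKV cases only demand one store in the Up state plus a non-zero ReadyReplicas. A sketch of the arithmetic the cases imply (the authoritative lowerLimit computation lives in IsPDAvailable/IsTiKVAvailable, not here):

    package main

    import "fmt"

    // pdLowerLimit mirrors the quorum threshold the test cases above imply.
    func pdLowerLimit(replicas int32) int32 {
        return replicas/2 + 1
    }

    func main() {
        // With 3 PD replicas the limit is 2, so the "pd members count is 1"
        // and "health count is 1" cases are below quorum and must requeue.
        fmt.Println(pdLowerLimit(3)) // 2
    }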
diff --git a/pkg/manager/member/pd_member_manager.go b/pkg/manager/member/pd_member_manager.go
index 7ff3ce8d8b..2d698c09eb 100644
--- a/pkg/manager/member/pd_member_manager.go
+++ b/pkg/manager/member/pd_member_manager.go
@@ -606,3 +606,22 @@ func (pmm *pdMemberManager) getNewPDSetForTidbCluster(tc *v1alpha1.TidbCluster)
 
 	return pdSet, nil
 }
+
+type FakePDMemberManager struct {
+	err error
+}
+
+func NewFakePDMemberManager() *FakePDMemberManager {
+	return &FakePDMemberManager{}
+}
+
+func (fpmm *FakePDMemberManager) SetSyncError(err error) {
+	fpmm.err = err
+}
+
+func (fpmm *FakePDMemberManager) Sync(_ *v1alpha1.TidbCluster) error {
+	if fpmm.err != nil {
+		return fpmm.err
+	}
+	return nil
+}
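Note: this fake, and the four that follow, stub nothing but Sync plus an error knob, which works because the control loop depends only on the narrow manager interface (the `var _ manager.Manager = ...` assertions further down point at the same contract). Presumably it is roughly:

    // Sketch of the contract the fakes satisfy; the real declaration is in
    // pkg/manager and may differ in documentation, not shape.
    package manager

    import "github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"

    // Manager reconciles one aspect of a TidbCluster (members, metadata,
    // reclaim policy); a returned error causes the sync to be retried.
    type Manager interface {
        Sync(*v1alpha1.TidbCluster) error
    }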
diff --git a/pkg/manager/member/tidb_member_manager.go b/pkg/manager/member/tidb_member_manager.go
index f1ac8dd32b..1cfe7864ef 100644
--- a/pkg/manager/member/tidb_member_manager.go
+++ b/pkg/manager/member/tidb_member_manager.go
@@ -408,3 +408,22 @@ func (tmm *tidbMemberManager) tidbStatefulSetIsUpgrading(set *apps.StatefulSet,
 	}
 	return false, nil
 }
+
+type FakeTiDBMemberManager struct {
+	err error
+}
+
+func NewFakeTiDBMemberManager() *FakeTiDBMemberManager {
+	return &FakeTiDBMemberManager{}
+}
+
+func (ftmm *FakeTiDBMemberManager) SetSyncError(err error) {
+	ftmm.err = err
+}
+
+func (ftmm *FakeTiDBMemberManager) Sync(_ *v1alpha1.TidbCluster) error {
+	if ftmm.err != nil {
+		return ftmm.err
+	}
+	return nil
+}
diff --git a/pkg/manager/member/tikv_member_manager.go b/pkg/manager/member/tikv_member_manager.go
index 8cb78269b5..1ce5fd695a 100644
--- a/pkg/manager/member/tikv_member_manager.go
+++ b/pkg/manager/member/tikv_member_manager.go
@@ -633,3 +633,22 @@ func (tkmm *tikvMemberManager) tikvStatefulSetIsUpgrading(set *apps.StatefulSet,
 
 	return evictLeaderSchedulers != nil && len(evictLeaderSchedulers) > 0, nil
 }
+
+type FakeTiKVMemberManager struct {
+	err error
+}
+
+func NewFakeTiKVMemberManager() *FakeTiKVMemberManager {
+	return &FakeTiKVMemberManager{}
+}
+
+func (ftmm *FakeTiKVMemberManager) SetSyncError(err error) {
+	ftmm.err = err
+}
+
+func (ftmm *FakeTiKVMemberManager) Sync(_ *v1alpha1.TidbCluster) error {
+	if ftmm.err != nil {
+		return ftmm.err
+	}
+	return nil
+}
diff --git a/pkg/manager/meta/meta_manager.go b/pkg/manager/meta/meta_manager.go
index c8fea1455b..be152ab2b7 100644
--- a/pkg/manager/meta/meta_manager.go
+++ b/pkg/manager/meta/meta_manager.go
@@ -126,3 +126,22 @@ func (pmm *metaManager) resolvePVCFromPod(pod *corev1.Pod) (*corev1.PersistentVo
 }
 
 var _ manager.Manager = &metaManager{}
+
+type FakeMetaManager struct {
+	err error
+}
+
+func NewFakeMetaManager() *FakeMetaManager {
+	return &FakeMetaManager{}
+}
+
+func (fmm *FakeMetaManager) SetSyncError(err error) {
+	fmm.err = err
+}
+
+func (fmm *FakeMetaManager) Sync(_ *v1alpha1.TidbCluster) error {
+	if fmm.err != nil {
+		return fmm.err
+	}
+	return nil
+}
diff --git a/pkg/manager/meta/reclaim_policy_manager.go b/pkg/manager/meta/reclaim_policy_manager.go
index ffb6103a56..421a43f65a 100644
--- a/pkg/manager/meta/reclaim_policy_manager.go
+++ b/pkg/manager/meta/reclaim_policy_manager.go
@@ -71,3 +71,22 @@ func (rpm *reclaimPolicyManager) Sync(tc *v1alpha1.TidbCluster) error {
 }
 
 var _ manager.Manager = &reclaimPolicyManager{}
+
+type FakeReclaimPolicyManager struct {
+	err error
+}
+
+func NewFakeReclaimPolicyManager() *FakeReclaimPolicyManager {
+	return &FakeReclaimPolicyManager{}
+}
+
+func (frpm *FakeReclaimPolicyManager) SetSyncError(err error) {
+	frpm.err = err
+}
+
+func (frpm *FakeReclaimPolicyManager) Sync(_ *v1alpha1.TidbCluster) error {
+	if frpm.err != nil {
+		return frpm.err
+	}
+	return nil
+}
diff --git a/pkg/scheduler/predicates/ha.go b/pkg/scheduler/predicates/ha.go
index 6372638f52..3093add244 100644
--- a/pkg/scheduler/predicates/ha.go
+++ b/pkg/scheduler/predicates/ha.go
@@ -16,6 +16,7 @@ package predicates
 import (
 	"fmt"
 
+	"github.com/golang/glog"
 	"github.com/pingcap/tidb-operator/pkg/label"
 	"github.com/pingcap/tidb-operator/pkg/util"
 	apiv1 "k8s.io/api/core/v1"
@@ -69,6 +70,16 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([]
 		return nil, err
 	}
 
+	// when a deleted pod is recreated again, it should be rescheduled to the original node
+	if len(nodes) == 1 {
+		nextPodName := util.GetNextOrdinalPodName(podName, ordinal)
+		for _, pod := range podList.Items {
+			if pod.GetName() == nextPodName && pod.Spec.NodeName != "" {
+				return nodes, nil
+			}
+		}
+	}
+
 	nodeMap := make(map[string][]string)
 	for _, node := range nodes {
 		nodeMap[node.GetName()] = make([]string, 0)
@@ -84,15 +95,13 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([]
 		if ordinal1 < ordinal && nodeName == "" {
 			return nil, fmt.Errorf("waiting for pod: %s/%s to be scheduled", ns, podName1)
 		}
-		if nodeName == "" {
-			continue
-		}
-		if nodeMap[nodeName] == nil {
+		if nodeName == "" || nodeMap[nodeName] == nil {
 			continue
 		}
 		nodeMap[nodeName] = append(nodeMap[nodeName], podName1)
 	}
+	glog.V(4).Infof("nodeMap: %+v", nodeMap)
 
 	var min int
 	var minInitialized bool
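Note: the early return in Filter handles a deleted StatefulSet pod being recreated. StatefulSets create pods in ordinal order, so if the pod with the next ordinal (pd-2 while scheduling pd-1) already has Spec.NodeName set, the pod under consideration must be a recreation rather than part of the initial rollout. When the scheduler then offers exactly one candidate node — presumably because the recreated pod's local PV pins it to its original node — that node is accepted even though the HA spreading rule below would normally reject a node that already hosts a replica. The ha_test.go case that follows exercises exactly this path; the merged `nodeName == "" || nodeMap[nodeName] == nil` condition and the glog.V(4) line are behavior-preserving cleanups.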
"github.com/pingcap/tidb-operator/pkg/util" apiv1 "k8s.io/api/core/v1" @@ -69,6 +70,16 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([] return nil, err } + // when a deleted pod is recreated again, it should be rescheduled to the original node + if len(nodes) == 1 { + nextPodName := util.GetNextOrdinalPodName(podName, ordinal) + for _, pod := range podList.Items { + if pod.GetName() == nextPodName && pod.Spec.NodeName != "" { + return nodes, nil + } + } + } + nodeMap := make(map[string][]string) for _, node := range nodes { nodeMap[node.GetName()] = make([]string, 0) @@ -84,15 +95,13 @@ func (h *ha) Filter(instanceName string, pod *apiv1.Pod, nodes []apiv1.Node) ([] if ordinal1 < ordinal && nodeName == "" { return nil, fmt.Errorf("waiting for pod: %s/%s to be scheduled", ns, podName1) } - if nodeName == "" { - continue - } - if nodeMap[nodeName] == nil { + if nodeName == "" || nodeMap[nodeName] == nil { continue } nodeMap[nodeName] = append(nodeMap[nodeName], podName1) } + glog.V(4).Infof("nodeMap: %+v", nodeMap) var min int var minInitialized bool diff --git a/pkg/scheduler/predicates/ha_test.go b/pkg/scheduler/predicates/ha_test.go index a7e531fe38..9e92cb8c18 100644 --- a/pkg/scheduler/predicates/ha_test.go +++ b/pkg/scheduler/predicates/ha_test.go @@ -252,6 +252,18 @@ func TestHAFilter(t *testing.T) { g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-1", "kube-node-2", "kube-node-3"})) }, }, + { + name: "5 pods scheduled to 3 nodes(and then delete the ordinal 1 pod), the created pod ordinal 1 should rescheduled to node 3", + ordinal: 1, + podFn: newHAPDPod, + nodesFn: fakeOneNode, + podListFn: podListFn(map[string][]int32{"kube-node-2": {0}, "kube-node-3": {4}, "kube-node-1": {2, 3}}), + expectFn: func(nodes []apiv1.Node, err error) { + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(len(nodes)).To(Equal(1)) + g.Expect(getSortedNodeNames(nodes)).To(Equal([]string{"kube-node-3"})) + }, + }, } for i := range tests { diff --git a/pkg/util/util.go b/pkg/util/util.go index 1142ffac9b..da7f092d77 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -14,6 +14,7 @@ package util import ( + "fmt" "sort" "strconv" "strings" @@ -216,3 +217,8 @@ func GetOrdinalFromPodName(podName string) (int32, error) { } return int32(ordinalInt), nil } + +func GetNextOrdinalPodName(podName string, ordinal int32) string { + basicStr := podName[:strings.LastIndex(podName, "-")] + return fmt.Sprintf("%s-%d", basicStr, ordinal+1) +}