Skip to content

Commit

Permalink
Sync pump status to tidb-cluster. (#1292)
Browse files Browse the repository at this point in the history
* Specifiy terraform ack version.

* Add sync pump status to tidb-cluster CR

* Revert "Specifiy terraform ack version."

This reverts commit 6506e0b.

* fix some comments

* add unit test and fix syncTiDBClusterStatus func

* fix updateTC and remove PumpStatus Members

* Update pkg/manager/member/pump_member_manager_test.go

Co-Authored-By: Aylei <rayingecho@gmail.com>

* Update pump_member_manager.go

* fix tidbclusterstatus struct pkg/apis/pingcap/v1alpha1/types.go

Co-Authored-By: weekface <weekface@gmail.com>

* fix cmControl

* update codegenerated deepcopy

* fix zz_generated.deepcopy.go

* run hack/codegen.sh

* Update pump_member_manager.go

Co-authored-by: Aylei <rayingecho@gmail.com>
Co-authored-by: weekface <weekface@gmail.com>
Co-authored-by: pingcap-github-bot <sre-bot@pingcap.com>
  • Loading branch information
4 people committed Dec 21, 2019
1 parent 0e0a358 commit 90ead3d
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 5 deletions.
7 changes: 7 additions & 0 deletions pkg/apis/pingcap/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ type TidbClusterStatus struct {
PD PDStatus `json:"pd,omitempty"`
TiKV TiKVStatus `json:"tikv,omitempty"`
TiDB TiDBStatus `json:"tidb,omitempty"`
Pump PumpStatus `josn:"pump,omitempty"`
}

// +k8s:openapi-gen=true
Expand Down Expand Up @@ -468,6 +469,12 @@ type TiKVFailureStore struct {
CreatedAt metav1.Time `json:"createdAt,omitempty"`
}

// PumpStatus is Pump status
type PumpStatus struct {
Phase MemberPhase `json:"phase,omitempty"`
StatefulSet *apps.StatefulSetStatus `json:"statefulSet,omitempty"`
}

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

Expand Down
22 changes: 22 additions & 0 deletions pkg/apis/pingcap/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/controller/tidbcluster/tidb_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func NewController(
tcControl := controller.NewRealTidbClusterControl(cli, tcInformer.Lister(), recorder)
pdControl := pdapi.NewDefaultPDControl(kubeCli)
tidbControl := controller.NewDefaultTiDBControl()
cmControl := controller.NewRealConfigMapControl(kubeCli, cmInformer.Lister(), recorder)
setControl := controller.NewRealStatefuSetControl(kubeCli, setInformer.Lister(), recorder)
svcControl := controller.NewRealServiceControl(kubeCli, svcInformer.Lister(), recorder)
pvControl := controller.NewRealPVControl(kubeCli, pvcInformer.Lister(), pvInformer.Lister(), recorder)
Expand Down Expand Up @@ -197,9 +198,11 @@ func NewController(
setControl,
svcControl,
typedControl,
cmControl,
setInformer.Lister(),
svcInformer.Lister(),
cmInformer.Lister(),
podInformer.Lister(),
),
mm.NewTidbDiscoveryManager(typedControl),
podRestarter,
Expand Down
70 changes: 65 additions & 5 deletions pkg/manager/member/pump_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
"github.com/pingcap/tidb-operator/pkg/label"
"github.com/pingcap/tidb-operator/pkg/manager"
"github.com/pingcap/tidb-operator/pkg/util"
apps "k8s.io/api/apps/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
v1 "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
glog "k8s.io/klog"
)

const (
Expand All @@ -39,26 +41,32 @@ type pumpMemberManager struct {
setControl controller.StatefulSetControlInterface
svcControl controller.ServiceControlInterface
typedControl controller.TypedControlInterface
cmControl controller.ConfigMapControlInterface
setLister v1.StatefulSetLister
svcLister corelisters.ServiceLister
cmLister corelisters.ConfigMapLister
podLister corelisters.PodLister
}

// NewPumpMemberManager returns a controller to reconcile pump clusters
func NewPumpMemberManager(
setControl controller.StatefulSetControlInterface,
svcControl controller.ServiceControlInterface,
typedControl controller.TypedControlInterface,
cmControl controller.ConfigMapControlInterface,
setLister v1.StatefulSetLister,
svcLister corelisters.ServiceLister,
cmLister corelisters.ConfigMapLister) manager.Manager {
cmLister corelisters.ConfigMapLister,
podLister corelisters.PodLister) manager.Manager {
return &pumpMemberManager{
setControl,
svcControl,
typedControl,
cmControl,
setLister,
svcLister,
cmLister,
podLister,
}
}

Expand All @@ -69,12 +77,11 @@ func (pmm *pumpMemberManager) Sync(tc *v1alpha1.TidbCluster) error {
if err := pmm.syncHeadlessService(tc); err != nil {
return err
}
return pmm.syncStatefulSet(tc)
return pmm.syncPumpStatefulSetForTidbCluster(tc)
}

// syncStatefulSet syncs the pump statefulset
// TODO: sync statefulset status of pump to tidbcluster
func (pmm *pumpMemberManager) syncStatefulSet(tc *v1alpha1.TidbCluster) error {
//syncPumpStatefulSetForTidbCluster sync statefulset status of pump to tidbcluster
func (pmm *pumpMemberManager) syncPumpStatefulSetForTidbCluster(tc *v1alpha1.TidbCluster) error {

oldPumpSetTemp, err := pmm.setLister.StatefulSets(tc.Namespace).Get(controller.PumpMemberName(tc.Name))
if err != nil && !errors.IsNotFound(err) {
Expand Down Expand Up @@ -118,6 +125,32 @@ func (pmm *pumpMemberManager) syncStatefulSet(tc *v1alpha1.TidbCluster) error {
_, err = pmm.setControl.UpdateStatefulSet(tc, &set)
return err
}

if err := pmm.syncTiDBClusterStatus(tc, oldPumpSet); err != nil {
glog.Errorf("failed to sync TidbCluster: [%s/%s]'s status, error: %v", tc.Namespace, tc.Name, err)
return err
}
return nil
}

func (pmm *pumpMemberManager) syncTiDBClusterStatus(tc *v1alpha1.TidbCluster, set *apps.StatefulSet) error {

tc.Status.Pump.StatefulSet = &set.Status

upgrading, err := pmm.pumpStatefulSetIsUpgrading(set, tc)
if err != nil {
return err
}
if upgrading {
tc.Status.Pump.Phase = v1alpha1.UpgradePhase
} else {
tc.Status.Pump.Phase = v1alpha1.NormalPhase
}

//TODO: support sync pump members from PD.
// pumpStatus := map[string]v1alpha1.PumpMember{}
// tc.Status.Pump.Members = pumpStatus

return nil
}

Expand Down Expand Up @@ -416,6 +449,33 @@ func getPumpLogLevel(tc *v1alpha1.TidbCluster) string {
return logLevel
}

func (pmm *pumpMemberManager) pumpStatefulSetIsUpgrading(set *apps.StatefulSet, tc *v1alpha1.TidbCluster) (bool, error) {
if statefulSetIsUpgrading(set) {
return true, nil
}
selector, err := label.New().
Instance(tc.GetLabels()[label.InstanceLabelKey]).
Pump().
Selector()
if err != nil {
return false, err
}
pumpPods, err := pmm.podLister.Pods(tc.GetNamespace()).List(selector)
if err != nil {
return false, err
}
for _, pod := range pumpPods {
revisionHash, exist := pod.Labels[apps.ControllerRevisionHashLabelKey]
if !exist {
return false, nil
}
if revisionHash != tc.Status.Pump.StatefulSet.UpdateRevision {
return true, nil
}
}
return false, nil
}

type FakePumpMemberManager struct {
err error
}
Expand Down
60 changes: 60 additions & 0 deletions pkg/manager/member/pump_member_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
kubeinformers "k8s.io/client-go/informers"
kubefake "k8s.io/client-go/kubernetes/fake"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -452,16 +453,20 @@ func newFakePumpMemberManager() (*pumpMemberManager, *pumpFakeControls, *pumpFak
svcInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Services()
epsInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Endpoints()
cmInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().ConfigMaps()
podInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Core().V1().Pods()
setControl := controller.NewFakeStatefulSetControl(setInformer, tcInformer)
svcControl := controller.NewFakeServiceControl(svcInformer, epsInformer, tcInformer)
cmControl := controller.NewFakeConfigMapControl(cmInformer)
genericControl := controller.NewFakeGenericControl()
pmm := &pumpMemberManager{
setControl,
svcControl,
controller.NewTypedControl(genericControl),
cmControl,
setInformer.Lister(),
svcInformer.Lister(),
cmInformer.Lister(),
podInformer.Lister(),
}
controls := &pumpFakeControls{
svc: svcControl,
Expand Down Expand Up @@ -724,3 +729,58 @@ func TestGetNewPumpConfigMap(t *testing.T) {
}

// TODO: add ut for getPumpStatefulSet
func TestSyncTiDBClusterStatus(t *testing.T) {
g := NewGomegaWithT(t)
type testcase struct {
name string
updateTC func(*appsv1.StatefulSet)
upgradingFn func(corelisters.PodLister, *appsv1.StatefulSet, *v1alpha1.TidbCluster) (bool, error)
errExpectFn func(*GomegaWithT, error)
tcExpectFn func(*GomegaWithT, *v1alpha1.TidbCluster)
}
status := appsv1.StatefulSetStatus{
Replicas: int32(3),
}
testFn := func(test *testcase, t *testing.T) {
tc := newTidbClusterForPump()

set := &appsv1.StatefulSet{
Status: status,
}
if test.updateTC != nil {
test.updateTC(set)
}
pmm, _, _ := newFakePumpMemberManager()

err := pmm.syncTiDBClusterStatus(tc, set)

if test.errExpectFn != nil {
test.errExpectFn(g, err)
}
if test.tcExpectFn != nil {
test.tcExpectFn(g, tc)
}
}
tests := []testcase{
{
name: "statefulset is upgrading",
updateTC: func(set *appsv1.StatefulSet) {
set.Status.CurrentRevision = "pump-v1"
set.Status.UpdateRevision = "pump-v2"
},
upgradingFn: func(lister corelisters.PodLister, set *appsv1.StatefulSet, cluster *v1alpha1.TidbCluster) (bool, error) {
return true, nil
},
errExpectFn: nil,
tcExpectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster) {
g.Expect(tc.Status.Pump.StatefulSet.Replicas).To(Equal(int32(3)))
g.Expect(tc.Status.Pump.Phase).To(Equal(v1alpha1.UpgradePhase))
},
},
}

for i := range tests {
t.Logf(tests[i].name)
testFn(&tests[i], t)
}
}

0 comments on commit 90ead3d

Please sign in to comment.