failover: emit events when pd failover #1466

Merged: 11 commits, Jan 8, 2020
8 changes: 4 additions & 4 deletions pkg/controller/tidbcluster/tidb_cluster_controller.go
@@ -76,11 +76,11 @@ func NewController(
 	tikvFailoverPeriod time.Duration,
 	tidbFailoverPeriod time.Duration,
 ) *Controller {
-	eventBroadcaster := record.NewBroadcaster()
-	eventBroadcaster.StartLogging(glog.Infof)
+	eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{QPS: 1})
+	eventBroadcaster.StartLogging(glog.V(2).Infof)
 	eventBroadcaster.StartRecordingToSink(&eventv1.EventSinkImpl{
 		Interface: eventv1.New(kubeCli.CoreV1().RESTClient()).Events("")})
-	recorder := eventBroadcaster.NewRecorder(v1alpha1.Scheme, corev1.EventSource{Component: "tidbcluster"})
+	recorder := eventBroadcaster.NewRecorder(v1alpha1.Scheme, corev1.EventSource{Component: "tidb-controller-manager"})

Contributor Author: like k8s does

 	tcInformer := informerFactory.Pingcap().V1alpha1().TidbClusters()
 	setInformer := kubeInformerFactory.Apps().V1().StatefulSets()
@@ -108,7 +108,7 @@ func NewController(
 	typedControl := controller.NewTypedControl(controller.NewRealGenericControl(genericCli, recorder))
 	pdScaler := mm.NewPDScaler(pdControl, pvcInformer.Lister(), pvcControl)
 	tikvScaler := mm.NewTiKVScaler(pdControl, pvcInformer.Lister(), pvcControl, podInformer.Lister())
-	pdFailover := mm.NewPDFailover(cli, pdControl, pdFailoverPeriod, podInformer.Lister(), podControl, pvcInformer.Lister(), pvcControl, pvInformer.Lister())
+	pdFailover := mm.NewPDFailover(cli, pdControl, pdFailoverPeriod, podInformer.Lister(), podControl, pvcInformer.Lister(), pvcControl, pvInformer.Lister(), recorder)
 	tikvFailover := mm.NewTiKVFailover(tikvFailoverPeriod)
 	tidbFailover := mm.NewTiDBFailover(tidbFailoverPeriod)
 	pdUpgrader := mm.NewPDUpgrader(pdControl, podControl, podInformer.Lister())
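
For context on the change above: NewBroadcasterWithCorrelatorOptions routes events through client-go's correlator, whose QPS field sets the refill rate of a per-object token bucket, so a controller that re-emits the same warning on every sync loop is throttled to roughly one API write per second per object. A minimal wiring sketch; the explicit BurstSize value and the generic scheme.Scheme are illustrative assumptions, not taken from this PR:

    package main

    import (
        corev1 "k8s.io/api/core/v1"
        "k8s.io/client-go/kubernetes/scheme"
        "k8s.io/client-go/tools/record"
    )

    func main() {
        // QPS caps the sustained event rate per object; BurstSize (an assumed
        // value here; client-go applies its own default) bounds short spikes.
        broadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
            QPS:       1,
            BurstSize: 25,
        })
        recorder := broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "tidb-controller-manager"})
        _ = recorder // hand this to controllers, as the diff above does
    }
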
19 changes: 16 additions & 3 deletions pkg/manager/member/pd_failover.go
@@ -23,9 +23,11 @@ import (
 	"github.com/pingcap/tidb-operator/pkg/controller"
 	"github.com/pingcap/tidb-operator/pkg/pdapi"
 	"github.com/pingcap/tidb-operator/pkg/util"
+	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	corelisters "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/record"
 	glog "k8s.io/klog"
 )

@@ -39,6 +41,7 @@ type pdFailover struct {
 	pvcLister  corelisters.PersistentVolumeClaimLister
 	pvcControl controller.PVCControlInterface
 	pvLister   corelisters.PersistentVolumeLister
+	recorder   record.EventRecorder
 }

 // NewPDFailover returns a pd Failover
@@ -49,7 +52,8 @@ func NewPDFailover(cli versioned.Interface,
 	podControl controller.PodControlInterface,
 	pvcLister corelisters.PersistentVolumeClaimLister,
 	pvcControl controller.PVCControlInterface,
-	pvLister corelisters.PersistentVolumeLister) Failover {
+	pvLister corelisters.PersistentVolumeLister,
+	recorder record.EventRecorder) Failover {
 	return &pdFailover{
 		cli,
 		pdControl,
@@ -58,7 +62,8 @@ func NewPDFailover(cli versioned.Interface,
 		podControl,
 		pvcLister,
 		pvcControl,
-		pvLister}
+		pvLister,
+		recorder}
 }

 func (pf *pdFailover) Failover(tc *v1alpha1.TidbCluster) error {
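
The struct literal in this constructor is positional, so every new dependency (recorder here) must land in exactly the right slot in both the constructor and the test fake below. A hedged sketch of the same constructor with named fields; the parameter and field names not visible in this diff (failoverPeriod, podLister, pdFailoverPeriod) are inferred from the call site, not confirmed:

    // Sketch only: named-field initialization of pdFailover, so a misplaced
    // argument becomes a compile error instead of a silently swapped field.
    func NewPDFailover(cli versioned.Interface,
        pdControl pdapi.PDControlInterface,
        failoverPeriod time.Duration,
        podLister corelisters.PodLister,
        podControl controller.PodControlInterface,
        pvcLister corelisters.PersistentVolumeClaimLister,
        pvcControl controller.PVCControlInterface,
        pvLister corelisters.PersistentVolumeLister,
        recorder record.EventRecorder) Failover {
        return &pdFailover{
            cli:              cli,
            pdControl:        pdControl,
            pdFailoverPeriod: failoverPeriod,
            podLister:        podLister,
            podControl:       podControl,
            pvcLister:        pvcLister,
            pvcControl:       pvcControl,
            pvLister:         pvLister,
            recorder:         recorder,
        }
    }
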
@@ -73,9 +78,12 @@ func (pf *pdFailover) Failover(tc *v1alpha1.TidbCluster) error {
 	}

 	healthCount := 0
-	for _, pdMember := range tc.Status.PD.Members {
+	for podName, pdMember := range tc.Status.PD.Members {
 		if pdMember.Health {
 			healthCount++
+		} else {
+			pf.recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberUnhealthy",
+				"%s(%s) is unhealthy", podName, pdMember.ID)
Comment on lines +85 to +86

@aylei (Contributor), Jan 6, 2020: IMHO, this is better treated as a status rather than an event; recording it will be too noisy (despite the flow control, the controller will emit an event on every round of the control loop while there is an unhealthy PD member).

Contributor Author: There is already a .status.pd[].health attribute. Emitting an event as well is useful for kubectl describe.


Contributor: Yes, it seems like it is already in the describe result?

    Members:
      Yutengqiu - Demo - Pd - 0:
        Client URL:            http://yutengqiu-demo-pd-0.yutengqiu-demo-pd-peer.yutengqiu.svc:2379
        Health:                true
        Id:                    12697782363740270066
        Last Transition Time:  2020-01-06T01:48:58Z
        Name:                  yutengqiu-demo-pd-0
      Yutengqiu - Demo - Pd - 3:
        Client URL:            http://yutengqiu-demo-pd-3.yutengqiu-demo-pd-peer.yutengqiu.svc:2379
        Health:                true
        Id:                    10833084519111696661
        Last Transition Time:  2020-01-06T05:37:58Z
        Name:                  yutengqiu-demo-pd-3
      Yutengqiu - Demo - Pd - 4:
        Client URL:            http://yutengqiu-demo-pd-4.yutengqiu-demo-pd-peer.yutengqiu.svc:2379
        Health:                true
        Id:                    10563190389194377650
        Last Transition Time:  2020-01-06T05:44:25Z
        Name:                  yutengqiu-demo-pd-4
      Yutengqiu - Demo - Pd - 6:
        Client URL:            http://yutengqiu-demo-pd-6.yutengqiu-demo-pd-peer.yutengqiu.svc:2379
        Health:                true
        Id:                    6735927804110166558
        Last Transition Time:  2020-01-06T05:32:10Z
        Name:                  yutengqiu-demo-pd-6

Contributor Author: Emitting an event is more user-friendly; it shows the progress of failover.

Contributor Author: Or should PDMemberUnhealthy be changed to a more proper name?

Contributor: The progress-of-failover point makes sense; it is okay for me to emit events based on the status of PD when we cannot actually capture the "PD turning from healthy to unhealthy" transition.

The naming issue is trivial; I think the current name is fine.

Contributor: Is it possible to report the event on state change?

Contributor Author: Good suggestion. We can do this in the syncTidbClusterStatus method rather than the failover method; opened #1495 to track it. (A sketch of the idea follows the diff hunk below.)

 		}
 	}
 	inQuorum := healthCount > len(tc.Status.PD.Members)/2
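
A hedged sketch of the state-change idea settled on in the thread above (deferred to #1495): diff the previous and freshly computed member maps during status sync and emit only on healthy-to-unhealthy transitions. The helper name and its placement are illustrative, not from this PR:

    // Hypothetical helper for syncTidbClusterStatus: one event per transition
    // instead of one per control-loop round.
    func emitOnHealthTransition(recorder record.EventRecorder, tc *v1alpha1.TidbCluster,
        oldMembers, newMembers map[string]v1alpha1.PDMember) {
        for podName, cur := range newMembers {
            if prev, ok := oldMembers[podName]; ok && prev.Health && !cur.Health {
                recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberUnhealthy",
                    "%s(%s) turned unhealthy", podName, cur.ID)
            }
        }
    }
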
@@ -132,6 +140,9 @@ func (pf *pdFailover) tryToMarkAPeerAsFailure(tc *v1alpha1.TidbCluster) error {
 		return err
 	}

+	pf.recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberMarkedAsFailure",
+		"%s(%s) marked as a failure member", podName, pdMember.ID)
+
 	tc.Status.PD.FailureMembers[podName] = v1alpha1.PDFailureMember{
 		PodName:  podName,
 		MemberID: pdMember.ID,
@@ -173,6 +184,8 @@ func (pf *pdFailover) tryToDeleteAFailureMember(tc *v1alpha1.TidbCluster) error {
 		return err
 	}
 	glog.Infof("pd failover: delete member: %d successfully", memberID)
+	pf.recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberDeleted",
+		"%s(%d) deleted from cluster", failurePodName, memberID)

 	// The order of old PVC deleting and the new Pod creating is not guaranteed by Kubernetes.
 	// If new Pod is created before old PVC deleted, new Pod will reuse old PVC.
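
The PVC-ordering caveat in the comment above is why the PVC's UID is recorded in FailureMembers. A hedged sketch of the guard the deletion path can apply; the helper itself and pvcControl.DeletePVC are assumptions modeled on the repository's controller package, not quoted from this diff:

    // deleteFailurePVC is a sketch, not code from this PR: it deletes the PVC
    // recorded for a failed member only while its UID still matches, so a
    // claim already recreated for the replacement Pod is left alone.
    func (pf *pdFailover) deleteFailurePVC(tc *v1alpha1.TidbCluster, ns, pvcName string,
        failureMember v1alpha1.PDFailureMember) error {
        pvc, err := pf.pvcLister.PersistentVolumeClaims(ns).Get(pvcName)
        if errors.IsNotFound(err) {
            return nil // already gone; nothing to do
        }
        if err != nil {
            return err
        }
        if pvc.GetUID() != failureMember.PVCUID {
            return nil // a new claim was already created; keep it
        }
        return pf.pvcControl.DeletePVC(tc, pvc)
    }
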
72 changes: 71 additions & 1 deletion pkg/manager/member/pd_failover_test.go
@@ -15,6 +15,7 @@ package member

 import (
 	"fmt"
+	"sort"
 	"strings"
 	"testing"
 	"time"
@@ -31,11 +32,13 @@ import (
 	kubeinformers "k8s.io/client-go/informers"
 	kubefake "k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
 )

 func TestPDFailoverFailover(t *testing.T) {
 	g := NewGomegaWithT(t)

+	recorder := record.NewFakeRecorder(100)
 	type testcase struct {
 		name   string
 		update func(*v1alpha1.TidbCluster)
@@ -57,6 +60,7 @@ func TestPDFailoverFailover(t *testing.T) {

 			pdFailover, pvcIndexer, podIndexer, fakePDControl, fakePodControl, fakePVCControl := newFakePDFailover()
 			pdClient := controller.NewFakePDClient(fakePDControl, tc)
+			pdFailover.recorder = recorder

 			pdClient.AddReaction(pdapi.DeleteMemberByIDActionType, func(action *pdapi.Action) (interface{}, error) {
 				if test.delMemberFailed {
@@ -107,6 +111,8 @@ func TestPDFailoverFailover(t *testing.T) {
 			expectFn: func(tc *v1alpha1.TidbCluster, _ *pdFailover) {
 				g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3))
 				g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0))
+				events := collectEvents(recorder.Events)
+				g.Expect(events).To(HaveLen(0))
 			},
 		},
 		{
@@ -122,6 +128,8 @@ func TestPDFailoverFailover(t *testing.T) {
 			errExpectFn: errExpectNotNil,
 			expectFn: func(tc *v1alpha1.TidbCluster, _ *pdFailover) {
 				g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3))
+				events := collectEvents(recorder.Events)
+				g.Expect(events).To(HaveLen(0))
 			},
 		},
 		{
@@ -141,6 +149,11 @@ func TestPDFailoverFailover(t *testing.T) {
 			expectFn: func(tc *v1alpha1.TidbCluster, _ *pdFailover) {
 				g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3))
 				g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0))
+				events := collectEvents(recorder.Events)
+				sort.Strings(events)
+				g.Expect(events).To(HaveLen(2))
+				g.Expect(events[0]).To(ContainSubstring("test-pd-0(0) is unhealthy"))
+				g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
 			},
 		},
 		{
@@ -160,6 +173,9 @@ func TestPDFailoverFailover(t *testing.T) {
 				pd1, ok := tc.Status.PD.FailureMembers[pd1Name]
 				g.Expect(ok).To(Equal(true))
 				g.Expect(pd1.MemberDeleted).To(Equal(true))
+				events := collectEvents(recorder.Events)
+				g.Expect(events).To(HaveLen(1))
+				g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster"))
 			},
 		},
 		{
@@ -182,6 +198,9 @@ func TestPDFailoverFailover(t *testing.T) {
 			expectFn: func(tc *v1alpha1.TidbCluster, _ *pdFailover) {
 				g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3))
 				g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0))
+				events := collectEvents(recorder.Events)
+				g.Expect(events).To(HaveLen(1))
+				g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
 			},
 		},
 		{
@@ -204,6 +223,9 @@ func TestPDFailoverFailover(t *testing.T) {
 			expectFn: func(tc *v1alpha1.TidbCluster, _ *pdFailover) {
 				g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3))
 				g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0))
+				events := collectEvents(recorder.Events)
+				g.Expect(events).To(HaveLen(1))
+				g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
 			},
 		},
 		{
@@ -223,6 +245,9 @@ func TestPDFailoverFailover(t *testing.T) {
 			expectFn: func(tc *v1alpha1.TidbCluster, _ *pdFailover) {
 				g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3))
 				g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0))
+				events := collectEvents(recorder.Events)
+				g.Expect(events).To(HaveLen(1))
+				g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
 			},
 		},
 		{
@@ -247,6 +272,10 @@ func TestPDFailoverFailover(t *testing.T) {
 				g.Expect(failureMembers.MemberID).To(Equal("12891273174085095651"))
 				g.Expect(string(failureMembers.PVCUID)).To(Equal("pvc-1-uid"))
 				g.Expect(failureMembers.MemberDeleted).To(BeFalse())
+				events := collectEvents(recorder.Events)
+				g.Expect(events).To(HaveLen(2))
+				g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
+				g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) marked as a failure member"))
 			},
 		},
 		{
@@ -266,6 +295,10 @@ func TestPDFailoverFailover(t *testing.T) {
 				pd1, ok := tc.Status.PD.FailureMembers[pd1Name]
 				g.Expect(ok).To(Equal(true))
 				g.Expect(pd1.MemberDeleted).To(Equal(true))
+				events := collectEvents(recorder.Events)
+				g.Expect(events).To(HaveLen(2))
+				g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
+				g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster"))
 			},
 		},
 		{
@@ -294,6 +327,9 @@ func TestPDFailoverFailover(t *testing.T) {
 				pd1, ok := tc.Status.PD.FailureMembers[pd1Name]
 				g.Expect(ok).To(Equal(true))
 				g.Expect(pd1.MemberDeleted).To(Equal(false))
+				events := collectEvents(recorder.Events)
+				g.Expect(events).To(HaveLen(1))
+				g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
 			},
 		},
 		{
@@ -316,6 +352,9 @@ func TestPDFailoverFailover(t *testing.T) {
 				pd1, ok := tc.Status.PD.FailureMembers[pd1Name]
 				g.Expect(ok).To(Equal(true))
 				g.Expect(pd1.MemberDeleted).To(Equal(false))
+				events := collectEvents(recorder.Events)
+				g.Expect(events).To(HaveLen(1))
+				g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
 			},
 		},
 		{
@@ -338,6 +377,10 @@ func TestPDFailoverFailover(t *testing.T) {
 				pd1, ok := tc.Status.PD.FailureMembers[pd1Name]
 				g.Expect(ok).To(Equal(true))
 				g.Expect(pd1.MemberDeleted).To(Equal(false))
+				events := collectEvents(recorder.Events)
+				g.Expect(events).To(HaveLen(2))
+				g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
+				g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster"))
 			},
 		},
 		{
@@ -360,6 +403,10 @@ func TestPDFailoverFailover(t *testing.T) {
 				pd1, ok := tc.Status.PD.FailureMembers[pd1Name]
 				g.Expect(ok).To(Equal(true))
 				g.Expect(pd1.MemberDeleted).To(Equal(false))
+				events := collectEvents(recorder.Events)
+				g.Expect(events).To(HaveLen(2))
+				g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
+				g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster"))
 			},
 		},
 		{
@@ -385,6 +432,10 @@ func TestPDFailoverFailover(t *testing.T) {
 				_, err = pf.pvcLister.PersistentVolumeClaims(metav1.NamespaceDefault).Get(pvcName)
 				g.Expect(err).To(HaveOccurred())
 				g.Expect(errors.IsNotFound(err)).To(BeTrue())
+				events := collectEvents(recorder.Events)
+				g.Expect(events).To(HaveLen(2))
+				g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
+				g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster"))
 			},
 		},
 		{
@@ -411,6 +462,10 @@ func TestPDFailoverFailover(t *testing.T) {
 				g.Expect(errors.IsNotFound(err)).To(BeTrue())
 				_, err = pf.pvcLister.PersistentVolumeClaims(metav1.NamespaceDefault).Get(pvcName)
 				g.Expect(err).NotTo(HaveOccurred())
+				events := collectEvents(recorder.Events)
+				g.Expect(events).To(HaveLen(2))
+				g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
+				g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster"))
 			},
 		},
 	}
@@ -541,7 +596,8 @@ func newFakePDFailover() (*pdFailover, cache.Indexer, cache.Indexer, *pdapi.Fake
 			podControl,
 			pvcInformer.Lister(),
 			pvcControl,
-			pvInformer.Lister()},
+			pvInformer.Lister(),
+			nil},
 		pvcInformer.Informer().GetIndexer(),
 		podInformer.Informer().GetIndexer(),
 		pdControl, podControl, pvcControl
@@ -649,3 +705,17 @@ func newPodForPDFailover(tc *v1alpha1.TidbCluster, memberType v1alpha1.MemberType,
 		},
 	}
 }
+
+func collectEvents(source <-chan string) []string {
+	done := false
+	events := make([]string, 0)
+	for !done {
+		select {
+		case event := <-source:
+			events = append(events, event)
+		default:
+			done = true
+		}
+	}
+	return events
+}
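
For reference: collectEvents drains the FakeRecorder's buffered channel without blocking, so each expectFn asserts on exactly the events emitted up to that point. A standalone sketch of the FakeRecorder semantics the tests rely on (passing nil as the object is an assumption that holds because FakeRecorder ignores the object argument):

    package main

    import (
        "fmt"

        corev1 "k8s.io/api/core/v1"
        "k8s.io/client-go/tools/record"
    )

    func main() {
        recorder := record.NewFakeRecorder(10)
        // FakeRecorder pushes "<Type> <Reason> <Message>" strings onto Events.
        recorder.Eventf(nil, corev1.EventTypeWarning, "PDMemberUnhealthy",
            "%s(%s) is unhealthy", "test-pd-1", "12891273174085095651")

        // Drain without blocking, as collectEvents does.
        for {
            select {
            case event := <-recorder.Events:
                // Prints: Warning PDMemberUnhealthy test-pd-1(12891273174085095651) is unhealthy
                fmt.Println(event)
            default:
                return
            }
        }
    }
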