Skip to content

Commit

Permalink
failover: emit events when pd failover (#1466) (#1507)
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot authored Jan 11, 2020
1 parent 0cc3447 commit 6ca1d83
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 8 deletions.
8 changes: 4 additions & 4 deletions pkg/controller/tidbcluster/tidb_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ func NewController(
tikvFailoverPeriod time.Duration,
tidbFailoverPeriod time.Duration,
) *Controller {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{QPS: 1})
eventBroadcaster.StartLogging(glog.V(2).Infof)
eventBroadcaster.StartRecordingToSink(&eventv1.EventSinkImpl{
Interface: eventv1.New(kubeCli.CoreV1().RESTClient()).Events("")})
recorder := eventBroadcaster.NewRecorder(v1alpha1.Scheme, corev1.EventSource{Component: "tidbcluster"})
recorder := eventBroadcaster.NewRecorder(v1alpha1.Scheme, corev1.EventSource{Component: "tidb-controller-manager"})

tcInformer := informerFactory.Pingcap().V1alpha1().TidbClusters()
setInformer := kubeInformerFactory.Apps().V1().StatefulSets()
Expand Down Expand Up @@ -108,7 +108,7 @@ func NewController(
typedControl := controller.NewTypedControl(controller.NewRealGenericControl(genericCli, recorder))
pdScaler := mm.NewPDScaler(pdControl, pvcInformer.Lister(), pvcControl)
tikvScaler := mm.NewTiKVScaler(pdControl, pvcInformer.Lister(), pvcControl, podInformer.Lister())
pdFailover := mm.NewPDFailover(cli, pdControl, pdFailoverPeriod, podInformer.Lister(), podControl, pvcInformer.Lister(), pvcControl, pvInformer.Lister())
pdFailover := mm.NewPDFailover(cli, pdControl, pdFailoverPeriod, podInformer.Lister(), podControl, pvcInformer.Lister(), pvcControl, pvInformer.Lister(), recorder)
tikvFailover := mm.NewTiKVFailover(tikvFailoverPeriod)
tidbFailover := mm.NewTiDBFailover(tidbFailoverPeriod)
pdUpgrader := mm.NewPDUpgrader(pdControl, podControl, podInformer.Lister())
Expand Down
19 changes: 16 additions & 3 deletions pkg/manager/member/pd_failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"github.com/pingcap/tidb-operator/pkg/util"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
glog "k8s.io/klog"
)

Expand All @@ -39,6 +41,7 @@ type pdFailover struct {
pvcLister corelisters.PersistentVolumeClaimLister
pvcControl controller.PVCControlInterface
pvLister corelisters.PersistentVolumeLister
recorder record.EventRecorder
}

// NewPDFailover returns a pd Failover
Expand All @@ -49,7 +52,8 @@ func NewPDFailover(cli versioned.Interface,
podControl controller.PodControlInterface,
pvcLister corelisters.PersistentVolumeClaimLister,
pvcControl controller.PVCControlInterface,
pvLister corelisters.PersistentVolumeLister) Failover {
pvLister corelisters.PersistentVolumeLister,
recorder record.EventRecorder) Failover {
return &pdFailover{
cli,
pdControl,
Expand All @@ -58,7 +62,8 @@ func NewPDFailover(cli versioned.Interface,
podControl,
pvcLister,
pvcControl,
pvLister}
pvLister,
recorder}
}

func (pf *pdFailover) Failover(tc *v1alpha1.TidbCluster) error {
Expand All @@ -73,9 +78,12 @@ func (pf *pdFailover) Failover(tc *v1alpha1.TidbCluster) error {
}

healthCount := 0
for _, pdMember := range tc.Status.PD.Members {
for podName, pdMember := range tc.Status.PD.Members {
if pdMember.Health {
healthCount++
} else {
pf.recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberUnhealthy",
"%s(%s) is unhealthy", podName, pdMember.ID)
}
}
inQuorum := healthCount > len(tc.Status.PD.Members)/2
Expand Down Expand Up @@ -132,6 +140,9 @@ func (pf *pdFailover) tryToMarkAPeerAsFailure(tc *v1alpha1.TidbCluster) error {
return err
}

pf.recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberMarkedAsFailure",
"%s(%s) marked as a failure member", podName, pdMember.ID)

tc.Status.PD.FailureMembers[podName] = v1alpha1.PDFailureMember{
PodName: podName,
MemberID: pdMember.ID,
Expand Down Expand Up @@ -173,6 +184,8 @@ func (pf *pdFailover) tryToDeleteAFailureMember(tc *v1alpha1.TidbCluster) error
return err
}
glog.Infof("pd failover: delete member: %d successfully", memberID)
pf.recorder.Eventf(tc, apiv1.EventTypeWarning, "PDMemberDeleted",
"%s(%d) deleted from cluster", failurePodName, memberID)

// The order of old PVC deleting and the new Pod creating is not guaranteed by Kubernetes.
// If new Pod is created before old PVC deleted, new Pod will reuse old PVC.
Expand Down
72 changes: 71 additions & 1 deletion pkg/manager/member/pd_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package member

import (
"fmt"
"sort"
"strings"
"testing"
"time"
Expand All @@ -31,11 +32,13 @@ import (
kubeinformers "k8s.io/client-go/informers"
kubefake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
)

func TestPDFailoverFailover(t *testing.T) {
g := NewGomegaWithT(t)

recorder := record.NewFakeRecorder(100)
type testcase struct {
name string
update func(*v1alpha1.TidbCluster)
Expand All @@ -57,6 +60,7 @@ func TestPDFailoverFailover(t *testing.T) {

pdFailover, pvcIndexer, podIndexer, fakePDControl, fakePodControl, fakePVCControl := newFakePDFailover()
pdClient := controller.NewFakePDClient(fakePDControl, tc)
pdFailover.recorder = recorder

pdClient.AddReaction(pdapi.DeleteMemberByIDActionType, func(action *pdapi.Action) (interface{}, error) {
if test.delMemberFailed {
Expand Down Expand Up @@ -107,6 +111,8 @@ func TestPDFailoverFailover(t *testing.T) {
expectFn: func(tc *v1alpha1.TidbCluster, _ *pdFailover) {
g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3))
g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0))
events := collectEvents(recorder.Events)
g.Expect(events).To(HaveLen(0))
},
},
{
Expand All @@ -122,6 +128,8 @@ func TestPDFailoverFailover(t *testing.T) {
errExpectFn: errExpectNotNil,
expectFn: func(tc *v1alpha1.TidbCluster, _ *pdFailover) {
g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3))
events := collectEvents(recorder.Events)
g.Expect(events).To(HaveLen(0))
},
},
{
Expand All @@ -141,6 +149,11 @@ func TestPDFailoverFailover(t *testing.T) {
expectFn: func(tc *v1alpha1.TidbCluster, _ *pdFailover) {
g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3))
g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0))
events := collectEvents(recorder.Events)
sort.Strings(events)
g.Expect(events).To(HaveLen(2))
g.Expect(events[0]).To(ContainSubstring("test-pd-0(0) is unhealthy"))
g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
},
},
{
Expand All @@ -160,6 +173,9 @@ func TestPDFailoverFailover(t *testing.T) {
pd1, ok := tc.Status.PD.FailureMembers[pd1Name]
g.Expect(ok).To(Equal(true))
g.Expect(pd1.MemberDeleted).To(Equal(true))
events := collectEvents(recorder.Events)
g.Expect(events).To(HaveLen(1))
g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster"))
},
},
{
Expand All @@ -182,6 +198,9 @@ func TestPDFailoverFailover(t *testing.T) {
expectFn: func(tc *v1alpha1.TidbCluster, _ *pdFailover) {
g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3))
g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0))
events := collectEvents(recorder.Events)
g.Expect(events).To(HaveLen(1))
g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
},
},
{
Expand All @@ -204,6 +223,9 @@ func TestPDFailoverFailover(t *testing.T) {
expectFn: func(tc *v1alpha1.TidbCluster, _ *pdFailover) {
g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3))
g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0))
events := collectEvents(recorder.Events)
g.Expect(events).To(HaveLen(1))
g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
},
},
{
Expand All @@ -223,6 +245,9 @@ func TestPDFailoverFailover(t *testing.T) {
expectFn: func(tc *v1alpha1.TidbCluster, _ *pdFailover) {
g.Expect(int(tc.Spec.PD.Replicas)).To(Equal(3))
g.Expect(len(tc.Status.PD.FailureMembers)).To(Equal(0))
events := collectEvents(recorder.Events)
g.Expect(events).To(HaveLen(1))
g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
},
},
{
Expand All @@ -247,6 +272,10 @@ func TestPDFailoverFailover(t *testing.T) {
g.Expect(failureMembers.MemberID).To(Equal("12891273174085095651"))
g.Expect(string(failureMembers.PVCUID)).To(Equal("pvc-1-uid"))
g.Expect(failureMembers.MemberDeleted).To(BeFalse())
events := collectEvents(recorder.Events)
g.Expect(events).To(HaveLen(2))
g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) marked as a failure member"))
},
},
{
Expand All @@ -266,6 +295,10 @@ func TestPDFailoverFailover(t *testing.T) {
pd1, ok := tc.Status.PD.FailureMembers[pd1Name]
g.Expect(ok).To(Equal(true))
g.Expect(pd1.MemberDeleted).To(Equal(true))
events := collectEvents(recorder.Events)
g.Expect(events).To(HaveLen(2))
g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster"))
},
},
{
Expand Down Expand Up @@ -294,6 +327,9 @@ func TestPDFailoverFailover(t *testing.T) {
pd1, ok := tc.Status.PD.FailureMembers[pd1Name]
g.Expect(ok).To(Equal(true))
g.Expect(pd1.MemberDeleted).To(Equal(false))
events := collectEvents(recorder.Events)
g.Expect(events).To(HaveLen(1))
g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
},
},
{
Expand All @@ -316,6 +352,9 @@ func TestPDFailoverFailover(t *testing.T) {
pd1, ok := tc.Status.PD.FailureMembers[pd1Name]
g.Expect(ok).To(Equal(true))
g.Expect(pd1.MemberDeleted).To(Equal(false))
events := collectEvents(recorder.Events)
g.Expect(events).To(HaveLen(1))
g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
},
},
{
Expand All @@ -338,6 +377,10 @@ func TestPDFailoverFailover(t *testing.T) {
pd1, ok := tc.Status.PD.FailureMembers[pd1Name]
g.Expect(ok).To(Equal(true))
g.Expect(pd1.MemberDeleted).To(Equal(false))
events := collectEvents(recorder.Events)
g.Expect(events).To(HaveLen(2))
g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster"))
},
},
{
Expand All @@ -360,6 +403,10 @@ func TestPDFailoverFailover(t *testing.T) {
pd1, ok := tc.Status.PD.FailureMembers[pd1Name]
g.Expect(ok).To(Equal(true))
g.Expect(pd1.MemberDeleted).To(Equal(false))
events := collectEvents(recorder.Events)
g.Expect(events).To(HaveLen(2))
g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster"))
},
},
{
Expand All @@ -385,6 +432,10 @@ func TestPDFailoverFailover(t *testing.T) {
_, err = pf.pvcLister.PersistentVolumeClaims(metav1.NamespaceDefault).Get(pvcName)
g.Expect(err).To(HaveOccurred())
g.Expect(errors.IsNotFound(err)).To(BeTrue())
events := collectEvents(recorder.Events)
g.Expect(events).To(HaveLen(2))
g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster"))
},
},
{
Expand All @@ -411,6 +462,10 @@ func TestPDFailoverFailover(t *testing.T) {
g.Expect(errors.IsNotFound(err)).To(BeTrue())
_, err = pf.pvcLister.PersistentVolumeClaims(metav1.NamespaceDefault).Get(pvcName)
g.Expect(err).NotTo(HaveOccurred())
events := collectEvents(recorder.Events)
g.Expect(events).To(HaveLen(2))
g.Expect(events[0]).To(ContainSubstring("test-pd-1(12891273174085095651) is unhealthy"))
g.Expect(events[1]).To(ContainSubstring("test-pd-1(12891273174085095651) deleted from cluster"))
},
},
}
Expand Down Expand Up @@ -541,7 +596,8 @@ func newFakePDFailover() (*pdFailover, cache.Indexer, cache.Indexer, *pdapi.Fake
podControl,
pvcInformer.Lister(),
pvcControl,
pvInformer.Lister()},
pvInformer.Lister(),
nil},
pvcInformer.Informer().GetIndexer(),
podInformer.Informer().GetIndexer(),
pdControl, podControl, pvcControl
Expand Down Expand Up @@ -649,3 +705,17 @@ func newPodForPDFailover(tc *v1alpha1.TidbCluster, memberType v1alpha1.MemberTyp
},
}
}

func collectEvents(source <-chan string) []string {
done := false
events := make([]string, 0)
for !done {
select {
case event := <-source:
events = append(events, event)
default:
done = true
}
}
return events
}

0 comments on commit 6ca1d83

Please sign in to comment.