Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new RequeueError type #80

Merged
merged 12 commits into from
Sep 12, 2018
118 changes: 64 additions & 54 deletions Gopkg.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
@@ -69,3 +69,8 @@ required = [
[[override]]
name = "github.com/BurntSushi/toml"
revision = "3012a1dbe2e4bd1391d42b32f0577cb7bbc7f005"

[[constraint]]
name = "github.com/pkg/errors"
version = "0.9.0"
source = "https://github.com/pingcap/errors.git"
20 changes: 20 additions & 0 deletions pkg/controller/controller_utils.go
Original file line number Diff line number Diff line change
@@ -38,6 +38,26 @@ const (
defaultPushgatewayImage = "prom/pushgateway:v0.3.1"
)

// RequeueError is used to requeue the item, this error type should't be considered as a real error
type RequeueError struct {
s string
}

func (re *RequeueError) Error() string {
return re.s
}

// RequeueErrorf returns a RequeueError
func RequeueErrorf(format string, a ...interface{}) error {
return &RequeueError{fmt.Sprintf(format, a...)}
}

// IsRequeueError returns whether err is a RequeueError
func IsRequeueError(err error) bool {
_, ok := err.(*RequeueError)
return ok
}

// GetOwnerRef returns TidbCluster's OwnerReference
func GetOwnerRef(tc *v1alpha1.TidbCluster) metav1.OwnerReference {
controller := true
19 changes: 14 additions & 5 deletions pkg/controller/tidbcluster/tidb_cluster_control.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/manager"
"github.com/pingcap/tidb-operator/pkg/util"
apiequality "k8s.io/apimachinery/pkg/api/equality"
errorutils "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
)

@@ -69,22 +70,30 @@ type defaultTidbClusterControl struct {

// UpdateStatefulSet executes the core logic loop for a tidbcluster.
func (tcc *defaultTidbClusterControl) UpdateTidbCluster(tc *v1alpha1.TidbCluster) error {
// perform the main update function and get the status

oldStatus := tc.Status.DeepCopy()
oldPDReplicas := tc.Spec.PD.Replicas
oldTiKVReplicas := tc.Spec.TiKV.Replicas
oldTiDBReplicas := tc.Spec.TiDB.Replicas

var errs []error
err := tcc.updateTidbCluster(tc)
if err != nil {
return err
errs = append(errs, err)
}

if !apiequality.Semantic.DeepEqual(&tc.Status, oldStatus) || tc.Spec.PD.Replicas != oldPDReplicas {
tc, err = tcc.tcControl.UpdateTidbCluster(tc.DeepCopy())
replicasChanged := tc.Spec.PD.Replicas != oldPDReplicas ||
tc.Spec.TiKV.Replicas != oldTiKVReplicas ||
tc.Spec.TiDB.Replicas != oldTiDBReplicas
if !apiequality.Semantic.DeepEqual(&tc.Status, oldStatus) || replicasChanged {
_, err := tcc.tcControl.UpdateTidbCluster(tc.DeepCopy())
if err != nil {
return err
errs = append(errs, err)
}
}

return nil
return errorutils.NewAggregate(errs)
}

func (tcc *defaultTidbClusterControl) updateTidbCluster(tc *v1alpha1.TidbCluster) error {
7 changes: 6 additions & 1 deletion pkg/controller/tidbcluster/tidb_cluster_controller.go
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/controller"
mm "github.com/pingcap/tidb-operator/pkg/manager/member"
"github.com/pingcap/tidb-operator/pkg/manager/meta"
perrors "github.com/pkg/errors"
apps "k8s.io/api/apps/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -226,7 +227,11 @@ func (tcc *Controller) processNextWorkItem() bool {
}
defer tcc.queue.Done(key)
if err := tcc.sync(key.(string)); err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing TidbCluster %v, requeuing: %v", key.(string), err))
if perrors.Find(err, controller.IsRequeueError) != nil {
glog.Infof("TidbCluster: %v, still need sync: %v, requeuing", key.(string), err)
} else {
utilruntime.HandleError(fmt.Errorf("TidbCluster: %v, sync failed %v, requeuing", key.(string), err))
}
tcc.queue.AddRateLimited(key)
} else {
tcc.queue.Forget(key)
2 changes: 1 addition & 1 deletion pkg/manager/member/tikv_member_manager.go
Original file line number Diff line number Diff line change
@@ -544,7 +544,7 @@ func (tkmm *tikvMemberManager) setStoreLabelsForTiKV(pdClient controller.PDClien
return err
}

glog.V(2).Infof("Pod: [%s/%s] is on node: [%s]. Node: [%s]'s labels: %v", ns, podName, nodeName, nodeName, ls)
glog.V(4).Infof("Pod: [%s/%s] is on node: [%s]. Node: [%s]'s labels: %v", ns, podName, nodeName, nodeName, ls)
if !tkmm.storeLabelsEqualNodeLabels(store.Store.Labels, ls) {
updated, err := pdClient.SetStoreLabels(store.Store.Id, ls)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/manager/member/tikv_scaler.go
Original file line number Diff line number Diff line change
@@ -86,7 +86,7 @@ func (tsd *tikvScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe
}
}
resetReplicas(newSet, oldSet)
return fmt.Errorf("TiKV %s/%s store %d still in cluster, state: %s", ns, podName, id, state)
return controller.RequeueErrorf("TiKV %s/%s store %d still in cluster, state: %s", ns, podName, id, state)
}
}
for _, store := range tc.Status.TiKV.TombstoneStores {
50 changes: 23 additions & 27 deletions pkg/manager/member/tikv_scaler_test.go
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ func TestTiKVScalerScaleOut(t *testing.T) {
hasPVC bool
hasDeferAnn bool
pvcDeleteErr bool
err bool
errExpectFn func(*GomegaWithT, error)
changed bool
}

@@ -75,11 +75,7 @@ func TestTiKVScalerScaleOut(t *testing.T) {
}

err := scaler.ScaleOut(tc, oldSet, newSet)
if test.err {
g.Expect(err).To(HaveOccurred())
} else {
g.Expect(err).NotTo(HaveOccurred())
}
test.errExpectFn(g, err)
if test.changed {
g.Expect(int(*newSet.Spec.Replicas)).To(Equal(6))
} else {
@@ -94,7 +90,7 @@ func TestTiKVScalerScaleOut(t *testing.T) {
hasPVC: true,
hasDeferAnn: false,
pvcDeleteErr: false,
err: false,
errExpectFn: errExpectNil,
changed: true,
},
{
@@ -103,7 +99,7 @@ func TestTiKVScalerScaleOut(t *testing.T) {
hasPVC: true,
hasDeferAnn: false,
pvcDeleteErr: false,
err: false,
errExpectFn: errExpectNil,
changed: false,
},
{
@@ -112,7 +108,7 @@ func TestTiKVScalerScaleOut(t *testing.T) {
hasPVC: false,
hasDeferAnn: false,
pvcDeleteErr: false,
err: false,
errExpectFn: errExpectNil,
changed: true,
},
{
@@ -121,7 +117,7 @@ func TestTiKVScalerScaleOut(t *testing.T) {
hasPVC: true,
hasDeferAnn: true,
pvcDeleteErr: true,
err: true,
errExpectFn: errExpectNotNil,
changed: false,
},
}
@@ -140,7 +136,7 @@ func TestTiKVScalerScaleIn(t *testing.T) {
delStoreErr bool
hasPVC bool
pvcUpdateErr bool
err bool
errExpectFn func(*GomegaWithT, error)
changed bool
}

@@ -177,11 +173,7 @@ func TestTiKVScalerScaleIn(t *testing.T) {
}

err := scaler.ScaleIn(tc, oldSet, newSet)
if test.err {
g.Expect(err).To(HaveOccurred())
} else {
g.Expect(err).NotTo(HaveOccurred())
}
test.errExpectFn(g, err)
if test.changed {
g.Expect(int(*newSet.Spec.Replicas)).To(Equal(4))
} else {
@@ -197,7 +189,7 @@ func TestTiKVScalerScaleIn(t *testing.T) {
delStoreErr: false,
hasPVC: true,
pvcUpdateErr: false,
err: true,
errExpectFn: errExpectNotNil,
changed: false,
},
{
@@ -207,7 +199,7 @@ func TestTiKVScalerScaleIn(t *testing.T) {
delStoreErr: false,
hasPVC: true,
pvcUpdateErr: false,
err: false,
errExpectFn: errExpectNil,
changed: false,
},
{
@@ -220,7 +212,7 @@ func TestTiKVScalerScaleIn(t *testing.T) {
delStoreErr: false,
hasPVC: true,
pvcUpdateErr: false,
err: true,
errExpectFn: errExpectNotNil,
changed: false,
},
{
@@ -235,7 +227,7 @@ func TestTiKVScalerScaleIn(t *testing.T) {
delStoreErr: false,
hasPVC: true,
pvcUpdateErr: false,
err: true,
errExpectFn: errExpectNotNil,
changed: false,
},
{
@@ -250,7 +242,7 @@ func TestTiKVScalerScaleIn(t *testing.T) {
delStoreErr: false,
hasPVC: true,
pvcUpdateErr: false,
err: true,
errExpectFn: errExpectNotNil,
changed: false,
},
{
@@ -265,7 +257,7 @@ func TestTiKVScalerScaleIn(t *testing.T) {
delStoreErr: false,
hasPVC: true,
pvcUpdateErr: false,
err: true,
errExpectFn: errExpectNotNil,
changed: false,
},
{
@@ -275,7 +267,7 @@ func TestTiKVScalerScaleIn(t *testing.T) {
delStoreErr: false,
hasPVC: true,
pvcUpdateErr: false,
err: true,
errExpectFn: errExpectRequeue,
changed: false,
},
{
@@ -285,7 +277,7 @@ func TestTiKVScalerScaleIn(t *testing.T) {
delStoreErr: false,
hasPVC: true,
pvcUpdateErr: false,
err: false,
errExpectFn: errExpectNil,
changed: true,
},
{
@@ -300,7 +292,7 @@ func TestTiKVScalerScaleIn(t *testing.T) {
delStoreErr: false,
hasPVC: true,
pvcUpdateErr: false,
err: true,
errExpectFn: errExpectNotNil,
changed: false,
},
{
@@ -310,7 +302,7 @@ func TestTiKVScalerScaleIn(t *testing.T) {
delStoreErr: false,
hasPVC: false,
pvcUpdateErr: false,
err: true,
errExpectFn: errExpectNotNil,
changed: false,
},
{
@@ -320,7 +312,7 @@ func TestTiKVScalerScaleIn(t *testing.T) {
delStoreErr: false,
hasPVC: true,
pvcUpdateErr: true,
err: true,
errExpectFn: errExpectNotNil,
changed: false,
},
}
@@ -361,3 +353,7 @@ func tombstoneStoreFun(tc *v1alpha1.TidbCluster) {
},
}
}

func errExpectRequeue(g *GomegaWithT, err error) {
g.Expect(controller.IsRequeueError(err)).To(Equal(true))
}
24 changes: 24 additions & 0 deletions vendor/github.com/pkg/errors/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions vendor/github.com/pkg/errors/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions vendor/github.com/pkg/errors/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading