Fix TiKV scale-in failure in some cases #726

Merged · 6 commits · Aug 8, 2019
10 changes: 5 additions & 5 deletions cmd/controller-manager/main.go
@@ -48,7 +48,6 @@ var (
leaseDuration = 15 * time.Second
renewDuration = 5 * time.Second
retryPeriod = 3 * time.Second
- resyncDuration = 30 * time.Second
waitDuration = 5 * time.Second
)

@@ -62,6 +61,7 @@ func init() {
flag.DurationVar(&pdFailoverPeriod, "pd-failover-period", time.Duration(5*time.Minute), "PD failover period default(5m)")
flag.DurationVar(&tikvFailoverPeriod, "tikv-failover-period", time.Duration(5*time.Minute), "TiKV failover period default(5m)")
flag.DurationVar(&tidbFailoverPeriod, "tidb-failover-period", time.Duration(5*time.Minute), "TiDB failover period")
+ flag.DurationVar(&controller.ResyncDuration, "resync-duration", time.Duration(30*time.Second), "Resync time of informer")
flag.BoolVar(&controller.TestMode, "test-mode", false, "whether tidb-operator run in test mode")

flag.Parse()
@@ -104,18 +104,18 @@ func main() {
var informerFactory informers.SharedInformerFactory
var kubeInformerFactory kubeinformers.SharedInformerFactory
if controller.ClusterScoped {
- informerFactory = informers.NewSharedInformerFactory(cli, resyncDuration)
- kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeCli, resyncDuration)
+ informerFactory = informers.NewSharedInformerFactory(cli, controller.ResyncDuration)
+ kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeCli, controller.ResyncDuration)
} else {
options := []informers.SharedInformerOption{
informers.WithNamespace(ns),
}
- informerFactory = informers.NewSharedInformerFactoryWithOptions(cli, resyncDuration, options...)
+ informerFactory = informers.NewSharedInformerFactoryWithOptions(cli, controller.ResyncDuration, options...)

kubeoptions := []kubeinformers.SharedInformerOption{
kubeinformers.WithNamespace(ns),
}
- kubeInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(kubeCli, resyncDuration, kubeoptions...)
+ kubeInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(kubeCli, controller.ResyncDuration, kubeoptions...)
}

rl := resourcelock.EndpointsLock{
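As a usage sketch of the change above (the binary name tidb-controller-manager is an assumption based on cmd/controller-manager; the other flag shown is defined in this file), the informer resync period can now be tuned at startup:

tidb-controller-manager \
  --tikv-failover-period=5m \
  --resync-duration=30s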
3 changes: 3 additions & 0 deletions pkg/controller/controller_utils.go
@@ -16,6 +16,7 @@ package controller
import (
"fmt"
"math"
+ "time"

"github.com/golang/glog"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
@@ -33,6 +34,8 @@ var (
ClusterScoped bool
// TestMode defines whether tidb operator run in test mode, test mode is only open when test
TestMode bool
+ // ResyncDuration is the resync time of informer
+ ResyncDuration time.Duration
)

const (
2 changes: 1 addition & 1 deletion pkg/controller/pvc_control.go
@@ -216,7 +216,7 @@ func (fpc *FakePVCControl) DeletePVC(_ *v1alpha1.TidbCluster, pvc *corev1.Persis
return fpc.PVCIndexer.Delete(pvc)
}

- // Update updates the annotation, labels and spec of pvc
+ // UpdatePVC updates the annotation, labels and spec of pvc
func (fpc *FakePVCControl) UpdatePVC(_ *v1alpha1.TidbCluster, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
defer fpc.updatePVCTracker.inc()
if fpc.updatePVCTracker.errorReady() {
40 changes: 30 additions & 10 deletions pkg/manager/member/pd_scaler_test.go
@@ -40,6 +40,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading bool
hasPVC bool
hasDeferAnn bool
+ annoIsNil bool
pvcDeleteErr bool
statusSyncFailed bool
err bool
@@ -61,19 +62,18 @@

scaler, _, pvcIndexer, pvcControl := newFakePDScaler()

- pvc1 := newPVCForStatefulSet(oldSet, v1alpha1.PDMemberType)
- pvc2 := pvc1.DeepCopy()
- pvc1.Name = ordinalPVCName(v1alpha1.PDMemberType, oldSet.GetName(), *oldSet.Spec.Replicas)
- pvc2.Name = ordinalPVCName(v1alpha1.PDMemberType, oldSet.GetName(), *oldSet.Spec.Replicas+1)
+ pvc := newPVCForStatefulSet(oldSet, v1alpha1.PDMemberType)
+ pvc.Name = ordinalPVCName(v1alpha1.PDMemberType, oldSet.GetName(), *oldSet.Spec.Replicas)
+ if !test.annoIsNil {
+     pvc.Annotations = map[string]string{}
+ }

if test.hasDeferAnn {
- pvc1.Annotations = map[string]string{}
- pvc1.Annotations[label.AnnPVCDeferDeleting] = time.Now().Format(time.RFC3339)
- pvc2.Annotations = map[string]string{}
- pvc2.Annotations[label.AnnPVCDeferDeleting] = time.Now().Format(time.RFC3339)
+ pvc.Annotations = map[string]string{}
+ pvc.Annotations[label.AnnPVCDeferDeleting] = time.Now().Format(time.RFC3339)
}
if test.hasPVC {
- pvcIndexer.Add(pvc1)
- pvcIndexer.Add(pvc2)
+ pvcIndexer.Add(pvc)
}

if test.pvcDeleteErr {
@@ -102,6 +102,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: false,
+ annoIsNil: true,
pvcDeleteErr: false,
statusSyncFailed: false,
err: false,
@@ -113,6 +114,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: true,
hasPVC: true,
hasDeferAnn: false,
+ annoIsNil: true,
pvcDeleteErr: false,
statusSyncFailed: false,
err: false,
@@ -124,6 +126,19 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: false,
hasDeferAnn: false,
+ annoIsNil: true,
pvcDeleteErr: false,
statusSyncFailed: false,
err: false,
changed: true,
},
+ {
+ name: "pvc annotation is not nil but doesn't contain defer deletion annotation",
+ update: normalPDMember,
+ pdUpgrading: false,
+ hasPVC: false,
+ hasDeferAnn: false,
+ annoIsNil: false,
+ pvcDeleteErr: false,
+ statusSyncFailed: false,
+ err: false,
@@ -135,6 +150,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: true,
+ annoIsNil: false,
pvcDeleteErr: true,
statusSyncFailed: false,
err: true,
@@ -155,6 +171,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: true,
+ annoIsNil: false,
pvcDeleteErr: false,
statusSyncFailed: false,
err: false,
@@ -168,6 +185,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: true,
+ annoIsNil: false,
pvcDeleteErr: false,
statusSyncFailed: false,
err: true,
@@ -186,6 +204,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: false,
+ annoIsNil: true,
pvcDeleteErr: false,
statusSyncFailed: false,
err: true,
@@ -197,6 +216,7 @@ func TestPDScalerScaleOut(t *testing.T) {
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: false,
+ annoIsNil: true,
pvcDeleteErr: false,
statusSyncFailed: true,
err: true,
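A usage note (standard Go tooling, not part of this diff): the updated table-driven cases can be run with

go test ./pkg/manager/member/ -run TestPDScalerScaleOut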
47 changes: 44 additions & 3 deletions pkg/manager/member/tikv_scaler.go
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/pdapi"
apps "k8s.io/api/apps/v1beta1"
corelisters "k8s.io/client-go/listers/core/v1"
+ podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

type tikvScaler struct {
@@ -78,6 +79,7 @@ func (tsd *tikvScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe
resetReplicas(newSet, oldSet)
return err
}

for _, store := range tc.Status.TiKV.Stores {
if store.PodName == podName {
state := store.State
@@ -135,9 +137,48 @@ func (tsd *tikvScaler) ScaleIn(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe
}
}

- // store not found in TidbCluster status,
- // this can happen when TiKV joins cluster but we haven't synced its status
- // so return error to wait another round for safety
+ // When the store is not found in TidbCluster status, there are two possible situations:
+ // 1. The TiKV has joined the cluster but we have not synced its status yet.
+ //    In this situation, return an error and wait another round for safety.
+ //
+ // 2. The TiKV pod has never been successfully registered in the cluster, e.g. it stays Pending forever.
+ //    In this situation, we should delete this TiKV pod immediately to avoid blocking subsequent operations.
+ if !podutil.IsPodReady(pod) {
Review comment (Contributor):
I think we should only handle Pending pods rather than all not-ready pods.

Reply (Contributor, author):
Not only Pending pods. For example, consider this situation: the newly launched pod keeps crashing and never really joins the TiDB cluster. In this case, we can also scale it in safely.
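As a side note on this thread (not part of the PR diff): a minimal sketch of the readiness semantics the new check relies on, assuming upstream podutil behavior, where a pod counts as ready only when its PodReady condition is True. Both a Pending pod and a crash-looping Running pod fail this check, which is the author's point above.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// isPodReady mirrors what podutil.IsPodReady reports: true only when the
// pod's PodReady condition exists and is set to True.
func isPodReady(pod *corev1.Pod) bool {
	for _, c := range pod.Status.Conditions {
		if c.Type == corev1.PodReady {
			return c.Status == corev1.ConditionTrue
		}
	}
	return false
}

func main() {
	// A Pending pod has no PodReady condition yet.
	pending := &corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodPending}}
	// A crash-looping Running pod carries PodReady=False.
	crashing := &corev1.Pod{Status: corev1.PodStatus{
		Phase: corev1.PodRunning,
		Conditions: []corev1.PodCondition{
			{Type: corev1.PodReady, Status: corev1.ConditionFalse},
		},
	}}
	fmt.Println(isPodReady(pending), isPodReady(crashing)) // false false
}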

+ pvcName := ordinalPVCName(v1alpha1.TiKVMemberType, setName, ordinal)
+ pvc, err := tsd.pvcLister.PersistentVolumeClaims(ns).Get(pvcName)
+ if err != nil {
+     resetReplicas(newSet, oldSet)
+     return err
+ }
+ safeTimeDeadline := pod.CreationTimestamp.Add(5 * controller.ResyncDuration)
+ if time.Now().Before(safeTimeDeadline) {
+     // Wait 5 resync periods so the following situation cannot occur:
+     // the tikv pod runs for a while without its status being synced, and then becomes not ready.
+     // If, after 5 resync periods, there is still no information about this tikv in the
+     // TidbCluster status, we can be sure it never joined the tidb cluster, so it is safe to scale it in.
+     resetReplicas(newSet, oldSet)
+     return fmt.Errorf("TiKV %s/%s is not ready, wait some resync periods to sync its status", ns, podName)
+ }
+ if pvc.Annotations == nil {
+     pvc.Annotations = map[string]string{}
+ }
+ now := time.Now().Format(time.RFC3339)
+ pvc.Annotations[label.AnnPVCDeferDeleting] = now
+ _, err = tsd.pvcControl.UpdatePVC(tc, pvc)
+ if err != nil {
+     glog.Errorf("pod %s not ready, tikv scale in: failed to set pvc %s/%s annotation: %s to %s",
+         podName, ns, pvcName, label.AnnPVCDeferDeleting, now)
+     resetReplicas(newSet, oldSet)
+     return err
+ }
+ glog.Infof("pod %s not ready, tikv scale in: set pvc %s/%s annotation: %s to %s",
+     podName, ns, pvcName, label.AnnPVCDeferDeleting, now)
+ decreaseReplicas(newSet, oldSet)
+ return nil
+ }
resetReplicas(newSet, oldSet)
return fmt.Errorf("TiKV %s/%s not found in cluster", ns, podName)
}
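For intuition, a small standalone sketch of the safety window computed above, assuming the default --resync-duration of 30s added by this PR (the two-minute pod age is an arbitrary example):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Default value of the --resync-duration flag added in this PR.
	resyncDuration := 30 * time.Second

	// A not-ready TiKV pod may only be scaled in once it is older than five
	// resync periods, giving the informer time to sync its store state from
	// PD before we conclude the pod never joined the cluster.
	created := time.Now().Add(-2 * time.Minute) // example: pod is 2m old
	safeTimeDeadline := created.Add(5 * resyncDuration)

	if time.Now().Before(safeTimeDeadline) {
		fmt.Println("too early: wait for more resync periods") // 2m < 2m30s, so this prints
	} else {
		fmt.Println("safe to defer-delete the PVC and decrease replicas")
	}
}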