Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add scheduled-backup test case #322

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 213 additions & 14 deletions tests/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

_ "github.com/go-sql-driver/mysql"
"github.com/golang/glog"
"github.com/pingcap/errors"
pingcapErrors "github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
Expand All @@ -39,8 +39,10 @@ import (
"k8s.io/api/apps/v1beta1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
)
Expand All @@ -64,9 +66,7 @@ func NewOperatorActions(cli versioned.Interface, kubeCli kubernetes.Interface, l
const (
DefaultPollTimeout time.Duration = 10 * time.Minute
DefaultPollInterval time.Duration = 10 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this to 10 seconds, it will be very noisy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

xiaojing and I need to check the log and scheduledbackup-cronjob is called job every minute

)

const (
getBackupDirPodName = "get-backup-dir"
grafanaUsername = "admin"
grafanaPassword = "admin"
)
Expand All @@ -93,6 +93,7 @@ type OperatorActions interface {
CheckRestore(from *TidbClusterInfo, to *TidbClusterInfo) error
ForceDeploy(info *TidbClusterInfo) error
CreateSecret(info *TidbClusterInfo) error
getBackupDir(info *TidbClusterInfo) ([]string, error)
}

type FaultTriggerActions interface {
Expand Down Expand Up @@ -135,6 +136,7 @@ type OperatorInfo struct {
}

type TidbClusterInfo struct {
BackupPVC string
Namespace string
ClusterName string
OperatorTag string
Expand All @@ -143,6 +145,7 @@ type TidbClusterInfo struct {
TiDBImage string
StorageClassName string
Password string
InitSql string
RecordCount string
InsertBatchSize string
Resources map[string]string
Expand All @@ -153,9 +156,6 @@ type TidbClusterInfo struct {

func (tc *TidbClusterInfo) HelmSetString() string {

// add a database and table for test
initSql := `"create database record;use record;create table test(t char(32));"`

set := map[string]string{
"clusterName": tc.ClusterName,
"pd.storageClassName": tc.StorageClassName,
Expand All @@ -167,7 +167,7 @@ func (tc *TidbClusterInfo) HelmSetString() string {
"tikv.image": tc.TiKVImage,
"tidb.image": tc.TiDBImage,
"tidb.passwordSecretName": "set-secret",
"tidb.initSql": initSql,
"tidb.initSql": tc.InitSql,
"monitor.create": strconv.FormatBool(tc.Monitor),
}

Expand Down Expand Up @@ -272,6 +272,7 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterInfo) error {
info.ClusterName,
fmt.Sprintf("%s-backup", info.ClusterName),
fmt.Sprintf("%s-restore", info.ClusterName),
fmt.Sprintf("%s-scheduler-backup", info.ClusterName),
}
for _, chartName := range charts {
res, err := exec.Command("helm", "del", "--purge", chartName).CombinedOutput()
Expand All @@ -281,6 +282,12 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterInfo) error {
}
}

err := oa.kubeCli.CoreV1().Pods(info.Namespace).Delete(getBackupDirPodName, &metav1.DeleteOptions{})

if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete dir pod %v", err)
}

setStr := label.New().Instance(info.ClusterName).String()

resources := []string{"pvc"}
Expand Down Expand Up @@ -411,7 +418,7 @@ func (oa *operatorActions) ScaleTidbCluster(info *TidbClusterInfo) error {
glog.Info("[SCALE] " + cmd)
res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
if err != nil {
return errors.Wrapf(err, "failed to scale tidb cluster: %s", string(res))
return pingcapErrors.Wrapf(err, "failed to scale tidb cluster: %s", string(res))
}
return nil
}
Expand All @@ -422,7 +429,7 @@ func (oa *operatorActions) UpgradeTidbCluster(info *TidbClusterInfo) error {
glog.Info("[UPGRADE] " + cmd)
res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
if err != nil {
return errors.Wrapf(err, "failed to upgrade tidb cluster: %s", string(res))
return pingcapErrors.Wrapf(err, "failed to upgrade tidb cluster: %s", string(res))
}
return nil
}
Expand Down Expand Up @@ -1052,7 +1059,7 @@ func (oa *operatorActions) DeployAdHocBackup(info *TidbClusterInfo) error {
}()
sets := map[string]string{
"clusterName": info.ClusterName,
"name": "test-backup",
"name": info.BackupPVC,
"mode": "backup",
"user": "root",
"password": info.Password,
Expand All @@ -1076,6 +1083,7 @@ func (oa *operatorActions) DeployAdHocBackup(info *TidbClusterInfo) error {
if err != nil {
return fmt.Errorf("failed to launch adhoc backup job: %v, %s", err, string(res))
}

return nil
}

Expand All @@ -1085,7 +1093,7 @@ func (oa *operatorActions) CheckAdHocBackup(info *TidbClusterInfo) error {
glog.Infof("deploy clean backup end cluster[%s] namespace[%s]", info.ClusterName, info.Namespace)
}()

jobName := fmt.Sprintf("%s-%s", info.ClusterName, "test-backup")
jobName := fmt.Sprintf("%s-%s", info.ClusterName, info.BackupPVC)
fn := func() (bool, error) {
job, err := oa.kubeCli.BatchV1().Jobs(info.Namespace).Get(jobName, metav1.GetOptions{})
if err != nil {
Expand All @@ -1104,6 +1112,7 @@ func (oa *operatorActions) CheckAdHocBackup(info *TidbClusterInfo) error {
if err != nil {
return fmt.Errorf("failed to launch scheduler backup job: %v", err)
}

return nil
}

Expand All @@ -1114,7 +1123,7 @@ func (oa *operatorActions) Restore(from *TidbClusterInfo, to *TidbClusterInfo) e
}()
sets := map[string]string{
"clusterName": to.ClusterName,
"name": "test-backup",
"name": to.BackupPVC,
"mode": "restore",
"user": "root",
"password": to.Password,
Expand Down Expand Up @@ -1148,7 +1157,7 @@ func (oa *operatorActions) CheckRestore(from *TidbClusterInfo, to *TidbClusterIn
glog.Infof("check restore end cluster[%s] namespace[%s]", to.ClusterName, to.Namespace)
}()

jobName := fmt.Sprintf("%s-restore-test-backup", to.ClusterName)
jobName := fmt.Sprintf("%s-restore-%s", to.ClusterName, from.BackupPVC)
fn := func() (bool, error) {
job, err := oa.kubeCli.BatchV1().Jobs(to.Namespace).Get(jobName, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -1268,13 +1277,203 @@ func releaseIsExist(err error) bool {
}

func (oa *operatorActions) DeployScheduledBackup(info *TidbClusterInfo) error {
glog.Infof("begin to deploy scheduled backup")
defer func() {
glog.Infof("deploy shceduled backup end")
}()

cron := fmt.Sprintf("'*/1 * * * *'")
sets := map[string]string{
"clusterName": info.ClusterName,
"scheduledBackup.create": "true",
"scheduledBackup.user": "root",
"scheduledBackup.password": info.Password,
"scheduledBackup.schedule": cron,
"scheduledBackup.storage": "10Gi",
}
var buffer bytes.Buffer
for k, v := range sets {
set := fmt.Sprintf(" --set %s=%s", k, v)
_, err := buffer.WriteString(set)
if err != nil {
return err
}
}

setStr := buffer.String()

cmd := fmt.Sprintf("helm upgrade %s /charts/%s/tidb-cluster %s",
info.ClusterName, info.OperatorTag, setStr)

glog.Infof("scheduled-backup delploy [%s]", cmd)
res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
if err != nil {
return fmt.Errorf("failed to launch scheduler backup job: %v, %s", err, string(res))
}
return nil
}

func (oa *operatorActions) CheckScheduledBackup(info *TidbClusterInfo) error {
glog.Infof("begin to check scheduler backup cluster[%s] namespace[%s]", info.ClusterName, info.Namespace)
defer func() {
glog.Infof("deploy check scheduler end cluster[%s] namespace[%s]", info.ClusterName, info.Namespace)
}()

jobName := fmt.Sprintf("%s-scheduled-backup", info.ClusterName)
fn := func() (bool, error) {
job, err := oa.kubeCli.BatchV1beta1().CronJobs(info.Namespace).Get(jobName, metav1.GetOptions{})
if err != nil {
glog.Errorf("failed to get cronjobs %s ,%v", jobName, err)
return false, nil
}

jobs, err := oa.kubeCli.BatchV1().Jobs(info.Namespace).List(metav1.ListOptions{})
if err != nil {
glog.Errorf("failed to list jobs %s ,%v", info.Namespace, err)
return false, nil
}

backupJobs := []batchv1.Job{}
for _, j := range jobs.Items {
if pid, found := getParentUIDFromJob(j); found && pid == job.UID {
backupJobs = append(backupJobs, j)
}
}

if len(backupJobs) == 0 {
glog.Errorf("cluster [%s] scheduler jobs is creating, please wait!", info.ClusterName)
return false, nil
}

for _, j := range backupJobs {
if j.Status.Succeeded == 0 {
glog.Errorf("cluster [%s] back up job is not completed, please wait! ", info.ClusterName)
return false, nil
}
}

return true, nil
}

err := wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn)
if err != nil {
return fmt.Errorf("failed to launch scheduler backup job: %v", err)
}

// sleep 1 minute for cronjob
time.Sleep(60 * time.Second)

dirs, err := oa.getBackupDir(info)
if err != nil {
return fmt.Errorf("failed to get backup dir: %v", err)
}

if len(dirs) != 3 {
return fmt.Errorf("scheduler job failed!")
}

return nil
}

func getParentUIDFromJob(j batchv1.Job) (types.UID, bool) {
controllerRef := metav1.GetControllerOf(&j)

if controllerRef == nil {
return types.UID(""), false
}

if controllerRef.Kind != "CronJob" {
glog.Infof("Job with non-CronJob parent, name %s namespace %s", j.Name, j.Namespace)
return types.UID(""), false
}

return controllerRef.UID, true
}

func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) ([]string, error) {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: getBackupDirPodName,
Namespace: info.Namespace,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: getBackupDirPodName,
Image: "pingcap/tidb-cloud-backup:latest",
Command: []string{"sleep", "3000"},
VolumeMounts: []corev1.VolumeMount{
{
Name: "data",
MountPath: "/data",
},
},
},
},
Volumes: []corev1.Volume{
{
Name: "data",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: info.BackupPVC,
},
},
},
},
},
}

fn := func() (bool, error) {
_, err := oa.kubeCli.CoreV1().Pods(info.Namespace).Get(getBackupDirPodName, metav1.GetOptions{})
if !errors.IsNotFound(err) {
return false, nil
}
return true, nil
}

err := wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn)

if err != nil {
return nil, fmt.Errorf("failed to delete pod %s", getBackupDirPodName)
}

_, err = oa.kubeCli.CoreV1().Pods(info.Namespace).Create(pod)
if err != nil && !errors.IsAlreadyExists(err) {
glog.Errorf("cluster: [%s/%s] create get backup dir pod failed, error :%v", info.Namespace, info.ClusterName, err)
return nil, err
}

fn = func() (bool, error) {
_, err := oa.kubeCli.CoreV1().Pods(info.Namespace).Get(getBackupDirPodName, metav1.GetOptions{})
if errors.IsNotFound(err) {
return false, nil
}
return true, nil
}

err = wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn)

if err != nil {
return nil, fmt.Errorf("failed to create pod %s", getBackupDirPodName)
}

cmd := fmt.Sprintf("kubectl exec %s -n %s ls /data", getBackupDirPodName, info.Namespace)
glog.Infof(cmd)
res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
if err != nil {
glog.Errorf("cluster:[%s/%s] exec :%s failed,error:%v,result:%s", info.Namespace, info.ClusterName, cmd, err, res)
return nil, err
}

dirs := strings.Split(string(res), "\n")
glog.Infof("dirs in pod info name [%s] dir name [%s]", info.BackupPVC, strings.Join(dirs, ","))
return dirs, nil
}

func (info *TidbClusterInfo) FullName() string {
return fmt.Sprintf("%s/%s", info.Namespace, info.ClusterName)
}

func (oa *operatorActions) DeployIncrementalBackup(from *TidbClusterInfo, to *TidbClusterInfo) error {
return nil
}
Expand Down
Loading