diff --git a/tests/actions.go b/tests/actions.go index 02af71ef7aa..a2870d8c731 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -48,11 +48,6 @@ import ( ) const ( - defaultTableNum int = 64 - defaultConcurrency = 512 - defaultBatchSize = 100 - defaultRawSize = 100 - period = 5 * time.Minute ) @@ -87,7 +82,8 @@ type OperatorActions interface { CheckTidbClusterStatus(info *TidbClusterConfig) error CheckTidbClusterStatusOrDie(info *TidbClusterConfig) BeginInsertDataTo(info *TidbClusterConfig) error - StopInsertDataTo(info *TidbClusterConfig) error + BeginInsertDataToOrDie(info *TidbClusterConfig) + StopInsertDataTo(info *TidbClusterConfig) ScaleTidbCluster(info *TidbClusterConfig) error ScaleTidbClusterOrDie(info *TidbClusterConfig) CheckScaleInSafely(info *TidbClusterConfig) error @@ -156,6 +152,8 @@ type TidbClusterConfig struct { UserName string InitSecretName string BackupSecretName string + + BlockWriteConfig blockwriter.Config } func (tc *TidbClusterConfig) BackupHelmSetString(m map[string]string) string { @@ -321,12 +319,8 @@ func (oa *operatorActions) DeployTidbCluster(info *TidbClusterConfig) error { } // init blockWriter case - info.blockWriter = blockwriter.NewBlockWriterCase(blockwriter.Config{ - TableNum: defaultTableNum, - Concurrency: defaultConcurrency, - BatchSize: defaultBatchSize, - RawSize: defaultRawSize, - }) + info.blockWriter = blockwriter.NewBlockWriterCase(info.BlockWriteConfig) + info.blockWriter.ClusterName = info.ClusterName return nil } @@ -482,7 +476,12 @@ func (oa *operatorActions) CheckTidbClusterStatusOrDie(info *TidbClusterConfig) func (oa *operatorActions) BeginInsertDataTo(info *TidbClusterConfig) error { dsn := getDSN(info.Namespace, info.ClusterName, "test", info.Password) - db, err := util.OpenDB(dsn, defaultConcurrency) + if info.blockWriter == nil { + return fmt.Errorf("block writer not initialized for cluster: %s", info.ClusterName) + } + glog.Infof("[%s] [%s] open TiDB connections, concurrency: %d", + 
info.blockWriter, info.ClusterName, info.blockWriter.GetConcurrency()) + db, err := util.OpenDB(dsn, info.blockWriter.GetConcurrency()) if err != nil { return err } @@ -490,9 +489,15 @@ func (oa *operatorActions) BeginInsertDataTo(info *TidbClusterConfig) error { return info.blockWriter.Start(db) } -func (oa *operatorActions) StopInsertDataTo(info *TidbClusterConfig) error { +func (oa *operatorActions) BeginInsertDataToOrDie(info *TidbClusterConfig) { + err := oa.BeginInsertDataTo(info) + if err != nil { + panic(err) + } +} + +func (oa *operatorActions) StopInsertDataTo(info *TidbClusterConfig) { info.blockWriter.Stop() - return nil } func chartPath(name string, tag string) string { diff --git a/tests/backup/backupcase.go b/tests/backup/backupcase.go index 71b13c6b9c9..9c9991ac31d 100644 --- a/tests/backup/backupcase.go +++ b/tests/backup/backupcase.go @@ -36,11 +36,16 @@ func NewBackupCase(operator tests.OperatorActions, srcCluster *tests.TidbCluster } func (bc *BackupCase) Run() error { - //err := bc.operator.StopInsertDataTo(bc.srcCluster) - //if err != nil { - // glog.Errorf("cluster:[%s] stop insert data failed,error: %v", bc.srcCluster.ClusterName, err) - // return err - //} + + // pause write pressure during backup + bc.operator.StopInsertDataTo(bc.srcCluster) + defer func() { + go func() { + if err := bc.operator.BeginInsertDataTo(bc.srcCluster); err != nil { + glog.Errorf("cluster:[%s] begin insert data failed,error: %v", bc.srcCluster.ClusterName, err) + } + }() + }() err := bc.operator.DeployAdHocBackup(bc.srcCluster) if err != nil { @@ -119,12 +124,6 @@ func (bc *BackupCase) Run() error { return fmt.Errorf("cluster:[%s] the src cluster data[%d] is not equals des cluster data[%d]", bc.srcCluster.FullName(), srcCount, desCount) } - //err = bc.operator.BeginInsertDataTo(bc.srcCluster) - //if err != nil { - // glog.Errorf("cluster:[%s] begin insert data failed,error: %v", bc.srcCluster.ClusterName, err) - // return err - //} - return nil } diff --git 
a/tests/cmd/stability/main.go b/tests/cmd/stability/main.go index e9a5733eefa..c91ba20e685 100644 --- a/tests/cmd/stability/main.go +++ b/tests/cmd/stability/main.go @@ -90,8 +90,9 @@ func main() { "tidb.resources.requests.memory": "1Gi", "monitor.persistent": "true", }, - Args: map[string]string{}, - Monitor: true, + Args: map[string]string{}, + Monitor: true, + BlockWriteConfig: conf.BlockWriter, } cluster2 := &tests.TidbClusterConfig{ Namespace: clusterName2, @@ -123,8 +124,9 @@ func main() { // TODO assert the the monitor's pvc exist and clean it when bootstrapping "monitor.persistent": "true", }, - Args: map[string]string{}, - Monitor: true, + Args: map[string]string{}, + Monitor: true, + BlockWriteConfig: conf.BlockWriter, } // cluster backup and restore @@ -154,10 +156,10 @@ func main() { oa.CheckTidbClusterStatusOrDie(cluster1) oa.CheckTidbClusterStatusOrDie(cluster2) - //go func() { - // oa.BeginInsertDataTo(cluster1) - // oa.BeginInsertDataTo(cluster2) - //}() + go oa.BeginInsertDataToOrDie(cluster1) + defer oa.StopInsertDataTo(cluster1) + go oa.BeginInsertDataToOrDie(cluster2) + defer oa.StopInsertDataTo(cluster2) // TODO add DDL //var workloads []workload.Workload diff --git a/tests/config.go b/tests/config.go index 7af7be0a73e..b2e28b58bd5 100644 --- a/tests/config.go +++ b/tests/config.go @@ -3,6 +3,7 @@ package tests import ( "flag" "fmt" + "github.com/pingcap/tidb-operator/tests/pkg/blockwriter" "io/ioutil" "strings" @@ -10,6 +11,13 @@ import ( "gopkg.in/yaml.v2" ) +const ( + defaultTableNum int = 64 + defaultConcurrency = 128 + defaultBatchSize = 100 + defaultRawSize = 100 +) + // Config defines the config of operator tests type Config struct { configFile string @@ -23,6 +31,9 @@ type Config struct { ETCDs []Nodes `yaml:"etcds" json:"etcds"` APIServers []Nodes `yaml:"apiservers" json:"apiservers"` + // Block writer + BlockWriter blockwriter.Config `yaml:"block_writer,omitempty"` + // For local test OperatorRepoDir string `yaml:"operator_repo_dir" 
json:"operator_repo_dir"` } @@ -35,7 +46,14 @@ type Nodes struct { // NewConfig creates a new config. func NewConfig() *Config { - cfg := &Config{} + cfg := &Config{ + BlockWriter: blockwriter.Config{ + TableNum: defaultTableNum, + Concurrency: defaultConcurrency, + BatchSize: defaultBatchSize, + RawSize: defaultRawSize, + }, + } flag.StringVar(&cfg.configFile, "config", "", "Config file") flag.StringVar(&cfg.LogDir, "log-dir", "/logDir", "log directory") flag.IntVar(&cfg.FaultTriggerPort, "fault-trigger-port", 23332, "the http port of fault trigger service") diff --git a/tests/pkg/blockwriter/blockwriter.go b/tests/pkg/blockwriter/blockwriter.go index f7604dc9b40..49d62afdf4e 100644 --- a/tests/pkg/blockwriter/blockwriter.go +++ b/tests/pkg/blockwriter/blockwriter.go @@ -34,22 +34,24 @@ const ( // BlockWriterCase is for concurrent writing blocks. type BlockWriterCase struct { - cfg Config bws []*blockWriter isRunning uint32 isInit uint32 stopChan chan struct{} + cfg Config + ClusterName string + sync.RWMutex } // Config defines the config of BlockWriterCase type Config struct { - TableNum int - Concurrency int - BatchSize int - RawSize int + TableNum int `yaml:"table_num" json:"table_num"` + Concurrency int `yaml:"concurrency" json:"concurrency"` + BatchSize int `yaml:"batch_size" json:"batch_size"` + RawSize int `yaml:"raw_size" json:"raw_size"` } type blockWriter struct { @@ -73,6 +75,10 @@ func NewBlockWriterCase(cfg Config) *BlockWriterCase { return c } +func (c *BlockWriterCase) GetConcurrency() int { + return c.cfg.Concurrency +} + func (c *BlockWriterCase) initBlocks() { c.bws = make([]*blockWriter, c.cfg.Concurrency) for i := 0; i < c.cfg.Concurrency; i++ { @@ -90,7 +96,7 @@ func (c *BlockWriterCase) newBlockWriter() *blockWriter { func (c *BlockWriterCase) generateQuery(ctx context.Context, queryChan chan []string, wg *sync.WaitGroup) { defer func() { - glog.Infof("[%s] [action: generate Query] stopped", c) + glog.Infof("[%s] [%s] [action: generate Query] 
stopped", c, c.ClusterName) wg.Done() }() @@ -121,7 +127,7 @@ func (c *BlockWriterCase) generateQuery(ctx context.Context, queryChan chan []st if len(queryChan) < queryChanSize { queryChan <- querys } else { - glog.Infof("[%s] [action: generate Query] query channel is full, sleep 10 seconds", c) + glog.Infof("[%s] [%s] [action: generate Query] query channel is full, sleep 10 seconds", c, c.ClusterName) util.Sleep(ctx, 10*time.Second) } } @@ -131,7 +137,7 @@ func (bw *blockWriter) batchExecute(db *sql.DB, query string) error { _, err := db.Exec(query) if err != nil { - glog.V(4).Infof("[block_writer] exec sql [%s] failed, err: %v", query, err) + glog.V(4).Infof("[block_writer] exec sql [%s] failed, err: %v", query, err) return err } @@ -169,10 +175,10 @@ func (bw *blockWriter) run(ctx context.Context, db *sql.DB, queryChan chan []str // Initialize inits case func (c *BlockWriterCase) initialize(db *sql.DB) error { - glog.Infof("[%s] start to init...", c) + glog.Infof("[%s] [%s] start to init...", c, c.ClusterName) defer func() { atomic.StoreUint32(&c.isInit, 1) - glog.Infof("[%s] init end...", c) + glog.Infof("[%s] [%s] init end...", c, c.ClusterName) }() for i := 0; i < c.cfg.TableNum; i++ { @@ -210,14 +216,14 @@ func (c *BlockWriterCase) initialize(db *sql.DB) error { // Start starts to run cases func (c *BlockWriterCase) Start(db *sql.DB) error { if !atomic.CompareAndSwapUint32(&c.isRunning, 0, 1) { - err := fmt.Errorf("[%s] is running, you can't start it again", c) + err := fmt.Errorf("[%s] [%s] is running, you can't start it again", c, c.ClusterName) glog.Error(err) return nil } defer func() { c.RLock() - glog.Infof("[%s] stopped", c) + glog.Infof("[%s] [%s] stopped", c, c.ClusterName) atomic.SwapUint32(&c.isRunning, 0) }() @@ -227,7 +233,7 @@ func (c *BlockWriterCase) Start(db *sql.DB) error { } } - glog.Infof("[%s] start to execute case...", c) + glog.Infof("[%s] [%s] start to execute 
case...", c, c.ClusterName) var wg sync.WaitGroup