Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable blockWriter write pressure in stability test #399

Merged
merged 13 commits into from
Apr 17, 2019
35 changes: 20 additions & 15 deletions tests/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ import (
)

const (
defaultTableNum int = 64
defaultConcurrency = 512
defaultBatchSize = 100
defaultRawSize = 100

period = 5 * time.Minute
)

Expand Down Expand Up @@ -87,7 +82,8 @@ type OperatorActions interface {
CheckTidbClusterStatus(info *TidbClusterConfig) error
CheckTidbClusterStatusOrDie(info *TidbClusterConfig)
BeginInsertDataTo(info *TidbClusterConfig) error
StopInsertDataTo(info *TidbClusterConfig) error
BeginInsertDataToOrDie(info *TidbClusterConfig)
StopInsertDataTo(info *TidbClusterConfig)
ScaleTidbCluster(info *TidbClusterConfig) error
ScaleTidbClusterOrDie(info *TidbClusterConfig)
CheckScaleInSafely(info *TidbClusterConfig) error
Expand Down Expand Up @@ -156,6 +152,8 @@ type TidbClusterConfig struct {
UserName string
InitSecretName string
BackupSecretName string

BlockWriteConfig blockwriter.Config
}

func (tc *TidbClusterConfig) BackupHelmSetString(m map[string]string) string {
Expand Down Expand Up @@ -321,12 +319,8 @@ func (oa *operatorActions) DeployTidbCluster(info *TidbClusterConfig) error {
}

// init blockWriter case
info.blockWriter = blockwriter.NewBlockWriterCase(blockwriter.Config{
TableNum: defaultTableNum,
Concurrency: defaultConcurrency,
BatchSize: defaultBatchSize,
RawSize: defaultRawSize,
})
info.blockWriter = blockwriter.NewBlockWriterCase(info.BlockWriteConfig)
info.blockWriter.ClusterName = info.ClusterName

return nil
}
Expand Down Expand Up @@ -482,17 +476,28 @@ func (oa *operatorActions) CheckTidbClusterStatusOrDie(info *TidbClusterConfig)

// BeginInsertDataTo opens a pool of TiDB connections sized to the block
// writer's configured concurrency and starts the block-writer write
// pressure against the cluster described by info.
//
// It returns an error when the block writer has not been initialized
// (DeployTidbCluster is responsible for setting it up) or when the
// database connection cannot be opened.
func (oa *operatorActions) BeginInsertDataTo(info *TidbClusterConfig) error {
	dsn := getDSN(info.Namespace, info.ClusterName, "test", info.Password)
	if info.blockWriter == nil {
		return fmt.Errorf("block writer not initialized for cluster: %s", info.ClusterName)
	}
	glog.Infof("[%s] [%s] open TiDB connections, concurrency: %d",
		info.blockWriter, info.ClusterName, info.blockWriter.GetConcurrency())
	// Size the connection pool to match the writer concurrency so every
	// writer goroutine can hold a connection.
	db, err := util.OpenDB(dsn, info.blockWriter.GetConcurrency())
	if err != nil {
		return err
	}

	return info.blockWriter.Start(db)
}

func (oa *operatorActions) StopInsertDataTo(info *TidbClusterConfig) error {
func (oa *operatorActions) BeginInsertDataToOrDie(info *TidbClusterConfig) {
err := oa.BeginInsertDataTo(info)
if err != nil {
panic(err)
}
}

// StopInsertDataTo stops the block-writer write pressure against the
// cluster described by info. It is a no-op when the block writer was
// never initialized, so it is safe to call unconditionally (e.g. from
// a defer).
func (oa *operatorActions) StopInsertDataTo(info *TidbClusterConfig) {
	if info.blockWriter == nil {
		return
	}
	info.blockWriter.Stop()
}

func chartPath(name string, tag string) string {
Expand Down
21 changes: 10 additions & 11 deletions tests/backup/backupcase.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,16 @@ func NewBackupCase(operator tests.OperatorActions, srcCluster *tests.TidbCluster
}

func (bc *BackupCase) Run() error {
//err := bc.operator.StopInsertDataTo(bc.srcCluster)
//if err != nil {
// glog.Errorf("cluster:[%s] stop insert data failed,error: %v", bc.srcCluster.ClusterName, err)
// return err
//}

// pause write pressure during backup
bc.operator.StopInsertDataTo(bc.srcCluster)
defer func() {
go func() {
if err := bc.operator.BeginInsertDataTo(bc.srcCluster); err != nil {
glog.Errorf("cluster:[%s] begin insert data failed,error: %v", bc.srcCluster.ClusterName, err)
}
}()
}()

err := bc.operator.DeployAdHocBackup(bc.srcCluster)
if err != nil {
Expand Down Expand Up @@ -119,12 +124,6 @@ func (bc *BackupCase) Run() error {
return fmt.Errorf("cluster:[%s] the src cluster data[%d] is not equals des cluster data[%d]", bc.srcCluster.FullName(), srcCount, desCount)
}

//err = bc.operator.BeginInsertDataTo(bc.srcCluster)
//if err != nil {
// glog.Errorf("cluster:[%s] begin insert data failed,error: %v", bc.srcCluster.ClusterName, err)
// return err
//}

return nil
}

Expand Down
18 changes: 10 additions & 8 deletions tests/cmd/stability/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ func main() {
"tidb.resources.requests.memory": "1Gi",
"monitor.persistent": "true",
},
Args: map[string]string{},
Monitor: true,
Args: map[string]string{},
Monitor: true,
BlockWriteConfig: conf.BlockWriter,
}
cluster2 := &tests.TidbClusterConfig{
Namespace: clusterName2,
Expand Down Expand Up @@ -123,8 +124,9 @@ func main() {
// TODO assert the the monitor's pvc exist and clean it when bootstrapping
"monitor.persistent": "true",
},
Args: map[string]string{},
Monitor: true,
Args: map[string]string{},
Monitor: true,
BlockWriteConfig: conf.BlockWriter,
}

// cluster backup and restore
Expand Down Expand Up @@ -154,10 +156,10 @@ func main() {
oa.CheckTidbClusterStatusOrDie(cluster1)
oa.CheckTidbClusterStatusOrDie(cluster2)

//go func() {
// oa.BeginInsertDataTo(cluster1)
// oa.BeginInsertDataTo(cluster2)
//}()
go oa.BeginInsertDataToOrDie(cluster1)
defer oa.StopInsertDataTo(cluster1)
go oa.BeginInsertDataToOrDie(cluster2)
defer oa.StopInsertDataTo(cluster2)

// TODO add DDL
//var workloads []workload.Workload
Expand Down
20 changes: 19 additions & 1 deletion tests/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@ package tests
import (
"flag"
"fmt"
"github.com/pingcap/tidb-operator/tests/pkg/blockwriter"
"io/ioutil"
"strings"

"github.com/golang/glog"
"gopkg.in/yaml.v2"
)

// Defaults for the block-writer write-pressure workload; NewConfig
// applies them when the config file has no block_writer section.
const (
	defaultTableNum    int = 64  // tables the writers spread inserts across
	defaultConcurrency     = 128 // concurrent writer goroutines (and DB connections)
	defaultBatchSize       = 100 // presumably rows per insert batch — TODO confirm in blockwriter
	defaultRawSize         = 100 // presumably random payload size per row — TODO confirm in blockwriter
)

// Config defines the config of operator tests
type Config struct {
configFile string
Expand All @@ -23,6 +31,9 @@ type Config struct {
ETCDs []Nodes `yaml:"etcds" json:"etcds"`
APIServers []Nodes `yaml:"apiservers" json:"apiservers"`

// Block writer
BlockWriter blockwriter.Config `yaml:"block_writer,omitempty"`

// For local test
OperatorRepoDir string `yaml:"operator_repo_dir" json:"operator_repo_dir"`
}
Expand All @@ -35,7 +46,14 @@ type Nodes struct {

// NewConfig creates a new config.
func NewConfig() *Config {
cfg := &Config{}
cfg := &Config{
BlockWriter: blockwriter.Config{
TableNum: defaultTableNum,
Concurrency: defaultConcurrency,
BatchSize: defaultBatchSize,
RawSize: defaultRawSize,
},
}
flag.StringVar(&cfg.configFile, "config", "", "Config file")
flag.StringVar(&cfg.LogDir, "log-dir", "/logDir", "log directory")
flag.IntVar(&cfg.FaultTriggerPort, "fault-trigger-port", 23332, "the http port of fault trigger service")
Expand Down
32 changes: 19 additions & 13 deletions tests/pkg/blockwriter/blockwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,24 @@ const (

// BlockWriterCase is for concurrent writing blocks.
type BlockWriterCase struct {
cfg Config
bws []*blockWriter

isRunning uint32
isInit uint32
stopChan chan struct{}

cfg Config
ClusterName string

sync.RWMutex
}

// Config defines the config of BlockWriterCase. The yaml/json tags let
// it be populated from the stability-test configuration file.
type Config struct {
	TableNum    int `yaml:"table_num" json:"table_num"`
	Concurrency int `yaml:"concurrency" json:"concurrency"`
	BatchSize   int `yaml:"batch_size" json:"batch_size"`
	RawSize     int `yaml:"raw_size" json:"raw_size"`
}

type blockWriter struct {
Expand All @@ -73,6 +75,10 @@ func NewBlockWriterCase(cfg Config) *BlockWriterCase {
return c
}

// GetConcurrency returns the configured number of concurrent block
// writers; callers use it to size the database connection pool.
func (c *BlockWriterCase) GetConcurrency() int {
	return c.cfg.Concurrency
}

func (c *BlockWriterCase) initBlocks() {
c.bws = make([]*blockWriter, c.cfg.Concurrency)
for i := 0; i < c.cfg.Concurrency; i++ {
Expand All @@ -90,7 +96,7 @@ func (c *BlockWriterCase) newBlockWriter() *blockWriter {

func (c *BlockWriterCase) generateQuery(ctx context.Context, queryChan chan []string, wg *sync.WaitGroup) {
defer func() {
glog.Infof("[%s] [action: generate Query] stopped", c)
glog.Infof("[%s] [%s] [action: generate Query] stopped", c, c.ClusterName)
wg.Done()
}()

Expand Down Expand Up @@ -121,7 +127,7 @@ func (c *BlockWriterCase) generateQuery(ctx context.Context, queryChan chan []st
if len(queryChan) < queryChanSize {
queryChan <- querys
} else {
glog.Infof("[%s] [action: generate Query] query channel is full, sleep 10 seconds", c)
glog.Infof("[%s] [%s] [action: generate Query] query channel is full, sleep 10 seconds", c, c.ClusterName)
util.Sleep(ctx, 10*time.Second)
}
}
Expand All @@ -131,7 +137,7 @@ func (c *BlockWriterCase) generateQuery(ctx context.Context, queryChan chan []st
func (bw *blockWriter) batchExecute(db *sql.DB, query string) error {
_, err := db.Exec(query)
if err != nil {
glog.V(4).Infof("[block_writer] exec sql [%s] failed, err: %v", query, err)
glog.V(4).Infof("[%s] exec sql [%s] failed, err: %v", query, err)
return err
}

Expand Down Expand Up @@ -169,10 +175,10 @@ func (bw *blockWriter) run(ctx context.Context, db *sql.DB, queryChan chan []str

// Initialize inits case
func (c *BlockWriterCase) initialize(db *sql.DB) error {
glog.Infof("[%s] start to init...", c)
glog.Infof("[%s] [%s] start to init...", c, c.ClusterName)
defer func() {
atomic.StoreUint32(&c.isInit, 1)
glog.Infof("[%s] init end...", c)
glog.Infof("[%s] [%s] init end...", c, c.ClusterName)
}()

for i := 0; i < c.cfg.TableNum; i++ {
Expand Down Expand Up @@ -210,14 +216,14 @@ func (c *BlockWriterCase) initialize(db *sql.DB) error {
// Start starts to run cases
func (c *BlockWriterCase) Start(db *sql.DB) error {
if !atomic.CompareAndSwapUint32(&c.isRunning, 0, 1) {
err := fmt.Errorf("[%s] is running, you can't start it again", c)
err := fmt.Errorf("[%s] [%s] is running, you can't start it again", c, c.ClusterName)
glog.Error(err)
return nil
}

defer func() {
c.RLock()
glog.Infof("[%s] stopped", c)
glog.Infof("[%s] [%s] stopped", c, c.ClusterName)
atomic.SwapUint32(&c.isRunning, 0)
}()

Expand All @@ -227,7 +233,7 @@ func (c *BlockWriterCase) Start(db *sql.DB) error {
}
}

glog.Infof("[%s] start to execute case...", c)
glog.Infof("[%s] [%s] start to execute case...", c, c.ClusterName)

var wg sync.WaitGroup

Expand Down