From 85131927b648629e99ed3dfc217098d605199e03 Mon Sep 17 00:00:00 2001 From: CWen Date: Tue, 19 Mar 2019 10:54:39 +0800 Subject: [PATCH] stability: add blockWriter case for inserting data (#321) * add blockWriter case to insert data --- tests/actions.go | 27 ++- tests/cmd/e2e/main.go | 6 + tests/pkg/blockWriter/blockWriter.go | 273 +++++++++++++++++++++++++++ tests/pkg/util/db.go | 19 ++ tests/pkg/util/string.go | 33 ++++ tests/pkg/util/time.go | 19 ++ 6 files changed, 376 insertions(+), 1 deletion(-) create mode 100644 tests/pkg/blockWriter/blockWriter.go create mode 100644 tests/pkg/util/db.go create mode 100644 tests/pkg/util/string.go create mode 100644 tests/pkg/util/time.go diff --git a/tests/actions.go b/tests/actions.go index 8819883337..5b72ad0741 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -34,6 +34,8 @@ import ( "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned" "github.com/pingcap/tidb-operator/pkg/controller" "github.com/pingcap/tidb-operator/pkg/label" + "github.com/pingcap/tidb-operator/tests/pkg/blockwriter" + "github.com/pingcap/tidb-operator/tests/pkg/util" "k8s.io/api/apps/v1beta1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -43,6 +45,13 @@ import ( "k8s.io/client-go/kubernetes" ) +const ( + defaultTableNum int = 64 + defaultConcurrency = 512 + defaultBatchSize = 100 + defaultRawSize = 100 +) + func NewOperatorActions(cli versioned.Interface, kubeCli kubernetes.Interface, logDir string) OperatorActions { return &operatorActions{ cli: cli, @@ -138,6 +147,7 @@ type TidbClusterInfo struct { InsertBatchSize string Resources map[string]string Args map[string]string + blockWriter *blockwriter.BlockWriterCase Monitor bool } @@ -242,6 +252,14 @@ func (oa *operatorActions) DeployTidbCluster(info *TidbClusterInfo) error { info.Namespace, info.ClusterName, err, string(res)) } + // init blockWriter case + info.blockWriter = blockwriter.NewBlockWriterCase(blockwriter.Config{ + TableNum: defaultTableNum, + Concurrency: defaultConcurrency, + BatchSize: defaultBatchSize, + RawSize: defaultRawSize, + }) + return nil } @@ -369,10 +387,17 @@ func (oa *operatorActions) CheckTidbClusterStatus(info *TidbClusterInfo) error { } func (oa *operatorActions) BeginInsertDataTo(info *TidbClusterInfo) error { - return nil + dsn := getDSN(info.Namespace, info.ClusterName, "test", info.Password) + db, err := util.OpenDB(dsn, defaultConcurrency) + if err != nil { + return err + } + + return info.blockWriter.Start(db) } func (oa *operatorActions) StopInsertDataTo(info *TidbClusterInfo) error { + info.blockWriter.Stop() return nil } diff --git a/tests/cmd/e2e/main.go b/tests/cmd/e2e/main.go index 73b303323c..e4a6b4af4e 100644 --- a/tests/cmd/e2e/main.go +++ b/tests/cmd/e2e/main.go @@ -15,6 +15,8 @@ package main import ( "flag" + "net/http" + _ "net/http/pprof" "github.com/golang/glog" "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned" @@ -36,6 +38,10 @@ func main() { logs.InitLogs() defer logs.FlushLogs() + go func() { + glog.Info(http.ListenAndServe("localhost:6060", nil)) + }() + cfg, err := rest.InClusterConfig() if err != nil { glog.Fatalf("failed to get config: %v", err) diff --git a/tests/pkg/blockWriter/blockWriter.go b/tests/pkg/blockWriter/blockWriter.go new file mode 100644 index 0000000000..8434f151b0 --- /dev/null +++ b/tests/pkg/blockWriter/blockWriter.go @@ -0,0 +1,273 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License.package spec + +package blockwriter + +import ( + "context" + "database/sql" + "fmt" + "math/rand" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/golang/glog" + "github.com/pingcap/tidb-operator/tests/pkg/util" + "k8s.io/apimachinery/pkg/util/wait" +) + +const ( + queryChanSize int = 10000 +) + +// BlockWriterCase is for concurrent writing blocks. +type BlockWriterCase struct { + cfg Config + bws []*blockWriter + + isRunning uint32 + isInit uint32 + stopChan chan struct{} + + sync.RWMutex +} + +// Config defines the config of BlockWriterCase +type Config struct { + TableNum int + Concurrency int + BatchSize int + RawSize int +} + +type blockWriter struct { + rawSize int + values []string + batchSize int +} + +// NewBlockWriterCase returns the BlockWriterCase. +func NewBlockWriterCase(cfg Config) *BlockWriterCase { + c := &BlockWriterCase{ + cfg: cfg, + stopChan: make(chan struct{}, 1), + } + + if c.cfg.TableNum < 1 { + c.cfg.TableNum = 1 + } + c.initBlocks() + + return c +} + +func (c *BlockWriterCase) initBlocks() { + c.bws = make([]*blockWriter, c.cfg.Concurrency) + for i := 0; i < c.cfg.Concurrency; i++ { + c.bws[i] = c.newBlockWriter() + } +} + +func (c *BlockWriterCase) newBlockWriter() *blockWriter { + return &blockWriter{ + rawSize: c.cfg.RawSize, + values: make([]string, c.cfg.BatchSize), + batchSize: c.cfg.BatchSize, + } +} + +func (c *BlockWriterCase) generateQuery(ctx context.Context, queryChan chan []string, wg *sync.WaitGroup) { + defer func() { + glog.Infof("[%s] [action: generate Query] stopped", c) + wg.Done() + }() + + for { + tableN := rand.Intn(c.cfg.TableNum) + var index string + if tableN > 0 { + index = fmt.Sprintf("%d", tableN) + } + + var querys []string + for i := 0; i < 100; i++ { + values := make([]string, c.cfg.BatchSize) + for i := 0; i < c.cfg.BatchSize; i++ { + blockData := util.RandString(c.cfg.RawSize) + values[i] = fmt.Sprintf("('%s')", blockData) + } + + querys = append(querys, fmt.Sprintf( + "INSERT INTO block_writer%s(raw_bytes) VALUES %s", + index, strings.Join(values, ","))) + } + + select { + case <-ctx.Done(): + return + default: + if len(queryChan) < queryChanSize { + queryChan <- querys + } else { + glog.Infof("[%s] [action: generate Query] query channel is full, sleep 10 seconds", c) + util.Sleep(ctx, 10*time.Second) + } + } + } +} + +func (bw *blockWriter) batchExecute(db *sql.DB, query string) error { + _, err := db.Exec(query) + if err != nil { + glog.Errorf("[block_writer] exec sql [%s] failed, err: %v", query, err) + return err + } + + return nil +} + +func (bw *blockWriter) run(ctx context.Context, db *sql.DB, queryChan chan []string) { + for { + select { + case <-ctx.Done(): + return + default: + } + + querys, ok := <-queryChan + if !ok { + // No more query + return + } + + for _, query := range querys { + select { + case <-ctx.Done(): + return + default: + if err := bw.batchExecute(db, query); err != nil { + glog.Fatal(err) + } + } + } + } +} + +// Initialize inits case +func (c *BlockWriterCase) initialize(db *sql.DB) error { + glog.Infof("[%s] start to init...", c) + defer func() { + atomic.StoreUint32(&c.isInit, 1) + glog.Infof("[%s] init end...", c) + }() + + for i := 0; i < c.cfg.TableNum; i++ { + var s string + if i > 0 { + s = fmt.Sprintf("%d", i) + } + + tmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS block_writer%s %s", s, ` + ( + id BIGINT NOT NULL AUTO_INCREMENT, + raw_bytes BLOB NOT NULL, + PRIMARY KEY (id) +)`) + + err := wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) { + _, err := db.Exec(tmt) + if err != nil { + glog.Warningf("[%s] exec sql [%s] failed, err: %v, retry...", c, tmt, err) + return false, nil + } + + return true, nil + }) + + if err != nil { + glog.Errorf("[%s] exec sql [%s] failed, err: %v", c, tmt, err) + return err + } + } + + return nil +} + +// Start starts to run cases +func (c *BlockWriterCase) Start(db *sql.DB) error { + if !atomic.CompareAndSwapUint32(&c.isRunning, 0, 1) { + err := fmt.Errorf("[%s] is running, you can't start it again", c) + glog.Error(err) + return err + } + + defer func() { + c.RLock() + glog.Infof("[%s] stopped", c) + atomic.SwapUint32(&c.isRunning, 0) + }() + + if c.isInit == 0 { + if err := c.initialize(db); err != nil { + return err + } + } + + glog.Infof("[%s] start to execute case...", c) + + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + + queryChan := make(chan []string, queryChanSize) + + for i := 0; i < c.cfg.Concurrency; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + c.bws[i].run(ctx, db, queryChan) + }(i) + } + + wg.Add(1) + go c.generateQuery(ctx, queryChan, &wg) + +loop: + for { + select { + case <-c.stopChan: + glog.Infof("[%s] stoping...", c) + cancel() + break loop + default: + util.Sleep(context.Background(), 2*time.Second) + } + } + + wg.Wait() + close(queryChan) + + return nil +} + +// Stop stops cases +func (c *BlockWriterCase) Stop() { + c.stopChan <- struct{}{} +} + +// String implements fmt.Stringer interface. +func (c *BlockWriterCase) String() string { + return "block_writer" +} diff --git a/tests/pkg/util/db.go b/tests/pkg/util/db.go new file mode 100644 index 0000000000..c931f76245 --- /dev/null +++ b/tests/pkg/util/db.go @@ -0,0 +1,19 @@ +package util + +import ( + "database/sql" + + "github.com/golang/glog" +) + +// OpenDB opens db +func OpenDB(dsn string, maxIdleConns int) (*sql.DB, error) { + db, err := sql.Open("mysql", dsn) + if err != nil { + return nil, err + } + + db.SetMaxIdleConns(maxIdleConns) + glog.Info("DB opens successfully") + return db, nil +} diff --git a/tests/pkg/util/string.go b/tests/pkg/util/string.go new file mode 100644 index 0000000000..8334dbe2ba --- /dev/null +++ b/tests/pkg/util/string.go @@ -0,0 +1,33 @@ +package util + +import ( + "math/rand" +) + +const ( + alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + + // Used by RandString + letterIdxBits = 6 // 6 bits to represent a letter index + letterIdxMask = 1<= 0; { + if remain == 0 { + cache, remain = rand.Int63(), letterIdxMax + } + if idx := int(cache & letterIdxMask); idx < len(alphabet) { + b[i] = alphabet[idx] + i-- + } + cache >>= letterIdxBits + remain-- + } + + return string(b) +} diff --git a/tests/pkg/util/time.go b/tests/pkg/util/time.go new file mode 100644 index 0000000000..7e3a4909e7 --- /dev/null +++ b/tests/pkg/util/time.go @@ -0,0 +1,19 @@ +package util + +import ( + "context" + "time" +) + +// Sleep defines special `sleep` with context +func Sleep(ctx context.Context, sleepTime time.Duration) { + ticker := time.NewTicker(sleepTime) + defer ticker.Stop() + + select { + case <-ctx.Done(): + return + case <-ticker.C: + return + } +}