Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Speed up DDL execution by sending DDLs concurrently #377

Merged
merged 14 commits into from
Jul 6, 2020
59 changes: 50 additions & 9 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (rc *Client) CreateTables(
for i, t := range tables {
tbMapping[t.Info.Name.String()] = i
}
dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, errCh)
dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, nil, errCh)
for et := range dataCh {
rules := et.RewriteRule
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
Expand All @@ -355,11 +355,19 @@ func (rc *Client) CreateTables(
return rewriteRules, newTables, nil
}

func (rc *Client) createTable(dom *domain.Domain, table *utils.Table, newTS uint64) (CreatedTable, error) {
func (rc *Client) createTable(
ctx context.Context,
dom *domain.Domain,
table *utils.Table,
newTS uint64,
) (CreatedTable, error) {
if rc.IsSkipCreateSQL() {
log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name))
} else {
err := rc.db.CreateTable(rc.ctx, table)
// Don't use rc.ctx here; use the ctx passed in by the caller instead.
// Removing the ctx field from Client entirely would be a bigger piece of work,
// so we only take a small step in that direction here.
err := rc.db.CreateTable(ctx, table)
if err != nil {
return CreatedTable{}, err
}
Expand All @@ -378,22 +386,25 @@ func (rc *Client) createTable(dom *domain.Domain, table *utils.Table, newTS uint
}

// GoCreateTables creates tables and generates their information.
// It uses as many workers as there are sessions in sessionPool;
// leave sessionPool nil to send DDLs sequentially.
func (rc *Client) GoCreateTables(
ctx context.Context,
dom *domain.Domain,
tables []*utils.Table,
newTS uint64,
sessionPool []glue.Session,
errCh chan<- error,
) <-chan CreatedTable {
// Could we have a smaller size of tables?
outCh := make(chan CreatedTable, len(tables))
createOneTable := func(t *utils.Table) error {
createOneTable := func(ctx context.Context, t *utils.Table) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
rt, err := rc.createTable(dom, t, newTS)
rt, err := rc.createTable(ctx, dom, t, newTS)
if err != nil {
log.Error("create table failed",
zap.Error(err),
Expand All @@ -408,16 +419,46 @@ func (rc *Client) GoCreateTables(
outCh <- rt
return nil
}
startWork := func(t *utils.Table, done func()) {
defer done()
if err := createOneTable(ctx, t); err != nil {
errCh <- err
return
}
}
if len(sessionPool) > 0 {
workers := utils.NewWorkerPool(uint(len(sessionPool)), "DDL workers")
startWork = func(t *utils.Table, done func()) {
workers.ApplyWithID(func(id uint64) {
defer done()
selectedSession := int(id) % len(sessionPool)
vctx := context.WithValue(ctx, SessionInContext, sessionPool[selectedSession])
if err := createOneTable(vctx, t); err != nil {
errCh <- err
return
}
})
}
}

go func() {
wg := new(sync.WaitGroup)
YuJuncen marked this conversation as resolved.
Show resolved Hide resolved
defer close(outCh)
defer log.Info("all tables created")
defer func() {
if len(sessionPool) > 0 {
for _, se := range sessionPool {
se.Close()
}
}
}()

for _, table := range tables {
if err := createOneTable(table); err != nil {
errCh <- err
return
}
tbl := table
wg.Add(1)
startWork(tbl, wg.Done)
}
wg.Wait()
}()
return outCh
}
Expand Down
52 changes: 38 additions & 14 deletions pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@ import (
"github.com/pingcap/br/pkg/utils"
)

// DBContextKey is the type of the keys this file uses to store values in a
// context.Context.
type DBContextKey int

// SessionInContext is the context key under which a glue.Session may be
// stored. DB.contextualSession looks this key up and, when a session is
// present, uses it instead of the DB's own session.
const SessionInContext DBContextKey = iota

// DB is a TiDB instance, not thread-safe.
// If you want to share it between goroutines, give each goroutine a context
// that carries its own session under the `SessionInContext` key.
type DB struct {
// se is the default session, used when the context carries no session.
se glue.Session
}

// NewDB returns a new DB.
func NewDB(g glue.Glue, store kv.Storage) (*DB, error) {
func makeSession(g glue.Glue, store kv.Storage) (glue.Session, error) {
se, err := g.CreateSession(store)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -37,6 +44,23 @@ func NewDB(g glue.Glue, store kv.Storage) (*DB, error) {
if err != nil {
return nil, errors.Trace(err)
}
return se, nil
}

// contextualSession picks the session that should serve a call made with ctx.
// A glue.Session stored in ctx under SessionInContext takes precedence; when
// ctx holds no such value (or the value is not a glue.Session), the DB's own
// session is returned.
func (db *DB) contextualSession(ctx context.Context) glue.Session {
	se, found := ctx.Value(SessionInContext).(glue.Session)
	if !found {
		return db.se
	}
	return se
}
YuJuncen marked this conversation as resolved.
Show resolved Hide resolved

// NewDB returns a new DB.
func NewDB(g glue.Glue, store kv.Storage) (*DB, error) {
se, err := makeSession(g, store)
if err != nil || se == nil {
return nil, err
}

return &DB{
se: se,
}, nil
Expand All @@ -49,13 +73,13 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
dbInfo := ddlJob.BinlogInfo.DBInfo
switch ddlJob.Type {
case model.ActionCreateSchema:
err = db.se.CreateDatabase(ctx, dbInfo)
err = db.contextualSession(ctx).CreateDatabase(ctx, dbInfo)
if err != nil {
log.Error("create database failed", zap.Stringer("db", dbInfo.Name), zap.Error(err))
}
return errors.Trace(err)
case model.ActionCreateTable:
err = db.se.CreateTable(ctx, model.NewCIStr(ddlJob.SchemaName), tableInfo)
err = db.contextualSession(ctx).CreateTable(ctx, model.NewCIStr(ddlJob.SchemaName), tableInfo)
if err != nil {
log.Error("create table failed",
zap.Stringer("db", dbInfo.Name),
Expand All @@ -67,7 +91,7 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {

if tableInfo != nil {
switchDbSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName))
err = db.se.Execute(ctx, switchDbSQL)
err = db.contextualSession(ctx).Execute(ctx, switchDbSQL)
if err != nil {
log.Error("switch db failed",
zap.String("query", switchDbSQL),
Expand All @@ -76,7 +100,7 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
return errors.Trace(err)
}
}
err = db.se.Execute(ctx, ddlJob.Query)
err = db.contextualSession(ctx).Execute(ctx, ddlJob.Query)
if err != nil {
log.Error("execute ddl query failed",
zap.String("query", ddlJob.Query),
Expand All @@ -89,7 +113,7 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {

// CreateDatabase executes a CREATE DATABASE SQL.
func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
err := db.se.CreateDatabase(ctx, schema)
err := db.contextualSession(ctx).CreateDatabase(ctx, schema)
if err != nil {
log.Error("create database failed", zap.Stringer("db", schema.Name), zap.Error(err))
}
Expand All @@ -98,7 +122,7 @@ func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {

// CreateTable executes a CREATE TABLE SQL.
func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error {
err := db.se.CreateTable(ctx, table.Db.Name, table.Info)
err := db.contextualSession(ctx).CreateTable(ctx, table.Db.Name, table.Info)
if err != nil {
log.Error("create table failed",
zap.Stringer("db", table.Db.Name),
Expand Down Expand Up @@ -128,7 +152,7 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error {
} else {
setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MaxValue)
}
err = db.se.Execute(ctx, setValSQL)
err = db.contextualSession(ctx).Execute(ctx, setValSQL)
if err != nil {
log.Error("restore meta sql failed",
zap.String("query", setValSQL),
Expand All @@ -139,7 +163,7 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error {
}

// trigger cycle round > 0
err = db.se.Execute(ctx, nextSeqSQL)
err = db.contextualSession(ctx).Execute(ctx, nextSeqSQL)
if err != nil {
log.Error("restore meta sql failed",
zap.String("query", nextSeqSQL),
Expand All @@ -165,7 +189,7 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error {
table.Info.AutoIncID)
}

err = db.se.Execute(ctx, restoreMetaSQL)
err = db.contextualSession(ctx).Execute(ctx, restoreMetaSQL)
if err != nil {
log.Error("restore meta sql failed",
zap.String("query", restoreMetaSQL),
Expand All @@ -185,7 +209,7 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error {
utils.EncloseName(table.Info.Name.O),
table.Info.AutoRandID)

err = db.se.Execute(ctx, alterAutoRandIDSQL)
err = db.contextualSession(ctx).Execute(ctx, alterAutoRandIDSQL)
if err != nil {
log.Error("alter AutoRandID failed",
zap.String("query", alterAutoRandIDSQL),
Expand All @@ -201,7 +225,7 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error {
// AlterTiflashReplica alters the replica count of tiflash.
func (db *DB) AlterTiflashReplica(ctx context.Context, table *utils.Table, count int) error {
switchDbSQL := fmt.Sprintf("use %s;", utils.EncloseName(table.Db.Name.O))
err := db.se.Execute(ctx, switchDbSQL)
err := db.contextualSession(ctx).Execute(ctx, switchDbSQL)
if err != nil {
log.Error("switch db failed",
zap.String("SQL", switchDbSQL),
Expand All @@ -214,7 +238,7 @@ func (db *DB) AlterTiflashReplica(ctx context.Context, table *utils.Table, count
utils.EncloseName(table.Info.Name.O),
count,
)
err = db.se.Execute(ctx, alterTiFlashSQL)
err = db.contextualSession(ctx).Execute(ctx, alterTiFlashSQL)
if err != nil {
log.Error("alter tiflash replica failed",
zap.String("query", alterTiFlashSQL),
Expand Down
14 changes: 13 additions & 1 deletion pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,19 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf

// We give errCh a larger buffer so that we won't block when multiple parts fail.
errCh := make(chan error, 32)
tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, errCh)
sessionPool := make([]glue.Session, 0, cfg.Concurrency)
kennytm marked this conversation as resolved.
Show resolved Hide resolved
for i := uint32(0); i < cfg.Concurrency; i++ {
session, e := g.CreateSession(mgr.GetTiKV())
if e != nil {
log.Warn("create session pool failed, we will send DDLs only by created sessions",
zap.Error(e),
zap.Int("sessionCount", len(sessionPool)),
)
break
}
sessionPool = append(sessionPool, session)
}
tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, sessionPool, errCh)
if len(files) == 0 {
log.Info("no files, empty databases and tables are restored")
summary.SetSuccessStatus(true)
Expand Down
8 changes: 7 additions & 1 deletion pkg/utils/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Worker struct {
}

// taskFunc is a task that takes no arguments.
type taskFunc func()

// identifiedTaskFunc is a task that receives the ID of the worker running it.
type identifiedTaskFunc func(uint64)

// NewWorkerPool returns a WorkPool.
func NewWorkerPool(limit uint, name string) *WorkerPool {
Expand All @@ -36,6 +37,11 @@ func NewWorkerPool(limit uint, name string) *WorkerPool {

// Apply runs fn through ApplyWithID, for tasks that do not need the worker ID.
func (pool *WorkerPool) Apply(fn taskFunc) {
	withoutID := func(uint64) { fn() }
	pool.ApplyWithID(withoutID)
}

// ApplyWithID executes a task and provides it with the worker ID.
func (pool *WorkerPool) ApplyWithID(fn identifiedTaskFunc) {
var worker *Worker
select {
case worker = <-pool.workers:
Expand All @@ -44,7 +50,7 @@ func (pool *WorkerPool) Apply(fn taskFunc) {
worker = <-pool.workers
}
go func() {
fn()
fn(worker.ID)
pool.recycle(worker)
}()
}
Expand Down