Commit f010cd8

Lance6716/cherry pick 220 (#224)
* restore: update and restore GCLifeTime once when parallel (#220)
lance6716 authored and kennytm committed Aug 7, 2019
1 parent c0ba3c9 commit f010cd8
Showing 1 changed file with 76 additions and 31 deletions.
107 changes: 76 additions & 31 deletions lightning/restore/restore.go
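
Editor's note: the problem this commit fixes is that checksum jobs run concurrently (one worker per index-concurrency slot), and previously each DoChecksum call saved and restored tikv_gc_life_time on its own, so the first job to finish restored the original, shorter GC life time while other checksums were still running. The patch replaces that with a shared, reference-counted manager so the variable is raised once and restored once. For background, ObtainGCLifeTime and UpdateGCLifeTime read and write the tikv_gc_life_time row in the mysql.tidb table; below is a minimal sketch of what such helpers look like — the exact SQL is an assumption based on how TiDB stores this setting, not copied from this repository.

package gclifetime

import (
	"context"
	"database/sql"
)

// obtainGCLifeTime reads the current tikv_gc_life_time value from mysql.tidb.
func obtainGCLifeTime(ctx context.Context, db *sql.DB) (string, error) {
	var value string
	err := db.QueryRowContext(ctx,
		"SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'",
	).Scan(&value)
	return value, err
}

// updateGCLifeTime writes a new tikv_gc_life_time value back to mysql.tidb.
func updateGCLifeTime(ctx context.Context, db *sql.DB, gcLifeTime string) error {
	_, err := db.ExecContext(ctx,
		"UPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'",
		gcLifeTime,
	)
	return err
}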
@@ -74,7 +74,6 @@ var (
 	requiredTiKVVersion = *semver.New("2.1.0")
 )
 
-var gcLifeTimeKey struct{}
 
 func init() {
 	cfg := tidbcfg.GetGlobalConfig()
@@ -449,6 +448,59 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan struct{}) {
 	}
 }
 
+type gcLifeTimeManager struct {
+	runningJobsLock sync.Mutex
+	runningJobs     int
+	oriGCLifeTime   string
+}
+
+func newGCLifeTimeManager() *gcLifeTimeManager {
+	// Default values of the three members are enough to initialize this struct
+	return &gcLifeTimeManager{}
+}
+
+// Pre- and post-condition:
+// if m.runningJobs == 0, GC life time has not been increased.
+// if m.runningJobs > 0, GC life time has been increased.
+// m.runningJobs won't overflow since index concurrency is relatively small
+func (m *gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error {
+	m.runningJobsLock.Lock()
+	defer m.runningJobsLock.Unlock()
+
+	if m.runningJobs == 0 {
+		oriGCLifeTime, err := ObtainGCLifeTime(ctx, db)
+		if err != nil {
+			return err
+		}
+		m.oriGCLifeTime = oriGCLifeTime
+		err = increaseGCLifeTime(ctx, db)
+		if err != nil {
+			return err
+		}
+	}
+	m.runningJobs += 1
+	return nil
+}
+
+// Pre- and post-condition:
+// if m.runningJobs == 0, an attempt has been made to restore the original GC life time. If the attempt fails, a warning is logged.
+// if m.runningJobs > 0, GC life time has not been restored.
+// m.runningJobs won't go negative since every removeOneJob follows a successful addOneJob.
+func (m *gcLifeTimeManager) removeOneJob(ctx context.Context, db *sql.DB, table string) {
+	m.runningJobsLock.Lock()
+	defer m.runningJobsLock.Unlock()
+
+	m.runningJobs -= 1
+	if m.runningJobs == 0 {
+		err := UpdateGCLifeTime(ctx, db, m.oriGCLifeTime)
+		if err != nil && common.ShouldLogError(err) {
+			common.AppLogger.Errorf("[%s] update tikv_gc_life_time error %v", table, errors.ErrorStack(err))
+		}
+	}
+}
+
+var gcLifeTimeKey struct{}
+
 func (rc *RestoreController) restoreTables(ctx context.Context) error {
 	timer := time.Now()
 	var wg sync.WaitGroup
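
Editor's note: the manager above is a mutex-guarded reference counter — the first job in records the original value and raises it, the last job out restores it. Here is a self-contained sketch of the same first-in/last-out pattern; all names are illustrative and not from this repository.

package main

import (
	"fmt"
	"sync"
)

// refCountedOverride applies an override while at least one job is running
// and restores the original value when the last job finishes.
type refCountedOverride struct {
	mu       sync.Mutex
	running  int
	original string
	value    *string // the "setting" being overridden
}

func (o *refCountedOverride) acquire(override string) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.running == 0 {
		o.original = *o.value // first job in: remember and override
		*o.value = override
	}
	o.running++
}

func (o *refCountedOverride) release() {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.running--
	if o.running == 0 {
		*o.value = o.original // last job out: restore
	}
}

func main() {
	setting := "10m"
	o := &refCountedOverride{value: &setting}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			o.acquire("100h")
			defer o.release()
			// ... do work while the override is in effect ...
		}()
	}
	wg.Wait()
	fmt.Println(setting) // prints "10m": restored exactly once
}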
@@ -464,11 +516,9 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
 	}
 	taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
 	defer close(taskCh)
-	oriGCLifeTime, err := ObtainGCLifeTime(ctx, rc.tidbMgr.db)
-	if err != nil {
-		return err
-	}
-	ctx2 := context.WithValue(ctx, &gcLifeTimeKey, oriGCLifeTime)
+
+	manager := newGCLifeTimeManager()
+	ctx2 := context.WithValue(ctx, &gcLifeTimeKey, manager)
 	for i := 0; i < rc.cfg.App.IndexConcurrency; i++ {
 		go func() {
 			for task := range taskCh {
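
Editor's note: restoreTables now stores the shared *gcLifeTimeManager in the context under &gcLifeTimeKey. Using the address of an unexported package-level variable as the key cannot collide with context keys from any other package, and every worker started from ctx2 retrieves the same manager instance. A small sketch of this context-key pattern, with illustrative names:

package main

import (
	"context"
	"fmt"
)

// an unexported variable whose address serves as a collision-free context key
var managerKey struct{}

type manager struct{ jobs int }

func main() {
	m := &manager{}
	ctx := context.WithValue(context.Background(), &managerKey, m)

	// every consumer of ctx retrieves the *same* manager instance
	got, ok := ctx.Value(&managerKey).(*manager)
	fmt.Println(ok, got == m) // true true
}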
@@ -1315,17 +1365,18 @@ func setSessionConcurrencyVars(ctx context.Context, db *sql.DB, dsn config.DBStore) {
 func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, error) {
 	timer := time.Now()
 
-	ori, err := increaseGCLifeTime(ctx, db)
-	if err != nil {
-		return nil, errors.Trace(err)
+	var err error
+	manager, ok := ctx.Value(&gcLifeTimeKey).(*gcLifeTimeManager)
+	if !ok {
+		return nil, errors.New("No gcLifeTimeManager found in context, check context initialization")
 	}
+
+	if err = manager.addOneJob(ctx, db); err != nil {
+		return nil, err
+	}
+
 	// set it back finally
-	defer func() {
-		err = UpdateGCLifeTime(ctx, db, ori)
-		if err != nil && common.ShouldLogError(err) {
-			common.AppLogger.Errorf("[%s] update tikv_gc_life_time error %v", table, errors.ErrorStack(err))
-		}
-	}()
+	defer manager.removeOneJob(ctx, db, table)
 
 	// ADMIN CHECKSUM TABLE <table>,<table> example.
 	// mysql> admin checksum table test.t;
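
Editor's note: DoChecksum then runs ADMIN CHECKSUM TABLE, which in TiDB returns one row per table with Db_name, Table_name, Checksum_crc64_xor, Total_kvs and Total_bytes columns. A sketch of scanning that row, with a struct modelled on (but not copied from) this file's RemoteChecksum:

package checksum

import (
	"context"
	"database/sql"
)

// remoteChecksum mirrors the columns returned by ADMIN CHECKSUM TABLE;
// the field layout is an assumption based on TiDB's output, not copied
// from this repository.
type remoteChecksum struct {
	Schema     string
	Table      string
	Checksum   uint64
	TotalKVs   uint64
	TotalBytes uint64
}

func doChecksum(ctx context.Context, db *sql.DB, table string) (*remoteChecksum, error) {
	cs := remoteChecksum{}
	// table is expected to be an already-quoted `schema`.`table` name
	row := db.QueryRowContext(ctx, "ADMIN CHECKSUM TABLE "+table)
	if err := row.Scan(&cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes); err != nil {
		return nil, err
	}
	return &cs, nil
}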
@@ -1347,25 +1398,19 @@ func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, error) {
 	return &cs, nil
 }
 
-func increaseGCLifeTime(ctx context.Context, db *sql.DB) (oriGCLifeTime string, err error) {
+func increaseGCLifeTime(ctx context.Context, db *sql.DB) (err error) {
 	// checksum command usually takes a long time to execute,
 	// so here need to increase the gcLifeTime for single transaction.
-	// try to get gcLifeTime from context first.
-	gcLifeTime, ok := ctx.Value(&gcLifeTimeKey).(string)
-	if !ok {
-		oriGCLifeTime, err = ObtainGCLifeTime(ctx, db)
-		if err != nil {
-			return "", err
-		}
-	} else {
-		oriGCLifeTime = gcLifeTime
-	}
 
+	// try to get gcLifeTimeManager from context first.
+	// DoChecksum has assured this retrieval succeeds.
+	manager, _ := ctx.Value(&gcLifeTimeKey).(*gcLifeTimeManager)
+
 	var increaseGCLifeTime bool
-	if oriGCLifeTime != "" {
-		ori, err := time.ParseDuration(oriGCLifeTime)
+	if manager.oriGCLifeTime != "" {
+		ori, err := time.ParseDuration(manager.oriGCLifeTime)
 		if err != nil {
-			return "", errors.Trace(err)
+			return errors.Trace(err)
 		}
 		if ori < defaultGCLifeTime {
 			increaseGCLifeTime = true
@@ -1377,13 +1422,13 @@ func increaseGCLifeTime(ctx context.Context, db *sql.DB) (oriGCLifeTime string, err error) {
 	if increaseGCLifeTime {
 		err = UpdateGCLifeTime(ctx, db, defaultGCLifeTime.String())
 		if err != nil {
-			return "", errors.Trace(err)
+			return err
 		}
 	}
 
 	failpoint.Inject("IncreaseGCUpdateDuration", func() {})
 
-	return oriGCLifeTime, nil
+	return nil
 }
 
 ////////////////////////////////////////////////////////////////
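
Editor's note: increaseGCLifeTime only raises the value when the stored duration parses to something shorter than defaultGCLifeTime (100h in lightning's defaults, stated here as an assumption). A minimal sketch of that guard; the behaviour for an empty stored value is also an assumption, since that branch is collapsed between the two hunks above.

package main

import (
	"fmt"
	"time"
)

const defaultGCLifeTime = 100 * time.Hour // lightning's default; an assumption here

// needsIncrease reports whether the stored tikv_gc_life_time (e.g. "10m0s")
// is shorter than the floor we want to hold during checksum.
func needsIncrease(current string) (bool, error) {
	if current == "" {
		// value unknown: raise it to be safe (assumed, branch not shown in the diff)
		return true, nil
	}
	d, err := time.ParseDuration(current)
	if err != nil {
		return false, err
	}
	return d < defaultGCLifeTime, nil
}

func main() {
	for _, v := range []string{"10m0s", "720h", ""} {
		inc, _ := needsIncrease(v)
		fmt.Printf("%-8q -> increase: %v\n", v, inc)
	}
}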