Skip to content

Commit

Permalink
ddl: sync schema version using watch, notify sessions on owner node b…
Browse files Browse the repository at this point in the history
…y job id (pingcap#53217)

ref pingcap#53246
  • Loading branch information
D3Hunter authored May 17, 2024
1 parent 1c4a9c6 commit 5d990c6
Show file tree
Hide file tree
Showing 8 changed files with 514 additions and 50 deletions.
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ go_library(
"//pkg/util/engine",
"//pkg/util/filter",
"//pkg/util/gcutil",
"//pkg/util/generic",
"//pkg/util/hack",
"//pkg/util/intest",
"//pkg/util/logutil",
Expand Down
51 changes: 40 additions & 11 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/gcutil"
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/pingcap/tidb/pkg/util/syncutil"
"github.com/tikv/client-go/v2/tikvrpc"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -366,15 +367,17 @@ type ddlCtx struct {
ownerManager owner.Manager
schemaSyncer syncer.SchemaSyncer
stateSyncer syncer.StateSyncer
ddlJobDoneCh chan struct{}
ddlEventCh chan<- *statsutil.DDLEvent
lease time.Duration // lease is schema lease, default 45s, see config.Lease.
binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog.
infoCache *infoschema.InfoCache
statsHandle *handle.Handle
tableLockCkr util.DeadTableLockChecker
etcdCli *clientv3.Client
autoidCli *autoid.ClientDiscover
// ddlJobDoneChMap is used to notify the session that the DDL job is finished.
// jobID -> chan struct{}
ddlJobDoneChMap generic.SyncMap[int64, chan struct{}]
ddlEventCh chan<- *statsutil.DDLEvent
lease time.Duration // lease is schema lease, default 45s, see config.Lease.
binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog.
infoCache *infoschema.InfoCache
statsHandle *handle.Handle
tableLockCkr util.DeadTableLockChecker
etcdCli *clientv3.Client
autoidCli *autoid.ClientDiscover

*waitSchemaSyncedController
*schemaVersionManager
Expand Down Expand Up @@ -618,6 +621,27 @@ func (dc *ddlCtx) notifyReorgWorkerJobStateChange(job *model.Job) {
rc.notifyJobState(job.State)
}

// initJobDoneCh creates a buffered notification channel (capacity 1) and
// stores it in ddlJobDoneChMap under the given jobID.
func (dc *ddlCtx) initJobDoneCh(jobID int64) {
	doneCh := make(chan struct{}, 1)
	dc.ddlJobDoneChMap.Store(jobID, doneCh)
}

// getJobDoneCh looks up jobID in ddlJobDoneChMap and returns the channel
// together with the boolean reported by the map's Load call.
func (dc *ddlCtx) getJobDoneCh(jobID int64) (chan struct{}, bool) {
	doneCh, found := dc.ddlJobDoneChMap.Load(jobID)
	return doneCh, found
}

// delJobDoneCh removes the entry for jobID from ddlJobDoneChMap.
func (dc *ddlCtx) delJobDoneCh(jobID int64) {
	dc.ddlJobDoneChMap.Delete(jobID)
}

// notifyJobDone signals the channel registered for jobID, if there is one.
// The send never blocks: when the channel cannot accept a value right now,
// the signal is dropped.
func (dc *ddlCtx) notifyJobDone(jobID int64) {
	doneCh, found := dc.ddlJobDoneChMap.Load(jobID)
	if !found {
		// No channel is stored for this job ID, so there is nothing to signal.
		return
	}
	select {
	case doneCh <- struct{}{}:
	default:
		// The channel is not ready to receive; skip instead of blocking.
	}
}

// EnableTiFlashPoll enables TiFlash poll loop aka PollTiFlashReplicaStatus.
func EnableTiFlashPoll(d any) {
if dd, ok := d.(*ddl); ok {
Expand Down Expand Up @@ -711,7 +735,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
uuid: id,
store: opt.Store,
lease: opt.Lease,
ddlJobDoneCh: make(chan struct{}, 1),
ddlJobDoneChMap: generic.NewSyncMap[int64, chan struct{}](10),
ownerManager: manager,
schemaSyncer: schemaSyncer,
stateSyncer: stateSyncer,
Expand Down Expand Up @@ -811,6 +835,9 @@ func (d *ddl) prepareWorkers4ConcurrencyDDL() {
})
d.wg.Run(d.startDispatchLoop)
d.wg.Run(d.startLocalWorkerLoop)
d.wg.Run(func() {
d.schemaSyncer.SyncJobSchemaVerLoop(d.ctx)
})
}

// Start implements DDL.Start interface.
Expand Down Expand Up @@ -1169,6 +1196,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {

// worker should restart to continue handling tasks in limitJobCh, and send back through task.err
err := <-task.errChs[0]
defer d.delJobDoneCh(job.ID)
if err != nil {
// The transaction of enqueuing job is failed.
return errors.Trace(err)
Expand Down Expand Up @@ -1205,13 +1233,14 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
recordLastDDLInfo(ctx, historyJob)
}()
i := 0
notifyCh, _ := d.getJobDoneCh(job.ID)
for {
failpoint.Inject("storeCloseInLoop", func(_ failpoint.Value) {
_ = d.Stop()
})

select {
case <-d.ddlJobDoneCh:
case <-notifyCh:
case <-ticker.C:
i++
ticker = updateTickerInterval(ticker, 10*d.lease, job, i)
Expand Down
5 changes: 4 additions & 1 deletion pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,9 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error {

jobTasks = append(jobTasks, job)
injectModifyJobArgFailPoint(job)
if !job.LocalMode {
d.initJobDoneCh(job.ID)
}
}

se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
Expand Down Expand Up @@ -881,7 +884,7 @@ func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error {
return err
}
CleanupDDLReorgHandles(job, w.sess)
asyncNotify(d.ddlJobDoneCh)
d.notifyJobDone(job.ID)
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ func (s *MockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, jobID int6
}
}

// SyncJobSchemaVerLoop implements SchemaSyncer.SyncJobSchemaVerLoop interface.
// The mock has an empty body: it ignores the context and returns immediately.
func (*MockSchemaSyncer) SyncJobSchemaVerLoop(context.Context) {}

// Close implements SchemaSyncer.Close interface.
// The mock implementation has an empty body, so calling it does nothing.
func (*MockSchemaSyncer) Close() {}

Expand Down
8 changes: 6 additions & 2 deletions pkg/ddl/syncer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/metrics",
"//pkg/sessionctx/variable",
"//pkg/util",
"//pkg/util/disttask",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@io_etcd_go_etcd_api_v3//mvccpb",
Expand All @@ -30,20 +31,23 @@ go_test(
timeout = "short",
srcs = [
"state_syncer_test.go",
"syncer_nokit_test.go",
"syncer_test.go",
],
embed = [":syncer"],
flaky = True,
shard_count = 3,
shard_count = 6,
deps = [
":syncer",
"//pkg/ddl",
"//pkg/ddl/util",
"//pkg/domain/infosync",
"//pkg/infoschema",
"//pkg/parser/terror",
"//pkg/sessionctx/variable",
"//pkg/store/mockstore",
"//pkg/util",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_api_v3//mvccpb",
"@io_etcd_go_etcd_client_v3//:client",
Expand Down
Loading

0 comments on commit 5d990c6

Please sign in to comment.