Skip to content

Commit

Permalink
ddl: use a param to decide whether exec in distributed mode (#41549)
Browse files Browse the repository at this point in the history
ref #41534
  • Loading branch information
tangenta authored Feb 17, 2023
1 parent bc95a4f commit 69a9dec
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 8 deletions.
8 changes: 4 additions & 4 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ type backfillCtx struct {
}

func newBackfillCtx(ctx *ddlCtx, id int, sessCtx sessionctx.Context, reorgTp model.ReorgType,
schemaName string, tbl table.Table) *backfillCtx {
if id == 0 {
schemaName string, tbl table.Table, isDistributed bool) *backfillCtx {
if isDistributed {
id = int(backfillContextID.Add(1))
}
return &backfillCtx{
Expand Down Expand Up @@ -921,7 +921,7 @@ func (b *backfillScheduler) adjustWorkerSize() error {
)
switch b.tp {
case typeAddIndexWorker:
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl)
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl, false)
idxWorker, err := newAddIndexWorker(b.decodeColMap, b.tbl, backfillCtx,
jc, job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
if err != nil {
Expand All @@ -934,7 +934,7 @@ func (b *backfillScheduler) adjustWorkerSize() error {
runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
worker = idxWorker
case typeAddIndexMergeTmpWorker:
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl)
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl, false)
tmpIdxWorker := newMergeTempIndexWorker(backfillCtx, i, b.tbl, reorgInfo.currElement.ID, jc)
runner = newBackfillWorker(jc.ddlJobCtx, tmpIdxWorker)
worker = tmpIdxWorker
Expand Down
2 changes: 1 addition & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@ func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
}
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
return &updateColumnWorker{
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t),
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t, false),
oldColInfo: oldCol,
newColInfo: newCol,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("update_col_rate", reorgInfo.SchemaName, t.Meta().Name.String())),
Expand Down
2 changes: 1 addition & 1 deletion ddl/dist_backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func newBackfillWorkerContext(d *ddl, schemaName string, tbl table.Table, worker
}

var bf backfiller
bf, err = bfFunc(newBackfillCtx(d.ddlCtx, 0, se, bfMeta.ReorgTp, schemaName, tbl))
bf, err = bfFunc(newBackfillCtx(d.ddlCtx, 0, se, bfMeta.ReorgTp, schemaName, tbl, true))
if err != nil {
if canSkipError(jobID, len(bwCtx.backfillWorkers), err) {
err = nil
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1985,7 +1985,7 @@ func newCleanUpIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
}
return &cleanUpIndexWorker{
baseIndexWorker: baseIndexWorker{
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t),
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t, false),
indexes: indexes,
rowDecoder: rowDecoder,
defaultVals: make([]types.Datum, len(t.WritableCols())),
Expand Down
2 changes: 1 addition & 1 deletion ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2576,7 +2576,7 @@ func newReorgPartitionWorker(sessCtx sessionctx.Context, i int, t table.Physical
maxOffset = mathutil.Max[int](maxOffset, col.Offset)
}
return &reorgPartitionWorker{
backfillCtx: newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t),
backfillCtx: newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t, false),
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("reorg_partition_rate", reorgInfo.SchemaName, t.Meta().Name.String())),
rowDecoder: decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap),
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
Expand Down

0 comments on commit 69a9dec

Please sign in to comment.