Skip to content

Commit

Permalink
*: REORGANIZE PARTITION (pingcap#38535) (pingcap#41096)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjonss authored and blacktear23 committed Feb 15, 2023
1 parent 6813529 commit 5b40a63
Show file tree
Hide file tree
Showing 40 changed files with 2,466 additions and 219 deletions.
2 changes: 2 additions & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ go_test(
"placement_policy_test.go",
"placement_sql_test.go",
"primary_key_handle_test.go",
"reorg_partition_test.go",
"repair_table_test.go",
"resource_group_test.go",
"restart_test.go",
Expand Down Expand Up @@ -265,6 +266,7 @@ go_test(
"//util/domainutil",
"//util/gcutil",
"//util/logutil",
"//util/mathutil",
"//util/mock",
"//util/sem",
"//util/sqlexec",
Expand Down
16 changes: 13 additions & 3 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const (
typeUpdateColumnWorker backfillerType = 1
typeCleanUpIndexWorker backfillerType = 2
typeAddIndexMergeTmpWorker backfillerType = 3
typeReorgPartitionWorker backfillerType = 4

// InstanceLease is the instance lease.
InstanceLease = 1 * time.Minute
Expand All @@ -85,6 +86,8 @@ func (bT backfillerType) String() string {
return "clean up index"
case typeAddIndexMergeTmpWorker:
return "merge temporary index"
case typeReorgPartitionWorker:
return "reorganize partition"
default:
return "unknown"
}
Expand Down Expand Up @@ -140,6 +143,7 @@ func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time {
// 1: add-index
// 2: modify-column-type
// 3: clean-up global index
// 4: reorganize partition
//
// They all have a write-reorganization state in which data is backfilled into the existing rows.
// Backfilling is time consuming; to accelerate this process, TiDB has built some sub
Expand Down Expand Up @@ -660,7 +664,6 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
return errors.Trace(err)
}

// nextHandle will be updated periodically in runReorgJob, so no need to update it here.
dc.getReorgCtx(reorgInfo.Job.ID).setNextKey(nextKey)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds())
logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch",
Expand Down Expand Up @@ -707,7 +710,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,

task := &reorgBackfillTask{
id: i,
jobID: job.ID,
jobID: reorgInfo.Job.ID,
physicalTable: phyTbl,
priority: reorgInfo.Priority,
startKey: startKey,
Expand All @@ -724,7 +727,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
}

// handleRangeTasks sends tasks to workers, and returns the remaining kvRanges that are not handled.
func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table,
func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.PhysicalTable,
totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) {
batchTasks := getBatchTasks(t, scheduler.reorgInfo, kvRanges, backfillTaskChanSize)
if len(batchTasks) == 0 {
Expand Down Expand Up @@ -934,6 +937,13 @@ func (b *backfillScheduler) adjustWorkerSize() error {
idxWorker := newCleanUpIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc)
runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
worker = idxWorker
case typeReorgPartitionWorker:
partWorker, err := newReorgPartitionWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc)
if err != nil {
return err
}
runner = newBackfillWorker(jc.ddlJobCtx, partWorker)
worker = partWorker
default:
return errors.New("unknown backfill type")
}
Expand Down
43 changes: 38 additions & 5 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,9 +992,37 @@ func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInf
return elements
}

func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, reorgInfo *reorgInfo) error {
func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) error {
logutil.BgLogger().Info("[ddl] start to update table row", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
return w.writePhysicalTableRecord(w.sessPool, t, typeUpdateColumnWorker, reorgInfo)
if tbl, ok := t.(table.PartitionedTable); ok {
done := false
for !done {
p := tbl.GetPartition(reorgInfo.PhysicalTableID)
if p == nil {
return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}
workType := typeReorgPartitionWorker
if reorgInfo.Job.Type != model.ActionReorganizePartition {
// workType = typeUpdateColumnWorker
// TODO: Support Modify Column on partitioned table
// https://github.com/pingcap/tidb/issues/38297
return dbterror.ErrCancelledDDLJob.GenWithStack("Modify Column on partitioned table / typeUpdateColumnWorker not yet supported.")
}
err := w.writePhysicalTableRecord(w.sessPool, p, workType, reorgInfo)
if err != nil {
return err
}
done, err = updateReorgInfo(w.sessPool, tbl, reorgInfo)
if err != nil {
return errors.Trace(err)
}
}
return nil
}
if tbl, ok := t.(table.PhysicalTable); ok {
return w.writePhysicalTableRecord(w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo)
}
return dbterror.ErrCancelledDDLJob.GenWithStack("internal error for phys tbl id: %d tbl id: %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}

// TestReorgGoroutineRunning is only used in test to indicate the reorg goroutine has been started.
Expand Down Expand Up @@ -1025,6 +1053,11 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
}
}

if _, ok := t.(table.PartitionedTable); ok {
// TODO: remove when modify column of partitioned table is supported
// https://github.com/pingcap/tidb/issues/38297
return dbterror.ErrCancelledDDLJob.GenWithStack("Modify Column on partitioned table / typeUpdateColumnWorker not yet supported.")
}
// Get the original start handle and end handle.
currentVer, err := getValidCurrentVersion(reorgInfo.d.store)
if err != nil {
Expand Down Expand Up @@ -1152,8 +1185,8 @@ type rowRecord struct {
warning *terror.Error // It's used to record the cast warning of a record.
}

// getNextKey gets next handle of entry that we are going to process.
func (*updateColumnWorker) getNextKey(taskRange reorgBackfillTask,
// getNextHandleKey gets next handle of entry that we are going to process.
func getNextHandleKey(taskRange reorgBackfillTask,
taskDone bool, lastAccessedHandle kv.Key) (nextHandle kv.Key) {
if !taskDone {
// The task is not done. So we need to pick the last processed entry's handle and add one.
Expand Down Expand Up @@ -1203,7 +1236,7 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
}

logutil.BgLogger().Debug("[ddl] txn fetches handle info", zap.Uint64("txnStartTS", txn.StartTS()), zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime)))
return w.rowRecords, w.getNextKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err)
return w.rowRecords, getNextHandleKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err)
}

func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, rawRow []byte) error {
Expand Down
4 changes: 2 additions & 2 deletions ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,15 +688,15 @@ func TestTransactionWithWriteOnlyColumn(t *testing.T) {
dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(store, "alter table t1 add column c int not null", done)
go backgroundExec(store, "test", "alter table t1 add column c int not null", done)
err := <-done
require.NoError(t, err)
require.NoError(t, checkErr)
tk.MustQuery("select a from t1").Check(testkit.Rows("2"))
tk.MustExec("delete from t1")

// test transaction on drop column.
go backgroundExec(store, "alter table t1 drop column c", done)
go backgroundExec(store, "test", "alter table t1 drop column c", done)
err = <-done
require.NoError(t, err)
require.NoError(t, checkErr)
Expand Down
79 changes: 77 additions & 2 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,14 +1215,14 @@ func TestBitDefaultValue(t *testing.T) {
);`)
}

func backgroundExec(s kv.Storage, sql string, done chan error) {
func backgroundExec(s kv.Storage, schema, sql string, done chan error) {
se, err := session.CreateSession4Test(s)
if err != nil {
done <- errors.Trace(err)
return
}
defer se.Close()
_, err = se.Execute(context.Background(), "use test")
_, err = se.Execute(context.Background(), "use "+schema)
if err != nil {
done <- errors.Trace(err)
return
Expand Down Expand Up @@ -4292,3 +4292,78 @@ func TestRegexpFunctionsGeneratedColumn(t *testing.T) {

tk.MustExec("drop table if exists reg_like")
}

// TestReorgPartitionRangeFailure checks that REORGANIZE PARTITION on a RANGE
// partitioned table is rejected when the named partitions are not adjacent.
func TestReorgPartitionRangeFailure(t *testing.T) {
	tk := testkit.NewTestKit(t, testkit.CreateMockStore(t))
	tk.MustExec(`create schema reorgfail`)
	tk.MustExec("use reorgfail")
	tk.MustExec("CREATE TABLE t (id int, d varchar(255)) partition by range (id) (partition p0 values less than (1000000), partition p1 values less than (2000000), partition p2 values less than (3000000))")

	// p0 and p2 have p1 between them, so both statements are expected to fail
	// with the same message, whatever upper bound the new partition uses.
	wantErr := "[ddl:8200]Unsupported REORGANIZE PARTITION of RANGE; not adjacent partitions"
	failingStmts := []string{
		`ALTER TABLE t REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (1000000))`,
		`ALTER TABLE t REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (4000000))`,
	}
	for _, stmt := range failingStmts {
		tk.MustContainErrMsg(stmt, wantErr)
	}
}

// TestReorgPartitionDocs runs the partition management statements (DROP,
// TRUNCATE, ADD and REORGANIZE PARTITION) against one RANGE partitioned table
// (members) and one LIST partitioned table (member_level), checking both the
// statements that must succeed and the error messages of those that must fail.
func TestReorgPartitionDocs(t *testing.T) {
// Exercises the partition management examples that were added to the docs.
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`create schema reorgdocs`)
tk.MustExec("use reorgdocs")
// RANGE partitioned table, one partition per decade of YEAR(dob).
tk.MustExec(`CREATE TABLE members (
id int,
fname varchar(255),
lname varchar(255),
dob date,
data json
)
PARTITION BY RANGE (YEAR(dob)) (
PARTITION pBefore1950 VALUES LESS THAN (1950),
PARTITION p1950 VALUES LESS THAN (1960),
PARTITION p1960 VALUES LESS THAN (1970),
PARTITION p1970 VALUES LESS THAN (1980),
PARTITION p1980 VALUES LESS THAN (1990),
PARTITION p1990 VALUES LESS THAN (2000))`)
// LIST partitioned table, one partition per level value.
tk.MustExec(`CREATE TABLE member_level (
id int,
level int,
achievements json
)
PARTITION BY LIST (level) (
PARTITION l1 VALUES IN (1),
PARTITION l2 VALUES IN (2),
PARTITION l3 VALUES IN (3),
PARTITION l4 VALUES IN (4),
PARTITION l5 VALUES IN (5));`)
// Drop the last partition of each table, then truncate the new last one.
tk.MustExec(`ALTER TABLE members DROP PARTITION p1990`)
tk.MustExec(`ALTER TABLE member_level DROP PARTITION l5`)
tk.MustExec(`ALTER TABLE members TRUNCATE PARTITION p1980`)
tk.MustExec(`ALTER TABLE member_level TRUNCATE PARTITION l4`)
// Add a wider replacement partition to each table.
tk.MustExec("ALTER TABLE members ADD PARTITION (PARTITION `p1990to2010` VALUES LESS THAN (2010))")
tk.MustExec(`ALTER TABLE member_level ADD PARTITION (PARTITION l5_6 VALUES IN (5,6))`)
// Adding a RANGE partition bounded by 2000 after the one bounded by 2010 must
// fail: the bounds have to be strictly increasing.
tk.MustContainErrMsg(`ALTER TABLE members ADD PARTITION (PARTITION p1990 VALUES LESS THAN (2000))`, "[ddl:1493]VALUES LESS THAN value must be strictly increasing for each partition")
// Split one partition into several.
tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION p1990to2010 INTO
(PARTITION p1990 VALUES LESS THAN (2000),
PARTITION p2000 VALUES LESS THAN (2010),
PARTITION p2010 VALUES LESS THAN (2020),
PARTITION p2020 VALUES LESS THAN (2030),
PARTITION pMax VALUES LESS THAN (MAXVALUE))`)
tk.MustExec(`ALTER TABLE member_level REORGANIZE PARTITION l5_6 INTO
(PARTITION l5 VALUES IN (5),
PARTITION l6 VALUES IN (6))`)
// Merge two partitions into one.
tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION pBefore1960,p1960,p1970,p1980,p1990,p2000,p2010,p2020,pMax INTO
(PARTITION p1800 VALUES LESS THAN (1900),
PARTITION p1900 VALUES LESS THAN (2000),
PARTITION p2000 VALUES LESS THAN (2100))`)
tk.MustExec(`ALTER TABLE member_level REORGANIZE PARTITION l1_2,l3,l4,l5,l6 INTO
(PARTITION lOdd VALUES IN (1,3,5),
PARTITION lEven VALUES IN (2,4,6))`)
// p1800 and p2000 have p1900 between them, so this RANGE reorganize must be rejected.
tk.MustContainErrMsg(`ALTER TABLE members REORGANIZE PARTITION p1800,p2000 INTO (PARTITION p2000 VALUES LESS THAN (2100))`, "[ddl:8200]Unsupported REORGANIZE PARTITION of RANGE; not adjacent partitions")
// With a row whose dob year is 2022, shrinking p2000 to < 2050 still covers it,
// but shrinking it to < 2020 leaves the row without a partition and must fail.
tk.MustExec(`INSERT INTO members VALUES (313, "John", "Doe", "2022-11-22", NULL)`)
tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION p2000 INTO (PARTITION p2000 VALUES LESS THAN (2050))`)
tk.MustContainErrMsg(`ALTER TABLE members REORGANIZE PARTITION p2000 INTO (PARTITION p2000 VALUES LESS THAN (2020))`, "[table:1526]Table has no partition for value 2022")
// Same check for LIST: a row with level 6 exists, so removing 6 from lEven must fail.
tk.MustExec(`INSERT INTO member_level (id, level) values (313, 6)`)
tk.MustContainErrMsg(`ALTER TABLE member_level REORGANIZE PARTITION lEven INTO (PARTITION lEven VALUES IN (2,4))`, "[table:1526]Table has no partition for value 6")
}
8 changes: 3 additions & 5 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3337,9 +3337,6 @@ func TestPartitionErrorCode(t *testing.T) {
);`)
tk.MustGetDBError("alter table t_part coalesce partition 4;", dbterror.ErrCoalesceOnlyOnHashPartition)

tk.MustGetErrCode(`alter table t_part reorganize partition p0, p1 into (
partition p0 values less than (1980));`, errno.ErrUnsupportedDDLOperation)

tk.MustGetErrCode("alter table t_part check partition p0, p1;", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t_part optimize partition p0,p1;", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t_part rebuild partition p0,p1;", errno.ErrUnsupportedDDLOperation)
Expand Down Expand Up @@ -3751,9 +3748,9 @@ func TestTruncatePartitionMultipleTimes(t *testing.T) {
}
hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
done1 := make(chan error, 1)
go backgroundExec(store, "alter table test.t truncate partition p0;", done1)
go backgroundExec(store, "test", "alter table test.t truncate partition p0;", done1)
done2 := make(chan error, 1)
go backgroundExec(store, "alter table test.t truncate partition p0;", done2)
go backgroundExec(store, "test", "alter table test.t truncate partition p0;", done2)
<-done1
<-done2
require.LessOrEqual(t, errCount, int32(1))
Expand Down Expand Up @@ -4584,6 +4581,7 @@ func TestAlterModifyPartitionColTruncateWarning(t *testing.T) {
tk.MustQuery(`show warnings`).Check(testkit.Rows(""+
"Warning 1265 Data truncated for column 'a', value is ' 654321'",
"Warning 1265 Data truncated for column 'a', value is ' 654321'"))
tk.MustExec(`admin check table t`)
}

func TestAlterModifyColumnOnPartitionedTableRename(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions ddl/db_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,15 @@ func TestTransactionOnAddDropColumn(t *testing.T) {
dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(store, "alter table t1 add column c int not null after a", done)
go backgroundExec(store, "test", "alter table t1 add column c int not null after a", done)
err := <-done
require.NoError(t, err)
require.Nil(t, checkErr)
tk.MustQuery("select a,b from t1 order by a").Check(testkit.Rows("1 1", "1 1", "1 1", "2 2", "2 2", "2 2"))
tk.MustExec("delete from t1")

// test transaction on drop column.
go backgroundExec(store, "alter table t1 drop column c", done)
go backgroundExec(store, "test", "alter table t1 drop column c", done)
err = <-done
require.NoError(t, err)
require.Nil(t, checkErr)
Expand Down Expand Up @@ -899,7 +899,7 @@ func TestAddColumn2(t *testing.T) {
dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(store, "alter table t1 add column c int not null", done)
go backgroundExec(store, "test", "alter table t1 add column c int not null", done)
err := <-done
require.NoError(t, err)

Expand Down Expand Up @@ -940,7 +940,7 @@ func TestAddColumn2(t *testing.T) {
}
dom.DDL().SetHook(hook)

go backgroundExec(store, "alter table t2 add column b int not null default 3", done)
go backgroundExec(store, "test", "alter table t2 add column b int not null default 3", done)
err = <-done
require.NoError(t, err)
re.Check(testkit.Rows("1 2"))
Expand Down
2 changes: 1 addition & 1 deletion ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func TestIssue22307(t *testing.T) {
dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(store, "alter table t drop column b;", done)
go backgroundExec(store, "test", "alter table t drop column b;", done)
err := <-done
require.NoError(t, err)
require.EqualError(t, checkErr1, "[planner:1054]Unknown column 'b' in 'where clause'")
Expand Down
3 changes: 2 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,8 @@ func getIntervalFromPolicy(policy []time.Duration, i int) (time.Duration, bool)

func getJobCheckInterval(job *model.Job, i int) (time.Duration, bool) {
switch job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey, model.ActionModifyColumn:
case model.ActionAddIndex, model.ActionAddPrimaryKey, model.ActionModifyColumn,
model.ActionReorganizePartition:
return getIntervalFromPolicy(slowDDLIntervalPolicy, i)
case model.ActionCreateTable, model.ActionCreateSchema:
return getIntervalFromPolicy(fastDDLIntervalPolicy, i)
Expand Down
Loading

0 comments on commit 5b40a63

Please sign in to comment.