Skip to content

Commit

Permalink
table partition: exchange partition with table enhance (#35749)
Browse files Browse the repository at this point in the history
close #35996
  • Loading branch information
ymkzpx authored Jul 20, 2022
1 parent c7cfebb commit e9f3980
Show file tree
Hide file tree
Showing 12 changed files with 249 additions and 64 deletions.
103 changes: 103 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1994,6 +1994,40 @@ func TestAlterTableExchangePartition(t *testing.T) {
tk.MustQuery("select * from e7").Check(testkit.Rows("1"))
tk.MustGetErrCode("alter table e6 exchange partition p1 with table e7", tmysql.ErrRowDoesNotMatchPartition)

// validation test for list partition
tk.MustExec("set @@tidb_enable_list_partition=true")
tk.MustExec(`CREATE TABLE t1 (store_id int)
PARTITION BY LIST (store_id) (
PARTITION pNorth VALUES IN (1, 2, 3, 4, 5),
PARTITION pEast VALUES IN (6, 7, 8, 9, 10),
PARTITION pWest VALUES IN (11, 12, 13, 14, 15),
PARTITION pCentral VALUES IN (16, 17, 18, 19, 20)
);`)
tk.MustExec(`create table t2 (store_id int);`)
tk.MustExec(`insert into t1 values (1);`)
tk.MustExec(`insert into t1 values (6);`)
tk.MustExec(`insert into t1 values (11);`)
tk.MustExec(`insert into t2 values (3);`)
tk.MustExec("alter table t1 exchange partition pNorth with table t2")

tk.MustQuery("select * from t1 partition(pNorth)").Check(testkit.Rows("3"))
tk.MustGetErrCode("alter table t1 exchange partition pEast with table t2", tmysql.ErrRowDoesNotMatchPartition)

// validation test for list columns partition
tk.MustExec(`CREATE TABLE t3 (id int, store_id int)
PARTITION BY LIST COLUMNS (id, store_id) (
PARTITION p0 VALUES IN ((1, 1), (2, 2)),
PARTITION p1 VALUES IN ((3, 3), (4, 4))
);`)
tk.MustExec(`create table t4 (id int, store_id int);`)
tk.MustExec(`insert into t3 values (1, 1);`)
tk.MustExec(`insert into t4 values (2, 2);`)
tk.MustExec("alter table t3 exchange partition p0 with table t4")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 after the exchange, please analyze related table of the exchange to update statistics"))

tk.MustQuery("select * from t3 partition(p0)").Check(testkit.Rows("2 2"))
tk.MustGetErrCode("alter table t3 exchange partition p1 with table t4", tmysql.ErrRowDoesNotMatchPartition)

// test exchange partition from different databases
tk.MustExec("create table e8 (a int) partition by hash(a) partitions 2;")
tk.MustExec("create database if not exists exchange_partition")
Expand Down Expand Up @@ -2292,6 +2326,38 @@ func TestExchangePartitionTableCompatiable(t *testing.T) {
"alter table pt27 exchange partition p0 with table nt27;",
dbterror.ErrTablesDifferentMetadata,
},
{
"create table pt28 (a int primary key, b int, index(a)) partition by hash(a) partitions 1;",
"create table nt28 (a int not null, b int, index(a));",
"alter table pt28 exchange partition p0 with table nt28;",
dbterror.ErrTablesDifferentMetadata,
},
{
"create table pt29 (a int primary key, b int) partition by hash(a) partitions 1;",
"create table nt29 (a int not null, b int, index(a));",
"alter table pt29 exchange partition p0 with table nt29;",
dbterror.ErrTablesDifferentMetadata,
},
{
"create table pt30 (a int primary key, b int) partition by hash(a) partitions 1;",
"create table nt30 (a int, b int, unique index(a));",
"alter table pt30 exchange partition p0 with table nt30;",
dbterror.ErrTablesDifferentMetadata,
},
{
// auto_increment
"create table pt31 (id bigint not null primary key auto_increment) partition by hash(id) partitions 1;",
"create table nt31 (id bigint not null primary key);",
"alter table pt31 exchange partition p0 with table nt31;",
dbterror.ErrTablesDifferentMetadata,
},
{
// auto_random
"create table pt32 (id bigint not null primary key AUTO_RANDOM) partition by hash(id) partitions 1;",
"create table nt32 (id bigint not null primary key);",
"alter table pt32 exchange partition p0 with table nt32;",
dbterror.ErrTablesDifferentMetadata,
},
}

tk := testkit.NewTestKit(t, store)
Expand All @@ -2315,6 +2381,43 @@ func TestExchangePartitionTableCompatiable(t *testing.T) {
require.NoError(t, err)
}

func TestExchangePartitionHook(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
// why use tkCancel, not tk.
tkCancel := testkit.NewTestKit(t, store)

tk.MustExec("set @@tidb_enable_exchange_partition=1")
defer tk.MustExec("set @@tidb_enable_exchange_partition=0")

tk.MustExec("use test")
tk.MustExec(`create table pt (a int) partition by range(a) (
partition p0 values less than (3),
partition p1 values less than (6),
PARTITION p2 VALUES LESS THAN (9),
PARTITION p3 VALUES LESS THAN (MAXVALUE)
);`)
tk.MustExec(`create table nt(a int);`)

tk.MustExec(`insert into pt values (0), (4), (7)`)
tk.MustExec("insert into nt values (1)")

hook := &ddl.TestDDLCallback{Do: dom}
dom.DDL().SetHook(hook)

hookFunc := func(job *model.Job) {
if job.Type == model.ActionExchangeTablePartition && job.SchemaState != model.StateNone {
tkCancel.MustExec("use test")
tkCancel.MustGetErrCode("insert into nt values (5)", tmysql.ErrRowDoesNotMatchGivenPartitionSet)
}
}
hook.OnJobUpdatedExported = hookFunc

tk.MustExec("alter table pt exchange partition p0 with table nt")
tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1"))
}

func TestExchangePartitionExpressIndex(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
Expand Down
1 change: 1 addition & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3965,6 +3965,7 @@ func (d *ddl) ExchangeTablePartition(ctx sessionctx.Context, ident ast.Ident, sp
if err != nil {
return errors.Trace(err)
}
ctx.GetSessionVars().StmtCtx.AppendWarning(fmt.Errorf("after the exchange, please analyze related table of the exchange to update statistics"))
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}
Expand Down
8 changes: 5 additions & 3 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1414,10 +1414,12 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, error)
diff.AffectedOpts = affects
case model.ActionExchangeTablePartition:
var (
ptSchemaID int64
ptTableID int64
ptSchemaID int64
ptTableID int64
partName string
withValidation bool
)
err = job.DecodeArgs(&diff.TableID, &ptSchemaID, &ptTableID)
err = job.DecodeArgs(&diff.TableID, &ptSchemaID, &ptTableID, &partName, &withValidation)
if err != nil {
return 0, errors.Trace(err)
}
Expand Down
46 changes: 44 additions & 2 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1388,6 +1388,18 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
if err != nil {
return ver, errors.Trace(err)
}
if nt.ExchangePartitionInfo == nil || !nt.ExchangePartitionInfo.ExchangePartitionFlag {
nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{
ExchangePartitionFlag: true,
ExchangePartitionID: ptID,
ExchangePartitionDefID: defID,
}
return updateVersionAndTableInfoWithCheck(d, t, job, nt, true)
}

if d.lease > 0 {
delayForAsyncCommit()
}

if withValidation {
err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name)
Expand Down Expand Up @@ -1471,6 +1483,12 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
return ver, errors.Trace(err)
}

err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

// the follow code is a swap function for rules of two partitions
// though partitions has exchanged their ID, swap still take effect

Expand Down Expand Up @@ -1521,7 +1539,8 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
return ver, errors.Wrapf(err, "failed to notify PD the label rules")
}

ver, err = updateSchemaVersion(d, t, job)
nt.ExchangePartitionInfo = nil
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, nt, true)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -1603,7 +1622,7 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde
case model.PartitionTypeList:
if len(pi.Columns) == 0 {
sql, paramList = buildCheckSQLForListPartition(pi, index, schemaName, tableName)
} else if len(pi.Columns) == 1 {
} else {
sql, paramList = buildCheckSQLForListColumnsPartition(pi, index, schemaName, tableName)
}
default:
Expand All @@ -1628,6 +1647,29 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde
return nil
}

func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPlacementPolicyRef *model.PolicyRefInfo, ptPlacementPolicyRef *model.PolicyRefInfo) error {
if ntPlacementPolicyRef == nil && ptPlacementPolicyRef == nil {
return nil
}
if ntPlacementPolicyRef == nil || ptPlacementPolicyRef == nil {
return dbterror.ErrTablesDifferentMetadata
}

ptPlacementPolicyInfo, _ := getPolicyInfo(t, ptPlacementPolicyRef.ID)
ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPlacementPolicyRef.ID)
if ntPlacementPolicyInfo == nil && ptPlacementPolicyInfo == nil {
return nil
}
if ntPlacementPolicyInfo == nil || ptPlacementPolicyInfo == nil {
return dbterror.ErrTablesDifferentMetadata
}
if ntPlacementPolicyInfo.Name.L != ptPlacementPolicyInfo.Name.L {
return dbterror.ErrTablesDifferentMetadata
}

return nil
}

func buildCheckSQLForRangeExprPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) {
var buf strings.Builder
paramList := make([]interface{}, 0, 4)
Expand Down
64 changes: 6 additions & 58 deletions ddl/placement_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1928,9 +1928,6 @@ func TestExchangePartitionWithPlacement(t *testing.T) {
policy1, ok := dom.InfoSchema().PolicyByName(model.NewCIStr("p1"))
require.True(t, ok)

policy2, ok := dom.InfoSchema().PolicyByName(model.NewCIStr("p2"))
require.True(t, ok)

tk.MustExec(`CREATE TABLE t1 (id INT) placement policy p1`)
defer tk.MustExec("drop table t1")

Expand All @@ -1941,12 +1938,8 @@ func TestExchangePartitionWithPlacement(t *testing.T) {
require.NoError(t, err)
t1ID := t1.Meta().ID

t2, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t2"))
require.NoError(t, err)
t2ID := t2.Meta().ID

tk.MustExec(`CREATE TABLE tp (id INT) placement policy p3 PARTITION BY RANGE (id) (
PARTITION p0 VALUES LESS THAN (100),
PARTITION p0 VALUES LESS THAN (100) placement policy p1,
PARTITION p1 VALUES LESS THAN (1000) placement policy p2,
PARTITION p2 VALUES LESS THAN (10000)
);`)
Expand All @@ -1956,7 +1949,6 @@ func TestExchangePartitionWithPlacement(t *testing.T) {
require.NoError(t, err)
tpID := tp.Meta().ID
par0ID := tp.Meta().Partition.Definitions[0].ID
par1ID := tp.Meta().Partition.Definitions[1].ID

// exchange par0, t1
tk.MustExec("alter table tp exchange partition p0 with table t1")
Expand All @@ -1969,69 +1961,25 @@ func TestExchangePartitionWithPlacement(t *testing.T) {
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p3` */\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100),\n" +
"(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`p1` */,\n" +
" PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" +
" PARTITION `p2` VALUES LESS THAN (10000))"))
tp, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp"))
require.NoError(t, err)
require.Equal(t, tpID, tp.Meta().ID)
require.Equal(t, t1ID, tp.Meta().Partition.Definitions[0].ID)
require.Nil(t, tp.Meta().Partition.Definitions[0].PlacementPolicyRef)
require.NotNil(t, tp.Meta().Partition.Definitions[0].PlacementPolicyRef)
t1, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
require.NoError(t, err)
require.Equal(t, par0ID, t1.Meta().ID)
require.Equal(t, policy1.ID, t1.Meta().PlacementPolicyRef.ID)
checkExistTableBundlesInPD(t, dom, "test", "tp")

// exchange par0, t2
tk.MustExec("alter table tp exchange partition p0 with table t2")
tk.MustQuery("show create table t2").Check(testkit.Rows("" +
"t2 CREATE TABLE `t2` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p3` */\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" +
" PARTITION `p2` VALUES LESS THAN (10000))"))
tp, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp"))
require.NoError(t, err)
require.Equal(t, tpID, tp.Meta().ID)
require.Equal(t, t2ID, tp.Meta().Partition.Definitions[0].ID)
require.Nil(t, tp.Meta().Partition.Definitions[0].PlacementPolicyRef)
t2, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t2"))
require.NoError(t, err)
require.Equal(t, t1ID, t2.Meta().ID)
require.Nil(t, t2.Meta().PlacementPolicyRef)
checkExistTableBundlesInPD(t, dom, "test", "tp")
tk.MustGetErrCode("alter table tp exchange partition p0 with table t2", mysql.ErrTablesDifferentMetadata)

// exchange par1, t1
tk.MustExec("alter table tp exchange partition p1 with table t1")
tk.MustQuery("show create table t1").Check(testkit.Rows("" +
"t1 CREATE TABLE `t1` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */"))
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p3` */\n" +
"PARTITION BY RANGE (`id`)\n" +
"(PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" +
" PARTITION `p2` VALUES LESS THAN (10000))"))
tp, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp"))
require.NoError(t, err)
require.Equal(t, tpID, tp.Meta().ID)
require.Equal(t, par0ID, tp.Meta().Partition.Definitions[1].ID)
require.Equal(t, policy2.ID, tp.Meta().Partition.Definitions[1].PlacementPolicyRef.ID)
t1, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
require.NoError(t, err)
require.Equal(t, par1ID, t1.Meta().ID)
require.Equal(t, policy1.ID, t1.Meta().PlacementPolicyRef.ID)
checkExistTableBundlesInPD(t, dom, "test", "tp")
// exchange par1, t2
tk.MustGetErrCode("alter table tp exchange partition p1 with table t2", mysql.ErrTablesDifferentMetadata)
}

func TestPDFail(t *testing.T) {
Expand Down
18 changes: 18 additions & 0 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -642,6 +643,23 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue
}
}
}
tbl := e.Table.Meta()
// Handle exchange partition
if tbl.ExchangePartitionInfo != nil && tbl.ExchangePartitionInfo.ExchangePartitionFlag {
is := e.ctx.GetDomainInfoSchema().(infoschema.InfoSchema)
pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID)
if !tableFound {
return nil, errors.Errorf("exchange partition process table by id failed")
}
p, ok := pt.(table.PartitionedTable)
if !ok {
return nil, errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionInfo.ExchangePartitionDefID)
if err != nil {
return nil, err
}
}
for i, gCol := range gCols {
colIdx := gCol.ColumnInfo.Offset
val, err := e.GenExprs[i].Eval(chunk.MutRowFromDatums(row).ToRow())
Expand Down
2 changes: 1 addition & 1 deletion executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ func TestBatchInsertDelete(t *testing.T) {
atomic.StoreUint64(&kv.TxnTotalSizeLimit, originLimit)
}()
// Set the limitation to a small value, make it easier to reach the limitation.
atomic.StoreUint64(&kv.TxnTotalSizeLimit, 5600)
atomic.StoreUint64(&kv.TxnTotalSizeLimit, 5700)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down
Loading

0 comments on commit e9f3980

Please sign in to comment.