Skip to content

Commit

Permalink
ddl: fix bug that range columns partition table add partitions failed (
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 committed Oct 18, 2019
1 parent 268d5ae commit 0dd5b00
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 33 deletions.
20 changes: 20 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ func (s *testIntegrationSuite9) TestCreateTableWithPartition(c *C) {
);`
assertErrorCode(c, tk, sql9, tmysql.ErrPartitionFunctionIsNotAllowed)

assertErrorCode(c, tk, `create TABLE t10 (c1 int,c2 int) partition by range(c1 / c2 ) (partition p0 values less than (2));`, tmysql.ErrPartitionFunctionIsNotAllowed)
_, err = tk.Exec(`CREATE TABLE t9 (
a INT NOT NULL,
b INT NOT NULL,
c INT NOT NULL
)
partition by range columns(a) (
partition p0 values less than (10),
partition p2 values less than (20),
partition p3 values less than (20)
);`)
c.Assert(ddl.ErrRangeNotIncreasing.Equal(err), IsTrue)

assertErrorCode(c, tk, `create TABLE t10 (c1 int,c2 int) partition by range(c1 / c2 ) (partition p0 values less than (2));`, tmysql.ErrPartitionFunctionIsNotAllowed)

tk.MustExec(`create TABLE t11 (c1 int,c2 int) partition by range(c1 div c2 ) (partition p0 values less than (2));`)
Expand Down Expand Up @@ -525,6 +538,13 @@ func (s *testIntegrationSuite5) TestAlterTableAddPartition(c *C) {
);`)
tk.MustExec(`ALTER TABLE tt5 add partition ( partition p2 values less than (-1) );`)
tk.MustExec(`ALTER TABLE tt5 add partition ( partition p3 values less than (5-1) );`)

// Test add partition for the table partition by range columns.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a datetime) partition by range columns (a) (partition p1 values less than ('2019-06-01'), partition p2 values less than ('2019-07-01'));")
sql := "alter table t add partition ( partition p3 values less than ('2019-07-01'));"
tk.MustGetErrCode(sql, tmysql.ErrRangeNotIncreasing)
tk.MustExec("alter table t add partition ( partition p3 values less than ('2019-08-01'));")
}

func (s *testIntegrationSuite6) TestAlterTableDropPartition(c *C) {
Expand Down
52 changes: 21 additions & 31 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,11 +1324,7 @@ func buildTableInfoWithCheck(ctx sessionctx.Context, d *ddl, s *ast.CreateTableS
if pi != nil {
switch pi.Type {
case model.PartitionTypeRange:
if len(pi.Columns) == 0 {
err = checkPartitionByRange(ctx, tbInfo, pi, s, cols, newConstraints)
} else {
err = checkPartitionByRangeColumn(ctx, tbInfo, pi, s)
}
err = checkPartitionByRange(ctx, tbInfo, pi, cols, s)
case model.PartitionTypeHash:
err = checkPartitionByHash(ctx, pi, s, cols, tbInfo)
}
Expand Down Expand Up @@ -1572,12 +1568,8 @@ func checkPartitionByHash(ctx sessionctx.Context, pi *model.PartitionInfo, s *as
return checkPartitionFuncType(ctx, s, cols, tbInfo)
}

func checkPartitionByRange(ctx sessionctx.Context, tbInfo *model.TableInfo, pi *model.PartitionInfo, s *ast.CreateTableStmt, cols []*table.Column, newConstraints []*ast.Constraint) error {
if err := checkPartitionNameUnique(tbInfo, pi); err != nil {
return err
}

if err := checkCreatePartitionValue(ctx, tbInfo, pi, cols); err != nil {
func checkPartitionByRange(ctx sessionctx.Context, tbInfo *model.TableInfo, pi *model.PartitionInfo, cols []*table.Column, s *ast.CreateTableStmt) error {
if err := checkPartitionNameUnique(pi); err != nil {
return err
}

Expand All @@ -1589,31 +1581,28 @@ func checkPartitionByRange(ctx sessionctx.Context, tbInfo *model.TableInfo, pi *
return err
}

if err := checkPartitionFuncValid(ctx, tbInfo, s.Partition.Expr); err != nil {
return err
}
if len(pi.Columns) == 0 {
if err := checkCreatePartitionValue(ctx, tbInfo, pi, cols); err != nil {
return err
}

return checkPartitionFuncType(ctx, s, cols, tbInfo)
}
// s maybe nil when add partition.
if s == nil {
return nil
}

func checkPartitionByRangeColumn(ctx sessionctx.Context, tbInfo *model.TableInfo, pi *model.PartitionInfo, s *ast.CreateTableStmt) error {
if err := checkPartitionNameUnique(tbInfo, pi); err != nil {
return err
if err := checkPartitionFuncValid(ctx, tbInfo, s.Partition.Expr); err != nil {
return err
}
return checkPartitionFuncType(ctx, s, cols, tbInfo)
}

// Check for range columns partition.
if err := checkRangeColumnsPartitionType(tbInfo, pi.Columns); err != nil {
return err
}

if err := checkRangeColumnsPartitionValue(ctx, tbInfo, pi); err != nil {
return err
}

if err := checkNoRangePartitions(len(pi.Definitions)); err != nil {
return errors.Trace(err)
}

return checkAddPartitionTooManyPartitions(uint64(len(pi.Definitions)))
return checkRangeColumnsPartitionValue(ctx, tbInfo, pi)
}

func checkRangeColumnsPartitionType(tbInfo *model.TableInfo, columns []model.CIStr) error {
Expand Down Expand Up @@ -1649,8 +1638,9 @@ func checkRangeColumnsPartitionValue(ctx sessionctx.Context, tbInfo *model.Table
if len(curr.LessThan) != len(pi.Columns) {
return errors.Trace(ast.ErrPartitionColumnList)
}
var prev *model.PartitionDefinition
for i := 1; i < len(defs); i++ {
prev, curr := curr, &defs[i]
prev, curr = curr, &defs[i]
succ, err := checkTwoRangeColumns(ctx, curr, prev, pi, tbInfo)
if err != nil {
return err
Expand Down Expand Up @@ -2198,7 +2188,7 @@ func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *
return errors.Trace(err)
}

err = checkPartitionNameUnique(meta, partInfo)
err = checkPartitionNameUnique(partInfo)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -2207,7 +2197,7 @@ func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *
// old partitions to check all partitions is strictly increasing.
tmp := *partInfo
tmp.Definitions = append(pi.Definitions, tmp.Definitions...)
err = checkCreatePartitionValue(ctx, meta, &tmp, t.Cols())
err = checkPartitionByRange(ctx, meta, &tmp, t.Cols(), nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
14 changes: 13 additions & 1 deletion ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,19 @@ func buildRangePartitionDefinitions(ctx sessionctx.Context, d *ddl, s *ast.Creat
return nil
}

func checkPartitionNameUnique(tbInfo *model.TableInfo, pi *model.PartitionInfo) error {
func checkPartitionNameUnique(pi *model.PartitionInfo) error {
partNames := make(map[string]struct{})
newPars := pi.Definitions
for _, newPar := range newPars {
if _, ok := partNames[newPar.Name.L]; ok {
return ErrSameNamePartition.GenWithStackByArgs(newPar.Name)
}
partNames[newPar.Name.L] = struct{}{}
}
return nil
}

func checkAddPartitionNameUnique(tbInfo *model.TableInfo, pi *model.PartitionInfo) error {
partNames := make(map[string]struct{})
if tbInfo.Partition != nil {
oldPars := tbInfo.Partition.Definitions
Expand Down
2 changes: 1 addition & 1 deletion ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ func onAddTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

err = checkPartitionNameUnique(tblInfo, partInfo)
err = checkAddPartitionNameUnique(tblInfo, partInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down

0 comments on commit 0dd5b00

Please sign in to comment.