diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index e99601a6d81d7..94e7d50f08908 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1420,6 +1420,10 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ... // Keep this as Schema ID of non-partitioned table // to avoid trigger early rename in TiFlash diff.AffectedOpts[0].SchemaID = job.SchemaID + // Need reload partition table, use diff.AffectedOpts[0].OldSchemaID to mark it. + if len(multiInfos) > 0 { + diff.AffectedOpts[0].OldSchemaID = ptSchemaID + } } else { // Swap diff.TableID = ptDefID diff --git a/ddl/partition.go b/ddl/partition.go index 28622385532f2..8d70224656e6d 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -2458,16 +2458,27 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } } + var ptInfo []schemaIDAndTableInfo + if len(nt.Constraints) > 0 { + pt.ExchangePartitionInfo = &model.ExchangePartitionInfo{ + ExchangePartitionTableID: nt.ID, + ExchangePartitionDefID: defID, + } + ptInfo = append(ptInfo, schemaIDAndTableInfo{ + schemaID: ptSchemaID, + tblInfo: pt, + }) + } nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{ - ExchangePartitionID: ptID, - ExchangePartitionDefID: defID, + ExchangePartitionTableID: ptID, + ExchangePartitionDefID: defID, } // We need an interim schema version, // so there are no non-matching rows inserted // into the table using the schema version // before the exchange is made. job.SchemaState = model.StateWriteOnly - return updateVersionAndTableInfoWithCheck(d, t, job, nt, true) + return updateVersionAndTableInfoWithCheck(d, t, job, nt, true, ptInfo...) } // From now on, nt (the non-partitioned table) has // ExchangePartitionInfo set, meaning it is restricted @@ -2527,6 +2538,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } // exchange table meta id + pt.ExchangePartitionInfo = nil partDef.ID, nt.ID = nt.ID, partDef.ID err = t.UpdateTable(ptSchemaID, pt) diff --git a/ddl/rollingback.go b/ddl/rollingback.go index 47e1a3735d5cb..c72be206529e4 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -264,11 +264,35 @@ func needNotifyAndStopReorgWorker(job *model.Job) bool { // rollbackExchangeTablePartition will clear the non-partitioned // table's ExchangePartitionInfo state. -func rollbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo) (int64, error) { +func rollbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo) (ver int64, err error) { tblInfo.ExchangePartitionInfo = nil job.State = model.JobStateRollbackDone job.SchemaState = model.StatePublic - return updateVersionAndTableInfo(d, t, job, tblInfo, true) + if len(tblInfo.Constraints) == 0 { + return updateVersionAndTableInfo(d, t, job, tblInfo, true) + } + var ( + defID int64 + ptSchemaID int64 + ptID int64 + partName string + withValidation bool + ) + if err = job.DecodeArgs(&defID, &ptSchemaID, &ptID, &partName, &withValidation); err != nil { + return ver, errors.Trace(err) + } + pt, err := getTableInfo(t, ptID, ptSchemaID) + if err != nil { + return ver, errors.Trace(err) + } + pt.ExchangePartitionInfo = nil + var ptInfo []schemaIDAndTableInfo + ptInfo = append(ptInfo, schemaIDAndTableInfo{ + schemaID: ptSchemaID, + tblInfo: pt, + }) + ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true, ptInfo...) + return ver, errors.Trace(err) } func rollingbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { diff --git a/executor/insert_common.go b/executor/insert_common.go index 49efe4095984b..f222e21427102 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/executor/internal/exec" "github.com/pingcap/tidb/expression" - "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/ast" @@ -690,28 +689,15 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue } } } - tbl := e.Table.Meta() + // Handle exchange partition - if tbl.ExchangePartitionInfo != nil { - is := e.Ctx().GetDomainInfoSchema().(infoschema.InfoSchema) - pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID) - if !tableFound { - return nil, errors.Errorf("exchange partition process table by id failed") - } - p, ok := pt.(table.PartitionedTable) - if !ok { - return nil, errors.Errorf("exchange partition process assert table partition failed") - } - err := p.CheckForExchangePartition( - e.Ctx(), - pt.Meta().Partition, - row, - tbl.ExchangePartitionInfo.ExchangePartitionDefID, - ) - if err != nil { + tbl := e.Table.Meta() + if tbl.ExchangePartitionInfo != nil && tbl.GetPartitionInfo() == nil { + if err := checkRowForExchangePartition(e.Ctx(), row, tbl); err != nil { return nil, err } } + sc := e.Ctx().GetSessionVars().StmtCtx warnCnt := int(sc.WarningCount()) for i, gCol := range gCols { diff --git a/executor/write.go b/executor/write.go index 4b789162ac264..87ae63a0b230a 100644 --- a/executor/write.go +++ b/executor/write.go @@ -26,9 +26,11 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -79,23 +81,8 @@ func updateRecord( // Handle exchange partition tbl := t.Meta() - if tbl.ExchangePartitionInfo != nil { - is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) - pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID) - if !tableFound { - return false, errors.Errorf("exchange partition process table by id failed") - } - p, ok := pt.(table.PartitionedTable) - if !ok { - return false, errors.Errorf("exchange partition process assert table partition failed") - } - err := p.CheckForExchangePartition( - sctx, - pt.Meta().Partition, - newData, - tbl.ExchangePartitionInfo.ExchangePartitionDefID, - ) - if err != nil { + if tbl.ExchangePartitionInfo != nil && tbl.GetPartitionInfo() == nil { + if err := checkRowForExchangePartition(sctx, newData, tbl); err != nil { return false, err } } @@ -326,3 +313,42 @@ func resetErrDataTooLong(colName string, rowIdx int, _ error) error { newErr := types.ErrDataTooLong.GenWithStack("Data too long for column '%v' at row %v", colName, rowIdx) return newErr } + +// checkRowForExchangePartition is only used for ExchangePartition by non-partitionTable during write only state. +// It check if rowData inserted or updated violate partition definition or checkConstraints of partitionTable. +func checkRowForExchangePartition(sctx sessionctx.Context, row []types.Datum, tbl *model.TableInfo) error { + is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) + pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionTableID) + if !tableFound { + return errors.Errorf("exchange partition process table by id failed") + } + p, ok := pt.(table.PartitionedTable) + if !ok { + return errors.Errorf("exchange partition process assert table partition failed") + } + err := p.CheckForExchangePartition( + sctx, + pt.Meta().Partition, + row, + tbl.ExchangePartitionInfo.ExchangePartitionDefID, + tbl.ID, + ) + if err != nil { + return err + } + if variable.EnableCheckConstraint.Load() { + type CheckConstraintTable interface { + CheckRowConstraint(sctx sessionctx.Context, rowToCheck []types.Datum) error + } + cc, ok := pt.(CheckConstraintTable) + if !ok { + return errors.Errorf("exchange partition process assert check constraint failed") + } + err := cc.CheckRowConstraint(sctx, row) + if err != nil { + // TODO: make error include ExchangePartition info. + return err + } + } + return nil +} diff --git a/infoschema/builder.go b/infoschema/builder.go index ee04d84686cae..9d9289b4f1a6e 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -315,9 +315,31 @@ func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff) } func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { - // The partitioned table is not affected until the last stage + // It is not in StatePublic. if diff.OldTableID == diff.TableID && diff.OldSchemaID == diff.SchemaID { - return b.applyTableUpdate(m, diff) + ntIDs, err := b.applyTableUpdate(m, diff) + if err != nil { + return nil, errors.Trace(err) + } + if diff.AffectedOpts == nil || diff.AffectedOpts[0].OldSchemaID == 0 { + return ntIDs, err + } + // Reload parition tabe. + ptSchemaID := diff.AffectedOpts[0].OldSchemaID + ptID := diff.AffectedOpts[0].TableID + ptDiff := &model.SchemaDiff{ + Type: diff.Type, + Version: diff.Version, + TableID: ptID, + SchemaID: ptSchemaID, + OldTableID: ptID, + OldSchemaID: ptSchemaID, + } + ptIDs, err := b.applyTableUpdate(m, ptDiff) + if err != nil { + return nil, errors.Trace(err) + } + return append(ptIDs, ntIDs...), nil } ntSchemaID := diff.OldSchemaID ntID := diff.OldTableID diff --git a/parser/model/model.go b/parser/model/model.go index 20749ea4d7be1..cf25198162a6d 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -1168,8 +1168,9 @@ func (p PartitionType) String() string { // ExchangePartitionInfo provides exchange partition info. type ExchangePartitionInfo struct { - ExchangePartitionID int64 `json:"exchange_partition_id"` - ExchangePartitionDefID int64 `json:"exchange_partition_def_id"` + // It is nt tableID when table which has the info is a partition table, else pt tableID. + ExchangePartitionTableID int64 `json:"exchange_partition_id"` + ExchangePartitionDefID int64 `json:"exchange_partition_def_id"` // Deprecated, not used XXXExchangePartitionFlag bool `json:"exchange_partition_flag"` } diff --git a/table/table.go b/table/table.go index a37fa932f06ff..5f34351c2a6a3 100644 --- a/table/table.go +++ b/table/table.go @@ -253,7 +253,7 @@ type PartitionedTable interface { GetAllPartitionIDs() []int64 GetPartitionColumnIDs() []int64 GetPartitionColumnNames() []model.CIStr - CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) error + CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, partID, ntID int64) error } // TableFromMeta builds a table.Table from *model.TableInfo. diff --git a/table/tables/partition.go b/table/tables/partition.go index 7f55d61544cd1..b0f1b8ebddca7 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -1268,12 +1269,12 @@ func PartitionRecordKey(pid int64, handle int64) kv.Key { return tablecodec.EncodeRecordKey(recordPrefix, kv.IntHandle(handle)) } -func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) error { +func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, partID, ntID int64) error { defID, err := t.locatePartition(ctx, r) if err != nil { return err } - if defID != pid { + if defID != partID && defID != ntID { return errors.WithStack(table.ErrRowDoesNotMatchGivenPartitionSet) } return nil @@ -1551,6 +1552,39 @@ func (t *partitionTableWithGivenSets) GetPartitionByRow(ctx sessionctx.Context, return t.partitions[pid], nil } +// checkConstraintForExchangePartition is only used for ExchangePartition by partitionTable during write only state. +// It check if rowData inserted or updated violate checkConstraints of non-partitionTable. +func checkConstraintForExchangePartition(sctx sessionctx.Context, row []types.Datum, partID, ntID int64) error { + type InfoSchema interface { + TableByID(id int64) (val table.Table, ok bool) + } + is, ok := sctx.GetDomainInfoSchema().(InfoSchema) + if !ok { + return errors.Errorf("exchange partition process assert inforSchema failed") + } + nt, tableFound := is.TableByID(ntID) + if !tableFound { + // Now partID is nt tableID. + nt, tableFound = is.TableByID(partID) + if !tableFound { + return errors.Errorf("exchange partition process table by id failed") + } + } + type CheckConstraintTable interface { + CheckRowConstraint(sctx sessionctx.Context, rowToCheck []types.Datum) error + } + cc, ok := nt.(CheckConstraintTable) + if !ok { + return errors.Errorf("exchange partition process assert check constraint failed") + } + err := cc.CheckRowConstraint(sctx, row) + if err != nil { + // TODO: make error include ExchangePartition info. + return err + } + return nil +} + // AddRecord implements the AddRecord method for the table.Table interface. func (t *partitionedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { return partitionedTableAddRecord(ctx, t, r, nil, opts) @@ -1570,6 +1604,14 @@ func partitionedTableAddRecord(ctx sessionctx.Context, t *partitionedTable, r [] if t.Meta().Partition.HasTruncatingPartitionID(pid) { return nil, errors.WithStack(dbterror.ErrInvalidDDLState.GenWithStack("the partition is in not in public")) } + exchangePartitionInfo := t.Meta().ExchangePartitionInfo + if exchangePartitionInfo != nil && exchangePartitionInfo.ExchangePartitionDefID == pid && + variable.EnableCheckConstraint.Load() { + err = checkConstraintForExchangePartition(ctx, r, pid, exchangePartitionInfo.ExchangePartitionTableID) + if err != nil { + return nil, errors.WithStack(err) + } + } tbl := t.GetPartition(pid) recordID, err = tbl.AddRecord(ctx, r, opts...) if err != nil { @@ -1695,6 +1737,14 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx sessionctx.Context, if t.Meta().Partition.HasTruncatingPartitionID(to) { return errors.WithStack(dbterror.ErrInvalidDDLState.GenWithStack("the partition is in not in public")) } + exchangePartitionInfo := t.Meta().ExchangePartitionInfo + if exchangePartitionInfo != nil && exchangePartitionInfo.ExchangePartitionDefID == to && + variable.EnableCheckConstraint.Load() { + err = checkConstraintForExchangePartition(ctx, newData, to, exchangePartitionInfo.ExchangePartitionTableID) + if err != nil { + return errors.WithStack(err) + } + } // The old and new data locate in different partitions. // Remove record from old partition and add record to new partition. diff --git a/table/tables/tables.go b/table/tables/tables.go index 91230778b0db5..8ef5ed79c5f34 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -350,6 +350,20 @@ func (t *TableCommon) WritableConstraint() []*table.Constraint { return writeableConstraint } +// CheckRowConstraint verify row check constraints. +func (t *TableCommon) CheckRowConstraint(sctx sessionctx.Context, rowToCheck []types.Datum) error { + for _, constraint := range t.WritableConstraint() { + ok, isNull, err := constraint.ConstraintExpr.EvalInt(sctx, chunk.MutRowFromDatums(rowToCheck).ToRow()) + if err != nil { + return err + } + if ok == 0 && !isNull { + return table.ErrCheckConstraintViolated.FastGenByArgs(constraint.Name.O) + } + } + return nil +} + // FullHiddenColsAndVisibleCols implements table FullHiddenColsAndVisibleCols interface. func (t *TableCommon) FullHiddenColsAndVisibleCols() []*table.Column { if len(t.FullHiddenColsAndVisibleColumns) > 0 { @@ -507,14 +521,9 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, } } // check data constraint - for _, constraint := range t.WritableConstraint() { - ok, isNull, err := constraint.ConstraintExpr.EvalInt(sctx, chunk.MutRowFromDatums(rowToCheck).ToRow()) - if err != nil { - return err - } - if ok == 0 && !isNull { - return table.ErrCheckConstraintViolated.FastGenByArgs(constraint.Name.O) - } + err = t.CheckRowConstraint(sctx, rowToCheck) + if err != nil { + return err } sessVars := sctx.GetSessionVars() // rebuild index @@ -966,17 +975,11 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . row = append(row, value) } } - - for _, constraint := range t.WritableConstraint() { - ok, isNull, err := constraint.ConstraintExpr.EvalInt(sctx, chunk.MutRowFromDatums(r).ToRow()) - if err != nil { - return nil, err - } - if ok == 0 && !isNull { - return nil, table.ErrCheckConstraintViolated.FastGenByArgs(constraint.Name.O) - } + // check data constraint + err = t.CheckRowConstraint(sctx, r) + if err != nil { + return nil, err } - writeBufs := sessVars.GetWriteStmtBufs() adjustRowValuesBuf(writeBufs, len(row)) key := t.RecordKey(recordID) diff --git a/table/tables/test/partition/BUILD.bazel b/table/tables/test/partition/BUILD.bazel index 92f560ede6da6..15c9651f85aa5 100644 --- a/table/tables/test/partition/BUILD.bazel +++ b/table/tables/test/partition/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "partition_test.go", ], flaky = True, - shard_count = 35, + shard_count = 37, deps = [ "//ddl", "//domain", diff --git a/table/tables/test/partition/partition_test.go b/table/tables/test/partition/partition_test.go index e5b395534e582..6cb9005b537e9 100644 --- a/table/tables/test/partition/partition_test.go +++ b/table/tables/test/partition/partition_test.go @@ -818,6 +818,188 @@ func TestExchangePartitionStates(t *testing.T) { tk.MustExec(`insert into tp values (1000012,"1000012")`) } +// Test partition and non-partition both have check constraints. +func TestExchangePartitionCheckConstraintStates(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec(`create database check_constraint`) + tk.MustExec(`set @@global.tidb_enable_check_constraint = 1`) + tk.MustExec(`use check_constraint`) + tk.MustExec(`create table nt (a int check (a > 75) not ENFORCED, b int check (b > 50) ENFORCED)`) + tk.MustExec(`create table pt (a int check (a < 75) ENFORCED, b int check (b < 75) ENFORCED) partition by range (a) (partition p0 values less than (50), partition p1 values less than (100) )`) + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec(`use check_constraint`) + tk3 := testkit.NewTestKit(t, store) + tk3.MustExec(`use check_constraint`) + tk4 := testkit.NewTestKit(t, store) + tk4.MustExec(`use check_constraint`) + // TODO: error message to check. + errMsg := "[table:3819]Check constraint" + + tk2.MustExec("begin") + // Get table mdl. + tk2.MustQuery(`select * from nt`).Check(testkit.Rows()) + tk2.MustQuery(`select * from pt`).Check(testkit.Rows()) + alterChan := make(chan error) + go func() { + err := tk3.ExecToErr(`alter table pt exchange partition p1 with table nt`) + alterChan <- err + }() + waitFor := func(tableName, s string, pos int) { + for { + select { + case alterErr := <-alterChan: + require.Fail(t, "Alter completed unexpectedly", "With error %v", alterErr) + default: + // Alter still running + } + res := tk4.MustQuery(`admin show ddl jobs where db_name = 'check_constraint' and table_name = '` + tableName + `' and job_type = 'exchange partition'`).Rows() + if len(res) == 1 && res[0][pos] == s { + logutil.BgLogger().Info("Got state", zap.String("State", s)) + break + } + gotime.Sleep(50 * gotime.Millisecond) + } + // Sleep 50ms to wait load InforSchema finish. + gotime.Sleep(50 * gotime.Millisecond) + } + waitFor("nt", "write only", 4) + + tk.MustExec(`insert into nt values (60, 60)`) + // violate pt (a < 75) + tk.MustContainErrMsg(`insert into nt values (80, 60)`, errMsg) + // violate pt (b < 75) + tk.MustContainErrMsg(`insert into nt values (60, 80)`, errMsg) + // violate pt (a < 75) + tk.MustContainErrMsg(`update nt set a = 80 where a = 60`, errMsg) + // violate pt (b < 75) + tk.MustContainErrMsg(`update nt set b = 80 where b = 60`, errMsg) + + tk.MustExec(`insert into pt values (60, 60)`) + // violate nt (b > 50) + tk.MustContainErrMsg(`insert into pt values (60, 50)`, errMsg) + // violate nt (b > 50) + tk.MustContainErrMsg(`update pt set b = 50 where b = 60`, errMsg) + // row in partition p0(less than (50)), is ok. + tk.MustExec(`insert into pt values (30, 50)`) + + tk5 := testkit.NewTestKit(t, store) + tk5.MustExec(`use check_constraint`) + tk5.MustExec("begin") + // Let tk5 get mdl of pt with the version of write-only state. + tk5.MustQuery(`select * from pt`) + + tk6 := testkit.NewTestKit(t, store) + tk6.MustExec(`use check_constraint`) + tk6.MustExec("begin") + // Let tk6 get mdl of nt with the version of write-only state. + tk6.MustQuery(`select * from nt`) + + // Release tk2 mdl, wait ddl enter next state. + tk2.MustExec("commit") + waitFor("pt", "none", 4) + + // violate nt (b > 50) + // Now tk5 handle the sql with MDL: pt version state is write-only, nt version state is none. + tk5.MustContainErrMsg(`insert into pt values (60, 50)`, errMsg) + // Verify exists row(60, 60) in pt. + tk5.MustQuery(`select * from pt where a = 60 and b = 60`).Check(testkit.Rows("60 60")) + // Update oldData and newData both in p1, violate nt (b > 50) + tk5.MustContainErrMsg(`update pt set b = 50 where a = 60 and b = 60`, errMsg) + // Verify exists row(30, 50) in pt. + tk5.MustQuery(`select * from pt where a = 30 and b = 50`).Check(testkit.Rows("30 50")) + // update oldData in p0, newData in p1, violate nt (b > 50) + tk5.MustContainErrMsg(`update pt set a = 60 where a = 30 and b = 50`, errMsg) + + // violate pt (a < 75) + tk6.MustContainErrMsg(`insert into nt values (80, 60)`, errMsg) + // violate pt (b < 75) + tk6.MustContainErrMsg(`insert into nt values (60, 80)`, errMsg) + // Verify exists row(60, 60) in nt. + tk6.MustQuery(`select * from pt where a = 60 and b = 60`).Check(testkit.Rows("60 60")) + // violate pt (a < 75) + tk6.MustContainErrMsg(`update nt set a = 80 where a = 60 and b = 60`, errMsg) + + // Let tk5, tk6 release mdl. + tk5.MustExec("commit") + tk6.MustExec("commit") + + // Wait ddl finish. + <-alterChan +} + +// Test partition table has check constraints while non-partition table do not have. +func TestExchangePartitionCheckConstraintStatesTwo(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec(`create database check_constraint`) + tk.MustExec(`set @@global.tidb_enable_check_constraint = 1`) + tk.MustExec(`use check_constraint`) + tk.MustExec(`create table nt (a int, b int)`) + tk.MustExec(`create table pt (a int check (a < 75) ENFORCED, b int check (b < 75) ENFORCED) partition by range (a) (partition p0 values less than (50), partition p1 values less than (100) )`) + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec(`use check_constraint`) + tk3 := testkit.NewTestKit(t, store) + tk3.MustExec(`use check_constraint`) + tk4 := testkit.NewTestKit(t, store) + tk4.MustExec(`use check_constraint`) + // TODO: error message to check. + errMsg := "[table:3819]Check constraint" + + tk2.MustExec("begin") + // Get table mdl. + tk2.MustQuery(`select * from nt`).Check(testkit.Rows()) + alterChan := make(chan error) + go func() { + err := tk3.ExecToErr(`alter table pt exchange partition p1 with table nt`) + alterChan <- err + }() + waitFor := func(tableName, s string, pos int) { + for { + select { + case alterErr := <-alterChan: + require.Fail(t, "Alter completed unexpectedly", "With error %v", alterErr) + default: + // Alter still running + } + res := tk4.MustQuery(`admin show ddl jobs where db_name = 'check_constraint' and table_name = '` + tableName + `' and job_type = 'exchange partition'`).Rows() + if len(res) == 1 && res[0][pos] == s { + logutil.BgLogger().Info("Got state", zap.String("State", s)) + break + } + gotime.Sleep(50 * gotime.Millisecond) + } + // Sleep 50ms to wait load InforSchema finish. + gotime.Sleep(50 * gotime.Millisecond) + } + waitFor("nt", "write only", 4) + + tk.MustExec(`insert into nt values (60, 60)`) + // violate pt (a < 75) + tk.MustContainErrMsg(`insert into nt values (80, 60)`, errMsg) + // violate pt (b < 75) + tk.MustContainErrMsg(`insert into nt values (60, 80)`, errMsg) + // violate pt (a < 75) + tk.MustContainErrMsg(`update nt set a = 80 where a = 60`, errMsg) + // violate pt (b < 75) + tk.MustContainErrMsg(`update nt set b = 80 where b = 60`, errMsg) + + tk.MustExec(`insert into pt values (60, 60)`) + tk.MustExec(`insert into pt values (60, 50)`) + tk.MustExec(`update pt set b = 50 where b = 60`) + // row in partition p0(less than (50)), is ok. + tk.MustExec(`insert into pt values (30, 50)`) + + // Release tk2 mdl. + tk2.MustExec("commit") + // Wait ddl finish. + <-alterChan +} + func TestAddKeyPartitionStates(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store)