Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

partition: make ExchangePartition follow check constraints during writeOnly state(Part2) #46030

Merged
merged 20 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 6 additions & 21 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/executor/internal/exec"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -690,28 +689,14 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue
}
}
}
tbl := e.Table.Meta()

// Handle exchange partition
if tbl.ExchangePartitionInfo != nil {
is := e.Ctx().GetDomainInfoSchema().(infoschema.InfoSchema)
pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID)
if !tableFound {
return nil, errors.Errorf("exchange partition process table by id failed")
}
p, ok := pt.(table.PartitionedTable)
if !ok {
return nil, errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(
e.Ctx(),
pt.Meta().Partition,
row,
tbl.ExchangePartitionInfo.ExchangePartitionDefID,
)
if err != nil {
return nil, err
}
err := exchangePartitionCheckRow(e.Ctx(), row, e.Table)
if err != nil {
return nil, err
}

tbl := e.Table.Meta()
sc := e.Ctx().GetSessionVars().StmtCtx
warnCnt := int(sc.WarningCount())
for i, gCol := range gCols {
Expand Down
91 changes: 71 additions & 20 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand All @@ -45,6 +46,73 @@ var (
_ exec.Executor = &LoadDataExec{}
)

// exchangePartitionCheckRow is only used for ExchangePartition durating write only state.
// It check if rowData inserted or updated violate partition definition or check constraints.
func exchangePartitionCheckRow(sctx sessionctx.Context, row []types.Datum, t table.Table) error {
tbl := t.Meta()
if tbl.ExchangePartitionInfo != nil {
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID)
if !tableFound {
return errors.Errorf("exchange partition process table by id failed")
}
p, ok := pt.(table.PartitionedTable)
if !ok {
return errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(
sctx,
pt.Meta().Partition,
row,
tbl.ExchangePartitionInfo.ExchangePartitionDefID,
)
if err != nil {
return err
}
if variable.EnableCheckConstraint.Load() {
cc, ok := pt.(table.CheckConstraintTable)
if !ok {
return errors.Errorf("exchange partition process assert check constraint failed")
}
err := cc.CheckRowConstraint(sctx, row)
if err != nil {
// TODO: make error include ExchangePartition info.
return err
}
}
} else if len(tbl.ExchangePartitionPartIDs) > 0 {
if variable.EnableCheckConstraint.Load() {
p, ok := t.(table.PartitionedTable)
if !ok {
return errors.Errorf("exchange partition process assert table partition failed")
}
physicalTable, err := p.GetPartitionByRow(sctx, row)
jiyfhust marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
partId := physicalTable.GetPhysicalID()
ntId, ok := tbl.ExchangePartitionPartIDs[partId]
if ok {
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
nt, tableFound := is.TableByID(ntId)
if !tableFound {
return errors.Errorf("exchange partition process table by id failed")
}
cc, ok := nt.(table.CheckConstraintTable)
if !ok {
return errors.Errorf("exchange partition process assert check constraint failed")
}
err = cc.CheckRowConstraint(sctx, row)
if err != nil {
// TODO: make error include ExchangePartition info.
return err
}
}
}
}
return nil
}

// updateRecord updates the row specified by the handle `h`, from `oldData` to `newData`.
// `modified` means which columns are really modified. It's used for secondary indices.
// Length of `oldData` and `newData` equals to length of `t.WritableCols()`.
Expand Down Expand Up @@ -78,26 +146,9 @@ func updateRecord(
}

// Handle exchange partition
tbl := t.Meta()
if tbl.ExchangePartitionInfo != nil {
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID)
if !tableFound {
return false, errors.Errorf("exchange partition process table by id failed")
}
p, ok := pt.(table.PartitionedTable)
if !ok {
return false, errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(
sctx,
pt.Meta().Partition,
newData,
tbl.ExchangePartitionInfo.ExchangePartitionDefID,
)
if err != nil {
return false, err
}
err := exchangePartitionCheckRow(sctx, newData, t)
if err != nil {
return false, err
}

// Compare datum, then handle some flags.
Expand Down
4 changes: 4 additions & 0 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,11 @@ type TableInfo struct {
// StatsOptions is used when do analyze/auto-analyze for each table
StatsOptions *StatsOptions `json:"stats_options"`

// Current table is non-partition table.
ExchangePartitionInfo *ExchangePartitionInfo `json:"exchange_partition_info"`
// Current table is partition table.
// Key: partID, value: non-partition tableID.
ExchangePartitionPartIDs map[int64]int64 `json:"exchange_partition_partids"`
mjonss marked this conversation as resolved.
Show resolved Hide resolved

TTLInfo *TTLInfo `json:"ttl_info"`
}
Expand Down
7 changes: 7 additions & 0 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,13 @@ type PartitionedTable interface {
CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) error
}

// CheckConstraintTable is only used for ExchangePartition during write only state.
// Function CheckRowConstraint will return error is row not satisfy check constraints.
type CheckConstraintTable interface {
Table
CheckRowConstraint(sctx sessionctx.Context, rowToCheck []types.Datum) error
}

// TableFromMeta builds a table.Table from *model.TableInfo.
// Currently, it is assigned to tables.TableFromMeta in tidb package's init function.
var TableFromMeta func(allocators autoid.Allocators, tblInfo *model.TableInfo) (Table, error)
Expand Down
81 changes: 81 additions & 0 deletions table/tables/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,87 @@ func TestExchangePartitionStates(t *testing.T) {
tk.MustExec(`insert into tp values (1000012,"1000012")`)
}

func TestExchangePartitionCheckConstraintStates(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec(`create database check_constraint`)
tk.MustExec(`set @@global.tidb_enable_check_constraint = 1`)
tk.MustExec(`use check_constraint`)
tk.MustExec(`create table nt (a int check (a > 75) not ENFORCED, b int check (b > 50) ENFORCED)`)
tk.MustExec(`create table pt (a int check (a < 75) ENFORCED, b int check (b < 75) ENFORCED) partition by range (a) (partition p0 values less than (50), partition p1 values less than (100) )`)

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec(`use check_constraint`)
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec(`use check_constraint`)
tk4 := testkit.NewTestKit(t, store)
tk4.MustExec(`use check_constraint`)
// TODO: error message to check.
errMsg := "[table:3819]Check constraint"

tk2.MustExec("begin")
// Get table mdl.
tk2.MustQuery(`select * from nt`).Check(testkit.Rows())
tk2.MustQuery(`select * from pt`).Check(testkit.Rows())
alterChan := make(chan error)
go func() {
err := tk3.ExecToErr(`alter table pt exchange partition p1 with table nt`)
alterChan <- err
}()
waitFor := func(tableName, s string, pos int) {
for {
select {
case alterErr := <-alterChan:
require.Fail(t, "Alter completed unexpectedly", "With error %v", alterErr)
default:
// Alter still running
}
res := tk4.MustQuery(`admin show ddl jobs where db_name = 'check_constraint' and table_name = '` + tableName + `' and job_type = 'exchange partition'`).Rows()
if len(res) == 1 && res[0][pos] == s {
logutil.BgLogger().Info("Got state", zap.String("State", s))
break
}
gotime.Sleep(50 * gotime.Millisecond)
}
}
waitFor("nt", "write only", 4)

tk.MustExec(`insert into nt values (60, 60)`)
// violate pt (a < 75)
tk.MustContainErrMsg(`insert into nt values (80, 60)`, errMsg)
// violate pt (b < 75)
tk.MustContainErrMsg(`insert into nt values (60, 80)`, errMsg)
// violate pt (a < 75)
tk.MustContainErrMsg(`update nt set a = 80 where a = 60`, errMsg)
// violate pt (b < 75)
tk.MustContainErrMsg(`update nt set b = 80 where b = 60`, errMsg)

tk.MustExec(`insert into pt values (60, 60)`)
// violate nt (b > 50)
//tk.MustContainErrMsg(`insert into pt values (60, 50)`, errMsg)
// violate nt (b > 50)
//tk.MustContainErrMsg(`update pt set b = 50 where b = 60`, errMsg)
// row in partition p0(less than (50)), is ok.
tk.MustExec(`insert into pt values (30, 50)`)

tk.MustExec(`set @@global.tidb_enable_check_constraint = 0`)
// The failed sql above, now will be success.
tk.MustExec(`insert into nt values (80, 60)`)
tk.MustExec(`insert into nt values (60, 80)`)
tk.MustExec(`update nt set a = 80 where a = 60`)
tk.MustExec(`insert into pt values (60, 50)`)
tk.MustExec(`update pt set b = 50 where b = 60`)

tk.MustExec(`set @@global.tidb_enable_check_constraint = 1`)

// Release table mdl.
tk2.MustExec("commit")
// wait alter sql finish.
_ = <-alterChan
tk.MustExec(`drop database check_constraint`)
}

func TestAddKeyPartitionStates(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
39 changes: 21 additions & 18 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,20 @@ func (t *TableCommon) WritableConstraint() []*table.Constraint {
return writeableConstraint
}

// CheckRowConstraint verify row check constraints.
func (t *TableCommon) CheckRowConstraint(sctx sessionctx.Context, rowToCheck []types.Datum) error {
for _, constraint := range t.WritableConstraint() {
ok, isNull, err := constraint.ConstraintExpr.EvalInt(sctx, chunk.MutRowFromDatums(rowToCheck).ToRow())
if err != nil {
return err
}
if ok == 0 && !isNull {
return table.ErrCheckConstraintViolated.FastGenByArgs(constraint.Name.O)
}
}
return nil
}

// FullHiddenColsAndVisibleCols implements table FullHiddenColsAndVisibleCols interface.
func (t *TableCommon) FullHiddenColsAndVisibleCols() []*table.Column {
if len(t.FullHiddenColsAndVisibleColumns) > 0 {
Expand Down Expand Up @@ -507,14 +521,9 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context,
}
}
// check data constraint
for _, constraint := range t.WritableConstraint() {
ok, isNull, err := constraint.ConstraintExpr.EvalInt(sctx, chunk.MutRowFromDatums(rowToCheck).ToRow())
if err != nil {
return err
}
if ok == 0 && !isNull {
return table.ErrCheckConstraintViolated.FastGenByArgs(constraint.Name.O)
}
err = t.CheckRowConstraint(sctx, rowToCheck)
if err != nil {
return err
}
sessVars := sctx.GetSessionVars()
// rebuild index
Expand Down Expand Up @@ -966,17 +975,11 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
row = append(row, value)
}
}

for _, constraint := range t.WritableConstraint() {
ok, isNull, err := constraint.ConstraintExpr.EvalInt(sctx, chunk.MutRowFromDatums(r).ToRow())
if err != nil {
return nil, err
}
if ok == 0 && !isNull {
return nil, table.ErrCheckConstraintViolated.FastGenByArgs(constraint.Name.O)
}
// check data constraint
err = t.CheckRowConstraint(sctx, r)
if err != nil {
return nil, err
}

writeBufs := sessVars.GetWriteStmtBufs()
adjustRowValuesBuf(writeBufs, len(row))
key := t.RecordKey(recordID)
Expand Down