Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

partition: make ExchangePartition follow check constraints during writeOnly state(Part2) #46030

Merged
merged 20 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,8 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...
// Keep this as Schema ID of non-partitioned table
// to avoid trigger early rename in TiFlash
diff.AffectedOpts[0].SchemaID = job.SchemaID
// Mark it is not StatePublic.
diff.AffectedOpts[0].OldSchemaID = ptSchemaID
jiyfhust marked this conversation as resolved.
Show resolved Hide resolved
} else {
// Swap
diff.TableID = ptDefID
Expand Down
19 changes: 17 additions & 2 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2457,9 +2457,19 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
return ver, errors.Trace(err)
}
}
pt.ExchangePartitionInfo = &model.ExchangePartitionInfo{
CurrentIsPartitionTable: true,
ExchangePartitionTableID: nt.ID,
ExchangePartitionPartitionID: defID,
}
err = t.UpdateTable(ptSchemaID, pt)
if err != nil {
return ver, errors.Trace(err)
}
jiyfhust marked this conversation as resolved.
Show resolved Hide resolved
nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{
ExchangePartitionID: ptID,
ExchangePartitionDefID: defID,
CurrentIsPartitionTable: false,
ExchangePartitionTableID: ptID,
ExchangePartitionPartitionID: defID,
jiyfhust marked this conversation as resolved.
Show resolved Hide resolved
}
// We need an interim schema version,
// so there are no non-matching rows inserted
Expand Down Expand Up @@ -2622,6 +2632,11 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
}

job.SchemaState = model.StatePublic
pt.ExchangePartitionInfo = nil
err = t.UpdateTable(ptSchemaID, pt)
if err != nil {
return ver, errors.Trace(err)
}
jiyfhust marked this conversation as resolved.
Show resolved Hide resolved
nt.ExchangePartitionInfo = nil
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, nt, true)
if err != nil {
Expand Down
27 changes: 25 additions & 2 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,34 @@ func needNotifyAndStopReorgWorker(job *model.Job) bool {

// rollbackExchangeTablePartition will clear the non-partitioned
// table's ExchangePartitionInfo state.
func rollbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo) (int64, error) {
func rollbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo) (ver int64, err error) {
var (
// defID only for updateSchemaVersion
defID int64
ptSchemaID int64
ptID int64
partName string
withValidation bool
)
if err = job.DecodeArgs(&defID, &ptSchemaID, &ptID, &partName, &withValidation); err != nil {
return ver, errors.Trace(err)
}
pt, err := getTableInfo(t, ptID, ptSchemaID)
if err != nil {
return ver, errors.Trace(err)
}
pt.ExchangePartitionInfo = nil
err = t.UpdateTable(ptSchemaID, pt)
if err != nil {
return ver, errors.Trace(err)
}

tblInfo.ExchangePartitionInfo = nil
job.State = model.JobStateRollbackDone
job.SchemaState = model.StatePublic
return updateVersionAndTableInfo(d, t, job, tblInfo, true)

ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
return ver, errors.Trace(err)
}

func rollingbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
Expand Down
22 changes: 4 additions & 18 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/executor/internal/exec"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -690,28 +689,15 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue
}
}
}
tbl := e.Table.Meta()

// Handle exchange partition
tbl := e.Table.Meta()
if tbl.ExchangePartitionInfo != nil {
is := e.Ctx().GetDomainInfoSchema().(infoschema.InfoSchema)
pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID)
if !tableFound {
return nil, errors.Errorf("exchange partition process table by id failed")
}
p, ok := pt.(table.PartitionedTable)
if !ok {
return nil, errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(
e.Ctx(),
pt.Meta().Partition,
row,
tbl.ExchangePartitionInfo.ExchangePartitionDefID,
)
if err != nil {
if err := exchangePartitionCheckRow(e.Ctx(), row, e.Table); err != nil {
return nil, err
}
}

sc := e.Ctx().GetSessionVars().StmtCtx
warnCnt := int(sc.WarningCount())
for i, gCol := range gCols {
Expand Down
91 changes: 73 additions & 18 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand All @@ -45,6 +46,76 @@ var (
_ exec.Executor = &LoadDataExec{}
)

// exchangePartitionCheckRow is only used for ExchangePartition durating write only state.
// It check if rowData inserted or updated violate partition definition or check constraints.
func exchangePartitionCheckRow(sctx sessionctx.Context, row []types.Datum, t table.Table) error {
tbl := t.Meta()
if tbl.ExchangePartitionInfo.CurrentIsPartitionTable == false {
jiyfhust marked this conversation as resolved.
Show resolved Hide resolved
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionTableID)
if !tableFound {
return errors.Errorf("exchange partition process table by id failed")
}
p, ok := pt.(table.PartitionedTable)
if !ok {
return errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(
sctx,
pt.Meta().Partition,
row,
tbl.ExchangePartitionInfo.ExchangePartitionPartitionID,
)
if err != nil {
return err
}
if variable.EnableCheckConstraint.Load() {
cc, ok := pt.(table.CheckConstraintTable)
if !ok {
return errors.Errorf("exchange partition process assert check constraint failed")
}
err := cc.CheckRowConstraint(sctx, row)
if err != nil {
// TODO: make error include ExchangePartition info.
return err
}
}
} else {
if variable.EnableCheckConstraint.Load() {
p, ok := t.(table.PartitionedTable)
if !ok {
return errors.Errorf("exchange partition process assert table partition failed")
}
physicalTable, err := p.GetPartitionByRow(sctx, row)
jiyfhust marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
partID := physicalTable.GetPhysicalID()
if partID == tbl.ExchangePartitionInfo.ExchangePartitionPartitionID {
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
nt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionTableID)
if !tableFound {
// Now partID is nt tableID.
nt, tableFound = is.TableByID(partID)
if !tableFound {
return errors.Errorf("exchange partition process table by id failed")
}
}
cc, ok := nt.(table.CheckConstraintTable)
if !ok {
return errors.Errorf("exchange partition process assert check constraint failed")
}
err = cc.CheckRowConstraint(sctx, row)
if err != nil {
// TODO: make error include ExchangePartition info.
return err
}
}
}
}
return nil
}

// updateRecord updates the row specified by the handle `h`, from `oldData` to `newData`.
// `modified` means which columns are really modified. It's used for secondary indices.
// Length of `oldData` and `newData` equals to length of `t.WritableCols()`.
Expand Down Expand Up @@ -78,24 +149,8 @@ func updateRecord(
}

// Handle exchange partition
tbl := t.Meta()
if tbl.ExchangePartitionInfo != nil {
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID)
if !tableFound {
return false, errors.Errorf("exchange partition process table by id failed")
}
p, ok := pt.(table.PartitionedTable)
if !ok {
return false, errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(
sctx,
pt.Meta().Partition,
newData,
tbl.ExchangePartitionInfo.ExchangePartitionDefID,
)
if err != nil {
if t.Meta().ExchangePartitionInfo != nil {
if err := exchangePartitionCheckRow(sctx, newData, t); err != nil {
return false, err
}
}
Expand Down
30 changes: 27 additions & 3 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,33 @@ func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff)
}

func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
// The partitioned table is not affected until the last stage
if diff.OldTableID == diff.TableID && diff.OldSchemaID == diff.SchemaID {
return b.applyTableUpdate(m, diff)
// It is not in StatePublic.
if len(diff.AffectedOpts) > 0 && diff.AffectedOpts[0].OldSchemaID != 0 {
ntSchemaID := diff.SchemaID
ntID := diff.TableID
ptSchemaID := diff.AffectedOpts[0].OldSchemaID
ptID := diff.AffectedOpts[0].TableID
currDiff := &model.SchemaDiff{
Type: diff.Type,
Version: diff.Version,
TableID: ntID,
SchemaID: ntSchemaID,
OldTableID: ntID,
OldSchemaID: ntSchemaID,
}
ntIDs, err := b.applyTableUpdate(m, currDiff)
if err != nil {
return nil, errors.Trace(err)
}
currDiff.TableID = ptID
currDiff.SchemaID = ptSchemaID
currDiff.OldTableID = ptID
currDiff.OldSchemaID = ptSchemaID
ptIDs, err := b.applyTableUpdate(m, currDiff)
if err != nil {
return nil, errors.Trace(err)
}
return append(ptIDs, ntIDs...), nil
}
ntSchemaID := diff.OldSchemaID
ntID := diff.OldTableID
Expand Down
7 changes: 5 additions & 2 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,8 +1164,11 @@ func (p PartitionType) String() string {

// ExchangePartitionInfo provides exchange partition info.
type ExchangePartitionInfo struct {
ExchangePartitionID int64 `json:"exchange_partition_id"`
ExchangePartitionDefID int64 `json:"exchange_partition_def_id"`
// Table which has the info is a partition table
CurrentIsPartitionTable bool
// nt tableID if CurrentIsPartitionTable is true, else pt tableID.
ExchangePartitionTableID int64 `json:"exchange_partition_table_id"`
ExchangePartitionPartitionID int64 `json:"exchange_partition_partition_id"`
// Deprecated, not used
XXXExchangePartitionFlag bool `json:"exchange_partition_flag"`
}
Expand Down
7 changes: 7 additions & 0 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,13 @@ type PartitionedTable interface {
CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) error
}

// CheckConstraintTable is only used for ExchangePartition during write only state.
// Function CheckRowConstraint will return error is row not satisfy check constraints.
type CheckConstraintTable interface {
Table
CheckRowConstraint(sctx sessionctx.Context, rowToCheck []types.Datum) error
}

// TableFromMeta builds a table.Table from *model.TableInfo.
// Currently, it is assigned to tables.TableFromMeta in tidb package's init function.
var TableFromMeta func(allocators autoid.Allocators, tblInfo *model.TableInfo) (Table, error)
Expand Down
Loading
Loading