Skip to content

Commit

Permalink
executor: fix auto-id allocation during statements retry (#20659)
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored Nov 9, 2020
1 parent af8dee1 commit 5222757
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 95 deletions.
5 changes: 0 additions & 5 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1531,11 +1531,6 @@ error = '''
The isolation level '%s' is not supported. Set tidb_skip_isolation_level_check=1 to skip this error
'''

["variable:8054"]
error = '''
cannot set variable to null
'''

["variable:8055"]
error = '''
snapshot is older than GC safe point %s
Expand Down
134 changes: 51 additions & 83 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand Down Expand Up @@ -630,41 +631,20 @@ func (e *InsertValues) isAutoNull(ctx context.Context, d types.Datum, col *table
return false
}

func (e *InsertValues) hasAutoIncrementColumn() (int, bool) {
colIdx := -1
for i, c := range e.Table.Cols() {
func findAutoIncrementColumn(t table.Table) (col *table.Column, offsetInRow int, found bool) {
for i, c := range t.Cols() {
if mysql.HasAutoIncrementFlag(c.Flag) {
colIdx = i
break
return c, i, true
}
}
return colIdx, colIdx != -1
return nil, -1, false
}

func (e *InsertValues) lazyAdjustAutoIncrementDatumInRetry(ctx context.Context, rows [][]types.Datum, colIdx int) ([][]types.Datum, error) {
// Get the autoIncrement column.
col := e.Table.Cols()[colIdx]
// Consider the colIdx of autoIncrement in row are the same.
length := len(rows)
for i := 0; i < length; i++ {
autoDatum := rows[i][colIdx]

// autoID can be found in RetryInfo.
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
id, err := retryInfo.GetCurrAutoIncrementID()
if err != nil {
return nil, err
}
autoDatum.SetAutoID(id, col.Flag)

if err = col.HandleBadNull(&autoDatum, e.ctx.GetSessionVars().StmtCtx); err != nil {
return nil, err
}
rows[i][colIdx] = autoDatum
}
}
return rows, nil
func setDatumAutoIDAndCast(ctx sessionctx.Context, d *types.Datum, id int64, col *table.Column) error {
d.SetAutoID(id, col.Flag)
var err error
*d, err = table.CastValue(ctx, *d, col.ToInfo(), false, false)
return err
}

// lazyAdjustAutoIncrementDatum is quite similar to adjustAutoIncrementDatum
Expand All @@ -674,22 +654,14 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows []
if !e.lazyFillAutoID {
return rows, nil
}
// No autoIncrement column means no need to fill.
colIdx, ok := e.hasAutoIncrementColumn()
if !ok {
col, idx, found := findAutoIncrementColumn(e.Table)
if !found {
return rows, nil
}
// autoID can be found in RetryInfo.
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
return e.lazyAdjustAutoIncrementDatumInRetry(ctx, rows, colIdx)
}
// Get the autoIncrement column.
col := e.Table.Cols()[colIdx]
// Consider the colIdx of autoIncrement in row are the same.
length := len(rows)
for i := 0; i < length; i++ {
autoDatum := rows[i][colIdx]
rowCount := len(rows)
for processedIdx := 0; processedIdx < rowCount; processedIdx++ {
autoDatum := rows[processedIdx][idx]

var err error
var recordID int64
Expand All @@ -707,18 +679,32 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows []
}
e.ctx.GetSessionVars().StmtCtx.InsertID = uint64(recordID)
retryInfo.AddAutoIncrementID(recordID)
rows[i][colIdx] = autoDatum
continue
}

// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if autoDatum.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
// Consume the auto IDs in RetryInfo first.
for retryInfo.Retrying && processedIdx < rowCount {
nextID, ok := retryInfo.GetCurrAutoIncrementID()
if !ok {
break
}
err = setDatumAutoIDAndCast(e.ctx, &rows[processedIdx][idx], nextID, col)
if err != nil {
return nil, err
}
processedIdx++
if processedIdx == rowCount {
return rows, nil
}
}
// Find consecutive num.
start := i
start := processedIdx
cnt := 1
for i+1 < length && e.isAutoNull(ctx, rows[i+1][colIdx], col) {
i++
for processedIdx+1 < rowCount && e.isAutoNull(ctx, rows[processedIdx+1][idx], col) {
processedIdx++
cnt++
}
// AllocBatchAutoIncrementValue allocates batch N consecutive autoIDs.
Expand All @@ -735,44 +721,33 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows []
// Assign autoIDs to rows.
for j := 0; j < cnt; j++ {
offset := j + start
d := rows[offset][colIdx]

id := int64(uint64(min) + uint64(j)*uint64(increment))
d.SetAutoID(id, col.Flag)
retryInfo.AddAutoIncrementID(id)

// The value of d is adjusted by auto ID, so we need to cast it again.
d, err := table.CastValue(e.ctx, d, col.ToInfo(), false, false)
err = setDatumAutoIDAndCast(e.ctx, &rows[offset][idx], id, col)
if err != nil {
return nil, err
}
rows[offset][colIdx] = d
retryInfo.AddAutoIncrementID(id)
}
continue
}

autoDatum.SetAutoID(recordID, col.Flag)
retryInfo.AddAutoIncrementID(recordID)

// the value of d is adjusted by auto ID, so we need to cast it again.
autoDatum, err = table.CastValue(e.ctx, autoDatum, col.ToInfo(), false, false)
err = setDatumAutoIDAndCast(e.ctx, &rows[processedIdx][idx], recordID, col)
if err != nil {
return nil, err
}
rows[i][colIdx] = autoDatum
retryInfo.AddAutoIncrementID(recordID)
}
return rows, nil
}

func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
id, err := retryInfo.GetCurrAutoIncrementID()
if err != nil {
return types.Datum{}, err
id, ok := retryInfo.GetCurrAutoIncrementID()
if ok {
d.SetAutoID(id, c.Flag)
return d, nil
}
d.SetAutoID(id, c.Flag)
return d, nil
}

var err error
Expand Down Expand Up @@ -811,20 +786,16 @@ func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Dat
}
}

d.SetAutoID(recordID, c.Flag)
retryInfo.AddAutoIncrementID(recordID)

// the value of d is adjusted by auto ID, so we need to cast it again.
casted, err := table.CastValue(e.ctx, d, c.ToInfo(), false, false)
err = setDatumAutoIDAndCast(e.ctx, &d, recordID, c)
if err != nil {
return types.Datum{}, err
}
return casted, nil
retryInfo.AddAutoIncrementID(recordID)
return d, nil
}

func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int64, error) {
var recordID int64

switch target.Tp {
case mysql.TypeFloat, mysql.TypeDouble:
f := d.GetFloat64()
Expand All @@ -845,12 +816,11 @@ func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int
func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
autoRandomID, err := retryInfo.GetCurrAutoRandomID()
if err != nil {
return types.Datum{}, err
autoRandomID, ok := retryInfo.GetCurrAutoRandomID()
if ok {
d.SetAutoID(autoRandomID, c.Flag)
return d, nil
}
d.SetAutoID(autoRandomID, c.Flag)
return d, nil
}

var err error
Expand Down Expand Up @@ -897,14 +867,12 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum,
}
}

d.SetAutoID(recordID, c.Flag)
retryInfo.AddAutoRandomID(recordID)

casted, err := table.CastValue(e.ctx, d, c.ToInfo(), false, false)
err = setDatumAutoIDAndCast(e.ctx, &d, recordID, c)
if err != nil {
return types.Datum{}, err
}
return casted, nil
retryInfo.AddAutoRandomID(recordID)
return d, nil
}

// allocAutoRandomID allocates a random id for primary key column. It assumes tableInfo.AutoRandomBits > 0.
Expand Down
46 changes: 46 additions & 0 deletions executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,52 @@ func (s *seqTestSuite) TestPessimisticConflictRetryAutoID(c *C) {
}
}

func (s *seqTestSuite) TestInsertFromSelectConflictRetryAutoID(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (id int not null auto_increment unique key, idx int unique key, c int);")
tk.MustExec("create table src (a int);")
concurrency := 2
var wg sync.WaitGroup
var err []error
wgCount := concurrency + 1
wg.Add(wgCount)
err = make([]error, concurrency)
for i := 0; i < concurrency; i++ {
tk := testkit.NewTestKitWithInit(c, s.store)
go func(idx int) {
for i := 0; i < 10; i++ {
sql := fmt.Sprintf("insert into t(idx, c) select 1 as idx, 1 as c from src on duplicate key update c = %[1]d", i)
_, e := tk.Exec(sql)
if e != nil {
err[idx] = e
wg.Done()
return
}
}
wg.Done()
}(i)
}
var insertErr error
go func() {
tk := testkit.NewTestKitWithInit(c, s.store)
for i := 0; i < 10; i++ {
_, e := tk.Exec("insert into src values (null);")
if e != nil {
insertErr = e
wg.Done()
return
}
}
wg.Done()
}()
wg.Wait()
for _, e := range err {
c.Assert(e, IsNil)
}
c.Assert(insertErr, IsNil)
}

func (s *seqTestSuite) TestAutoRandIDRetry(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)

Expand Down
2 changes: 0 additions & 2 deletions sessionctx/variable/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ import (

// Error instances.
var (
errCantGetValidID = dbterror.ClassVariable.NewStd(mysql.ErrCantGetValidID)
errWarnDeprecatedSyntax = dbterror.ClassVariable.NewStd(mysql.ErrWarnDeprecatedSyntax)
ErrCantSetToNull = dbterror.ClassVariable.NewStd(mysql.ErrCantSetToNull)
ErrSnapshotTooOld = dbterror.ClassVariable.NewStd(mysql.ErrSnapshotTooOld)
ErrUnsupportedValueForVar = dbterror.ClassVariable.NewStd(mysql.ErrUnsupportedValueForVar)
ErrUnknownSystemVar = dbterror.ClassVariable.NewStd(mysql.ErrUnknownSystemVariable)
Expand Down
10 changes: 5 additions & 5 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (r *RetryInfo) AddAutoIncrementID(id int64) {
}

// GetCurrAutoIncrementID gets current autoIncrementID.
func (r *RetryInfo) GetCurrAutoIncrementID() (int64, error) {
func (r *RetryInfo) GetCurrAutoIncrementID() (int64, bool) {
return r.autoIncrementIDs.getCurrent()
}

Expand All @@ -97,7 +97,7 @@ func (r *RetryInfo) AddAutoRandomID(id int64) {
}

// GetCurrAutoRandomID gets current AutoRandomID.
func (r *RetryInfo) GetCurrAutoRandomID() (int64, error) {
func (r *RetryInfo) GetCurrAutoRandomID() (int64, bool) {
return r.autoRandomIDs.getCurrent()
}

Expand All @@ -117,13 +117,13 @@ func (r *retryInfoAutoIDs) clean() {
}
}

func (r *retryInfoAutoIDs) getCurrent() (int64, error) {
func (r *retryInfoAutoIDs) getCurrent() (int64, bool) {
if r.currentOffset >= len(r.autoIDs) {
return 0, errCantGetValidID
return 0, false
}
id := r.autoIDs[r.currentOffset]
r.currentOffset++
return id, nil
return id, true
}

// stmtFuture is used to async get timestamp for statement.
Expand Down

0 comments on commit 5222757

Please sign in to comment.