From 522275783e5f3741ca201c45e51cdab9dc1ca2bd Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 9 Nov 2020 16:37:12 +0800 Subject: [PATCH] executor: fix auto-id allocation during statements retry (#20659) --- errors.toml | 5 - executor/insert_common.go | 134 ++++++++++---------------- executor/seqtest/seq_executor_test.go | 46 +++++++++ sessionctx/variable/error.go | 2 - sessionctx/variable/session.go | 10 +- 5 files changed, 102 insertions(+), 95 deletions(-) diff --git a/errors.toml b/errors.toml index 87ea14ef381d5..4746277ab5442 100644 --- a/errors.toml +++ b/errors.toml @@ -1531,11 +1531,6 @@ error = ''' The isolation level '%s' is not supported. Set tidb_skip_isolation_level_check=1 to skip this error ''' -["variable:8054"] -error = ''' -cannot set variable to null -''' - ["variable:8055"] error = ''' snapshot is older than GC safe point %s diff --git a/executor/insert_common.go b/executor/insert_common.go index d742beefd17c4..d37fa54577843 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" @@ -630,41 +631,20 @@ func (e *InsertValues) isAutoNull(ctx context.Context, d types.Datum, col *table return false } -func (e *InsertValues) hasAutoIncrementColumn() (int, bool) { - colIdx := -1 - for i, c := range e.Table.Cols() { +func findAutoIncrementColumn(t table.Table) (col *table.Column, offsetInRow int, found bool) { + for i, c := range t.Cols() { if mysql.HasAutoIncrementFlag(c.Flag) { - colIdx = i - break + return c, i, true } } - return colIdx, colIdx != -1 + return nil, -1, false } -func (e *InsertValues) lazyAdjustAutoIncrementDatumInRetry(ctx context.Context, rows [][]types.Datum, colIdx int) ([][]types.Datum, error) { - // Get the autoIncrement column. - col := e.Table.Cols()[colIdx] - // Consider the colIdx of autoIncrement in row are the same. - length := len(rows) - for i := 0; i < length; i++ { - autoDatum := rows[i][colIdx] - - // autoID can be found in RetryInfo. - retryInfo := e.ctx.GetSessionVars().RetryInfo - if retryInfo.Retrying { - id, err := retryInfo.GetCurrAutoIncrementID() - if err != nil { - return nil, err - } - autoDatum.SetAutoID(id, col.Flag) - - if err = col.HandleBadNull(&autoDatum, e.ctx.GetSessionVars().StmtCtx); err != nil { - return nil, err - } - rows[i][colIdx] = autoDatum - } - } - return rows, nil +func setDatumAutoIDAndCast(ctx sessionctx.Context, d *types.Datum, id int64, col *table.Column) error { + d.SetAutoID(id, col.Flag) + var err error + *d, err = table.CastValue(ctx, *d, col.ToInfo(), false, false) + return err } // lazyAdjustAutoIncrementDatum is quite similar to adjustAutoIncrementDatum @@ -674,22 +654,14 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [] if !e.lazyFillAutoID { return rows, nil } - // No autoIncrement column means no need to fill. - colIdx, ok := e.hasAutoIncrementColumn() - if !ok { + col, idx, found := findAutoIncrementColumn(e.Table) + if !found { return rows, nil } - // autoID can be found in RetryInfo. retryInfo := e.ctx.GetSessionVars().RetryInfo - if retryInfo.Retrying { - return e.lazyAdjustAutoIncrementDatumInRetry(ctx, rows, colIdx) - } - // Get the autoIncrement column. - col := e.Table.Cols()[colIdx] - // Consider the colIdx of autoIncrement in row are the same. - length := len(rows) - for i := 0; i < length; i++ { - autoDatum := rows[i][colIdx] + rowCount := len(rows) + for processedIdx := 0; processedIdx < rowCount; processedIdx++ { + autoDatum := rows[processedIdx][idx] var err error var recordID int64 @@ -707,18 +679,32 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [] } e.ctx.GetSessionVars().StmtCtx.InsertID = uint64(recordID) retryInfo.AddAutoIncrementID(recordID) - rows[i][colIdx] = autoDatum continue } // Change NULL to auto id. // Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set. if autoDatum.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 { + // Consume the auto IDs in RetryInfo first. + for retryInfo.Retrying && processedIdx < rowCount { + nextID, ok := retryInfo.GetCurrAutoIncrementID() + if !ok { + break + } + err = setDatumAutoIDAndCast(e.ctx, &rows[processedIdx][idx], nextID, col) + if err != nil { + return nil, err + } + processedIdx++ + if processedIdx == rowCount { + return rows, nil + } + } // Find consecutive num. - start := i + start := processedIdx cnt := 1 - for i+1 < length && e.isAutoNull(ctx, rows[i+1][colIdx], col) { - i++ + for processedIdx+1 < rowCount && e.isAutoNull(ctx, rows[processedIdx+1][idx], col) { + processedIdx++ cnt++ } // AllocBatchAutoIncrementValue allocates batch N consecutive autoIDs. @@ -735,31 +721,21 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [] // Assign autoIDs to rows. for j := 0; j < cnt; j++ { offset := j + start - d := rows[offset][colIdx] - id := int64(uint64(min) + uint64(j)*uint64(increment)) - d.SetAutoID(id, col.Flag) - retryInfo.AddAutoIncrementID(id) - - // The value of d is adjusted by auto ID, so we need to cast it again. - d, err := table.CastValue(e.ctx, d, col.ToInfo(), false, false) + err = setDatumAutoIDAndCast(e.ctx, &rows[offset][idx], id, col) if err != nil { return nil, err } - rows[offset][colIdx] = d + retryInfo.AddAutoIncrementID(id) } continue } - autoDatum.SetAutoID(recordID, col.Flag) - retryInfo.AddAutoIncrementID(recordID) - - // the value of d is adjusted by auto ID, so we need to cast it again. - autoDatum, err = table.CastValue(e.ctx, autoDatum, col.ToInfo(), false, false) + err = setDatumAutoIDAndCast(e.ctx, &rows[processedIdx][idx], recordID, col) if err != nil { return nil, err } - rows[i][colIdx] = autoDatum + retryInfo.AddAutoIncrementID(recordID) } return rows, nil } @@ -767,12 +743,11 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [] func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) { retryInfo := e.ctx.GetSessionVars().RetryInfo if retryInfo.Retrying { - id, err := retryInfo.GetCurrAutoIncrementID() - if err != nil { - return types.Datum{}, err + id, ok := retryInfo.GetCurrAutoIncrementID() + if ok { + d.SetAutoID(id, c.Flag) + return d, nil } - d.SetAutoID(id, c.Flag) - return d, nil } var err error @@ -811,20 +786,16 @@ func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Dat } } - d.SetAutoID(recordID, c.Flag) - retryInfo.AddAutoIncrementID(recordID) - - // the value of d is adjusted by auto ID, so we need to cast it again. - casted, err := table.CastValue(e.ctx, d, c.ToInfo(), false, false) + err = setDatumAutoIDAndCast(e.ctx, &d, recordID, c) if err != nil { return types.Datum{}, err } - return casted, nil + retryInfo.AddAutoIncrementID(recordID) + return d, nil } func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int64, error) { var recordID int64 - switch target.Tp { case mysql.TypeFloat, mysql.TypeDouble: f := d.GetFloat64() @@ -845,12 +816,11 @@ func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) { retryInfo := e.ctx.GetSessionVars().RetryInfo if retryInfo.Retrying { - autoRandomID, err := retryInfo.GetCurrAutoRandomID() - if err != nil { - return types.Datum{}, err + autoRandomID, ok := retryInfo.GetCurrAutoRandomID() + if ok { + d.SetAutoID(autoRandomID, c.Flag) + return d, nil } - d.SetAutoID(autoRandomID, c.Flag) - return d, nil } var err error @@ -897,14 +867,12 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, } } - d.SetAutoID(recordID, c.Flag) - retryInfo.AddAutoRandomID(recordID) - - casted, err := table.CastValue(e.ctx, d, c.ToInfo(), false, false) + err = setDatumAutoIDAndCast(e.ctx, &d, recordID, c) if err != nil { return types.Datum{}, err } - return casted, nil + retryInfo.AddAutoRandomID(recordID) + return d, nil } // allocAutoRandomID allocates a random id for primary key column. It assumes tableInfo.AutoRandomBits > 0. diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index e5dee5a93620f..c8dffeb118d1a 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -1317,6 +1317,52 @@ func (s *seqTestSuite) TestPessimisticConflictRetryAutoID(c *C) { } } +func (s *seqTestSuite) TestInsertFromSelectConflictRetryAutoID(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (id int not null auto_increment unique key, idx int unique key, c int);") + tk.MustExec("create table src (a int);") + concurrency := 2 + var wg sync.WaitGroup + var err []error + wgCount := concurrency + 1 + wg.Add(wgCount) + err = make([]error, concurrency) + for i := 0; i < concurrency; i++ { + tk := testkit.NewTestKitWithInit(c, s.store) + go func(idx int) { + for i := 0; i < 10; i++ { + sql := fmt.Sprintf("insert into t(idx, c) select 1 as idx, 1 as c from src on duplicate key update c = %[1]d", i) + _, e := tk.Exec(sql) + if e != nil { + err[idx] = e + wg.Done() + return + } + } + wg.Done() + }(i) + } + var insertErr error + go func() { + tk := testkit.NewTestKitWithInit(c, s.store) + for i := 0; i < 10; i++ { + _, e := tk.Exec("insert into src values (null);") + if e != nil { + insertErr = e + wg.Done() + return + } + } + wg.Done() + }() + wg.Wait() + for _, e := range err { + c.Assert(e, IsNil) + } + c.Assert(insertErr, IsNil) +} + func (s *seqTestSuite) TestAutoRandIDRetry(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) diff --git a/sessionctx/variable/error.go b/sessionctx/variable/error.go index b6a5bf2901def..b9c21782e5be2 100644 --- a/sessionctx/variable/error.go +++ b/sessionctx/variable/error.go @@ -20,9 +20,7 @@ import ( // Error instances. var ( - errCantGetValidID = dbterror.ClassVariable.NewStd(mysql.ErrCantGetValidID) errWarnDeprecatedSyntax = dbterror.ClassVariable.NewStd(mysql.ErrWarnDeprecatedSyntax) - ErrCantSetToNull = dbterror.ClassVariable.NewStd(mysql.ErrCantSetToNull) ErrSnapshotTooOld = dbterror.ClassVariable.NewStd(mysql.ErrSnapshotTooOld) ErrUnsupportedValueForVar = dbterror.ClassVariable.NewStd(mysql.ErrUnsupportedValueForVar) ErrUnknownSystemVar = dbterror.ClassVariable.NewStd(mysql.ErrUnknownSystemVariable) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 60e7185352727..0a78f2c5391e9 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -87,7 +87,7 @@ func (r *RetryInfo) AddAutoIncrementID(id int64) { } // GetCurrAutoIncrementID gets current autoIncrementID. -func (r *RetryInfo) GetCurrAutoIncrementID() (int64, error) { +func (r *RetryInfo) GetCurrAutoIncrementID() (int64, bool) { return r.autoIncrementIDs.getCurrent() } @@ -97,7 +97,7 @@ func (r *RetryInfo) AddAutoRandomID(id int64) { } // GetCurrAutoRandomID gets current AutoRandomID. -func (r *RetryInfo) GetCurrAutoRandomID() (int64, error) { +func (r *RetryInfo) GetCurrAutoRandomID() (int64, bool) { return r.autoRandomIDs.getCurrent() } @@ -117,13 +117,13 @@ func (r *retryInfoAutoIDs) clean() { } } -func (r *retryInfoAutoIDs) getCurrent() (int64, error) { +func (r *retryInfoAutoIDs) getCurrent() (int64, bool) { if r.currentOffset >= len(r.autoIDs) { - return 0, errCantGetValidID + return 0, false } id := r.autoIDs[r.currentOffset] r.currentOffset++ - return id, nil + return id, true } // stmtFuture is used to async get timestamp for statement.