diff --git a/errors.toml b/errors.toml index ae7aec9aa9cd5..d117ebbd05810 100644 --- a/errors.toml +++ b/errors.toml @@ -1386,11 +1386,6 @@ error = ''' The isolation level '%s' is not supported. Set tidb_skip_isolation_level_check=1 to skip this error ''' -["variable:8054"] -error = ''' -cannot set variable to null -''' - ["variable:8055"] error = ''' snapshot is older than GC safe point %s diff --git a/executor/insert_common.go b/executor/insert_common.go index a10c461b75c72..87fd23b2c0997 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" @@ -622,15 +623,13 @@ func (e *InsertValues) isAutoNull(ctx context.Context, d types.Datum, col *table return false } -func (e *InsertValues) hasAutoIncrementColumn() (int, bool) { - colIdx := -1 - for i, c := range e.Table.Cols() { +func findAutoIncrementColumn(t table.Table) (col *table.Column, offsetInRow int, found bool) { + for i, c := range t.Cols() { if mysql.HasAutoIncrementFlag(c.Flag) { - colIdx = i - break + return c, i, true } } - return colIdx, colIdx != -1 + return nil, -1, false } func (e *InsertValues) lazyAdjustAutoIncrementDatumInRetry(ctx context.Context, rows [][]types.Datum, colIdx int) ([][]types.Datum, error) { @@ -659,6 +658,13 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatumInRetry(ctx context.Context, return rows, nil } +func setDatumAutoIDAndCast(ctx sessionctx.Context, d *types.Datum, id int64, col *table.Column) error { + d.SetAutoID(id, col.Flag) + var err error + *d, err = table.CastValue(ctx, *d, col.ToInfo(), false, false) + return err +} + // lazyAdjustAutoIncrementDatum is quite similar to adjustAutoIncrementDatum // except it will cache auto increment datum previously for lazy batch allocation of autoID. func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [][]types.Datum) ([][]types.Datum, error) { @@ -666,22 +672,14 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [] if !e.lazyFillAutoID { return rows, nil } - // No autoIncrement column means no need to fill. - colIdx, ok := e.hasAutoIncrementColumn() - if !ok { + col, idx, found := findAutoIncrementColumn(e.Table) + if !found { return rows, nil } - // autoID can be found in RetryInfo. retryInfo := e.ctx.GetSessionVars().RetryInfo - if retryInfo.Retrying { - return e.lazyAdjustAutoIncrementDatumInRetry(ctx, rows, colIdx) - } - // Get the autoIncrement column. - col := e.Table.Cols()[colIdx] - // Consider the colIdx of autoIncrement in row are the same. - length := len(rows) - for i := 0; i < length; i++ { - autoDatum := rows[i][colIdx] + rowCount := len(rows) + for processedIdx := 0; processedIdx < rowCount; processedIdx++ { + autoDatum := rows[processedIdx][idx] var err error var recordID int64 @@ -699,18 +697,32 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [] } e.ctx.GetSessionVars().StmtCtx.InsertID = uint64(recordID) retryInfo.AddAutoIncrementID(recordID) - rows[i][colIdx] = autoDatum continue } // Change NULL to auto id. // Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set. if autoDatum.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 { + // Consume the auto IDs in RetryInfo first. + for retryInfo.Retrying && processedIdx < rowCount { + nextID, ok := retryInfo.GetCurrAutoIncrementID() + if !ok { + break + } + err = setDatumAutoIDAndCast(e.ctx, &rows[processedIdx][idx], nextID, col) + if err != nil { + return nil, err + } + processedIdx++ + if processedIdx == rowCount { + return rows, nil + } + } // Find consecutive num. - start := i + start := processedIdx cnt := 1 - for i+1 < length && e.isAutoNull(ctx, rows[i+1][colIdx], col) { - i++ + for processedIdx+1 < rowCount && e.isAutoNull(ctx, rows[processedIdx+1][idx], col) { + processedIdx++ cnt++ } // AllocBatchAutoIncrementValue allocates batch N consecutive autoIDs. @@ -727,31 +739,21 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [] // Assign autoIDs to rows. for j := 0; j < cnt; j++ { offset := j + start - d := rows[offset][colIdx] - id := int64(uint64(min) + uint64(j)*uint64(increment)) - d.SetAutoID(id, col.Flag) - retryInfo.AddAutoIncrementID(id) - - // The value of d is adjusted by auto ID, so we need to cast it again. - d, err := table.CastValue(e.ctx, d, col.ToInfo(), false, false) + err = setDatumAutoIDAndCast(e.ctx, &rows[offset][idx], id, col) if err != nil { return nil, err } - rows[offset][colIdx] = d + retryInfo.AddAutoIncrementID(id) } continue } - autoDatum.SetAutoID(recordID, col.Flag) - retryInfo.AddAutoIncrementID(recordID) - - // the value of d is adjusted by auto ID, so we need to cast it again. - autoDatum, err = table.CastValue(e.ctx, autoDatum, col.ToInfo(), false, false) + err = setDatumAutoIDAndCast(e.ctx, &rows[processedIdx][idx], recordID, col) if err != nil { return nil, err } - rows[i][colIdx] = autoDatum + retryInfo.AddAutoIncrementID(recordID) } return rows, nil } @@ -759,12 +761,11 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [] func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) { retryInfo := e.ctx.GetSessionVars().RetryInfo if retryInfo.Retrying { - id, err := retryInfo.GetCurrAutoIncrementID() - if err != nil { - return types.Datum{}, err + id, ok := retryInfo.GetCurrAutoIncrementID() + if ok { + d.SetAutoID(id, c.Flag) + return d, nil } - d.SetAutoID(id, c.Flag) - return d, nil } var err error @@ -803,20 +804,16 @@ func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Dat } } - d.SetAutoID(recordID, c.Flag) - retryInfo.AddAutoIncrementID(recordID) - - // the value of d is adjusted by auto ID, so we need to cast it again. - casted, err := table.CastValue(e.ctx, d, c.ToInfo(), false, false) + err = setDatumAutoIDAndCast(e.ctx, &d, recordID, c) if err != nil { return types.Datum{}, err } - return casted, nil + retryInfo.AddAutoIncrementID(recordID) + return d, nil } func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int64, error) { var recordID int64 - switch target.Tp { case mysql.TypeFloat, mysql.TypeDouble: f := d.GetFloat64() @@ -837,12 +834,11 @@ func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) { retryInfo := e.ctx.GetSessionVars().RetryInfo if retryInfo.Retrying { - autoRandomID, err := retryInfo.GetCurrAutoRandomID() - if err != nil { - return types.Datum{}, err + autoRandomID, ok := retryInfo.GetCurrAutoRandomID() + if ok { + d.SetAutoID(autoRandomID, c.Flag) + return d, nil } - d.SetAutoID(autoRandomID, c.Flag) - return d, nil } var err error @@ -889,14 +885,12 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, } } - d.SetAutoID(recordID, c.Flag) - retryInfo.AddAutoRandomID(recordID) - - casted, err := table.CastValue(e.ctx, d, c.ToInfo(), false, false) + err = setDatumAutoIDAndCast(e.ctx, &d, recordID, c) if err != nil { return types.Datum{}, err } - return casted, nil + retryInfo.AddAutoRandomID(recordID) + return d, nil } // allocAutoRandomID allocates a random id for primary key column. It assumes tableInfo.AutoRandomBits > 0. diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index 32b4798369aa2..46cffc1a895b5 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -1315,6 +1315,52 @@ func (s *seqTestSuite) TestPessimisticConflictRetryAutoID(c *C) { } } +func (s *seqTestSuite) TestInsertFromSelectConflictRetryAutoID(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (id int not null auto_increment unique key, idx int unique key, c int);") + tk.MustExec("create table src (a int);") + concurrency := 2 + var wg sync.WaitGroup + var err []error + wgCount := concurrency + 1 + wg.Add(wgCount) + err = make([]error, concurrency) + for i := 0; i < concurrency; i++ { + tk := testkit.NewTestKitWithInit(c, s.store) + go func(idx int) { + for i := 0; i < 10; i++ { + sql := fmt.Sprintf("insert into t(idx, c) select 1 as idx, 1 as c from src on duplicate key update c = %[1]d", i) + _, e := tk.Exec(sql) + if e != nil { + err[idx] = e + wg.Done() + return + } + } + wg.Done() + }(i) + } + var insertErr error + go func() { + tk := testkit.NewTestKitWithInit(c, s.store) + for i := 0; i < 10; i++ { + _, e := tk.Exec("insert into src values (null);") + if e != nil { + insertErr = e + wg.Done() + return + } + } + wg.Done() + }() + wg.Wait() + for _, e := range err { + c.Assert(e, IsNil) + } + c.Assert(insertErr, IsNil) +} + func (s *seqTestSuite) TestAutoRandIDRetry(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) diff --git a/sessionctx/variable/error.go b/sessionctx/variable/error.go index 016783bdb64a3..71a6530596ee1 100644 --- a/sessionctx/variable/error.go +++ b/sessionctx/variable/error.go @@ -20,9 +20,7 @@ import ( // Error instances. var ( - errCantGetValidID = terror.ClassVariable.New(mysql.ErrCantGetValidID, mysql.MySQLErrName[mysql.ErrCantGetValidID]) errWarnDeprecatedSyntax = terror.ClassVariable.New(mysql.ErrWarnDeprecatedSyntax, mysql.MySQLErrName[mysql.ErrWarnDeprecatedSyntax]) - ErrCantSetToNull = terror.ClassVariable.New(mysql.ErrCantSetToNull, mysql.MySQLErrName[mysql.ErrCantSetToNull]) ErrSnapshotTooOld = terror.ClassVariable.New(mysql.ErrSnapshotTooOld, mysql.MySQLErrName[mysql.ErrSnapshotTooOld]) ErrUnsupportedValueForVar = terror.ClassVariable.New(mysql.ErrUnsupportedValueForVar, mysql.MySQLErrName[mysql.ErrUnsupportedValueForVar]) ErrUnknownSystemVar = terror.ClassVariable.New(mysql.ErrUnknownSystemVariable, mysql.MySQLErrName[mysql.ErrUnknownSystemVariable]) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index a5ff5929acaf6..5215e107403d3 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -82,7 +82,7 @@ func (r *RetryInfo) AddAutoIncrementID(id int64) { } // GetCurrAutoIncrementID gets current autoIncrementID. -func (r *RetryInfo) GetCurrAutoIncrementID() (int64, error) { +func (r *RetryInfo) GetCurrAutoIncrementID() (int64, bool) { return r.autoIncrementIDs.getCurrent() } @@ -92,7 +92,7 @@ func (r *RetryInfo) AddAutoRandomID(id int64) { } // GetCurrAutoRandomID gets current AutoRandomID. -func (r *RetryInfo) GetCurrAutoRandomID() (int64, error) { +func (r *RetryInfo) GetCurrAutoRandomID() (int64, bool) { return r.autoRandomIDs.getCurrent() } @@ -112,13 +112,13 @@ func (r *retryInfoAutoIDs) clean() { } } -func (r *retryInfoAutoIDs) getCurrent() (int64, error) { +func (r *retryInfoAutoIDs) getCurrent() (int64, bool) { if r.currentOffset >= len(r.autoIDs) { - return 0, errCantGetValidID + return 0, false } id := r.autoIDs[r.currentOffset] r.currentOffset++ - return id, nil + return id, true } // stmtFuture is used to async get timestamp for statement.