diff --git a/executor/executor_test.go b/executor/executor_test.go index cc3f17253a4e9..50cf1d048494f 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -352,7 +352,7 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo, ctx.GetSessionVars().StmtCtx.DupKeyAsWarning = true ctx.GetSessionVars().StmtCtx.BadNullAsWarning = true ctx.GetSessionVars().StmtCtx.InLoadDataStmt = true - data, reachLimit, err1 := ld.InsertData(tt.data1, tt.data2) + data, reachLimit, err1 := ld.InsertData(context.Background(), tt.data1, tt.data2) c.Assert(err1, IsNil) c.Assert(reachLimit, IsFalse) if tt.restData == nil { diff --git a/executor/insert_common.go b/executor/insert_common.go index 8775256c2ebde..851305da981d9 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -186,7 +186,7 @@ func (e *InsertValues) insertRows(ctx context.Context, exec func(ctx context.Con rows := make([][]types.Datum, 0, len(e.Lists)) for i, list := range e.Lists { e.rowCount++ - row, err := e.evalRow(list, i) + row, err := e.evalRow(ctx, list, i) if err != nil { return err } @@ -228,7 +228,7 @@ func (e *InsertValues) handleErr(col *table.Column, val *types.Datum, rowIdx int // evalRow evaluates a to-be-inserted row. The value of the column may base on another column, // so we use setValueForRefColumn to fill the empty row some default values when needFillDefaultValues is true. -func (e *InsertValues) evalRow(list []expression.Expression, rowIdx int) ([]types.Datum, error) { +func (e *InsertValues) evalRow(ctx context.Context, list []expression.Expression, rowIdx int) ([]types.Datum, error) { rowLen := len(e.Table.Cols()) if e.hasExtraHandle { rowLen++ @@ -259,7 +259,7 @@ func (e *InsertValues) evalRow(list []expression.Expression, rowIdx int) ([]type e.evalBuffer.SetDatum(offset, val1) } - return e.fillRow(row, hasValue) + return e.fillRow(ctx, row, hasValue) } // setValueForRefColumn set some default values for the row to eval the row value with other columns, @@ -320,7 +320,7 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, exec func(ctx c for innerChunkRow := iter.Begin(); innerChunkRow != iter.End(); innerChunkRow = iter.Next() { innerRow := types.CloneRow(innerChunkRow.GetDatumRow(fields)) e.rowCount++ - row, err := e.getRow(innerRow) + row, err := e.getRow(ctx, innerRow) if err != nil { return err } @@ -361,7 +361,7 @@ func (e *InsertValues) doBatchInsert(ctx context.Context) error { // getRow gets the row which from `insert into select from` or `load data`. // The input values from these two statements are datums instead of // expressions which are used in `insert into set x=y`. -func (e *InsertValues) getRow(vals []types.Datum) ([]types.Datum, error) { +func (e *InsertValues) getRow(ctx context.Context, vals []types.Datum) ([]types.Datum, error) { row := make([]types.Datum, len(e.Table.Cols())) hasValue := make([]bool, len(e.Table.Cols())) for i, v := range vals { @@ -375,7 +375,7 @@ func (e *InsertValues) getRow(vals []types.Datum) ([]types.Datum, error) { hasValue[offset] = true } - return e.fillRow(row, hasValue) + return e.fillRow(ctx, row, hasValue) } func (e *InsertValues) filterErr(err error) error { @@ -409,10 +409,10 @@ func (e *InsertValues) getColDefaultValue(idx int, col *table.Column) (d types.D } // fillColValue fills the column value if it is not set in the insert statement. -func (e *InsertValues) fillColValue(datum types.Datum, idx int, column *table.Column, hasValue bool) (types.Datum, +func (e *InsertValues) fillColValue(ctx context.Context, datum types.Datum, idx int, column *table.Column, hasValue bool) (types.Datum, error) { if mysql.HasAutoIncrementFlag(column.Flag) { - d, err := e.adjustAutoIncrementDatum(datum, hasValue, column) + d, err := e.adjustAutoIncrementDatum(ctx, datum, hasValue, column) if err != nil { return types.Datum{}, err } @@ -430,12 +430,12 @@ func (e *InsertValues) fillColValue(datum types.Datum, idx int, column *table.Co // fillRow fills generated columns, auto_increment column and empty column. // For NOT NULL column, it will return error or use zero value based on sql_mode. -func (e *InsertValues) fillRow(row []types.Datum, hasValue []bool) ([]types.Datum, error) { +func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue []bool) ([]types.Datum, error) { gIdx := 0 for i, c := range e.Table.Cols() { var err error // Get the default value for all no value columns, the auto increment column is different from the others. - row[i], err = e.fillColValue(row[i], i, c, hasValue[i]) + row[i], err = e.fillColValue(ctx, row[i], i, c, hasValue[i]) if err != nil { return nil, err } @@ -462,7 +462,7 @@ func (e *InsertValues) fillRow(row []types.Datum, hasValue []bool) ([]types.Datu return row, nil } -func (e *InsertValues) adjustAutoIncrementDatum(d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) { +func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) { retryInfo := e.ctx.GetSessionVars().RetryInfo if retryInfo.Retrying { id, err := retryInfo.GetCurrAutoIncrementID() @@ -501,7 +501,7 @@ func (e *InsertValues) adjustAutoIncrementDatum(d types.Datum, hasValue bool, c // Change NULL to auto id. // Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set. if d.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 { - recordID, err = e.Table.AllocAutoIncrementValue(e.ctx) + recordID, err = table.AllocAutoIncrementValue(ctx, e.Table, e.ctx) if e.filterErr(err) != nil { return types.Datum{}, err } diff --git a/executor/load_data.go b/executor/load_data.go index c42b770fc867d..f9b602df887a9 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -213,7 +213,7 @@ func (e *LoadDataInfo) getLine(prevData, curData []byte) ([]byte, []byte, bool) // If it has the rest of data isn't completed the processing, then it returns without completed data. // If the number of inserted rows reaches the batchRows, then the second return value is true. // If prevData isn't nil and curData is nil, there are no other data to deal with and the isEOF is true. -func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error) { +func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte) ([]byte, bool, error) { if len(prevData) == 0 && len(curData) == 0 { return nil, false, nil } @@ -252,7 +252,7 @@ func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error if err != nil { return nil, false, err } - rows = append(rows, e.colsToRow(cols)) + rows = append(rows, e.colsToRow(ctx, cols)) e.rowCount++ if e.maxRowsInBatch != 0 && e.rowCount%e.maxRowsInBatch == 0 { reachLimit = true @@ -281,7 +281,7 @@ func (e *LoadDataInfo) SetMessage() { e.ctx.GetSessionVars().StmtCtx.SetMessage(msg) } -func (e *LoadDataInfo) colsToRow(cols []field) []types.Datum { +func (e *LoadDataInfo) colsToRow(ctx context.Context, cols []field) []types.Datum { for i := 0; i < len(e.row); i++ { if i >= len(cols) { e.row[i].SetNull() @@ -295,7 +295,7 @@ func (e *LoadDataInfo) colsToRow(cols []field) []types.Datum { e.row[i].SetString(string(cols[i].str)) } } - row, err := e.getRow(e.row) + row, err := e.getRow(ctx, e.row) if err != nil { e.handleWarning(err) return nil diff --git a/executor/write_test.go b/executor/write_test.go index 2eb37e6c70151..50e59fd1689cd 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -1800,7 +1800,7 @@ func (s *testSuite4) TestLoadData(c *C) { // data1 = nil, data2 = nil, fields and lines is default ctx.GetSessionVars().StmtCtx.DupKeyAsWarning = true ctx.GetSessionVars().StmtCtx.BadNullAsWarning = true - _, reachLimit, err := ld.InsertData(nil, nil) + _, reachLimit, err := ld.InsertData(context.Background(), nil, nil) c.Assert(err, IsNil) c.Assert(reachLimit, IsFalse) r := tk.MustQuery(selectSQL) diff --git a/infoschema/tables.go b/infoschema/tables.go index 2ec17dc7f7b75..a2ac042066c63 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -1962,11 +1962,6 @@ func (it *infoschemaTable) UpdateRecord(ctx sessionctx.Context, h int64, oldData return table.ErrUnsupportedOp } -// AllocAutoIncrementValue implements table.Table AllocAutoIncrementValue interface. -func (it *infoschemaTable) AllocAutoIncrementValue(ctx sessionctx.Context) (int64, error) { - return 0, table.ErrUnsupportedOp -} - // AllocHandle implements table.Table AllocHandle interface. func (it *infoschemaTable) AllocHandle(ctx sessionctx.Context) (int64, error) { return 0, table.ErrUnsupportedOp @@ -2084,11 +2079,6 @@ func (vt *VirtualTable) UpdateRecord(ctx sessionctx.Context, h int64, oldData, n return table.ErrUnsupportedOp } -// AllocAutoIncrementValue implements table.Table AllocAutoIncrementValue interface. -func (vt *VirtualTable) AllocAutoIncrementValue(ctx sessionctx.Context) (int64, error) { - return 0, table.ErrUnsupportedOp -} - // AllocHandle implements table.Table AllocHandle interface. func (vt *VirtualTable) AllocHandle(ctx sessionctx.Context) (int64, error) { return 0, table.ErrUnsupportedOp diff --git a/server/conn.go b/server/conn.go index eb23fb118c8c3..728e43b495bfb 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1027,7 +1027,7 @@ func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDat var err error var reachLimit bool for { - prevData, reachLimit, err = loadDataInfo.InsertData(prevData, curData) + prevData, reachLimit, err = loadDataInfo.InsertData(ctx, prevData, curData) if err != nil { return nil, err } diff --git a/table/table.go b/table/table.go index 2fba027e6df1c..1a3aad7a81d7e 100644 --- a/table/table.go +++ b/table/table.go @@ -18,6 +18,9 @@ package table import ( + "context" + + "github.com/opentracing/opentracing-go" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" @@ -143,9 +146,6 @@ type Table interface { // RemoveRecord removes a row in the table. RemoveRecord(ctx sessionctx.Context, h int64, r []types.Datum) error - // AllocAutoIncrementValue allocates an auto_increment value for a new row. - AllocAutoIncrementValue(ctx sessionctx.Context) (int64, error) - // AllocHandle allocates a handle for a new row. AllocHandle(ctx sessionctx.Context) (int64, error) @@ -167,6 +167,15 @@ type Table interface { Type() Type } +// AllocAutoIncrementValue allocates an auto_increment value for a new row. +func AllocAutoIncrementValue(ctx context.Context, t Table, sctx sessionctx.Context) (int64, error) { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("table.AllocAutoIncrementValue", opentracing.ChildOf(span.Context())) + defer span1.Finish() + } + return t.Allocator(sctx).Alloc(t.Meta().ID) +} + // PhysicalTable is an abstraction for two kinds of table representation: partition or non-partitioned table. // PhysicalID is a ID that can be used to construct a key ranges, all the data in the key range belongs to the corresponding PhysicalTable. // For a non-partitioned table, its PhysicalID equals to its TableID; For a partition of a partitioned table, its PhysicalID is the partition's ID. diff --git a/table/tables/tables.go b/table/tables/tables.go index b8eb936aefe1e..e56d77677df71 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -914,11 +914,6 @@ func GetColDefaultValue(ctx sessionctx.Context, col *table.Column, defaultVals [ return colVal, nil } -// AllocAutoIncrementValue implements table.Table AllocAutoIncrementValue interface. -func (t *tableCommon) AllocAutoIncrementValue(ctx sessionctx.Context) (int64, error) { - return t.Allocator(ctx).Alloc(t.tableID) -} - // AllocHandle implements table.Table AllocHandle interface. func (t *tableCommon) AllocHandle(ctx sessionctx.Context) (int64, error) { rowID, err := t.Allocator(ctx).Alloc(t.tableID) diff --git a/table/tables/tables_test.go b/table/tables/tables_test.go index 43bd8e6c2ac0b..2afca28c3607c 100644 --- a/table/tables/tables_test.go +++ b/table/tables/tables_test.go @@ -95,7 +95,7 @@ func (ts *testSuite) TestBasic(c *C) { c.Assert(string(tb.RecordPrefix()), Not(Equals), "") c.Assert(tables.FindIndexByColName(tb, "b"), NotNil) - autoid, err := tb.AllocAutoIncrementValue(nil) + autoid, err := table.AllocAutoIncrementValue(context.Background(), tb, nil) c.Assert(err, IsNil) c.Assert(autoid, Greater, int64(0)) @@ -247,7 +247,7 @@ func (ts *testSuite) TestUniqueIndexMultipleNullEntries(c *C) { c.Assert(err, IsNil) c.Assert(handle, Greater, int64(0)) - autoid, err := tb.AllocAutoIncrementValue(nil) + autoid, err := table.AllocAutoIncrementValue(context.Background(), tb, nil) c.Assert(err, IsNil) c.Assert(autoid, Greater, int64(0))