Skip to content

Commit

Permalink
*: use new row-format in tidb (#12634)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and sre-bot committed Jan 2, 2020
1 parent 4dfbb14 commit 0bab73a
Show file tree
Hide file tree
Showing 28 changed files with 292 additions and 110 deletions.
3 changes: 2 additions & 1 deletion executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func encodeNewRow(ctx sessionctx.Context, t table.Table, row []types.Datum) ([]b
skimmedRow = append(skimmedRow, row[col.Offset])
}
}
newRowValue, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, skimmedRow, colIDs, nil, nil)
sctx, rd := ctx.GetSessionVars().StmtCtx, &ctx.GetSessionVars().RowEncoder
newRowValue, err := tablecodec.EncodeRow(sctx, skimmedRow, colIDs, nil, nil, rd)
if err != nil {
return nil, err
}
Expand Down
22 changes: 12 additions & 10 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,23 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/rowcodec"
)

// BatchPointGetExec executes a bunch of point select queries.
type BatchPointGetExec struct {
baseExecutor

tblInfo *model.TableInfo
idxInfo *model.IndexInfo
handles []int64
idxVals [][]types.Datum
startTS uint64
snapshot kv.Snapshot
inited bool
values [][]byte
index int
tblInfo *model.TableInfo
idxInfo *model.IndexInfo
handles []int64
idxVals [][]types.Datum
startTS uint64
snapshot kv.Snapshot
inited bool
values [][]byte
index int
rowDecoder *rowcodec.ChunkDecoder
}

// Open implements the Executor interface.
Expand All @@ -65,7 +67,7 @@ func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
for !req.IsFull() && e.index < len(e.values) {
handle, val := e.handles[e.index], e.values[e.index]
err := decodeRowValToChunk(e.base(), e.tblInfo, handle, val, req)
err := decodeRowValToChunk(e.base(), e.tblInfo, handle, val, req, e.rowDecoder)
if err != nil {
return err
}
Expand Down
36 changes: 36 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
Expand Down Expand Up @@ -2651,6 +2652,39 @@ func (b *executorBuilder) buildSQLBindExec(v *plannercore.SQLBindPlan) Executor
return e
}

func newRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model.TableInfo) *rowcodec.ChunkDecoder {
getColInfoByID := func(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
for _, col := range tbl.Columns {
if col.ID == colID {
return col
}
}
return nil
}
handleColID := int64(-1)
reqCols := make([]rowcodec.ColInfo, len(schema.Columns))
for i := range schema.Columns {
idx, col := i, schema.Columns[i]
isPK := (tbl.PKIsHandle && mysql.HasPriKeyFlag(col.RetType.Flag)) || col.ID == model.ExtraHandleID
if isPK {
handleColID = col.ID
}
reqCols[idx] = rowcodec.ColInfo{
ID: col.ID,
Tp: int32(col.RetType.Tp),
Flag: int32(col.RetType.Flag),
Flen: col.RetType.Flen,
Decimal: col.RetType.Decimal,
Elems: col.RetType.Elems,
}
}
defVal := func(i int) (types.Datum, error) {
ci := getColInfoByID(tbl, reqCols[i].ID)
return table.GetColDefaultValue(ctx, ci)
}
return rowcodec.NewChunkDecoder(reqCols, handleColID, defVal, ctx.GetSessionVars().TimeZone)
}

func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan) Executor {
if b.ctx.GetSessionVars().IsPessimisticReadConsistency() {
if err := b.refreshForUpdateTS(); err != nil {
Expand All @@ -2663,10 +2697,12 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
b.err = err
return nil
}
decoder := newRowDecoder(b.ctx, plan.Schema(), plan.TblInfo)
e := &BatchPointGetExec{
baseExecutor: newBaseExecutor(b.ctx, plan.Schema(), plan.ExplainID()),
tblInfo: plan.TblInfo,
idxInfo: plan.IndexInfo,
rowDecoder: decoder,
startTS: startTS,
}
var capacity int
Expand Down
4 changes: 3 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import (
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
Expand Down Expand Up @@ -3319,7 +3320,8 @@ func setColValue(c *C, txn kv.Transaction, key kv.Key, v types.Datum) {
row := []types.Datum{v, {}}
colIDs := []int64{2, 3}
sc := &stmtctx.StatementContext{TimeZone: time.Local}
value, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil)
rd := rowcodec.Encoder{Enable: true}
value, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil, &rd)
c.Assert(err, IsNil)
err = txn.Set(key, value)
c.Assert(err, IsNil)
Expand Down
52 changes: 45 additions & 7 deletions executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/rowcodec"
)

type memIndexReader struct {
Expand Down Expand Up @@ -139,8 +140,13 @@ type memTableReader struct {
addedRows [][]types.Datum
retFieldTypes []*types.FieldType
colIDs map[int64]int
buffer allocBuf
}

type allocBuf struct {
// cache for decode handle.
handleBytes []byte
rd *rowcodec.BytesDecoder
}

func buildMemTableReader(us *UnionScanExec, tblReader *TableReaderExecutor) *memTableReader {
Expand All @@ -149,6 +155,18 @@ func buildMemTableReader(us *UnionScanExec, tblReader *TableReaderExecutor) *mem
colIDs[col.ID] = i
}

colInfo := make([]rowcodec.ColInfo, 0, len(us.columns))
for i := range us.columns {
col := us.columns[i]
colInfo = append(colInfo, rowcodec.ColInfo{
ID: col.ID,
Tp: int32(col.Tp),
Flag: int32(col.Flag),
IsPKHandle: us.table.Meta().PKIsHandle && mysql.HasPriKeyFlag(col.Flag),
})
}

rd := rowcodec.NewByteDecoder(colInfo, -1, nil, nil)
return &memTableReader{
ctx: us.ctx,
table: us.table.Meta(),
Expand All @@ -159,7 +177,10 @@ func buildMemTableReader(us *UnionScanExec, tblReader *TableReaderExecutor) *mem
addedRows: make([][]types.Datum, 0, len(us.dirty.addedRows)),
retFieldTypes: retTypes(us),
colIDs: colIDs,
handleBytes: make([]byte, 0, 16),
buffer: allocBuf{
handleBytes: make([]byte, 0, 16),
rd: rd,
},
}
}

Expand Down Expand Up @@ -196,12 +217,12 @@ func (m *memTableReader) decodeRecordKeyValue(key, value []byte) ([]types.Datum,
if err != nil {
return nil, errors.Trace(err)
}
return decodeRowData(m.ctx, m.table, m.columns, m.colIDs, handle, m.handleBytes, value)
return decodeRowData(m.ctx, m.table, m.columns, m.colIDs, handle, value, &m.buffer)
}

// decodeRowData uses to decode row data value.
func decodeRowData(ctx sessionctx.Context, tb *model.TableInfo, columns []*model.ColumnInfo, colIDs map[int64]int, handle int64, cacheBytes, value []byte) ([]types.Datum, error) {
values, err := getRowData(ctx.GetSessionVars().StmtCtx, tb, columns, colIDs, handle, cacheBytes, value)
func decodeRowData(ctx sessionctx.Context, tb *model.TableInfo, columns []*model.ColumnInfo, colIDs map[int64]int, handle int64, value []byte, buffer *allocBuf) ([]types.Datum, error) {
values, err := getRowData(ctx.GetSessionVars().StmtCtx, tb, columns, colIDs, handle, value, buffer)
if err != nil {
return nil, err
}
Expand All @@ -218,8 +239,11 @@ func decodeRowData(ctx sessionctx.Context, tb *model.TableInfo, columns []*model
}

// getRowData decodes raw byte slice to row data.
func getRowData(ctx *stmtctx.StatementContext, tb *model.TableInfo, columns []*model.ColumnInfo, colIDs map[int64]int, handle int64, cacheBytes, value []byte) ([][]byte, error) {
func getRowData(ctx *stmtctx.StatementContext, tb *model.TableInfo, columns []*model.ColumnInfo, colIDs map[int64]int, handle int64, value []byte, buffer *allocBuf) ([][]byte, error) {
pkIsHandle := tb.PKIsHandle
if rowcodec.IsNewFormat(value) {
return buffer.rd.DecodeToBytes(colIDs, handle, value, buffer.handleBytes)
}
values, err := tablecodec.CutRowNew(value, colIDs)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -239,7 +263,7 @@ func getRowData(ctx *stmtctx.StatementContext, tb *model.TableInfo, columns []*m
} else {
handleDatum = types.NewIntDatum(handle)
}
handleData, err1 := codec.EncodeValue(ctx, cacheBytes, handleDatum)
handleData, err1 := codec.EncodeValue(ctx, buffer.handleBytes, handleDatum)
if err1 != nil {
return nil, errors.Trace(err1)
}
Expand Down Expand Up @@ -381,6 +405,17 @@ func (m *memIndexLookUpReader) getMemRows() ([][]types.Datum, error) {
colIDs[col.ID] = i
}

colInfos := make([]rowcodec.ColInfo, 0, len(m.columns))
for i := range m.columns {
col := m.columns[i]
colInfos = append(colInfos, rowcodec.ColInfo{
ID: col.ID,
Tp: int32(col.Tp),
Flag: int32(col.Flag),
IsPKHandle: m.table.Meta().PKIsHandle && mysql.HasPriKeyFlag(col.Flag),
})
}
rd := rowcodec.NewByteDecoder(colInfos, -1, nil, nil)
memTblReader := &memTableReader{
ctx: m.ctx,
table: m.table.Meta(),
Expand All @@ -390,7 +425,10 @@ func (m *memIndexLookUpReader) getMemRows() ([][]types.Datum, error) {
addedRows: make([][]types.Datum, 0, len(handles)),
retFieldTypes: m.retFieldTypes,
colIDs: colIDs,
handleBytes: make([]byte, 0, 16),
buffer: allocBuf{
handleBytes: make([]byte, 0, 16),
rd: rd,
},
}

return memTblReader.getMemRows()
Expand Down
15 changes: 13 additions & 2 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/rowcodec"
)

func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
Expand Down Expand Up @@ -64,10 +65,12 @@ type PointGetExecutor struct {
done bool
lock bool
lockWaitTime int64
rowDecoder *rowcodec.ChunkDecoder
}

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, startTs uint64) {
decoder := newRowDecoder(e.ctx, p.Schema(), p.TblInfo)
e.tblInfo = p.TblInfo
e.handle = p.Handle
e.idxInfo = p.IndexInfo
Expand All @@ -76,6 +79,7 @@ func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, startTs uint64) {
e.done = false
e.lock = p.Lock
e.lockWaitTime = p.LockWaitTime
e.rowDecoder = decoder
}

// Open implements the Executor interface.
Expand Down Expand Up @@ -156,7 +160,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
}
return nil
}
return decodeRowValToChunk(e.base(), e.tblInfo, e.handle, val, req)
return decodeRowValToChunk(e.base(), e.tblInfo, e.handle, val, req, e.rowDecoder)
}

func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error {
Expand Down Expand Up @@ -212,7 +216,14 @@ func encodeIndexKey(e *baseExecutor, tblInfo *model.TableInfo, idxInfo *model.In
return tablecodec.EncodeIndexSeekKey(tblInfo.ID, idxInfo.ID, encodedIdxVals), nil
}

func decodeRowValToChunk(e *baseExecutor, tblInfo *model.TableInfo, handle int64, rowVal []byte, chk *chunk.Chunk) error {
func decodeRowValToChunk(e *baseExecutor, tblInfo *model.TableInfo, handle int64, rowVal []byte, chk *chunk.Chunk, rd *rowcodec.ChunkDecoder) error {
if rowcodec.IsNewFormat(rowVal) {
return rd.DecodeToChunk(rowVal, handle, chk)
}
return decodeOldRowValToChunk(e, tblInfo, handle, rowVal, chk)
}

func decodeOldRowValToChunk(e *baseExecutor, tblInfo *model.TableInfo, handle int64, rowVal []byte, chk *chunk.Chunk) error {
colID2CutPos := make(map[int64]int, e.schema.Len())
for _, col := range e.schema.Columns {
if _, ok := colID2CutPos[col.ID]; !ok {
Expand Down
Loading

0 comments on commit 0bab73a

Please sign in to comment.