Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: use new row-format in tidb #12634

Merged
merged 8 commits into from
Jan 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func encodeNewRow(ctx sessionctx.Context, t table.Table, row []types.Datum) ([]b
skimmedRow = append(skimmedRow, row[col.Offset])
}
}
newRowValue, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, skimmedRow, colIDs, nil, nil)
sctx, rd := ctx.GetSessionVars().StmtCtx, &ctx.GetSessionVars().RowEncoder
newRowValue, err := tablecodec.EncodeRow(sctx, skimmedRow, colIDs, nil, nil, rd)
if err != nil {
return nil, err
}
Expand Down
22 changes: 12 additions & 10 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,23 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/rowcodec"
)

// BatchPointGetExec executes a bunch of point select queries.
type BatchPointGetExec struct {
baseExecutor

tblInfo *model.TableInfo
idxInfo *model.IndexInfo
handles []int64
idxVals [][]types.Datum
startTS uint64
snapshot kv.Snapshot
inited bool
values [][]byte
index int
tblInfo *model.TableInfo
idxInfo *model.IndexInfo
handles []int64
idxVals [][]types.Datum
startTS uint64
snapshot kv.Snapshot
inited bool
values [][]byte
index int
rowDecoder *rowcodec.ChunkDecoder
}

// Open implements the Executor interface.
Expand All @@ -65,7 +67,7 @@ func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
for !req.IsFull() && e.index < len(e.values) {
handle, val := e.handles[e.index], e.values[e.index]
err := decodeRowValToChunk(e.base(), e.tblInfo, handle, val, req)
err := decodeRowValToChunk(e.base(), e.tblInfo, handle, val, req, e.rowDecoder)
if err != nil {
return err
}
Expand Down
36 changes: 36 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
Expand Down Expand Up @@ -2651,6 +2652,39 @@ func (b *executorBuilder) buildSQLBindExec(v *plannercore.SQLBindPlan) Executor
return e
}

func newRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model.TableInfo) *rowcodec.ChunkDecoder {
getColInfoByID := func(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
for _, col := range tbl.Columns {
if col.ID == colID {
return col
}
}
return nil
}
handleColID := int64(-1)
reqCols := make([]rowcodec.ColInfo, len(schema.Columns))
for i := range schema.Columns {
idx, col := i, schema.Columns[i]
isPK := (tbl.PKIsHandle && mysql.HasPriKeyFlag(col.RetType.Flag)) || col.ID == model.ExtraHandleID
if isPK {
handleColID = col.ID
}
reqCols[idx] = rowcodec.ColInfo{
ID: col.ID,
Tp: int32(col.RetType.Tp),
Flag: int32(col.RetType.Flag),
Flen: col.RetType.Flen,
Decimal: col.RetType.Decimal,
Elems: col.RetType.Elems,
}
}
defVal := func(i int) (types.Datum, error) {
ci := getColInfoByID(tbl, reqCols[i].ID)
return table.GetColDefaultValue(ctx, ci)
}
return rowcodec.NewChunkDecoder(reqCols, handleColID, defVal, ctx.GetSessionVars().TimeZone)
}

func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan) Executor {
if b.ctx.GetSessionVars().IsPessimisticReadConsistency() {
if err := b.refreshForUpdateTS(); err != nil {
Expand All @@ -2663,10 +2697,12 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
b.err = err
return nil
}
decoder := newRowDecoder(b.ctx, plan.Schema(), plan.TblInfo)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse the Decoder for the same table?

Copy link
Contributor Author

@lysu lysu Dec 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

our PointGetExecutor is pointget + projection , so plan.Schema() will get different results for select a from t where id = 1 and select a, b from t where id = 1(len1 vs len2), in previous commits, I have try to reuse decoder in cached plan level, but after discuss with others, it introduce some complex code, maybe we can renew at here but do it later?

e := &BatchPointGetExec{
baseExecutor: newBaseExecutor(b.ctx, plan.Schema(), plan.ExplainID()),
tblInfo: plan.TblInfo,
idxInfo: plan.IndexInfo,
rowDecoder: decoder,
startTS: startTS,
}
var capacity int
Expand Down
4 changes: 3 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import (
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
Expand Down Expand Up @@ -3319,7 +3320,8 @@ func setColValue(c *C, txn kv.Transaction, key kv.Key, v types.Datum) {
row := []types.Datum{v, {}}
colIDs := []int64{2, 3}
sc := &stmtctx.StatementContext{TimeZone: time.Local}
value, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil)
rd := rowcodec.Encoder{Enable: true}
value, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil, &rd)
c.Assert(err, IsNil)
err = txn.Set(key, value)
c.Assert(err, IsNil)
Expand Down
52 changes: 45 additions & 7 deletions executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/rowcodec"
)

type memIndexReader struct {
Expand Down Expand Up @@ -139,8 +140,13 @@ type memTableReader struct {
addedRows [][]types.Datum
retFieldTypes []*types.FieldType
colIDs map[int64]int
buffer allocBuf
}

type allocBuf struct {
// cache for decode handle.
handleBytes []byte
rd *rowcodec.BytesDecoder
}

func buildMemTableReader(us *UnionScanExec, tblReader *TableReaderExecutor) *memTableReader {
Expand All @@ -149,6 +155,18 @@ func buildMemTableReader(us *UnionScanExec, tblReader *TableReaderExecutor) *mem
colIDs[col.ID] = i
}

colInfo := make([]rowcodec.ColInfo, 0, len(us.columns))
for i := range us.columns {
col := us.columns[i]
colInfo = append(colInfo, rowcodec.ColInfo{
ID: col.ID,
Tp: int32(col.Tp),
Flag: int32(col.Flag),
IsPKHandle: us.table.Meta().PKIsHandle && mysql.HasPriKeyFlag(col.Flag),
})
}

rd := rowcodec.NewByteDecoder(colInfo, -1, nil, nil)
return &memTableReader{
ctx: us.ctx,
table: us.table.Meta(),
Expand All @@ -159,7 +177,10 @@ func buildMemTableReader(us *UnionScanExec, tblReader *TableReaderExecutor) *mem
addedRows: make([][]types.Datum, 0, len(us.dirty.addedRows)),
retFieldTypes: retTypes(us),
colIDs: colIDs,
handleBytes: make([]byte, 0, 16),
buffer: allocBuf{
handleBytes: make([]byte, 0, 16),
rd: rd,
},
}
}

Expand Down Expand Up @@ -196,12 +217,12 @@ func (m *memTableReader) decodeRecordKeyValue(key, value []byte) ([]types.Datum,
if err != nil {
return nil, errors.Trace(err)
}
return decodeRowData(m.ctx, m.table, m.columns, m.colIDs, handle, m.handleBytes, value)
return decodeRowData(m.ctx, m.table, m.columns, m.colIDs, handle, value, &m.buffer)
}

// decodeRowData uses to decode row data value.
func decodeRowData(ctx sessionctx.Context, tb *model.TableInfo, columns []*model.ColumnInfo, colIDs map[int64]int, handle int64, cacheBytes, value []byte) ([]types.Datum, error) {
values, err := getRowData(ctx.GetSessionVars().StmtCtx, tb, columns, colIDs, handle, cacheBytes, value)
func decodeRowData(ctx sessionctx.Context, tb *model.TableInfo, columns []*model.ColumnInfo, colIDs map[int64]int, handle int64, value []byte, buffer *allocBuf) ([]types.Datum, error) {
values, err := getRowData(ctx.GetSessionVars().StmtCtx, tb, columns, colIDs, handle, value, buffer)
if err != nil {
return nil, err
}
Expand All @@ -218,8 +239,11 @@ func decodeRowData(ctx sessionctx.Context, tb *model.TableInfo, columns []*model
}

// getRowData decodes raw byte slice to row data.
func getRowData(ctx *stmtctx.StatementContext, tb *model.TableInfo, columns []*model.ColumnInfo, colIDs map[int64]int, handle int64, cacheBytes, value []byte) ([][]byte, error) {
func getRowData(ctx *stmtctx.StatementContext, tb *model.TableInfo, columns []*model.ColumnInfo, colIDs map[int64]int, handle int64, value []byte, buffer *allocBuf) ([][]byte, error) {
pkIsHandle := tb.PKIsHandle
if rowcodec.IsNewFormat(value) {
return buffer.rd.DecodeToBytes(colIDs, handle, value, buffer.handleBytes)
}
values, err := tablecodec.CutRowNew(value, colIDs)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -239,7 +263,7 @@ func getRowData(ctx *stmtctx.StatementContext, tb *model.TableInfo, columns []*m
} else {
handleDatum = types.NewIntDatum(handle)
}
handleData, err1 := codec.EncodeValue(ctx, cacheBytes, handleDatum)
handleData, err1 := codec.EncodeValue(ctx, buffer.handleBytes, handleDatum)
if err1 != nil {
return nil, errors.Trace(err1)
}
Expand Down Expand Up @@ -381,6 +405,17 @@ func (m *memIndexLookUpReader) getMemRows() ([][]types.Datum, error) {
colIDs[col.ID] = i
}

colInfos := make([]rowcodec.ColInfo, 0, len(m.columns))
for i := range m.columns {
col := m.columns[i]
colInfos = append(colInfos, rowcodec.ColInfo{
ID: col.ID,
Tp: int32(col.Tp),
Flag: int32(col.Flag),
IsPKHandle: m.table.Meta().PKIsHandle && mysql.HasPriKeyFlag(col.Flag),
})
}
rd := rowcodec.NewByteDecoder(colInfos, -1, nil, nil)
memTblReader := &memTableReader{
ctx: m.ctx,
table: m.table.Meta(),
Expand All @@ -390,7 +425,10 @@ func (m *memIndexLookUpReader) getMemRows() ([][]types.Datum, error) {
addedRows: make([][]types.Datum, 0, len(handles)),
retFieldTypes: m.retFieldTypes,
colIDs: colIDs,
handleBytes: make([]byte, 0, 16),
buffer: allocBuf{
handleBytes: make([]byte, 0, 16),
rd: rd,
},
}

return memTblReader.getMemRows()
Expand Down
15 changes: 13 additions & 2 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/rowcodec"
)

func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
Expand Down Expand Up @@ -64,10 +65,12 @@ type PointGetExecutor struct {
done bool
lock bool
lockWaitTime int64
rowDecoder *rowcodec.ChunkDecoder
}

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, startTs uint64) {
decoder := newRowDecoder(e.ctx, p.Schema(), p.TblInfo)
e.tblInfo = p.TblInfo
e.handle = p.Handle
e.idxInfo = p.IndexInfo
Expand All @@ -76,6 +79,7 @@ func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, startTs uint64) {
e.done = false
e.lock = p.Lock
e.lockWaitTime = p.LockWaitTime
e.rowDecoder = decoder
}

// Open implements the Executor interface.
Expand Down Expand Up @@ -156,7 +160,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
}
return nil
}
return decodeRowValToChunk(e.base(), e.tblInfo, e.handle, val, req)
return decodeRowValToChunk(e.base(), e.tblInfo, e.handle, val, req, e.rowDecoder)
}

func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error {
Expand Down Expand Up @@ -212,7 +216,14 @@ func encodeIndexKey(e *baseExecutor, tblInfo *model.TableInfo, idxInfo *model.In
return tablecodec.EncodeIndexSeekKey(tblInfo.ID, idxInfo.ID, encodedIdxVals), nil
}

func decodeRowValToChunk(e *baseExecutor, tblInfo *model.TableInfo, handle int64, rowVal []byte, chk *chunk.Chunk) error {
func decodeRowValToChunk(e *baseExecutor, tblInfo *model.TableInfo, handle int64, rowVal []byte, chk *chunk.Chunk, rd *rowcodec.ChunkDecoder) error {
if rowcodec.IsNewFormat(rowVal) {
return rd.DecodeToChunk(rowVal, handle, chk)
}
return decodeOldRowValToChunk(e, tblInfo, handle, rowVal, chk)
}

func decodeOldRowValToChunk(e *baseExecutor, tblInfo *model.TableInfo, handle int64, rowVal []byte, chk *chunk.Chunk) error {
colID2CutPos := make(map[int64]int, e.schema.Len())
for _, col := range e.schema.Columns {
if _, ok := colID2CutPos[col.ID]; !ok {
Expand Down
Loading