From a6405cabeaeb64c6d7a20f902021a6f9fc966d89 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 8 Oct 2019 10:38:52 +0800 Subject: [PATCH 01/34] add_encode_type_flag --- distsql/select_result.go | 15 ++++++--------- go.mod | 2 ++ go.sum | 4 ++-- store/mockstore/mocktikv/cop_handler_dag.go | 14 ++++++++++---- 4 files changed, 20 insertions(+), 15 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index 754a55f5a3703..86bfc9f19d070 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -147,18 +147,15 @@ func (r *selectResult) NextRaw(ctx context.Context) (data []byte, err error) { func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error { chk.Reset() // Check the returned data is default/arrow format. - if r.selectResp == nil || (len(r.selectResp.RowBatchData) == 0 && r.respChkIdx == len(r.selectResp.Chunks)) { + if r.selectResp == nil || r.respChkIdx == len(r.selectResp.Chunks) { err := r.getSelectResp() if err != nil || r.selectResp == nil { return err } // TODO(Shenghui Wu): add metrics - if len(r.selectResp.RowBatchData) == 0 { - r.encodeType = tipb.EncodeType_TypeDefault - } } - switch r.encodeType { + switch r.selectResp.EncodeType { case tipb.EncodeType_TypeDefault: return r.readFromDefault(ctx, chk) case tipb.EncodeType_TypeArrow: @@ -187,10 +184,10 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er } func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) error { - rowBatchData := r.selectResp.RowBatchData + rowBatchData := r.selectResp.Chunks[r.respChkIdx].RowsData codec := chunk.NewCodec(r.fieldTypes) - remained := codec.DecodeToChunk(rowBatchData, chk) - r.selectResp.RowBatchData = remained + _ = codec.DecodeToChunk(rowBatchData, chk) + r.respChkIdx++ return nil } @@ -227,7 +224,7 @@ func (r *selectResult) getSelectResp() error { r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts) r.partialCount++ sc.MergeExecDetails(re.result.GetExecDetails(), nil) - if len(r.selectResp.Chunks) == 0 && len(r.selectResp.RowBatchData) == 0 { + if len(r.selectResp.Chunks) == 0 { continue } return nil diff --git a/go.mod b/go.mod index e1ae085082f7b..156a72a8ed881 100644 --- a/go.mod +++ b/go.mod @@ -78,3 +78,5 @@ require ( ) go 1.13 + +replace github.com/pingcap/tipb => github.com/wshwsh12/tipb v0.0.0-20190927062335-949d0a64656b diff --git a/go.sum b/go.sum index dd4bbfd3b5d8d..e13944ca16504 100644 --- a/go.sum +++ b/go.sum @@ -170,8 +170,6 @@ github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 h1:GIEq+wZfrl2bc github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0/go.mod h1:G/6rJpnYwM0LKMec2rI82/5Kg6GaZMvlfB+e6/tvYmI= github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible h1:MkWCxgZpJBgY2f4HtwWMMFzSBb3+JPzeJgF3VrXE/bU= github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tipb v0.0.0-20190806070524-16909e03435e h1:H7meq8QPmWGImOkHTQYAWw82zwIqndJaCDPVUknOHbM= -github.com/pingcap/tipb v0.0.0-20190806070524-16909e03435e/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -239,6 +237,8 @@ github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d 
h1:ggUgChAeyge4NZ4 github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d/go.mod h1:tu82oB5W2ykJRVioYsB+IQKcft7ryBr7w12qMBUPyXg= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/negroni v0.3.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= +github.com/wshwsh12/tipb v0.0.0-20190927062335-949d0a64656b h1:d+rY07ZVitlbgSiVrm+dZSEf9osjN5Fr08bGvhMcn7c= +github.com/wshwsh12/tipb v0.0.0-20190927062335-949d0a64656b/go.mod h1:UFKGXY+Ij4rifkYSXsZOrEQcV70dCog1faHm7QKJiWs= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yookoala/realpath v1.0.0/go.mod h1:gJJMA9wuX7AcqLy1+ffPatSCySA1FQ2S8Ya9AIoYBpE= diff --git a/store/mockstore/mocktikv/cop_handler_dag.go b/store/mockstore/mocktikv/cop_handler_dag.go index 2765dec84d8e2..155bac5c346aa 100644 --- a/store/mockstore/mocktikv/cop_handler_dag.go +++ b/store/mockstore/mocktikv/cop_handler_dag.go @@ -609,10 +609,11 @@ func (h *rpcHandler) encodeDefault(selResp *tipb.SelectResponse, rows [][][]byte chunks = appendRow(chunks, requestedRow, i) } selResp.Chunks = chunks + selResp.EncodeType = tipb.EncodeType_TypeDefault } func (h *rpcHandler) encodeArrow(selResp *tipb.SelectResponse, rows [][][]byte, colTypes []*types.FieldType, colOrdinal []uint32, loc *time.Location) error { - rowBatchData := make([]byte, 0, 1024) + var chunks []tipb.Chunk respColTypes := make([]*types.FieldType, 0, len(colOrdinal)) for _, ordinal := range colOrdinal { respColTypes = append(respColTypes, colTypes[ordinal]) @@ -628,15 +629,20 @@ func (h *rpcHandler) encodeArrow(selResp *tipb.SelectResponse, rows [][][]byte, } } if i%rowsPerChunk == rowsPerChunk-1 { - rowBatchData = append(rowBatchData, encoder.Encode(chk)...) + chunks = append(chunks, tipb.Chunk{}) + cur := &chunks[len(chunks)-1] + cur.RowsData = append(cur.RowsData, encoder.Encode(chk)...) chk.Reset() } } if chk.NumRows() > 0 { - rowBatchData = append(rowBatchData, encoder.Encode(chk)...) + chunks = append(chunks, tipb.Chunk{}) + cur := &chunks[len(chunks)-1] + cur.RowsData = append(cur.RowsData, encoder.Encode(chk)...) 
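// Tail flush: this branch mirrors the per-rowsPerChunk flush inside the loop —
// whatever rows remain after the last full batch are encoded into one final
// tipb.Chunk, so every chunk appended to selResp.Chunks holds at most
// rowsPerChunk rows.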
chk.Reset() } - selResp.RowBatchData = rowBatchData + selResp.Chunks = chunks + selResp.EncodeType = tipb.EncodeType_TypeArrow return nil } From 1c2bcd236de86f7dddef75da912f425ffcd0eaab Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 8 Oct 2019 15:03:09 +0800 Subject: [PATCH 02/34] tidy mod --- go.mod | 4 +--- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 156a72a8ed881..5ce9b13d9bde7 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/pingcap/parser v0.0.0-20190923031704-33636bc5e5d6 github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible - github.com/pingcap/tipb v0.0.0-20190806070524-16909e03435e + github.com/pingcap/tipb v0.0.0-20191008064422-018b2fadf414 github.com/prometheus/client_golang v0.9.0 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 github.com/prometheus/common v0.0.0-20181020173914-7e9e6cabbd39 // indirect @@ -78,5 +78,3 @@ require ( ) go 1.13 - -replace github.com/pingcap/tipb => github.com/wshwsh12/tipb v0.0.0-20190927062335-949d0a64656b diff --git a/go.sum b/go.sum index e13944ca16504..2ea5436c0eea2 100644 --- a/go.sum +++ b/go.sum @@ -170,6 +170,8 @@ github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 h1:GIEq+wZfrl2bc github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0/go.mod h1:G/6rJpnYwM0LKMec2rI82/5Kg6GaZMvlfB+e6/tvYmI= github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible h1:MkWCxgZpJBgY2f4HtwWMMFzSBb3+JPzeJgF3VrXE/bU= github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= +github.com/pingcap/tipb v0.0.0-20191008064422-018b2fadf414 h1:1HaTk+HEzn0rVcsAbVz9GLB9Ft3/KeSl2Sy452tDkHk= +github.com/pingcap/tipb v0.0.0-20191008064422-018b2fadf414/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -237,8 +239,6 @@ github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d h1:ggUgChAeyge4NZ4 github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d/go.mod h1:tu82oB5W2ykJRVioYsB+IQKcft7ryBr7w12qMBUPyXg= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/negroni v0.3.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= -github.com/wshwsh12/tipb v0.0.0-20190927062335-949d0a64656b h1:d+rY07ZVitlbgSiVrm+dZSEf9osjN5Fr08bGvhMcn7c= -github.com/wshwsh12/tipb v0.0.0-20190927062335-949d0a64656b/go.mod h1:UFKGXY+Ij4rifkYSXsZOrEQcV70dCog1faHm7QKJiWs= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yookoala/realpath v1.0.0/go.mod h1:gJJMA9wuX7AcqLy1+ffPatSCySA1FQ2S8Ya9AIoYBpE= From 10d6dcff5241e02d84b11642124288af3d9f01f0 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 8 Oct 2019 16:04:44 +0800 Subject: [PATCH 03/34] fix --- distsql/select_result.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index 86bfc9f19d070..f09c6f2a2503d 100644 --- a/distsql/select_result.go +++ 
b/distsql/select_result.go @@ -152,9 +152,8 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error { if err != nil || r.selectResp == nil { return err } - // TODO(Shenghui Wu): add metrics } - + // TODO(Shenghui Wu): add metrics switch r.selectResp.EncodeType { case tipb.EncodeType_TypeDefault: return r.readFromDefault(ctx, chk) From 13d22ad59c65f4a6056d5e2cd76a65c2abe37e29 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Wed, 9 Oct 2019 00:07:25 +0800 Subject: [PATCH 04/34] temp --- distsql/select_result.go | 43 ++++++++++++++--- util/chunk/codec.go | 99 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 131 insertions(+), 11 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index 754a55f5a3703..e0f8bde01e320 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -16,6 +16,7 @@ package distsql import ( "context" "fmt" + "github.com/pingcap/parser/mysql" "time" "github.com/pingcap/errors" @@ -66,9 +67,10 @@ type selectResult struct { fieldTypes []*types.FieldType ctx sessionctx.Context - selectResp *tipb.SelectResponse - selectRespSize int // record the selectResp.Size() when it is initialized. - respChkIdx int + selectResp *tipb.SelectResponse + selectRespSize int // record the selectResp.Size() when it is initialized. + respChkIdx int + respArrowDecoder *chunk.ArrowDecoder feedback *statistics.QueryFeedback partialCount int64 // number of partial results. @@ -187,10 +189,37 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er } func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) error { - rowBatchData := r.selectResp.RowBatchData - codec := chunk.NewCodec(r.fieldTypes) - remained := codec.DecodeToChunk(rowBatchData, chk) - r.selectResp.RowBatchData = remained + if r.respArrowDecoder == nil { + decoder := &chunk.ArrowDecoder{ + chunk.NewChunkWithCapacity(r.fieldTypes, r.ctx.GetSessionVars().MaxChunkSize), + r.fieldTypes, + 0, + 0, + } + r.respArrowDecoder = decoder + } + + for !chk.IsFull() { + if r.respChkIdx == len(r.selectResp.Chunks) && r.respArrowDecoder.Empty() { + err := r.getSelectResp() + if err != nil || r.selectResp == nil { + return err + } + } + + if r.respArrowDecoder.Len() >= chk.RequiredRows()-chk.NumRows() { + r.respArrowDecoder.Decode(chk, chk.RequiredRows()-chk.NumRows()) + } else { + r.respArrowDecoder.Decode(chk, r.respArrowDecoder.Len()) + } + + if r.respArrowDecoder.Empty() { + r.respArrowDecoder.Reset() + codec := chunk.NewCodec(r.fieldTypes) + _ := codec.DecodeToReadOnlyChunk(r.selectResp.Chunks[r.respChkIdx].RowsData, r.respArrowDecoder.GetChunk()) + r.respChkIdx++ + } + } return nil } diff --git a/util/chunk/codec.go b/util/chunk/codec.go index 24363ee344d8d..092e6e3948af8 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -41,7 +41,7 @@ func NewCodec(colTypes []*types.FieldType) *Codec { // Encode encodes a Chunk to a byte slice. 
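// Per-column wire layout, as implied by the decode routines below (a sketch
// inferred from decodeColumn, not a normative spec):
//
//   | length (4B LE) | nullCount (4B LE)
//   | nullBitmap ((length+7)/8 bytes, present only when nullCount > 0)
//   | offsets ((length+1)*8 bytes, present only for var-length types)
//   | data (fixedLen*length bytes, or offsets[length] bytes) |
//
// Encoded columns are simply concatenated, which is why Decode can loop
// until the buffer is exhausted.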
func (c *Codec) Encode(chk *Chunk) []byte { buffer := make([]byte, 0, chk.MemoryUsage()) - for _, col := range chk.columns { + for _, col := range chk.Columns { buffer = c.encodeColumn(buffer, col) } return buffer @@ -92,15 +92,15 @@ func (c *Codec) Decode(buffer []byte) (*Chunk, []byte) { for ordinal := 0; len(buffer) > 0; ordinal++ { col := &Column{} buffer = c.decodeColumn(buffer, col, ordinal) - chk.columns = append(chk.columns, col) + chk.Columns = append(chk.Columns, col) } return chk, buffer } // DecodeToChunk decodes a Chunk from a byte slice, return the remained unused bytes. func (c *Codec) DecodeToChunk(buffer []byte, chk *Chunk) (remained []byte) { - for i := 0; i < len(chk.columns); i++ { - buffer = c.decodeColumn(buffer, chk.columns[i], i) + for i := 0; i < len(chk.Columns); i++ { + buffer = c.decodeColumn(buffer, chk.Columns[i], i) } return buffer } @@ -154,6 +154,51 @@ func (c *Codec) setAllNotNull(col *Column) { } } +// DecodeToChunk decodes a Read-Only Chunk from a byte slice, return the remained unused bytes. +func (c *Codec) DecodeToReadOnlyChunk(buffer []byte, chk *Chunk) (remained []byte) { + for i := 0; i < len(chk.Columns); i++ { + buffer = c.decodeReadOnlyColumn(buffer, chk.Columns[i], i) + } + return buffer +} + +// decodeColumn decodes a Read-Only Column from a byte slice, return the remained unused bytes. +func (c *Codec) decodeReadOnlyColumn(buffer []byte, col *Column, ordinal int) (remained []byte) { + // Todo(Shenghui Wu): Optimize all data is null. + // decode length. + col.length = int(binary.LittleEndian.Uint32(buffer)) + buffer = buffer[4:] + + // decode nullCount. + nullCount := int(binary.LittleEndian.Uint32(buffer)) + buffer = buffer[4:] + + // decode nullBitmap. + if nullCount > 0 { + numNullBitmapBytes := (col.length + 7) / 8 + col.nullBitmap = buffer[:numNullBitmapBytes] + buffer = buffer[numNullBitmapBytes:] + } else { + c.setAllNotNull(col) + } + + // decode offsets. + numFixedBytes := getFixedLen(c.colTypes[ordinal]) + numDataBytes := int64(numFixedBytes * col.length) + if numFixedBytes == -1 { + numOffsetBytes := (col.length + 1) * 8 + col.offsets = bytesToI64Slice(buffer[:numOffsetBytes]) + buffer = buffer[numOffsetBytes:] + numDataBytes = col.offsets[col.length] + } else if cap(col.elemBuf) < numFixedBytes { + col.elemBuf = make([]byte, numFixedBytes) + } + + // decode data. 
+ col.data = buffer[:numDataBytes] + return buffer[numDataBytes:] +} + func bytesToI64Slice(b []byte) (i64s []int64) { if len(b) == 0 { return nil @@ -189,3 +234,49 @@ func init() { allNotNullBitmap[i] = 0xFF } } + +type ArrowDecoder struct { + chk *Chunk + colTypes []*types.FieldType + cur int + rows int +} + +func (c ArrowDecoder) Decode(target *Chunk, requestRows int) { + for i := 0; i < len(c.colTypes); i++ { + c.decodeColumn(target, i, requestRows) + } + c.cur = c.cur + requestRows +} + +func (c ArrowDecoder) Reset() { + c.cur = 0 + c.rows = 0 +} + +func (c ArrowDecoder) GetChunk() *Chunk { + return c.chk +} + +func (c ArrowDecoder) Len() int { + return c.rows - c.cur +} + +func (c ArrowDecoder) Empty() bool { + return c.cur == c.rows +} + +func (c ArrowDecoder) decodeColumn(target *Chunk, ordinal int, requestRows int) { + numFixedBytes := getFixedLen(c.colTypes[ordinal]) + + if numFixedBytes == -1 { + numOffsetBytes := (col.length + 1) * 8 + col.offsets = bytesToI64Slice(buffer[:numOffsetBytes]) + buffer = buffer[numOffsetBytes:] + numDataBytes = col.offsets[col.length] + } else if cap(col.elemBuf) < numFixedBytes { + col.elemBuf = make([]byte, numFixedBytes) + } + + target.Columns[ordinal].data = buffer[:numDataBytes] +} From 32c7bfec84330b38b4f39fbb89b85fca3da03d34 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Wed, 9 Oct 2019 13:12:40 +0800 Subject: [PATCH 05/34] requested_rows --- config/config.go | 2 +- distsql/select_result.go | 10 ++++------ util/chunk/codec.go | 40 +++++++++++++++++++++++++++------------- 3 files changed, 32 insertions(+), 20 deletions(-) diff --git a/config/config.go b/config/config.go index 23c6768b29f6f..37ee7666351b3 100644 --- a/config/config.go +++ b/config/config.go @@ -409,7 +409,7 @@ var defaultConf = Config{ MaxBatchWaitTime: 0, BatchWaitSize: 8, - EnableArrow: true, + EnableArrow: false, }, Binlog: Binlog{ WriteTimeout: "15s", diff --git a/distsql/select_result.go b/distsql/select_result.go index e0f8bde01e320..8d7802f3dc07a 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -16,7 +16,6 @@ package distsql import ( "context" "fmt" - "github.com/pingcap/parser/mysql" "time" "github.com/pingcap/errors" @@ -189,13 +188,12 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er } func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) error { + chk.SetRequiredRows(100000, 1024) if r.respArrowDecoder == nil { - decoder := &chunk.ArrowDecoder{ + decoder := chunk.NewArrowDecoder( chunk.NewChunkWithCapacity(r.fieldTypes, r.ctx.GetSessionVars().MaxChunkSize), r.fieldTypes, - 0, - 0, - } + ) r.respArrowDecoder = decoder } @@ -216,7 +214,7 @@ func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) erro if r.respArrowDecoder.Empty() { r.respArrowDecoder.Reset() codec := chunk.NewCodec(r.fieldTypes) - _ := codec.DecodeToReadOnlyChunk(r.selectResp.Chunks[r.respChkIdx].RowsData, r.respArrowDecoder.GetChunk()) + _ = codec.DecodeToReadOnlyChunk(r.selectResp.Chunks[r.respChkIdx].RowsData, r.respArrowDecoder.GetChunk()) r.respChkIdx++ } } diff --git a/util/chunk/codec.go b/util/chunk/codec.go index 092e6e3948af8..2ab3df5b26f4a 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -41,7 +41,7 @@ func NewCodec(colTypes []*types.FieldType) *Codec { // Encode encodes a Chunk to a byte slice. 
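// Reviewer note on the ArrowDecoder draft above: the methods use value
// receivers (`func (c ArrowDecoder) Reset()`), so assignments like c.cur = 0
// mutate a copy and the caller's decoder never changes; decodeColumn also
// still references col/buffer/numDataBytes from the template it was copied
// from, so the draft does not compile yet. State-mutating methods need a
// pointer receiver, e.g. (illustrative only):
//
//   func (c *ArrowDecoder) Reset() { c.cur, c.rows = 0, 0 }
//
// The series moves to pointer receivers by PATCH 06 (see the @@ hunk headers).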
func (c *Codec) Encode(chk *Chunk) []byte { buffer := make([]byte, 0, chk.MemoryUsage()) - for _, col := range chk.Columns { + for _, col := range chk.columns { buffer = c.encodeColumn(buffer, col) } return buffer @@ -92,15 +92,15 @@ func (c *Codec) Decode(buffer []byte) (*Chunk, []byte) { for ordinal := 0; len(buffer) > 0; ordinal++ { col := &Column{} buffer = c.decodeColumn(buffer, col, ordinal) - chk.Columns = append(chk.Columns, col) + chk.columns = append(chk.columns, col) } return chk, buffer } // DecodeToChunk decodes a Chunk from a byte slice, return the remained unused bytes. func (c *Codec) DecodeToChunk(buffer []byte, chk *Chunk) (remained []byte) { - for i := 0; i < len(chk.Columns); i++ { - buffer = c.decodeColumn(buffer, chk.Columns[i], i) + for i := 0; i < len(chk.columns); i++ { + buffer = c.decodeColumn(buffer, chk.columns[i], i) } return buffer } @@ -156,8 +156,8 @@ func (c *Codec) setAllNotNull(col *Column) { // DecodeToChunk decodes a Read-Only Chunk from a byte slice, return the remained unused bytes. func (c *Codec) DecodeToReadOnlyChunk(buffer []byte, chk *Chunk) (remained []byte) { - for i := 0; i < len(chk.Columns); i++ { - buffer = c.decodeReadOnlyColumn(buffer, chk.Columns[i], i) + for i := 0; i < len(chk.columns); i++ { + buffer = c.decodeReadOnlyColumn(buffer, chk.columns[i], i) } return buffer } @@ -242,6 +242,10 @@ type ArrowDecoder struct { rows int } +func NewArrowDecoder(chk *Chunk, colTypes []*types.FieldType) *ArrowDecoder { + return &ArrowDecoder{chk: chk, colTypes: colTypes, cur: 0, rows: 0} +} + func (c ArrowDecoder) Decode(target *Chunk, requestRows int) { for i := 0; i < len(c.colTypes); i++ { c.decodeColumn(target, i, requestRows) @@ -268,15 +272,25 @@ func (c ArrowDecoder) Empty() bool { func (c ArrowDecoder) decodeColumn(target *Chunk, ordinal int, requestRows int) { numFixedBytes := getFixedLen(c.colTypes[ordinal]) + numDataBytes := int64(numFixedBytes * requestRows) + colSource := c.chk.columns[ordinal] + colTarget := target.columns[ordinal] if numFixedBytes == -1 { - numOffsetBytes := (col.length + 1) * 8 - col.offsets = bytesToI64Slice(buffer[:numOffsetBytes]) - buffer = buffer[numOffsetBytes:] - numDataBytes = col.offsets[col.length] - } else if cap(col.elemBuf) < numFixedBytes { - col.elemBuf = make([]byte, numFixedBytes) + colTarget.offsets = append(colTarget.offsets, colSource.offsets[1:requestRows]...) + for i := colTarget.length; i < colTarget.length+requestRows; i++ { + colTarget.offsets[i] = colTarget.offsets[i] - colSource.offsets[0] + colTarget.offsets[colTarget.length] + } + colSource.offsets = colSource.offsets[requestRows:] + } else if cap(colTarget.elemBuf) < numFixedBytes { + colTarget.elemBuf = make([]byte, numFixedBytes) + } + + for i := c.cur; i < c.cur+requestRows; i++ { + colTarget.appendNullBitmap(!colSource.IsNull(i)) + colTarget.length++ } - target.Columns[ordinal].data = buffer[:numDataBytes] + colTarget.data = append(colTarget.data, colSource.data[:numDataBytes]...) 
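// The source column is consumed destructively: data and offsets are sliced
// forward after each copy (and c.cur advances past the bitmap bits already
// read), so the next Decode call resumes exactly where this one stopped.
// This is what lets one coprocessor chunk feed several partially-filled
// executor chunks.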
+ colSource.data = colSource.data[numDataBytes:] } From ec06f30cb2172a9ea4c3d8aeb489229a34bb50e6 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Thu, 10 Oct 2019 12:34:29 +0800 Subject: [PATCH 06/34] fix --- config/config.go | 2 +- distsql/select_result.go | 5 ++++- session/session_test.go | 2 +- util/chunk/codec.go | 2 +- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/config/config.go b/config/config.go index 37ee7666351b3..23c6768b29f6f 100644 --- a/config/config.go +++ b/config/config.go @@ -409,7 +409,7 @@ var defaultConf = Config{ MaxBatchWaitTime: 0, BatchWaitSize: 8, - EnableArrow: false, + EnableArrow: true, }, Binlog: Binlog{ WriteTimeout: "15s", diff --git a/distsql/select_result.go b/distsql/select_result.go index 7d4981e9adb62..7092ce281b0e4 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -203,7 +203,6 @@ func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) erro codec := chunk.NewCodec(r.fieldTypes) _ = codec.DecodeToReadOnlyChunk(r.selectResp.Chunks[r.respChkIdx].RowsData, r.respArrowDecoder.GetChunk()) r.respArrowDecoder.Reset() - r.respChkIdx++ } if r.respArrowDecoder.Len() >= chk.RequiredRows()-chk.NumRows() { @@ -211,6 +210,10 @@ func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) erro } else { r.respArrowDecoder.Decode(chk, r.respArrowDecoder.Len()) } + + if r.respArrowDecoder.Empty() { + r.respChkIdx++ + } } return nil } diff --git a/session/session_test.go b/session/session_test.go index 2ae04357f0fe2..5580141d2393f 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2018,7 +2018,7 @@ func (s *testSchemaSuite) TestTableReaderChunk(c *C) { } c.Assert(count, Equals, 100) // FIXME: revert this result to new group value after distsql can handle initChunkSize. - c.Assert(numChunks, Equals, 10) + c.Assert(numChunks, Equals, 1) rs.Close() } diff --git a/util/chunk/codec.go b/util/chunk/codec.go index b1a8c9afc530c..d348028b7a251 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -278,7 +278,7 @@ func (c *ArrowDecoder) decodeColumn(target *Chunk, ordinal int, requestRows int) if numFixedBytes == -1 { colTarget.offsets = append(colTarget.offsets, colSource.offsets[1:requestRows+1]...) 
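// Why the fix below starts at colTarget.length+1: offsets has length+1
// entries, with offsets[i] marking where row i starts and offsets[i+1] where
// it ends. The append above placed the copied end-offsets at positions
// length+1 .. length+requestRows, so only those entries need rebasing;
// starting the loop at length would corrupt the base entry
// offsets[colTarget.length], which every rebased entry depends on.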
- for i := colTarget.length; i < colTarget.length+requestRows; i++ { + for i := colTarget.length + 1; i <= colTarget.length+requestRows; i++ { colTarget.offsets[i] = colTarget.offsets[i] - colSource.offsets[0] + colTarget.offsets[colTarget.length] } colSource.offsets = colSource.offsets[requestRows:] From b0f51053364b2ed94297fa7f4bcff4f1efdb887b Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Thu, 10 Oct 2019 15:11:38 +0800 Subject: [PATCH 07/34] adjust code --- distsql/select_result.go | 10 ++---- util/chunk/codec.go | 77 ++++++++++------------------------------ 2 files changed, 20 insertions(+), 67 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index 7092ce281b0e4..8dad718c96011 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -200,16 +200,10 @@ func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) erro } if r.respArrowDecoder.Empty() { - codec := chunk.NewCodec(r.fieldTypes) - _ = codec.DecodeToReadOnlyChunk(r.selectResp.Chunks[r.respChkIdx].RowsData, r.respArrowDecoder.GetChunk()) - r.respArrowDecoder.Reset() + r.respArrowDecoder.Reset(r.selectResp.Chunks[r.respChkIdx].RowsData) } - if r.respArrowDecoder.Len() >= chk.RequiredRows()-chk.NumRows() { - r.respArrowDecoder.Decode(chk, chk.RequiredRows()-chk.NumRows()) - } else { - r.respArrowDecoder.Decode(chk, r.respArrowDecoder.Len()) - } + r.respArrowDecoder.Decode(chk) if r.respArrowDecoder.Empty() { r.respChkIdx++ diff --git a/util/chunk/codec.go b/util/chunk/codec.go index d348028b7a251..7feea50ee8811 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -119,7 +119,7 @@ func (c *Codec) decodeColumn(buffer []byte, col *Column, ordinal int) (remained // decode nullBitmap. if nullCount > 0 { numNullBitmapBytes := (col.length + 7) / 8 - col.nullBitmap = append(col.nullBitmap[:0], buffer[:numNullBitmapBytes]...) + col.nullBitmap = buffer[:numNullBitmapBytes] buffer = buffer[numNullBitmapBytes:] } else { c.setAllNotNull(col) @@ -130,7 +130,7 @@ func (c *Codec) decodeColumn(buffer []byte, col *Column, ordinal int) (remained numDataBytes := int64(numFixedBytes * col.length) if numFixedBytes == -1 { numOffsetBytes := (col.length + 1) * 8 - col.offsets = append(col.offsets[:0], bytesToI64Slice(buffer[:numOffsetBytes])...) + col.offsets = bytesToI64Slice(buffer[:numOffsetBytes]) buffer = buffer[numOffsetBytes:] numDataBytes = col.offsets[col.length] } else if cap(col.elemBuf) < numFixedBytes { @@ -138,7 +138,7 @@ func (c *Codec) decodeColumn(buffer []byte, col *Column, ordinal int) (remained } // decode data. - col.data = append(col.data[:0], buffer[:numDataBytes]...) + col.data = buffer[:numDataBytes] return buffer[numDataBytes:] } @@ -154,51 +154,6 @@ func (c *Codec) setAllNotNull(col *Column) { } } -// DecodeToChunk decodes a Read-Only Chunk from a byte slice, return the remained unused bytes. -func (c *Codec) DecodeToReadOnlyChunk(buffer []byte, chk *Chunk) (remained []byte) { - for i := 0; i < len(chk.columns); i++ { - buffer = c.decodeReadOnlyColumn(buffer, chk.columns[i], i) - } - return buffer -} - -// decodeColumn decodes a Read-Only Column from a byte slice, return the remained unused bytes. -func (c *Codec) decodeReadOnlyColumn(buffer []byte, col *Column, ordinal int) (remained []byte) { - // Todo(Shenghui Wu): Optimize all data is null. - // decode length. - col.length = int(binary.LittleEndian.Uint32(buffer)) - buffer = buffer[4:] - - // decode nullCount. 
- nullCount := int(binary.LittleEndian.Uint32(buffer)) - buffer = buffer[4:] - - // decode nullBitmap. - if nullCount > 0 { - numNullBitmapBytes := (col.length + 7) / 8 - col.nullBitmap = buffer[:numNullBitmapBytes] - buffer = buffer[numNullBitmapBytes:] - } else { - c.setAllNotNull(col) - } - - // decode offsets. - numFixedBytes := getFixedLen(c.colTypes[ordinal]) - numDataBytes := int64(numFixedBytes * col.length) - if numFixedBytes == -1 { - numOffsetBytes := (col.length + 1) * 8 - col.offsets = bytesToI64Slice(buffer[:numOffsetBytes]) - buffer = buffer[numOffsetBytes:] - numDataBytes = col.offsets[col.length] - } else if cap(col.elemBuf) < numFixedBytes { - col.elemBuf = make([]byte, numFixedBytes) - } - - // decode data. - col.data = buffer[:numDataBytes] - return buffer[numDataBytes:] -} - func bytesToI64Slice(b []byte) (i64s []int64) { if len(b) == 0 { return nil @@ -235,6 +190,8 @@ func init() { } } +// ArrowDecoder is used to: +// 1. decode a Chunk from a byte slice. type ArrowDecoder struct { chk *Chunk colTypes []*types.FieldType @@ -242,30 +199,32 @@ type ArrowDecoder struct { rows int } +// NewArrowDecoder creates a new ArrowDecoder object for decode a Chunk. func NewArrowDecoder(chk *Chunk, colTypes []*types.FieldType) *ArrowDecoder { return &ArrowDecoder{chk: chk, colTypes: colTypes, cur: 0, rows: 0} } -func (c *ArrowDecoder) Decode(target *Chunk, requestRows int) { +// Decode decodes a Chunk from ArrowDecoder, save the remained unused bytes. +func (c *ArrowDecoder) Decode(target *Chunk) { + requestRows := target.RequiredRows() - target.NumRows() + if requestRows > c.rows-c.cur { + requestRows = c.rows - c.cur + } for i := 0; i < len(c.colTypes); i++ { c.decodeColumn(target, i, requestRows) } c.cur = c.cur + requestRows } -func (c *ArrowDecoder) Reset() { +// Reset resets ArrowDecoder using byte slice. +func (c *ArrowDecoder) Reset(data []byte) { + codec := NewCodec(c.colTypes) + codec.DecodeToChunk(data, c.chk) c.cur = 0 c.rows = c.chk.NumRows() } -func (c *ArrowDecoder) GetChunk() *Chunk { - return c.chk -} - -func (c *ArrowDecoder) Len() int { - return c.rows - c.cur -} - +// Empty indicate the ArrowDecoder is empty. func (c *ArrowDecoder) Empty() bool { return c.cur == c.rows } @@ -277,12 +236,12 @@ func (c *ArrowDecoder) decodeColumn(target *Chunk, ordinal int, requestRows int) colTarget := target.columns[ordinal] if numFixedBytes == -1 { + numDataBytes = colSource.offsets[requestRows] - colSource.offsets[0] colTarget.offsets = append(colTarget.offsets, colSource.offsets[1:requestRows+1]...) 
for i := colTarget.length + 1; i <= colTarget.length+requestRows; i++ { colTarget.offsets[i] = colTarget.offsets[i] - colSource.offsets[0] + colTarget.offsets[colTarget.length] } colSource.offsets = colSource.offsets[requestRows:] - numDataBytes = colTarget.offsets[colTarget.length+requestRows] - colTarget.offsets[colTarget.length] } else if cap(colTarget.elemBuf) < numFixedBytes { colTarget.elemBuf = make([]byte, numFixedBytes) } From 23149ca9e64b2396792bcf8813d606463b3774ef Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Fri, 11 Oct 2019 11:03:14 +0800 Subject: [PATCH 08/34] remove useless check --- distsql/select_result.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index 8dad718c96011..32af68e5a9024 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -192,7 +192,7 @@ func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) erro } for !chk.IsFull() { - if r.respChkIdx == len(r.selectResp.Chunks) && r.respArrowDecoder.Empty() { + if r.respChkIdx == len(r.selectResp.Chunks) { err := r.getSelectResp() if err != nil || r.selectResp == nil { return err From c5ac8b2bb6ff4d6a5eca1a62b0b1e07d71829f23 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Sat, 12 Oct 2019 13:52:53 +0800 Subject: [PATCH 09/34] temp --- distsql/distsql_test.go | 126 +++++++++++++++++++++++++++++++++++++++ distsql/select_result.go | 3 +- session/session_test.go | 2 +- util/chunk/codec.go | 57 ++++++++++++++++-- 4 files changed, 181 insertions(+), 7 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 2f70a02ab6ffc..a02632893175b 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -529,3 +529,129 @@ func BenchmarkDecodeToChunkWithNewChunk(b *testing.B) { codec.DecodeToChunk(buffer, chk) } } + +func BenchmarkDecodeToChunkWithRequestedRows_int(b *testing.B) { + numCols := 4 + numRows := 1024 + + colTypes := make([]*types.FieldType, numCols) + for i := 0; i < numCols; i++ { + colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} + } + chk := chunk.New(colTypes, numRows, numRows) + + for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { + for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { + chk.AppendInt64(colOrdinal, 123) + } + } + + codec := chunk.NewCodec(colTypes) + buffer := codec.Encode(chk) + + acodec := chunk.NewArrowDecoder( + chunk.NewChunkWithCapacity(colTypes, 0), + colTypes) + + chk.SetRequiredRows(1024, 1024) + chk.Reset() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + acodec.Reset(buffer) + for !acodec.Empty() { + for !chk.IsFull() && !acodec.Empty() { + acodec.Decode(chk) + } + chk.Reset() + } + } +} + +func BenchmarkDecodeToChunk_int(b *testing.B) { + numCols := 4 + numRows := 1024 + + colTypes := make([]*types.FieldType, numCols) + for i := 0; i < numCols; i++ { + colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} + } + chk := chunk.New(colTypes, numRows, numRows) + + for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { + for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { + chk.AppendInt64(colOrdinal, 123) + } + } + + codec := chunk.NewCodec(colTypes) + buffer := codec.Encode(chk) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + codec.DecodeToChunkTest(buffer, chk) + } +} + +func BenchmarkDecodeToChunkWithRequestedRows_string(b *testing.B) { + numCols := 4 + numRows := 1024 + + colTypes := make([]*types.FieldType, numCols) + for i := 0; i < numCols; i++ { + colTypes[i] = 
&types.FieldType{Tp: mysql.TypeString} + } + chk := chunk.New(colTypes, numRows, numRows) + + for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { + for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { + chk.AppendString(colOrdinal, "123456") + } + } + + codec := chunk.NewCodec(colTypes) + buffer := codec.Encode(chk) + + acodec := chunk.NewArrowDecoder( + chunk.NewChunkWithCapacity(colTypes, 0), + colTypes) + + chk.SetRequiredRows(1024, 1024) + chk.Reset() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + acodec.Reset(buffer) + for !acodec.Empty() { + for !chk.IsFull() && !acodec.Empty() { + acodec.Decode(chk) + } + chk.Reset() + } + } +} + +func BenchmarkDecodeToChunk_string(b *testing.B) { + numCols := 4 + numRows := 1024 + + colTypes := make([]*types.FieldType, numCols) + for i := 0; i < numCols; i++ { + colTypes[i] = &types.FieldType{Tp: mysql.TypeString} + } + chk := chunk.New(colTypes, numRows, numRows) + + for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { + for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { + chk.AppendString(colOrdinal, "123456") + } + } + + codec := chunk.NewCodec(colTypes) + buffer := codec.Encode(chk) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + codec.DecodeToChunkTest(buffer, chk) + } +} diff --git a/distsql/select_result.go b/distsql/select_result.go index 32af68e5a9024..101c679e44716 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -186,7 +186,7 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) error { if r.respArrowDecoder == nil { r.respArrowDecoder = chunk.NewArrowDecoder( - chunk.NewChunkWithCapacity(r.fieldTypes, r.ctx.GetSessionVars().MaxChunkSize), + chunk.NewChunkWithCapacity(r.fieldTypes, 0), r.fieldTypes, ) } @@ -207,6 +207,7 @@ func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) erro if r.respArrowDecoder.Empty() { r.respChkIdx++ + return nil } } return nil diff --git a/session/session_test.go b/session/session_test.go index 5580141d2393f..2ae04357f0fe2 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2018,7 +2018,7 @@ func (s *testSchemaSuite) TestTableReaderChunk(c *C) { } c.Assert(count, Equals, 100) // FIXME: revert this result to new group value after distsql can handle initChunkSize. - c.Assert(numChunks, Equals, 1) + c.Assert(numChunks, Equals, 10) rs.Close() } diff --git a/util/chunk/codec.go b/util/chunk/codec.go index 7feea50ee8811..937459ec0d064 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -207,6 +207,7 @@ func NewArrowDecoder(chk *Chunk, colTypes []*types.FieldType) *ArrowDecoder { // Decode decodes a Chunk from ArrowDecoder, save the remained unused bytes. func (c *ArrowDecoder) Decode(target *Chunk) { requestRows := target.RequiredRows() - target.NumRows() + requestRows = (requestRows + 7) >> 3 << 3 if requestRows > c.rows-c.cur { requestRows = c.rows - c.cur } @@ -238,19 +239,65 @@ func (c *ArrowDecoder) decodeColumn(target *Chunk, ordinal int, requestRows int) if numFixedBytes == -1 { numDataBytes = colSource.offsets[requestRows] - colSource.offsets[0] colTarget.offsets = append(colTarget.offsets, colSource.offsets[1:requestRows+1]...) 
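// The deltaOffset introduced below hoists the loop-invariant expression
// "- colSource.offsets[0] + colTarget.offsets[colTarget.length]" out of the
// rebasing loop: both terms are fixed for the whole batch, so they are now
// computed once per column instead of once per copied row.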
+ deltaOffset := -colSource.offsets[0] + colTarget.offsets[colTarget.length] for i := colTarget.length + 1; i <= colTarget.length+requestRows; i++ { - colTarget.offsets[i] = colTarget.offsets[i] - colSource.offsets[0] + colTarget.offsets[colTarget.length] + colTarget.offsets[i] = colTarget.offsets[i] + deltaOffset } colSource.offsets = colSource.offsets[requestRows:] } else if cap(colTarget.elemBuf) < numFixedBytes { colTarget.elemBuf = make([]byte, numFixedBytes) } - for i := c.cur; i < c.cur+requestRows; i++ { - colTarget.appendNullBitmap(!colSource.IsNull(i)) - colTarget.length++ - } + numNullBitmapBytes := (requestRows + 7) >> 3 + colTarget.nullBitmap = append(colTarget.nullBitmap, colSource.nullBitmap[:numNullBitmapBytes]...) + colSource.nullBitmap = colSource.nullBitmap[numNullBitmapBytes:] + colTarget.length += requestRows colTarget.data = append(colTarget.data, colSource.data[:numDataBytes]...) colSource.data = colSource.data[numDataBytes:] } + +// DecodeToChunk decodes a Chunk from a byte slice, return the remained unused bytes. +func (c *Codec) DecodeToChunkTest(buffer []byte, chk *Chunk) (remained []byte) { + for i := 0; i < len(chk.columns); i++ { + buffer = c.decodeColumnTest(buffer, chk.columns[i], i) + } + return buffer +} + +// decodeColumn decodes a Column from a byte slice, return the remained unused bytes. +func (c *Codec) decodeColumnTest(buffer []byte, col *Column, ordinal int) (remained []byte) { + // Todo(Shenghui Wu): Optimize all data is null. + // decode length. + col.length = int(binary.LittleEndian.Uint32(buffer)) + buffer = buffer[4:] + + // decode nullCount. + nullCount := int(binary.LittleEndian.Uint32(buffer)) + buffer = buffer[4:] + + // decode nullBitmap. + if nullCount > 0 { + numNullBitmapBytes := (col.length + 7) / 8 + col.nullBitmap = append(col.nullBitmap[:0], buffer[:numNullBitmapBytes]...) + buffer = buffer[numNullBitmapBytes:] + } else { + c.setAllNotNull(col) + } + + // decode offsets. + numFixedBytes := getFixedLen(c.colTypes[ordinal]) + numDataBytes := int64(numFixedBytes * col.length) + if numFixedBytes == -1 { + numOffsetBytes := (col.length + 1) * 8 + col.offsets = append(col.offsets[:0], bytesToI64Slice(buffer[:numOffsetBytes])...) + buffer = buffer[numOffsetBytes:] + numDataBytes = col.offsets[col.length] + } else if cap(col.elemBuf) < numFixedBytes { + col.elemBuf = make([]byte, numFixedBytes) + } + + // decode data. + col.data = append(col.data[:0], buffer[:numDataBytes]...) + return buffer[numDataBytes:] +} From 0743aa2211f758afb257b84fed98e24f79d298f1 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Mon, 14 Oct 2019 15:21:45 +0800 Subject: [PATCH 10/34] fix --- util/chunk/codec.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/util/chunk/codec.go b/util/chunk/codec.go index 937459ec0d064..57874dec62c3c 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -249,8 +249,21 @@ func (c *ArrowDecoder) decodeColumn(target *Chunk, ordinal int, requestRows int) } numNullBitmapBytes := (requestRows + 7) >> 3 - colTarget.nullBitmap = append(colTarget.nullBitmap, colSource.nullBitmap[:numNullBitmapBytes]...) - colSource.nullBitmap = colSource.nullBitmap[numNullBitmapBytes:] + if colTarget.length%8 == 0 { + colTarget.nullBitmap = append(colTarget.nullBitmap, colSource.nullBitmap[:numNullBitmapBytes]...) 
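// Sketch of the misaligned branch added below: with colTarget.length = 3,
// bitOffset = 3, appendMultiSameNullBitmap first zero-extends the target
// bitmap, then each source byte is OR-ed in twice — its low 5 bits via
// `<< 3` into the current target byte and its high 3 bits via `>> 5` into
// the next one — so source bit k lands at target bit k+3, keeping the
// LSB-first row ordering intact.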
+ colSource.nullBitmap = colSource.nullBitmap[numNullBitmapBytes:] + } else { + colTarget.appendMultiSameNullBitmap(false, requestRows) + bitMapLen := len(colTarget.nullBitmap) + bitOffset := colTarget.length % 8 + bitMapStartIndex := (colTarget.length - 1) >> 3 + for i := 0; i < numNullBitmapBytes; i++ { + colTarget.nullBitmap[bitMapStartIndex+i] |= colSource.nullBitmap[i] << bitOffset + if bitMapLen > bitMapStartIndex+i+1 { + colTarget.nullBitmap[bitMapStartIndex+i+1] |= colSource.nullBitmap[i] >> (8 - bitOffset) + } + } + } colTarget.length += requestRows colTarget.data = append(colTarget.data, colSource.data[:numDataBytes]...) From 1cf0853b1c2c901acbe45c7088c911eddbbc9a06 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Mon, 14 Oct 2019 16:42:19 +0800 Subject: [PATCH 11/34] fix --- util/chunk/codec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/chunk/codec.go b/util/chunk/codec.go index 4be109581a8e8..8daa80094fd86 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -251,7 +251,6 @@ func (c *ArrowDecoder) decodeColumn(target *Chunk, ordinal int, requestRows int) numNullBitmapBytes := (requestRows + 7) >> 3 if colTarget.length%8 == 0 { colTarget.nullBitmap = append(colTarget.nullBitmap, colSource.nullBitmap[:numNullBitmapBytes]...) - colSource.nullBitmap = colSource.nullBitmap[numNullBitmapBytes:] } else { colTarget.appendMultiSameNullBitmap(false, requestRows) bitMapLen := len(colTarget.nullBitmap) @@ -264,6 +263,7 @@ func (c *ArrowDecoder) decodeColumn(target *Chunk, ordinal int, requestRows int) } } } + colSource.nullBitmap = colSource.nullBitmap[numNullBitmapBytes:] colTarget.length += requestRows colTarget.data = append(colTarget.data, colSource.data[:numDataBytes]...) From d72d5af45e7e3074e088f00e10510e8c4e9962db Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Mon, 14 Oct 2019 17:40:24 +0800 Subject: [PATCH 12/34] fix --- distsql/select_result.go | 1 - session/session_test.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index 101c679e44716..7de22512a6862 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -207,7 +207,6 @@ func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) erro if r.respArrowDecoder.Empty() { r.respChkIdx++ - return nil } } return nil diff --git a/session/session_test.go b/session/session_test.go index 2ae04357f0fe2..5580141d2393f 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2018,7 +2018,7 @@ func (s *testSchemaSuite) TestTableReaderChunk(c *C) { } c.Assert(count, Equals, 100) // FIXME: revert this result to new group value after distsql can handle initChunkSize. 
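// The expectation below drops from 10 chunks to 1 because readFromArrow now
// keeps refilling the same chunk across coprocessor chunk boundaries (the
// loop runs until chk.IsFull()), so the test's 100 rows arrive in a single
// max-size chunk instead of 10 small ones.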
- c.Assert(numChunks, Equals, 10) + c.Assert(numChunks, Equals, 1) rs.Close() } From 9a86bd67b482ac6a4d193f8fb8900e56284b0a0e Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Fri, 18 Oct 2019 16:42:36 +0800 Subject: [PATCH 13/34] address comments --- distsql/distsql_test.go | 4 ++-- distsql/select_result.go | 2 +- util/chunk/codec.go | 39 ++++++++++++++++++++++++++------------- 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index a02632893175b..e5925c503bb6e 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -558,7 +558,7 @@ func BenchmarkDecodeToChunkWithRequestedRows_int(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - acodec.Reset(buffer) + acodec.ResetAndInit(buffer) for !acodec.Empty() { for !chk.IsFull() && !acodec.Empty() { acodec.Decode(chk) @@ -621,7 +621,7 @@ func BenchmarkDecodeToChunkWithRequestedRows_string(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - acodec.Reset(buffer) + acodec.ResetAndInit(buffer) for !acodec.Empty() { for !chk.IsFull() && !acodec.Empty() { acodec.Decode(chk) diff --git a/distsql/select_result.go b/distsql/select_result.go index 7de22512a6862..cd216d7f17f35 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -200,7 +200,7 @@ func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) erro } if r.respArrowDecoder.Empty() { - r.respArrowDecoder.Reset(r.selectResp.Chunks[r.respChkIdx].RowsData) + r.respArrowDecoder.ResetAndInit(r.selectResp.Chunks[r.respChkIdx].RowsData) } r.respArrowDecoder.Decode(chk) diff --git a/util/chunk/codec.go b/util/chunk/codec.go index 8daa80094fd86..d6590e6263c1e 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -192,42 +192,55 @@ func init() { // ArrowDecoder is used to: // 1. decode a Chunk from a byte slice. +// How ArrowDecoder works: +// 1. Construct : save colTypes, init a temp Chunk for middle calculation. +// 2. ResetAndInit: decode the byte slice to the temp Chunk, and set remainedRows = chk.NumRows() +// (Memory reuse, so the Chunk is Read-Only) +// 3. Decode: calculate the rows that append to the result Chunk. +// 1) Floor requestRows to the next multiple of 8, for more performance NullBitMap operator. +// 2) Append offsets from temp Chunk to result Chunk if the type is unfixed-len. +// And adjust offsets (add a delta) to keep correct. +// 3) Append request rows NullBitMap from temp Chunk to result Chunk. +// 3.1) If the numRows of result Chunk is divided by 8, append the NullBitMap directly. +// 3.2) If the numRows of result Chunk is not divided by 8, append the NullBitMap and +// do some bit operator to keep correct. +// 4) Append request rows data from temp Chunk to result Chunk. +// 4. If the ArrowDecoder is empty (remainedRows == 0), go to Step2 to ResetAndInit the ArrowDecoder. type ArrowDecoder struct { - chk *Chunk - colTypes []*types.FieldType - cur int - rows int + chk *Chunk + colTypes []*types.FieldType + remainedRows int } // NewArrowDecoder creates a new ArrowDecoder object for decode a Chunk. func NewArrowDecoder(chk *Chunk, colTypes []*types.FieldType) *ArrowDecoder { - return &ArrowDecoder{chk: chk, colTypes: colTypes, cur: 0, rows: 0} + return &ArrowDecoder{chk: chk, colTypes: colTypes, remainedRows: 0} } // Decode decodes a Chunk from ArrowDecoder, save the remained unused bytes. 
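// Note: (requiredRows + 7) >> 3 << 3 in the body below rounds *up* to the
// next multiple of 8 despite the "Floor" wording, so one Decode call may
// append slightly more than chk.RequiredRows()-chk.NumRows() rows. That is
// why callers guard with IsFull()/Empty() rather than counting rows — a
// sketch based on readFromArrow and the benchmarks above:
//
//   decoder.ResetAndInit(rowsData)          // load one coprocessor chunk
//   for !chk.IsFull() && !decoder.Empty() {
//       decoder.Decode(chk)
//   }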
func (c *ArrowDecoder) Decode(target *Chunk) { requestRows := target.RequiredRows() - target.NumRows() + // Floor requestRows to the next multiple of 8 requestRows = (requestRows + 7) >> 3 << 3 - if requestRows > c.rows-c.cur { - requestRows = c.rows - c.cur + if requestRows > c.remainedRows { + requestRows = c.remainedRows } for i := 0; i < len(c.colTypes); i++ { c.decodeColumn(target, i, requestRows) } - c.cur = c.cur + requestRows + c.remainedRows -= requestRows } -// Reset resets ArrowDecoder using byte slice. -func (c *ArrowDecoder) Reset(data []byte) { +// ResetAndInit resets ArrowDecoder, and use data byte slice to init it. +func (c *ArrowDecoder) ResetAndInit(data []byte) { codec := NewCodec(c.colTypes) codec.DecodeToChunk(data, c.chk) - c.cur = 0 - c.rows = c.chk.NumRows() + c.remainedRows = c.chk.NumRows() } // Empty indicate the ArrowDecoder is empty. func (c *ArrowDecoder) Empty() bool { - return c.cur == c.rows + return c.remainedRows == 0 } func (c *ArrowDecoder) decodeColumn(target *Chunk, ordinal int, requestRows int) { From 38cbf7c1ebc0b4c5e12c12cf4f39f071c033ce2a Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 22 Oct 2019 15:02:26 +0800 Subject: [PATCH 14/34] address comments --- distsql/distsql_test.go | 8 +-- distsql/select_result.go | 4 +- util/chunk/codec.go | 108 +++++++++++++++++++-------------------- 3 files changed, 60 insertions(+), 60 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index e5925c503bb6e..8b4a57dd26e8f 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -559,8 +559,8 @@ func BenchmarkDecodeToChunkWithRequestedRows_int(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { acodec.ResetAndInit(buffer) - for !acodec.Empty() { - for !chk.IsFull() && !acodec.Empty() { + for !acodec.IsFinished() { + for !chk.IsFull() && !acodec.IsFinished() { acodec.Decode(chk) } chk.Reset() @@ -622,8 +622,8 @@ func BenchmarkDecodeToChunkWithRequestedRows_string(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { acodec.ResetAndInit(buffer) - for !acodec.Empty() { - for !chk.IsFull() && !acodec.Empty() { + for !acodec.IsFinished() { + for !chk.IsFull() && !acodec.IsFinished() { acodec.Decode(chk) } chk.Reset() diff --git a/distsql/select_result.go b/distsql/select_result.go index cd216d7f17f35..85a469b2424af 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -199,13 +199,13 @@ func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) erro } } - if r.respArrowDecoder.Empty() { + if r.respArrowDecoder.IsFinished() { r.respArrowDecoder.ResetAndInit(r.selectResp.Chunks[r.respChkIdx].RowsData) } r.respArrowDecoder.Decode(chk) - if r.respArrowDecoder.Empty() { + if r.respArrowDecoder.IsFinished() { r.respChkIdx++ } } diff --git a/util/chunk/codec.go b/util/chunk/codec.go index d6590e6263c1e..23fbb622615cc 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -193,94 +193,94 @@ func init() { // ArrowDecoder is used to: // 1. decode a Chunk from a byte slice. // How ArrowDecoder works: -// 1. Construct : save colTypes, init a temp Chunk for middle calculation. -// 2. ResetAndInit: decode the byte slice to the temp Chunk, and set remainedRows = chk.NumRows() -// (Memory reuse, so the Chunk is Read-Only) -// 3. Decode: calculate the rows that append to the result Chunk. -// 1) Floor requestRows to the next multiple of 8, for more performance NullBitMap operator. -// 2) Append offsets from temp Chunk to result Chunk if the type is unfixed-len. 
-// And adjust offsets (add a delta) to keep correct. -// 3) Append request rows NullBitMap from temp Chunk to result Chunk. -// 3.1) If the numRows of result Chunk is divided by 8, append the NullBitMap directly. -// 3.2) If the numRows of result Chunk is not divided by 8, append the NullBitMap and -// do some bit operator to keep correct. -// 4) Append request rows data from temp Chunk to result Chunk. -// 4. If the ArrowDecoder is empty (remainedRows == 0), go to Step2 to ResetAndInit the ArrowDecoder. +// 1. Initialization phase: Decode a whole input byte slice to ArrowDecoder.intermChk using Codec.Decode. intermChk is +// introduced to simplify the implementation of decode phase. This phase uses pointer operations with less CPU and +// memory cost. +// 2. Decode phase: +// 2.1 Set the number of rows that should be decoded to a multiple of 8 greater than +// targetChk.RequiredRows()-targetChk.NumRows(), this can reduce the cost when copying the srcCol.nullBitMap into destCol.nullBitMap. +// 2.2 Append srcCol.offsets to destCol.offsets when the elements is of var-length type. And further adjust the +// offsets according to descCol.offsets[destCol.length]-srcCol.offsets[0]. +// 2.3 Append srcCol.nullBitMap to destCol.nullBitMap. +// 3. Go to step 1 when the input byte slice is consumed. type ArrowDecoder struct { - chk *Chunk + intermChk *Chunk colTypes []*types.FieldType remainedRows int } // NewArrowDecoder creates a new ArrowDecoder object for decode a Chunk. func NewArrowDecoder(chk *Chunk, colTypes []*types.FieldType) *ArrowDecoder { - return &ArrowDecoder{chk: chk, colTypes: colTypes, remainedRows: 0} + return &ArrowDecoder{intermChk: chk, colTypes: colTypes, remainedRows: 0} } // Decode decodes a Chunk from ArrowDecoder, save the remained unused bytes. func (c *ArrowDecoder) Decode(target *Chunk) { - requestRows := target.RequiredRows() - target.NumRows() - // Floor requestRows to the next multiple of 8 - requestRows = (requestRows + 7) >> 3 << 3 - if requestRows > c.remainedRows { - requestRows = c.remainedRows + requiredRows := target.RequiredRows() - target.NumRows() + // Floor requiredRows to the next multiple of 8 + requiredRows = (requiredRows + 7) >> 3 << 3 + if requiredRows > c.remainedRows { + requiredRows = c.remainedRows } for i := 0; i < len(c.colTypes); i++ { - c.decodeColumn(target, i, requestRows) + c.decodeColumn(target, i, requiredRows) } - c.remainedRows -= requestRows + c.remainedRows -= requiredRows } // ResetAndInit resets ArrowDecoder, and use data byte slice to init it. func (c *ArrowDecoder) ResetAndInit(data []byte) { codec := NewCodec(c.colTypes) - codec.DecodeToChunk(data, c.chk) - c.remainedRows = c.chk.NumRows() + codec.DecodeToChunk(data, c.intermChk) + c.remainedRows = c.intermChk.NumRows() } -// Empty indicate the ArrowDecoder is empty. -func (c *ArrowDecoder) Empty() bool { +// IsFinished indicate the ArrowDecoder is consumed. 
+func (c *ArrowDecoder) IsFinished() bool { return c.remainedRows == 0 } -func (c *ArrowDecoder) decodeColumn(target *Chunk, ordinal int, requestRows int) { - numFixedBytes := getFixedLen(c.colTypes[ordinal]) - numDataBytes := int64(numFixedBytes * requestRows) - colSource := c.chk.columns[ordinal] - colTarget := target.columns[ordinal] +func (c *ArrowDecoder) decodeColumn(target *Chunk, ordinal int, requiredRows int) { + elemLen := getFixedLen(c.colTypes[ordinal]) + numDataBytes := int64(elemLen * requiredRows) + srcCol := c.intermChk.columns[ordinal] + destCol := target.columns[ordinal] - if numFixedBytes == -1 { - numDataBytes = colSource.offsets[requestRows] - colSource.offsets[0] - colTarget.offsets = append(colTarget.offsets, colSource.offsets[1:requestRows+1]...) - deltaOffset := -colSource.offsets[0] + colTarget.offsets[colTarget.length] - for i := colTarget.length + 1; i <= colTarget.length+requestRows; i++ { - colTarget.offsets[i] = colTarget.offsets[i] + deltaOffset + if elemLen == varElemLen { + numDataBytes = srcCol.offsets[requiredRows] - srcCol.offsets[0] + deltaOffset := destCol.offsets[destCol.length] - srcCol.offsets[0] + destCol.offsets = append(destCol.offsets, srcCol.offsets[1:requiredRows+1]...) + + for i := destCol.length + 1; i <= destCol.length+requiredRows; i++ { + destCol.offsets[i] = destCol.offsets[i] + deltaOffset } - colSource.offsets = colSource.offsets[requestRows:] - } else if cap(colTarget.elemBuf) < numFixedBytes { - colTarget.elemBuf = make([]byte, numFixedBytes) + srcCol.offsets = srcCol.offsets[requiredRows:] + } else if cap(destCol.elemBuf) < elemLen { + destCol.elemBuf = make([]byte, elemLen) } - numNullBitmapBytes := (requestRows + 7) >> 3 - if colTarget.length%8 == 0 { - colTarget.nullBitmap = append(colTarget.nullBitmap, colSource.nullBitmap[:numNullBitmapBytes]...) + numNullBitmapBytes := (requiredRows + 7) >> 3 + if destCol.length%8 == 0 { + destCol.nullBitmap = append(destCol.nullBitmap, srcCol.nullBitmap[:numNullBitmapBytes]...) } else { - colTarget.appendMultiSameNullBitmap(false, requestRows) - bitMapLen := len(colTarget.nullBitmap) - bitOffset := colTarget.length % 8 - bitMapStartIndex := (colTarget.length - 1) >> 3 + destCol.appendMultiSameNullBitmap(false, requiredRows) + bitMapLen := len(destCol.nullBitmap) + // bitOffset indicates the number of elements in destCol.nullBitmap's last byte. + bitOffset := destCol.length % 8 + startIdx := (destCol.length - 1) >> 3 for i := 0; i < numNullBitmapBytes; i++ { - colTarget.nullBitmap[bitMapStartIndex+i] |= colSource.nullBitmap[i] << bitOffset - if bitMapLen > bitMapStartIndex+i+1 { - colTarget.nullBitmap[bitMapStartIndex+i+1] |= colSource.nullBitmap[i] >> (8 - bitOffset) + destCol.nullBitmap[startIdx+i] |= srcCol.nullBitmap[i] << bitOffset + // the total number of elements maybe is small than bitMapLen * 8, and last some bits in srcCol.nullBitmap are useless. + if bitMapLen > startIdx+i+1 { + destCol.nullBitmap[startIdx+i+1] |= srcCol.nullBitmap[i] >> (8 - bitOffset) } } } - colSource.nullBitmap = colSource.nullBitmap[numNullBitmapBytes:] - colTarget.length += requestRows + srcCol.nullBitmap = srcCol.nullBitmap[numNullBitmapBytes:] + destCol.length += requiredRows - colTarget.data = append(colTarget.data, colSource.data[:numDataBytes]...) - colSource.data = colSource.data[numDataBytes:] + destCol.data = append(destCol.data, srcCol.data[:numDataBytes]...) + srcCol.data = srcCol.data[numDataBytes:] } // DecodeToChunkTest decodes a Chunk from a byte slice, return the remained unused bytes. 
(Only test, will remove before I merge this pr) From 6161ccf09cae3ad2ded3eb6a2210e0f53993ba95 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 22 Oct 2019 15:21:09 +0800 Subject: [PATCH 15/34] comments --- util/chunk/codec.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/util/chunk/codec.go b/util/chunk/codec.go index 23fbb622615cc..3b284b68c308d 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -194,7 +194,7 @@ func init() { // 1. decode a Chunk from a byte slice. // How ArrowDecoder works: // 1. Initialization phase: Decode a whole input byte slice to ArrowDecoder.intermChk using Codec.Decode. intermChk is -// introduced to simplify the implementation of decode phase. This phase uses pointer operations with less CPU and +// introduced to simplify the implementation of decode phase. This phase uses pointer operations with less CPU andF // memory cost. // 2. Decode phase: // 2.1 Set the number of rows that should be decoded to a multiple of 8 greater than @@ -235,7 +235,7 @@ func (c *ArrowDecoder) ResetAndInit(data []byte) { c.remainedRows = c.intermChk.NumRows() } -// IsFinished indicate the ArrowDecoder is consumed. +// IsFinished indicate the data byte slice is consumed. func (c *ArrowDecoder) IsFinished() bool { return c.remainedRows == 0 } From 75975b212e8fd52724073504c3c2d9316086ed25 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 22 Oct 2019 15:34:40 +0800 Subject: [PATCH 16/34] s/arrowdecoder /decoder --- distsql/distsql_test.go | 4 ++-- distsql/select_result.go | 4 ++-- util/chunk/codec.go | 26 +++++++++++++------------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 8b4a57dd26e8f..aab76cd412db5 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -549,7 +549,7 @@ func BenchmarkDecodeToChunkWithRequestedRows_int(b *testing.B) { codec := chunk.NewCodec(colTypes) buffer := codec.Encode(chk) - acodec := chunk.NewArrowDecoder( + acodec := chunk.NewDecoder( chunk.NewChunkWithCapacity(colTypes, 0), colTypes) @@ -612,7 +612,7 @@ func BenchmarkDecodeToChunkWithRequestedRows_string(b *testing.B) { codec := chunk.NewCodec(colTypes) buffer := codec.Encode(chk) - acodec := chunk.NewArrowDecoder( + acodec := chunk.NewDecoder( chunk.NewChunkWithCapacity(colTypes, 0), colTypes) diff --git a/distsql/select_result.go b/distsql/select_result.go index 85a469b2424af..55d9d48effde9 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -69,7 +69,7 @@ type selectResult struct { selectResp *tipb.SelectResponse selectRespSize int // record the selectResp.Size() when it is initialized. respChkIdx int - respArrowDecoder *chunk.ArrowDecoder + respArrowDecoder *chunk.Decoder feedback *statistics.QueryFeedback partialCount int64 // number of partial results. @@ -185,7 +185,7 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) error { if r.respArrowDecoder == nil { - r.respArrowDecoder = chunk.NewArrowDecoder( + r.respArrowDecoder = chunk.NewDecoder( chunk.NewChunkWithCapacity(r.fieldTypes, 0), r.fieldTypes, ) diff --git a/util/chunk/codec.go b/util/chunk/codec.go index 3b284b68c308d..87df981ccb285 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -190,10 +190,10 @@ func init() { } } -// ArrowDecoder is used to: +// Decoder is used to: // 1. decode a Chunk from a byte slice. -// How ArrowDecoder works: -// 1. 
Initialization phase: Decode a whole input byte slice to ArrowDecoder.intermChk using Codec.Decode. intermChk is +// How Decoder works: +// 1. Initialization phase: Decode a whole input byte slice to Decoder.intermChk using Codec.Decode. intermChk is // introduced to simplify the implementation of decode phase. This phase uses pointer operations with less CPU andF // memory cost. // 2. Decode phase: @@ -203,19 +203,19 @@ func init() { // offsets according to descCol.offsets[destCol.length]-srcCol.offsets[0]. // 2.3 Append srcCol.nullBitMap to destCol.nullBitMap. // 3. Go to step 1 when the input byte slice is consumed. -type ArrowDecoder struct { +type Decoder struct { intermChk *Chunk colTypes []*types.FieldType remainedRows int } -// NewArrowDecoder creates a new ArrowDecoder object for decode a Chunk. -func NewArrowDecoder(chk *Chunk, colTypes []*types.FieldType) *ArrowDecoder { - return &ArrowDecoder{intermChk: chk, colTypes: colTypes, remainedRows: 0} +// NewDecoder creates a new Decoder object for decode a Chunk. +func NewDecoder(chk *Chunk, colTypes []*types.FieldType) *Decoder { + return &Decoder{intermChk: chk, colTypes: colTypes, remainedRows: 0} } -// Decode decodes a Chunk from ArrowDecoder, save the remained unused bytes. -func (c *ArrowDecoder) Decode(target *Chunk) { +// Decode decodes a Chunk from Decoder, save the remained unused bytes. +func (c *Decoder) Decode(target *Chunk) { requiredRows := target.RequiredRows() - target.NumRows() // Floor requiredRows to the next multiple of 8 requiredRows = (requiredRows + 7) >> 3 << 3 @@ -228,19 +228,19 @@ func (c *ArrowDecoder) Decode(target *Chunk) { c.remainedRows -= requiredRows } -// ResetAndInit resets ArrowDecoder, and use data byte slice to init it. -func (c *ArrowDecoder) ResetAndInit(data []byte) { +// ResetAndInit resets Decoder, and use data byte slice to init it. +func (c *Decoder) ResetAndInit(data []byte) { codec := NewCodec(c.colTypes) codec.DecodeToChunk(data, c.intermChk) c.remainedRows = c.intermChk.NumRows() } // IsFinished indicate the data byte slice is consumed. 
-func (c *ArrowDecoder) IsFinished() bool { +func (c *Decoder) IsFinished() bool { return c.remainedRows == 0 } -func (c *ArrowDecoder) decodeColumn(target *Chunk, ordinal int, requiredRows int) { +func (c *Decoder) decodeColumn(target *Chunk, ordinal int, requiredRows int) { elemLen := getFixedLen(c.colTypes[ordinal]) numDataBytes := int64(elemLen * requiredRows) srcCol := c.intermChk.columns[ordinal] From e606377526de5dfcd5a6ca2dc7238c14e7dd222c Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 22 Oct 2019 20:33:45 +0800 Subject: [PATCH 17/34] address comments --- distsql/distsql_test.go | 4 ++-- distsql/select_result.go | 2 +- util/chunk/codec.go | 32 +++++++++++++++----------------- 3 files changed, 18 insertions(+), 20 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index aab76cd412db5..c9ab3666db92e 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -558,7 +558,7 @@ func BenchmarkDecodeToChunkWithRequestedRows_int(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - acodec.ResetAndInit(buffer) + acodec.Reset(buffer) for !acodec.IsFinished() { for !chk.IsFull() && !acodec.IsFinished() { acodec.Decode(chk) @@ -621,7 +621,7 @@ func BenchmarkDecodeToChunkWithRequestedRows_string(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - acodec.ResetAndInit(buffer) + acodec.Reset(buffer) for !acodec.IsFinished() { for !chk.IsFull() && !acodec.IsFinished() { acodec.Decode(chk) diff --git a/distsql/select_result.go b/distsql/select_result.go index 55d9d48effde9..eb712115b94a4 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -200,7 +200,7 @@ func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) erro } if r.respArrowDecoder.IsFinished() { - r.respArrowDecoder.ResetAndInit(r.selectResp.Chunks[r.respChkIdx].RowsData) + r.respArrowDecoder.Reset(r.selectResp.Chunks[r.respChkIdx].RowsData) } r.respArrowDecoder.Decode(chk) diff --git a/util/chunk/codec.go b/util/chunk/codec.go index 87df981ccb285..ff8c5841edc9c 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -190,15 +190,14 @@ func init() { } } -// Decoder is used to: -// 1. decode a Chunk from a byte slice. +// Decoder decodes the data returned from the coprocessor and stores the result in Chunk. // How Decoder works: // 1. Initialization phase: Decode a whole input byte slice to Decoder.intermChk using Codec.Decode. intermChk is // introduced to simplify the implementation of decode phase. This phase uses pointer operations with less CPU andF // memory cost. // 2. Decode phase: // 2.1 Set the number of rows that should be decoded to a multiple of 8 greater than -// targetChk.RequiredRows()-targetChk.NumRows(), this can reduce the cost when copying the srcCol.nullBitMap into destCol.nullBitMap. +// chk.RequiredRows() - chk.NumRows(), this can reduce the cost when copying the srcCol.nullBitMap into destCol.nullBitMap. // 2.2 Append srcCol.offsets to destCol.offsets when the elements is of var-length type. And further adjust the // offsets according to descCol.offsets[destCol.length]-srcCol.offsets[0]. // 2.3 Append srcCol.nullBitMap to destCol.nullBitMap. @@ -214,49 +213,48 @@ func NewDecoder(chk *Chunk, colTypes []*types.FieldType) *Decoder { return &Decoder{intermChk: chk, colTypes: colTypes, remainedRows: 0} } -// Decode decodes a Chunk from Decoder, save the remained unused bytes. 
-func (c *Decoder) Decode(target *Chunk) { - requiredRows := target.RequiredRows() - target.NumRows() - // Floor requiredRows to the next multiple of 8 +// Decode decodes multiple rows of Decoder.intermChk and stores the result in chk. +func (c *Decoder) Decode(chk *Chunk) { + requiredRows := chk.RequiredRows() - chk.NumRows() + // Set the requiredRows to a multiple of 8. requiredRows = (requiredRows + 7) >> 3 << 3 if requiredRows > c.remainedRows { requiredRows = c.remainedRows } for i := 0; i < len(c.colTypes); i++ { - c.decodeColumn(target, i, requiredRows) + c.decodeColumn(chk, i, requiredRows) } c.remainedRows -= requiredRows } -// ResetAndInit resets Decoder, and use data byte slice to init it. -func (c *Decoder) ResetAndInit(data []byte) { +// Reset decodes data and store the result in Decoder.intermChk. This decode phase uses pointer operations with less +// CPU and memory costs. +func (c *Decoder) Reset(data []byte) { codec := NewCodec(c.colTypes) codec.DecodeToChunk(data, c.intermChk) c.remainedRows = c.intermChk.NumRows() } -// IsFinished indicate the data byte slice is consumed. +// IsFinished indicates whether Decoder.intermChk has been dried up. func (c *Decoder) IsFinished() bool { return c.remainedRows == 0 } -func (c *Decoder) decodeColumn(target *Chunk, ordinal int, requiredRows int) { +func (c *Decoder) decodeColumn(chk *Chunk, ordinal int, requiredRows int) { elemLen := getFixedLen(c.colTypes[ordinal]) numDataBytes := int64(elemLen * requiredRows) srcCol := c.intermChk.columns[ordinal] - destCol := target.columns[ordinal] + destCol := chk.columns[ordinal] if elemLen == varElemLen { + // For var-length types, we need to adjust the offsets after appending to descCol. numDataBytes = srcCol.offsets[requiredRows] - srcCol.offsets[0] deltaOffset := destCol.offsets[destCol.length] - srcCol.offsets[0] destCol.offsets = append(destCol.offsets, srcCol.offsets[1:requiredRows+1]...) - for i := destCol.length + 1; i <= destCol.length+requiredRows; i++ { destCol.offsets[i] = destCol.offsets[i] + deltaOffset } srcCol.offsets = srcCol.offsets[requiredRows:] - } else if cap(destCol.elemBuf) < elemLen { - destCol.elemBuf = make([]byte, elemLen) } numNullBitmapBytes := (requiredRows + 7) >> 3 @@ -265,7 +263,7 @@ func (c *Decoder) decodeColumn(target *Chunk, ordinal int, requiredRows int) { } else { destCol.appendMultiSameNullBitmap(false, requiredRows) bitMapLen := len(destCol.nullBitmap) - // bitOffset indicates the number of elements in destCol.nullBitmap's last byte. + // bitOffset indicates the number of valid bits in destCol.nullBitmap's last byte. bitOffset := destCol.length % 8 startIdx := (destCol.length - 1) >> 3 for i := 0; i < numNullBitmapBytes; i++ { From df40cd64fa5af134a82b789782723ba58e0a3e5b Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Wed, 23 Oct 2019 11:08:53 +0800 Subject: [PATCH 18/34] address comments --- distsql/select_result.go | 14 +++++++------- util/chunk/codec.go | 6 +++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index eb712115b94a4..b7c021ef989d4 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -69,7 +69,7 @@ type selectResult struct { selectResp *tipb.SelectResponse selectRespSize int // record the selectResp.Size() when it is initialized. respChkIdx int - respArrowDecoder *chunk.Decoder + respChunkDecoder *chunk.Decoder feedback *statistics.QueryFeedback partialCount int64 // number of partial results. 
@@ -184,8 +184,8 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er } func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) error { - if r.respArrowDecoder == nil { - r.respArrowDecoder = chunk.NewDecoder( + if r.respChunkDecoder == nil { + r.respChunkDecoder = chunk.NewDecoder( chunk.NewChunkWithCapacity(r.fieldTypes, 0), r.fieldTypes, ) @@ -199,13 +199,13 @@ func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) erro } } - if r.respArrowDecoder.IsFinished() { - r.respArrowDecoder.Reset(r.selectResp.Chunks[r.respChkIdx].RowsData) + if r.respChunkDecoder.IsFinished() { + r.respChunkDecoder.Reset(r.selectResp.Chunks[r.respChkIdx].RowsData) } - r.respArrowDecoder.Decode(chk) + r.respChunkDecoder.Decode(chk) - if r.respArrowDecoder.IsFinished() { + if r.respChunkDecoder.IsFinished() { r.respChkIdx++ } } diff --git a/util/chunk/codec.go b/util/chunk/codec.go index ff8c5841edc9c..328653d56206e 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -247,7 +247,7 @@ func (c *Decoder) decodeColumn(chk *Chunk, ordinal int, requiredRows int) { destCol := chk.columns[ordinal] if elemLen == varElemLen { - // For var-length types, we need to adjust the offsets after appending to descCol. + // For var-length types, we need to adjust the offsets after appending to destCol. numDataBytes = srcCol.offsets[requiredRows] - srcCol.offsets[0] deltaOffset := destCol.offsets[destCol.length] - srcCol.offsets[0] destCol.offsets = append(destCol.offsets, srcCol.offsets[1:requiredRows+1]...) @@ -268,8 +268,8 @@ func (c *Decoder) decodeColumn(chk *Chunk, ordinal int, requiredRows int) { startIdx := (destCol.length - 1) >> 3 for i := 0; i < numNullBitmapBytes; i++ { destCol.nullBitmap[startIdx+i] |= srcCol.nullBitmap[i] << bitOffset - // the total number of elements maybe is small than bitMapLen * 8, and last some bits in srcCol.nullBitmap are useless. - if bitMapLen > startIdx+i+1 { + // The high order 8-bitOffset bits in `srcCol.nullBitmap[i]` should be appended to the low order of the next slot. 
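+			// Illustration: with bitOffset = 3, the low 5 bits of srcCol.nullBitmap[i] fill
+			// bits 3..7 of the current destination byte, and its high 3 bits carry over into
+			// bits 0..2 of destCol.nullBitmap[startIdx+i+1].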
+ if startIdx+i+1 < bitMapLen { destCol.nullBitmap[startIdx+i+1] |= srcCol.nullBitmap[i] >> (8 - bitOffset) } } From 9cc34e7252114b0a7509040a8fa24a8f026f7c23 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Wed, 23 Oct 2019 23:05:04 +0800 Subject: [PATCH 19/34] adjust --- go.sum | 1 + util/chunk/codec.go | 11 +++++------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.sum b/go.sum index 1b011443ef80f..2c68fc2b92f8e 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,7 @@ github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d/go.mo github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20171208011716-f6d7a1f6fbf3/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= +github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= diff --git a/util/chunk/codec.go b/util/chunk/codec.go index 328653d56206e..f41d27281e3fd 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -204,13 +204,13 @@ func init() { // 3. Go to step 1 when the input byte slice is consumed. type Decoder struct { intermChk *Chunk - colTypes []*types.FieldType + codec *Codec remainedRows int } // NewDecoder creates a new Decoder object for decode a Chunk. func NewDecoder(chk *Chunk, colTypes []*types.FieldType) *Decoder { - return &Decoder{intermChk: chk, colTypes: colTypes, remainedRows: 0} + return &Decoder{intermChk: chk, codec: NewCodec(colTypes), remainedRows: 0} } // Decode decodes multiple rows of Decoder.intermChk and stores the result in chk. @@ -221,7 +221,7 @@ func (c *Decoder) Decode(chk *Chunk) { if requiredRows > c.remainedRows { requiredRows = c.remainedRows } - for i := 0; i < len(c.colTypes); i++ { + for i := 0; i < chk.NumCols(); i++ { c.decodeColumn(chk, i, requiredRows) } c.remainedRows -= requiredRows @@ -230,8 +230,7 @@ func (c *Decoder) Decode(chk *Chunk) { // Reset decodes data and store the result in Decoder.intermChk. This decode phase uses pointer operations with less // CPU and memory costs. 
func (c *Decoder) Reset(data []byte) { - codec := NewCodec(c.colTypes) - codec.DecodeToChunk(data, c.intermChk) + c.codec.DecodeToChunk(data, c.intermChk) c.remainedRows = c.intermChk.NumRows() } @@ -241,7 +240,7 @@ func (c *Decoder) IsFinished() bool { } func (c *Decoder) decodeColumn(chk *Chunk, ordinal int, requiredRows int) { - elemLen := getFixedLen(c.colTypes[ordinal]) + elemLen := getFixedLen(c.codec.colTypes[ordinal]) numDataBytes := int64(elemLen * requiredRows) srcCol := c.intermChk.columns[ordinal] destCol := chk.columns[ordinal] From a359a3866ea1c11ebbf75205bcc46fc485c25b9e Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Mon, 28 Oct 2019 15:43:53 +0800 Subject: [PATCH 20/34] fix typo --- util/chunk/codec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/chunk/codec.go b/util/chunk/codec.go index f41d27281e3fd..6f529e1054e56 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -193,7 +193,7 @@ func init() { // Decoder decodes the data returned from the coprocessor and stores the result in Chunk. // How Decoder works: // 1. Initialization phase: Decode a whole input byte slice to Decoder.intermChk using Codec.Decode. intermChk is -// introduced to simplify the implementation of decode phase. This phase uses pointer operations with less CPU andF +// introduced to simplify the implementation of decode phase. This phase uses pointer operations with less CPU and // memory cost. // 2. Decode phase: // 2.1 Set the number of rows that should be decoded to a multiple of 8 greater than From 5e01b664cdd8b6963559ce07b8bb0a26db98b7ff Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Mon, 28 Oct 2019 16:52:46 +0800 Subject: [PATCH 21/34] fix comments --- util/chunk/codec.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/util/chunk/codec.go b/util/chunk/codec.go index 6f529e1054e56..c6502b7cf6f39 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -192,12 +192,13 @@ func init() { // Decoder decodes the data returned from the coprocessor and stores the result in Chunk. // How Decoder works: -// 1. Initialization phase: Decode a whole input byte slice to Decoder.intermChk using Codec.Decode. intermChk is -// introduced to simplify the implementation of decode phase. This phase uses pointer operations with less CPU and -// memory cost. +// 1. Initialization phase: Decode a whole input byte slice to Decoder.intermChk(intermediate chunk) using Codec.Decode. +// intermChk is introduced to simplify the implementation of decode phase. This phase uses pointer operations with +// less CPU and memory cost. // 2. Decode phase: -// 2.1 Set the number of rows that should be decoded to a multiple of 8 greater than -// chk.RequiredRows() - chk.NumRows(), this can reduce the cost when copying the srcCol.nullBitMap into destCol.nullBitMap. +// 2.1 Set the number of rows to be decoded to a value that is a multiple of 8 and greater than +// `chk.RequiredRows() - chk.NumRows()`. This reduces the overhead of copying the srcCol.nullBitMap into +// destCol.nullBitMap. // 2.2 Append srcCol.offsets to destCol.offsets when the elements is of var-length type. And further adjust the // offsets according to descCol.offsets[destCol.length]-srcCol.offsets[0]. // 2.3 Append srcCol.nullBitMap to destCol.nullBitMap. 
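
The decode loop a caller drives against this Decoder (the same pattern the benchmarks later in
this series use) looks roughly like the following sketch. It assumes colTypes and a buffer
produced by Codec.Encode are already in hand, and omits error handling:

	chk := chunk.NewChunkWithCapacity(colTypes, 32)
	decoder := chunk.NewDecoder(chunk.NewChunkWithCapacity(colTypes, 0), colTypes)
	decoder.Reset(buffer) // initialization phase: decode the whole byte slice into intermChk
	for !decoder.IsFinished() {
		for !chk.IsFull() && !decoder.IsFinished() {
			decoder.Decode(chk) // decode phase: append a multiple-of-8 batch of rows to chk
		}
		// hand chk off to the consumer here, then recycle it
		chk.Reset()
	}
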
From b7d94dca58716fb41b09b692986a922a8c116542 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Mon, 28 Oct 2019 17:37:30 +0800 Subject: [PATCH 22/34] magic 0.8 --- distsql/select_result.go | 9 +++++++++ util/chunk/codec.go | 29 ++++++++++++++++++++++++++--- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index b7c021ef989d4..2b942185d359d 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -203,6 +203,15 @@ func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) erro r.respChunkDecoder.Reset(r.selectResp.Chunks[r.respChkIdx].RowsData) } + if r.respChunkDecoder.RemainedRows() > int(float64(chk.RequiredRows())*0.8) { + if chk.NumRows() > 0 { + return nil + } + r.respChunkDecoder.ReuseIntermChk(chk) + r.respChkIdx++ + return nil + } + r.respChunkDecoder.Decode(chk) if r.respChunkDecoder.IsFinished() { diff --git a/util/chunk/codec.go b/util/chunk/codec.go index c6502b7cf6f39..985a015281f39 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -119,7 +119,7 @@ func (c *Codec) decodeColumn(buffer []byte, col *Column, ordinal int) (remained // decode nullBitmap. if nullCount > 0 { numNullBitmapBytes := (col.length + 7) / 8 - col.nullBitmap = buffer[:numNullBitmapBytes] + col.nullBitmap = buffer[:numNullBitmapBytes:numNullBitmapBytes] buffer = buffer[numNullBitmapBytes:] } else { c.setAllNotNull(col) @@ -130,7 +130,7 @@ func (c *Codec) decodeColumn(buffer []byte, col *Column, ordinal int) (remained numDataBytes := int64(numFixedBytes * col.length) if numFixedBytes == -1 { numOffsetBytes := (col.length + 1) * 8 - col.offsets = bytesToI64Slice(buffer[:numOffsetBytes]) + col.offsets = bytesToI64Slice(buffer[:numOffsetBytes:numOffsetBytes]) buffer = buffer[numOffsetBytes:] numDataBytes = col.offsets[col.length] } else if cap(col.elemBuf) < numFixedBytes { @@ -138,7 +138,7 @@ func (c *Codec) decodeColumn(buffer []byte, col *Column, ordinal int) (remained } // decode data. - col.data = buffer[:numDataBytes] + col.data = buffer[:numDataBytes:numDataBytes] return buffer[numDataBytes:] } @@ -240,6 +240,29 @@ func (c *Decoder) IsFinished() bool { return c.remainedRows == 0 } +// RemainedRows indicates Decoder.intermChk has remained rows. +func (c *Decoder) RemainedRows() int { + return c.remainedRows +} + +// ReuseIntermChk reuse Decoder.intermChk. +func (c *Decoder) ReuseIntermChk(chk *Chunk) { + for i := 0; i < c.intermChk.NumCols(); i++ { + c.intermChk.columns[i].length = c.remainedRows + elemLen := getFixedLen(c.codec.colTypes[i]) + if elemLen == varElemLen { + // For var-length types, we need to adjust the offsets before reuse. 
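+			// Illustration: if the remaining offsets are [6, 12, 18], deltaOffset is 6 and they are
+			// rebased to [0, 6, 12], so they index the reused data from position 0 again.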
+			if deltaOffset := c.intermChk.columns[i].offsets[0]; deltaOffset != 0 {
+				for j := 0; j < len(c.intermChk.columns[i].offsets); j++ {
+					c.intermChk.columns[i].offsets[j] -= deltaOffset
+				}
+			}
+		}
+	}
+	chk.SwapColumns(c.intermChk)
+	c.remainedRows = 0
+}
+
 func (c *Decoder) decodeColumn(chk *Chunk, ordinal int, requiredRows int) {
 	elemLen := getFixedLen(c.codec.colTypes[ordinal])
 	numDataBytes := int64(elemLen * requiredRows)

From 2788a89ca4e07f76dba08856c0b6e69a83221477 Mon Sep 17 00:00:00 2001
From: wshwsh12 <793703860@qq.com>
Date: Tue, 29 Oct 2019 10:46:43 +0800
Subject: [PATCH 23/34] fix ci

---
 util/chunk/codec.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/util/chunk/codec.go b/util/chunk/codec.go
index 985a015281f39..b769c23687434 100644
--- a/util/chunk/codec.go
+++ b/util/chunk/codec.go
@@ -297,6 +297,11 @@ func (c *Decoder) decodeColumn(chk *Chunk, ordinal int, requiredRows int) {
 			}
 		}
 	}
+	// Set all the redundant bits in the last slot of destCol.nullBitmap to 0.
+	numRedundantBits := uint(len(destCol.nullBitmap)*8 - destCol.length - requiredRows)
+	bitMask := byte(1<<(8-numRedundantBits)) - 1
+	destCol.nullBitmap[len(destCol.nullBitmap)-1] &= bitMask
+
 	srcCol.nullBitmap = srcCol.nullBitmap[numNullBitmapBytes:]
 	destCol.length += requiredRows

From 650c96af88ea789ab1ceef96fd8b0aa630704ce2 Mon Sep 17 00:00:00 2001
From: wshwsh12 <793703860@qq.com>
Date: Tue, 29 Oct 2019 11:07:19 +0800
Subject: [PATCH 24/34] add comment

---
 distsql/select_result.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/distsql/select_result.go b/distsql/select_result.go
index 2b942185d359d..000eb47e5ecf9 100644
--- a/distsql/select_result.go
+++ b/distsql/select_result.go
@@ -202,7 +202,8 @@ func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) erro
 	if r.respChunkDecoder.IsFinished() {
 		r.respChunkDecoder.Reset(r.selectResp.Chunks[r.respChkIdx].RowsData)
 	}
-
+	// If the next chunk size is greater than required rows * 0.8, reuse the memory of the next chunk and return
+	// immediately. Otherwise, splice the data into one chunk and wait for the next chunk.
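+	// Illustration: with chk.RequiredRows() == 1024 the cutoff is int(1024*0.8) = 819 rows, so a
+	// response chunk holding 820 or more rows is swapped in wholesale instead of being copied.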
if r.respChunkDecoder.RemainedRows() > int(float64(chk.RequiredRows())*0.8) { if chk.NumRows() > 0 { return nil From a1e92e778853a0210cd2f0605f22de20d6663ee6 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 29 Oct 2019 17:19:45 +0800 Subject: [PATCH 25/34] add benchmark --- distsql/distsql_test.go | 76 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index c9ab3666db92e..6dbd7011c5eea 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -655,3 +655,79 @@ func BenchmarkDecodeToChunk_string(b *testing.B) { codec.DecodeToChunkTest(buffer, chk) } } + +func BenchmarkDecodeToChunkReuse_int(b *testing.B) { + numCols := 4 + numRows := 1024 + + colTypes := make([]*types.FieldType, numCols) + for i := 0; i < numCols; i++ { + colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} + } + chk := chunk.New(colTypes, numRows, numRows) + + for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { + for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { + chk.AppendInt64(colOrdinal, 123) + } + } + + codec := chunk.NewCodec(colTypes) + buffer := codec.Encode(chk) + + acodec := chunk.NewDecoder( + chunk.NewChunkWithCapacity(colTypes, 0), + colTypes) + + chk.SetRequiredRows(1024, 1024) + chk.Reset() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + acodec.Reset(buffer) + for !acodec.IsFinished() { + for !chk.IsFull() && !acodec.IsFinished() { + acodec.ReuseIntermChk(chk) + } + chk.Reset() + } + } +} + +func BenchmarkDecodeToChunkReuse_string(b *testing.B) { + numCols := 4 + numRows := 1024 + + colTypes := make([]*types.FieldType, numCols) + for i := 0; i < numCols; i++ { + colTypes[i] = &types.FieldType{Tp: mysql.TypeString} + } + chk := chunk.New(colTypes, numRows, numRows) + + for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { + for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { + chk.AppendString(colOrdinal, "123456") + } + } + + codec := chunk.NewCodec(colTypes) + buffer := codec.Encode(chk) + + acodec := chunk.NewDecoder( + chunk.NewChunkWithCapacity(colTypes, 0), + colTypes) + + chk.SetRequiredRows(1024, 1024) + chk.Reset() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + acodec.Reset(buffer) + for !acodec.IsFinished() { + for !chk.IsFull() && !acodec.IsFinished() { + acodec.ReuseIntermChk(chk) + } + chk.Reset() + } + } +} From bc610a15946c74eab458bd05579bee3c4bea3a41 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Wed, 30 Oct 2019 16:00:13 +0800 Subject: [PATCH 26/34] address comments --- distsql/select_result.go | 2 -- expression/integration_test.go | 46 ++++++++++++++++++++++++++++++++++ util/chunk/codec.go | 15 ++++++----- 3 files changed, 55 insertions(+), 8 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index 000eb47e5ecf9..c67e5d2a56d86 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -212,9 +212,7 @@ func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) erro r.respChkIdx++ return nil } - r.respChunkDecoder.Decode(chk) - if r.respChunkDecoder.IsFinished() { r.respChkIdx++ } diff --git a/expression/integration_test.go b/expression/integration_test.go index 2b044e4143042..e7bd471f6f505 100755 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -5020,3 +5020,49 @@ func (s *testIntegrationSuite) TestNotExistFunc(c *C) { c.Assert(err.Error(), Equals, "[expression:1305]FUNCTION test.yyy does not exist") } + +func (s *testIntegrationSuite) 
TestDecodetoChunkReuse(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table chk (a int,b varchar(20))") + for i := 0; i < 200; i++ { + if i%5 == 0 { + tk.MustExec(fmt.Sprintf("insert chk values (NULL,NULL)")) + continue + } + tk.MustExec(fmt.Sprintf("insert chk values (%d,'%s')", i, strconv.Itoa(i))) + } + + tk.Se.GetSessionVars().DistSQLScanConcurrency = 1 + tk.MustExec("set tidb_init_chunk_size = 2") + tk.MustExec("set tidb_max_chunk_size = 32") + defer func() { + tk.MustExec(fmt.Sprintf("set tidb_init_chunk_size = %d", variable.DefInitChunkSize)) + tk.MustExec(fmt.Sprintf("set tidb_max_chunk_size = %d", variable.DefMaxChunkSize)) + }() + rs, err := tk.Exec("select * from chk") + c.Assert(err, IsNil) + req := rs.NewChunk() + var count int + for { + err = rs.Next(context.TODO(), req) + c.Assert(err, IsNil) + numRows := req.NumRows() + if numRows == 0 { + break + } + for i := 0; i < numRows; i++ { + if count%5 == 0 { + c.Assert(req.GetRow(i).IsNull(0), Equals, true) + c.Assert(req.GetRow(i).IsNull(1), Equals, true) + } else { + c.Assert(req.GetRow(i).IsNull(0), Equals, false) + c.Assert(req.GetRow(i).IsNull(1), Equals, false) + c.Assert(req.GetRow(i).GetInt64(0), Equals, int64(count)) + c.Assert(req.GetRow(i).GetString(1), Equals, strconv.Itoa(count)) + } + count++ + } + } + c.Assert(count, Equals, 200) + rs.Close() +} diff --git a/util/chunk/codec.go b/util/chunk/codec.go index b769c23687434..6c9e30260b2c5 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -245,16 +245,19 @@ func (c *Decoder) RemainedRows() int { return c.remainedRows } -// ReuseIntermChk reuse Decoder.intermChk. +// ReuseIntermChk swaps `Decoder.intermChk` with `chk` directly when `Decoder.intermChk.NumRows()` is no less +// than `chk.requiredRows * factor` where `factor` is 0.8 now. This can avoid the overhead of appending the +// data from `Decoder.intermChk` to `chk`. Moreover, the column.offsets needs to be further adjusted +// according to column.offset[0]. func (c *Decoder) ReuseIntermChk(chk *Chunk) { - for i := 0; i < c.intermChk.NumCols(); i++ { - c.intermChk.columns[i].length = c.remainedRows + for i, col := range c.intermChk.columns { + col.length = c.remainedRows elemLen := getFixedLen(c.codec.colTypes[i]) if elemLen == varElemLen { // For var-length types, we need to adjust the offsets before reuse. 
- if deltaOffset := c.intermChk.columns[i].offsets[0]; deltaOffset != 0 { - for j := 0; j < len(c.intermChk.columns[i].offsets); j++ { - c.intermChk.columns[i].offsets[j] -= deltaOffset + if deltaOffset := col.offsets[0]; deltaOffset != 0 { + for j := 0; j < len(col.offsets); j++ { + col.offsets[j] -= deltaOffset } } } From cc243d951a09e938f88851f16cf58758dc9ce411 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Mon, 4 Nov 2019 10:49:53 +0800 Subject: [PATCH 27/34] add big_response benchmark --- distsql/distsql_test.go | 167 +++++++++++++++++++++++++++++----------- 1 file changed, 123 insertions(+), 44 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 6dbd7011c5eea..2ddddfae8c9a7 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -568,31 +568,6 @@ func BenchmarkDecodeToChunkWithRequestedRows_int(b *testing.B) { } } -func BenchmarkDecodeToChunk_int(b *testing.B) { - numCols := 4 - numRows := 1024 - - colTypes := make([]*types.FieldType, numCols) - for i := 0; i < numCols; i++ { - colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} - } - chk := chunk.New(colTypes, numRows, numRows) - - for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { - for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { - chk.AppendInt64(colOrdinal, 123) - } - } - - codec := chunk.NewCodec(colTypes) - buffer := codec.Encode(chk) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - codec.DecodeToChunkTest(buffer, chk) - } -} - func BenchmarkDecodeToChunkWithRequestedRows_string(b *testing.B) { numCols := 4 numRows := 1024 @@ -631,44 +606,57 @@ func BenchmarkDecodeToChunkWithRequestedRows_string(b *testing.B) { } } -func BenchmarkDecodeToChunk_string(b *testing.B) { +func BenchmarkDecodeToChunkReuse_int(b *testing.B) { numCols := 4 numRows := 1024 colTypes := make([]*types.FieldType, numCols) for i := 0; i < numCols; i++ { - colTypes[i] = &types.FieldType{Tp: mysql.TypeString} + colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} } chk := chunk.New(colTypes, numRows, numRows) for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { - chk.AppendString(colOrdinal, "123456") + chk.AppendInt64(colOrdinal, 123) } } codec := chunk.NewCodec(colTypes) buffer := codec.Encode(chk) + acodec := chunk.NewDecoder( + chunk.NewChunkWithCapacity(colTypes, 0), + colTypes) + + chk.SetRequiredRows(1024, 1024) + chk.Reset() + b.ResetTimer() for i := 0; i < b.N; i++ { - codec.DecodeToChunkTest(buffer, chk) + acodec.Reset(buffer) + for !acodec.IsFinished() { + for !chk.IsFull() && !acodec.IsFinished() { + acodec.ReuseIntermChk(chk) + } + chk.Reset() + } } } -func BenchmarkDecodeToChunkReuse_int(b *testing.B) { +func BenchmarkDecodeToChunkReuse_string(b *testing.B) { numCols := 4 numRows := 1024 colTypes := make([]*types.FieldType, numCols) for i := 0; i < numCols; i++ { - colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} + colTypes[i] = &types.FieldType{Tp: mysql.TypeString} } chk := chunk.New(colTypes, numRows, numRows) for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { - chk.AppendInt64(colOrdinal, 123) + chk.AppendString(colOrdinal, "123456") } } @@ -694,10 +682,25 @@ func BenchmarkDecodeToChunkReuse_int(b *testing.B) { } } -func BenchmarkDecodeToChunkReuse_string(b *testing.B) { - numCols := 4 - numRows := 1024 +type dataBufferGen struct { + times int + data [][]byte + cur int +} + +func (g *dataBufferGen) get() []byte { + if g.cur == 
len(g.data) { + g.cur, g.times = 0, g.times-1 + } + if g.times == 0 { + return nil + } + res := make([]byte, 0, 1024) + res = append(res[:0], g.data[g.cur]...) + return res +} +func populateBufferForChunk(numCols, numRows int) []byte { colTypes := make([]*types.FieldType, numCols) for i := 0; i < numCols; i++ { colTypes[i] = &types.FieldType{Tp: mysql.TypeString} @@ -709,25 +712,101 @@ func BenchmarkDecodeToChunkReuse_string(b *testing.B) { chk.AppendString(colOrdinal, "123456") } } - codec := chunk.NewCodec(colTypes) - buffer := codec.Encode(chk) + return codec.Encode(chk) +} +func buildBigResponse(gen *dataBufferGen) { + gen.data = make([][]byte, 20) + for i := 0; i < 20; i++ { + gen.data[i] = make([]byte, 0, 1024) + } + betchSize := 32 + for i := 0; i < 20; i++ { + gen.data[i] = populateBufferForChunk(4, betchSize) + if betchSize < 1024 { + betchSize *= 2 + } + } +} + +func BenchmarkDecodeToChunkRequiredRows_BigResponse(b *testing.B) { + datagen := &dataBufferGen{} + buildBigResponse(datagen) + + colTypes := make([]*types.FieldType, 4) + for i := 0; i < 4; i++ { + colTypes[i] = &types.FieldType{Tp: mysql.TypeString} + } + chk := chunk.New(colTypes, 4, 1024) acodec := chunk.NewDecoder( chunk.NewChunkWithCapacity(colTypes, 0), colTypes) - chk.SetRequiredRows(1024, 1024) + chk.SetRequiredRows(100, 1024) chk.Reset() - + datagen.times = b.N + datagen.cur = 0 b.ResetTimer() - for i := 0; i < b.N; i++ { - acodec.Reset(buffer) - for !acodec.IsFinished() { - for !chk.IsFull() && !acodec.IsFinished() { - acodec.ReuseIntermChk(chk) + + for true { + if chk.IsFull() { + chk.Reset() + } + if acodec.IsFinished() { + b.StopTimer() + buffer := datagen.get() + if buffer == nil { + return + } + b.StartTimer() + acodec.Reset(buffer) + } + // If the next chunk size is greater than required rows * 0.8, reuse the memory of the next chunk and return + // immediately. Otherwise, splice the data to one chunk and wait the next chunk. 
+ if acodec.RemainedRows() > int(float64(chk.RequiredRows())*0.8) { + if chk.NumRows() > 0 { + chk.Reset() + continue } + acodec.ReuseIntermChk(chk) chk.Reset() + datagen.cur++ + continue } + acodec.Decode(chk) + if acodec.IsFinished() { + datagen.cur++ + } + } +} + +func BenchmarkDecodeToChunk_BigResponse(b *testing.B) { + datagen := &dataBufferGen{} + buildBigResponse(datagen) + + colTypes := make([]*types.FieldType, 4) + for i := 0; i < 4; i++ { + colTypes[i] = &types.FieldType{Tp: mysql.TypeString} + } + chk := chunk.New(colTypes, 4, 1024) + chk.SetRequiredRows(100, 1024) + chk.Reset() + datagen.times = b.N + datagen.cur = 0 + b.ResetTimer() + + for true { + b.StopTimer() + buffer := datagen.get() + if buffer == nil { + return + } + b.StartTimer() + + codec := chunk.NewCodec(colTypes) + _ = codec.DecodeToChunkTest(buffer, chk) + datagen.cur++ + chk.Reset() } } From 5f2aa80f32ae4c2fb5c3e901bfa1e5d2a83af836 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Mon, 4 Nov 2019 15:23:34 +0800 Subject: [PATCH 28/34] temp --- distsql/distsql_test.go | 108 +++++++++++++++++++++++++++++++++------- util/chunk/codec.go | 45 ----------------- 2 files changed, 89 insertions(+), 64 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 2ddddfae8c9a7..a0f328c2a2e6f 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -16,6 +16,8 @@ package distsql import ( "context" "fmt" + "github.com/pingcap/tidb/util/mock" + "sync" "testing" "time" @@ -25,6 +27,7 @@ import ( "github.com/pingcap/parser/charset" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" @@ -781,32 +784,99 @@ func BenchmarkDecodeToChunkRequiredRows_BigResponse(b *testing.B) { } } -func BenchmarkDecodeToChunk_BigResponse(b *testing.B) { - datagen := &dataBufferGen{} - buildBigResponse(datagen) +type mockResponseForChunkEncode struct { + mockResponse +} + +func (resp *mockResponseForChunkEncode) Next(ctx context.Context) (kv.ResultSubset, error) { + resp.Lock() + defer resp.Unlock() + + if resp.count >= resp.total { + return nil, nil + } + numRows := mathutil.Min(resp.batch, resp.total-resp.count) + resp.count += numRows colTypes := make([]*types.FieldType, 4) for i := 0; i < 4; i++ { - colTypes[i] = &types.FieldType{Tp: mysql.TypeString} + colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} + } + chk := chunk.New(colTypes, numRows, numRows) + + for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { + for colOrdinal := 0; colOrdinal < 4; colOrdinal++ { + chk.AppendInt64(colOrdinal, 123) + } + } + codec := chunk.NewCodec(colTypes) + bytes := codec.Encode(chk) + + chunks := make([]tipb.Chunk, numRows) + for i := range chunks { + chkData := make([]byte, len(bytes)) + copy(chkData, bytes) + chunks[i] = tipb.Chunk{RowsData: chkData} } - chk := chunk.New(colTypes, 4, 1024) - chk.SetRequiredRows(100, 1024) - chk.Reset() - datagen.times = b.N - datagen.cur = 0 - b.ResetTimer() + respPB := &tipb.SelectResponse{ + Chunks: chunks, + OutputCounts: []int64{1}, + } + respBytes, err := respPB.Marshal() + if err != nil { + panic(err) + } + return &mockResultSubset{respBytes}, nil +} + +func createSelectNormal(batch, totalRows int, ctx sessionctx.Context) (*selectResult, []*types.FieldType) { + request, _ := (&RequestBuilder{}).SetKeyRanges(nil). + SetDAGRequest(&tipb.DAGRequest{}). + SetDesc(false). + SetKeepOrder(false). 
+ SetFromSessionVars(variable.NewSessionVars()). + SetMemTracker(memory.NewTracker(stringutil.StringerStr("testSuite.createSelectNormal"), + ctx.GetSessionVars().MemQuotaDistSQL)). + Build() + + /// 4 int64 types. + colTypes := []*types.FieldType{ + { + Tp: mysql.TypeLonglong, + Flen: mysql.MaxIntWidth, + Decimal: 0, + Flag: mysql.BinaryFlag, + Charset: charset.CharsetBin, + Collate: charset.CollationBin, + }, + } + colTypes = append(colTypes, colTypes[0]) + colTypes = append(colTypes, colTypes[0]) + colTypes = append(colTypes, colTypes[0]) + + // Test Next. + var response SelectResult + response, _ = Select(context.TODO(), ctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false)) + + result, _ := response.(*selectResult) + resp, _ := result.resp.(*mockResponse) + resp.total = totalRows + resp.batch = batch + + return result, colTypes +} + +func BenchmarkSelectResponseChunk_BigResponse(b *testing.B) { + ctx := mock.NewContext() + + selectResult, colTypes := createSelectNormal(100, 100000, ctx) + selectResult.Fetch(context.TODO()) + chk := chunk.NewChunkWithCapacity(colTypes, 1024) for true { - b.StopTimer() - buffer := datagen.get() - if buffer == nil { + selectResult.Next(context.TODO(), chk) + if chk.NumCols() == 0 { return } - b.StartTimer() - - codec := chunk.NewCodec(colTypes) - _ = codec.DecodeToChunkTest(buffer, chk) - datagen.cur++ - chk.Reset() } } diff --git a/util/chunk/codec.go b/util/chunk/codec.go index 6c9e30260b2c5..f866ac34c6efc 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -311,48 +311,3 @@ func (c *Decoder) decodeColumn(chk *Chunk, ordinal int, requiredRows int) { destCol.data = append(destCol.data, srcCol.data[:numDataBytes]...) srcCol.data = srcCol.data[numDataBytes:] } - -// DecodeToChunkTest decodes a Chunk from a byte slice, return the remained unused bytes. (Only test, will remove before I merge this pr) -func (c *Codec) DecodeToChunkTest(buffer []byte, chk *Chunk) (remained []byte) { - for i := 0; i < len(chk.columns); i++ { - buffer = c.decodeColumnTest(buffer, chk.columns[i], i) - } - return buffer -} - -// decodeColumn decodes a Column from a byte slice, return the remained unused bytes. -func (c *Codec) decodeColumnTest(buffer []byte, col *Column, ordinal int) (remained []byte) { - // Todo(Shenghui Wu): Optimize all data is null. - // decode length. - col.length = int(binary.LittleEndian.Uint32(buffer)) - buffer = buffer[4:] - - // decode nullCount. - nullCount := int(binary.LittleEndian.Uint32(buffer)) - buffer = buffer[4:] - - // decode nullBitmap. - if nullCount > 0 { - numNullBitmapBytes := (col.length + 7) / 8 - col.nullBitmap = append(col.nullBitmap[:0], buffer[:numNullBitmapBytes]...) - buffer = buffer[numNullBitmapBytes:] - } else { - c.setAllNotNull(col) - } - - // decode offsets. - numFixedBytes := getFixedLen(c.colTypes[ordinal]) - numDataBytes := int64(numFixedBytes * col.length) - if numFixedBytes == -1 { - numOffsetBytes := (col.length + 1) * 8 - col.offsets = append(col.offsets[:0], bytesToI64Slice(buffer[:numOffsetBytes])...) - buffer = buffer[numOffsetBytes:] - numDataBytes = col.offsets[col.length] - } else if cap(col.elemBuf) < numFixedBytes { - col.elemBuf = make([]byte, numFixedBytes) - } - - // decode data. - col.data = append(col.data[:0], buffer[:numDataBytes]...) 
- return buffer[numDataBytes:] -} From 774d633a03fc41cc1516ff3df47cc07692010bfc Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Mon, 4 Nov 2019 23:09:14 +0800 Subject: [PATCH 29/34] benchmark --- distsql/distsql_test.go | 502 ++++++---------------------------------- 1 file changed, 67 insertions(+), 435 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index a0f328c2a2e6f..8434d430086fb 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -16,8 +16,7 @@ package distsql import ( "context" "fmt" - "github.com/pingcap/tidb/util/mock" - + "github.com/pingcap/tidb/config" "sync" "testing" "time" @@ -28,7 +27,6 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/types" @@ -368,20 +366,48 @@ func (resp *mockResponse) Next(ctx context.Context) (kv.ResultSubset, error) { numRows := mathutil.Min(resp.batch, resp.total-resp.count) resp.count += numRows - datum := types.NewIntDatum(1) - bytes := make([]byte, 0, 100) - bytes, _ = codec.EncodeValue(nil, bytes, datum, datum, datum, datum) - chunks := make([]tipb.Chunk, numRows) - for i := range chunks { - chkData := make([]byte, len(bytes)) - copy(chkData, bytes) - chunks[i] = tipb.Chunk{RowsData: chkData} + var chunks []tipb.Chunk + if !config.GetGlobalConfig().TiKVClient.EnableArrow { + datum := types.NewIntDatum(1) + bytes := make([]byte, 0, 100) + bytes, _ = codec.EncodeValue(nil, bytes, datum, datum, datum, datum) + chunks = make([]tipb.Chunk, numRows) + for i := range chunks { + chkData := make([]byte, len(bytes)) + copy(chkData, bytes) + chunks[i] = tipb.Chunk{RowsData: chkData} + } + } else { + chunks = make([]tipb.Chunk, 0) + for numRows > 0 { + rows := mathutil.Min(numRows, 1024) + numRows -= rows + + colTypes := make([]*types.FieldType, 4) + for i := 0; i < 4; i++ { + colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} + } + chk := chunk.New(colTypes, numRows, numRows) + + for rowOrdinal := 0; rowOrdinal < rows; rowOrdinal++ { + for colOrdinal := 0; colOrdinal < 4; colOrdinal++ { + chk.AppendInt64(colOrdinal, 123) + } + } + + codec := chunk.NewCodec(colTypes) + buffer := codec.Encode(chk) + chunks = append(chunks, tipb.Chunk{RowsData: buffer}) + } } respPB := &tipb.SelectResponse{ Chunks: chunks, OutputCounts: []int64{1}, } + if config.GetGlobalConfig().TiKVClient.EnableArrow { + respPB.EncodeType = tipb.EncodeType_TypeArrow + } respBytes, err := respPB.Marshal() if err != nil { panic(err) @@ -410,426 +436,6 @@ func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) } // RespTime implements kv.ResultSubset interface. 
func (r *mockResultSubset) RespTime() time.Duration { return 0 } -func populateBuffer() []byte { - numCols := 4 - numRows := 1024 - buffer := make([]byte, 0, 1024) - sc := &stmtctx.StatementContext{TimeZone: time.Local} - - for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { - for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { - buffer, _ = codec.EncodeValue(sc, buffer, types.NewIntDatum(123)) - } - } - - return buffer -} - -func mockReadRowsData(buffer []byte, colTypes []*types.FieldType, chk *chunk.Chunk) (err error) { - chk.Reset() - numCols := 4 - numRows := 1024 - - decoder := codec.NewDecoder(chk, time.Local) - for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { - for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { - buffer, err = decoder.DecodeOne(buffer, colOrdinal, colTypes[colOrdinal]) - if err != nil { - return err - } - } - } - - return nil -} - -func BenchmarkReadRowsData(b *testing.B) { - numCols := 4 - numRows := 1024 - - colTypes := make([]*types.FieldType, numCols) - for i := 0; i < numCols; i++ { - colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} - } - chk := chunk.New(colTypes, numRows, numRows) - - buffer := populateBuffer() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - mockReadRowsData(buffer, colTypes, chk) - } -} - -func BenchmarkDecodeToChunk(b *testing.B) { - numCols := 4 - numRows := 1024 - - colTypes := make([]*types.FieldType, numCols) - for i := 0; i < numCols; i++ { - colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} - } - chk := chunk.New(colTypes, numRows, numRows) - - for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { - for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { - chk.AppendInt64(colOrdinal, 123) - } - } - - codec := chunk.NewCodec(colTypes) - buffer := codec.Encode(chk) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - codec.DecodeToChunk(buffer, chk) - } -} - -func BenchmarkReadRowsDataWithNewChunk(b *testing.B) { - numCols := 4 - numRows := 1024 - - colTypes := make([]*types.FieldType, numCols) - for i := 0; i < numCols; i++ { - colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} - } - buffer := populateBuffer() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - b.StopTimer() - chk := chunk.New(colTypes, numRows, numRows) - b.StartTimer() - mockReadRowsData(buffer, colTypes, chk) - } -} - -func BenchmarkDecodeToChunkWithNewChunk(b *testing.B) { - numCols := 4 - numRows := 1024 - - colTypes := make([]*types.FieldType, numCols) - for i := 0; i < numCols; i++ { - colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} - } - chk := chunk.New(colTypes, numRows, numRows) - - for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { - for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { - chk.AppendInt64(colOrdinal, 123) - } - } - - codec := chunk.NewCodec(colTypes) - buffer := codec.Encode(chk) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - b.StopTimer() - chk := chunk.New(colTypes, numRows, numRows) - b.StartTimer() - codec.DecodeToChunk(buffer, chk) - } -} - -func BenchmarkDecodeToChunkWithRequestedRows_int(b *testing.B) { - numCols := 4 - numRows := 1024 - - colTypes := make([]*types.FieldType, numCols) - for i := 0; i < numCols; i++ { - colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} - } - chk := chunk.New(colTypes, numRows, numRows) - - for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { - for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { - chk.AppendInt64(colOrdinal, 123) - } - } - - codec := chunk.NewCodec(colTypes) - buffer := codec.Encode(chk) - - acodec := 
chunk.NewDecoder( - chunk.NewChunkWithCapacity(colTypes, 0), - colTypes) - - chk.SetRequiredRows(1024, 1024) - chk.Reset() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - acodec.Reset(buffer) - for !acodec.IsFinished() { - for !chk.IsFull() && !acodec.IsFinished() { - acodec.Decode(chk) - } - chk.Reset() - } - } -} - -func BenchmarkDecodeToChunkWithRequestedRows_string(b *testing.B) { - numCols := 4 - numRows := 1024 - - colTypes := make([]*types.FieldType, numCols) - for i := 0; i < numCols; i++ { - colTypes[i] = &types.FieldType{Tp: mysql.TypeString} - } - chk := chunk.New(colTypes, numRows, numRows) - - for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { - for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { - chk.AppendString(colOrdinal, "123456") - } - } - - codec := chunk.NewCodec(colTypes) - buffer := codec.Encode(chk) - - acodec := chunk.NewDecoder( - chunk.NewChunkWithCapacity(colTypes, 0), - colTypes) - - chk.SetRequiredRows(1024, 1024) - chk.Reset() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - acodec.Reset(buffer) - for !acodec.IsFinished() { - for !chk.IsFull() && !acodec.IsFinished() { - acodec.Decode(chk) - } - chk.Reset() - } - } -} - -func BenchmarkDecodeToChunkReuse_int(b *testing.B) { - numCols := 4 - numRows := 1024 - - colTypes := make([]*types.FieldType, numCols) - for i := 0; i < numCols; i++ { - colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} - } - chk := chunk.New(colTypes, numRows, numRows) - - for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { - for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { - chk.AppendInt64(colOrdinal, 123) - } - } - - codec := chunk.NewCodec(colTypes) - buffer := codec.Encode(chk) - - acodec := chunk.NewDecoder( - chunk.NewChunkWithCapacity(colTypes, 0), - colTypes) - - chk.SetRequiredRows(1024, 1024) - chk.Reset() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - acodec.Reset(buffer) - for !acodec.IsFinished() { - for !chk.IsFull() && !acodec.IsFinished() { - acodec.ReuseIntermChk(chk) - } - chk.Reset() - } - } -} - -func BenchmarkDecodeToChunkReuse_string(b *testing.B) { - numCols := 4 - numRows := 1024 - - colTypes := make([]*types.FieldType, numCols) - for i := 0; i < numCols; i++ { - colTypes[i] = &types.FieldType{Tp: mysql.TypeString} - } - chk := chunk.New(colTypes, numRows, numRows) - - for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { - for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { - chk.AppendString(colOrdinal, "123456") - } - } - - codec := chunk.NewCodec(colTypes) - buffer := codec.Encode(chk) - - acodec := chunk.NewDecoder( - chunk.NewChunkWithCapacity(colTypes, 0), - colTypes) - - chk.SetRequiredRows(1024, 1024) - chk.Reset() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - acodec.Reset(buffer) - for !acodec.IsFinished() { - for !chk.IsFull() && !acodec.IsFinished() { - acodec.ReuseIntermChk(chk) - } - chk.Reset() - } - } -} - -type dataBufferGen struct { - times int - data [][]byte - cur int -} - -func (g *dataBufferGen) get() []byte { - if g.cur == len(g.data) { - g.cur, g.times = 0, g.times-1 - } - if g.times == 0 { - return nil - } - res := make([]byte, 0, 1024) - res = append(res[:0], g.data[g.cur]...) 
- return res -} - -func populateBufferForChunk(numCols, numRows int) []byte { - colTypes := make([]*types.FieldType, numCols) - for i := 0; i < numCols; i++ { - colTypes[i] = &types.FieldType{Tp: mysql.TypeString} - } - chk := chunk.New(colTypes, numRows, numRows) - - for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { - for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { - chk.AppendString(colOrdinal, "123456") - } - } - codec := chunk.NewCodec(colTypes) - return codec.Encode(chk) -} - -func buildBigResponse(gen *dataBufferGen) { - gen.data = make([][]byte, 20) - for i := 0; i < 20; i++ { - gen.data[i] = make([]byte, 0, 1024) - } - betchSize := 32 - for i := 0; i < 20; i++ { - gen.data[i] = populateBufferForChunk(4, betchSize) - if betchSize < 1024 { - betchSize *= 2 - } - } -} - -func BenchmarkDecodeToChunkRequiredRows_BigResponse(b *testing.B) { - datagen := &dataBufferGen{} - buildBigResponse(datagen) - - colTypes := make([]*types.FieldType, 4) - for i := 0; i < 4; i++ { - colTypes[i] = &types.FieldType{Tp: mysql.TypeString} - } - chk := chunk.New(colTypes, 4, 1024) - acodec := chunk.NewDecoder( - chunk.NewChunkWithCapacity(colTypes, 0), - colTypes) - - chk.SetRequiredRows(100, 1024) - chk.Reset() - datagen.times = b.N - datagen.cur = 0 - b.ResetTimer() - - for true { - if chk.IsFull() { - chk.Reset() - } - if acodec.IsFinished() { - b.StopTimer() - buffer := datagen.get() - if buffer == nil { - return - } - b.StartTimer() - acodec.Reset(buffer) - } - // If the next chunk size is greater than required rows * 0.8, reuse the memory of the next chunk and return - // immediately. Otherwise, splice the data to one chunk and wait the next chunk. - if acodec.RemainedRows() > int(float64(chk.RequiredRows())*0.8) { - if chk.NumRows() > 0 { - chk.Reset() - continue - } - acodec.ReuseIntermChk(chk) - chk.Reset() - datagen.cur++ - continue - } - acodec.Decode(chk) - if acodec.IsFinished() { - datagen.cur++ - } - } -} - -type mockResponseForChunkEncode struct { - mockResponse -} - -func (resp *mockResponseForChunkEncode) Next(ctx context.Context) (kv.ResultSubset, error) { - resp.Lock() - defer resp.Unlock() - - if resp.count >= resp.total { - return nil, nil - } - numRows := mathutil.Min(resp.batch, resp.total-resp.count) - resp.count += numRows - - colTypes := make([]*types.FieldType, 4) - for i := 0; i < 4; i++ { - colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} - } - chk := chunk.New(colTypes, numRows, numRows) - - for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { - for colOrdinal := 0; colOrdinal < 4; colOrdinal++ { - chk.AppendInt64(colOrdinal, 123) - } - } - codec := chunk.NewCodec(colTypes) - bytes := codec.Encode(chk) - - chunks := make([]tipb.Chunk, numRows) - for i := range chunks { - chkData := make([]byte, len(bytes)) - copy(chkData, bytes) - chunks[i] = tipb.Chunk{RowsData: chkData} - } - - respPB := &tipb.SelectResponse{ - Chunks: chunks, - OutputCounts: []int64{1}, - } - respBytes, err := respPB.Marshal() - if err != nil { - panic(err) - } - return &mockResultSubset{respBytes}, nil -} - func createSelectNormal(batch, totalRows int, ctx sessionctx.Context) (*selectResult, []*types.FieldType) { request, _ := (&RequestBuilder{}).SetKeyRanges(nil). SetDAGRequest(&tipb.DAGRequest{}). 
@@ -868,15 +474,41 @@ func createSelectNormal(batch, totalRows int, ctx sessionctx.Context) (*selectRe } func BenchmarkSelectResponseChunk_BigResponse(b *testing.B) { - ctx := mock.NewContext() + s := &testSuite{} + s.SetUpSuite(nil) + b.StopTimer() + selectResult, colTypes := createSelectNormal(4000, b.N*4000, s.sctx) + selectResult.Fetch(context.TODO()) + chk := chunk.NewChunkWithCapacity(colTypes, 1024) + b.StartTimer() + for true { + err := selectResult.Next(context.TODO(), chk) + if err != nil { + panic(err) + } + if chk.NumRows() == 0 { + return + } + chk.Reset() + } +} - selectResult, colTypes := createSelectNormal(100, 100000, ctx) +func BenchmarkSelectResponseChunk_SmallResponse(b *testing.B) { + s := &testSuite{} + s.SetUpSuite(nil) + b.StopTimer() + selectResult, colTypes := createSelectNormal(32, b.N*32, s.sctx) selectResult.Fetch(context.TODO()) chk := chunk.NewChunkWithCapacity(colTypes, 1024) + b.StartTimer() for true { - selectResult.Next(context.TODO(), chk) - if chk.NumCols() == 0 { + err := selectResult.Next(context.TODO(), chk) + if err != nil { + panic(err) + } + if chk.NumRows() == 0 { return } + chk.Reset() } } From 08eab3fff2cd5d211ebc39693fa5aa90b399f6e2 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 5 Nov 2019 00:25:38 +0800 Subject: [PATCH 30/34] adjust --- distsql/distsql_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 8434d430086fb..6acefdbb02175 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -16,7 +16,6 @@ package distsql import ( "context" "fmt" - "github.com/pingcap/tidb/config" "sync" "testing" "time" @@ -25,6 +24,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/parser/charset" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" From 3c7b3aaf84b02984fa024a5625c692efd8ef8ba5 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 5 Nov 2019 09:49:34 +0800 Subject: [PATCH 31/34] fix ci --- distsql/distsql_test.go | 11 ++++++++--- distsql/request_builder_test.go | 2 ++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 6acefdbb02175..382ae8624df4b 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -24,7 +24,6 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/parser/charset" "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" @@ -128,6 +127,7 @@ func (s *testSuite) TestSelectMemTracker(c *C) { } func (s *testSuite) TestSelectNormalChunkSize(c *C) { + s.sctx.GetSessionVars().EnableArrow = false response, colTypes := s.createSelectNormal(100, 1000000, c, nil) response.Fetch(context.TODO()) s.testChunkSize(response, colTypes, c) @@ -289,6 +289,7 @@ func (s *testSuite) testChunkSize(response SelectResult, colTypes []*types.Field } func (s *testSuite) TestAnalyze(c *C) { + s.sctx.GetSessionVars().EnableArrow = false request, err := (&RequestBuilder{}).SetKeyRanges(nil). SetAnalyzeRequest(&tipb.AnalyzeReq{}). SetKeepOrder(true). @@ -314,6 +315,7 @@ func (s *testSuite) TestAnalyze(c *C) { } func (s *testSuite) TestChecksum(c *C) { + s.sctx.GetSessionVars().EnableArrow = false request, err := (&RequestBuilder{}).SetKeyRanges(nil). 
SetChecksumRequest(&tipb.ChecksumRequest{}). Build() @@ -343,6 +345,7 @@ type mockResponse struct { count int total int batch int + ctx sessionctx.Context sync.Mutex } @@ -367,7 +370,7 @@ func (resp *mockResponse) Next(ctx context.Context) (kv.ResultSubset, error) { resp.count += numRows var chunks []tipb.Chunk - if !config.GetGlobalConfig().TiKVClient.EnableArrow { + if !enableTypeArrow(resp.ctx) { datum := types.NewIntDatum(1) bytes := make([]byte, 0, 100) bytes, _ = codec.EncodeValue(nil, bytes, datum, datum, datum, datum) @@ -405,8 +408,10 @@ func (resp *mockResponse) Next(ctx context.Context) (kv.ResultSubset, error) { Chunks: chunks, OutputCounts: []int64{1}, } - if config.GetGlobalConfig().TiKVClient.EnableArrow { + if enableTypeArrow(resp.ctx) { respPB.EncodeType = tipb.EncodeType_TypeArrow + } else { + respPB.EncodeType = tipb.EncodeType_TypeDefault } respBytes, err := respPB.Marshal() if err != nil { diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index 2274f8003cf57..62c25cfc3e5f2 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -60,6 +60,7 @@ func (s *testSuite) SetUpSuite(c *C) { ctx.Store = &mock.Store{ Client: &mock.Client{ MockResponse: &mockResponse{ + ctx: ctx, batch: 1, total: 2, }, @@ -77,6 +78,7 @@ func (s *testSuite) SetUpTest(c *C) { store := ctx.Store.(*mock.Store) store.Client = &mock.Client{ MockResponse: &mockResponse{ + ctx: ctx, batch: 1, total: 2, }, From bc37593d26fbd6ede18acabd4a1b3819b0531b13 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 5 Nov 2019 14:52:35 +0800 Subject: [PATCH 32/34] benchmark --- distsql/distsql_test.go | 64 ++++++++++++++++++++++------------------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 382ae8624df4b..1afa004feaa7b 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -479,41 +479,45 @@ func createSelectNormal(batch, totalRows int, ctx sessionctx.Context) (*selectRe } func BenchmarkSelectResponseChunk_BigResponse(b *testing.B) { - s := &testSuite{} - s.SetUpSuite(nil) - b.StopTimer() - selectResult, colTypes := createSelectNormal(4000, b.N*4000, s.sctx) - selectResult.Fetch(context.TODO()) - chk := chunk.NewChunkWithCapacity(colTypes, 1024) - b.StartTimer() - for true { - err := selectResult.Next(context.TODO(), chk) - if err != nil { - panic(err) - } - if chk.NumRows() == 0 { - return + for i := 0; i < b.N; i++ { + b.StopTimer() + s := &testSuite{} + s.SetUpSuite(nil) + selectResult, colTypes := createSelectNormal(4000, 20000, s.sctx) + selectResult.Fetch(context.TODO()) + chk := chunk.NewChunkWithCapacity(colTypes, 1024) + b.StartTimer() + for true { + err := selectResult.Next(context.TODO(), chk) + if err != nil { + panic(err) + } + if chk.NumRows() == 0 { + break + } + chk.Reset() } - chk.Reset() } } func BenchmarkSelectResponseChunk_SmallResponse(b *testing.B) { - s := &testSuite{} - s.SetUpSuite(nil) - b.StopTimer() - selectResult, colTypes := createSelectNormal(32, b.N*32, s.sctx) - selectResult.Fetch(context.TODO()) - chk := chunk.NewChunkWithCapacity(colTypes, 1024) - b.StartTimer() - for true { - err := selectResult.Next(context.TODO(), chk) - if err != nil { - panic(err) - } - if chk.NumRows() == 0 { - return + for i := 0; i < b.N; i++ { + b.StopTimer() + s := &testSuite{} + s.SetUpSuite(nil) + selectResult, colTypes := createSelectNormal(32, 3200, s.sctx) + selectResult.Fetch(context.TODO()) + chk := chunk.NewChunkWithCapacity(colTypes, 
1024) + b.StartTimer() + for true { + err := selectResult.Next(context.TODO(), chk) + if err != nil { + panic(err) + } + if chk.NumRows() == 0 { + break + } + chk.Reset() } - chk.Reset() } } From f3fadc67737248b48cbf3bb50d8e4894c62879c0 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 5 Nov 2019 15:50:07 +0800 Subject: [PATCH 33/34] teardownsuite --- distsql/distsql_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 1afa004feaa7b..f9316f6ebc7f9 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -497,6 +497,7 @@ func BenchmarkSelectResponseChunk_BigResponse(b *testing.B) { } chk.Reset() } + s.TearDownSuite(nil) } } @@ -519,5 +520,6 @@ func BenchmarkSelectResponseChunk_SmallResponse(b *testing.B) { } chk.Reset() } + s.TearDownSuite(nil) } } From 2d3ccfc22948f1dc5e94a6a0ef2af1bec4b9467e Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 5 Nov 2019 17:57:28 +0800 Subject: [PATCH 34/34] set chunk size --- distsql/distsql_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index f9316f6ebc7f9..715e61cd0acaa 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -483,6 +483,8 @@ func BenchmarkSelectResponseChunk_BigResponse(b *testing.B) { b.StopTimer() s := &testSuite{} s.SetUpSuite(nil) + s.sctx.GetSessionVars().InitChunkSize = 32 + s.sctx.GetSessionVars().MaxChunkSize = 1024 selectResult, colTypes := createSelectNormal(4000, 20000, s.sctx) selectResult.Fetch(context.TODO()) chk := chunk.NewChunkWithCapacity(colTypes, 1024) @@ -506,6 +508,8 @@ func BenchmarkSelectResponseChunk_SmallResponse(b *testing.B) { b.StopTimer() s := &testSuite{} s.SetUpSuite(nil) + s.sctx.GetSessionVars().InitChunkSize = 32 + s.sctx.GetSessionVars().MaxChunkSize = 1024 selectResult, colTypes := createSelectNormal(32, 3200, s.sctx) selectResult.Fetch(context.TODO()) chk := chunk.NewChunkWithCapacity(colTypes, 1024)