diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go
index 2f70a02ab6ffc..715e61cd0acaa 100644
--- a/distsql/distsql_test.go
+++ b/distsql/distsql_test.go
@@ -25,7 +25,7 @@ import (
 	"github.com/pingcap/parser/charset"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/kv"
-	"github.com/pingcap/tidb/sessionctx/stmtctx"
+	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/statistics"
 	"github.com/pingcap/tidb/types"
@@ -127,6 +127,7 @@ func (s *testSuite) TestSelectMemTracker(c *C) {
 }
 
 func (s *testSuite) TestSelectNormalChunkSize(c *C) {
+	s.sctx.GetSessionVars().EnableArrow = false
 	response, colTypes := s.createSelectNormal(100, 1000000, c, nil)
 	response.Fetch(context.TODO())
 	s.testChunkSize(response, colTypes, c)
@@ -288,6 +289,7 @@ func (s *testSuite) testChunkSize(response SelectResult, colTypes []*types.Field
 }
 
 func (s *testSuite) TestAnalyze(c *C) {
+	s.sctx.GetSessionVars().EnableArrow = false
 	request, err := (&RequestBuilder{}).SetKeyRanges(nil).
 		SetAnalyzeRequest(&tipb.AnalyzeReq{}).
 		SetKeepOrder(true).
@@ -313,6 +315,7 @@ func (s *testSuite) TestAnalyze(c *C) {
 }
 
 func (s *testSuite) TestChecksum(c *C) {
+	s.sctx.GetSessionVars().EnableArrow = false
 	request, err := (&RequestBuilder{}).SetKeyRanges(nil).
 		SetChecksumRequest(&tipb.ChecksumRequest{}).
 		Build()
@@ -342,6 +345,7 @@ type mockResponse struct {
 	count int
 	total int
 	batch int
+	ctx   sessionctx.Context
 	sync.Mutex
 }
 
@@ -365,20 +369,50 @@ func (resp *mockResponse) Next(ctx context.Context) (kv.ResultSubset, error) {
 	numRows := mathutil.Min(resp.batch, resp.total-resp.count)
 	resp.count += numRows
-	datum := types.NewIntDatum(1)
-	bytes := make([]byte, 0, 100)
-	bytes, _ = codec.EncodeValue(nil, bytes, datum, datum, datum, datum)
-	chunks := make([]tipb.Chunk, numRows)
-	for i := range chunks {
-		chkData := make([]byte, len(bytes))
-		copy(chkData, bytes)
-		chunks[i] = tipb.Chunk{RowsData: chkData}
+	var chunks []tipb.Chunk
+	if !enableTypeArrow(resp.ctx) {
+		datum := types.NewIntDatum(1)
+		bytes := make([]byte, 0, 100)
+		bytes, _ = codec.EncodeValue(nil, bytes, datum, datum, datum, datum)
+		chunks = make([]tipb.Chunk, numRows)
+		for i := range chunks {
+			chkData := make([]byte, len(bytes))
+			copy(chkData, bytes)
+			chunks[i] = tipb.Chunk{RowsData: chkData}
+		}
+	} else {
+		chunks = make([]tipb.Chunk, 0)
+		for numRows > 0 {
+			rows := mathutil.Min(numRows, 1024)
+			numRows -= rows
+
+			colTypes := make([]*types.FieldType, 4)
+			for i := 0; i < 4; i++ {
+				colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
+			}
+			chk := chunk.New(colTypes, numRows, numRows)
+
+			for rowOrdinal := 0; rowOrdinal < rows; rowOrdinal++ {
+				for colOrdinal := 0; colOrdinal < 4; colOrdinal++ {
+					chk.AppendInt64(colOrdinal, 123)
+				}
+			}
+
+			codec := chunk.NewCodec(colTypes)
+			buffer := codec.Encode(chk)
+			chunks = append(chunks, tipb.Chunk{RowsData: buffer})
+		}
 	}
 
 	respPB := &tipb.SelectResponse{
 		Chunks:       chunks,
 		OutputCounts: []int64{1},
 	}
+	if enableTypeArrow(resp.ctx) {
+		respPB.EncodeType = tipb.EncodeType_TypeArrow
+	} else {
+		respPB.EncodeType = tipb.EncodeType_TypeDefault
+	}
 	respBytes, err := respPB.Marshal()
 	if err != nil {
 		panic(err)
 	}
@@ -407,125 +441,89 @@ func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) }
 
 // RespTime implements kv.ResultSubset interface.
 func (r *mockResultSubset) RespTime() time.Duration { return 0 }
 
-func populateBuffer() []byte {
-	numCols := 4
-	numRows := 1024
-	buffer := make([]byte, 0, 1024)
-	sc := &stmtctx.StatementContext{TimeZone: time.Local}
-
-	for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ {
-		for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ {
-			buffer, _ = codec.EncodeValue(sc, buffer, types.NewIntDatum(123))
-		}
-	}
-
-	return buffer
-}
-
-func mockReadRowsData(buffer []byte, colTypes []*types.FieldType, chk *chunk.Chunk) (err error) {
-	chk.Reset()
-	numCols := 4
-	numRows := 1024
-
-	decoder := codec.NewDecoder(chk, time.Local)
-	for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ {
-		for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ {
-			buffer, err = decoder.DecodeOne(buffer, colOrdinal, colTypes[colOrdinal])
-			if err != nil {
-				return err
-			}
-		}
-	}
-
-	return nil
-}
-
-func BenchmarkReadRowsData(b *testing.B) {
-	numCols := 4
-	numRows := 1024
-
-	colTypes := make([]*types.FieldType, numCols)
-	for i := 0; i < numCols; i++ {
-		colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
-	}
-	chk := chunk.New(colTypes, numRows, numRows)
-
-	buffer := populateBuffer()
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		mockReadRowsData(buffer, colTypes, chk)
-	}
-}
-
-func BenchmarkDecodeToChunk(b *testing.B) {
-	numCols := 4
-	numRows := 1024
+func createSelectNormal(batch, totalRows int, ctx sessionctx.Context) (*selectResult, []*types.FieldType) {
+	request, _ := (&RequestBuilder{}).SetKeyRanges(nil).
+		SetDAGRequest(&tipb.DAGRequest{}).
+		SetDesc(false).
+		SetKeepOrder(false).
+		SetFromSessionVars(variable.NewSessionVars()).
+		SetMemTracker(memory.NewTracker(stringutil.StringerStr("testSuite.createSelectNormal"),
+			ctx.GetSessionVars().MemQuotaDistSQL)).
+		Build()
 
-	colTypes := make([]*types.FieldType, numCols)
-	for i := 0; i < numCols; i++ {
-		colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
+	// 4 int64 types.
+	colTypes := []*types.FieldType{
+		{
+			Tp:      mysql.TypeLonglong,
+			Flen:    mysql.MaxIntWidth,
+			Decimal: 0,
+			Flag:    mysql.BinaryFlag,
+			Charset: charset.CharsetBin,
+			Collate: charset.CollationBin,
+		},
 	}
-	chk := chunk.New(colTypes, numRows, numRows)
+	colTypes = append(colTypes, colTypes[0])
+	colTypes = append(colTypes, colTypes[0])
+	colTypes = append(colTypes, colTypes[0])
 
-	for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ {
-		for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ {
-			chk.AppendInt64(colOrdinal, 123)
-		}
-	}
+	// Test Next.
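+	// Note: Select below resolves to the mock.Client installed by testSuite.SetUpSuite (see
+	// request_builder_test.go), so the returned selectResult reads rows from mockResponse;
+	// resp.total and resp.batch control how many mocked rows come back from each Next call.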
+	var response SelectResult
+	response, _ = Select(context.TODO(), ctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false))
 
-	codec := chunk.NewCodec(colTypes)
-	buffer := codec.Encode(chk)
+	result, _ := response.(*selectResult)
+	resp, _ := result.resp.(*mockResponse)
+	resp.total = totalRows
+	resp.batch = batch
 
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		codec.DecodeToChunk(buffer, chk)
-	}
+	return result, colTypes
 }
 
-func BenchmarkReadRowsDataWithNewChunk(b *testing.B) {
-	numCols := 4
-	numRows := 1024
-
-	colTypes := make([]*types.FieldType, numCols)
-	for i := 0; i < numCols; i++ {
-		colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
-	}
-	buffer := populateBuffer()
-
-	b.ResetTimer()
+func BenchmarkSelectResponseChunk_BigResponse(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		b.StopTimer()
-		chk := chunk.New(colTypes, numRows, numRows)
+		s := &testSuite{}
+		s.SetUpSuite(nil)
+		s.sctx.GetSessionVars().InitChunkSize = 32
+		s.sctx.GetSessionVars().MaxChunkSize = 1024
+		selectResult, colTypes := createSelectNormal(4000, 20000, s.sctx)
+		selectResult.Fetch(context.TODO())
+		chk := chunk.NewChunkWithCapacity(colTypes, 1024)
 		b.StartTimer()
-		mockReadRowsData(buffer, colTypes, chk)
-	}
-}
-
-func BenchmarkDecodeToChunkWithNewChunk(b *testing.B) {
-	numCols := 4
-	numRows := 1024
-
-	colTypes := make([]*types.FieldType, numCols)
-	for i := 0; i < numCols; i++ {
-		colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
-	}
-	chk := chunk.New(colTypes, numRows, numRows)
-
-	for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ {
-		for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ {
-			chk.AppendInt64(colOrdinal, 123)
+		for true {
+			err := selectResult.Next(context.TODO(), chk)
+			if err != nil {
+				panic(err)
+			}
+			if chk.NumRows() == 0 {
+				break
+			}
+			chk.Reset()
 		}
+		s.TearDownSuite(nil)
 	}
+}
 
-	codec := chunk.NewCodec(colTypes)
-	buffer := codec.Encode(chk)
-
-	b.ResetTimer()
+func BenchmarkSelectResponseChunk_SmallResponse(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		b.StopTimer()
-		chk := chunk.New(colTypes, numRows, numRows)
+		s := &testSuite{}
+		s.SetUpSuite(nil)
+		s.sctx.GetSessionVars().InitChunkSize = 32
+		s.sctx.GetSessionVars().MaxChunkSize = 1024
+		selectResult, colTypes := createSelectNormal(32, 3200, s.sctx)
+		selectResult.Fetch(context.TODO())
+		chk := chunk.NewChunkWithCapacity(colTypes, 1024)
 		b.StartTimer()
-		codec.DecodeToChunk(buffer, chk)
+		for true {
+			err := selectResult.Next(context.TODO(), chk)
+			if err != nil {
+				panic(err)
+			}
+			if chk.NumRows() == 0 {
+				break
+			}
+			chk.Reset()
+		}
+		s.TearDownSuite(nil)
 	}
 }
diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go
index 2274f8003cf57..62c25cfc3e5f2 100644
--- a/distsql/request_builder_test.go
+++ b/distsql/request_builder_test.go
@@ -60,6 +60,7 @@ func (s *testSuite) SetUpSuite(c *C) {
 	ctx.Store = &mock.Store{
 		Client: &mock.Client{
 			MockResponse: &mockResponse{
+				ctx:   ctx,
 				batch: 1,
 				total: 2,
 			},
@@ -77,6 +78,7 @@ func (s *testSuite) SetUpTest(c *C) {
 	store := ctx.Store.(*mock.Store)
 	store.Client = &mock.Client{
 		MockResponse: &mockResponse{
+			ctx:   ctx,
 			batch: 1,
 			total: 2,
 		},
diff --git a/distsql/select_result.go b/distsql/select_result.go
index 49514b2f472f8..a2393d770f804 100644
--- a/distsql/select_result.go
+++ b/distsql/select_result.go
@@ -66,9 +66,10 @@ type selectResult struct {
 	fieldTypes []*types.FieldType
 	ctx        sessionctx.Context
 
-	selectResp     *tipb.SelectResponse
-	selectRespSize int // record the selectResp.Size() when it is initialized.
-	respChkIdx     int
+	selectResp       *tipb.SelectResponse
+	selectRespSize   int // record the selectResp.Size() when it is initialized.
+	respChkIdx       int
+	respChunkDecoder *chunk.Decoder
 
 	feedback     *statistics.QueryFeedback
 	partialCount int64 // number of partial results.
@@ -183,10 +184,39 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er
 }
 
 func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) error {
-	rowBatchData := r.selectResp.Chunks[r.respChkIdx].RowsData
-	codec := chunk.NewCodec(r.fieldTypes)
-	_ = codec.DecodeToChunk(rowBatchData, chk)
-	r.respChkIdx++
+	if r.respChunkDecoder == nil {
+		r.respChunkDecoder = chunk.NewDecoder(
+			chunk.NewChunkWithCapacity(r.fieldTypes, 0),
+			r.fieldTypes,
+		)
+	}
+
+	for !chk.IsFull() {
+		if r.respChkIdx == len(r.selectResp.Chunks) {
+			err := r.getSelectResp()
+			if err != nil || r.selectResp == nil {
+				return err
+			}
+		}
+
+		if r.respChunkDecoder.IsFinished() {
+			r.respChunkDecoder.Reset(r.selectResp.Chunks[r.respChkIdx].RowsData)
+		}
+		// If the next chunk size is greater than required rows * 0.8, reuse the memory of the next chunk and return
+		// immediately. Otherwise, splice the data into one chunk and wait for the next chunk.
+		if r.respChunkDecoder.RemainedRows() > int(float64(chk.RequiredRows())*0.8) {
+			if chk.NumRows() > 0 {
+				return nil
+			}
+			r.respChunkDecoder.ReuseIntermChk(chk)
+			r.respChkIdx++
+			return nil
+		}
+		r.respChunkDecoder.Decode(chk)
+		if r.respChunkDecoder.IsFinished() {
+			r.respChkIdx++
+		}
+	}
 	return nil
 }
 
diff --git a/expression/integration_test.go b/expression/integration_test.go
index c826d51118afd..a80f6925c541d 100755
--- a/expression/integration_test.go
+++ b/expression/integration_test.go
@@ -5031,3 +5031,49 @@ func (s *testIntegrationSuite) TestNotExistFunc(c *C) {
 
 	c.Assert(err.Error(), Equals, "[expression:1305]FUNCTION test.timestampliteral does not exist")
 }
+
+func (s *testIntegrationSuite) TestDecodetoChunkReuse(c *C) {
+	tk := testkit.NewTestKitWithInit(c, s.store)
+	tk.MustExec("create table chk (a int,b varchar(20))")
+	for i := 0; i < 200; i++ {
+		if i%5 == 0 {
+			tk.MustExec(fmt.Sprintf("insert chk values (NULL,NULL)"))
+			continue
+		}
+		tk.MustExec(fmt.Sprintf("insert chk values (%d,'%s')", i, strconv.Itoa(i)))
+	}
+
+	tk.Se.GetSessionVars().DistSQLScanConcurrency = 1
+	tk.MustExec("set tidb_init_chunk_size = 2")
+	tk.MustExec("set tidb_max_chunk_size = 32")
+	defer func() {
+		tk.MustExec(fmt.Sprintf("set tidb_init_chunk_size = %d", variable.DefInitChunkSize))
+		tk.MustExec(fmt.Sprintf("set tidb_max_chunk_size = %d", variable.DefMaxChunkSize))
+	}()
+	rs, err := tk.Exec("select * from chk")
+	c.Assert(err, IsNil)
+	req := rs.NewChunk()
+	var count int
+	for {
+		err = rs.Next(context.TODO(), req)
+		c.Assert(err, IsNil)
+		numRows := req.NumRows()
+		if numRows == 0 {
+			break
+		}
+		for i := 0; i < numRows; i++ {
+			if count%5 == 0 {
+				c.Assert(req.GetRow(i).IsNull(0), Equals, true)
+				c.Assert(req.GetRow(i).IsNull(1), Equals, true)
+			} else {
+				c.Assert(req.GetRow(i).IsNull(0), Equals, false)
+				c.Assert(req.GetRow(i).IsNull(1), Equals, false)
+				c.Assert(req.GetRow(i).GetInt64(0), Equals, int64(count))
+				c.Assert(req.GetRow(i).GetString(1), Equals, strconv.Itoa(count))
+			}
+			count++
+		}
+	}
+	c.Assert(count, Equals, 200)
+	rs.Close()
+}
diff --git a/session/session_test.go b/session/session_test.go
index 2f41d9a0bf69e..9e9cf72beb36b 100644
--- a/session/session_test.go
+++ b/session/session_test.go
@@ -2018,7 +2018,7 @@ func (s *testSchemaSuite) TestTableReaderChunk(c *C) {
 	}
 	c.Assert(count, Equals, 100)
 	// FIXME: revert this result to new group value after distsql can handle initChunkSize.
-	c.Assert(numChunks, Equals, 10)
+	c.Assert(numChunks, Equals, 1)
 	rs.Close()
 }
 
diff --git a/util/chunk/codec.go b/util/chunk/codec.go
index 24363ee344d8d..f866ac34c6efc 100644
--- a/util/chunk/codec.go
+++ b/util/chunk/codec.go
@@ -119,7 +119,7 @@ func (c *Codec) decodeColumn(buffer []byte, col *Column, ordinal int) (remained
 	// decode nullBitmap.
 	if nullCount > 0 {
 		numNullBitmapBytes := (col.length + 7) / 8
-		col.nullBitmap = append(col.nullBitmap[:0], buffer[:numNullBitmapBytes]...)
+		col.nullBitmap = buffer[:numNullBitmapBytes:numNullBitmapBytes]
 		buffer = buffer[numNullBitmapBytes:]
 	} else {
 		c.setAllNotNull(col)
@@ -130,7 +130,7 @@ func (c *Codec) decodeColumn(buffer []byte, col *Column, ordinal int) (remained
 	numDataBytes := int64(numFixedBytes * col.length)
 	if numFixedBytes == -1 {
 		numOffsetBytes := (col.length + 1) * 8
-		col.offsets = append(col.offsets[:0], bytesToI64Slice(buffer[:numOffsetBytes])...)
+		col.offsets = bytesToI64Slice(buffer[:numOffsetBytes:numOffsetBytes])
 		buffer = buffer[numOffsetBytes:]
 		numDataBytes = col.offsets[col.length]
 	} else if cap(col.elemBuf) < numFixedBytes {
@@ -138,7 +138,7 @@ func (c *Codec) decodeColumn(buffer []byte, col *Column, ordinal int) (remained
 	}
 
 	// decode data.
-	col.data = append(col.data[:0], buffer[:numDataBytes]...)
+	col.data = buffer[:numDataBytes:numDataBytes]
 	return buffer[numDataBytes:]
 }
 
@@ -189,3 +189,125 @@ func init() {
 		allNotNullBitmap[i] = 0xFF
 	}
 }
+
+// Decoder decodes the data returned from the coprocessor and stores the result in Chunk.
+// How Decoder works:
+// 1. Initialization phase: Decode a whole input byte slice to Decoder.intermChk (intermediate chunk) using Codec.Decode.
+//    intermChk is introduced to simplify the implementation of the decode phase. This phase uses pointer operations with
+//    less CPU and memory cost.
+// 2. Decode phase:
+//    2.1 Set the number of rows to be decoded to a value that is a multiple of 8 and greater than
+//        `chk.RequiredRows() - chk.NumRows()`. This reduces the overhead of copying the srcCol.nullBitMap into
+//        destCol.nullBitMap.
+//    2.2 Append srcCol.offsets to destCol.offsets when the element is of var-length type, and further adjust the
+//        offsets according to destCol.offsets[destCol.length]-srcCol.offsets[0].
+//    2.3 Append srcCol.nullBitMap to destCol.nullBitMap.
+// 3. Go to step 1 when the input byte slice is consumed.
+type Decoder struct {
+	intermChk    *Chunk
+	codec        *Codec
+	remainedRows int
+}
+
+// NewDecoder creates a new Decoder object for decoding a Chunk.
+func NewDecoder(chk *Chunk, colTypes []*types.FieldType) *Decoder {
+	return &Decoder{intermChk: chk, codec: NewCodec(colTypes), remainedRows: 0}
+}
+
+// Decode decodes multiple rows of Decoder.intermChk and stores the result in chk.
+func (c *Decoder) Decode(chk *Chunk) {
+	requiredRows := chk.RequiredRows() - chk.NumRows()
+	// Set the requiredRows to a multiple of 8.
+	requiredRows = (requiredRows + 7) >> 3 << 3
+	if requiredRows > c.remainedRows {
+		requiredRows = c.remainedRows
+	}
+	for i := 0; i < chk.NumCols(); i++ {
+		c.decodeColumn(chk, i, requiredRows)
+	}
+	c.remainedRows -= requiredRows
+}
+
+// Reset decodes data and stores the result in Decoder.intermChk. This decode phase uses pointer operations with less
+// CPU and memory costs.
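+//
+// A minimal usage sketch (illustrative only; it mirrors how selectResult.readFromArrow in
+// distsql/select_result.go drives the decoder, minus the ReuseIntermChk fast path; `rowsData`
+// and `chk` are placeholder variables):
+//
+//	decoder.Reset(rowsData)
+//	for !decoder.IsFinished() && !chk.IsFull() {
+//		decoder.Decode(chk)
+//	}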
+func (c *Decoder) Reset(data []byte) {
+	c.codec.DecodeToChunk(data, c.intermChk)
+	c.remainedRows = c.intermChk.NumRows()
+}
+
+// IsFinished indicates whether Decoder.intermChk has been fully consumed.
+func (c *Decoder) IsFinished() bool {
+	return c.remainedRows == 0
+}
+
+// RemainedRows returns the number of rows remaining in Decoder.intermChk.
+func (c *Decoder) RemainedRows() int {
+	return c.remainedRows
+}
+
+// ReuseIntermChk swaps `Decoder.intermChk` with `chk` directly when `Decoder.intermChk.NumRows()` is no less
+// than `chk.requiredRows * factor` where `factor` is 0.8 now. This can avoid the overhead of appending the
+// data from `Decoder.intermChk` to `chk`. Moreover, column.offsets needs to be further adjusted
+// according to column.offsets[0].
+func (c *Decoder) ReuseIntermChk(chk *Chunk) {
+	for i, col := range c.intermChk.columns {
+		col.length = c.remainedRows
+		elemLen := getFixedLen(c.codec.colTypes[i])
+		if elemLen == varElemLen {
+			// For var-length types, we need to adjust the offsets before reuse.
+			if deltaOffset := col.offsets[0]; deltaOffset != 0 {
+				for j := 0; j < len(col.offsets); j++ {
+					col.offsets[j] -= deltaOffset
+				}
+			}
+		}
+	}
+	chk.SwapColumns(c.intermChk)
+	c.remainedRows = 0
+}
+
+func (c *Decoder) decodeColumn(chk *Chunk, ordinal int, requiredRows int) {
+	elemLen := getFixedLen(c.codec.colTypes[ordinal])
+	numDataBytes := int64(elemLen * requiredRows)
+	srcCol := c.intermChk.columns[ordinal]
+	destCol := chk.columns[ordinal]
+
+	if elemLen == varElemLen {
+		// For var-length types, we need to adjust the offsets after appending to destCol.
+		numDataBytes = srcCol.offsets[requiredRows] - srcCol.offsets[0]
+		deltaOffset := destCol.offsets[destCol.length] - srcCol.offsets[0]
+		destCol.offsets = append(destCol.offsets, srcCol.offsets[1:requiredRows+1]...)
+		for i := destCol.length + 1; i <= destCol.length+requiredRows; i++ {
+			destCol.offsets[i] = destCol.offsets[i] + deltaOffset
+		}
+		srcCol.offsets = srcCol.offsets[requiredRows:]
+	}
+
+	numNullBitmapBytes := (requiredRows + 7) >> 3
+	if destCol.length%8 == 0 {
+		destCol.nullBitmap = append(destCol.nullBitmap, srcCol.nullBitmap[:numNullBitmapBytes]...)
+	} else {
+		destCol.appendMultiSameNullBitmap(false, requiredRows)
+		bitMapLen := len(destCol.nullBitmap)
+		// bitOffset indicates the number of valid bits in destCol.nullBitmap's last byte.
+		bitOffset := destCol.length % 8
+		startIdx := (destCol.length - 1) >> 3
+		for i := 0; i < numNullBitmapBytes; i++ {
+			destCol.nullBitmap[startIdx+i] |= srcCol.nullBitmap[i] << bitOffset
+			// The high-order (8 - bitOffset) bits in `srcCol.nullBitmap[i]` should be appended to the low-order bits of the next slot.
+			if startIdx+i+1 < bitMapLen {
+				destCol.nullBitmap[startIdx+i+1] |= srcCol.nullBitmap[i] >> (8 - bitOffset)
+			}
+		}
+	}
+	// Set all the redundant bits in the last slot of destCol.nullBitmap to 0.
+	numRedundantBits := uint(len(destCol.nullBitmap)*8 - destCol.length - requiredRows)
+	bitMask := byte(1<<(8-numRedundantBits)) - 1
+	destCol.nullBitmap[len(destCol.nullBitmap)-1] &= bitMask
+
+	srcCol.nullBitmap = srcCol.nullBitmap[numNullBitmapBytes:]
+	destCol.length += requiredRows
+
+	destCol.data = append(destCol.data, srcCol.data[:numDataBytes]...)
+	srcCol.data = srcCol.data[numDataBytes:]
+}