-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: Support required rows for arrow decode format. #12613
Changes from 45 commits
a6405ca
1c2bcd2
1138b8e
10d6dcf
d0b5799
13d22ad
95e3d09
6ebd96c
32c7bfe
b3fd5ab
5f754d6
ec06f30
3e014b9
b0f5105
23149ca
c5ac8b2
0743aa2
cbe252f
1cf0853
d72d5af
97358b9
9a86bd6
d4b77aa
38cbf7c
ef9b412
6161ccf
75975b2
7574eb2
e606377
df40cd6
6931b70
9cc34e7
a359a38
5e01b66
18ff275
b7d94dc
99bb831
2788a89
650c96a
a1e92e7
bc610a1
1f782b1
0c4e14f
1c607bd
cc243d9
5f2aa80
774d633
5223d23
08eab3f
3c7b3aa
bc37593
f3fadc6
9a75aa7
2d3ccfc
26b5fda
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -529,3 +529,284 @@ func BenchmarkDecodeToChunkWithNewChunk(b *testing.B) { | |
codec.DecodeToChunk(buffer, chk) | ||
} | ||
} | ||
|
||
func BenchmarkDecodeToChunkWithRequestedRows_int(b *testing.B) { | ||
numCols := 4 | ||
numRows := 1024 | ||
|
||
colTypes := make([]*types.FieldType, numCols) | ||
for i := 0; i < numCols; i++ { | ||
colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} | ||
} | ||
chk := chunk.New(colTypes, numRows, numRows) | ||
|
||
for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { | ||
for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { | ||
chk.AppendInt64(colOrdinal, 123) | ||
} | ||
} | ||
|
||
codec := chunk.NewCodec(colTypes) | ||
buffer := codec.Encode(chk) | ||
|
||
acodec := chunk.NewDecoder( | ||
chunk.NewChunkWithCapacity(colTypes, 0), | ||
colTypes) | ||
|
||
chk.SetRequiredRows(1024, 1024) | ||
chk.Reset() | ||
|
||
b.ResetTimer() | ||
for i := 0; i < b.N; i++ { | ||
acodec.Reset(buffer) | ||
for !acodec.IsFinished() { | ||
for !chk.IsFull() && !acodec.IsFinished() { | ||
acodec.Decode(chk) | ||
} | ||
chk.Reset() | ||
} | ||
} | ||
} | ||
|
||
func BenchmarkDecodeToChunkWithRequestedRows_string(b *testing.B) { | ||
numCols := 4 | ||
numRows := 1024 | ||
|
||
colTypes := make([]*types.FieldType, numCols) | ||
for i := 0; i < numCols; i++ { | ||
colTypes[i] = &types.FieldType{Tp: mysql.TypeString} | ||
} | ||
chk := chunk.New(colTypes, numRows, numRows) | ||
|
||
for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { | ||
for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { | ||
chk.AppendString(colOrdinal, "123456") | ||
} | ||
} | ||
|
||
codec := chunk.NewCodec(colTypes) | ||
buffer := codec.Encode(chk) | ||
|
||
acodec := chunk.NewDecoder( | ||
chunk.NewChunkWithCapacity(colTypes, 0), | ||
colTypes) | ||
|
||
chk.SetRequiredRows(1024, 1024) | ||
chk.Reset() | ||
|
||
b.ResetTimer() | ||
for i := 0; i < b.N; i++ { | ||
acodec.Reset(buffer) | ||
for !acodec.IsFinished() { | ||
for !chk.IsFull() && !acodec.IsFinished() { | ||
acodec.Decode(chk) | ||
} | ||
chk.Reset() | ||
} | ||
} | ||
} | ||
|
||
func BenchmarkDecodeToChunkReuse_int(b *testing.B) { | ||
numCols := 4 | ||
numRows := 1024 | ||
|
||
colTypes := make([]*types.FieldType, numCols) | ||
for i := 0; i < numCols; i++ { | ||
colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong} | ||
} | ||
chk := chunk.New(colTypes, numRows, numRows) | ||
|
||
for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { | ||
for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { | ||
chk.AppendInt64(colOrdinal, 123) | ||
} | ||
} | ||
|
||
codec := chunk.NewCodec(colTypes) | ||
buffer := codec.Encode(chk) | ||
|
||
acodec := chunk.NewDecoder( | ||
chunk.NewChunkWithCapacity(colTypes, 0), | ||
colTypes) | ||
|
||
chk.SetRequiredRows(1024, 1024) | ||
chk.Reset() | ||
|
||
b.ResetTimer() | ||
for i := 0; i < b.N; i++ { | ||
acodec.Reset(buffer) | ||
for !acodec.IsFinished() { | ||
for !chk.IsFull() && !acodec.IsFinished() { | ||
acodec.ReuseIntermChk(chk) | ||
} | ||
chk.Reset() | ||
} | ||
} | ||
} | ||
|
||
func BenchmarkDecodeToChunkReuse_string(b *testing.B) { | ||
numCols := 4 | ||
numRows := 1024 | ||
|
||
colTypes := make([]*types.FieldType, numCols) | ||
for i := 0; i < numCols; i++ { | ||
colTypes[i] = &types.FieldType{Tp: mysql.TypeString} | ||
} | ||
chk := chunk.New(colTypes, numRows, numRows) | ||
|
||
for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { | ||
for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { | ||
chk.AppendString(colOrdinal, "123456") | ||
} | ||
} | ||
|
||
codec := chunk.NewCodec(colTypes) | ||
buffer := codec.Encode(chk) | ||
|
||
acodec := chunk.NewDecoder( | ||
chunk.NewChunkWithCapacity(colTypes, 0), | ||
colTypes) | ||
|
||
chk.SetRequiredRows(1024, 1024) | ||
chk.Reset() | ||
|
||
b.ResetTimer() | ||
for i := 0; i < b.N; i++ { | ||
acodec.Reset(buffer) | ||
for !acodec.IsFinished() { | ||
for !chk.IsFull() && !acodec.IsFinished() { | ||
acodec.ReuseIntermChk(chk) | ||
} | ||
chk.Reset() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
} | ||
} | ||
|
||
// dataBufferGen replays a fixed set of encoded data buffers, serving the
// whole set `times` full passes before reporting exhaustion.
type dataBufferGen struct {
	times int      // remaining full passes over data (0 means exhausted)
	data  [][]byte // encoded chunk buffers to replay
	cur   int      // index of the next buffer; advanced by the caller between calls
}

// get returns a fresh copy of the current buffer, or nil once every buffer
// has been served `times` times. The caller advances g.cur between calls;
// get wraps it back to 0 and consumes one pass when the end is reached.
func (g *dataBufferGen) get() []byte {
	if g.cur == len(g.data) {
		g.cur, g.times = 0, g.times-1
	}
	if g.times == 0 {
		return nil
	}
	// Size the copy by the actual buffer length instead of a fixed 1024-byte
	// capacity: large buffers previously forced an extra re-allocation inside
	// append, and small ones wasted space. (The old `res[:0]` was redundant
	// on a freshly made zero-length slice as well.)
	res := make([]byte, len(g.data[g.cur]))
	copy(res, g.data[g.cur])
	return res
}
|
||
func populateBufferForChunk(numCols, numRows int) []byte { | ||
colTypes := make([]*types.FieldType, numCols) | ||
for i := 0; i < numCols; i++ { | ||
colTypes[i] = &types.FieldType{Tp: mysql.TypeString} | ||
} | ||
chk := chunk.New(colTypes, numRows, numRows) | ||
|
||
for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ { | ||
for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ { | ||
chk.AppendString(colOrdinal, "123456") | ||
} | ||
} | ||
codec := chunk.NewCodec(colTypes) | ||
return codec.Encode(chk) | ||
} | ||
|
||
func buildBigResponse(gen *dataBufferGen) { | ||
gen.data = make([][]byte, 20) | ||
for i := 0; i < 20; i++ { | ||
gen.data[i] = make([]byte, 0, 1024) | ||
} | ||
betchSize := 32 | ||
for i := 0; i < 20; i++ { | ||
gen.data[i] = populateBufferForChunk(4, betchSize) | ||
if betchSize < 1024 { | ||
betchSize *= 2 | ||
} | ||
} | ||
} | ||
|
||
func BenchmarkDecodeToChunkRequiredRows_BigResponse(b *testing.B) { | ||
datagen := &dataBufferGen{} | ||
buildBigResponse(datagen) | ||
|
||
colTypes := make([]*types.FieldType, 4) | ||
for i := 0; i < 4; i++ { | ||
colTypes[i] = &types.FieldType{Tp: mysql.TypeString} | ||
} | ||
chk := chunk.New(colTypes, 4, 1024) | ||
acodec := chunk.NewDecoder( | ||
chunk.NewChunkWithCapacity(colTypes, 0), | ||
colTypes) | ||
|
||
chk.SetRequiredRows(100, 1024) | ||
chk.Reset() | ||
datagen.times = b.N | ||
datagen.cur = 0 | ||
b.ResetTimer() | ||
|
||
for true { | ||
if chk.IsFull() { | ||
chk.Reset() | ||
} | ||
if acodec.IsFinished() { | ||
b.StopTimer() | ||
buffer := datagen.get() | ||
if buffer == nil { | ||
return | ||
} | ||
b.StartTimer() | ||
acodec.Reset(buffer) | ||
} | ||
// If the next chunk size is greater than required rows * 0.8, reuse the memory of the next chunk and return | ||
// immediately. Otherwise, splice the data to one chunk and wait the next chunk. | ||
if acodec.RemainedRows() > int(float64(chk.RequiredRows())*0.8) { | ||
if chk.NumRows() > 0 { | ||
chk.Reset() | ||
continue | ||
} | ||
acodec.ReuseIntermChk(chk) | ||
chk.Reset() | ||
datagen.cur++ | ||
continue | ||
} | ||
acodec.Decode(chk) | ||
if acodec.IsFinished() { | ||
datagen.cur++ | ||
} | ||
} | ||
} | ||
|
||
func BenchmarkDecodeToChunk_BigResponse(b *testing.B) { | ||
datagen := &dataBufferGen{} | ||
buildBigResponse(datagen) | ||
|
||
colTypes := make([]*types.FieldType, 4) | ||
for i := 0; i < 4; i++ { | ||
colTypes[i] = &types.FieldType{Tp: mysql.TypeString} | ||
} | ||
chk := chunk.New(colTypes, 4, 1024) | ||
chk.SetRequiredRows(100, 1024) | ||
chk.Reset() | ||
datagen.times = b.N | ||
datagen.cur = 0 | ||
b.ResetTimer() | ||
|
||
for true { | ||
b.StopTimer() | ||
buffer := datagen.get() | ||
if buffer == nil { | ||
return | ||
} | ||
b.StartTimer() | ||
|
||
codec := chunk.NewCodec(colTypes) | ||
_ = codec.DecodeToChunkTest(buffer, chk) | ||
datagen.cur++ | ||
chk.Reset() | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,9 +66,10 @@ type selectResult struct { | |
fieldTypes []*types.FieldType | ||
ctx sessionctx.Context | ||
|
||
selectResp *tipb.SelectResponse | ||
selectRespSize int // record the selectResp.Size() when it is initialized. | ||
respChkIdx int | ||
selectResp *tipb.SelectResponse | ||
selectRespSize int // record the selectResp.Size() when it is initialized. | ||
respChkIdx int | ||
respChunkDecoder *chunk.Decoder | ||
|
||
feedback *statistics.QueryFeedback | ||
partialCount int64 // number of partial results. | ||
|
@@ -183,10 +184,39 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er | |
} | ||
|
||
func (r *selectResult) readFromArrow(ctx context.Context, chk *chunk.Chunk) error { | ||
rowBatchData := r.selectResp.Chunks[r.respChkIdx].RowsData | ||
codec := chunk.NewCodec(r.fieldTypes) | ||
_ = codec.DecodeToChunk(rowBatchData, chk) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we change |
||
r.respChkIdx++ | ||
if r.respChunkDecoder == nil { | ||
r.respChunkDecoder = chunk.NewDecoder( | ||
chunk.NewChunkWithCapacity(r.fieldTypes, 0), | ||
r.fieldTypes, | ||
) | ||
} | ||
|
||
for !chk.IsFull() { | ||
SunRunAway marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if r.respChkIdx == len(r.selectResp.Chunks) { | ||
err := r.getSelectResp() | ||
if err != nil || r.selectResp == nil { | ||
return err | ||
} | ||
} | ||
|
||
if r.respChunkDecoder.IsFinished() { | ||
r.respChunkDecoder.Reset(r.selectResp.Chunks[r.respChkIdx].RowsData) | ||
} | ||
// If the next chunk size is greater than required rows * 0.8, reuse the memory of the next chunk and return | ||
// immediately. Otherwise, splice the data to one chunk and wait the next chunk. | ||
if r.respChunkDecoder.RemainedRows() > int(float64(chk.RequiredRows())*0.8) { | ||
if chk.NumRows() > 0 { | ||
return nil | ||
} | ||
r.respChunkDecoder.ReuseIntermChk(chk) | ||
r.respChkIdx++ | ||
return nil | ||
} | ||
r.respChunkDecoder.Decode(chk) | ||
if r.respChunkDecoder.IsFinished() { | ||
r.respChkIdx++ | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
buffer
will be reset after the first loop here since we'llReuseIntermChk
?