Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support serialization and deserialization of aggregate function for spill #46632

Merged
merged 84 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
04c674e
init
xzhangxian1008 Sep 4, 2023
1aab922
Merge branch 'master' of https://github.com/pingcap/tidb into improve…
xzhangxian1008 Sep 4, 2023
e06d4a2
bazel prepare
xzhangxian1008 Sep 4, 2023
6617970
change function name
xzhangxian1008 Sep 4, 2023
620c6ac
rename package
xzhangxian1008 Sep 5, 2023
cb1fb1a
Merge branch 'master' of https://github.com/pingcap/tidb into improve…
xzhangxian1008 Sep 5, 2023
0f033fb
refactor
xzhangxian1008 Sep 5, 2023
75e2f37
some changes
xzhangxian1008 Sep 6, 2023
71b92f8
implement some agg functions
xzhangxian1008 Sep 6, 2023
2e53250
remove useless codes
xzhangxian1008 Sep 6, 2023
7b9fcbc
change
xzhangxian1008 Sep 7, 2023
2e4593c
change more
xzhangxian1008 Sep 8, 2023
1ef51a0
support more
xzhangxian1008 Sep 8, 2023
edf8d4a
add more
xzhangxian1008 Sep 8, 2023
5d1f549
save works
xzhangxian1008 Sep 8, 2023
3c21ea9
add more
xzhangxian1008 Sep 11, 2023
9ed1491
tweaking
xzhangxian1008 Sep 11, 2023
e1883f6
save
xzhangxian1008 Sep 11, 2023
528aa77
add all
xzhangxian1008 Sep 12, 2023
fc83dc3
address comment
xzhangxian1008 Sep 12, 2023
b65aa65
prepare bazel
xzhangxian1008 Sep 12, 2023
f035d6b
Merge branch 'master' of https://github.com/pingcap/tidb into improve…
xzhangxian1008 Sep 13, 2023
447cc25
add tests
xzhangxian1008 Sep 13, 2023
2b53425
add tests
xzhangxian1008 Sep 14, 2023
a5ebdf5
add tests and fix bugs
xzhangxian1008 Sep 14, 2023
e6c9c40
Merge branch 'master' of https://github.com/pingcap/tidb into improve…
xzhangxian1008 Oct 11, 2023
0eb1735
merge
xzhangxian1008 Oct 16, 2023
6fa1004
prepare bazel
xzhangxian1008 Oct 16, 2023
4a4fbd6
Merge branch 'master' of https://github.com/pingcap/tidb into improve…
xzhangxian1008 Nov 1, 2023
bac013e
merge
xzhangxian1008 Nov 6, 2023
33f1e9f
remove useless codes
xzhangxian1008 Nov 6, 2023
da34139
remove useless parameter
xzhangxian1008 Nov 6, 2023
5a2c0d7
Merge branch 'master' of https://github.com/pingcap/tidb into improve…
xzhangxian1008 Nov 8, 2023
a2ce07a
fix some comments
xzhangxian1008 Nov 8, 2023
84ac6e2
tweaking
xzhangxian1008 Nov 8, 2023
f6168bf
refine
xzhangxian1008 Nov 8, 2023
a045baf
fix analysis fail
xzhangxian1008 Nov 8, 2023
881ece9
refine tests
xzhangxian1008 Nov 8, 2023
189d96c
fix ci analysis
xzhangxian1008 Nov 8, 2023
6f929ca
fix ci analysis
xzhangxian1008 Nov 8, 2023
41ee1b1
fix ci analysis
xzhangxian1008 Nov 8, 2023
555631c
tweaking
xzhangxian1008 Nov 9, 2023
8650506
address comments
xzhangxian1008 Nov 9, 2023
ac529f9
fix ci analysis
xzhangxian1008 Nov 9, 2023
261860f
revoke renaming
xzhangxian1008 Nov 14, 2023
0af7318
tweaking
xzhangxian1008 Nov 14, 2023
62938fc
address comment
xzhangxian1008 Nov 15, 2023
38ff0d8
address comment
xzhangxian1008 Nov 15, 2023
69a306d
address comments
xzhangxian1008 Nov 15, 2023
6affa73
fix ci
xzhangxian1008 Nov 15, 2023
78f69b1
change interface
xzhangxian1008 Nov 16, 2023
b54d8dd
use des se MyDecimal
xzhangxian1008 Nov 16, 2023
3df55e0
de se Time type
xzhangxian1008 Nov 16, 2023
e99a32c
use bool se des
xzhangxian1008 Nov 16, 2023
1ac26dd
se de isNull at the beginning
xzhangxian1008 Nov 17, 2023
f4a3c0b
change some SerializeXXX function signature
xzhangxian1008 Nov 17, 2023
97abe2c
remove additional line
xzhangxian1008 Nov 17, 2023
511eb4d
merge
xzhangxian1008 Nov 17, 2023
f1b23f7
make spillSerializeHelper public
xzhangxian1008 Nov 17, 2023
6ea0858
udpate bazel
xzhangxian1008 Nov 17, 2023
06d662d
make newSpillSerializeHelper be public
xzhangxian1008 Nov 17, 2023
2f9431e
rename
xzhangxian1008 Nov 22, 2023
bcf562a
revoke rename
xzhangxian1008 Nov 22, 2023
528f211
unify interface
xzhangxian1008 Nov 22, 2023
e2b3057
Merge branch 'master' of https://github.com/pingcap/tidb into improve…
xzhangxian1008 Nov 22, 2023
cf1b86c
address comments
xzhangxian1008 Nov 23, 2023
f89d75f
add tests and fix bugs
xzhangxian1008 Nov 24, 2023
a880ba0
remove some comments
xzhangxian1008 Nov 24, 2023
8e4d70d
address comments
xzhangxian1008 Nov 24, 2023
815f962
rename
xzhangxian1008 Nov 27, 2023
475eb6e
rename package
xzhangxian1008 Nov 27, 2023
2846d44
add se/de bytes.Buffer
xzhangxian1008 Nov 27, 2023
027d0d0
uniform length de/se
xzhangxian1008 Nov 27, 2023
f693be2
reduce repeated codes
xzhangxian1008 Nov 27, 2023
72571dd
uniform parameter
xzhangxian1008 Nov 28, 2023
7aa6ca7
uniform buffer de/se
xzhangxian1008 Nov 28, 2023
42e3aff
tweaking
xzhangxian1008 Nov 28, 2023
6e868c0
address comment
xzhangxian1008 Nov 29, 2023
1f2b7d2
tweaking
xzhangxian1008 Nov 30, 2023
b829db9
update bazel
xzhangxian1008 Nov 30, 2023
2c1c7a1
Merge branch 'master' of https://github.com/pingcap/tidb into improve…
xzhangxian1008 Nov 30, 2023
8d8db9e
tweaking
xzhangxian1008 Nov 30, 2023
f8ecb9a
remove unused function
xzhangxian1008 Nov 30, 2023
ec5b3e9
update bazel
xzhangxian1008 Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions executor/aggfuncs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ go_library(
"func_varpop.go",
"func_varsamp.go",
"row_number.go",
"spill_deserialize_helper.go",
"spill_serialize_helper.go",
],
importpath = "github.com/pingcap/tidb/executor/aggfuncs",
visibility = ["//visibility:public"],
Expand All @@ -49,6 +51,7 @@ go_library(
"//util/mathutil",
"//util/selection",
"//util/set",
"//util/spill",
"//util/stringutil",
"@com_github_dgryski_go_farm//:go-farm",
"@com_github_pingcap_errors//:errors",
Expand Down
15 changes: 15 additions & 0 deletions executor/aggfuncs/aggfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,21 @@ const (
// to be any type.
type PartialResult unsafe.Pointer

// AggPartialResultMapper contains aggregate function results
type AggPartialResultMapper map[string][]PartialResult

type aggSpillHelper interface {
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
// SerializeForSpill will serialize meta data of aggregate function into bytes and put them into chunk.
SerializeForSpill(ctx sessionctx.Context, partialResult PartialResult, chk *chunk.Chunk, spillHelper *SpillSerializeHelper)
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved

// DeserializeToPartialResultForSpill deserializes from bytes to PartialResult.
DeserializeToPartialResultForSpill(ctx sessionctx.Context, src *chunk.Chunk) ([]PartialResult, int64)
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
}

// AggFunc is the interface to evaluate the aggregate functions.
type AggFunc interface {
aggSpillHelper

// AllocPartialResult allocates a specific data structure to store the
// partial result, initializes it, and converts it to PartialResult to
// return back. The second returned value is the memDelta used to trace
Expand Down Expand Up @@ -196,6 +209,8 @@ func (*baseAggFunc) MergePartialResult(sessionctx.Context, PartialResult, Partia
return 0, nil
}



// SlidingWindowAggFunc is the interface to evaluate the aggregate functions using sliding window.
type SlidingWindowAggFunc interface {
// Slide evaluates the aggregate functions using a sliding window. The input
Expand Down
86 changes: 86 additions & 0 deletions executor/aggfuncs/func_avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,40 @@ type baseAvgDecimal struct {
baseAggFunc
}

func (c *baseAvgDecimal) SerializeForSpill(_ sessionctx.Context, partialResult PartialResult, chk *chunk.Chunk, spillHelper *SpillSerializeHelper) {
pr := (*partialResult4AvgDecimal)(partialResult)
resBuf := spillHelper.serializePartialResult4AvgDecimal(*pr)
chk.AppendBytes(c.ordinal, resBuf)
}

func (c *baseAvgDecimal) DeserializeToPartialResultForSpill(_ sessionctx.Context, src *chunk.Chunk) ([]PartialResult, int64) {
dataCol := src.Column(c.ordinal)
totalMemDelta := int64(0)
spillHelper := newDeserializeHelper(dataCol, src.NumRows())
partialResults := make([]PartialResult, 0, src.NumRows())

for {
pr, memDelta := c.deserializeForSpill(&spillHelper)
if pr == nil {
break
}
partialResults = append(partialResults, pr)
totalMemDelta += memDelta
}

return partialResults, totalMemDelta
}

func (c *baseAvgDecimal) deserializeForSpill(helper *spillDeserializeHelper) (PartialResult, int64) {
pr, memDelta := c.AllocPartialResult()
result := (*partialResult4AvgDecimal)(pr)
success := helper.deserializePartialResult4AvgDecimal(result)
if !success {
return nil, 0
}
return pr, memDelta
}

type partialResult4AvgDecimal struct {
sum types.MyDecimal
count int64
Expand Down Expand Up @@ -285,6 +319,15 @@ func (e *avgOriginal4DistinctDecimal) AppendFinalResult2Chunk(_ sessionctx.Conte
return nil
}

// TODO implement it
func (c *avgOriginal4DistinctDecimal) SerializeForSpill(_ sessionctx.Context, partialResult PartialResult, chk *chunk.Chunk, spillHelper *SpillSerializeHelper) {
}

// TODO implement it
func (c *avgOriginal4DistinctDecimal) DeserializeToPartialResultForSpill(_ sessionctx.Context, src *chunk.Chunk) ([]PartialResult, int64) {
return nil, 0
}

// All the following avg function implementations return the float64 result,
// which store the partial results in "partialResult4AvgFloat64".
//
Expand Down Expand Up @@ -320,6 +363,40 @@ func (e *baseAvgFloat64) AppendFinalResult2Chunk(_ sessionctx.Context, pr Partia
return nil
}

func (c *baseAvgFloat64) SerializeForSpill(_ sessionctx.Context, partialResult PartialResult, chk *chunk.Chunk, spillHelper *SpillSerializeHelper) {
pr := (*partialResult4AvgFloat64)(partialResult)
resBuf := spillHelper.serializePartialResult4AvgFloat64(*pr)
chk.AppendBytes(c.ordinal, resBuf)
}

func (c *baseAvgFloat64) DeserializeToPartialResultForSpill(_ sessionctx.Context, src *chunk.Chunk) ([]PartialResult, int64) {
dataCol := src.Column(c.ordinal)
totalMemDelta := int64(0)
spillHelper := newDeserializeHelper(dataCol, src.NumRows())
partialResults := make([]PartialResult, 0, src.NumRows())

for {
pr, memDelta := c.deserializeForSpill(&spillHelper)
if pr == nil {
break
}
partialResults = append(partialResults, pr)
totalMemDelta += memDelta
}

return partialResults, totalMemDelta
}

func (c *baseAvgFloat64) deserializeForSpill(helper *spillDeserializeHelper) (PartialResult, int64) {
pr, memDelta := c.AllocPartialResult()
result := (*partialResult4AvgFloat64)(pr)
success := helper.deserializePartialResult4AvgFloat64(result)
if !success {
return nil, 0
}
return pr, memDelta
}

type avgOriginal4Float64HighPrecision struct {
baseAvgFloat64
}
Expand Down Expand Up @@ -460,3 +537,12 @@ func (e *avgOriginal4DistinctFloat64) AppendFinalResult2Chunk(_ sessionctx.Conte
chk.AppendFloat64(e.ordinal, p.sum/float64(p.count))
return nil
}

// TODO implement it
func (c *avgOriginal4DistinctFloat64) SerializeForSpill(_ sessionctx.Context, partialResult PartialResult, chk *chunk.Chunk, spillHelper *SpillSerializeHelper) {
}

// TODO implement it
func (c *avgOriginal4DistinctFloat64) DeserializeToPartialResultForSpill(_ sessionctx.Context, src *chunk.Chunk) ([]PartialResult, int64) {
return nil, 0
}
34 changes: 34 additions & 0 deletions executor/aggfuncs/func_bitfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,40 @@ func (e *baseBitAggFunc) AppendFinalResult2Chunk(_ sessionctx.Context, pr Partia
return nil
}

func (c *baseBitAggFunc) SerializeForSpill(_ sessionctx.Context, partialResult PartialResult, chk *chunk.Chunk, spillHelper *SpillSerializeHelper) {
pr := (*partialResult4BitFunc)(partialResult)
resBuf := spillHelper.serializePartialResult4BitFunc(*pr)
chk.AppendBytes(c.ordinal, resBuf)
}

func (c *baseBitAggFunc) DeserializeToPartialResultForSpill(_ sessionctx.Context, src *chunk.Chunk) ([]PartialResult, int64) {
dataCol := src.Column(c.ordinal)
totalMemDelta := int64(0)
spillHelper := newDeserializeHelper(dataCol, src.NumRows())
partialResults := make([]PartialResult, 0, src.NumRows())

for {
pr, memDelta := c.deserializeForSpill(&spillHelper)
if pr == nil {
break
}
partialResults = append(partialResults, pr)
totalMemDelta += memDelta
}

return partialResults, totalMemDelta
}

func (c *baseBitAggFunc) deserializeForSpill(helper *spillDeserializeHelper) (PartialResult, int64) {
pr, memDelta := c.AllocPartialResult()
result := (*partialResult4BitFunc)(pr)
success := helper.deserializePartialResult4BitFunc(result)
if !success {
return nil, 0
}
return pr, memDelta
}

type bitOrUint64 struct {
baseBitAggFunc
}
Expand Down
36 changes: 36 additions & 0 deletions executor/aggfuncs/func_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type baseCount struct {

type partialResult4Count = int64

const partialResult4CountByteLen = int(unsafe.Sizeof(partialResult4Count(0)))
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved

func (*baseCount) AllocPartialResult() (pr PartialResult, memDelta int64) {
return PartialResult(new(partialResult4Count)), DefPartialResult4CountSize
}
Expand All @@ -47,6 +49,40 @@ func (e *baseCount) AppendFinalResult2Chunk(_ sessionctx.Context, pr PartialResu
return nil
}

func (c *baseCount) SerializeForSpill(_ sessionctx.Context, partialResult PartialResult, chk *chunk.Chunk, spillHelper *SpillSerializeHelper) {
pr := (*partialResult4Count)(partialResult)
resBuf := spillHelper.serializePartialResult4Count(*pr)
chk.AppendBytes(c.ordinal, resBuf)
}

func (c *baseCount) DeserializeToPartialResultForSpill(_ sessionctx.Context, src *chunk.Chunk) ([]PartialResult, int64) {
dataCol := src.Column(c.ordinal)
totalMemDelta := int64(0)
spillHelper := newDeserializeHelper(dataCol, src.NumRows())
partialResults := make([]PartialResult, 0, src.NumRows())

for {
pr, memDelta := c.deserializeForSpill(&spillHelper)
if pr == nil {
break
}
partialResults = append(partialResults, pr)
totalMemDelta += memDelta
}

return partialResults, totalMemDelta
}

func (c *baseCount) deserializeForSpill(helper *spillDeserializeHelper) (PartialResult, int64) {
pr, memDelta := c.AllocPartialResult()
result := int64(*(*partialResult4Count)(pr))
success := helper.deserializePartialResult4Count(&result)
if !success {
return nil, 0
}
return pr, memDelta
}

type countOriginal4Int struct {
baseCount
}
Expand Down
54 changes: 54 additions & 0 deletions executor/aggfuncs/func_count_distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ func (e *countOriginalWithDistinct4Real) UpdatePartialResult(sctx sessionctx.Con
return memDelta, nil
}

// TODO implement it
func (e *countOriginalWithDistinct4Real) SerializeForSpill(_ sessionctx.Context, partialResult PartialResult, chk *chunk.Chunk, spillHelper *SpillSerializeHelper) {
}

// TODO implement it
func (e *countOriginalWithDistinct4Real) DeserializeToPartialResultForSpill(sctx sessionctx.Context, src *chunk.Chunk) ([]PartialResult, int64) {
return nil, 0
}

type partialResult4CountDistinctDecimal struct {
valSet set.StringSetWithMemoryUsage
}
Expand Down Expand Up @@ -194,6 +203,15 @@ func (e *countOriginalWithDistinct4Decimal) UpdatePartialResult(sctx sessionctx.
return memDelta, nil
}

// TODO implement it
func (e *countOriginalWithDistinct4Decimal) SerializeForSpill(_ sessionctx.Context, partialResult PartialResult, chk *chunk.Chunk, spillHelper *SpillSerializeHelper) {
}

// TODO implement it
func (e *countOriginalWithDistinct4Decimal) DeserializeToPartialResultForSpill(sctx sessionctx.Context, src *chunk.Chunk) ([]PartialResult, int64) {
return nil, 0
}

type partialResult4CountDistinctDuration struct {
valSet set.Int64SetWithMemoryUsage
}
Expand Down Expand Up @@ -241,6 +259,15 @@ func (e *countOriginalWithDistinct4Duration) UpdatePartialResult(sctx sessionctx
return memDelta, nil
}

// TODO implement it
func (e *countOriginalWithDistinct4Duration) SerializeForSpill(_ sessionctx.Context, partialResult PartialResult, chk *chunk.Chunk, spillHelper *SpillSerializeHelper) {
}

// TODO implement it
func (e *countOriginalWithDistinct4Duration) DeserializeToPartialResultForSpill(sctx sessionctx.Context, src *chunk.Chunk) ([]PartialResult, int64) {
return nil, 0
}

type partialResult4CountDistinctString struct {
valSet set.StringSetWithMemoryUsage
}
Expand Down Expand Up @@ -292,6 +319,15 @@ func (e *countOriginalWithDistinct4String) UpdatePartialResult(sctx sessionctx.C
return memDelta, nil
}

// TODO implement it
func (e *countOriginalWithDistinct4String) SerializeForSpill(_ sessionctx.Context, partialResult PartialResult, chk *chunk.Chunk, spillHelper *SpillSerializeHelper) {
}

// TODO implement it
func (e *countOriginalWithDistinct4String) DeserializeToPartialResultForSpill(sctx sessionctx.Context, src *chunk.Chunk) ([]PartialResult, int64) {
return nil, 0
}

type countOriginalWithDistinct struct {
baseCount
}
Expand Down Expand Up @@ -355,6 +391,15 @@ func (e *countOriginalWithDistinct) UpdatePartialResult(sctx sessionctx.Context,
return memDelta, nil
}

// TODO implement it
func (e *countOriginalWithDistinct) SerializeForSpill(_ sessionctx.Context, partialResult PartialResult, chk *chunk.Chunk, spillHelper *SpillSerializeHelper) {
}

// TODO implement it
func (e *countOriginalWithDistinct) DeserializeToPartialResultForSpill(sctx sessionctx.Context, src *chunk.Chunk) ([]PartialResult, int64) {
return nil, 0
}

// evalAndEncode eval one row with an expression and encode value to bytes.
func evalAndEncode(
sctx sessionctx.Context, arg expression.Expression, collator collate.Collator,
Expand Down Expand Up @@ -769,6 +814,15 @@ func (*baseApproxCountDistinct) MergePartialResult(_ sessionctx.Context, src, ds
return 0, nil
}

// TODO implement it
func (e *baseApproxCountDistinct) SerializeForSpill(_ sessionctx.Context, partialResult PartialResult, chk *chunk.Chunk, spillHelper *SpillSerializeHelper) {
}

// TODO implement it
func (e *baseApproxCountDistinct) DeserializeToPartialResultForSpill(sctx sessionctx.Context, src *chunk.Chunk) ([]PartialResult, int64) {
return nil, 0
}

type approxCountDistinctOriginal struct {
baseApproxCountDistinct
}
Expand Down
18 changes: 18 additions & 0 deletions executor/aggfuncs/func_group_concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,15 @@ func (e *groupConcatDistinct) UpdatePartialResult(sctx sessionctx.Context, rowsI
return memDelta, nil
}

// TODO implement it
func (c *groupConcatDistinct) SerializeForSpill(_ sessionctx.Context, partialResult PartialResult, chk *chunk.Chunk, spillHelper *SpillSerializeHelper) {
}

// TODO implement it
func (c *groupConcatDistinct) DeserializeToPartialResultForSpill(_ sessionctx.Context, src *chunk.Chunk) ([]PartialResult, int64) {
return nil, 0
}

// SetTruncated will be called in `executorBuilder#buildHashAgg` with duck-type.
func (e *groupConcatDistinct) SetTruncated(t *int32) {
e.truncated = t
Expand Down Expand Up @@ -611,6 +620,15 @@ func (*groupConcatDistinctOrder) MergePartialResult(sessionctx.Context, PartialR
return 0, plannercore.ErrInternal.GenWithStack("groupConcatDistinctOrder.MergePartialResult should not be called")
}

// TODO implement it
func (c *groupConcatDistinctOrder) SerializeForSpill(_ sessionctx.Context, partialResult PartialResult, chk *chunk.Chunk, spillHelper *SpillSerializeHelper) {
}

// TODO implement it
func (c *groupConcatDistinctOrder) DeserializeToPartialResultForSpill(_ sessionctx.Context, src *chunk.Chunk) ([]PartialResult, int64) {
return nil, 0
}

// GetDatumMemSize calculates the memory size of each types.Datum in sortRow.byItems.
// types.Datum memory size = variable type's memory size + variable value's memory size.
func GetDatumMemSize(d *types.Datum) int64 {
Expand Down
Loading