Skip to content

Commit

Permalink
*: support serialization and deserialization of aggregate function fo…
Browse files Browse the repository at this point in the history
…r spill (#46632)

ref #47733
  • Loading branch information
xzhangxian1008 authored Nov 30, 2023
1 parent 8d181ed commit 3ed4ba5
Show file tree
Hide file tree
Showing 25 changed files with 3,204 additions and 24 deletions.
6 changes: 5 additions & 1 deletion pkg/executor/aggfuncs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ go_library(
"func_varpop.go",
"func_varsamp.go",
"row_number.go",
"spill_deserialize_helper.go",
"spill_serialize_helper.go",
],
importpath = "github.com/pingcap/tidb/pkg/executor/aggfuncs",
visibility = ["//visibility:public"],
Expand All @@ -47,6 +49,7 @@ go_library(
"//pkg/util/hack",
"//pkg/util/logutil",
"//pkg/util/selection",
"//pkg/util/serialization",
"//pkg/util/set",
"//pkg/util/stringutil",
"@com_github_dgryski_go_farm//:go-farm",
Expand Down Expand Up @@ -83,12 +86,13 @@ go_test(
"func_varsamp_test.go",
"main_test.go",
"row_number_test.go",
"spill_helper_test.go",
"window_func_test.go",
],
embed = [":aggfuncs"],
flaky = True,
race = "on",
shard_count = 48,
shard_count = 50,
deps = [
"//pkg/expression",
"//pkg/expression/aggregation",
Expand Down
44 changes: 44 additions & 0 deletions pkg/executor/aggfuncs/aggfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,21 @@ const (
// to be any type.
type PartialResult unsafe.Pointer

// AggPartialResultMapper contains aggregate function results: it maps a string
// key to a slice of PartialResult values.
// NOTE(review): presumably the key is an encoded group key and the slice holds
// one PartialResult per aggregate function — confirm against the callers.
type AggPartialResultMapper map[string][]PartialResult

// serializer groups the two methods an aggregate function provides so that its
// partial results can be written to, and read back from, a chunk.Chunk.
type serializer interface {
// SerializePartialResult serializes partialResult into bytes and puts them
// into chk. spillHelper is the SerializeHelper used to produce those bytes.
SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper)

// DeserializePartialResult deserializes the bytes stored in src back into
// PartialResult values. The second return value is an int64 reported
// alongside the results.
// NOTE(review): the implementations visible here return the summed memory
// delta of the allocated partial results — confirm this holds for all
// implementations.
DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64)
}

// AggFunc is the interface to evaluate the aggregate functions.
type AggFunc interface {
serializer

// AllocPartialResult allocates a specific data structure to store the
// partial result, initializes it, and converts it to PartialResult to
// return back. The second returned value is the memDelta used to trace
Expand Down Expand Up @@ -196,6 +209,13 @@ func (*baseAggFunc) MergePartialResult(sessionctx.Context, PartialResult, Partia
return 0, nil
}

// SerializePartialResult is the default implementation on baseAggFunc: it
// ignores all of its arguments and writes nothing. Types that embed
// baseAggFunc can declare their own method to provide real serialization.
func (*baseAggFunc) SerializePartialResult(PartialResult, *chunk.Chunk, *SerializeHelper) {}

// DeserializePartialResult is the default implementation on baseAggFunc: it
// ignores its argument and always returns a nil slice and 0.
func (*baseAggFunc) DeserializePartialResult(*chunk.Chunk) ([]PartialResult, int64) {
	var none []PartialResult
	return none, 0
}

// SlidingWindowAggFunc is the interface to evaluate the aggregate functions using sliding window.
type SlidingWindowAggFunc interface {
// Slide evaluates the aggregate functions using a sliding window. The input
Expand All @@ -212,3 +232,27 @@ type MaxMinSlidingWindowAggFunc interface {
// SetWindowStart sets the start position of window
SetWindowStart(start uint64)
}

// deserializeFunc decodes one partial result using the given deserializeHelper
// and returns it together with an int64 memory delta.
// deserializePartialResultCommon keeps calling it until it returns a nil
// PartialResult.
type deserializeFunc func(*deserializeHelper) (PartialResult, int64)

// deserializePartialResultCommon rebuilds partial results from the column at
// index ordinal of src.
//
// It wraps that column in a deserializeHelper and calls deserializeFuncImpl
// repeatedly, collecting every non-nil PartialResult and summing the memory
// deltas reported with them. The loop ends at the first nil PartialResult.
// It panics if the number of collected results differs from src.NumRows().
//
// It returns the collected partial results and the summed memory delta.
func deserializePartialResultCommon(src *chunk.Chunk, ordinal int, deserializeFuncImpl deserializeFunc) ([]PartialResult, int64) {
	col := src.Column(ordinal)
	rowCount := src.NumRows()
	helper := newDeserializeHelper(col, rowCount)
	results := make([]PartialResult, 0, rowCount)

	var memUsage int64
	for pr, delta := deserializeFuncImpl(&helper); pr != nil; pr, delta = deserializeFuncImpl(&helper) {
		results = append(results, pr)
		memUsage += delta
	}

	// Every row of src must have produced exactly one partial result.
	if len(results) != rowCount {
		panic("Fail to deserialize partial result")
	}
	return results, memUsage
}
40 changes: 40 additions & 0 deletions pkg/executor/aggfuncs/func_avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,26 @@ type baseAvgDecimal struct {
baseAggFunc
}

// SerializePartialResult interprets partialResult as a
// *partialResult4AvgDecimal, encodes the pointed-to value with spillHelper and
// appends the resulting bytes to column e.ordinal of chk.
func (e *baseAvgDecimal) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
	encoded := spillHelper.serializePartialResult4AvgDecimal(*(*partialResult4AvgDecimal)(partialResult))
	chk.AppendBytes(e.ordinal, encoded)
}

// DeserializePartialResult decodes the partial results stored in column
// e.ordinal of src by delegating to deserializePartialResultCommon with
// e.deserializeForSpill as the per-result decoder.
func (e *baseAvgDecimal) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
	results, memDelta := deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
	return results, memDelta
}

// deserializeForSpill allocates a fresh partial result via
// e.AllocPartialResult and asks helper to fill it in place as a
// *partialResult4AvgDecimal. It returns the filled result with the
// allocation's memory delta, or (nil, 0) when helper reports failure.
// NOTE(review): failure presumably means the helper has no more rows — confirm.
func (e *baseAvgDecimal) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
	pr, memDelta := e.AllocPartialResult()
	if !helper.deserializePartialResult4AvgDecimal((*partialResult4AvgDecimal)(pr)) {
		return nil, 0
	}
	return pr, memDelta
}

type partialResult4AvgDecimal struct {
sum types.MyDecimal
count int64
Expand Down Expand Up @@ -320,6 +340,26 @@ func (e *baseAvgFloat64) AppendFinalResult2Chunk(_ sessionctx.Context, pr Partia
return nil
}

// SerializePartialResult interprets partialResult as a
// *partialResult4AvgFloat64, encodes the pointed-to value with spillHelper and
// appends the resulting bytes to column e.ordinal of chk.
func (e *baseAvgFloat64) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
	encoded := spillHelper.serializePartialResult4AvgFloat64(*(*partialResult4AvgFloat64)(partialResult))
	chk.AppendBytes(e.ordinal, encoded)
}

// DeserializePartialResult decodes the partial results stored in column
// e.ordinal of src by delegating to deserializePartialResultCommon with
// e.deserializeForSpill as the per-result decoder.
func (e *baseAvgFloat64) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
	results, memDelta := deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
	return results, memDelta
}

// deserializeForSpill allocates a fresh partial result via
// e.AllocPartialResult and asks helper to fill it in place as a
// *partialResult4AvgFloat64. It returns the filled result with the
// allocation's memory delta, or (nil, 0) when helper reports failure.
// NOTE(review): failure presumably means the helper has no more rows — confirm.
func (e *baseAvgFloat64) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
	pr, memDelta := e.AllocPartialResult()
	if !helper.deserializePartialResult4AvgFloat64((*partialResult4AvgFloat64)(pr)) {
		return nil, 0
	}
	return pr, memDelta
}

// avgOriginal4Float64HighPrecision embeds baseAvgFloat64, so the methods
// declared on *baseAvgFloat64 are promoted to it.
type avgOriginal4Float64HighPrecision struct {
baseAvgFloat64
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/executor/aggfuncs/func_bitfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,26 @@ func (e *baseBitAggFunc) AppendFinalResult2Chunk(_ sessionctx.Context, pr Partia
return nil
}

// SerializePartialResult interprets partialResult as a *partialResult4BitFunc,
// encodes the pointed-to value with spillHelper and appends the resulting
// bytes to column e.ordinal of chk.
func (e *baseBitAggFunc) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
	encoded := spillHelper.serializePartialResult4BitFunc(*(*partialResult4BitFunc)(partialResult))
	chk.AppendBytes(e.ordinal, encoded)
}

// DeserializePartialResult decodes the partial results stored in column
// e.ordinal of src by delegating to deserializePartialResultCommon with
// e.deserializeForSpill as the per-result decoder.
func (e *baseBitAggFunc) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
	results, memDelta := deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
	return results, memDelta
}

// deserializeForSpill allocates a fresh partial result via
// e.AllocPartialResult and asks helper to fill it in place as a
// *partialResult4BitFunc. It returns the filled result with the allocation's
// memory delta, or (nil, 0) when helper reports failure.
// NOTE(review): failure presumably means the helper has no more rows — confirm.
func (e *baseBitAggFunc) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
	pr, memDelta := e.AllocPartialResult()
	if !helper.deserializePartialResult4BitFunc((*partialResult4BitFunc)(pr)) {
		return nil, 0
	}
	return pr, memDelta
}

// bitOrUint64 embeds baseBitAggFunc, so the methods declared on
// *baseBitAggFunc are promoted to it.
type bitOrUint64 struct {
baseBitAggFunc
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/executor/aggfuncs/func_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,26 @@ func (e *baseCount) AppendFinalResult2Chunk(_ sessionctx.Context, pr PartialResu
return nil
}

// SerializePartialResult interprets partialResult as a *partialResult4Count,
// encodes the pointed-to value with spillHelper and appends the resulting
// bytes to column e.ordinal of chk.
func (e *baseCount) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) {
	encoded := spillHelper.serializePartialResult4Count(*(*partialResult4Count)(partialResult))
	chk.AppendBytes(e.ordinal, encoded)
}

// DeserializePartialResult decodes the partial results stored in column
// e.ordinal of src by delegating to deserializePartialResultCommon with
// e.deserializeForSpill as the per-result decoder.
func (e *baseCount) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) {
	results, memDelta := deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill)
	return results, memDelta
}

// deserializeForSpill allocates a fresh partial result via
// e.AllocPartialResult and asks helper to decode the next spilled count
// directly into that allocation. It returns the filled result with the
// allocation's memory delta, or (nil, 0) when helper reports failure.
// NOTE(review): failure presumably means the helper has no more rows — confirm.
func (e *baseCount) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) {
	pr, memDelta := e.AllocPartialResult()
	// Keep a pointer into the allocation behind pr rather than dereferencing
	// it: decoding into a dereferenced copy would leave the returned partial
	// result at its initial value and silently drop the spilled count.
	result := (*partialResult4Count)(pr)
	success := helper.deserializePartialResult4Count(result)
	if !success {
		return nil, 0
	}
	return pr, memDelta
}

// countOriginal4Int embeds baseCount, so the methods declared on *baseCount
// are promoted to it.
type countOriginal4Int struct {
baseCount
}
Expand Down
Loading

0 comments on commit 3ed4ba5

Please sign in to comment.