From 3ed4ba51472338f5b2e8338702487c691facbf99 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 30 Nov 2023 18:16:19 +0800 Subject: [PATCH] *: support serialization and deserialization of aggregate function for spill (#46632) ref pingcap/tidb#47733 --- pkg/executor/aggfuncs/BUILD.bazel | 6 +- pkg/executor/aggfuncs/aggfuncs.go | 44 + pkg/executor/aggfuncs/func_avg.go | 40 + pkg/executor/aggfuncs/func_bitfuncs.go | 20 + pkg/executor/aggfuncs/func_count.go | 20 + pkg/executor/aggfuncs/func_first_row.go | 200 +++ pkg/executor/aggfuncs/func_group_concat.go | 20 + pkg/executor/aggfuncs/func_json_arrayagg.go | 20 + pkg/executor/aggfuncs/func_json_objectagg.go | 20 + pkg/executor/aggfuncs/func_max_min.go | 220 +++ pkg/executor/aggfuncs/func_sum.go | 40 + .../aggfuncs/spill_deserialize_helper.go | 397 +++++ pkg/executor/aggfuncs/spill_helper_test.go | 1382 +++++++++++++++++ .../aggfuncs/spill_serialize_helper.go | 226 +++ .../aggregate/agg_hash_base_worker.go | 5 +- pkg/executor/aggregate/agg_hash_executor.go | 18 +- .../aggregate/agg_hash_final_worker.go | 7 +- .../aggregate/agg_hash_partial_worker.go | 4 +- pkg/executor/benchmark_test.go | 2 +- pkg/executor/executor_pkg_test.go | 7 +- pkg/util/chunk/column.go | 10 + pkg/util/serialization/BUILD.bazel | 18 + pkg/util/serialization/common_util.go | 58 + .../serialization/deserialization_util.go | 236 +++ pkg/util/serialization/serialization_util.go | 208 +++ 25 files changed, 3204 insertions(+), 24 deletions(-) create mode 100644 pkg/executor/aggfuncs/spill_deserialize_helper.go create mode 100644 pkg/executor/aggfuncs/spill_helper_test.go create mode 100644 pkg/executor/aggfuncs/spill_serialize_helper.go create mode 100644 pkg/util/serialization/BUILD.bazel create mode 100644 pkg/util/serialization/common_util.go create mode 100644 pkg/util/serialization/deserialization_util.go create mode 100644 pkg/util/serialization/serialization_util.go diff --git a/pkg/executor/aggfuncs/BUILD.bazel b/pkg/executor/aggfuncs/BUILD.bazel index 9336517d7ecec..5fa60108574e1 100644 --- a/pkg/executor/aggfuncs/BUILD.bazel +++ b/pkg/executor/aggfuncs/BUILD.bazel @@ -27,6 +27,8 @@ go_library( "func_varpop.go", "func_varsamp.go", "row_number.go", + "spill_deserialize_helper.go", + "spill_serialize_helper.go", ], importpath = "github.com/pingcap/tidb/pkg/executor/aggfuncs", visibility = ["//visibility:public"], @@ -47,6 +49,7 @@ go_library( "//pkg/util/hack", "//pkg/util/logutil", "//pkg/util/selection", + "//pkg/util/serialization", "//pkg/util/set", "//pkg/util/stringutil", "@com_github_dgryski_go_farm//:go-farm", @@ -83,12 +86,13 @@ go_test( "func_varsamp_test.go", "main_test.go", "row_number_test.go", + "spill_helper_test.go", "window_func_test.go", ], embed = [":aggfuncs"], flaky = True, race = "on", - shard_count = 48, + shard_count = 50, deps = [ "//pkg/expression", "//pkg/expression/aggregation", diff --git a/pkg/executor/aggfuncs/aggfuncs.go b/pkg/executor/aggfuncs/aggfuncs.go index 344bacacb6339..9a9973369fbeb 100644 --- a/pkg/executor/aggfuncs/aggfuncs.go +++ b/pkg/executor/aggfuncs/aggfuncs.go @@ -139,8 +139,21 @@ const ( // to be any type. type PartialResult unsafe.Pointer +// AggPartialResultMapper contains aggregate function results +type AggPartialResultMapper map[string][]PartialResult + +type serializer interface { + // SerializePartialResult will serialize meta data of aggregate function into bytes and put them into chunk. + SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) + + // DeserializePartialResult deserializes from bytes to PartialResult. + DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) +} + // AggFunc is the interface to evaluate the aggregate functions. type AggFunc interface { + serializer + // AllocPartialResult allocates a specific data structure to store the // partial result, initializes it, and converts it to PartialResult to // return back. The second returned value is the memDelta used to trace @@ -196,6 +209,13 @@ func (*baseAggFunc) MergePartialResult(sessionctx.Context, PartialResult, Partia return 0, nil } +func (*baseAggFunc) SerializePartialResult(_ PartialResult, _ *chunk.Chunk, _ *SerializeHelper) { +} + +func (*baseAggFunc) DeserializePartialResult(_ *chunk.Chunk) ([]PartialResult, int64) { + return nil, 0 +} + // SlidingWindowAggFunc is the interface to evaluate the aggregate functions using sliding window. type SlidingWindowAggFunc interface { // Slide evaluates the aggregate functions using a sliding window. The input @@ -212,3 +232,27 @@ type MaxMinSlidingWindowAggFunc interface { // SetWindowStart sets the start position of window SetWindowStart(start uint64) } + +type deserializeFunc func(*deserializeHelper) (PartialResult, int64) + +func deserializePartialResultCommon(src *chunk.Chunk, ordinal int, deserializeFuncImpl deserializeFunc) ([]PartialResult, int64) { + dataCol := src.Column(ordinal) + totalMemDelta := int64(0) + spillHelper := newDeserializeHelper(dataCol, src.NumRows()) + partialResults := make([]PartialResult, 0, src.NumRows()) + + for { + pr, memDelta := deserializeFuncImpl(&spillHelper) + if pr == nil { + break + } + partialResults = append(partialResults, pr) + totalMemDelta += memDelta + } + + if len(partialResults) != src.NumRows() { + panic("Fail to deserialize partial result") + } + + return partialResults, totalMemDelta +} diff --git a/pkg/executor/aggfuncs/func_avg.go b/pkg/executor/aggfuncs/func_avg.go index af6969e6a3f03..ff5518e7bb42c 100644 --- a/pkg/executor/aggfuncs/func_avg.go +++ b/pkg/executor/aggfuncs/func_avg.go @@ -47,6 +47,26 @@ type baseAvgDecimal struct { baseAggFunc } +func (e *baseAvgDecimal) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4AvgDecimal)(partialResult) + resBuf := spillHelper.serializePartialResult4AvgDecimal(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *baseAvgDecimal) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *baseAvgDecimal) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4AvgDecimal)(pr) + success := helper.deserializePartialResult4AvgDecimal(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type partialResult4AvgDecimal struct { sum types.MyDecimal count int64 @@ -320,6 +340,26 @@ func (e *baseAvgFloat64) AppendFinalResult2Chunk(_ sessionctx.Context, pr Partia return nil } +func (e *baseAvgFloat64) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4AvgFloat64)(partialResult) + resBuf := spillHelper.serializePartialResult4AvgFloat64(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *baseAvgFloat64) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *baseAvgFloat64) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4AvgFloat64)(pr) + success := helper.deserializePartialResult4AvgFloat64(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type avgOriginal4Float64HighPrecision struct { baseAvgFloat64 } diff --git a/pkg/executor/aggfuncs/func_bitfuncs.go b/pkg/executor/aggfuncs/func_bitfuncs.go index d3ff75417a608..c7949f2bcdf02 100644 --- a/pkg/executor/aggfuncs/func_bitfuncs.go +++ b/pkg/executor/aggfuncs/func_bitfuncs.go @@ -48,6 +48,26 @@ func (e *baseBitAggFunc) AppendFinalResult2Chunk(_ sessionctx.Context, pr Partia return nil } +func (e *baseBitAggFunc) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4BitFunc)(partialResult) + resBuf := spillHelper.serializePartialResult4BitFunc(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *baseBitAggFunc) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *baseBitAggFunc) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4BitFunc)(pr) + success := helper.deserializePartialResult4BitFunc(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type bitOrUint64 struct { baseBitAggFunc } diff --git a/pkg/executor/aggfuncs/func_count.go b/pkg/executor/aggfuncs/func_count.go index e24e373e172fd..92826afcaccc1 100644 --- a/pkg/executor/aggfuncs/func_count.go +++ b/pkg/executor/aggfuncs/func_count.go @@ -47,6 +47,26 @@ func (e *baseCount) AppendFinalResult2Chunk(_ sessionctx.Context, pr PartialResu return nil } +func (e *baseCount) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4Count)(partialResult) + resBuf := spillHelper.serializePartialResult4Count(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *baseCount) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *baseCount) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := *(*partialResult4Count)(pr) + success := helper.deserializePartialResult4Count(&result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type countOriginal4Int struct { baseCount } diff --git a/pkg/executor/aggfuncs/func_first_row.go b/pkg/executor/aggfuncs/func_first_row.go index a4b2d7353a617..f2ad7f7145f02 100644 --- a/pkg/executor/aggfuncs/func_first_row.go +++ b/pkg/executor/aggfuncs/func_first_row.go @@ -163,6 +163,26 @@ func (e *firstRow4Int) AppendFinalResult2Chunk(_ sessionctx.Context, pr PartialR return nil } +func (e *firstRow4Int) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4FirstRowInt)(partialResult) + resBuf := spillHelper.serializePartialResult4FirstRowInt(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *firstRow4Int) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *firstRow4Int) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4FirstRowInt)(pr) + success := helper.deserializePartialResult4FirstRowInt(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type firstRow4Float32 struct { baseAggFunc } @@ -209,6 +229,26 @@ func (e *firstRow4Float32) AppendFinalResult2Chunk(_ sessionctx.Context, pr Part return nil } +func (e *firstRow4Float32) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4FirstRowFloat32)(partialResult) + resBuf := spillHelper.serializePartialResult4FirstRowFloat32(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *firstRow4Float32) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *firstRow4Float32) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4FirstRowFloat32)(pr) + success := helper.deserializePartialResult4FirstRowFloat32(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type firstRow4Float64 struct { baseAggFunc } @@ -255,6 +295,26 @@ func (e *firstRow4Float64) AppendFinalResult2Chunk(_ sessionctx.Context, pr Part return nil } +func (e *firstRow4Float64) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4FirstRowFloat64)(partialResult) + resBuf := spillHelper.serializePartialResult4FirstRowFloat64(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *firstRow4Float64) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *firstRow4Float64) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4FirstRowFloat64)(pr) + success := helper.deserializePartialResult4FirstRowFloat64(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type firstRow4String struct { baseAggFunc } @@ -302,6 +362,26 @@ func (e *firstRow4String) AppendFinalResult2Chunk(_ sessionctx.Context, pr Parti return nil } +func (e *firstRow4String) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4FirstRowString)(partialResult) + resBuf := spillHelper.serializePartialResult4FirstRowString(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *firstRow4String) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *firstRow4String) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4FirstRowString)(pr) + success := helper.deserializePartialResult4FirstRowString(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type firstRow4Time struct { baseAggFunc } @@ -348,6 +428,26 @@ func (e *firstRow4Time) AppendFinalResult2Chunk(_ sessionctx.Context, pr Partial return nil } +func (e *firstRow4Time) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4FirstRowTime)(partialResult) + resBuf := spillHelper.serializePartialResult4FirstRowTime(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *firstRow4Time) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *firstRow4Time) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4FirstRowTime)(pr) + success := helper.deserializePartialResult4FirstRowTime(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type firstRow4Duration struct { baseAggFunc } @@ -394,6 +494,26 @@ func (e *firstRow4Duration) AppendFinalResult2Chunk(_ sessionctx.Context, pr Par return nil } +func (e *firstRow4Duration) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4FirstRowDuration)(partialResult) + resBuf := spillHelper.serializePartialResult4FirstRowDuration(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *firstRow4Duration) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *firstRow4Duration) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4FirstRowDuration)(pr) + success := helper.deserializePartialResult4FirstRowDuration(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type firstRow4JSON struct { baseAggFunc } @@ -440,6 +560,26 @@ func (e *firstRow4JSON) AppendFinalResult2Chunk(_ sessionctx.Context, pr Partial return nil } +func (e *firstRow4JSON) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4FirstRowJSON)(partialResult) + resBuf := spillHelper.serializePartialResult4FirstRowJSON(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *firstRow4JSON) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *firstRow4JSON) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4FirstRowJSON)(pr) + success := helper.deserializePartialResult4FirstRowJSON(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type firstRow4Decimal struct { baseAggFunc } @@ -500,6 +640,26 @@ func (*firstRow4Decimal) MergePartialResult(_ sessionctx.Context, src, dst Parti return memDelta, nil } +func (e *firstRow4Decimal) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4FirstRowDecimal)(partialResult) + resBuf := spillHelper.serializePartialResult4FirstRowDecimal(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *firstRow4Decimal) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *firstRow4Decimal) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4FirstRowDecimal)(pr) + success := helper.deserializePartialResult4FirstRowDecimal(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type firstRow4Enum struct { baseAggFunc } @@ -547,6 +707,26 @@ func (e *firstRow4Enum) AppendFinalResult2Chunk(_ sessionctx.Context, pr Partial return nil } +func (e *firstRow4Enum) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4FirstRowEnum)(partialResult) + resBuf := spillHelper.serializePartialResult4FirstRowEnum(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *firstRow4Enum) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *firstRow4Enum) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4FirstRowEnum)(pr) + success := helper.deserializePartialResult4FirstRowEnum(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type firstRow4Set struct { baseAggFunc } @@ -593,3 +773,23 @@ func (e *firstRow4Set) AppendFinalResult2Chunk(_ sessionctx.Context, pr PartialR chk.AppendSet(e.ordinal, p.val) return nil } + +func (e *firstRow4Set) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4FirstRowSet)(partialResult) + resBuf := spillHelper.serializePartialResult4FirstRowSet(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *firstRow4Set) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *firstRow4Set) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4FirstRowSet)(pr) + success := helper.deserializePartialResult4FirstRowSet(result) + if !success { + return nil, 0 + } + return pr, memDelta +} diff --git a/pkg/executor/aggfuncs/func_group_concat.go b/pkg/executor/aggfuncs/func_group_concat.go index 4412c029ab26c..f036f100ba432 100644 --- a/pkg/executor/aggfuncs/func_group_concat.go +++ b/pkg/executor/aggfuncs/func_group_concat.go @@ -173,6 +173,26 @@ func (e *groupConcat) MergePartialResult(sctx sessionctx.Context, src, dst Parti return memDelta, e.truncatePartialResultIfNeed(sctx, p2.buffer) } +func (e *groupConcat) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4GroupConcat)(partialResult) + resBuf := spillHelper.serializePartialResult4GroupConcat(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *groupConcat) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *groupConcat) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4GroupConcat)(pr) + success := helper.deserializePartialResult4GroupConcat(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + // SetTruncated will be called in `executorBuilder#buildHashAgg` with duck-type. func (e *groupConcat) SetTruncated(t *int32) { e.truncated = t diff --git a/pkg/executor/aggfuncs/func_json_arrayagg.go b/pkg/executor/aggfuncs/func_json_arrayagg.go index ee73184bcb0d3..ba9b3e20d2731 100644 --- a/pkg/executor/aggfuncs/func_json_arrayagg.go +++ b/pkg/executor/aggfuncs/func_json_arrayagg.go @@ -91,3 +91,23 @@ func (*jsonArrayagg) MergePartialResult(_ sessionctx.Context, src, dst PartialRe p2.entries = append(p2.entries, p1.entries...) return 0, nil } + +func (e *jsonArrayagg) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4JsonArrayagg)(partialResult) + resBuf := spillHelper.serializePartialResult4JsonArrayagg(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *jsonArrayagg) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *jsonArrayagg) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4JsonArrayagg)(pr) + success := helper.deserializePartialResult4JsonArrayagg(result) + if !success { + return nil, 0 + } + return pr, memDelta +} diff --git a/pkg/executor/aggfuncs/func_json_objectagg.go b/pkg/executor/aggfuncs/func_json_objectagg.go index 6c44e5f999eb7..85cf2d10bcea5 100644 --- a/pkg/executor/aggfuncs/func_json_objectagg.go +++ b/pkg/executor/aggfuncs/func_json_objectagg.go @@ -114,6 +114,26 @@ func (e *jsonObjectAgg) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup return memDelta, nil } +func (e *jsonObjectAgg) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4JsonObjectAgg)(partialResult) + resBuf := spillHelper.serializePartialResult4JsonObjectAgg(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *jsonObjectAgg) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *jsonObjectAgg) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4JsonObjectAgg)(pr) + success, deserializeMemDelta := helper.deserializePartialResult4JsonObjectAgg(result) + if !success { + return nil, 0 + } + return pr, memDelta + deserializeMemDelta +} + func getRealJSONValue(value types.Datum, ft *types.FieldType) (interface{}, error) { realVal := value.Clone().GetValue() switch value.Kind() { diff --git a/pkg/executor/aggfuncs/func_max_min.go b/pkg/executor/aggfuncs/func_max_min.go index 9240dfd385e6b..315f435a5d117 100644 --- a/pkg/executor/aggfuncs/func_max_min.go +++ b/pkg/executor/aggfuncs/func_max_min.go @@ -301,6 +301,26 @@ func (e *maxMin4Int) MergePartialResult(_ sessionctx.Context, src, dst PartialRe return 0, nil } +func (e *maxMin4Int) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4MaxMinInt)(partialResult) + resBuf := spillHelper.serializePartialResult4MaxMinInt(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *maxMin4Int) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *maxMin4Int) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4MaxMinInt)(pr) + success := helper.deserializePartialResult4MaxMinInt(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type maxMin4IntSliding struct { maxMin4Int windowInfo @@ -443,6 +463,26 @@ func (e *maxMin4Uint) MergePartialResult(_ sessionctx.Context, src, dst PartialR return 0, nil } +func (e *maxMin4Uint) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4MaxMinUint)(partialResult) + resBuf := spillHelper.serializePartialResult4MaxMinUint(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *maxMin4Uint) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *maxMin4Uint) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4MaxMinUint)(pr) + success := helper.deserializePartialResult4MaxMinUint(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type maxMin4UintSliding struct { maxMin4Uint windowInfo @@ -582,6 +622,26 @@ func (e *maxMin4Float32) MergePartialResult(_ sessionctx.Context, src, dst Parti return 0, nil } +func (e *maxMin4Float32) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4MaxMinFloat32)(partialResult) + resBuf := spillHelper.serializePartialResult4MaxMinFloat32(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *maxMin4Float32) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *maxMin4Float32) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4MaxMinFloat32)(pr) + success := helper.deserializePartialResult4MaxMinFloat32(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type maxMin4Float32Sliding struct { maxMin4Float32 windowInfo @@ -719,6 +779,26 @@ func (e *maxMin4Float64) MergePartialResult(_ sessionctx.Context, src, dst Parti return 0, nil } +func (e *maxMin4Float64) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4MaxMinFloat64)(partialResult) + resBuf := spillHelper.serializePartialResult4MaxMinFloat64(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *maxMin4Float64) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *maxMin4Float64) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4MaxMinFloat64)(pr) + success := helper.deserializePartialResult4MaxMinFloat64(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type maxMin4Float64Sliding struct { maxMin4Float64 windowInfo @@ -868,6 +948,26 @@ func (e *maxMin4Decimal) MergePartialResult(_ sessionctx.Context, src, dst Parti return 0, nil } +func (e *maxMin4Decimal) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4MaxMinDecimal)(partialResult) + resBuf := spillHelper.serializePartialResult4MaxMinDecimal(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *maxMin4Decimal) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *maxMin4Decimal) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4MaxMinDecimal)(pr) + success := helper.deserializePartialResult4MaxMinDecimal(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type maxMin4DecimalSliding struct { maxMin4Decimal windowInfo @@ -1029,6 +1129,26 @@ func (e *maxMin4String) MergePartialResult(_ sessionctx.Context, src, dst Partia return 0, nil } +func (e *maxMin4String) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4MaxMinString)(partialResult) + resBuf := spillHelper.serializePartialResult4MaxMinString(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *maxMin4String) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *maxMin4String) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4MaxMinString)(pr) + success := helper.deserializePartialResult4MaxMinString(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type maxMin4StringSliding struct { maxMin4String windowInfo @@ -1212,6 +1332,26 @@ func (e *maxMin4TimeSliding) UpdatePartialResult(sctx sessionctx.Context, rowsIn return 0, nil } +func (e *maxMin4TimeSliding) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4MaxMinTime)(partialResult) + resBuf := spillHelper.serializePartialResult4MaxMinTime(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *maxMin4TimeSliding) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *maxMin4TimeSliding) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4MaxMinTime)(pr) + success := helper.deserializePartialResult4MaxMinTime(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + var _ SlidingWindowAggFunc = &maxMin4DurationSliding{} func (e *maxMin4TimeSliding) Slide(sctx sessionctx.Context, getRow func(uint64) chunk.Row, lastStart, lastEnd uint64, shiftStart, shiftEnd uint64, pr PartialResult) error { @@ -1308,6 +1448,26 @@ func (e *maxMin4Duration) MergePartialResult(_ sessionctx.Context, src, dst Part return 0, nil } +func (e *maxMin4Duration) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4MaxMinDuration)(partialResult) + resBuf := spillHelper.serializePartialResult4MaxMinDuration(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *maxMin4Duration) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *maxMin4Duration) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4MaxMinDuration)(pr) + success := helper.deserializePartialResult4MaxMinDuration(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type maxMin4DurationSliding struct { maxMin4Duration windowInfo @@ -1453,6 +1613,26 @@ func (e *maxMin4JSON) MergePartialResult(_ sessionctx.Context, src, dst PartialR return 0, nil } +func (e *maxMin4JSON) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4MaxMinJSON)(partialResult) + resBuf := spillHelper.serializePartialResult4MaxMinJSON(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *maxMin4JSON) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *maxMin4JSON) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4MaxMinJSON)(pr) + success := helper.deserializePartialResult4MaxMinJSON(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type maxMin4Enum struct { baseMaxMinAggFunc } @@ -1520,6 +1700,26 @@ func (e *maxMin4Enum) MergePartialResult(_ sessionctx.Context, src, dst PartialR return 0, nil } +func (e *maxMin4Enum) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4MaxMinEnum)(partialResult) + resBuf := spillHelper.serializePartialResult4MaxMinEnum(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *maxMin4Enum) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *maxMin4Enum) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4MaxMinEnum)(pr) + success := helper.deserializePartialResult4MaxMinEnum(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type maxMin4Set struct { baseMaxMinAggFunc } @@ -1586,3 +1786,23 @@ func (e *maxMin4Set) MergePartialResult(_ sessionctx.Context, src, dst PartialRe } return 0, nil } + +func (e *maxMin4Set) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4MaxMinSet)(partialResult) + resBuf := spillHelper.serializePartialResult4MaxMinSet(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *maxMin4Set) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *maxMin4Set) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4MaxMinSet)(pr) + success := helper.deserializePartialResult4MaxMinSet(result) + if !success { + return nil, 0 + } + return pr, memDelta +} diff --git a/pkg/executor/aggfuncs/func_sum.go b/pkg/executor/aggfuncs/func_sum.go index 32573eacc6fcd..23f39f26bce50 100644 --- a/pkg/executor/aggfuncs/func_sum.go +++ b/pkg/executor/aggfuncs/func_sum.go @@ -114,6 +114,26 @@ func (*baseSum4Float64) MergePartialResult(_ sessionctx.Context, src, dst Partia return 0, nil } +func (e *baseSum4Float64) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4SumFloat64)(partialResult) + resBuf := spillHelper.serializePartialResult4SumFloat64(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *baseSum4Float64) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *baseSum4Float64) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4SumFloat64)(pr) + success := helper.deserializePartialResult4SumFloat64(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + type sum4Float64 struct { baseSum4Float64 } @@ -155,6 +175,26 @@ type sum4Decimal struct { baseSumAggFunc } +func (e *sum4Decimal) SerializePartialResult(partialResult PartialResult, chk *chunk.Chunk, spillHelper *SerializeHelper) { + pr := (*partialResult4SumDecimal)(partialResult) + resBuf := spillHelper.serializePartialResult4SumDecimal(*pr) + chk.AppendBytes(e.ordinal, resBuf) +} + +func (e *sum4Decimal) DeserializePartialResult(src *chunk.Chunk) ([]PartialResult, int64) { + return deserializePartialResultCommon(src, e.ordinal, e.deserializeForSpill) +} + +func (e *sum4Decimal) deserializeForSpill(helper *deserializeHelper) (PartialResult, int64) { + pr, memDelta := e.AllocPartialResult() + result := (*partialResult4SumDecimal)(pr) + success := helper.deserializePartialResult4SumDecimal(result) + if !success { + return nil, 0 + } + return pr, memDelta +} + func (*sum4Decimal) AllocPartialResult() (pr PartialResult, memDelta int64) { p := new(partialResult4SumDecimal) return PartialResult(p), DefPartialResult4SumDecimalSize diff --git a/pkg/executor/aggfuncs/spill_deserialize_helper.go b/pkg/executor/aggfuncs/spill_deserialize_helper.go new file mode 100644 index 0000000000000..83e9d44f32b16 --- /dev/null +++ b/pkg/executor/aggfuncs/spill_deserialize_helper.go @@ -0,0 +1,397 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggfuncs + +import ( + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/hack" + util "github.com/pingcap/tidb/pkg/util/serialization" +) + +type deserializeHelper struct { + column *chunk.Column + readRowIndex int + totalRowCnt int + pab *util.PosAndBuf +} + +func newDeserializeHelper(column *chunk.Column, rowNum int) deserializeHelper { + return deserializeHelper{ + column: column, + readRowIndex: 0, + totalRowCnt: rowNum, + pab: &util.PosAndBuf{ + Pos: 0, + }, + } +} + +func (s *deserializeHelper) deserializePartialResult4Count(dst *partialResult4Count) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + *dst = util.DeserializeInt64(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4MaxMinInt(dst *partialResult4MaxMinInt) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.isNull = util.DeserializeBool(s.pab) + dst.val = util.DeserializeInt64(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4MaxMinUint(dst *partialResult4MaxMinUint) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.isNull = util.DeserializeBool(s.pab) + dst.val = util.DeserializeUint64(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4MaxMinDecimal(dst *partialResult4MaxMinDecimal) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.isNull = util.DeserializeBool(s.pab) + dst.val = util.DeserializeMyDecimal(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4MaxMinFloat32(dst *partialResult4MaxMinFloat32) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.isNull = util.DeserializeBool(s.pab) + dst.val = util.DeserializeFloat32(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4MaxMinFloat64(dst *partialResult4MaxMinFloat64) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.isNull = util.DeserializeBool(s.pab) + dst.val = util.DeserializeFloat64(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4MaxMinTime(dst *partialResult4MaxMinTime) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.isNull = util.DeserializeBool(s.pab) + dst.val = util.DeserializeTime(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4MaxMinDuration(dst *partialResult4MaxMinDuration) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.isNull = util.DeserializeBool(s.pab) + dst.val = util.DeserializeTypesDuration(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4MaxMinString(dst *partialResult4MaxMinString) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.isNull = util.DeserializeBool(s.pab) + dst.val = util.DeserializeString(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4MaxMinJSON(dst *partialResult4MaxMinJSON) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.isNull = util.DeserializeBool(s.pab) + dst.val = util.DeserializeBinaryJSON(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4MaxMinEnum(dst *partialResult4MaxMinEnum) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.isNull = util.DeserializeBool(s.pab) + dst.val = util.DeserializeEnum(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4MaxMinSet(dst *partialResult4MaxMinSet) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.isNull = util.DeserializeBool(s.pab) + dst.val = util.DeserializeSet(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4AvgDecimal(dst *partialResult4AvgDecimal) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.sum = util.DeserializeMyDecimal(s.pab) + dst.count = util.DeserializeInt64(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4AvgFloat64(dst *partialResult4AvgFloat64) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.sum = util.DeserializeFloat64(s.pab) + dst.count = util.DeserializeInt64(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4SumDecimal(dst *partialResult4SumDecimal) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.val = util.DeserializeMyDecimal(s.pab) + dst.notNullRowCount = util.DeserializeInt64(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4SumFloat64(dst *partialResult4SumFloat64) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.val = util.DeserializeFloat64(s.pab) + dst.notNullRowCount = util.DeserializeInt64(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializeBasePartialResult4GroupConcat(dst *basePartialResult4GroupConcat) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + dst.valsBuf = util.DeserializeBytesBuffer(s.pab) + dst.buffer = util.DeserializeBytesBuffer(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4GroupConcat(dst *partialResult4GroupConcat) bool { + base := basePartialResult4GroupConcat{} + success := s.deserializeBasePartialResult4GroupConcat(&base) + dst.valsBuf = base.valsBuf + dst.buffer = base.buffer + return success +} + +func (s *deserializeHelper) deserializePartialResult4BitFunc(dst *partialResult4BitFunc) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + *dst = util.DeserializeUint64(s.pab) + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4JsonArrayagg(dst *partialResult4JsonArrayagg) bool { + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + byteNum := int64(len(s.pab.Buf)) + for s.pab.Pos < byteNum { + value := util.DeserializeInterface(s.pab) + dst.entries = append(dst.entries, value) + } + s.readRowIndex++ + return true + } + return false +} + +func (s *deserializeHelper) deserializePartialResult4JsonObjectAgg(dst *partialResult4JsonObjectAgg) (bool, int64) { + memDelta := int64(0) + dst.bInMap = 0 + dst.entries = make(map[string]interface{}) + if s.readRowIndex < s.totalRowCnt { + s.pab.Reset(s.column, s.readRowIndex) + byteNum := int64(len(s.pab.Buf)) + for s.pab.Pos < byteNum { + key := util.DeserializeString(s.pab) + realVal := util.DeserializeInterface(s.pab) + if _, ok := dst.entries[key]; !ok { + memDelta += int64(len(key)) + getValMemDelta(realVal) + if len(dst.entries)+1 > (1< b.lastCap) + b.lastCap = newCap + return retVal +} + +func TestPartialResult4Count(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4Count{-123, 0, 123} + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4Count) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4Count(*(*partialResult4Count)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4Count, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4Count(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4Count)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4MaxMinInt(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4MaxMinInt{ + {val: -123, isNull: true}, + {val: 0, isNull: false}, + {val: 123, isNull: true}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4MaxMinInt) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4MaxMinInt(*(*partialResult4MaxMinInt)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4MaxMinInt, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4MaxMinInt(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4MaxMinInt)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4MaxMinUint(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4MaxMinUint{ + {val: 0, isNull: true}, + {val: 1, isNull: false}, + {val: 2, isNull: true}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4MaxMinUint) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4MaxMinUint(*(*partialResult4MaxMinUint)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4MaxMinUint, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4MaxMinUint(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4MaxMinUint)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4MaxMinDecimal(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4MaxMinDecimal{ + {val: *types.NewDecFromInt(0), isNull: true}, + {val: *types.NewDecFromUint(123456), isNull: false}, + {val: *types.NewDecFromInt(99999), isNull: true}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4MaxMinDecimal) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4MaxMinDecimal(*(*partialResult4MaxMinDecimal)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4MaxMinDecimal, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4MaxMinDecimal(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4MaxMinDecimal)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4MaxMinFloat32(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4MaxMinFloat32{ + {val: -123.123, isNull: true}, + {val: 0.0, isNull: false}, + {val: 123.123, isNull: true}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4MaxMinFloat32) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4MaxMinFloat32(*(*partialResult4MaxMinFloat32)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4MaxMinFloat32, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4MaxMinFloat32(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4MaxMinFloat32)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4MaxMinFloat64(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4MaxMinFloat64{ + {val: -123.123, isNull: true}, + {val: 0.0, isNull: false}, + {val: 123.123, isNull: true}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4MaxMinFloat64) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4MaxMinFloat64(*(*partialResult4MaxMinFloat64)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4MaxMinFloat64, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4MaxMinFloat64(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4MaxMinFloat64)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4MaxMinTime(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4MaxMinTime{ + {val: types.NewTime(123, 10, 9), isNull: true}, + {val: types.NewTime(0, 0, 0), isNull: false}, + {val: types.NewTime(9876, 12, 10), isNull: true}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4MaxMinTime) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4MaxMinTime(*(*partialResult4MaxMinTime)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4MaxMinTime, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4MaxMinTime(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4MaxMinTime)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4MaxMinString(t *testing.T) { + serializeHelper := NewSerializeHelper() + bufSizeChecker := newBufferSizeChecker() + + // Initialize test data + expectData := []partialResult4MaxMinString{ + {val: string("12312412312"), isNull: true}, + {val: testLongStr1, isNull: false}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4MaxMinString) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4MaxMinString(*(*partialResult4MaxMinString)(pr)) + chunk.AppendBytes(0, serializedData) + require.True(t, bufSizeChecker.checkBufferCapacity(serializeHelper)) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4MaxMinString, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4MaxMinString(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4MaxMinString)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4MaxMinJSON(t *testing.T) { + serializeHelper := NewSerializeHelper() + bufSizeChecker := newBufferSizeChecker() + + // Initialize test data + expectData := []partialResult4MaxMinJSON{ + {val: types.BinaryJSON{TypeCode: 3, Value: []byte{}}, isNull: false}, + {val: types.BinaryJSON{TypeCode: 6, Value: getLargeRandBuffer()}, isNull: true}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4MaxMinJSON) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4MaxMinJSON(*(*partialResult4MaxMinJSON)(pr)) + chunk.AppendBytes(0, serializedData) + require.True(t, bufSizeChecker.checkBufferCapacity(serializeHelper)) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4MaxMinJSON, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4MaxMinJSON(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4MaxMinJSON)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4MaxMinEnum(t *testing.T) { + serializeHelper := NewSerializeHelper() + bufSizeChecker := newBufferSizeChecker() + + // Initialize test data + expectData := []partialResult4MaxMinEnum{ + {val: types.Enum{Name: string(""), Value: 123}, isNull: true}, + {val: types.Enum{Name: testLongStr1, Value: 0}, isNull: false}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4MaxMinEnum) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4MaxMinEnum(*(*partialResult4MaxMinEnum)(pr)) + chunk.AppendBytes(0, serializedData) + require.True(t, bufSizeChecker.checkBufferCapacity(serializeHelper)) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4MaxMinEnum, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4MaxMinEnum(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4MaxMinEnum)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4MaxMinSet(t *testing.T) { + serializeHelper := NewSerializeHelper() + bufSizeChecker := newBufferSizeChecker() + + // Initialize test data + expectData := []partialResult4MaxMinSet{ + {val: types.Set{Name: string(""), Value: 123}, isNull: true}, + {val: types.Set{Name: testLongStr1, Value: 0}, isNull: false}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4MaxMinSet) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4MaxMinSet(*(*partialResult4MaxMinSet)(pr)) + chunk.AppendBytes(0, serializedData) + require.True(t, bufSizeChecker.checkBufferCapacity(serializeHelper)) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4MaxMinSet, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4MaxMinSet(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4MaxMinSet)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4AvgDecimal(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4AvgDecimal{ + {sum: *types.NewDecFromInt(0), count: 0}, + {sum: *types.NewDecFromInt(12345), count: 123}, + {sum: *types.NewDecFromInt(87654), count: -123}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4AvgDecimal) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4AvgDecimal(*(*partialResult4AvgDecimal)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4AvgDecimal, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4AvgDecimal(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4AvgDecimal)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4AvgFloat64(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4AvgFloat64{ + {sum: 0.0, count: 0}, + {sum: 123.123, count: 123}, + {sum: -123.123, count: -123}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4AvgFloat64) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4AvgFloat64(*(*partialResult4AvgFloat64)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4AvgFloat64, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4AvgFloat64(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4AvgFloat64)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4SumDecimal(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4SumDecimal{ + {val: *types.NewDecFromInt(0), notNullRowCount: 0}, + {val: *types.NewDecFromInt(12345), notNullRowCount: 123}, + {val: *types.NewDecFromInt(87654), notNullRowCount: -123}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4SumDecimal) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4SumDecimal(*(*partialResult4SumDecimal)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4SumDecimal, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4SumDecimal(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4SumDecimal)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4SumFloat64(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4SumFloat64{ + {val: 0.0, notNullRowCount: 0}, + {val: 123.123, notNullRowCount: 123}, + {val: -123.123, notNullRowCount: -123}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4SumFloat64) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4SumFloat64(*(*partialResult4SumFloat64)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4SumFloat64, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4SumFloat64(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4SumFloat64)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestBasePartialResult4GroupConcat(t *testing.T) { + var serializeHelper = NewSerializeHelper() + bufSizeChecker := newBufferSizeChecker() + + // Initialize test data + expectData := []basePartialResult4GroupConcat{ + {valsBuf: bytes.NewBufferString(""), buffer: bytes.NewBufferString("")}, + {valsBuf: bytes.NewBufferString("xzxx"), buffer: bytes.NewBufferString(testLongStr2)}, + {valsBuf: bytes.NewBufferString(testLongStr1), buffer: bytes.NewBufferString(testLongStr2)}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(basePartialResult4GroupConcat) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializeBasePartialResult4GroupConcat(*(*basePartialResult4GroupConcat)(pr)) + chunk.AppendBytes(0, serializedData) + require.True(t, bufSizeChecker.checkBufferCapacity(serializeHelper)) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]basePartialResult4GroupConcat, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializeBasePartialResult4GroupConcat(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, (*basePartialResult4GroupConcat)(serializedPartialResults[i]).valsBuf.String(), deserializedPartialResults[i].valsBuf.String()) + require.Equal(t, (*basePartialResult4GroupConcat)(serializedPartialResults[i]).buffer.String(), deserializedPartialResults[i].buffer.String()) + } +} + +func TestPartialResult4BitFunc(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4BitFunc{0, 1, 2} + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4BitFunc) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4BitFunc(*(*partialResult4BitFunc)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4BitFunc, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4BitFunc(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4BitFunc)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4JsonArrayagg(t *testing.T) { + serializeHelper := NewSerializeHelper() + bufSizeChecker := newBufferSizeChecker() + + // Initialize test data + expectData := []partialResult4JsonArrayagg{ + {entries: []interface{}{int64(1), float64(1.1), "", true, types.Opaque{TypeCode: 1, Buf: getLargeRandBuffer()}, types.NewTime(9876, 12, 10)}}, + {entries: []interface{}{int64(1), float64(1.1), false, types.NewDuration(1, 2, 3, 4, 5), testLongStr1}}, + {entries: []interface{}{"dw啊q", float64(-1.1), int64(0), types.NewDuration(1, 2, 3, 4, 5), types.NewTime(123, 1, 2), testLongStr1, types.BinaryJSON{TypeCode: 1, Value: []byte(testLongStr2)}, types.Opaque{TypeCode: 6, Buf: getLargeRandBuffer()}}}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4JsonArrayagg) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4JsonArrayagg(*(*partialResult4JsonArrayagg)(pr)) + chunk.AppendBytes(0, serializedData) + require.True(t, bufSizeChecker.checkBufferCapacity(serializeHelper)) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4JsonArrayagg, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4JsonArrayagg(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4JsonArrayagg)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4JsonObjectAgg(t *testing.T) { + serializeHelper := NewSerializeHelper() + bufSizeChecker := newBufferSizeChecker() + + // Initialize test data + expectData := []partialResult4JsonObjectAgg{ + {entries: map[string]interface{}{"123": int64(1), "234": float64(1.1), "999": true, "235": "123"}, bInMap: 0}, + {entries: map[string]interface{}{"啊": testLongStr1, "我": float64(1.1), "反": int64(456)}, bInMap: 0}, + {entries: map[string]interface{}{"fe": testLongStr1, " ": int64(36798), "888": false, "": testLongStr2}, bInMap: 0}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4JsonObjectAgg) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4JsonObjectAgg(*(*partialResult4JsonObjectAgg)(pr)) + chunk.AppendBytes(0, serializedData) + require.True(t, bufSizeChecker.checkBufferCapacity(serializeHelper)) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4JsonObjectAgg, testDataNum+1) + index := 0 + for { + success, _ := deserializeHelper.deserializePartialResult4JsonObjectAgg(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4JsonObjectAgg)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4FirstRowDecimal(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4FirstRowDecimal{ + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: false}, val: *types.NewDecFromInt(0)}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: false, gotFirstRow: false}, val: *types.NewDecFromInt(123)}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: true}, val: *types.NewDecFromInt(12345)}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4FirstRowDecimal) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4FirstRowDecimal(*(*partialResult4FirstRowDecimal)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4FirstRowDecimal, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4FirstRowDecimal(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4FirstRowDecimal)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4FirstRowInt(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4FirstRowInt{ + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: false}, val: -123}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: false, gotFirstRow: false}, val: 0}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: true}, val: 123}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4FirstRowInt) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4FirstRowInt(*(*partialResult4FirstRowInt)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4FirstRowInt, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4FirstRowInt(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4FirstRowInt)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4FirstRowTime(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4FirstRowTime{ + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: false}, val: types.NewTime(0, 0, 1)}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: false, gotFirstRow: false}, val: types.NewTime(123, 0, 1)}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: true}, val: types.NewTime(456, 0, 1)}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4FirstRowTime) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4FirstRowTime(*(*partialResult4FirstRowTime)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4FirstRowTime, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4FirstRowTime(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4FirstRowTime)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4FirstRowString(t *testing.T) { + serializeHelper := NewSerializeHelper() + bufSizeChecker := newBufferSizeChecker() + + // Initialize test data + expectData := []partialResult4FirstRowString{ + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: false}, val: ""}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: false, gotFirstRow: false}, val: testLongStr1}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4FirstRowString) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4FirstRowString(*(*partialResult4FirstRowString)(pr)) + chunk.AppendBytes(0, serializedData) + require.True(t, bufSizeChecker.checkBufferCapacity(serializeHelper)) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4FirstRowString, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4FirstRowString(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4FirstRowString)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4FirstRowFloat32(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4FirstRowFloat32{ + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: false}, val: -1.1}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: false, gotFirstRow: false}, val: 0}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: true}, val: 1.1}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4FirstRowFloat32) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4FirstRowFloat32(*(*partialResult4FirstRowFloat32)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4FirstRowFloat32, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4FirstRowFloat32(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4FirstRowFloat32)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4FirstRowFloat64(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4FirstRowFloat64{ + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: false}, val: -1.1}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: false, gotFirstRow: false}, val: 0}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: true}, val: 1.1}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4FirstRowFloat64) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4FirstRowFloat64(*(*partialResult4FirstRowFloat64)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4FirstRowFloat64, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4FirstRowFloat64(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4FirstRowFloat64)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4FirstRowDuration(t *testing.T) { + serializeHelper := NewSerializeHelper() + + // Initialize test data + expectData := []partialResult4FirstRowDuration{ + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: false}, val: types.NewDuration(1, 2, 3, 4, 5)}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: false, gotFirstRow: false}, val: types.NewDuration(0, 0, 0, 0, 0)}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: true}, val: types.NewDuration(10, 20, 30, 40, 50)}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4FirstRowDuration) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4FirstRowDuration(*(*partialResult4FirstRowDuration)(pr)) + chunk.AppendBytes(0, serializedData) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4FirstRowDuration, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4FirstRowDuration(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4FirstRowDuration)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4FirstRowJSON(t *testing.T) { + serializeHelper := NewSerializeHelper() + bufSizeChecker := newBufferSizeChecker() + + // Initialize test data + expectData := []partialResult4FirstRowJSON{ + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: false, gotFirstRow: false}, val: types.BinaryJSON{TypeCode: 6, Value: []byte{}}}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: false}, val: types.BinaryJSON{TypeCode: 8, Value: getLargeRandBuffer()}}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4FirstRowJSON) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4FirstRowJSON(*(*partialResult4FirstRowJSON)(pr)) + chunk.AppendBytes(0, serializedData) + require.True(t, bufSizeChecker.checkBufferCapacity(serializeHelper)) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4FirstRowJSON, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4FirstRowJSON(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4FirstRowJSON)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4FirstRowEnum(t *testing.T) { + serializeHelper := NewSerializeHelper() + bufSizeChecker := newBufferSizeChecker() + + // Initialize test data + expectData := []partialResult4FirstRowEnum{ + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: false}, val: types.Enum{Name: string(""), Value: 123}}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: false}, val: types.Enum{Name: testLongStr2, Value: 999}}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4FirstRowEnum) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4FirstRowEnum(*(*partialResult4FirstRowEnum)(pr)) + chunk.AppendBytes(0, serializedData) + require.True(t, bufSizeChecker.checkBufferCapacity(serializeHelper)) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4FirstRowEnum, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4FirstRowEnum(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4FirstRowEnum)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} + +func TestPartialResult4FirstRowSet(t *testing.T) { + serializeHelper := NewSerializeHelper() + bufSizeChecker := newBufferSizeChecker() + + // Initialize test data + expectData := []partialResult4FirstRowSet{ + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: false}, val: types.Set{Name: string(""), Value: 123}}, + {basePartialResult4FirstRow: basePartialResult4FirstRow{isNull: true, gotFirstRow: false}, val: types.Set{Name: testLongStr1, Value: 999}}, + } + serializedPartialResults := make([]PartialResult, len(expectData)) + testDataNum := len(serializedPartialResults) + for i := range serializedPartialResults { + pr := new(partialResult4FirstRowSet) + *pr = expectData[i] + serializedPartialResults[i] = PartialResult(pr) + } + + // Serialize test data + chunk := getChunk() + for _, pr := range serializedPartialResults { + serializedData := serializeHelper.serializePartialResult4FirstRowSet(*(*partialResult4FirstRowSet)(pr)) + chunk.AppendBytes(0, serializedData) + require.True(t, bufSizeChecker.checkBufferCapacity(serializeHelper)) + } + + // Deserialize test data + deserializeHelper := newDeserializeHelper(chunk.Column(0), testDataNum) + deserializedPartialResults := make([]partialResult4FirstRowSet, testDataNum+1) + index := 0 + for { + success := deserializeHelper.deserializePartialResult4FirstRowSet(&deserializedPartialResults[index]) + if !success { + break + } + index++ + } + + chunk.Column(0).DestroyDataForTest() + + // Check some results + require.Equal(t, testDataNum, index) + for i := 0; i < testDataNum; i++ { + require.Equal(t, *(*partialResult4FirstRowSet)(serializedPartialResults[i]), deserializedPartialResults[i]) + } +} diff --git a/pkg/executor/aggfuncs/spill_serialize_helper.go b/pkg/executor/aggfuncs/spill_serialize_helper.go new file mode 100644 index 0000000000000..7a5aef465b80c --- /dev/null +++ b/pkg/executor/aggfuncs/spill_serialize_helper.go @@ -0,0 +1,226 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggfuncs + +import ( + util "github.com/pingcap/tidb/pkg/util/serialization" +) + +// SerializeHelper is the helper for serializing agg function meta data. +type SerializeHelper struct { + buf []byte +} + +// NewSerializeHelper creates a new SerializeHelper +func NewSerializeHelper() *SerializeHelper { + return &SerializeHelper{ + buf: make([]byte, 0, 64), + } +} + +func (s *SerializeHelper) serializePartialResult4Count(value partialResult4Count) []byte { + s.buf = s.buf[:0] + return util.SerializeInt64(value, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4MaxMinInt(value partialResult4MaxMinInt) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeBool(value.isNull, s.buf) + return util.SerializeInt64(value.val, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4MaxMinUint(value partialResult4MaxMinUint) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeBool(value.isNull, s.buf) + return util.SerializeUint64(value.val, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4MaxMinDecimal(value partialResult4MaxMinDecimal) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeBool(value.isNull, s.buf) + return util.SerializeMyDecimal(&value.val, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4MaxMinFloat32(value partialResult4MaxMinFloat32) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeBool(value.isNull, s.buf) + return util.SerializeFloat32(value.val, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4MaxMinFloat64(value partialResult4MaxMinFloat64) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeBool(value.isNull, s.buf) + return util.SerializeFloat64(value.val, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4MaxMinTime(value partialResult4MaxMinTime) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeBool(value.isNull, s.buf) + return util.SerializeTime(value.val, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4MaxMinDuration(value partialResult4MaxMinDuration) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeBool(value.isNull, s.buf) + return util.SerializeTypesDuration(value.val, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4MaxMinString(value partialResult4MaxMinString) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeBool(value.isNull, s.buf) + s.buf = util.SerializeString(value.val, s.buf) + return s.buf +} + +func (s *SerializeHelper) serializePartialResult4MaxMinJSON(value partialResult4MaxMinJSON) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeBool(value.isNull, s.buf) + s.buf = util.SerializeBinaryJSON(&value.val, s.buf) + return s.buf +} + +func (s *SerializeHelper) serializePartialResult4MaxMinEnum(value partialResult4MaxMinEnum) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeBool(value.isNull, s.buf) + s.buf = util.SerializeEnum(&value.val, s.buf) + return s.buf +} + +func (s *SerializeHelper) serializePartialResult4MaxMinSet(value partialResult4MaxMinSet) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeBool(value.isNull, s.buf) + s.buf = util.SerializeSet(&value.val, s.buf) + return s.buf +} + +func (s *SerializeHelper) serializePartialResult4AvgDecimal(value partialResult4AvgDecimal) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeMyDecimal(&value.sum, s.buf) + return util.SerializeInt64(value.count, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4AvgFloat64(value partialResult4AvgFloat64) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeFloat64(value.sum, s.buf) + return util.SerializeInt64(value.count, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4SumDecimal(value partialResult4SumDecimal) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeMyDecimal(&value.val, s.buf) + return util.SerializeInt64(value.notNullRowCount, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4SumFloat64(value partialResult4SumFloat64) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeFloat64(value.val, s.buf) + return util.SerializeInt64(value.notNullRowCount, s.buf) +} + +func (s *SerializeHelper) serializeBasePartialResult4GroupConcat(value basePartialResult4GroupConcat) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeBytesBuffer(value.valsBuf, s.buf) + s.buf = util.SerializeBytesBuffer(value.buffer, s.buf) + return s.buf +} + +func (s *SerializeHelper) serializePartialResult4GroupConcat(value partialResult4GroupConcat) []byte { + return s.serializeBasePartialResult4GroupConcat(basePartialResult4GroupConcat{ + valsBuf: value.valsBuf, + buffer: value.buffer, + }) +} + +func (s *SerializeHelper) serializePartialResult4BitFunc(value partialResult4BitFunc) []byte { + return util.SerializeUint64(value, s.buf[:0]) +} + +func (s *SerializeHelper) serializePartialResult4JsonArrayagg(value partialResult4JsonArrayagg) []byte { + s.buf = s.buf[:0] + for _, value := range value.entries { + s.buf = util.SerializeInterface(value, s.buf) + } + return s.buf +} + +func (s *SerializeHelper) serializePartialResult4JsonObjectAgg(value partialResult4JsonObjectAgg) []byte { + s.buf = s.buf[:0] + for key, value := range value.entries { + s.buf = util.SerializeString(key, s.buf) + s.buf = util.SerializeInterface(value, s.buf) + } + return s.buf +} + +func (s *SerializeHelper) serializeBasePartialResult4FirstRow(value basePartialResult4FirstRow) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeBool(value.isNull, s.buf) + return util.SerializeBool(value.gotFirstRow, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4FirstRowDecimal(value partialResult4FirstRowDecimal) []byte { + s.buf = s.serializeBasePartialResult4FirstRow(value.basePartialResult4FirstRow) + return util.SerializeMyDecimal(&value.val, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4FirstRowInt(value partialResult4FirstRowInt) []byte { + s.buf = s.serializeBasePartialResult4FirstRow(value.basePartialResult4FirstRow) + return util.SerializeInt64(value.val, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4FirstRowTime(value partialResult4FirstRowTime) []byte { + s.buf = s.serializeBasePartialResult4FirstRow(value.basePartialResult4FirstRow) + return util.SerializeTime(value.val, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4FirstRowString(value partialResult4FirstRowString) []byte { + s.buf = s.serializeBasePartialResult4FirstRow(value.basePartialResult4FirstRow) + s.buf = util.SerializeString(value.val, s.buf) + return s.buf +} + +func (s *SerializeHelper) serializePartialResult4FirstRowFloat32(value partialResult4FirstRowFloat32) []byte { + s.buf = s.serializeBasePartialResult4FirstRow(value.basePartialResult4FirstRow) + return util.SerializeFloat32(value.val, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4FirstRowFloat64(value partialResult4FirstRowFloat64) []byte { + s.buf = s.serializeBasePartialResult4FirstRow(value.basePartialResult4FirstRow) + return util.SerializeFloat64(value.val, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4FirstRowDuration(value partialResult4FirstRowDuration) []byte { + s.buf = s.serializeBasePartialResult4FirstRow(value.basePartialResult4FirstRow) + s.buf = util.SerializeInt64(int64(value.val.Duration), s.buf) + return util.SerializeInt(value.val.Fsp, s.buf) +} + +func (s *SerializeHelper) serializePartialResult4FirstRowJSON(value partialResult4FirstRowJSON) []byte { + s.buf = s.serializeBasePartialResult4FirstRow(value.basePartialResult4FirstRow) + s.buf = util.SerializeBinaryJSON(&value.val, s.buf) + return s.buf +} + +func (s *SerializeHelper) serializePartialResult4FirstRowEnum(value partialResult4FirstRowEnum) []byte { + s.buf = s.serializeBasePartialResult4FirstRow(value.basePartialResult4FirstRow) + s.buf = util.SerializeEnum(&value.val, s.buf) + return s.buf +} + +func (s *SerializeHelper) serializePartialResult4FirstRowSet(value partialResult4FirstRowSet) []byte { + s.buf = s.serializeBasePartialResult4FirstRow(value.basePartialResult4FirstRow) + s.buf = util.SerializeSet(&value.val, s.buf) + return s.buf +} diff --git a/pkg/executor/aggregate/agg_hash_base_worker.go b/pkg/executor/aggregate/agg_hash_base_worker.go index b2ed73cd06f0a..e5551a9d450f7 100644 --- a/pkg/executor/aggregate/agg_hash_base_worker.go +++ b/pkg/executor/aggregate/agg_hash_base_worker.go @@ -23,9 +23,6 @@ import ( "github.com/pingcap/tidb/pkg/util/memory" ) -// AggPartialResultMapper contains aggregate function results -type AggPartialResultMapper map[string][]aggfuncs.PartialResult - // baseHashAggWorker stores the common attributes of HashAggFinalWorker and HashAggPartialWorker. // nolint:structcheck type baseHashAggWorker struct { @@ -52,7 +49,7 @@ func newBaseHashAggWorker(ctx sessionctx.Context, finishCh <-chan struct{}, aggF return baseWorker } -func (w *baseHashAggWorker) getPartialResult(_ *stmtctx.StatementContext, groupKey [][]byte, mapper AggPartialResultMapper) [][]aggfuncs.PartialResult { +func (w *baseHashAggWorker) getPartialResult(_ *stmtctx.StatementContext, groupKey [][]byte, mapper aggfuncs.AggPartialResultMapper) [][]aggfuncs.PartialResult { n := len(groupKey) partialResults := make([][]aggfuncs.PartialResult, n) allMemDelta := int64(0) diff --git a/pkg/executor/aggregate/agg_hash_executor.go b/pkg/executor/aggregate/agg_hash_executor.go index 7cf3608ab40ae..8608890c8f415 100644 --- a/pkg/executor/aggregate/agg_hash_executor.go +++ b/pkg/executor/aggregate/agg_hash_executor.go @@ -94,7 +94,7 @@ type HashAggExec struct { Sc *stmtctx.StatementContext PartialAggFuncs []aggfuncs.AggFunc FinalAggFuncs []aggfuncs.AggFunc - partialResultMap AggPartialResultMapper + partialResultMap aggfuncs.AggPartialResultMapper bInMap int64 // indicate there are 2^bInMap buckets in partialResultMap groupSet set.StringSetWithMemoryUsage groupKeys []string @@ -104,7 +104,7 @@ type HashAggExec struct { finishCh chan struct{} finalOutputCh chan *AfFinalResult - partialOutputChs []chan *AggPartialResultMapper + partialOutputChs []chan *aggfuncs.AggPartialResultMapper inputCh chan *HashAggInput partialInputChs []chan *chunk.Chunk partialWorkers []HashAggPartialWorker @@ -230,7 +230,7 @@ func (e *HashAggExec) Open(ctx context.Context) error { func (e *HashAggExec) initForUnparallelExec() { var setSize int64 e.groupSet, setSize = set.NewStringSetWithMemoryUsage() - e.partialResultMap = make(AggPartialResultMapper) + e.partialResultMap = make(aggfuncs.AggPartialResultMapper) e.bInMap = 0 failpoint.Inject("ConsumeRandomPanic", nil) e.memTracker.Consume(hack.DefBucketMemoryUsageForMapStrToSlice*(1<