Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

分库分表:结果集处理——聚合函数(不含 Group By 子句) #187

Merged
merged 14 commits into from
Apr 14, 2023
22 changes: 11 additions & 11 deletions internal/merger/aggregatemerger/aggregator/avg.go
longyue0521 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ import (
)

// AVG 用于求平均值,通过sum/count求得。
// AVG 我们并不能预期在不同的数据库上,精度会不会损失,自己损失的话会有多少的损失。这很大程度上跟数据库类型,数据库驱动实现都有关
// AVG 我们并不能预期在不同的数据库上,精度会不会损失,以及损失的话会有多少的损失。这很大程度上跟数据库类型,数据库驱动实现都有关
type AVG struct {
sumColumnInfo ColumnInfo
countColumnInfo ColumnInfo
alias string
avgName string
}

// NewAVG sumInfo是sum的信息,countInfo是count的信息,这两个顺序不能颠倒
func NewAVG(sumInfo ColumnInfo, countInfo ColumnInfo, alias string) *AVG {
// NewAVG sumInfo是sum的信息,countInfo是count的信息,由于求avgName用于Rows的Column方法显示的名字
juniaoshaonian marked this conversation as resolved.
Show resolved Hide resolved
func NewAVG(sumInfo ColumnInfo, countInfo ColumnInfo, avgName string) *AVG {
return &AVG{
sumColumnInfo: sumInfo,
countColumnInfo: countInfo,
alias: alias,
avgName: avgName,
}
}

Expand All @@ -43,27 +43,27 @@ func (a *AVG) Aggregate(cols [][]any) (any, error) {
if sumIndex >= len(cols[0]) || sumIndex < 0 || countIndex >= len(cols[0]) || countIndex < 0 {
return nil, errs.ErrMergerInvalidAggregateColumnIndex
}
ok, avgFunc := a.findSumAndCountReflectKindBy(cols[0][sumIndex], cols[0][countIndex])
avgFunc, ok := a.findSumAndCountReflectKindBy(cols[0][sumIndex], cols[0][countIndex])
if !ok {
return nil, errs.ErrMergerAggregateFuncNotFound
}
return avgFunc(cols, sumIndex, countIndex)

}

func (a *AVG) findSumAndCountReflectKindBy(sum, count any) (bool, func([][]any, int, int) (any, error)) {
func (a *AVG) findSumAndCountReflectKindBy(sum, count any) (func([][]any, int, int) (float64, error), bool) {
sumKind := reflect.TypeOf(sum).Kind()
countKind := reflect.TypeOf(count).Kind()
val, ok := avgAggregateFuncMapping[[2]reflect.Kind{sumKind, countKind}]
return ok, val
return val, ok
}

func (a *AVG) ColumnName() string {
return a.alias
return a.avgName
}

// avgAggregator cols就是上面Aggregate的入参cols可以参Aggregate的描述
func avgAggregator[S AggregateElement, C AggregateElement](cols [][]any, sumIndex int, countIndex int) (any, error) {
func avgAggregator[S AggregateElement, C AggregateElement](cols [][]any, sumIndex int, countIndex int) (float64, error) {
var sum S
var count C
for _, col := range cols {
Expand All @@ -75,7 +75,7 @@ func avgAggregator[S AggregateElement, C AggregateElement](cols [][]any, sumInde

}

var avgAggregateFuncMapping = map[[2]reflect.Kind]func([][]any, int, int) (any, error){
var avgAggregateFuncMapping = map[[2]reflect.Kind]func([][]any, int, int) (float64, error){
[2]reflect.Kind{reflect.Int, reflect.Int}: avgAggregator[int, int],
[2]reflect.Kind{reflect.Int, reflect.Int8}: avgAggregator[int, int8],
[2]reflect.Kind{reflect.Int, reflect.Int16}: avgAggregator[int, int16],
Expand Down
38 changes: 19 additions & 19 deletions internal/merger/aggregatemerger/aggregator/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

type Count struct {
countInfo ColumnInfo
alias string
countName string
}

func (s *Count) Aggregate(cols [][]any) (any, error) {
Expand All @@ -32,43 +32,43 @@ func (s *Count) Aggregate(cols [][]any) (any, error) {
return nil, errs.ErrMergerInvalidAggregateColumnIndex
}
kind = reflect.TypeOf(cols[0][countIndex]).Kind()
countFunc, ok := CountAggregateFuncMapping[kind]
countFunc, ok := countAggregateFuncMapping[kind]
if !ok {
return nil, errs.ErrMergerAggregateFuncNotFound
}
return countFunc(cols, s.countInfo.Index)
}

func (s *Count) ColumnName() string {
return s.alias
return s.countName
}

func NewCount(info ColumnInfo, alias string) *Count {
func NewCount(info ColumnInfo) *Count {
return &Count{
countInfo: info,
alias: alias,
countName: info.Name,
}
}

func CountAggregate[T AggregateElement](cols [][]any, countIndex int) (any, error) {
func countAggregate[T AggregateElement](cols [][]any, countIndex int) (any, error) {
var count T
for _, col := range cols {
count += col[countIndex].(T)
}
return count, nil
}

var CountAggregateFuncMapping = map[reflect.Kind]func([][]any, int) (any, error){
reflect.Int: CountAggregate[int],
reflect.Int8: CountAggregate[int8],
reflect.Int16: CountAggregate[int16],
reflect.Int32: CountAggregate[int32],
reflect.Int64: CountAggregate[int64],
reflect.Uint8: CountAggregate[uint8],
reflect.Uint16: CountAggregate[uint16],
reflect.Uint32: CountAggregate[uint32],
reflect.Uint64: CountAggregate[uint64],
reflect.Float32: CountAggregate[float32],
reflect.Float64: CountAggregate[float64],
reflect.Uint: CountAggregate[uint],
var countAggregateFuncMapping = map[reflect.Kind]func([][]any, int) (any, error){
reflect.Int: countAggregate[int],
reflect.Int8: countAggregate[int8],
reflect.Int16: countAggregate[int16],
reflect.Int32: countAggregate[int32],
reflect.Int64: countAggregate[int64],
reflect.Uint8: countAggregate[uint8],
reflect.Uint16: countAggregate[uint16],
reflect.Uint32: countAggregate[uint32],
reflect.Uint64: countAggregate[uint64],
reflect.Float32: countAggregate[float32],
reflect.Float64: countAggregate[float64],
reflect.Uint: countAggregate[uint],
}
2 changes: 1 addition & 1 deletion internal/merger/aggregatemerger/aggregator/count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestCount_Aggregate(t *testing.T) {
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
count := NewCount(NewColInfo(tc.countIndex, "COUNT(id)"), "COUNT(id)")
count := NewCount(NewColInfo(tc.countIndex, "COUNT(id)"))
val, err := count.Aggregate(tc.input)
assert.Equal(t, tc.wantErr, err)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions internal/merger/aggregatemerger/aggregator/max.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

type Max struct {
maxColumnInfo ColumnInfo
alias string
maxName string
}

func (m *Max) Aggregate(cols [][]any) (any, error) {
Expand All @@ -41,13 +41,13 @@ func (m *Max) Aggregate(cols [][]any) (any, error) {
}

func (m *Max) ColumnName() string {
return m.alias
return m.maxName
}

func NewMax(info ColumnInfo, alias string) *Max {
func NewMax(info ColumnInfo) *Max {
return &Max{
maxColumnInfo: info,
alias: alias,
maxName: info.Name,
}
}

Expand All @@ -70,9 +70,9 @@ var maxFuncMapping = map[reflect.Kind]func([][]any, int) (any, error){
reflect.Uint: maxAggregator[uint],
}

type ExtremeValueFunc[T AggregateElement] func(T, T) bool
type extremeValueFunc[T AggregateElement] func(T, T) bool

func findExtremeValue[T AggregateElement](colsData [][]any, extremeValueFunc ExtremeValueFunc[T], index int) (any, error) {
func findExtremeValue[T AggregateElement](colsData [][]any, extremeValueFunc extremeValueFunc[T], index int) (any, error) {
var ans T
for idx, colData := range colsData {
data := colData[index].(T)
Expand Down
2 changes: 1 addition & 1 deletion internal/merger/aggregatemerger/aggregator/max_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestMax_Aggregate(t *testing.T) {
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
max := NewMax(NewColInfo(tc.maxIndex, "MAX(id)"), "MAX(id)")
max := NewMax(NewColInfo(tc.maxIndex, "MAX(id)"))
val, err := max.Aggregate(tc.input)
assert.Equal(t, tc.wantErr, err)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions internal/merger/aggregatemerger/aggregator/min.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

type Min struct {
minColumnInfo ColumnInfo
alias string
minName string
}

func (m *Min) Aggregate(cols [][]any) (any, error) {
Expand All @@ -41,13 +41,13 @@ func (m *Min) Aggregate(cols [][]any) (any, error) {
}

func (m *Min) ColumnName() string {
return m.alias
return m.minName
}

func NewMin(info ColumnInfo, alias string) *Min {
func NewMin(info ColumnInfo) *Min {
return &Min{
minColumnInfo: info,
alias: alias,
minName: info.Name,
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/merger/aggregatemerger/aggregator/min_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestMin_Aggregate(t *testing.T) {
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
min := NewMin(NewColInfo(tc.minIndex, "MIN(id)"), "MIN(id)")
min := NewMin(NewColInfo(tc.minIndex, "MIN(id)"))
val, err := min.Aggregate(tc.input)
assert.Equal(t, tc.wantErr, err)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions internal/merger/aggregatemerger/aggregator/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

type Sum struct {
sumColumnInfo ColumnInfo
alias string
sumName string
}

func (s *Sum) Aggregate(cols [][]any) (any, error) {
Expand All @@ -40,13 +40,13 @@ func (s *Sum) Aggregate(cols [][]any) (any, error) {
}

func (s *Sum) ColumnName() string {
return s.alias
return s.sumName
}

func NewSUM(info ColumnInfo, alias string) *Sum {
func NewSum(info ColumnInfo) *Sum {
return &Sum{
sumColumnInfo: info,
alias: alias,
sumName: info.Name,
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/merger/aggregatemerger/aggregator/sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestSum_Aggregate(t *testing.T) {
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
sum := NewSUM(NewColInfo(tc.sumIndex, "SUM(id)"), "SUM(id)")
sum := NewSum(NewColInfo(tc.sumIndex, "SUM(id)"))
val, err := sum.Aggregate(tc.input)
assert.Equal(t, tc.wantErr, err)
if err != nil {
Expand Down
Loading