Skip to content

Commit

Permalink
refactor(merger): 去掉无用代码及过期注释,整理代码 (#225)
Browse files Browse the repository at this point in the history
  • Loading branch information
longyue0521 authored Jun 15, 2024
1 parent 88085be commit ac5a4b3
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 426 deletions.
1 change: 1 addition & 0 deletions .CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
- [feat(merger): 定义中立的特征表达数据、定义工厂方法根据特征数据来获取具体的merger](https://github.com/ecodeclub/eorm/pull/222)
- [refactor(merger): 重构AVG函数实现,重构所有rows.Rows实现的ConlumnType方法并添加测试](https://github.com/ecodeclub/eorm/pull/223)
- [feat(merger): 新增Distinct Merger](https://github.com/ecodeclub/eorm/pull/224)
- [refactor(merger): 去掉无用代码及过期注释,整理代码](https://github.com/ecodeclub/eorm/pull/225)
## v0.0.1:
- [Init Project](https://github.com/ecodeclub/eorm/pull/1)
- [Selector Definition](https://github.com/ecodeclub/eorm/pull/2)
Expand Down
143 changes: 66 additions & 77 deletions internal/merger/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ import (
)

var (
ErrInvalidColumnInfo = errors.New("factory: ColumnInfo非法")
ErrEmptyColumnList = errors.New("factory: 列列表为空")
ErrColumnNotFoundInSelectList = errors.New("factory: Select列表中未找到列")
ErrInvalidLimit = errors.New("factory: Limit小于1")
ErrInvalidOffset = errors.New("factory: Offset不等于0")
ErrInvalidFeatures = errors.New("factory: Features非法")
ErrInvalidColumnInfo = errors.New("merger: ColumnInfo非法")
ErrEmptyColumnList = errors.New("merger: 列列表为空")
ErrColumnNotFoundInSelectList = errors.New("merger: Select列表中未找到列")
ErrInvalidLimit = errors.New("merger: Limit小于1")
ErrInvalidOffset = errors.New("merger: Offset不等于0")
ErrInvalidFeatures = errors.New("merger: Features非法")
)

type (
Expand All @@ -62,31 +62,19 @@ type (
)

func (q QuerySpec) Validate() error {

if err := q.validateFeatures(); err != nil {
return err
}

if err := q.validateSelect(); err != nil {
return err
}

if err := q.validateGroupBy(); err != nil {
return err
}

if err := q.validateDistinct(); err != nil {
return err
}

if err := q.validateOrderBy(); err != nil {
return err
}

if err := q.validateLimit(); err != nil {
return err
validateFuncs := []func() error{
q.validateFeatures,
q.validateSelect,
q.validateGroupBy,
q.validateDistinct,
q.validateOrderBy,
q.validateLimit,
}
for _, f := range validateFuncs {
if err := f(); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -199,6 +187,51 @@ func (q QuerySpec) validateLimit() error {
return nil
}

// New 根据原SQL查询特征、目标SQL查询特征创建、组合merger的工厂方法
func New(origin, target QuerySpec) (merger.Merger, error) {
for _, spec := range []QuerySpec{origin, target} {
if err := spec.Validate(); err != nil {
return nil, err
}
}
var mp = map[query.Feature]newMergerFunc{
query.AggregateFunc: newAggregateMerger,
query.GroupBy: newGroupByMergerWithoutHaving,
query.Distinct: newDistinctMerger,
query.OrderBy: newOrderByMerger,
}
var mergers []merger.Merger
for _, feature := range target.Features {
switch feature {
case query.AggregateFunc, query.GroupBy, query.Distinct, query.OrderBy:
m, err := mp[feature](origin, target)
if err != nil {
return nil, err
}
mergers = append(mergers, m)
case query.Limit:
var prev merger.Merger
if len(mergers) == 0 {
prev = batchmerger.NewMerger()
} else {
prev = mergers[len(mergers)-1]
mergers = mergers[:len(mergers)-1]
}
m, err := pagedmerger.NewMerger(prev, target.Offset, target.Limit)
if err != nil {
return nil, err
}
mergers = append(mergers, m)
default:
return nil, fmt.Errorf("%w: feature: %d", ErrInvalidFeatures, feature)
}
}
if len(mergers) == 0 {
mergers = append(mergers, batchmerger.NewMerger())
}
return &pipeline{mergers: mergers}, nil
}

func newAggregateMerger(origin, target QuerySpec) (merger.Merger, error) {
aggregators := getAggregators(origin, target)
// TODO: 当aggs为空时, 报不相关的错 merger: scan之前需要调用Next
Expand Down Expand Up @@ -262,55 +295,11 @@ func newOrderByMerger(origin, target QuerySpec) (merger.Merger, error) {
return sortmerger.NewMerger(isPreScanAll, columns...)
}

func New(origin, target QuerySpec) (merger.Merger, error) {
for _, spec := range []QuerySpec{origin, target} {
if err := spec.Validate(); err != nil {
return nil, err
}
}
var mp = map[query.Feature]newMergerFunc{
query.AggregateFunc: newAggregateMerger,
query.GroupBy: newGroupByMergerWithoutHaving,
query.Distinct: newDistinctMerger,
query.OrderBy: newOrderByMerger,
}
var mergers []merger.Merger
for _, feature := range target.Features {
switch feature {
case query.AggregateFunc, query.GroupBy, query.Distinct, query.OrderBy:
m, err := mp[feature](origin, target)
if err != nil {
return nil, err
}
mergers = append(mergers, m)
case query.Limit:
var prev merger.Merger
if len(mergers) == 0 {
prev = batchmerger.NewMerger()
} else {
prev = mergers[len(mergers)-1]
mergers = mergers[:len(mergers)-1]
}
m, err := pagedmerger.NewMerger(prev, target.Offset, target.Limit)
if err != nil {
return nil, err
}
mergers = append(mergers, m)
default:
return nil, fmt.Errorf("%w: feature: %d", ErrInvalidFeatures, feature)
}
}
if len(mergers) == 0 {
mergers = append(mergers, batchmerger.NewMerger())
}
return &MergerPipeline{mergers: mergers}, nil
}

type MergerPipeline struct {
type pipeline struct {
mergers []merger.Merger
}

func (m *MergerPipeline) Merge(ctx context.Context, results []rows.Rows) (rows.Rows, error) {
func (m *pipeline) Merge(ctx context.Context, results []rows.Rows) (rows.Rows, error) {
r, err := m.mergers[0].Merge(ctx, results)
if err != nil {
return nil, err
Expand All @@ -327,7 +316,7 @@ func (m *MergerPipeline) Merge(ctx context.Context, results []rows.Rows) (rows.R
return r, nil
}

// NewBatchMerger 仅供sharding_select通过测试使用,后续重构并删掉该方法并只保留上方New方法
// NewBatchMerger 仅供sharding_select.go使用,后续重构后需要删掉该方法并只保留上方New方法
func NewBatchMerger() (merger.Merger, error) {
return batchmerger.NewMerger(), nil
}
Loading

0 comments on commit ac5a4b3

Please sign in to comment.