Skip to content

Commit

Permalink
merger: 使用 sqlx.Scanner 来读取数据 (#216)
Browse files Browse the repository at this point in the history
  • Loading branch information
flycash authored Aug 27, 2023
1 parent b801d2a commit d10f1b7
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 350 deletions.
1 change: 1 addition & 0 deletions .CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
- [eorm: ShardingInserter 修改为表维度执行](https://github.com/ecodeclub/eorm/pull/211)
- [eorm: 分库分表:ShardingUpdater 实现](https://github.com/ecodeclub/eorm/pull/201)
- [eorm: 分库分表:datasource-简单的分布式事务方案支持](https://github.com/ecodeclub/eorm/pull/204)
- [merger: 使用 sqlx.Scanner 来读取数据](https://github.com/ecodeclub/eorm/pull/216)

## v0.0.1:
- [Init Project](https://github.com/ecodeclub/eorm/pull/1)
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
.idea
*.iml
fuzz
go.work
35 changes: 13 additions & 22 deletions internal/merger/aggregatemerger/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ package aggregatemerger
import (
"context"
"database/sql"
"errors"
"sync"
_ "unsafe"

"github.com/ecodeclub/ekit/sqlx"

"github.com/ecodeclub/eorm/internal/merger"

"github.com/ecodeclub/eorm/internal/merger/utils"
Expand Down Expand Up @@ -54,9 +57,8 @@ func (m *Merger) Merge(ctx context.Context, results []*sql.Rows) (merger.Rows, e
return nil, errs.ErrMergerEmptyRows
}
for _, res := range results {
err := m.checkColumns(res)
if err != nil {
return nil, err
if res == nil {
return nil, errs.ErrMergerRowsIsNull
}
}

Expand All @@ -70,12 +72,6 @@ func (m *Merger) Merge(ctx context.Context, results []*sql.Rows) (merger.Rows, e
}, nil

}
func (m *Merger) checkColumns(rows *sql.Rows) error {
if rows == nil {
return errs.ErrMergerRowsIsNull
}
return nil
}

type Rows struct {
rowsList []*sql.Rows
Expand Down Expand Up @@ -149,23 +145,18 @@ func (r *Rows) getSqlRowsData() ([][]any, error) {
}
return rowsData, nil
}
func (r *Rows) getSqlRowData(row *sql.Rows) ([]any, error) {

func (*Rows) getSqlRowData(row *sql.Rows) ([]any, error) {
var colsData []any
var err error
if row.Next() {
colsData, err = utils.Scan(row)
if err != nil {
return nil, err
}
} else {
// sql.Rows迭代过程中发生报错,返回报错
if row.Err() != nil {
return nil, row.Err()
}
scanner, err := sqlx.NewSQLRowsScanner(row)
if err != nil {
return nil, err
}
colsData, err = scanner.Scan()
if errors.Is(err, sqlx.ErrNoMoreRows) {
return nil, errs.ErrMergerAggregateHasEmptyRows
}
return colsData, nil
return colsData, err
}

func (r *Rows) Scan(dest ...any) error {
Expand Down
42 changes: 11 additions & 31 deletions internal/merger/groupby_merger/aggregator_merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"sync"
_ "unsafe"

"github.com/ecodeclub/ekit/slice"

"github.com/ecodeclub/ekit/sqlx"

"github.com/ecodeclub/eorm/internal/merger/utils"
"go.uber.org/multierr"

Expand Down Expand Up @@ -61,11 +65,8 @@ func (a *AggregatorMerger) Merge(ctx context.Context, results []*sql.Rows) (merg
return nil, errs.ErrMergerEmptyRows
}

for _, res := range results {
err := a.checkColumns(res)
if err != nil {
return nil, err
}
if slice.Contains[*sql.Rows](results, nil) {
return nil, errs.ErrMergerRowsIsNull
}
dataMap, dataIndex, err := a.getCols(results)
if err != nil {
Expand All @@ -82,13 +83,6 @@ func (a *AggregatorMerger) Merge(ctx context.Context, results []*sql.Rows) (merg
cur: -1,
cols: a.columnsName,
}, nil

}
func (a *AggregatorMerger) checkColumns(rows *sql.Rows) error {
if rows == nil {
return errs.ErrMergerRowsIsNull
}
return nil
}

func (a *AggregatorMerger) getCols(rowsList []*sql.Rows) (*mapx.TreeMap[Key, [][]any], []Key, error) {
Expand All @@ -98,7 +92,11 @@ func (a *AggregatorMerger) getCols(rowsList []*sql.Rows) (*mapx.TreeMap[Key, [][
}
keys := make([]Key, 0, 16)
for _, res := range rowsList {
colsData, err := a.getCol(res)
scanner, err := sqlx.NewSQLRowsScanner(res)
if err != nil {
return nil, nil, err
}
colsData, err := scanner.ScanAll()
if err != nil {
return nil, nil, err
}
Expand All @@ -108,7 +106,6 @@ func (a *AggregatorMerger) getCols(rowsList []*sql.Rows) (*mapx.TreeMap[Key, [][
key.columnValues = append(key.columnValues, colData[groupByCol.Index])
}
val, ok := treeMap.Get(key)

if ok {
val = append(val, colData)
err = treeMap.Set(key, val)
Expand All @@ -127,23 +124,6 @@ func (a *AggregatorMerger) getCols(rowsList []*sql.Rows) (*mapx.TreeMap[Key, [][
return treeMap, keys, nil
}

func (a *AggregatorMerger) getCol(row *sql.Rows) ([][]any, error) {
ans := make([][]any, 0, 16)
for row.Next() {
colsData, err := utils.Scan(row)
if err != nil {
return nil, err
}
ans = append(ans, colsData)
}
if row.Err() != nil {
return nil, row.Err()
}

return ans, nil

}

type AggregatorRows struct {
rowsList []*sql.Rows
aggregators []aggregator.Aggregator
Expand Down
48 changes: 0 additions & 48 deletions internal/merger/utils/scan.go

This file was deleted.

Loading

0 comments on commit d10f1b7

Please sign in to comment.