Skip to content

Commit

Permalink
分库分表:merger 分页实现 (#175)
Browse files Browse the repository at this point in the history

Co-authored-by: Ming Deng <mingflycash@gmail.com>
  • Loading branch information
juniaoshaonian and flycash authored Mar 27, 2023
1 parent 508dbc1 commit a97835a
Show file tree
Hide file tree
Showing 4 changed files with 781 additions and 6 deletions.
1 change: 1 addition & 0 deletions .CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [eorm: 分库分表: Merger抽象与批量查询实现](https://github.com/ecodeclub/eorm/pull/160)
- [eorm: 增强的 ShardingAlgorithm 设计与实现](https://github.com/ecodeclub/eorm/pull/161)
- [eorm: 分库分表: Merger排序实现](https://github.com/ecodeclub/eorm/pull/166)
- [eorm: 分库分表: Merger分页实现](https://github.com/ecodeclub/eorm/pull/175)
- [eorm: BasicTypeValue重命名](https://github.com/ecodeclub/eorm/pull/177)
- [eorm: 分库分表: hash、shadow_hash算法不符合预期](https://github.com/ecodeclub/eorm/pull/174)

Expand Down
13 changes: 7 additions & 6 deletions internal/merger/internal/errs/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
)

var (
ErrEmptySortColumns = errors.New("merger: 排序列为空")
ErrMergerEmptyRows = errors.New("merger: sql.Rows列表为空")
ErrMergerRowsIsNull = errors.New("merger: sql.Rows列表中有元素为nil")
ErrMergerScanNotNext = errors.New("merger: Scan之前没有调用Next方法")
ErrMergerRowsClosed = errors.New("merger: Rows已经关闭")
ErrMergerRowsDiff = errors.New("merger: sql.Rows列表中的字段不同")
ErrEmptySortColumns = errors.New("merger: 排序列为空")
ErrMergerEmptyRows = errors.New("merger: sql.Rows列表为空")
ErrMergerRowsIsNull = errors.New("merger: sql.Rows列表中有元素为nil")
ErrMergerScanNotNext = errors.New("merger: Scan之前没有调用Next方法")
ErrMergerRowsClosed = errors.New("merger: Rows已经关闭")
ErrMergerRowsDiff = errors.New("merger: sql.Rows列表中的字段不同")
ErrMergerInvalidLimitOrOffset = errors.New("merger: offset或limit小于0")
)

func NewRepeatSortColumn(column string) error {
Expand Down
148 changes: 148 additions & 0 deletions internal/merger/pagedmerger/merger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pagedmerger

import (
"context"
"database/sql"
"sync"

"github.com/ecodeclub/eorm/internal/merger"
"github.com/ecodeclub/eorm/internal/merger/internal/errs"
)

type Merger struct {
m merger.Merger
limit int
offset int
}

func NewMerger(m merger.Merger, offset int, limit int) (*Merger, error) {
if offset < 0 || limit <= 0 {
return nil, errs.ErrMergerInvalidLimitOrOffset
}

return &Merger{
m: m,
limit: limit,
offset: offset,
}, nil
}

func (m *Merger) Merge(ctx context.Context, results []*sql.Rows) (merger.Rows, error) {
rows, err := m.m.Merge(ctx, results)
if err != nil {
return nil, err
}
err = m.nextOffset(ctx, rows)
if err != nil {
return nil, err
}
return &Rows{
rows: rows,
mu: &sync.RWMutex{},
limit: m.limit,
}, nil
}

// nextOffset 会把游标挪到 offset 所指定的位置。
func (m *Merger) nextOffset(ctx context.Context, rows merger.Rows) error {
offset := m.offset
for i := 0; i < offset; i++ {
if ctx.Err() != nil {
return ctx.Err()
}
// 如果偏移量超过rows结果集返回的行数,不会报错。用户最终查到0行
if !rows.Next() {
return rows.Err()
}
}
return nil
}

type Rows struct {
rows merger.Rows
limit int
cnt int
lastErr error
closed bool
mu *sync.RWMutex
}

func (r *Rows) Next() bool {
r.mu.Lock()
if r.closed {
r.mu.Unlock()
return false
}
if r.cnt >= r.limit || r.lastErr != nil {
r.mu.Unlock()
_ = r.Close()
return false
}
canNext, err := r.nextVal()
if err != nil {
r.lastErr = err
r.mu.Unlock()
_ = r.Close()
return false
}
if !canNext {
r.mu.Unlock()
_ = r.Close()
return canNext
}
r.cnt++
r.mu.Unlock()
return canNext
}
func (r *Rows) nextVal() (bool, error) {
if r.rows.Next() {
return true, nil
}
if r.rows.Err() != nil {
return false, r.rows.Err()
}
return false, nil
}

func (r *Rows) Scan(dest ...any) error {
r.mu.RLock()
defer r.mu.RUnlock()
if r.lastErr != nil {
return r.lastErr
}
if r.closed {
return errs.ErrMergerRowsClosed
}
return r.rows.Scan(dest...)
}

func (r *Rows) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
r.closed = true
return r.rows.Close()
}

func (r *Rows) Columns() ([]string, error) {
return r.rows.Columns()
}

func (r *Rows) Err() error {
r.mu.RLock()
defer r.mu.RUnlock()
return r.lastErr
}
Loading

0 comments on commit a97835a

Please sign in to comment.