Skip to content

Commit

Permalink
merger功能添加
Browse files Browse the repository at this point in the history
  • Loading branch information
juniaoshaonian committed Feb 28, 2023
1 parent b22d20b commit 5a888b0
Show file tree
Hide file tree
Showing 4 changed files with 330 additions and 0 deletions.
2 changes: 2 additions & 0 deletions internal/errs/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var (
ErrSlaveNotFound = errors.New("eorm: slave不存在")
// ErrGetSlavesFromDNS 从dns获取slave列表失败
ErrGetSlavesFromDNS = errors.New("eorm: 从DNS获取slaves失败")
ErrMergerEmptyRows = errors.New("eorm: sql.Rows列表为空")
ErrMergerRowsIsNull = errors.New("eorm: sql.Rows列表中有元素为nil")
)

func NewFieldConflictError(field string) error {
Expand Down
101 changes: 101 additions & 0 deletions internal/merger/batchmerger/merger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchmerger

import (
"context"
"database/sql"
"github.com/ecodeclub/eorm/internal/errs"
"github.com/ecodeclub/eorm/internal/merger"
"sync"
)

type Merger struct{}

func (m Merger) Merge(ctx context.Context, results []*sql.Rows) (merger.Rows, error) {
if len(results) == 0 {
return nil, errs.ErrMergerEmptyRows
}
for i := 0; i < len(results); i++ {
if results[i] == nil {
return nil, errs.ErrMergerRowsIsNull
}
}
return &MergerRows{
rows: results,
mu: &sync.RWMutex{},
}, nil
}

type MergerRows struct {
rows []*sql.Rows
cnt int
mu *sync.RWMutex
once sync.Once
}

func (m *MergerRows) Next() bool {
m.mu.RLock()
if m.cnt >= len(m.rows) {
m.mu.RUnlock()
return false
}
if ok := m.rows[m.cnt].Next(); ok {
m.mu.RUnlock()
return ok
}
m.mu.RUnlock()
m.mu.Lock()
defer m.mu.Unlock()
if m.cnt >= len(m.rows) {
return false
}
if ok := m.rows[m.cnt].Next(); ok {
return ok
}
m.cnt++
if m.cnt < len(m.rows) {
if ok := m.rows[m.cnt].Next(); ok {
return ok
}
}
return false

}

func (m *MergerRows) Scan(dest ...any) error {
m.mu.RLock()
defer m.mu.RUnlock()
return m.rows[m.cnt].Scan(dest...)

}

func (m *MergerRows) Close() error {
var err error
m.once.Do(func() {
for i := 0; i < len(m.rows); i++ {
row := m.rows[i]
err = row.Close()
if err != nil {
return
}
}
})
return err
}

func (m *MergerRows) Columns() ([]string, error) {
return m.rows[0].Columns()
}
196 changes: 196 additions & 0 deletions internal/merger/batchmerger/merger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchmerger

import (
"context"
"database/sql"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/ecodeclub/eorm/internal/errs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

type MergerSuite struct {
suite.Suite
mockDB01 *sql.DB
mock01 sqlmock.Sqlmock
mockDB02 *sql.DB
mock02 sqlmock.Sqlmock
mockDB03 *sql.DB
mock03 sqlmock.Sqlmock
}

func (ms *MergerSuite) SetupTest() {
t := ms.T()
ms.initMock(t)
}

func (ms *MergerSuite) TearDownTest() {
_ = ms.mockDB01.Close()
_ = ms.mockDB02.Close()
_ = ms.mockDB03.Close()
}

func (ms *MergerSuite) initMock(t *testing.T) {
var err error
ms.mockDB01, ms.mock01, err = sqlmock.New()
if err != nil {
t.Fatal(err)
}
ms.mockDB02, ms.mock02, err = sqlmock.New()
if err != nil {
t.Fatal(err)
}
ms.mockDB03, ms.mock03, err = sqlmock.New()
if err != nil {
t.Fatal(err)
}
}

func (ms *MergerSuite) TestMerger_NextandScan() {
ms.mock01.ExpectQuery("SELECT *").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("1"))
ms.mock02.ExpectQuery("SELECT *").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("2"))
ms.mock03.ExpectQuery("SELECT *").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("3").AddRow("4"))
testCases := []struct {
name string
sqlRows func() []*sql.Rows
wantVal []string
wantErr error
scanErr error
}{
{
name: "multi rows",
sqlRows: func() []*sql.Rows {
res := make([]*sql.Rows, 0, 3)
row01, _ := ms.mockDB01.QueryContext(context.Background(), "SELECT * FROM `t1`;")
res = append(res, row01)
row02, _ := ms.mockDB02.QueryContext(context.Background(), "SELECT * FROM `t1`;")
res = append(res, row02)
row03, _ := ms.mockDB03.QueryContext(context.Background(), "SELECT * FROM `t1`;")
res = append(res, row03)
return res
},
wantVal: []string{"1", "2", "3", "4"},
},
{
name: "empty rows",
sqlRows: func() []*sql.Rows {
return []*sql.Rows{}
},
wantErr: errs.ErrMergerEmptyRows,
},
{
name: "nil sqlrows",
sqlRows: func() []*sql.Rows {
return []*sql.Rows{nil}
},
wantErr: errs.ErrMergerRowsIsNull,
},
}
for _, tc := range testCases {
ms.T().Run(tc.name, func(t *testing.T) {
merger := Merger{}
rows, err := merger.Merge(context.Background(), tc.sqlRows())
assert.Equal(t, tc.wantErr, err)
if err != nil {
return
}
res := make([]string, 0, 4)
for rows.Next() {
var id string
err = rows.Scan(&id)
assert.Equal(t, tc.scanErr, err)
if err != nil {
return
}
res = append(res, id)
}
assert.Equal(t, tc.wantVal, res)
})
}

}
func (ms *MergerSuite) TestClose() {
ms.mock01.ExpectQuery("SELECT *").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("1"))
ms.mock02.ExpectQuery("SELECT *").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("2"))
ms.mock03.ExpectQuery("SELECT *").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("3").AddRow("4"))
ms.T().Run("close", func(t *testing.T) {
merger := Merger{}
res := make([]*sql.Rows, 0, 3)
row01, _ := ms.mockDB01.QueryContext(context.Background(), "SELECT * FROM `t1`;")
res = append(res, row01)
row02, _ := ms.mockDB02.QueryContext(context.Background(), "SELECT * FROM `t1`;")
res = append(res, row02)
row03, _ := ms.mockDB03.QueryContext(context.Background(), "SELECT * FROM `t1`;")
res = append(res, row03)
rows, err := merger.Merge(context.Background(), res)
require.NoError(t, err)
if err != nil {
return
}
err = rows.Close()
require.NoError(t, err)
})
}
func (ms *MergerSuite) TestColumns() {
ms.mock01.ExpectQuery("SELECT *").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("1"))
ms.mock02.ExpectQuery("SELECT *").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("2"))
ms.mock03.ExpectQuery("SELECT *").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("3").AddRow("4"))
testCases := []struct {
name string
wantCols []string
sqlRows func() []*sql.Rows
}{
{
name: "Columns",
wantCols: []string{"id"},
sqlRows: func() []*sql.Rows {
res := make([]*sql.Rows, 0, 3)
row01, _ := ms.mockDB01.QueryContext(context.Background(), "SELECT * FROM `t1`;")
res = append(res, row01)
row02, _ := ms.mockDB02.QueryContext(context.Background(), "SELECT * FROM `t1`;")
res = append(res, row02)
row03, _ := ms.mockDB03.QueryContext(context.Background(), "SELECT * FROM `t1`;")
res = append(res, row03)
return res
},
},
}
for _, tc := range testCases {
ms.T().Run(tc.name, func(t *testing.T) {
merger := Merger{}
rows, err := merger.Merge(context.Background(), tc.sqlRows())
require.NoError(ms.T(), err)
if err != nil {
return
}
cols, err := rows.Columns()
require.NoError(ms.T(), err)
if err != nil {
return
}
assert.Equal(t, tc.wantCols, cols)
})

}
}

func TestMerger(t *testing.T) {
suite.Run(t, &MergerSuite{})
}
31 changes: 31 additions & 0 deletions internal/merger/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package merger

import (
"context"
"database/sql"
)

type Merger interface {
Merge(ctx context.Context, results []*sql.Rows) (Rows, error)
}

type Rows interface {
Next() bool
Scan(dest ...any) error
Close() error
Columns() ([]string, error)
}

0 comments on commit 5a888b0

Please sign in to comment.