restore: check header columns (pingcap#372)
* check csv header columns

* resolve comments

* fix test

* fix unit test
glorv authored Aug 24, 2020
1 parent daa94d0 commit 417d55f
Showing 2 changed files with 92 additions and 17 deletions.
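Conceptually, the change makes Lightning reject a CSV data file whose header names columns that do not exist in the target table (the implicit _tidb_rowid handle column remains accepted). Below is a minimal standalone Go sketch of that check; the function and variable names are illustrative, not the names used in the diff.

package main

import "fmt"

// checkHeaderColumns reports an error when the data-file header contains a
// column the target table does not have. The extra "_tidb_rowid" handle is
// always accepted, mirroring the behavior added in this commit.
func checkHeaderColumns(header, tableCols []string) error {
	known := make(map[string]struct{}, len(tableCols)+1)
	for _, c := range tableCols {
		known[c] = struct{}{}
	}
	known["_tidb_rowid"] = struct{}{}

	var unknown []string
	for _, c := range header {
		if _, ok := known[c]; !ok {
			unknown = append(unknown, c)
		}
	}
	if len(unknown) > 0 {
		return fmt.Errorf("unknown columns in header %v", unknown)
	}
	return nil
}

func main() {
	fmt.Println(checkHeaderColumns([]string{"b", "a"}, []string{"a", "b", "c"}))      // <nil>
	fmt.Println(checkHeaderColumns([]string{"1", "2", "3"}, []string{"a", "b", "c"})) // error, as in the new test case
}

In the actual patch the check lives inside parseColumnPermutations, so the same error surfaces both when chunks are populated up front and when a chunk's columns are initialized lazily in encodeLoop.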
40 changes: 34 additions & 6 deletions lightning/restore/restore.go
@@ -1366,7 +1366,11 @@ func (t *TableRestore) populateChunks(rc *RestoreController, cp *TableCheckpoint
Timestamp: timestamp,
}
if len(chunk.Chunk.Columns) > 0 {
ccp.ColumnPermutation = t.parseColumnPermutations(chunk.Chunk.Columns)
perms, err := t.parseColumnPermutations(chunk.Chunk.Columns)
if err != nil {
return errors.Trace(err)
}
ccp.ColumnPermutation = perms
}
engine.Chunks = append(engine.Chunks, ccp)
}
@@ -1394,7 +1398,7 @@ func (t *TableRestore) populateChunks(rc *RestoreController, cp *TableCheckpoint
// The column permutation of (d, b, a) is set to be [2, 1, -1, 0].
//
// The argument `columns` _must_ be in lower case.
func (t *TableRestore) initializeColumns(columns []string, ccp *ChunkCheckpoint) {
func (t *TableRestore) initializeColumns(columns []string, ccp *ChunkCheckpoint) error {
var colPerm []int
if len(columns) == 0 {
colPerm = make([]int, 0, len(t.tableInfo.Core.Columns)+1)
@@ -1408,19 +1412,41 @@ func (t *TableRestore) initializeColumns(columns []string, ccp *ChunkCheckpoint)
colPerm = append(colPerm, -1)
}
} else {
colPerm = t.parseColumnPermutations(columns)
var err error
colPerm, err = t.parseColumnPermutations(columns)
if err != nil {
return errors.Trace(err)
}
}

ccp.ColumnPermutation = colPerm
return nil
}

func (t *TableRestore) parseColumnPermutations(columns []string) []int {
func (t *TableRestore) parseColumnPermutations(columns []string) ([]int, error) {
colPerm := make([]int, 0, len(t.tableInfo.Core.Columns)+1)

columnMap := make(map[string]int)
for i, column := range columns {
columnMap[column] = i
}

tableColumnMap := make(map[string]int)
for i, col := range t.tableInfo.Core.Columns {
tableColumnMap[col.Name.L] = i
}

// check if there are some unknown columns
var unknownCols []string
for _, c := range columns {
if _, ok := tableColumnMap[c]; !ok && c != model.ExtraHandleName.L {
unknownCols = append(unknownCols, c)
}
}
if len(unknownCols) > 0 {
return colPerm, errors.Errorf("unknown columns in header %s", unknownCols)
}

for _, colInfo := range t.tableInfo.Core.Columns {
if i, ok := columnMap[colInfo.Name.L]; ok {
colPerm = append(colPerm, i)
@@ -1438,7 +1464,7 @@ func (t *TableRestore) parseColumnPermutations(columns []string) []int {
colPerm = append(colPerm, -1)
}

return colPerm
return colPerm, nil
}

func getColumnNames(tableInfo *model.TableInfo, permutation []int) []string {
@@ -1775,7 +1801,9 @@ func (cr *chunkRestore) encodeLoop(
case nil:
if !initializedColumns {
if len(cr.chunk.ColumnPermutation) == 0 {
t.initializeColumns(columnNames, cr.chunk)
if err = t.initializeColumns(columnNames, cr.chunk); err != nil {
return
}
}
initializedColumns = true
}
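The permutation this check protects is the one described in the comment above: one entry per table column holding that column's index in the data-file header, or -1 when the file omits it. A standalone sketch reproducing the (d, b, a) -> [2, 1, -1, 0] example, assuming the table columns are (a, b, c, d):

package main

import "fmt"

// buildPermutation maps each table column to its position in the data-file
// header, using -1 for columns the file does not provide.
func buildPermutation(tableCols, fileCols []string) []int {
	pos := make(map[string]int, len(fileCols))
	for i, c := range fileCols {
		pos[c] = i
	}
	perm := make([]int, 0, len(tableCols))
	for _, c := range tableCols {
		if i, ok := pos[c]; ok {
			perm = append(perm, i)
		} else {
			perm = append(perm, -1)
		}
	}
	return perm
}

func main() {
	// Header (d, b, a) over table columns (a, b, c, d) yields [2 1 -1 0].
	fmt.Println(buildPermutation([]string{"a", "b", "c", "d"}, []string{"d", "b", "a"}))
}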
69 changes: 58 additions & 11 deletions lightning/restore/restore_test.go
@@ -16,13 +16,14 @@ package restore
import (
"context"
"fmt"
"github.com/pingcap/tidb-tools/pkg/filter"
"io/ioutil"
"path/filepath"
"sort"
"sync"
"time"

"github.com/pingcap/tidb-tools/pkg/filter"

"github.com/DATA-DOG/go-sqlmock"
"github.com/golang/mock/gomock"
. "github.com/pingcap/check"
@@ -265,7 +266,7 @@ func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {

var _ = Suite(&tableRestoreSuite{})

type tableRestoreSuite struct {
type tableRestoreSuiteBase struct {
tr *TableRestore
cfg *config.Config

@@ -274,7 +275,11 @@ type tableRestoreSuite struct {
tableMeta *mydump.MDTableMeta
}

func (s *tableRestoreSuite) SetUpSuite(c *C) {
type tableRestoreSuite struct {
tableRestoreSuiteBase
}

func (s *tableRestoreSuiteBase) SetUpSuite(c *C) {
// Produce a mock table info

p := parser.New()
@@ -313,6 +318,12 @@ func (s *tableRestoreSuite) SetUpSuite(c *C) {
fakeDataFiles = append(fakeDataFiles, mydump.FileInfo{TableName: filter.Table{"db", "table"}, FileMeta: mydump.SourceFileMeta{Path: fakeDataPath, Type: mydump.SourceTypeSQL, SortKey: fmt.Sprintf("%d", i)}, Size: 37})
}

fakeCsvContent := []byte("1,2,3\r\n4,5,6\r\n")
fakeDataPath := filepath.Join(fakeDataDir, "db.table.99.csv")
err = ioutil.WriteFile(fakeDataPath, fakeCsvContent, 0644)
c.Assert(err, IsNil)
fakeDataFiles = append(fakeDataFiles, mydump.FileInfo{TableName: filter.Table{"db", "table"}, FileMeta: mydump.SourceFileMeta{Path: fakeDataPath, Type: mydump.SourceTypeCSV, SortKey: "99"}, Size: 14})

s.tableMeta = &mydump.MDTableMeta{
DB: "db",
Name: "table",
@@ -322,7 +333,7 @@ func (s *tableRestoreSuite) SetUpSuite(c *C) {
}
}

func (s *tableRestoreSuite) SetUpTest(c *C) {
func (s *tableRestoreSuiteBase) SetUpTest(c *C) {
// Collect into the test TableRestore structure
var err error
s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &TableCheckpoint{})
@@ -344,7 +355,6 @@ func (s *tableRestoreSuite) TestPopulateChunks(c *C) {
rc := &RestoreController{cfg: s.cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io")}
err := s.tr.populateChunks(rc, cp)
c.Assert(err, IsNil)

c.Assert(cp.Engines, DeepEquals, map[int32]*EngineCheckpoint{
-1: {
Status: CheckpointStatusLoaded,
@@ -425,25 +435,62 @@ func (s *tableRestoreSuite) TestPopulateChunks(c *C) {
},
},
},
2: {
Status: CheckpointStatusLoaded,
Chunks: []*ChunkCheckpoint{
{
Key: ChunkCheckpointKey{Path: s.tr.tableMeta.DataFiles[6].FileMeta.Path, Offset: 0},
FileMeta: s.tr.tableMeta.DataFiles[6].FileMeta,
Chunk: mydump.Chunk{
Offset: 0,
EndOffset: 14,
PrevRowIDMax: 42,
RowIDMax: 46,
},
Timestamp: 1234567897,
},
},
},
})

// set csv header to true; this will make the header-column check fail
s.cfg.Mydumper.CSV.Header = true
s.cfg.Mydumper.StrictFormat = true
regionSize := s.cfg.Mydumper.MaxRegionSize
s.cfg.Mydumper.MaxRegionSize = 5
err = s.tr.populateChunks(rc, cp)
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, `.*unknown columns in header \[1 2 3\]`)
s.cfg.Mydumper.MaxRegionSize = regionSize
s.cfg.Mydumper.CSV.Header = false
}

func (s *tableRestoreSuite) TestInitializeColumns(c *C) {
ccp := &ChunkCheckpoint{}
s.tr.initializeColumns(nil, ccp)
c.Assert(s.tr.initializeColumns(nil, ccp), IsNil)
c.Assert(ccp.ColumnPermutation, DeepEquals, []int{0, 1, 2, -1})

ccp.ColumnPermutation = nil
s.tr.initializeColumns([]string{"b", "c", "a"}, ccp)
c.Assert(s.tr.initializeColumns([]string{"b", "c", "a"}, ccp), IsNil)
c.Assert(ccp.ColumnPermutation, DeepEquals, []int{2, 0, 1, -1})

ccp.ColumnPermutation = nil
s.tr.initializeColumns([]string{"b"}, ccp)
c.Assert(s.tr.initializeColumns([]string{"b"}, ccp), IsNil)
c.Assert(ccp.ColumnPermutation, DeepEquals, []int{-1, 0, -1, -1})

ccp.ColumnPermutation = nil
s.tr.initializeColumns([]string{"_tidb_rowid", "b", "a", "c"}, ccp)
c.Assert(s.tr.initializeColumns([]string{"_tidb_rowid", "b", "a", "c"}, ccp), IsNil)
c.Assert(ccp.ColumnPermutation, DeepEquals, []int{2, 1, 3, 0})

ccp.ColumnPermutation = nil
err := s.tr.initializeColumns([]string{"_tidb_rowid", "b", "a", "c", "d"}, ccp)
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, `unknown columns in header \[d\]`)

ccp.ColumnPermutation = nil
err = s.tr.initializeColumns([]string{"e", "b", "c", "d"}, ccp)
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, `unknown columns in header \[e d\]`)
}

func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) {
Expand Down Expand Up @@ -567,12 +614,12 @@ func (s *tableRestoreSuite) TestImportKVFailure(c *C) {
var _ = Suite(&chunkRestoreSuite{})

type chunkRestoreSuite struct {
tableRestoreSuite
tableRestoreSuiteBase
cr *chunkRestore
}

func (s *chunkRestoreSuite) SetUpTest(c *C) {
s.tableRestoreSuite.SetUpTest(c)
s.tableRestoreSuiteBase.SetUpTest(c)

ctx := context.Background()
w := worker.NewPool(ctx, 5, "io")
