diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index f01a2fffd..6911ae9ed 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -1366,7 +1366,11 @@ func (t *TableRestore) populateChunks(rc *RestoreController, cp *TableCheckpoint
 			Timestamp:         timestamp,
 		}
 		if len(chunk.Chunk.Columns) > 0 {
-			ccp.ColumnPermutation = t.parseColumnPermutations(chunk.Chunk.Columns)
+			perms, err := t.parseColumnPermutations(chunk.Chunk.Columns)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			ccp.ColumnPermutation = perms
 		}
 		engine.Chunks = append(engine.Chunks, ccp)
 	}
@@ -1394,7 +1398,7 @@
 // The column permutation of (d, b, a) is set to be [2, 1, -1, 0].
 //
 // The argument `columns` _must_ be in lower case.
-func (t *TableRestore) initializeColumns(columns []string, ccp *ChunkCheckpoint) {
+func (t *TableRestore) initializeColumns(columns []string, ccp *ChunkCheckpoint) error {
 	var colPerm []int
 	if len(columns) == 0 {
 		colPerm = make([]int, 0, len(t.tableInfo.Core.Columns)+1)
@@ -1408,19 +1412,41 @@ func (t *TableRestore) initializeColumns(columns []string, ccp *ChunkCheckpoint)
 			colPerm = append(colPerm, -1)
 		}
 	} else {
-		colPerm = t.parseColumnPermutations(columns)
+		var err error
+		colPerm, err = t.parseColumnPermutations(columns)
+		if err != nil {
+			return errors.Trace(err)
+		}
 	}
 
 	ccp.ColumnPermutation = colPerm
+	return nil
 }
 
-func (t *TableRestore) parseColumnPermutations(columns []string) []int {
+func (t *TableRestore) parseColumnPermutations(columns []string) ([]int, error) {
 	colPerm := make([]int, 0, len(t.tableInfo.Core.Columns)+1)
 
 	columnMap := make(map[string]int)
 	for i, column := range columns {
 		columnMap[column] = i
 	}
+
+	tableColumnMap := make(map[string]int)
+	for i, col := range t.tableInfo.Core.Columns {
+		tableColumnMap[col.Name.L] = i
+	}
+
+	// check if there are some unknown columns
+	var unknownCols []string
+	for _, c := range columns {
+		if _, ok := tableColumnMap[c]; !ok && c != model.ExtraHandleName.L {
+			unknownCols = append(unknownCols, c)
+		}
+	}
+	if len(unknownCols) > 0 {
+		return colPerm, errors.Errorf("unknown columns in header %s", unknownCols)
+	}
+
 	for _, colInfo := range t.tableInfo.Core.Columns {
 		if i, ok := columnMap[colInfo.Name.L]; ok {
 			colPerm = append(colPerm, i)
@@ -1438,7 +1464,7 @@ func (t *TableRestore) parseColumnPermutations(columns []string) []int {
 		colPerm = append(colPerm, -1)
 	}
 
-	return colPerm
+	return colPerm, nil
 }
 
 func getColumnNames(tableInfo *model.TableInfo, permutation []int) []string {
@@ -1775,7 +1801,9 @@ func (cr *chunkRestore) encodeLoop(
 		case nil:
 			if !initializedColumns {
 				if len(cr.chunk.ColumnPermutation) == 0 {
-					t.initializeColumns(columnNames, cr.chunk)
+					if err = t.initializeColumns(columnNames, cr.chunk); err != nil {
+						return
+					}
 				}
 				initializedColumns = true
 			}
diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go
index ffd9e4e9c..df293dbfe 100644
--- a/lightning/restore/restore_test.go
+++ b/lightning/restore/restore_test.go
@@ -16,13 +16,14 @@ package restore
 import (
 	"context"
 	"fmt"
-	"github.com/pingcap/tidb-tools/pkg/filter"
 	"io/ioutil"
 	"path/filepath"
 	"sort"
 	"sync"
 	"time"
 
+	"github.com/pingcap/tidb-tools/pkg/filter"
+
 	"github.com/DATA-DOG/go-sqlmock"
 	"github.com/golang/mock/gomock"
 	. "github.com/pingcap/check"
@@ -265,7 +266,7 @@ func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {
 
 var _ = Suite(&tableRestoreSuite{})
 
-type tableRestoreSuite struct {
+type tableRestoreSuiteBase struct {
 	tr  *TableRestore
 	cfg *config.Config
 
@@ -274,7 +275,11 @@
 	tableMeta *mydump.MDTableMeta
 }
 
-func (s *tableRestoreSuite) SetUpSuite(c *C) {
+type tableRestoreSuite struct {
+	tableRestoreSuiteBase
+}
+
+func (s *tableRestoreSuiteBase) SetUpSuite(c *C) {
 	// Produce a mock table info
 
 	p := parser.New()
@@ -313,6 +318,12 @@ func (s *tableRestoreSuite) SetUpSuite(c *C) {
 		fakeDataFiles = append(fakeDataFiles, mydump.FileInfo{TableName: filter.Table{"db", "table"}, FileMeta: mydump.SourceFileMeta{Path: fakeDataPath, Type: mydump.SourceTypeSQL, SortKey: fmt.Sprintf("%d", i)}, Size: 37})
 	}
 
+	fakeCsvContent := []byte("1,2,3\r\n4,5,6\r\n")
+	fakeDataPath := filepath.Join(fakeDataDir, "db.table.99.csv")
+	err = ioutil.WriteFile(fakeDataPath, fakeCsvContent, 0644)
+	c.Assert(err, IsNil)
+	fakeDataFiles = append(fakeDataFiles, mydump.FileInfo{TableName: filter.Table{"db", "table"}, FileMeta: mydump.SourceFileMeta{Path: fakeDataPath, Type: mydump.SourceTypeCSV, SortKey: "99"}, Size: 14})
+
 	s.tableMeta = &mydump.MDTableMeta{
 		DB:   "db",
 		Name: "table",
@@ -322,7 +333,7 @@ func (s *tableRestoreSuite) SetUpSuite(c *C) {
 	}
 }
 
-func (s *tableRestoreSuite) SetUpTest(c *C) {
+func (s *tableRestoreSuiteBase) SetUpTest(c *C) {
 	// Collect into the test TableRestore structure
 	var err error
 	s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, s.tableInfo, &TableCheckpoint{})
@@ -344,7 +355,6 @@ func (s *tableRestoreSuite) TestPopulateChunks(c *C) {
 	rc := &RestoreController{cfg: s.cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io")}
 	err := s.tr.populateChunks(rc, cp)
 	c.Assert(err, IsNil)
-
 	c.Assert(cp.Engines, DeepEquals, map[int32]*EngineCheckpoint{
 		-1: {
 			Status: CheckpointStatusLoaded,
@@ -425,25 +435,62 @@ func (s *tableRestoreSuite) TestPopulateChunks(c *C) {
 				},
 			},
 		},
+		2: {
+			Status: CheckpointStatusLoaded,
+			Chunks: []*ChunkCheckpoint{
+				{
+					Key:      ChunkCheckpointKey{Path: s.tr.tableMeta.DataFiles[6].FileMeta.Path, Offset: 0},
+					FileMeta: s.tr.tableMeta.DataFiles[6].FileMeta,
+					Chunk: mydump.Chunk{
+						Offset:       0,
+						EndOffset:    14,
+						PrevRowIDMax: 42,
+						RowIDMax:     46,
+					},
+					Timestamp: 1234567897,
+				},
+			},
+		},
 	})
+
+	// set csv header to true, this will cause check columns fail
+	s.cfg.Mydumper.CSV.Header = true
+	s.cfg.Mydumper.StrictFormat = true
+	regionSize := s.cfg.Mydumper.MaxRegionSize
+	s.cfg.Mydumper.MaxRegionSize = 5
+	err = s.tr.populateChunks(rc, cp)
+	c.Assert(err, NotNil)
+	c.Assert(err, ErrorMatches, `.*unknown columns in header \[1 2 3\]`)
+	s.cfg.Mydumper.MaxRegionSize = regionSize
+	s.cfg.Mydumper.CSV.Header = false
 }
 
 func (s *tableRestoreSuite) TestInitializeColumns(c *C) {
 	ccp := &ChunkCheckpoint{}
-	s.tr.initializeColumns(nil, ccp)
+	c.Assert(s.tr.initializeColumns(nil, ccp), IsNil)
 	c.Assert(ccp.ColumnPermutation, DeepEquals, []int{0, 1, 2, -1})
 
 	ccp.ColumnPermutation = nil
-	s.tr.initializeColumns([]string{"b", "c", "a"}, ccp)
+	c.Assert(s.tr.initializeColumns([]string{"b", "c", "a"}, ccp), IsNil)
 	c.Assert(ccp.ColumnPermutation, DeepEquals, []int{2, 0, 1, -1})
 
 	ccp.ColumnPermutation = nil
-	s.tr.initializeColumns([]string{"b"}, ccp)
+	c.Assert(s.tr.initializeColumns([]string{"b"}, ccp), IsNil)
 	c.Assert(ccp.ColumnPermutation, DeepEquals, []int{-1, 0, -1, -1})
 
 	ccp.ColumnPermutation = nil
-	s.tr.initializeColumns([]string{"_tidb_rowid", "b", "a", "c"}, ccp)
+	c.Assert(s.tr.initializeColumns([]string{"_tidb_rowid", "b", "a", "c"}, ccp), IsNil)
 	c.Assert(ccp.ColumnPermutation, DeepEquals, []int{2, 1, 3, 0})
+
+	ccp.ColumnPermutation = nil
+	err := s.tr.initializeColumns([]string{"_tidb_rowid", "b", "a", "c", "d"}, ccp)
+	c.Assert(err, NotNil)
+	c.Assert(err, ErrorMatches, `unknown columns in header \[d\]`)
+
+	ccp.ColumnPermutation = nil
+	err = s.tr.initializeColumns([]string{"e", "b", "c", "d"}, ccp)
+	c.Assert(err, NotNil)
+	c.Assert(err, ErrorMatches, `unknown columns in header \[e d\]`)
 }
 
 func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) {
@@ -567,12 +614,12 @@ func (s *tableRestoreSuite) TestImportKVFailure(c *C) {
 
 var _ = Suite(&chunkRestoreSuite{})
 
 type chunkRestoreSuite struct {
-	tableRestoreSuite
+	tableRestoreSuiteBase
 	cr *chunkRestore
 }
 
 func (s *chunkRestoreSuite) SetUpTest(c *C) {
-	s.tableRestoreSuite.SetUpTest(c)
+	s.tableRestoreSuiteBase.SetUpTest(c)
 	ctx := context.Background()
 	w := worker.NewPool(ctx, 5, "io")
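For reference, here is a minimal, self-contained sketch of the column-permutation logic this patch hardens. `extraHandleName`, the plain string-slice parameters, and the `main` driver are illustrative stand-ins: the real code reads table columns from `t.tableInfo.Core.Columns` and compares against `model.ExtraHandleName.L` from the TiDB parser package. Only the unknown-column check and the permutation layout mirror the change itself:

```go
package main

import (
	"fmt"
	"strings"
)

// extraHandleName is a stand-in for TiDB's model.ExtraHandleName.L
// ("_tidb_rowid"); the real code uses that constant directly.
const extraHandleName = "_tidb_rowid"

// parseColumnPermutations sketches the behavior added by this diff: given the
// table's columns (in schema order) and the lower-cased header columns of a
// data file, it returns perm where perm[i] is the index in the data row that
// feeds table column i, or -1 if the file does not supply that column. The
// final slot tracks _tidb_rowid. Header columns the table does not have are
// now reported as an error instead of being silently dropped.
func parseColumnPermutations(tableCols, fileCols []string) ([]int, error) {
	colIdx := make(map[string]int, len(fileCols))
	for i, c := range fileCols {
		colIdx[c] = i
	}

	known := make(map[string]struct{}, len(tableCols))
	for _, c := range tableCols {
		known[strings.ToLower(c)] = struct{}{}
	}

	// The new check: reject unknown header columns up front.
	var unknown []string
	for _, c := range fileCols {
		if _, ok := known[c]; !ok && c != extraHandleName {
			unknown = append(unknown, c)
		}
	}
	if len(unknown) > 0 {
		return nil, fmt.Errorf("unknown columns in header %v", unknown)
	}

	perm := make([]int, 0, len(tableCols)+1)
	for _, c := range tableCols {
		if i, ok := colIdx[strings.ToLower(c)]; ok {
			perm = append(perm, i)
		} else {
			perm = append(perm, -1)
		}
	}
	// Last slot: position of _tidb_rowid in the file, if present.
	if i, ok := colIdx[extraHandleName]; ok {
		perm = append(perm, i)
	} else {
		perm = append(perm, -1)
	}
	return perm, nil
}

func main() {
	tableCols := []string{"a", "b", "c"}

	perm, _ := parseColumnPermutations(tableCols, []string{"b", "c", "a"})
	fmt.Println(perm) // [2 0 1 -1], matching TestInitializeColumns

	_, err := parseColumnPermutations(tableCols, []string{"e", "b", "c", "d"})
	fmt.Println(err) // unknown columns in header [e d]
}
```

This also shows why the test `TestPopulateChunks` fails once `CSV.Header = true`: the fake file's first row is `1,2,3`, so the header columns `[1 2 3]` match nothing in the table and the new check rejects them.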