Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: update and restore GCLifeTime once when parallel #220

Merged
merged 14 commits into from
Aug 7, 2019
46 changes: 26 additions & 20 deletions lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ func MockDoChecksumCtx() context.Context {
func (s *restoreSuite) TestDoChecksum(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()

mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m"))
Expand All @@ -155,12 +154,14 @@ func (s *restoreSuite) TestDoChecksum(c *C) {
TotalKVs: 7296873,
TotalBytes: 357601387,
})

c.Assert(db.Close(), IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
}

func (s *restoreSuite) TestDoChecksumParallel(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()

mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m"))
Expand All @@ -178,13 +179,17 @@ func (s *restoreSuite) TestDoChecksumParallel(c *C) {
mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()

ctx := MockDoChecksumCtx()

// db.Close() will close all connections from its idle pool, set it 1 to expect one close
db.SetMaxIdleConns(1)
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
checksum, err := DoChecksum(ctx, db, "`test`.`t`")
c.Assert(err, IsNil)
c.Assert(*checksum, DeepEquals, RemoteChecksum{
Expand All @@ -194,19 +199,17 @@ func (s *restoreSuite) TestDoChecksumParallel(c *C) {
TotalKVs: 7296873,
TotalBytes: 357601387,
})
wg.Done()
}()
}
wg.Wait()

c.Assert(db.Close(), IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
mock.ExpectClose()
}

func (s *restoreSuite) TestIncreaseGCLifeTimeFail(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()

for i := 0; i < 5; i++ {
mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
Expand All @@ -215,10 +218,11 @@ func (s *restoreSuite) TestIncreaseGCLifeTimeFail(c *C) {
WithArgs("100h0m0s").
WillReturnError(errors.Annotate(context.Canceled, "update gc error"))
}
// This recover GC Life Time SQL should not be executed
// This recover GC Life Time SQL should not be executed in DoChecksum
mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectClose()

ctx := MockDoChecksumCtx()
var wg sync.WaitGroup
Expand All @@ -232,17 +236,16 @@ func (s *restoreSuite) TestIncreaseGCLifeTimeFail(c *C) {
}
wg.Wait()

// validate no more redundant GC recover
err = mock.ExpectationsWereMet()
c.Assert(err, ErrorMatches, "there is a remaining expectation.*\n.*\n.*\n.*\n.*\n.*\n.*")
_, err = db.Exec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E", "10m")
c.Assert(err, IsNil)

mock.ExpectClose()
c.Assert(db.Close(), IsNil)
err = mock.ExpectationsWereMet()
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()

mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("300h"))
Expand All @@ -251,18 +254,19 @@ func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {
mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
WithArgs("300h").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectClose()

ctx := MockDoChecksumCtx()
_, err = DoChecksum(ctx, db, "`test`.`t`")
c.Assert(err, ErrorMatches, "compute remote checksum failed: mock syntax error.*")

c.Assert(db.Close(), IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
mock.ExpectClose()
}

func (s *restoreSuite) TestSetSessionConcurrencyVars(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()

mock.ExpectExec(
`SET\s+`+
Expand All @@ -272,6 +276,7 @@ func (s *restoreSuite) TestSetSessionConcurrencyVars(c *C) {
`SESSION tidb_checksum_table_concurrency = \?`).
WithArgs(123, 456, 789, 543).
WillReturnResult(sqlmock.NewResult(1, 4))
mock.ExpectClose()

ctx := context.Background()
setSessionConcurrencyVars(ctx, db, config.DBStore{
Expand All @@ -281,8 +286,8 @@ func (s *restoreSuite) TestSetSessionConcurrencyVars(c *C) {
ChecksumTableConcurrency: 543,
})

c.Assert(db.Close(), IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
mock.ExpectClose()
}

var _ = Suite(&tableRestoreSuite{})
Expand Down Expand Up @@ -454,7 +459,6 @@ func (s *tableRestoreSuite) TestInitializeColumns(c *C) {
func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()

mock.ExpectQuery("SELECT.*tikv_gc_life_time.*").
WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m"))
Expand All @@ -469,19 +473,20 @@ func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) {
mock.ExpectExec("UPDATE.*tikv_gc_life_time.*").
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()

ctx := MockDoChecksumCtx()
err = s.tr.compareChecksum(ctx, db, verification.MakeKVChecksum(1234567, 12345, 1234567890))
c.Assert(err, IsNil)

c.Assert(db.Close(), IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
mock.ExpectClose()

}

func (s *tableRestoreSuite) TestCompareChecksumFailure(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()

mock.ExpectQuery("SELECT.*tikv_gc_life_time.*").
WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m"))
Expand All @@ -496,29 +501,30 @@ func (s *tableRestoreSuite) TestCompareChecksumFailure(c *C) {
mock.ExpectExec("UPDATE.*tikv_gc_life_time.*").
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()

ctx := MockDoChecksumCtx()
err = s.tr.compareChecksum(ctx, db, verification.MakeKVChecksum(9876543, 54321, 1357924680))
c.Assert(err, ErrorMatches, "checksum mismatched.*")

c.Assert(db.Close(), IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
mock.ExpectClose()
}

func (s *tableRestoreSuite) TestAnalyzeTable(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()

mock.ExpectExec("ANALYZE TABLE `db`\\.`table`").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectClose()

ctx := context.Background()
err = s.tr.analyzeTable(ctx, db)
c.Assert(err, IsNil)

c.Assert(db.Close(), IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
mock.ExpectClose()
}

func (s *tableRestoreSuite) TestImportKVSuccess(c *C) {
Expand Down