From 69f68bf2270abefa62ae095670cbd6fcf926d15c Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 28 Jun 2021 14:49:26 +0800 Subject: [PATCH 1/4] fix next key --- pkg/lightning/backend/local/local.go | 9 +++++++-- pkg/lightning/backend/local/local_test.go | 8 +++++++- pkg/lightning/backend/local/localhelper.go | 4 ++-- tests/lightning_various_types/run.sh | 4 ++++ 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index d4a08ca8a..45d32782f 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -1974,7 +1974,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro } log.L().Info("start import engine", zap.Stringer("uuid", engineUUID), - zap.Int("ranges", len(ranges))) + zap.Int("ranges", len(ranges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize)) for { unfinishedRanges := lf.unfinishedRanges(ranges) if len(unfinishedRanges) == 0 { @@ -2382,7 +2382,12 @@ func nextKey(key []byte) []byte { // See: https://github.com/tikv/tikv/blob/f7f22f70e1585d7ca38a59ea30e774949160c3e8/components/raftstore/src/coprocessor/split_observer.rs#L36-L41 if tablecodec.IsRecordKey(key) { tableID, handle, _ := tablecodec.DecodeRecordKey(key) - return tablecodec.EncodeRowKeyWithHandle(tableID, handle.Next()) + nextHandle := handle.Next() + // int handle overflow, use the next table prefix as nextKey + if nextHandle.Compare(handle) <= 0 { + return tablecodec.EncodeTablePrefix(tableID + 1) + } + return tablecodec.EncodeRowKeyWithHandle(tableID, nextHandle) } // if key is an index, directly append a 0x00 to the key. diff --git a/pkg/lightning/backend/local/local_test.go b/pkg/lightning/backend/local/local_test.go index bb72dc192..11ac2c1d6 100644 --- a/pkg/lightning/backend/local/local_test.go +++ b/pkg/lightning/backend/local/local_test.go @@ -81,11 +81,17 @@ func (s *localSuite) TestNextKey(c *C) { // test recode key // key with int handle - for _, handleID := range []int64{1, 255, math.MaxInt32} { + for _, handleID := range []int64{math.MinInt64, 1, 255, math.MaxInt32 - 1} { key := tablecodec.EncodeRowKeyWithHandle(1, tidbkv.IntHandle(handleID)) c.Assert(nextKey(key), DeepEquals, []byte(tablecodec.EncodeRowKeyWithHandle(1, tidbkv.IntHandle(handleID+1)))) } + // overflowed + key := tablecodec.EncodeRowKeyWithHandle(1, tidbkv.IntHandle(math.MaxInt64)) + next = tablecodec.EncodeTablePrefix(2) + c.Assert(bytes.Compare(key, next) < 0, IsTrue) + c.Assert(nextKey(key), DeepEquals, next) + testDatums := [][]types.Datum{ {types.NewIntDatum(1), types.NewIntDatum(2)}, {types.NewIntDatum(255), types.NewIntDatum(256)}, diff --git a/pkg/lightning/backend/local/localhelper.go b/pkg/lightning/backend/local/localhelper.go index 6014a7e39..e2df6a222 100644 --- a/pkg/lightning/backend/local/localhelper.go +++ b/pkg/lightning/backend/local/localhelper.go @@ -365,10 +365,10 @@ func paginateScanRegion( ctx context.Context, client split.SplitClient, startKey, endKey []byte, limit int, ) ([]*split.RegionInfo, error) { if len(endKey) != 0 && bytes.Compare(startKey, endKey) >= 0 { - log.L().Error("startKey > endKey when paginating scan region", + log.L().Error("startKey >= endKey when paginating scan region", logutil.Key("startKey", startKey), logutil.Key("endKey", endKey)) - return nil, errors.Errorf("startKey > endKey when paginating scan region") + return nil, errors.Errorf("startKey >= endKey when paginating scan region") } var regions []*split.RegionInfo diff --git a/tests/lightning_various_types/run.sh b/tests/lightning_various_types/run.sh index fa0e72038..5e8dec1ba 100755 --- a/tests/lightning_various_types/run.sh +++ b/tests/lightning_various_types/run.sh @@ -110,4 +110,8 @@ for BACKEND in importer tidb local; do check_contains 'c: 99999999999999999999.0' check_contains 'd: 1.8446744073709552e19' + run_sql 'SELECT count(*), sum(id) FROM vt.bigint' + check_contains 'count(*): 6' + check_contains 'sum(id): -3341565093352173667' + done From 46385b4e7d3e978ae8c7cbb1391d66d24cce92f5 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 28 Jun 2021 14:52:45 +0800 Subject: [PATCH 2/4] add retry for create table --- pkg/lightning/restore/restore.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index 83021d0b9..b704f1267 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -15,6 +15,7 @@ package restore import ( "context" + "database/sql" "fmt" "io" "math" @@ -586,10 +587,10 @@ func (worker *restoreSchemaWorker) makeJobs( } func (worker *restoreSchemaWorker) doJob() { - var session checkpoints.Session + var session *sql.Conn defer func() { if session != nil { - session.Close() + _ = session.Close() } }() loop: @@ -607,7 +608,14 @@ loop: } var err error if session == nil { - session, err = worker.glue.GetSession(worker.ctx) + session, err = func() (*sql.Conn, error) { + // TODO: support lightning in SQL + db, err := worker.glue.GetDB() + if err != nil { + return nil, errors.Trace(err) + } + return db.Conn(worker.ctx) + }() if err != nil { worker.wg.Done() worker.throw(err) @@ -616,9 +624,13 @@ loop: } } logger := log.With(zap.String("db", job.dbName), zap.String("table", job.tblName)) + sqlWithRetry := common.SQLWithRetry{ + Logger: log.L(), + DB: session, + } for _, stmt := range job.stmts { task := logger.Begin(zap.DebugLevel, fmt.Sprintf("execute SQL: %s", stmt.sql)) - _, err = session.Execute(worker.ctx, stmt.sql) + err = sqlWithRetry.Exec(worker.ctx, "run create schema job", stmt.sql) task.End(zap.ErrorLevel, err) if err != nil { err = errors.Annotatef(err, "%s %s failed", job.stmtType.String(), common.UniqueTable(job.dbName, job.tblName)) From 5732a8710149ee92a669988a145b69574aaf3130 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 28 Jun 2021 14:53:01 +0800 Subject: [PATCH 3/4] add test for bigint --- tests/lightning_various_types/data/vt.bigint-schema.sql | 3 +++ tests/lightning_various_types/data/vt.bigint.0.sql | 8 ++++++++ 2 files changed, 11 insertions(+) create mode 100644 tests/lightning_various_types/data/vt.bigint-schema.sql create mode 100644 tests/lightning_various_types/data/vt.bigint.0.sql diff --git a/tests/lightning_various_types/data/vt.bigint-schema.sql b/tests/lightning_various_types/data/vt.bigint-schema.sql new file mode 100644 index 000000000..47a742b83 --- /dev/null +++ b/tests/lightning_various_types/data/vt.bigint-schema.sql @@ -0,0 +1,3 @@ +CREATE TABLE `bigint`( + `id` BIGINT NOT NULL PRIMARY KEY +); diff --git a/tests/lightning_various_types/data/vt.bigint.0.sql b/tests/lightning_various_types/data/vt.bigint.0.sql new file mode 100644 index 000000000..020531a0e --- /dev/null +++ b/tests/lightning_various_types/data/vt.bigint.0.sql @@ -0,0 +1,8 @@ +/*!40101 SET NAMES binary*/; +INSERT INTO `bigint` VALUES +(-9223372036854775808), +(-8863313628261308831), +(-192707003722069405), +(0), +(5714455538631204570), +(9223372036854775807); From 6e35c868ef8752927e1e496ee81d3ee20cf2ccf4 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 28 Jun 2021 16:35:53 +0800 Subject: [PATCH 4/4] fix unit test --- pkg/lightning/backend/local/local_test.go | 2 +- pkg/lightning/restore/restore_test.go | 30 ++++++----------------- 2 files changed, 8 insertions(+), 24 deletions(-) diff --git a/pkg/lightning/backend/local/local_test.go b/pkg/lightning/backend/local/local_test.go index 11ac2c1d6..0cc259938 100644 --- a/pkg/lightning/backend/local/local_test.go +++ b/pkg/lightning/backend/local/local_test.go @@ -89,7 +89,7 @@ func (s *localSuite) TestNextKey(c *C) { // overflowed key := tablecodec.EncodeRowKeyWithHandle(1, tidbkv.IntHandle(math.MaxInt64)) next = tablecodec.EncodeTablePrefix(2) - c.Assert(bytes.Compare(key, next) < 0, IsTrue) + c.Assert([]byte(key), Less, next) c.Assert(nextKey(key), DeepEquals, next) testDatums := [][]types.Datum{ diff --git a/pkg/lightning/restore/restore_test.go b/pkg/lightning/restore/restore_test.go index a2205dfcd..c7ed188ed 100644 --- a/pkg/lightning/restore/restore_test.go +++ b/pkg/lightning/restore/restore_test.go @@ -1474,29 +1474,14 @@ func (s *restoreSchemaSuite) SetUpTest(c *C) { Return(s.tableInfos, nil) mockBackend.EXPECT().Close() s.rc.backend = backend.MakeBackend(mockBackend) - mockSQLExecutor := mock.NewMockSQLExecutor(s.controller) - mockSQLExecutor.EXPECT(). - ExecuteWithLog(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - AnyTimes(). - Return(nil) - mockSession := mock.NewMockSession(s.controller) - mockSession.EXPECT(). - Close(). - AnyTimes(). - Return() - mockSession.EXPECT(). - Execute(gomock.Any(), gomock.Any()). - AnyTimes(). - Return(nil, nil) + + mockDB, sqlMock, err := sqlmock.New() + c.Assert(err, IsNil) + for i := 0; i < 17; i++ { + sqlMock.ExpectExec(".*").WillReturnResult(sqlmock.NewResult(int64(i), 1)) + } mockTiDBGlue := mock.NewMockGlue(s.controller) - mockTiDBGlue.EXPECT(). - GetSQLExecutor(). - AnyTimes(). - Return(mockSQLExecutor) - mockTiDBGlue.EXPECT(). - GetSession(gomock.Any()). - AnyTimes(). - Return(mockSession, nil) + mockTiDBGlue.EXPECT().GetDB().AnyTimes().Return(mockDB, nil) mockTiDBGlue.EXPECT(). OwnsSQLExecutor(). AnyTimes(). @@ -1506,7 +1491,6 @@ func (s *restoreSchemaSuite) SetUpTest(c *C) { GetParser(). AnyTimes(). Return(parser) - mockSQLExecutor.EXPECT().Close() s.rc.tidbGlue = mockTiDBGlue }