Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
lightning: fix int handle overflow and add retry for create schema (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Sep 1, 2021
1 parent cd2f2da commit cf2b43a
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 46 deletions.
12 changes: 8 additions & 4 deletions pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1418,10 +1418,9 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro
}
remains := &syncdRanges{}

log.L().Info("start import engine", zap.Stringer("uuid", engineUUID),
zap.Int("ranges", len(ranges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize))
for {
log.L().Info("start import engine", zap.Stringer("uuid", engineUUID),
zap.Int("ranges", len(ranges)))

// if all the kv can fit in one region, skip split regions. TiDB will split one region for
// the table when table is created.
needSplit := len(ranges) > 1 || lfTotalSize > local.regionSplitSize || lfLength > regionMaxKeyCount
Expand Down Expand Up @@ -1734,7 +1733,12 @@ func nextKey(key []byte) []byte {
// See: https://github.com/tikv/tikv/blob/f7f22f70e1585d7ca38a59ea30e774949160c3e8/components/raftstore/src/coprocessor/split_observer.rs#L36-L41
if tablecodec.IsRecordKey(key) {
tableID, handle, _ := tablecodec.DecodeRecordKey(key)
return tablecodec.EncodeRowKeyWithHandle(tableID, handle.Next())
nextHandle := handle.Next()
// int handle overflow, use the next table prefix as nextKey
if nextHandle.Compare(handle) <= 0 {
return tablecodec.EncodeTablePrefix(tableID + 1)
}
return tablecodec.EncodeRowKeyWithHandle(tableID, nextHandle)
}

// if key is an index, directly append a 0x00 to the key.
Expand Down
8 changes: 7 additions & 1 deletion pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,17 @@ func (s *localSuite) TestNextKey(c *C) {

// test recode key
// key with int handle
for _, handleID := range []int64{1, 255, math.MaxInt32} {
for _, handleID := range []int64{math.MinInt64, 1, 255, math.MaxInt32 - 1} {
key := tablecodec.EncodeRowKeyWithHandle(1, tidbkv.IntHandle(handleID))
c.Assert(nextKey(key), DeepEquals, []byte(tablecodec.EncodeRowKeyWithHandle(1, tidbkv.IntHandle(handleID+1))))
}

// overflowed
key := tablecodec.EncodeRowKeyWithHandle(1, tidbkv.IntHandle(math.MaxInt64))
next = tablecodec.EncodeTablePrefix(2)
c.Assert([]byte(key), Less, next)
c.Assert(nextKey(key), DeepEquals, next)

testDatums := [][]types.Datum{
{types.NewIntDatum(1), types.NewIntDatum(2)},
{types.NewIntDatum(255), types.NewIntDatum(256)},
Expand Down
14 changes: 13 additions & 1 deletion pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,21 @@ func IsEmptyDir(name string) bool {
return len(entries) == 0
}

type QueryExecutor interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}

type DBExecutor interface {
QueryExecutor
BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

// SQLWithRetry constructs a retryable transaction.
type SQLWithRetry struct {
DB *sql.DB
// either *sql.DB or *sql.Conn
DB DBExecutor
Logger log.Logger
HideQueryLog bool
}
Expand Down
19 changes: 15 additions & 4 deletions pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,10 +484,10 @@ func (worker *restoreSchemaWorker) makeJobs(dbMetas []*mydump.MDDatabaseMeta) er
}

func (worker *restoreSchemaWorker) doJob() {
var session checkpoints.Session
var session *sql.Conn
defer func() {
if session != nil {
session.Close()
_ = session.Close()
}
}()
loop:
Expand All @@ -505,7 +505,14 @@ loop:
}
var err error
if session == nil {
session, err = worker.glue.GetSession(worker.ctx)
session, err = func() (*sql.Conn, error) {
// TODO: support lightning in SQL
db, err := worker.glue.GetDB()
if err != nil {
return nil, errors.Trace(err)
}
return db.Conn(worker.ctx)
}()
if err != nil {
worker.wg.Done()
worker.throw(err)
Expand All @@ -514,9 +521,13 @@ loop:
}
}
logger := log.With(zap.String("db", job.dbName), zap.String("table", job.tblName))
sqlWithRetry := common.SQLWithRetry{
Logger: log.L(),
DB: session,
}
for _, stmt := range job.stmts {
task := logger.Begin(zap.DebugLevel, fmt.Sprintf("execute SQL: %s", stmt.sql))
_, err = session.Execute(worker.ctx, stmt.sql)
err = sqlWithRetry.Exec(worker.ctx, "run create schema job", stmt.sql)
task.End(zap.ErrorLevel, err)
if err != nil {
err = errors.Annotatef(err, "%s %s failed", job.stmtType.String(), common.UniqueTable(job.dbName, job.tblName))
Expand Down
46 changes: 10 additions & 36 deletions pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,29 +1395,14 @@ func (s *restoreSchemaSuite) SetUpTest(c *C) {
Return(s.tableInfos, nil)
mockBackend.EXPECT().Close()
s.rc.backend = backend.MakeBackend(mockBackend)
mockSQLExecutor := mock.NewMockSQLExecutor(s.controller)
mockSQLExecutor.EXPECT().
ExecuteWithLog(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes().
Return(nil)
mockSession := mock.NewMockSession(s.controller)
mockSession.EXPECT().
Close().
AnyTimes().
Return()
mockSession.EXPECT().
Execute(gomock.Any(), gomock.Any()).
AnyTimes().
Return(nil, nil)

mockDB, sqlMock, err := sqlmock.New()
c.Assert(err, IsNil)
for i := 0; i < 17; i++ {
sqlMock.ExpectExec(".*").WillReturnResult(sqlmock.NewResult(int64(i), 1))
}
mockTiDBGlue := mock.NewMockGlue(s.controller)
mockTiDBGlue.EXPECT().
GetSQLExecutor().
AnyTimes().
Return(mockSQLExecutor)
mockTiDBGlue.EXPECT().
GetSession(gomock.Any()).
AnyTimes().
Return(mockSession, nil)
mockTiDBGlue.EXPECT().GetDB().AnyTimes().Return(mockDB, nil)
mockTiDBGlue.EXPECT().
OwnsSQLExecutor().
AnyTimes().
Expand All @@ -1427,7 +1412,6 @@ func (s *restoreSchemaSuite) SetUpTest(c *C) {
GetParser().
AnyTimes().
Return(parser)
mockSQLExecutor.EXPECT().Close()
s.rc.tidbGlue = mockTiDBGlue
}

Expand All @@ -1443,21 +1427,11 @@ func (s *restoreSchemaSuite) TestRestoreSchemaSuccessful(c *C) {

func (s *restoreSchemaSuite) TestRestoreSchemaFailed(c *C) {
injectErr := errors.New("Somthing wrong")
mockSession := mock.NewMockSession(s.controller)
mockSession.EXPECT().
Close().
AnyTimes().
Return()
mockSession.EXPECT().
Execute(gomock.Any(), gomock.Any()).
s.rc.tidbGlue.(*mock.MockGlue).EXPECT().
GetDB().
AnyTimes().
Return(nil, injectErr)
mockTiDBGlue := mock.NewMockGlue(s.controller)
mockTiDBGlue.EXPECT().
GetSession(gomock.Any()).
AnyTimes().
Return(mockSession, nil)
s.rc.tidbGlue = mockTiDBGlue

err := s.rc.restoreSchema(s.ctx)
c.Assert(err, NotNil)
c.Assert(errors.ErrorEqual(err, injectErr), IsTrue)
Expand Down
3 changes: 3 additions & 0 deletions tests/lightning_various_types/data/vt.bigint-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE TABLE `bigint`(
`id` BIGINT NOT NULL PRIMARY KEY
);
8 changes: 8 additions & 0 deletions tests/lightning_various_types/data/vt.bigint.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*!40101 SET NAMES binary*/;
INSERT INTO `bigint` VALUES
(-9223372036854775808),
(-8863313628261308831),
(-192707003722069405),
(0),
(5714455538631204570),
(9223372036854775807);
4 changes: 4 additions & 0 deletions tests/lightning_various_types/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,8 @@ for BACKEND in importer tidb local; do
check_contains 'c: 99999999999999999999.0'
check_contains 'd: 1.8446744073709552e19'

run_sql 'SELECT count(*), sum(id) FROM vt.bigint'
check_contains 'count(*): 6'
check_contains 'sum(id): -3341565093352173667'

done

0 comments on commit cf2b43a

Please sign in to comment.