Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
lightning: fix int handle overflow and add retry for create schema (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jul 21, 2021
1 parent 9687508 commit ef5c749
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 32 deletions.
9 changes: 7 additions & 2 deletions pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1943,7 +1943,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro
}

log.L().Info("start import engine", zap.Stringer("uuid", engineUUID),
zap.Int("ranges", len(ranges)))
zap.Int("ranges", len(ranges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize))
for {
unfinishedRanges := lf.unfinishedRanges(ranges)
if len(unfinishedRanges) == 0 {
Expand Down Expand Up @@ -2348,7 +2348,12 @@ func nextKey(key []byte) []byte {
// See: https://github.com/tikv/tikv/blob/f7f22f70e1585d7ca38a59ea30e774949160c3e8/components/raftstore/src/coprocessor/split_observer.rs#L36-L41
if tablecodec.IsRecordKey(key) {
tableID, handle, _ := tablecodec.DecodeRecordKey(key)
return tablecodec.EncodeRowKeyWithHandle(tableID, handle.Next())
nextHandle := handle.Next()
// int handle overflow, use the next table prefix as nextKey
if nextHandle.Compare(handle) <= 0 {
return tablecodec.EncodeTablePrefix(tableID + 1)
}
return tablecodec.EncodeRowKeyWithHandle(tableID, nextHandle)
}

// if key is an index, directly append a 0x00 to the key.
Expand Down
8 changes: 7 additions & 1 deletion pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,17 @@ func (s *localSuite) TestNextKey(c *C) {

// test recode key
// key with int handle
for _, handleID := range []int64{1, 255, math.MaxInt32} {
for _, handleID := range []int64{math.MinInt64, 1, 255, math.MaxInt32 - 1} {
key := tablecodec.EncodeRowKeyWithHandle(1, tidbkv.IntHandle(handleID))
c.Assert(nextKey(key), DeepEquals, []byte(tablecodec.EncodeRowKeyWithHandle(1, tidbkv.IntHandle(handleID+1))))
}

// overflowed
key := tablecodec.EncodeRowKeyWithHandle(1, tidbkv.IntHandle(math.MaxInt64))
next = tablecodec.EncodeTablePrefix(2)
c.Assert([]byte(key), Less, next)
c.Assert(nextKey(key), DeepEquals, next)

testDatums := [][]types.Datum{
{types.NewIntDatum(1), types.NewIntDatum(2)},
{types.NewIntDatum(255), types.NewIntDatum(256)},
Expand Down
4 changes: 2 additions & 2 deletions pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,10 @@ func paginateScanRegion(
ctx context.Context, client split.SplitClient, startKey, endKey []byte, limit int,
) ([]*split.RegionInfo, error) {
if len(endKey) != 0 && bytes.Compare(startKey, endKey) >= 0 {
log.L().Error("startKey > endKey when paginating scan region",
log.L().Error("startKey >= endKey when paginating scan region",
logutil.Key("startKey", startKey),
logutil.Key("endKey", endKey))
return nil, errors.Errorf("startKey > endKey when paginating scan region")
return nil, errors.Errorf("startKey >= endKey when paginating scan region")
}

var regions []*split.RegionInfo
Expand Down
20 changes: 16 additions & 4 deletions pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package restore

import (
"context"
"database/sql"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -555,10 +556,10 @@ func (worker *restoreSchemaWorker) makeJobs(
}

func (worker *restoreSchemaWorker) doJob() {
var session checkpoints.Session
var session *sql.Conn
defer func() {
if session != nil {
session.Close()
_ = session.Close()
}
}()
loop:
Expand All @@ -576,7 +577,14 @@ loop:
}
var err error
if session == nil {
session, err = worker.glue.GetSession(worker.ctx)
session, err = func() (*sql.Conn, error) {
// TODO: support lightning in SQL
db, err := worker.glue.GetDB()
if err != nil {
return nil, errors.Trace(err)
}
return db.Conn(worker.ctx)
}()
if err != nil {
worker.wg.Done()
worker.throw(err)
Expand All @@ -585,9 +593,13 @@ loop:
}
}
logger := log.With(zap.String("db", job.dbName), zap.String("table", job.tblName))
sqlWithRetry := common.SQLWithRetry{
Logger: log.L(),
DB: session,
}
for _, stmt := range job.stmts {
task := logger.Begin(zap.DebugLevel, fmt.Sprintf("execute SQL: %s", stmt.sql))
_, err = session.Execute(worker.ctx, stmt.sql)
err = sqlWithRetry.Exec(worker.ctx, "run create schema job", stmt.sql)
task.End(zap.ErrorLevel, err)
if err != nil {
err = errors.Annotatef(err, "%s %s failed", job.stmtType.String(), common.UniqueTable(job.dbName, job.tblName))
Expand Down
30 changes: 7 additions & 23 deletions pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1389,29 +1389,14 @@ func (s *restoreSchemaSuite) SetUpTest(c *C) {
Return(s.tableInfos, nil)
mockBackend.EXPECT().Close()
s.rc.backend = backend.MakeBackend(mockBackend)
mockSQLExecutor := mock.NewMockSQLExecutor(s.controller)
mockSQLExecutor.EXPECT().
ExecuteWithLog(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes().
Return(nil)
mockSession := mock.NewMockSession(s.controller)
mockSession.EXPECT().
Close().
AnyTimes().
Return()
mockSession.EXPECT().
Execute(gomock.Any(), gomock.Any()).
AnyTimes().
Return(nil, nil)

mockDB, sqlMock, err := sqlmock.New()
c.Assert(err, IsNil)
for i := 0; i < 17; i++ {
sqlMock.ExpectExec(".*").WillReturnResult(sqlmock.NewResult(int64(i), 1))
}
mockTiDBGlue := mock.NewMockGlue(s.controller)
mockTiDBGlue.EXPECT().
GetSQLExecutor().
AnyTimes().
Return(mockSQLExecutor)
mockTiDBGlue.EXPECT().
GetSession(gomock.Any()).
AnyTimes().
Return(mockSession, nil)
mockTiDBGlue.EXPECT().GetDB().AnyTimes().Return(mockDB, nil)
mockTiDBGlue.EXPECT().
OwnsSQLExecutor().
AnyTimes().
Expand All @@ -1421,7 +1406,6 @@ func (s *restoreSchemaSuite) SetUpTest(c *C) {
GetParser().
AnyTimes().
Return(parser)
mockSQLExecutor.EXPECT().Close()
s.rc.tidbGlue = mockTiDBGlue
}

Expand Down
3 changes: 3 additions & 0 deletions tests/lightning_various_types/data/vt.bigint-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE TABLE `bigint`(
`id` BIGINT NOT NULL PRIMARY KEY
);
8 changes: 8 additions & 0 deletions tests/lightning_various_types/data/vt.bigint.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*!40101 SET NAMES binary*/;
INSERT INTO `bigint` VALUES
(-9223372036854775808),
(-8863313628261308831),
(-192707003722069405),
(0),
(5714455538631204570),
(9223372036854775807);
4 changes: 4 additions & 0 deletions tests/lightning_various_types/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,8 @@ for BACKEND in importer tidb local; do
check_contains 'c: 99999999999999999999.0'
check_contains 'd: 1.8446744073709552e19'

run_sql 'SELECT count(*), sum(id) FROM vt.bigint'
check_contains 'count(*): 6'
check_contains 'sum(id): -3341565093352173667'

done

0 comments on commit ef5c749

Please sign in to comment.