Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

lightning: fix int handle overflow and add retry for create schema #1294

Merged
merged 4 commits into from
Jul 2, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1974,7 +1974,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro
}

log.L().Info("start import engine", zap.Stringer("uuid", engineUUID),
zap.Int("ranges", len(ranges)))
zap.Int("ranges", len(ranges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize))
for {
unfinishedRanges := lf.unfinishedRanges(ranges)
if len(unfinishedRanges) == 0 {
Expand Down Expand Up @@ -2382,7 +2382,12 @@ func nextKey(key []byte) []byte {
// See: https://github.com/tikv/tikv/blob/f7f22f70e1585d7ca38a59ea30e774949160c3e8/components/raftstore/src/coprocessor/split_observer.rs#L36-L41
if tablecodec.IsRecordKey(key) {
tableID, handle, _ := tablecodec.DecodeRecordKey(key)
return tablecodec.EncodeRowKeyWithHandle(tableID, handle.Next())
nextHandle := handle.Next()
// int handle overflow, use the next table prefix as nextKey
if nextHandle.Compare(handle) <= 0 {
return tablecodec.EncodeTablePrefix(tableID + 1)
}
return tablecodec.EncodeRowKeyWithHandle(tableID, nextHandle)
}

// if key is an index, directly append a 0x00 to the key.
Expand Down
8 changes: 7 additions & 1 deletion pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,17 @@ func (s *localSuite) TestNextKey(c *C) {

// test recode key
// key with int handle
for _, handleID := range []int64{1, 255, math.MaxInt32} {
for _, handleID := range []int64{math.MinInt64, 1, 255, math.MaxInt32 - 1} {
key := tablecodec.EncodeRowKeyWithHandle(1, tidbkv.IntHandle(handleID))
c.Assert(nextKey(key), DeepEquals, []byte(tablecodec.EncodeRowKeyWithHandle(1, tidbkv.IntHandle(handleID+1))))
}

// overflowed
key := tablecodec.EncodeRowKeyWithHandle(1, tidbkv.IntHandle(math.MaxInt64))
next = tablecodec.EncodeTablePrefix(2)
c.Assert(bytes.Compare(key, next) < 0, IsTrue)
kennytm marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(nextKey(key), DeepEquals, next)

testDatums := [][]types.Datum{
{types.NewIntDatum(1), types.NewIntDatum(2)},
{types.NewIntDatum(255), types.NewIntDatum(256)},
Expand Down
4 changes: 2 additions & 2 deletions pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,10 @@ func paginateScanRegion(
ctx context.Context, client split.SplitClient, startKey, endKey []byte, limit int,
) ([]*split.RegionInfo, error) {
if len(endKey) != 0 && bytes.Compare(startKey, endKey) >= 0 {
log.L().Error("startKey > endKey when paginating scan region",
log.L().Error("startKey >= endKey when paginating scan region",
logutil.Key("startKey", startKey),
logutil.Key("endKey", endKey))
return nil, errors.Errorf("startKey > endKey when paginating scan region")
return nil, errors.Errorf("startKey >= endKey when paginating scan region")
}

var regions []*split.RegionInfo
Expand Down
20 changes: 16 additions & 4 deletions pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package restore

import (
"context"
"database/sql"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -586,10 +587,10 @@ func (worker *restoreSchemaWorker) makeJobs(
}

func (worker *restoreSchemaWorker) doJob() {
var session checkpoints.Session
var session *sql.Conn
defer func() {
if session != nil {
session.Close()
_ = session.Close()
}
}()
loop:
Expand All @@ -607,7 +608,14 @@ loop:
}
var err error
if session == nil {
session, err = worker.glue.GetSession(worker.ctx)
session, err = func() (*sql.Conn, error) {
// TODO: support lightning in SQL
db, err := worker.glue.GetDB()
if err != nil {
return nil, errors.Trace(err)
}
return db.Conn(worker.ctx)
}()
if err != nil {
worker.wg.Done()
worker.throw(err)
Expand All @@ -616,9 +624,13 @@ loop:
}
}
logger := log.With(zap.String("db", job.dbName), zap.String("table", job.tblName))
sqlWithRetry := common.SQLWithRetry{
Logger: log.L(),
DB: session,
}
for _, stmt := range job.stmts {
task := logger.Begin(zap.DebugLevel, fmt.Sprintf("execute SQL: %s", stmt.sql))
_, err = session.Execute(worker.ctx, stmt.sql)
err = sqlWithRetry.Exec(worker.ctx, "run create schema job", stmt.sql)
task.End(zap.ErrorLevel, err)
if err != nil {
err = errors.Annotatef(err, "%s %s failed", job.stmtType.String(), common.UniqueTable(job.dbName, job.tblName))
Expand Down
3 changes: 3 additions & 0 deletions tests/lightning_various_types/data/vt.bigint-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE TABLE `bigint`(
`id` BIGINT NOT NULL PRIMARY KEY
);
8 changes: 8 additions & 0 deletions tests/lightning_various_types/data/vt.bigint.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*!40101 SET NAMES binary*/;
INSERT INTO `bigint` VALUES
(-9223372036854775808),
(-8863313628261308831),
(-192707003722069405),
(0),
(5714455538631204570),
(9223372036854775807);
4 changes: 4 additions & 0 deletions tests/lightning_various_types/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,8 @@ for BACKEND in importer tidb local; do
check_contains 'c: 99999999999999999999.0'
check_contains 'd: 1.8446744073709552e19'

run_sql 'SELECT count(*), sum(id) FROM vt.bigint'
check_contains 'count(*): 6'
check_contains 'sum(id): -3341565093352173667'

done