Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importinto/lightning: do remote checksum via sql (#44803) #44887

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions br/pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type ExecutorBuilder struct {

oldTable *metautil.Table

concurrency uint
concurrency uint
backoffWeight int
}

// NewExecutorBuilder returns a new executor builder.
Expand All @@ -51,13 +52,19 @@ func (builder *ExecutorBuilder) SetConcurrency(conc uint) *ExecutorBuilder {
return builder
}

// SetBackoffWeight set the backoffWeight of the checksum executing.
func (builder *ExecutorBuilder) SetBackoffWeight(backoffWeight int) *ExecutorBuilder {
builder.backoffWeight = backoffWeight
return builder
}

// Build builds a checksum executor.
func (builder *ExecutorBuilder) Build() (*Executor, error) {
reqs, err := buildChecksumRequest(builder.table, builder.oldTable, builder.ts, builder.concurrency)
if err != nil {
return nil, errors.Trace(err)
}
return &Executor{reqs: reqs}, nil
return &Executor{reqs: reqs, backoffWeight: builder.backoffWeight}, nil
}

func buildChecksumRequest(
Expand Down Expand Up @@ -262,7 +269,8 @@ func updateChecksumResponse(resp, update *tipb.ChecksumResponse) {

// Executor is a checksum executor.
type Executor struct {
reqs []*kv.Request
reqs []*kv.Request
backoffWeight int
}

// Len returns the total number of checksum requests.
Expand Down Expand Up @@ -308,7 +316,11 @@ func (exec *Executor) Execute(
//
// It is useful in TiDB, however, it's a place holder in BR.
killed := uint32(0)
resp, err := sendChecksumRequest(ctx, client, req, kv.NewVariables(&killed))
vars := kv.NewVariables(&killed)
if exec.backoffWeight > 0 {
vars.BackOffWeight = exec.backoffWeight
}
resp, err := sendChecksumRequest(ctx, client, req, vars)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"//br/pkg/utils",
"//errno",
"//parser/model",
"//sessionctx/variable",
"//store/driver/error",
"//table/tables",
"//util",
Expand Down
43 changes: 43 additions & 0 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
tmysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table/tables"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -428,3 +429,45 @@ func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo {
}
return nil
}

// GetBackoffWeightFromDB gets the backoff weight from database.
func GetBackoffWeightFromDB(ctx context.Context, db *sql.DB) (int, error) {
val, err := getSessionVariable(ctx, db, variable.TiDBBackOffWeight)
if err != nil {
return 0, err
}
return strconv.Atoi(val)
}

// copy from dbutil to avoid import cycle
func getSessionVariable(ctx context.Context, db *sql.DB, variable string) (value string, err error) {
query := fmt.Sprintf("SHOW VARIABLES LIKE '%s'", variable)
rows, err := db.QueryContext(ctx, query)

if err != nil {
return "", errors.Trace(err)
}
defer rows.Close()

// Show an example.
/*
mysql> SHOW VARIABLES LIKE "binlog_format";
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
*/

for rows.Next() {
if err = rows.Scan(&variable, &value); err != nil {
return "", errors.Trace(err)
}
}

if err := rows.Err(); err != nil {
return "", errors.Trace(err)
}

return value, nil
}
2 changes: 2 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ type PostRestore struct {
Level1Compact bool `toml:"level-1-compact" json:"level-1-compact"`
PostProcessAtLast bool `toml:"post-process-at-last" json:"post-process-at-last"`
Compact bool `toml:"compact" json:"compact"`
ChecksumViaSQL bool `toml:"checksum-via-sql" json:"checksum-via-sql"`
}

type CSVConfig struct {
Expand Down Expand Up @@ -745,6 +746,7 @@ func NewConfig() *Config {
Checksum: OpLevelRequired,
Analyze: OpLevelOptional,
PostProcessAtLast: true,
ChecksumViaSQL: true,
},
}
}
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ go_library(
"//parser/model",
"//parser/mysql",
"//planner/core",
"//sessionctx/variable",
"//store/driver",
"//store/pdtypes",
"//table",
Expand All @@ -76,6 +77,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
Expand Down
54 changes: 50 additions & 4 deletions br/pkg/lightning/restore/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tipb/go-tipb"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
Expand All @@ -51,6 +53,12 @@ var (
serviceSafePointTTL int64 = 10 * 60 // 10 min in seconds

minDistSQLScanConcurrency = 4

// DefaultBackoffWeight is the default value of tidb_backoff_weight for checksum.
// when TiKV client encounters an error of "region not leader", it will keep retrying every 500 ms.
// If it still fails after 2 * 20 = 40 seconds, it will return "region unavailable".
// If we increase the BackOffWeight to 6, then the TiKV client will keep retrying for 120 seconds.
DefaultBackoffWeight = 3 * tikvstore.DefBackOffWeight
)

// RemoteChecksum represents a checksum result got from tidb.
Expand Down Expand Up @@ -80,14 +88,26 @@ func newChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (

// for v4.0.0 or upper, we can use the gc ttl api
var manager ChecksumManager
if pdVersion.Major >= 4 {
if pdVersion.Major >= 4 && !rc.cfg.PostRestore.ChecksumViaSQL {
tlsOpt := rc.tls.ToPDSecurityOption()
pdCli, err := pd.NewClientWithContext(ctx, []string{pdAddr}, tlsOpt)
if err != nil {
return nil, errors.Trace(err)
}

manager = newTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency))
db, err := rc.tidbGlue.GetDB()
if err != nil {
return nil, errors.Trace(err)
}
backoffWeight, err := common.GetBackoffWeightFromDB(ctx, db)
// only set backoff weight when it's smaller than default value
if err == nil && backoffWeight >= DefaultBackoffWeight {
log.L().Info("get tidb_backoff_weight", zap.Int("backoff_weight", backoffWeight))
} else {
log.L().Info("set tidb_backoff_weight to default", zap.Int("backoff_weight", DefaultBackoffWeight))
backoffWeight = DefaultBackoffWeight
}
manager = newTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight)
} else {
db, err := rc.tidbGlue.GetDB()
if err != nil {
Expand Down Expand Up @@ -125,16 +145,39 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *checkpoi

task := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "remote checksum")

conn, err := e.db.Conn(ctx)
if err != nil {
return nil, errors.Trace(err)
}
defer func() {
if err := conn.Close(); err != nil {
task.Warn("close connection failed", zap.Error(err))
}
}()
// ADMIN CHECKSUM TABLE <table>,<table> example.
// mysql> admin checksum table test.t;
// +---------+------------+---------------------+-----------+-------------+
// | Db_name | Table_name | Checksum_crc64_xor | Total_kvs | Total_bytes |
// +---------+------------+---------------------+-----------+-------------+
// | test | t | 8520875019404689597 | 7296873 | 357601387 |
// +---------+------------+---------------------+-----------+-------------+
backoffWeight, err := common.GetBackoffWeightFromDB(ctx, e.db)
if err == nil && backoffWeight < DefaultBackoffWeight {
task.Info("increase tidb_backoff_weight", zap.Int("original", backoffWeight), zap.Int("new", DefaultBackoffWeight))
// increase backoff weight
if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET SESSION %s = '%d';", variable.TiDBBackOffWeight, DefaultBackoffWeight)); err != nil {
task.Warn("set tidb_backoff_weight failed", zap.Error(err))
} else {
defer func() {
if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET SESSION %s = '%d';", variable.TiDBBackOffWeight, backoffWeight)); err != nil {
task.Warn("recover tidb_backoff_weight failed", zap.Error(err))
}
}()
}
}

cs := RemoteChecksum{}
err = common.SQLWithRetry{DB: e.db, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum",
err = common.SQLWithRetry{DB: conn, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum",
"ADMIN CHECKSUM TABLE "+tableName, &cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes,
)
dur := task.End(zap.ErrorLevel, err)
Expand Down Expand Up @@ -257,20 +300,23 @@ type tikvChecksumManager struct {
client kv.Client
manager gcTTLManager
distSQLScanConcurrency uint
backoffWeight int
}

// newTiKVChecksumManager return a new tikv checksum manager
func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint) *tikvChecksumManager {
func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint, backoffWeight int) *tikvChecksumManager {
return &tikvChecksumManager{
client: client,
manager: newGCTTLManager(pdClient),
distSQLScanConcurrency: distSQLScanConcurrency,
backoffWeight: backoffWeight,
}
}

func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) {
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, ts).
SetConcurrency(e.distSQLScanConcurrency).
SetBackoffWeight(e.backoffWeight).
Build()
if err != nil {
return nil, errors.Trace(err)
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/restore/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestDoChecksum(t *testing.T) {
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()
mock.ExpectClose()

ctx := MockDoChecksumCtx(db)
checksum, err := DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t"})
Expand Down Expand Up @@ -216,6 +217,7 @@ func TestDoChecksumWithErrorAndLongOriginalLifetime(t *testing.T) {
WithArgs("300h").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectClose()
mock.ExpectClose()

ctx := MockDoChecksumCtx(db)
_, err = DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t"})
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/restore/table_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,7 @@ func (s *tableRestoreSuite) TestCompareChecksumSuccess() {
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()
mock.ExpectClose()

ctx := MockDoChecksumCtx(db)
remoteChecksum, err := DoChecksum(ctx, s.tr.tableInfo)
Expand Down Expand Up @@ -783,7 +784,7 @@ func (s *tableRestoreSuite) TestCompareChecksumFailure() {
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()

mock.ExpectClose()
ctx := MockDoChecksumCtx(db)
remoteChecksum, err := DoChecksum(ctx, s.tr.tableInfo)
require.NoError(s.T(), err)
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var defaultImportantVariables = map[string]string{
"default_week_format": "0",
"block_encryption_mode": "aes-128-ecb",
"group_concat_max_len": "1024",
"tidb_backoff_weight": "6",
}

// defaultImportVariablesTiDB is used in ObtainImportantVariables to retrieve the system
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/restore/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ func TestObtainRowFormatVersionSucceed(t *testing.T) {

sysVars := ObtainImportantVariables(ctx, s.tiGlue.GetSQLExecutor(), true)
require.Equal(t, map[string]string{
"tidb_backoff_weight": "6",
"tidb_row_format_version": "2",
"max_allowed_packet": "1073741824",
"div_precision_increment": "10",
Expand Down Expand Up @@ -487,6 +488,7 @@ func TestObtainRowFormatVersionFailure(t *testing.T) {

sysVars := ObtainImportantVariables(ctx, s.tiGlue.GetSQLExecutor(), true)
require.Equal(t, map[string]string{
"tidb_backoff_weight": "6",
"tidb_row_format_version": "1",
"max_allowed_packet": "67108864",
"div_precision_increment": "4",
Expand Down