From 89bf7432279a283224833f4e6c8c798b0302f2fa Mon Sep 17 00:00:00 2001 From: GMHDBJD <35025882+GMHDBJD@users.noreply.github.com> Date: Wed, 21 Jun 2023 21:41:42 +0800 Subject: [PATCH] importinto/lightning: do remote checksum via sql (#44803) close pingcap/tidb#41941, ref pingcap/tidb#44711 --- br/pkg/checksum/executor.go | 20 ++- br/pkg/lightning/backend/local/BUILD.bazel | 2 + br/pkg/lightning/backend/local/checksum.go | 45 +++++- .../lightning/backend/local/checksum_test.go | 2 + br/pkg/lightning/common/BUILD.bazel | 1 + br/pkg/lightning/common/common.go | 1 + br/pkg/lightning/common/util.go | 43 ++++++ br/pkg/lightning/config/config.go | 2 + br/pkg/lightning/importer/checksum_helper.go | 13 +- .../lightning/importer/table_import_test.go | 3 +- br/pkg/lightning/importer/tidb_test.go | 2 + br/tests/lightning_add_index/config1.toml | 3 + disttask/framework/dispatcher/dispatcher.go | 4 + disttask/framework/storage/task_table.go | 27 ++-- disttask/importinto/BUILD.bazel | 17 ++- disttask/importinto/dispatcher.go | 37 ----- disttask/importinto/job.go | 2 +- disttask/importinto/subtask_executor.go | 144 ++++++++++++++++++ disttask/importinto/subtask_executor_test.go | 73 +++++++++ executor/import_into.go | 2 +- executor/importer/BUILD.bazel | 1 - executor/importer/table_import.go | 63 -------- tests/realtikvtest/importintotest/job_test.go | 2 +- 23 files changed, 380 insertions(+), 129 deletions(-) create mode 100644 disttask/importinto/subtask_executor_test.go diff --git a/br/pkg/checksum/executor.go b/br/pkg/checksum/executor.go index 0159fe43b1d0c..409d40a3530a6 100644 --- a/br/pkg/checksum/executor.go +++ b/br/pkg/checksum/executor.go @@ -28,7 +28,8 @@ type ExecutorBuilder struct { oldTable *metautil.Table - concurrency uint + concurrency uint + backoffWeight int oldKeyspace []byte newKeyspace []byte @@ -56,6 +57,12 @@ func (builder *ExecutorBuilder) SetConcurrency(conc uint) *ExecutorBuilder { return builder } +// SetBackoffWeight set the backoffWeight of the checksum executing. +func (builder *ExecutorBuilder) SetBackoffWeight(backoffWeight int) *ExecutorBuilder { + builder.backoffWeight = backoffWeight + return builder +} + func (builder *ExecutorBuilder) SetOldKeyspace(keyspace []byte) *ExecutorBuilder { builder.oldKeyspace = keyspace return builder @@ -79,7 +86,7 @@ func (builder *ExecutorBuilder) Build() (*Executor, error) { if err != nil { return nil, errors.Trace(err) } - return &Executor{reqs: reqs}, nil + return &Executor{reqs: reqs, backoffWeight: builder.backoffWeight}, nil } func buildChecksumRequest( @@ -294,7 +301,8 @@ func updateChecksumResponse(resp, update *tipb.ChecksumResponse) { // Executor is a checksum executor. type Executor struct { - reqs []*kv.Request + reqs []*kv.Request + backoffWeight int } // Len returns the total number of checksum requests. @@ -347,7 +355,11 @@ func (exec *Executor) Execute( err error ) err = utils.WithRetry(ctx, func() error { - resp, err = sendChecksumRequest(ctx, client, req, kv.NewVariables(&killed)) + vars := kv.NewVariables(&killed) + if exec.backoffWeight > 0 { + vars.BackOffWeight = exec.backoffWeight + } + resp, err = sendChecksumRequest(ctx, client, req, vars) failpoint.Inject("checksumRetryErr", func(val failpoint.Value) { // first time reach here. 
return error if val.(bool) { diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index d8cf18d8ff669..01c1bd2c42001 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -48,6 +48,7 @@ go_library( "//kv", "//parser/model", "//parser/mysql", + "//sessionctx/variable", "//store/pdtypes", "//table", "//tablecodec", @@ -73,6 +74,7 @@ go_library( "@com_github_pingcap_kvproto//pkg/pdpb", "@com_github_pingcap_tipb//go-tipb", "@com_github_tikv_client_go_v2//error", + "@com_github_tikv_client_go_v2//kv", "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_pd_client//:client", diff --git a/br/pkg/lightning/backend/local/checksum.go b/br/pkg/lightning/backend/local/checksum.go index 3acc237068da8..6a41c714a2314 100644 --- a/br/pkg/lightning/backend/local/checksum.go +++ b/br/pkg/lightning/backend/local/checksum.go @@ -32,8 +32,10 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/metric" "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tipb/go-tipb" + tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" "go.uber.org/atomic" @@ -49,7 +51,14 @@ const ( var ( serviceSafePointTTL int64 = 10 * 60 // 10 min in seconds - minDistSQLScanConcurrency = 4 + // MinDistSQLScanConcurrency is the minimum value of tidb_distsql_scan_concurrency. + MinDistSQLScanConcurrency = 4 + + // DefaultBackoffWeight is the default value of tidb_backoff_weight for checksum. + // when TiKV client encounters an error of "region not leader", it will keep retrying every 500 ms. + // If it still fails after 2 * 20 = 40 seconds, it will return "region unavailable". + // If we increase the BackOffWeight to 6, then the TiKV client will keep retrying for 120 seconds. + DefaultBackoffWeight = 3 * tikvstore.DefBackOffWeight ) // RemoteChecksum represents a checksum result got from tidb. @@ -102,6 +111,15 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *checkpoi task := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "remote checksum") + conn, err := e.db.Conn(ctx) + if err != nil { + return nil, errors.Trace(err) + } + defer func() { + if err := conn.Close(); err != nil { + task.Warn("close connection failed", zap.Error(err)) + } + }() // ADMIN CHECKSUM TABLE ,
example. // mysql> admin checksum table test.t; // +---------+------------+---------------------+-----------+-------------+ @@ -109,9 +127,23 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *checkpoi // +---------+------------+---------------------+-----------+-------------+ // | test | t | 8520875019404689597 | 7296873 | 357601387 | // +---------+------------+---------------------+-----------+-------------+ + backoffWeight, err := common.GetBackoffWeightFromDB(ctx, e.db) + if err == nil && backoffWeight < DefaultBackoffWeight { + task.Info("increase tidb_backoff_weight", zap.Int("original", backoffWeight), zap.Int("new", DefaultBackoffWeight)) + // increase backoff weight + if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET SESSION %s = '%d';", variable.TiDBBackOffWeight, DefaultBackoffWeight)); err != nil { + task.Warn("set tidb_backoff_weight failed", zap.Error(err)) + } else { + defer func() { + if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET SESSION %s = '%d';", variable.TiDBBackOffWeight, backoffWeight)); err != nil { + task.Warn("recover tidb_backoff_weight failed", zap.Error(err)) + } + }() + } + } cs := RemoteChecksum{} - err = common.SQLWithRetry{DB: e.db, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum", + err = common.SQLWithRetry{DB: conn, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum", "ADMIN CHECKSUM TABLE "+tableName, &cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes, ) dur := task.End(zap.ErrorLevel, err) @@ -239,22 +271,25 @@ type TiKVChecksumManager struct { client kv.Client manager gcTTLManager distSQLScanConcurrency uint + backoffWeight int } var _ ChecksumManager = &TiKVChecksumManager{} // NewTiKVChecksumManager return a new tikv checksum manager -func NewTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint) *TiKVChecksumManager { +func NewTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint, backoffWeight int) *TiKVChecksumManager { return &TiKVChecksumManager{ client: client, manager: newGCTTLManager(pdClient), distSQLScanConcurrency: distSQLScanConcurrency, + backoffWeight: backoffWeight, } } func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) { executor, err := checksum.NewExecutorBuilder(tableInfo.Core, ts). SetConcurrency(e.distSQLScanConcurrency). + SetBackoffWeight(e.backoffWeight). Build() if err != nil { return nil, errors.Trace(err) @@ -286,8 +321,8 @@ func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo if !common.IsRetryableError(err) { break } - if distSQLScanConcurrency > minDistSQLScanConcurrency { - distSQLScanConcurrency = mathutil.Max(distSQLScanConcurrency/2, minDistSQLScanConcurrency) + if distSQLScanConcurrency > MinDistSQLScanConcurrency { + distSQLScanConcurrency = mathutil.Max(distSQLScanConcurrency/2, MinDistSQLScanConcurrency) } } diff --git a/br/pkg/lightning/backend/local/checksum_test.go b/br/pkg/lightning/backend/local/checksum_test.go index bd5fdd1c65666..3506995514928 100644 --- a/br/pkg/lightning/backend/local/checksum_test.go +++ b/br/pkg/lightning/backend/local/checksum_test.go @@ -49,6 +49,7 @@ func TestDoChecksum(t *testing.T) { WithArgs("10m"). 
WillReturnResult(sqlmock.NewResult(2, 1)) mock.ExpectClose() + mock.ExpectClose() manager := NewTiDBChecksumExecutor(db) checksum, err := manager.Checksum(context.Background(), &TidbTableInfo{DB: "test", Name: "t"}) @@ -228,6 +229,7 @@ func TestDoChecksumWithErrorAndLongOriginalLifetime(t *testing.T) { WithArgs("300h"). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectClose() + mock.ExpectClose() manager := NewTiDBChecksumExecutor(db) _, err = manager.Checksum(context.Background(), &TidbTableInfo{DB: "test", Name: "t"}) diff --git a/br/pkg/lightning/common/BUILD.bazel b/br/pkg/lightning/common/BUILD.bazel index 7bb9a3933d52a..52ff0a652c9c5 100644 --- a/br/pkg/lightning/common/BUILD.bazel +++ b/br/pkg/lightning/common/BUILD.bazel @@ -27,6 +27,7 @@ go_library( "//meta/autoid", "//parser/model", "//parser/mysql", + "//sessionctx/variable", "//store/driver/error", "//table/tables", "//types", diff --git a/br/pkg/lightning/common/common.go b/br/pkg/lightning/common/common.go index 4a9e0f461a477..aaf8860e4fb58 100644 --- a/br/pkg/lightning/common/common.go +++ b/br/pkg/lightning/common/common.go @@ -39,6 +39,7 @@ var DefaultImportantVariables = map[string]string{ "default_week_format": "0", "block_encryption_mode": "aes-128-ecb", "group_concat_max_len": "1024", + "tidb_backoff_weight": "6", } // DefaultImportVariablesTiDB is used in ObtainImportantVariables to retrieve the system diff --git a/br/pkg/lightning/common/util.go b/br/pkg/lightning/common/util.go index 632c98c73afca..db56721179625 100644 --- a/br/pkg/lightning/common/util.go +++ b/br/pkg/lightning/common/util.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/parser/model" tmysql "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" @@ -605,3 +606,45 @@ func IsDupKeyError(err error) bool { } return false } + +// GetBackoffWeightFromDB gets the backoff weight from database. +func GetBackoffWeightFromDB(ctx context.Context, db *sql.DB) (int, error) { + val, err := getSessionVariable(ctx, db, variable.TiDBBackOffWeight) + if err != nil { + return 0, err + } + return strconv.Atoi(val) +} + +// copy from dbutil to avoid import cycle +func getSessionVariable(ctx context.Context, db *sql.DB, variable string) (value string, err error) { + query := fmt.Sprintf("SHOW VARIABLES LIKE '%s'", variable) + rows, err := db.QueryContext(ctx, query) + + if err != nil { + return "", errors.Trace(err) + } + defer rows.Close() + + // Show an example. 
+ /* + mysql> SHOW VARIABLES LIKE "binlog_format"; + +---------------+-------+ + | Variable_name | Value | + +---------------+-------+ + | binlog_format | ROW | + +---------------+-------+ + */ + + for rows.Next() { + if err = rows.Scan(&variable, &value); err != nil { + return "", errors.Trace(err) + } + } + + if err := rows.Err(); err != nil { + return "", errors.Trace(err) + } + + return value, nil +} diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index e8363f2457bcb..da31ed3036523 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -599,6 +599,7 @@ type PostRestore struct { Level1Compact bool `toml:"level-1-compact" json:"level-1-compact"` PostProcessAtLast bool `toml:"post-process-at-last" json:"post-process-at-last"` Compact bool `toml:"compact" json:"compact"` + ChecksumViaSQL bool `toml:"checksum-via-sql" json:"checksum-via-sql"` } // StringOrStringSlice can unmarshal a TOML string as string slice with one element. @@ -974,6 +975,7 @@ func NewConfig() *Config { Checksum: OpLevelRequired, Analyze: OpLevelOptional, PostProcessAtLast: true, + ChecksumViaSQL: true, }, } } diff --git a/br/pkg/lightning/importer/checksum_helper.go b/br/pkg/lightning/importer/checksum_helper.go index e81703cfce85b..88bc40d5a72e1 100644 --- a/br/pkg/lightning/importer/checksum_helper.go +++ b/br/pkg/lightning/importer/checksum_helper.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend/local" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" + "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/metric" @@ -44,14 +45,22 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) ( // for v4.0.0 or upper, we can use the gc ttl api var manager local.ChecksumManager - if pdVersion.Major >= 4 { + if pdVersion.Major >= 4 && !rc.cfg.PostRestore.ChecksumViaSQL { tlsOpt := rc.tls.ToPDSecurityOption() pdCli, err := pd.NewClientWithContext(ctx, []string{pdAddr}, tlsOpt) if err != nil { return nil, errors.Trace(err) } - manager = local.NewTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency)) + backoffWeight, err := common.GetBackoffWeightFromDB(ctx, rc.db) + // only set backoff weight when it's smaller than default value + if err == nil && backoffWeight >= local.DefaultBackoffWeight { + log.FromContext(ctx).Info("get tidb_backoff_weight", zap.Int("backoff_weight", backoffWeight)) + } else { + log.FromContext(ctx).Info("set tidb_backoff_weight to default", zap.Int("backoff_weight", local.DefaultBackoffWeight)) + backoffWeight = local.DefaultBackoffWeight + } + manager = local.NewTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight) } else { manager = local.NewTiDBChecksumExecutor(rc.db) } diff --git a/br/pkg/lightning/importer/table_import_test.go b/br/pkg/lightning/importer/table_import_test.go index 8304a204c44d7..39d105662f5ee 100644 --- a/br/pkg/lightning/importer/table_import_test.go +++ b/br/pkg/lightning/importer/table_import_test.go @@ -802,6 +802,7 @@ func (s *tableRestoreSuite) TestCompareChecksumSuccess() { WithArgs("10m"). 
WillReturnResult(sqlmock.NewResult(2, 1)) mock.ExpectClose() + mock.ExpectClose() ctx := MockDoChecksumCtx(db) remoteChecksum, err := DoChecksum(ctx, s.tr.tableInfo) @@ -832,7 +833,7 @@ func (s *tableRestoreSuite) TestCompareChecksumFailure() { WithArgs("10m"). WillReturnResult(sqlmock.NewResult(2, 1)) mock.ExpectClose() - + mock.ExpectClose() ctx := MockDoChecksumCtx(db) remoteChecksum, err := DoChecksum(ctx, s.tr.tableInfo) require.NoError(s.T(), err) diff --git a/br/pkg/lightning/importer/tidb_test.go b/br/pkg/lightning/importer/tidb_test.go index 64813d669ec7b..4c0c33e6efc1b 100644 --- a/br/pkg/lightning/importer/tidb_test.go +++ b/br/pkg/lightning/importer/tidb_test.go @@ -337,6 +337,7 @@ func TestObtainRowFormatVersionSucceed(t *testing.T) { sysVars := ObtainImportantVariables(ctx, s.db, true) require.Equal(t, map[string]string{ + "tidb_backoff_weight": "6", "tidb_row_format_version": "2", "max_allowed_packet": "1073741824", "div_precision_increment": "10", @@ -360,6 +361,7 @@ func TestObtainRowFormatVersionFailure(t *testing.T) { sysVars := ObtainImportantVariables(ctx, s.db, true) require.Equal(t, map[string]string{ + "tidb_backoff_weight": "6", "tidb_row_format_version": "1", "max_allowed_packet": "67108864", "div_precision_increment": "4", diff --git a/br/tests/lightning_add_index/config1.toml b/br/tests/lightning_add_index/config1.toml index 2391884fb6a56..36b03d49a1117 100644 --- a/br/tests/lightning_add_index/config1.toml +++ b/br/tests/lightning_add_index/config1.toml @@ -1,3 +1,6 @@ [tikv-importer] backend = 'local' add-index-by-sql = false + +[post-restore] +checksum-via-sql = false \ No newline at end of file diff --git a/disttask/framework/dispatcher/dispatcher.go b/disttask/framework/dispatcher/dispatcher.go index fb35aad243f2f..248b797c9b913 100644 --- a/disttask/framework/dispatcher/dispatcher.go +++ b/disttask/framework/dispatcher/dispatcher.go @@ -502,6 +502,10 @@ func (d *dispatcher) WithNewSession(fn func(se sessionctx.Context) error) error return d.taskMgr.WithNewSession(fn) } +func (d *dispatcher) WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error { + return d.taskMgr.WithNewTxn(ctx, fn) +} + func (*dispatcher) checkConcurrencyOverflow(cnt int) bool { if cnt >= DefaultDispatchConcurrency { logutil.BgLogger().Info("dispatch task loop, running GTask cnt is more than concurrency", diff --git a/disttask/framework/storage/task_table.go b/disttask/framework/storage/task_table.go index b0534a0149d88..f394e99a7a540 100644 --- a/disttask/framework/storage/task_table.go +++ b/disttask/framework/storage/task_table.go @@ -40,6 +40,8 @@ import ( type SessionExecutor interface { // WithNewSession executes the function with a new session. WithNewSession(fn func(se sessionctx.Context) error) error + // WithNewTxn executes the fn in a new transaction. + WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error } // TaskManager is the manager of global/sub task. @@ -80,9 +82,9 @@ func SetTaskManager(is *TaskManager) { taskManagerInstance.Store(is) } -// execSQL executes the sql and returns the result. +// ExecSQL executes the sql and returns the result. // TODO: consider retry. -func execSQL(ctx context.Context, se sessionctx.Context, sql string, args ...interface{}) ([]chunk.Row, error) { +func ExecSQL(ctx context.Context, se sessionctx.Context, sql string, args ...interface{}) ([]chunk.Row, error) { rs, err := se.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql, args...) 
if err != nil { return nil, err @@ -124,9 +126,10 @@ func (stm *TaskManager) WithNewSession(fn func(se sessionctx.Context) error) err } // WithNewTxn executes the fn in a new transaction. -func (stm *TaskManager) WithNewTxn(fn func(se sessionctx.Context) error) error { +func (stm *TaskManager) WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error { + ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask) return stm.WithNewSession(func(se sessionctx.Context) (err error) { - _, err = execSQL(stm.ctx, se, "begin") + _, err = ExecSQL(ctx, se, "begin") if err != nil { return err } @@ -137,7 +140,7 @@ func (stm *TaskManager) WithNewTxn(fn func(se sessionctx.Context) error) error { if success { sql = "commit" } - _, commitErr := execSQL(stm.ctx, se, sql) + _, commitErr := ExecSQL(ctx, se, sql) if err == nil && commitErr != nil { err = commitErr } @@ -154,7 +157,7 @@ func (stm *TaskManager) WithNewTxn(fn func(se sessionctx.Context) error) error { func (stm *TaskManager) executeSQLWithNewSession(ctx context.Context, sql string, args ...interface{}) (rs []chunk.Row, err error) { err = stm.WithNewSession(func(se sessionctx.Context) error { - rs, err = execSQL(ctx, se, sql, args...) + rs, err = ExecSQL(ctx, se, sql, args...) return err }) @@ -177,7 +180,7 @@ func (stm *TaskManager) AddNewGlobalTask(key, tp string, concurrency int, meta [ // AddGlobalTaskWithSession adds a new task to global task table with session. func (stm *TaskManager) AddGlobalTaskWithSession(se sessionctx.Context, key, tp string, concurrency int, meta []byte) (taskID int64, err error) { - _, err = execSQL(stm.ctx, se, + _, err = ExecSQL(stm.ctx, se, `insert into mysql.tidb_global_task(task_key, type, state, concurrency, step, meta, state_update_time) values (%?, %?, %?, %?, %?, %?, %?)`, key, tp, proto.TaskStatePending, concurrency, proto.StepInit, meta, time.Now().UTC().String()) @@ -185,7 +188,7 @@ func (stm *TaskManager) AddGlobalTaskWithSession(se sessionctx.Context, key, tp return 0, err } - rs, err := execSQL(stm.ctx, se, "select @@last_insert_id") + rs, err := ExecSQL(stm.ctx, se, "select @@last_insert_id") if err != nil { return 0, err } @@ -415,8 +418,8 @@ func (stm *TaskManager) GetSchedulerIDsByTaskID(taskID int64) ([]string, error) // UpdateGlobalTaskAndAddSubTasks update the global task and add new subtasks func (stm *TaskManager) UpdateGlobalTaskAndAddSubTasks(gTask *proto.Task, subtasks []*proto.Subtask, isSubtaskRevert bool) error { - return stm.WithNewTxn(func(se sessionctx.Context) error { - _, err := execSQL(stm.ctx, se, "update mysql.tidb_global_task set state = %?, dispatcher_id = %?, step = %?, state_update_time = %?, concurrency = %?, meta = %?, error = %? where id = %?", + return stm.WithNewTxn(stm.ctx, func(se sessionctx.Context) error { + _, err := ExecSQL(stm.ctx, se, "update mysql.tidb_global_task set state = %?, dispatcher_id = %?, step = %?, state_update_time = %?, concurrency = %?, meta = %?, error = %? 
where id = %?", gTask.State, gTask.DispatcherID, gTask.Step, gTask.StateUpdateTime.UTC().String(), gTask.Concurrency, gTask.Meta, gTask.Error, gTask.ID) if err != nil { return err @@ -435,7 +438,7 @@ func (stm *TaskManager) UpdateGlobalTaskAndAddSubTasks(gTask *proto.Task, subtas for _, subtask := range subtasks { // TODO: insert subtasks in batch - _, err = execSQL(stm.ctx, se, "insert into mysql.tidb_background_subtask(step, task_key, exec_id, meta, state, type, checkpoint) values (%?, %?, %?, %?, %?, %?, %?)", + _, err = ExecSQL(stm.ctx, se, "insert into mysql.tidb_background_subtask(step, task_key, exec_id, meta, state, type, checkpoint) values (%?, %?, %?, %?, %?, %?, %?)", gTask.Step, gTask.ID, subtask.SchedulerID, subtask.Meta, subtaskState, proto.Type2Int(subtask.Type), []byte{}) if err != nil { return err @@ -456,7 +459,7 @@ func (stm *TaskManager) CancelGlobalTask(taskID int64) error { // CancelGlobalTaskByKeySession cancels global task by key using input session func (stm *TaskManager) CancelGlobalTaskByKeySession(se sessionctx.Context, taskKey string) error { - _, err := execSQL(stm.ctx, se, "update mysql.tidb_global_task set state=%? where task_key=%? and state in (%?, %?)", + _, err := ExecSQL(stm.ctx, se, "update mysql.tidb_global_task set state=%? where task_key=%? and state in (%?, %?)", proto.TaskStateCancelling, taskKey, proto.TaskStatePending, proto.TaskStateRunning) return err } diff --git a/disttask/importinto/BUILD.bazel b/disttask/importinto/BUILD.bazel index 5bfeecdbdd836..eabe0b7ecc10a 100644 --- a/disttask/importinto/BUILD.bazel +++ b/disttask/importinto/BUILD.bazel @@ -15,6 +15,7 @@ go_library( deps = [ "//br/pkg/lightning/backend", "//br/pkg/lightning/backend/kv", + "//br/pkg/lightning/backend/local", "//br/pkg/lightning/checkpoints", "//br/pkg/lightning/common", "//br/pkg/lightning/config", @@ -30,18 +31,22 @@ go_library( "//errno", "//executor/asyncloaddata", "//executor/importer", + "//kv", "//parser/ast", "//parser/mysql", "//sessionctx", + "//sessionctx/variable", "//table/tables", "//util/dbterror/exeerrors", "//util/etcd", "//util/logutil", + "//util/mathutil", "//util/sqlexec", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_google_uuid//:uuid", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_tikv_client_go_v2//util", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", ], @@ -50,16 +55,26 @@ go_library( go_test( name = "importinto_test", timeout = "short", - srcs = ["dispatcher_test.go"], + srcs = [ + "dispatcher_test.go", + "subtask_executor_test.go", + ], embed = [":importinto"], flaky = True, race = "on", deps = [ + "//br/pkg/lightning/verification", "//disttask/framework/proto", + "//disttask/framework/storage", "//domain/infosync", "//executor/importer", + "//parser/model", + "//testkit", + "//util/logutil", + "@com_github_ngaut_pools//:pools", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", + "@com_github_tikv_client_go_v2//util", ], ) diff --git a/disttask/importinto/dispatcher.go b/disttask/importinto/dispatcher.go index dd834a87c84b0..3f4d6822bc55d 100644 --- a/disttask/importinto/dispatcher.go +++ b/disttask/importinto/dispatcher.go @@ -365,43 +365,6 @@ func preProcess(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task, t return updateMeta(gTask, taskMeta) } -// postProcess does the post-processing for the task. 
-func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) { - failpoint.Inject("syncBeforePostProcess", func() { - TestSyncChan <- struct{}{} - <-TestSyncChan - }) - // TODO: create table indexes depends on the option. - // globalTaskManager, err := storage.GetTaskManager() - // if err != nil { - // return err - // } - // create table indexes even if the post process is failed. - // defer func() { - // err2 := createTableIndexes(ctx, globalTaskManager, taskMeta, logger) - // err = multierr.Append(err, err2) - // }() - - controller, err := buildController(taskMeta) - if err != nil { - return err - } - // no need and should not call controller.InitDataFiles, files might not exist on this instance. - - logger.Info("post process") - - return verifyChecksum(ctx, controller, subtaskMeta.Checksum, logger) -} - -func verifyChecksum(ctx context.Context, controller *importer.LoadDataController, checksum Checksum, logger *zap.Logger) error { - if controller.Checksum == config.OpLevelOff { - return nil - } - localChecksum := verify.MakeKVChecksum(checksum.Size, checksum.KVs, checksum.Sum) - logger.Info("local checksum", zap.Object("checksum", &localChecksum)) - return controller.VerifyChecksum(ctx, localChecksum) -} - // nolint:deadcode func dropTableIndexes(ctx context.Context, handle dispatcher.TaskHandle, taskMeta *TaskMeta, logger *zap.Logger) error { tblInfo := taskMeta.Plan.TableInfo diff --git a/disttask/importinto/job.go b/disttask/importinto/job.go index c63aaa865c3b1..64b61048d8c88 100644 --- a/disttask/importinto/job.go +++ b/disttask/importinto/job.go @@ -155,7 +155,7 @@ func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, err var jobID, taskID int64 plan := ti.plan - if err = globalTaskManager.WithNewTxn(func(se sessionctx.Context) error { + if err = globalTaskManager.WithNewTxn(ctx, func(se sessionctx.Context) error { var err2 error exec := se.(sqlexec.SQLExecutor) // If 2 client try to execute IMPORT INTO concurrently, there's chance that both of them will pass the check. diff --git a/disttask/importinto/subtask_executor.go b/disttask/importinto/subtask_executor.go index 2b56683124b4b..be6de9a75d0c0 100644 --- a/disttask/importinto/subtask_executor.go +++ b/disttask/importinto/subtask_executor.go @@ -16,14 +16,25 @@ package importinto import ( "context" + "strconv" "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/br/pkg/lightning/backend/local" + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/config" + verify "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/scheduler" + "github.com/pingcap/tidb/disttask/framework/storage" "github.com/pingcap/tidb/executor/importer" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/mathutil" + "github.com/tikv/client-go/v2/util" "go.uber.org/zap" ) @@ -73,6 +84,139 @@ func (e *postProcessMinimalTaskExecutor) Run(ctx context.Context) error { return postProcess(ctx, mTask.taskMeta, &mTask.meta, mTask.logger) } +// postProcess does the post-processing for the task. 
+func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) { + failpoint.Inject("syncBeforePostProcess", func() { + TestSyncChan <- struct{}{} + <-TestSyncChan + }) + + logger.Info("post process") + + // TODO: create table indexes depends on the option. + // create table indexes even if the post process is failed. + // defer func() { + // err2 := createTableIndexes(ctx, globalTaskManager, taskMeta, logger) + // err = multierr.Append(err, err2) + // }() + + return verifyChecksum(ctx, taskMeta, subtaskMeta, logger) +} + +func verifyChecksum(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) error { + if taskMeta.Plan.Checksum == config.OpLevelOff { + return nil + } + localChecksum := verify.MakeKVChecksum(subtaskMeta.Checksum.Size, subtaskMeta.Checksum.KVs, subtaskMeta.Checksum.Sum) + logger.Info("local checksum", zap.Object("checksum", &localChecksum)) + + failpoint.Inject("waitCtxDone", func() { + <-ctx.Done() + }) + + globalTaskManager, err := storage.GetTaskManager() + if err != nil { + return err + } + remoteChecksum, err := checksumTable(ctx, globalTaskManager, taskMeta, logger) + if err != nil { + return err + } + if !remoteChecksum.IsEqual(&localChecksum) { + err2 := common.ErrChecksumMismatch.GenWithStackByArgs( + remoteChecksum.Checksum, localChecksum.Sum(), + remoteChecksum.TotalKVs, localChecksum.SumKVS(), + remoteChecksum.TotalBytes, localChecksum.SumSize(), + ) + if taskMeta.Plan.Checksum == config.OpLevelOptional { + logger.Warn("verify checksum failed, but checksum is optional, will skip it", zap.Error(err2)) + err2 = nil + } + return err2 + } + logger.Info("checksum pass", zap.Object("local", &localChecksum)) + return nil +} + +func checksumTable(ctx context.Context, executor storage.SessionExecutor, taskMeta *TaskMeta, logger *zap.Logger) (*local.RemoteChecksum, error) { + var ( + tableName = common.UniqueTable(taskMeta.Plan.DBName, taskMeta.Plan.TableInfo.Name.L) + sql = "ADMIN CHECKSUM TABLE " + tableName + maxErrorRetryCount = 3 + distSQLScanConcurrencyFactor = 1 + remoteChecksum *local.RemoteChecksum + txnErr error + ) + + ctx = util.WithInternalSourceType(ctx, kv.InternalImportInto) + for i := 0; i < maxErrorRetryCount; i++ { + txnErr = executor.WithNewTxn(ctx, func(se sessionctx.Context) error { + // increase backoff weight + if err := setBackoffWeight(se, taskMeta, logger); err != nil { + logger.Warn("set tidb_backoff_weight failed", zap.Error(err)) + } + + distSQLScanConcurrency := se.GetSessionVars().DistSQLScanConcurrency() + se.GetSessionVars().SetDistSQLScanConcurrency(mathutil.Max(distSQLScanConcurrency/distSQLScanConcurrencyFactor, local.MinDistSQLScanConcurrency)) + defer func() { + se.GetSessionVars().SetDistSQLScanConcurrency(distSQLScanConcurrency) + }() + + rs, err := storage.ExecSQL(ctx, se, sql) + if err != nil { + return err + } + if len(rs) < 1 { + return errors.New("empty checksum result") + } + + failpoint.Inject("errWhenChecksum", func() { + if i == 0 { + failpoint.Return(errors.New("occur an error when checksum, coprocessor task terminated due to exceeding the deadline")) + } + }) + + // ADMIN CHECKSUM TABLE .
example. + // mysql> admin checksum table test.t; + // +---------+------------+---------------------+-----------+-------------+ + // | Db_name | Table_name | Checksum_crc64_xor | Total_kvs | Total_bytes | + // +---------+------------+---------------------+-----------+-------------+ + // | test | t | 8520875019404689597 | 7296873 | 357601387 | + // +---------+------------+------------- + remoteChecksum = &local.RemoteChecksum{ + Schema: rs[0].GetString(0), + Table: rs[0].GetString(1), + Checksum: rs[0].GetUint64(2), + TotalKVs: rs[0].GetUint64(3), + TotalBytes: rs[0].GetUint64(4), + } + return nil + }) + if !common.IsRetryableError(txnErr) { + break + } + distSQLScanConcurrencyFactor *= 2 + logger.Warn("retry checksum table", zap.Int("retry count", i+1), zap.Error(txnErr)) + } + return remoteChecksum, txnErr +} + +// TestChecksumTable is used to test checksum table in unit test. +func TestChecksumTable(ctx context.Context, executor storage.SessionExecutor, taskMeta *TaskMeta, logger *zap.Logger) (*local.RemoteChecksum, error) { + return checksumTable(ctx, executor, taskMeta, logger) +} + +func setBackoffWeight(se sessionctx.Context, taskMeta *TaskMeta, logger *zap.Logger) error { + backoffWeight := local.DefaultBackoffWeight + if val, ok := taskMeta.Plan.ImportantSysVars[variable.TiDBBackOffWeight]; ok { + if weight, err := strconv.Atoi(val); err == nil && weight > backoffWeight { + backoffWeight = weight + } + } + logger.Info("set backoff weight", zap.Int("weight", backoffWeight)) + return se.GetSessionVars().SetSystemVar(variable.TiDBBackOffWeight, strconv.Itoa(backoffWeight)) +} + func init() { scheduler.RegisterSubtaskExectorConstructor(proto.ImportInto, StepImport, // The order of the subtask executors is the same as the order of the subtasks. diff --git a/disttask/importinto/subtask_executor_test.go b/disttask/importinto/subtask_executor_test.go new file mode 100644 index 0000000000000..4596ffc795aa2 --- /dev/null +++ b/disttask/importinto/subtask_executor_test.go @@ -0,0 +1,73 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package importinto_test + +import ( + "context" + "testing" + "time" + + "github.com/ngaut/pools" + "github.com/pingcap/failpoint" + verify "github.com/pingcap/tidb/br/pkg/lightning/verification" + "github.com/pingcap/tidb/disttask/framework/storage" + "github.com/pingcap/tidb/disttask/importinto" + "github.com/pingcap/tidb/executor/importer" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util/logutil" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/util" +) + +func TestChecksumTable(t *testing.T) { + ctx := context.Background() + store := testkit.CreateMockStore(t) + gtk := testkit.NewTestKit(t, store) + pool := pools.NewResourcePool(func() (pools.Resource, error) { + return gtk.Session(), nil + }, 1, 1, time.Second) + defer pool.Close() + mgr := storage.NewTaskManager(util.WithInternalSourceType(ctx, "taskManager"), pool) + + taskMeta := &importinto.TaskMeta{ + Plan: importer.Plan{ + DBName: "db", + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("tb"), + }, + }, + } + // fake result + localChecksum := verify.MakeKVChecksum(1, 1, 1) + gtk.MustExec("create database db") + gtk.MustExec("create table db.tb(id int)") + gtk.MustExec("insert into db.tb values(1)") + remoteChecksum, err := importinto.TestChecksumTable(ctx, mgr, taskMeta, logutil.BgLogger()) + require.NoError(t, err) + require.True(t, remoteChecksum.IsEqual(&localChecksum)) + // again + remoteChecksum, err = importinto.TestChecksumTable(ctx, mgr, taskMeta, logutil.BgLogger()) + require.NoError(t, err) + require.True(t, remoteChecksum.IsEqual(&localChecksum)) + + _ = failpoint.Enable("github.com/pingcap/tidb/disttask/importinto/errWhenChecksum", `return(true)`) + defer func() { + _ = failpoint.Disable("github.com/pingcap/tidb/disttask/importinto/errWhenChecksum") + }() + remoteChecksum, err = importinto.TestChecksumTable(ctx, mgr, taskMeta, logutil.BgLogger()) + require.NoError(t, err) + require.True(t, remoteChecksum.IsEqual(&localChecksum)) +} diff --git a/executor/import_into.go b/executor/import_into.go index 548cf26082d3d..92f16fb13f611 100644 --- a/executor/import_into.go +++ b/executor/import_into.go @@ -292,7 +292,7 @@ func cancelImportJob(ctx context.Context, manager *fstorage.TaskManager, jobID i // todo: after CANCEL, user can see the job status is Canceled immediately, but the job might still running. // and the state of framework task might became finished since framework don't force state change DAG when update task. // todo: add a CANCELLING status? 
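[Editorial aside, not part of the patch] The SessionExecutor interface gains WithNewTxn above, and callers such as cancelImportJob now pass a context through it. Its shape is the usual begin/commit/rollback wrapper; below is a rough database/sql analogue only — the real TaskManager.WithNewTxn issues "begin"/"commit"/"rollback" via ExecSQL on a sessionctx.Context.

package txnsketch

import (
	"context"
	"database/sql"
)

// withNewTxn runs fn inside a transaction: commit when fn succeeds,
// roll back (keeping fn's error) when it fails.
func withNewTxn(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) (err error) {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			_ = tx.Rollback() // best effort; keep the original error
			return
		}
		err = tx.Commit()
	}()
	return fn(tx)
}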
- return manager.WithNewTxn(func(se sessionctx.Context) error { + return manager.WithNewTxn(ctx, func(se sessionctx.Context) error { exec := se.(sqlexec.SQLExecutor) if err2 := importer.CancelJob(ctx, exec, jobID); err2 != nil { return err2 diff --git a/executor/importer/BUILD.bazel b/executor/importer/BUILD.bazel index 200dcf9004bd4..2cb8288221492 100644 --- a/executor/importer/BUILD.bazel +++ b/executor/importer/BUILD.bazel @@ -64,7 +64,6 @@ go_library( "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//util", - "@com_github_tikv_pd_client//:client", "@org_golang_x_exp//slices", "@org_golang_x_sync//errgroup", "@org_uber_go_multierr//:multierr", diff --git a/executor/importer/table_import.go b/executor/importer/table_import.go index b8ea013c67439..086e648328c3c 100644 --- a/executor/importer/table_import.go +++ b/executor/importer/table_import.go @@ -28,7 +28,6 @@ import ( "github.com/docker/go-units" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" @@ -38,7 +37,6 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/mydump" - verify "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/br/pkg/storage" tidb "github.com/pingcap/tidb/config" tidbkv "github.com/pingcap/tidb/kv" @@ -46,7 +44,6 @@ import ( "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/syncutil" - pd "github.com/tikv/pd/client" "go.uber.org/multierr" "go.uber.org/zap" ) @@ -272,66 +269,6 @@ func (ti *TableImporter) getKVEncoder(chunk *checkpoints.ChunkCheckpoint) (kvEnc return newTableKVEncoder(cfg, ti) } -// VerifyChecksum verify the checksum of the table. -func (e *LoadDataController) VerifyChecksum(ctx context.Context, localChecksum verify.KVChecksum) (err error) { - task := log.BeginTask(e.logger, "verify checksum") - defer func() { - task.End(zap.ErrorLevel, err) - }() - tidbCfg := tidb.GetGlobalConfig() - hostPort := net.JoinHostPort("127.0.0.1", strconv.Itoa(int(tidbCfg.Status.StatusPort))) - tls, err2 := common.NewTLS( - tidbCfg.Security.ClusterSSLCA, - tidbCfg.Security.ClusterSSLCert, - tidbCfg.Security.ClusterSSLKey, - hostPort, - nil, nil, nil, - ) - if err2 != nil { - return errors.Trace(err2) - } - - // no need to close kvStore, since it's a cached store. - kvStore, err2 := getCachedKVStoreFrom(tidbCfg.Path, tls) - if err2 != nil { - return errors.Trace(err2) - } - // if context cancelled before this line, it returns "[pd] failed to get cluster id", not context.Canceled. 
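[Editorial aside, not part of the patch] On the numbers in the DefaultBackoffWeight comment (br/pkg/lightning/backend/local/checksum.go): the TiKV client's retry budget for "region not leader" scales roughly as backoff_weight × 20s, which is why the default weight of 2 gives about 40 seconds and the weight of 6 used here (3 × DefBackOffWeight, also exported through DefaultImportantVariables) gives about 120 seconds. A back-of-the-envelope check:

package main

import (
	"fmt"
	"time"
)

// retryBudget approximates how long the TiKV client keeps retrying
// "region not leader" before returning "region unavailable".
func retryBudget(backoffWeight int) time.Duration {
	return time.Duration(backoffWeight) * 20 * time.Second
}

func main() {
	fmt.Println(retryBudget(2)) // 40s  — client-go default (DefBackOffWeight)
	fmt.Println(retryBudget(6)) // 2m0s — DefaultBackoffWeight used for checksum
}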
- pdCli, err2 := pd.NewClientWithContext(ctx, []string{tidbCfg.Path}, tls.ToPDSecurityOption()) - if err2 != nil { - return errors.Trace(err2) - } - defer pdCli.Close() - - failpoint.Inject("waitCtxDone", func() { - <-ctx.Done() - }) - tableInfo := &checkpoints.TidbTableInfo{ - ID: e.Table.Meta().ID, - Name: e.Table.Meta().Name.O, - Core: e.Table.Meta(), - } - manager := local.NewTiKVChecksumManager(kvStore.GetClient(), pdCli, uint(e.DistSQLScanConcurrency)) - remoteChecksum, err2 := manager.Checksum(ctx, tableInfo) - if err2 != nil { - return err2 - } - if !remoteChecksum.IsEqual(&localChecksum) { - err3 := common.ErrChecksumMismatch.GenWithStackByArgs( - remoteChecksum.Checksum, localChecksum.Sum(), - remoteChecksum.TotalKVs, localChecksum.SumKVS(), - remoteChecksum.TotalBytes, localChecksum.SumSize(), - ) - if e.Checksum == config.OpLevelOptional { - e.logger.Warn("verify checksum failed, but checksum is optional, will skip it", log.ShortError(err3)) - err3 = nil - } - return err3 - } - e.logger.Info("checksum pass", zap.Object("local", &localChecksum)) - return nil -} - // PopulateChunks populates chunks from table regions. // in dist framework, this should be done in the tidb node which is responsible for splitting job into subtasks // then table-importer handles data belongs to the subtask. diff --git a/tests/realtikvtest/importintotest/job_test.go b/tests/realtikvtest/importintotest/job_test.go index 8645114010fe3..82397c946fa8e 100644 --- a/tests/realtikvtest/importintotest/job_test.go +++ b/tests/realtikvtest/importintotest/job_test.go @@ -465,7 +465,7 @@ func (s *mockGCSSuite) TestCancelJob() { s.NoError(failpoint.Disable("github.com/pingcap/tidb/disttask/importinto/waitBeforeSortChunk")) s.NoError(failpoint.Disable("github.com/pingcap/tidb/disttask/importinto/syncAfterJobStarted")) s.enableFailpoint("github.com/pingcap/tidb/disttask/importinto/syncBeforePostProcess", "return(true)") - s.enableFailpoint("github.com/pingcap/tidb/executor/importer/waitCtxDone", "return(true)") + s.enableFailpoint("github.com/pingcap/tidb/disttask/importinto/waitCtxDone", "return(true)") result2 := s.tk.MustQuery(fmt.Sprintf(`import into t2 FROM 'gs://test_cancel_job/t.csv?endpoint=%s' with detached`, gcsEndpoint)).Rows() s.Len(result2, 1)
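[Editorial aside, not part of the patch] To round the change off, a hedged end-to-end sketch of the SQL checksum path that replaces the removed LoadDataController.VerifyChecksum: pin one connection, raise tidb_backoff_weight for that session, run ADMIN CHECKSUM TABLE, and read back the five columns RemoteChecksum carries. The DSN and table name are placeholders; the MySQL driver is the one already listed in the BUILD deps above.

package main

import (
	"context"
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql" // driver already in the dependency list
)

type remoteChecksum struct {
	Schema, Table                  string
	Checksum, TotalKVs, TotalBytes uint64
}

func checksumViaSQL(ctx context.Context, db *sql.DB, table string) (*remoteChecksum, error) {
	// Pin a single connection so the SET SESSION below applies to the same
	// session that runs ADMIN CHECKSUM TABLE, as tidbChecksumExecutor.Checksum does.
	conn, err := db.Conn(ctx)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	// Raise tidb_backoff_weight so "region not leader" retries get a longer budget.
	if _, err := conn.ExecContext(ctx, "SET SESSION tidb_backoff_weight = '6'"); err != nil {
		return nil, err
	}

	var cs remoteChecksum
	row := conn.QueryRowContext(ctx, "ADMIN CHECKSUM TABLE "+table)
	if err := row.Scan(&cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes); err != nil {
		return nil, err
	}
	return &cs, nil
}

func main() {
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test") // placeholder DSN
	if err != nil {
		panic(err)
	}
	defer db.Close()

	cs, err := checksumViaSQL(context.Background(), db, "test.t")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cs)
}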