diff --git a/br/pkg/lightning/importer/BUILD.bazel b/br/pkg/lightning/importer/BUILD.bazel index ca7cd11a97f7e..ecc31bc3c7354 100644 --- a/br/pkg/lightning/importer/BUILD.bazel +++ b/br/pkg/lightning/importer/BUILD.bazel @@ -91,6 +91,8 @@ go_library( "@com_github_tikv_pd_client//:client", "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", "@org_golang_x_exp//maps", "@org_golang_x_exp//slices", "@org_golang_x_sync//errgroup", diff --git a/br/pkg/lightning/importer/table_import.go b/br/pkg/lightning/importer/table_import.go index a44b1da35e5b0..0c0b21dfb28d8 100644 --- a/br/pkg/lightning/importer/table_import.go +++ b/br/pkg/lightning/importer/table_import.go @@ -54,6 +54,8 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/exp/slices" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // TableImporter is a helper struct to import a table. @@ -1034,15 +1036,26 @@ func (tr *TableImporter) postProcess( var remoteChecksum *local.RemoteChecksum remoteChecksum, err = DoChecksum(ctx, tr.tableInfo) + failpoint.Inject("checksum-error", func() { + tr.logger.Info("failpoint checksum-error injected.") + remoteChecksum = nil + err = status.Error(codes.Unknown, "Checksum meets error.") + }) if err != nil { - return false, err + if rc.cfg.PostRestore.Checksum != config.OpLevelOptional { + return false, err + } + tr.logger.Warn("do checksum failed, will skip this error and go on", log.ShortError(err)) + err = nil } - err = tr.compareChecksum(remoteChecksum, localChecksum) - // with post restore level 'optional', we will skip checksum error - if rc.cfg.PostRestore.Checksum == config.OpLevelOptional { - if err != nil { - tr.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err)) - err = nil + if remoteChecksum != nil { + err = tr.compareChecksum(remoteChecksum, localChecksum) + // with post restore level 'optional', we will skip checksum error + if rc.cfg.PostRestore.Checksum == config.OpLevelOptional { + if err != nil { + tr.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err)) + err = nil + } } } } else { diff --git a/br/tests/lightning_routes/config.toml b/br/tests/lightning_routes/config.toml index bb54609dd03b1..74913091c5916 100644 --- a/br/tests/lightning_routes/config.toml +++ b/br/tests/lightning_routes/config.toml @@ -8,3 +8,6 @@ schema-pattern = "routes_a*" table-pattern = "t*" target-schema = "routes_b" target-table = "u" + +[post-restore] +checksum = "optional" diff --git a/br/tests/lightning_routes/run.sh b/br/tests/lightning_routes/run.sh index 1db0ce2035021..5ae757eb0bd43 100755 --- a/br/tests/lightning_routes/run.sh +++ b/br/tests/lightning_routes/run.sh @@ -4,12 +4,17 @@ set -eux +echo "testing checksum-error..." +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/importer/checksum-error=1*return()" + run_sql 'DROP DATABASE IF EXISTS routes_a0;' run_sql 'DROP DATABASE IF EXISTS routes_a1;' run_sql 'DROP DATABASE IF EXISTS routes_b;' run_lightning +echo "test checksum-error success!" + run_sql 'SELECT count(1), sum(x) FROM routes_b.u;' check_contains 'count(1): 4' check_contains 'sum(x): 259' diff --git a/disttask/importinto/subtask_executor.go b/disttask/importinto/subtask_executor.go index be6de9a75d0c0..ebf3c37ab26ac 100644 --- a/disttask/importinto/subtask_executor.go +++ b/disttask/importinto/subtask_executor.go @@ -120,21 +120,26 @@ func verifyChecksum(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostPr } remoteChecksum, err := checksumTable(ctx, globalTaskManager, taskMeta, logger) if err != nil { - return err + if taskMeta.Plan.Checksum != config.OpLevelOptional { + return err + } + logger.Warn("checksumTable failed, will skip this error and go on", zap.Error(err)) } - if !remoteChecksum.IsEqual(&localChecksum) { - err2 := common.ErrChecksumMismatch.GenWithStackByArgs( - remoteChecksum.Checksum, localChecksum.Sum(), - remoteChecksum.TotalKVs, localChecksum.SumKVS(), - remoteChecksum.TotalBytes, localChecksum.SumSize(), - ) - if taskMeta.Plan.Checksum == config.OpLevelOptional { - logger.Warn("verify checksum failed, but checksum is optional, will skip it", zap.Error(err2)) - err2 = nil + if remoteChecksum != nil { + if !remoteChecksum.IsEqual(&localChecksum) { + err2 := common.ErrChecksumMismatch.GenWithStackByArgs( + remoteChecksum.Checksum, localChecksum.Sum(), + remoteChecksum.TotalKVs, localChecksum.SumKVS(), + remoteChecksum.TotalBytes, localChecksum.SumSize(), + ) + if taskMeta.Plan.Checksum == config.OpLevelOptional { + logger.Warn("verify checksum failed, but checksum is optional, will skip it", zap.Error(err2)) + err2 = nil + } + return err2 } - return err2 + logger.Info("checksum pass", zap.Object("local", &localChecksum)) } - logger.Info("checksum pass", zap.Object("local", &localChecksum)) return nil }