From 19fc8506f78ff8561a3f3f643b57a6f7a0b56b64 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Thu, 13 Jul 2023 14:44:14 +0800
Subject: [PATCH] This is an automated cherry-pick of #45330

Signed-off-by: ti-chi-bot
---
 br/pkg/lightning/backend/local/BUILD.bazel | 172 +++++++++++++++++++++
 br/pkg/lightning/restore/checksum.go       |  28 +++-
 br/pkg/lightning/restore/checksum_test.go  |  17 ++
 3 files changed, 214 insertions(+), 3 deletions(-)
 create mode 100644 br/pkg/lightning/backend/local/BUILD.bazel

diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel
new file mode 100644
index 0000000000000..af57eeb6f7003
--- /dev/null
+++ b/br/pkg/lightning/backend/local/BUILD.bazel
@@ -0,0 +1,172 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "local",
+    srcs = [
+        "checksum.go",
+        "compress.go",
+        "disk_quota.go",
+        "duplicate.go",
+        "engine.go",
+        "iterator.go",
+        "key_adapter.go",
+        "local.go",
+        "local_freebsd.go",
+        "local_unix.go",
+        "local_unix_generic.go",
+        "local_windows.go",
+        "localhelper.go",
+        "region_job.go",
+    ],
+    importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/local",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//br/pkg/checksum",
+        "//br/pkg/errors",
+        "//br/pkg/lightning/backend",
+        "//br/pkg/lightning/backend/encode",
+        "//br/pkg/lightning/backend/kv",
+        "//br/pkg/lightning/checkpoints",
+        "//br/pkg/lightning/common",
+        "//br/pkg/lightning/config",
+        "//br/pkg/lightning/errormanager",
+        "//br/pkg/lightning/log",
+        "//br/pkg/lightning/manual",
+        "//br/pkg/lightning/metric",
+        "//br/pkg/lightning/mydump",
+        "//br/pkg/lightning/tikv",
+        "//br/pkg/lightning/verification",
+        "//br/pkg/logutil",
+        "//br/pkg/membuf",
+        "//br/pkg/pdutil",
+        "//br/pkg/restore/split",
+        "//br/pkg/utils",
+        "//br/pkg/version",
+        "//distsql",
+        "//infoschema",
+        "//kv",
+        "//parser/model",
+        "//parser/mysql",
+        "//sessionctx/variable",
+        "//store/pdtypes",
+        "//table",
+        "//tablecodec",
+        "//types",
+        "//util/codec",
+        "//util/engine",
+        "//util/hack",
+        "//util/mathutil",
+        "//util/ranger",
+        "@com_github_cockroachdb_pebble//:pebble",
+        "@com_github_cockroachdb_pebble//sstable",
+        "@com_github_coreos_go_semver//semver",
+        "@com_github_docker_go_units//:go-units",
+        "@com_github_google_btree//:btree",
+        "@com_github_google_uuid//:uuid",
+        "@com_github_klauspost_compress//gzip",
+        "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_pingcap_kvproto//pkg/errorpb",
+        "@com_github_pingcap_kvproto//pkg/import_sstpb",
+        "@com_github_pingcap_kvproto//pkg/kvrpcpb",
+        "@com_github_pingcap_kvproto//pkg/metapb",
+        "@com_github_pingcap_kvproto//pkg/pdpb",
+        "@com_github_pingcap_tipb//go-tipb",
+        "@com_github_tikv_client_go_v2//error",
+        "@com_github_tikv_client_go_v2//kv",
+        "@com_github_tikv_client_go_v2//oracle",
+        "@com_github_tikv_client_go_v2//tikv",
+        "@com_github_tikv_pd_client//:client",
+        "@org_golang_google_grpc//:grpc",
+        "@org_golang_google_grpc//backoff",
+        "@org_golang_google_grpc//codes",
+        "@org_golang_google_grpc//credentials",
+        "@org_golang_google_grpc//credentials/insecure",
+        "@org_golang_google_grpc//keepalive",
+        "@org_golang_google_grpc//status",
+        "@org_golang_x_exp//slices",
+        "@org_golang_x_sync//errgroup",
+        "@org_golang_x_time//rate",
+        "@org_uber_go_atomic//:atomic",
+        "@org_uber_go_multierr//:multierr",
+        "@org_uber_go_zap//:zap",
+    ],
+)
+
+go_test(
+    name = "local_test",
+    timeout = "short",
+    srcs = [
+        "checksum_test.go",
+        "compress_test.go",
+        "disk_quota_test.go",
+        "duplicate_test.go",
+        "engine_test.go",
+        "iterator_test.go",
+        "key_adapter_test.go",
+        "local_check_test.go",
+        "local_test.go",
+        "localhelper_test.go",
+        "region_job_test.go",
+    ],
+    embed = [":local"],
+    flaky = True,
+    race = "on",
+    shard_count = 50,
+    deps = [
+        "//br/pkg/lightning/backend",
+        "//br/pkg/lightning/backend/encode",
+        "//br/pkg/lightning/backend/kv",
+        "//br/pkg/lightning/checkpoints",
+        "//br/pkg/lightning/common",
+        "//br/pkg/lightning/config",
+        "//br/pkg/lightning/log",
+        "//br/pkg/lightning/mydump",
+        "//br/pkg/membuf",
+        "//br/pkg/mock/mocklocal",
+        "//br/pkg/pdutil",
+        "//br/pkg/restore/split",
+        "//br/pkg/utils",
+        "//ddl",
+        "//errno",
+        "//keyspace",
+        "//kv",
+        "//parser",
+        "//parser/ast",
+        "//parser/model",
+        "//parser/mysql",
+        "//sessionctx/stmtctx",
+        "//store/pdtypes",
+        "//table/tables",
+        "//tablecodec",
+        "//types",
+        "//util",
+        "//util/codec",
+        "//util/engine",
+        "//util/hack",
+        "//util/mock",
+        "@com_github_cockroachdb_pebble//:pebble",
+        "@com_github_cockroachdb_pebble//sstable",
+        "@com_github_coreos_go_semver//semver",
+        "@com_github_data_dog_go_sqlmock//:go-sqlmock",
+        "@com_github_docker_go_units//:go-units",
+        "@com_github_go_sql_driver_mysql//:mysql",
+        "@com_github_golang_mock//gomock",
+        "@com_github_google_uuid//:uuid",
+        "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_pingcap_kvproto//pkg/errorpb",
+        "@com_github_pingcap_kvproto//pkg/import_sstpb",
+        "@com_github_pingcap_kvproto//pkg/metapb",
+        "@com_github_pingcap_kvproto//pkg/pdpb",
+        "@com_github_pingcap_tipb//go-tipb",
+        "@com_github_stretchr_testify//require",
+        "@com_github_tikv_client_go_v2//oracle",
+        "@com_github_tikv_pd_client//:client",
+        "@com_github_tikv_pd_client//errs",
+        "@org_golang_google_grpc//:grpc",
+        "@org_golang_google_grpc//codes",
+        "@org_golang_google_grpc//status",
+        "@org_uber_go_atomic//:atomic",
+    ],
+)
diff --git a/br/pkg/lightning/restore/checksum.go b/br/pkg/lightning/restore/checksum.go
index 1271f84f44fb1..7bb837377a4f4 100644
--- a/br/pkg/lightning/restore/checksum.go
+++ b/br/pkg/lightning/restore/checksum.go
@@ -367,11 +367,33 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo
 	return nil, err
 }
 
+var retryGetTSInterval = time.Second
+
+// Checksum implements the ChecksumManager interface.
 func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
 	tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name)
-	physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
-	if err != nil {
-		return nil, errors.Annotate(err, "fetch tso from pd failed")
+	var (
+		physicalTS, logicalTS int64
+		err                   error
+		retryTime             int
+	)
+	physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx)
+	for err != nil {
+		if !pd.IsLeaderChange(errors.Cause(err)) {
+			return nil, errors.Annotate(err, "fetch tso from pd failed")
+		}
+		retryTime++
+		if retryTime%60 == 0 {
+			log.FromContext(ctx).Warn("fetch tso from pd failed and retrying",
+				zap.Int("retryTime", retryTime),
+				zap.Error(err))
+		}
+		select {
+		case <-ctx.Done():
+			err = ctx.Err()
+		case <-time.After(retryGetTSInterval):
+			physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx)
+		}
 	}
 	ts := oracle.ComposeTS(physicalTS, logicalTS)
 	if err := e.manager.addOneJob(ctx, tbl, ts); err != nil {
diff --git a/br/pkg/lightning/restore/checksum_test.go b/br/pkg/lightning/restore/checksum_test.go
index 45a7b8b0ce06a..ab38595368b91 100644
--- a/br/pkg/lightning/restore/checksum_test.go
+++ b/br/pkg/lightning/restore/checksum_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/tikv/client-go/v2/oracle"
 	pd "github.com/tikv/pd/client"
+	"github.com/tikv/pd/client/errs"
 	"go.uber.org/atomic"
 )
 
@@ -200,6 +201,18 @@ func TestDoChecksumWithTikv(t *testing.T) {
 		require.Zero(t, checksumExec.manager.currentTS)
 		require.Equal(t, 0, len(checksumExec.manager.tableGCSafeTS))
 	}
+
+	// test PD leader change error
+	backup := retryGetTSInterval
+	retryGetTSInterval = time.Millisecond
+	t.Cleanup(func() {
+		retryGetTSInterval = backup
+	})
+	pdClient.leaderChanging = true
+	kvClient.maxErrCount = 0
+	checksumExec := &tikvChecksumManager{manager: newGCTTLManager(pdClient), client: kvClient}
+	_, err := checksumExec.Checksum(ctx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
+	require.NoError(t, err)
 }
 
 func TestDoChecksumWithErrorAndLongOriginalLifetime(t *testing.T) {
@@ -236,6 +249,7 @@ type testPDClient struct {
 	count            atomic.Int32
 	gcSafePoint      []safePointTTL
 	logicalTSCounter atomic.Uint64
+	leaderChanging   bool
 }
 
 func (c *testPDClient) currentSafePoint() uint64 {
@@ -251,7 +265,10 @@ func (c *testPDClient) currentSafePoint() uint64 {
 }
 
 func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) {
 	physicalTS := time.Now().UnixNano() / 1e6
+	if c.leaderChanging && physicalTS%2 == 0 {
+		return 0, 0, errors.WithStack(errs.ErrClientTSOStreamClosed)
+	}
 	logicalTS := oracle.ExtractLogical(c.logicalTSCounter.Inc())
 	return physicalTS, logicalTS, nil
 }