Skip to content

Commit

Permalink
This is an automated cherry-pick of #45330
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lance6716 authored and ti-chi-bot committed Jul 14, 2023
1 parent 2db988e commit 19fc850
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 3 deletions.
172 changes: 172 additions & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "local",
srcs = [
"checksum.go",
"compress.go",
"disk_quota.go",
"duplicate.go",
"engine.go",
"iterator.go",
"key_adapter.go",
"local.go",
"local_freebsd.go",
"local_unix.go",
"local_unix_generic.go",
"local_windows.go",
"localhelper.go",
"region_job.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/local",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/checksum",
"//br/pkg/errors",
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/errormanager",
"//br/pkg/lightning/log",
"//br/pkg/lightning/manual",
"//br/pkg/lightning/metric",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/tikv",
"//br/pkg/lightning/verification",
"//br/pkg/logutil",
"//br/pkg/membuf",
"//br/pkg/pdutil",
"//br/pkg/restore/split",
"//br/pkg/utils",
"//br/pkg/version",
"//distsql",
"//infoschema",
"//kv",
"//parser/model",
"//parser/mysql",
"//sessionctx/variable",
"//store/pdtypes",
"//table",
"//tablecodec",
"//types",
"//util/codec",
"//util/engine",
"//util/hack",
"//util/mathutil",
"//util/ranger",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//sstable",
"@com_github_coreos_go_semver//semver",
"@com_github_docker_go_units//:go-units",
"@com_github_google_btree//:btree",
"@com_github_google_uuid//:uuid",
"@com_github_klauspost_compress//gzip",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//backoff",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//status",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
"@org_golang_x_time//rate",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "local_test",
timeout = "short",
srcs = [
"checksum_test.go",
"compress_test.go",
"disk_quota_test.go",
"duplicate_test.go",
"engine_test.go",
"iterator_test.go",
"key_adapter_test.go",
"local_check_test.go",
"local_test.go",
"localhelper_test.go",
"region_job_test.go",
],
embed = [":local"],
flaky = True,
race = "on",
shard_count = 50,
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/lightning/mydump",
"//br/pkg/membuf",
"//br/pkg/mock/mocklocal",
"//br/pkg/pdutil",
"//br/pkg/restore/split",
"//br/pkg/utils",
"//ddl",
"//errno",
"//keyspace",
"//kv",
"//parser",
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//sessionctx/stmtctx",
"//store/pdtypes",
"//table/tables",
"//tablecodec",
"//types",
"//util",
"//util/codec",
"//util/engine",
"//util/hack",
"//util/mock",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//sstable",
"@com_github_coreos_go_semver//semver",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_golang_mock//gomock",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//errs",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_uber_go_atomic//:atomic",
],
)
32 changes: 29 additions & 3 deletions br/pkg/lightning/restore/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,11 +367,37 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo
return nil, err
}

<<<<<<< HEAD:br/pkg/lightning/restore/checksum.go
func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
=======
var retryGetTSInterval = time.Second

// Checksum implements the ChecksumManager interface.
func (e *TiKVChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
>>>>>>> 04f6570f1a7 (lightning: retry for leader change error when GetTS (#44478) (#44856) (#45330)):br/pkg/lightning/backend/local/checksum.go
tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name)
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
var (
physicalTS, logicalTS int64
err error
retryTime int
)
physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx)
for err != nil {
if !pd.IsLeaderChange(errors.Cause(err)) {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
retryTime++
if retryTime%60 == 0 {
log.FromContext(ctx).Warn("fetch tso from pd failed and retrying",
zap.Int("retryTime", retryTime),
zap.Error(err))
}
select {
case <-ctx.Done():
err = ctx.Err()
case <-time.After(retryGetTSInterval):
physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx)
}
}
ts := oracle.ComposeTS(physicalTS, logicalTS)
if err := e.manager.addOneJob(ctx, tbl, ts); err != nil {
Expand Down
21 changes: 21 additions & 0 deletions br/pkg/lightning/restore/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
"go.uber.org/atomic"
)

Expand Down Expand Up @@ -200,6 +201,18 @@ func TestDoChecksumWithTikv(t *testing.T) {
require.Zero(t, checksumExec.manager.currentTS)
require.Equal(t, 0, len(checksumExec.manager.tableGCSafeTS))
}

// test PD leader change error
backup := retryGetTSInterval
retryGetTSInterval = time.Millisecond
t.Cleanup(func() {
retryGetTSInterval = backup
})
pdClient.leaderChanging = true
kvClient.maxErrCount = 0
checksumExec := &TiKVChecksumManager{manager: newGCTTLManager(pdClient), client: kvClient}
_, err := checksumExec.Checksum(ctx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
require.NoError(t, err)
}

func TestDoChecksumWithErrorAndLongOriginalLifetime(t *testing.T) {
Expand Down Expand Up @@ -236,6 +249,7 @@ type testPDClient struct {
count atomic.Int32
gcSafePoint []safePointTTL
logicalTSCounter atomic.Uint64
leaderChanging bool
}

func (c *testPDClient) currentSafePoint() uint64 {
Expand All @@ -251,7 +265,14 @@ func (c *testPDClient) currentSafePoint() uint64 {
}

func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) {
<<<<<<< HEAD:br/pkg/lightning/restore/checksum_test.go
physicalTS := time.Now().UnixNano() / 1e6
=======
physicalTS := time.Now().UnixMilli()
if c.leaderChanging && physicalTS%2 == 0 {
return 0, 0, errors.WithStack(errs.ErrClientTSOStreamClosed)
}
>>>>>>> 04f6570f1a7 (lightning: retry for leader change error when GetTS (#44478) (#44856) (#45330)):br/pkg/lightning/backend/local/checksum_test.go
logicalTS := oracle.ExtractLogical(c.logicalTSCounter.Inc())
return physicalTS, logicalTS, nil
}
Expand Down

0 comments on commit 19fc850

Please sign in to comment.