From 24cea14ded2a814dfbae8cca9cb1f9a7108a3f8e Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 23 Apr 2024 14:25:22 +0800 Subject: [PATCH 1/6] advancer: fix the incorrect gc safepoint behaviours --- br/pkg/streamhelper/advancer_env.go | 15 ++++++++++++++- br/pkg/streamhelper/basic_lib_for_test.go | 4 ++++ br/pkg/streamhelper/regioniter.go | 3 +++ br/pkg/streamhelper/regioniter_test.go | 4 ++++ 4 files changed, 25 insertions(+), 1 deletion(-) diff --git a/br/pkg/streamhelper/advancer_env.go b/br/pkg/streamhelper/advancer_env.go index 7092edb3686ea..045a72e912ff5 100644 --- a/br/pkg/streamhelper/advancer_env.go +++ b/br/pkg/streamhelper/advancer_env.go @@ -6,6 +6,7 @@ import ( "context" "time" + "github.com/pingcap/errors" logbackup "github.com/pingcap/kvproto/pkg/logbackuppb" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/pkg/config" @@ -46,7 +47,19 @@ type PDRegionScanner struct { // Returns the minimal service GC safe point across all services. // If the arguments is `0`, this would remove the service safe point. func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) { - return c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at) + minimalSafePoint, err := c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at) + if err != nil { + return 0, errors.Annotate(err, "failed to block gc until") + } + if minimalSafePoint > at { + return 0, errors.Errorf("failed to block gc until, minimal safe point %d is greater than the target %d", minimalSafePoint, at) + } + return at, nil +} + +func (c PDRegionScanner) RemoveGCSafepoint(ctx context.Context) error { + _, err := c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, 0, 0) + return err } // TODO: It should be able to synchoronize the current TS with the PD. 
diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 4d26ebdb14b62..55cfcb8ea7061 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -275,6 +275,10 @@ func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, erro return at, nil } +func (f *fakeCluster) RemoveGCSafepoint(ctx context.Context) error { + return nil +} + func (f *fakeCluster) FetchCurrentTS(ctx context.Context) (uint64, error) { return f.currentTS, nil } diff --git a/br/pkg/streamhelper/regioniter.go b/br/pkg/streamhelper/regioniter.go index 80e75f1895b50..b91025136568d 100644 --- a/br/pkg/streamhelper/regioniter.go +++ b/br/pkg/streamhelper/regioniter.go @@ -43,6 +43,9 @@ type TiKVClusterMeta interface { // For now, all tasks (exactly one task in fact) use the same checkpoint. BlockGCUntil(ctx context.Context, at uint64) (uint64, error) + // RemoveGCSafepoint used to remove the service GC safe point in PD. + RemoveGCSafepoint(ctx context.Context) error + FetchCurrentTS(ctx context.Context) (uint64, error) } diff --git a/br/pkg/streamhelper/regioniter_test.go b/br/pkg/streamhelper/regioniter_test.go index 967b24b71f2a4..81fbad7d11f98 100644 --- a/br/pkg/streamhelper/regioniter_test.go +++ b/br/pkg/streamhelper/regioniter_test.go @@ -83,6 +83,10 @@ func (c constantRegions) BlockGCUntil(ctx context.Context, at uint64) (uint64, e return 0, status.Error(codes.Unimplemented, "Unsupported operation") } +func (c constantRegions) RemoveGCSafepoint(ctx context.Context) error { + return nil +} + // TODO: It should be able to synchoronize the current TS with the PD. 
func (c constantRegions) FetchCurrentTS(ctx context.Context) (uint64, error) { return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil From df1ac7298a6bdc21df06b6f41d340b2eff22f7b3 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 23 Apr 2024 14:27:20 +0800 Subject: [PATCH 2/6] update --- br/pkg/streamhelper/advancer_env.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/br/pkg/streamhelper/advancer_env.go b/br/pkg/streamhelper/advancer_env.go index 045a72e912ff5..2d2816e3f9d1e 100644 --- a/br/pkg/streamhelper/advancer_env.go +++ b/br/pkg/streamhelper/advancer_env.go @@ -4,6 +4,7 @@ package streamhelper import ( "context" + "math" "time" "github.com/pingcap/errors" @@ -58,7 +59,7 @@ func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, e } func (c PDRegionScanner) RemoveGCSafepoint(ctx context.Context) error { - _, err := c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, 0, 0) + _, err := c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, 0, math.MaxUint64) return err } From f4eefd863faf337514d1e7a28cdcec11abad3565 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 23 Apr 2024 16:44:07 +0800 Subject: [PATCH 3/6] add test --- br/pkg/streamhelper/advancer.go | 2 +- br/pkg/streamhelper/advancer_env.go | 1 + br/pkg/streamhelper/advancer_test.go | 23 ++++++++++++++++++++++- br/pkg/streamhelper/basic_lib_for_test.go | 16 ++++++++-------- br/pkg/streamhelper/regioniter_test.go | 2 +- 5 files changed, 33 insertions(+), 11 deletions(-) diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index aaa4231010dc9..7aa459444ea3f 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -444,7 +444,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil { log.Warn("failed to clear global checkpoint", logutil.ShortError(err)) } - if _, err := c.env.BlockGCUntil(ctx, 0); err != nil 
{ + if err := c.env.RemoveGCSafepoint(ctx); err != nil { log.Warn("failed to remove service GC safepoint", logutil.ShortError(err)) } metrics.LastCheckpoint.DeleteLabelValues(e.Name) diff --git a/br/pkg/streamhelper/advancer_env.go b/br/pkg/streamhelper/advancer_env.go index 2d2816e3f9d1e..2144e1d497039 100644 --- a/br/pkg/streamhelper/advancer_env.go +++ b/br/pkg/streamhelper/advancer_env.go @@ -59,6 +59,7 @@ func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, e } func (c PDRegionScanner) RemoveGCSafepoint(ctx context.Context) error { + // set ttl to 0, means remove the safe point. _, err := c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, 0, math.MaxUint64) return err } diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index ef617a736d132..ea7e5736ec702 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -48,6 +48,27 @@ func TestBasic(t *testing.T) { require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint) } +func TestRemoveSafepoint(t *testing.T) { + c := createFakeCluster(t, 4, false) + defer func() { + if t.Failed() { + fmt.Println(c) + } + }() + c.splitAndScatter("01", "02", "022", "023", "033", "04", "043") + ctx := context.Background() + minCheckpoint := c.advanceCheckpoints() + env := &testEnv{fakeCluster: c, testCtx: t} + adv := streamhelper.NewCheckpointAdvancer(env) + coll := streamhelper.NewClusterCollector(ctx, env) + err := adv.GetCheckpointInRange(ctx, []byte{}, []byte{}, coll) + require.NoError(t, err) + r, err := coll.Finish(ctx) + require.NoError(t, err) + require.Len(t, r.FailureSubRanges, 0) + require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint) +} + func TestTick(t *testing.T) { c := createFakeCluster(t, 4, false) defer func() { @@ -208,7 +229,7 @@ func TestGCServiceSafePoint(t *testing.T) { req.Eventually(func() bool { env.fakeCluster.mu.Lock() defer 
env.fakeCluster.mu.Unlock() - return env.serviceGCSafePoint == 0 + return env.serviceGCSafePoint != 0 && env.serviceGCSafePointDeleted }, 3*time.Second, 100*time.Millisecond) } diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 55cfcb8ea7061..b33b163468749 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -100,10 +100,11 @@ type fakeCluster struct { regions []*region testCtx *testing.T - onGetClient func(uint64) error - onClearCache func(uint64) error - serviceGCSafePoint uint64 - currentTS uint64 + onGetClient func(uint64) error + onClearCache func(uint64) error + serviceGCSafePoint uint64 + serviceGCSafePointDeleted bool + currentTS uint64 } func (r *region) splitAt(newID uint64, k string) *region { @@ -264,10 +265,6 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) { f.mu.Lock() defer f.mu.Unlock() - if at == 0 { - f.serviceGCSafePoint = at - return at, nil - } if f.serviceGCSafePoint > at { return f.serviceGCSafePoint, nil } @@ -276,6 +273,9 @@ func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, erro } func (f *fakeCluster) RemoveGCSafepoint(ctx context.Context) error { + f.mu.Lock() + defer f.mu.Unlock() + f.serviceGCSafePointDeleted = true return nil } diff --git a/br/pkg/streamhelper/regioniter_test.go b/br/pkg/streamhelper/regioniter_test.go index 81fbad7d11f98..b60f2bd03480b 100644 --- a/br/pkg/streamhelper/regioniter_test.go +++ b/br/pkg/streamhelper/regioniter_test.go @@ -84,7 +84,7 @@ func (c constantRegions) BlockGCUntil(ctx context.Context, at uint64) (uint64, e } func (c constantRegions) RemoveGCSafepoint(ctx context.Context) error { - return nil + return status.Error(codes.Unimplemented, "Unsupported operation") } // TODO: It should be able to synchoronize the current TS with the PD. 
From 13c7d55666593779b3a726028c2dc59b50e59d8d Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 23 Apr 2024 16:54:44 +0800 Subject: [PATCH 4/6] update --- br/pkg/streamhelper/advancer_test.go | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index ea7e5736ec702..f8889c4f4e8a2 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -48,27 +48,6 @@ func TestBasic(t *testing.T) { require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint) } -func TestRemoveSafepoint(t *testing.T) { - c := createFakeCluster(t, 4, false) - defer func() { - if t.Failed() { - fmt.Println(c) - } - }() - c.splitAndScatter("01", "02", "022", "023", "033", "04", "043") - ctx := context.Background() - minCheckpoint := c.advanceCheckpoints() - env := &testEnv{fakeCluster: c, testCtx: t} - adv := streamhelper.NewCheckpointAdvancer(env) - coll := streamhelper.NewClusterCollector(ctx, env) - err := adv.GetCheckpointInRange(ctx, []byte{}, []byte{}, coll) - require.NoError(t, err) - r, err := coll.Finish(ctx) - require.NoError(t, err) - require.Len(t, r.FailureSubRanges, 0) - require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint) -} - func TestTick(t *testing.T) { c := createFakeCluster(t, 4, false) defer func() { From e5fce65ca63444cffd5759289408c1236a2f9e0b Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 24 Apr 2024 10:22:52 +0800 Subject: [PATCH 5/6] update --- br/pkg/streamhelper/advancer.go | 2 +- br/pkg/streamhelper/advancer_env.go | 2 +- br/pkg/streamhelper/basic_lib_for_test.go | 2 +- br/pkg/streamhelper/regioniter.go | 4 ++-- br/pkg/streamhelper/regioniter_test.go | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index 7aa459444ea3f..0ed0c79b70d01 100644 --- a/br/pkg/streamhelper/advancer.go +++ 
b/br/pkg/streamhelper/advancer.go @@ -444,7 +444,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil { log.Warn("failed to clear global checkpoint", logutil.ShortError(err)) } - if err := c.env.RemoveGCSafepoint(ctx); err != nil { + if err := c.env.UnblockGC(ctx); err != nil { log.Warn("failed to remove service GC safepoint", logutil.ShortError(err)) } metrics.LastCheckpoint.DeleteLabelValues(e.Name) diff --git a/br/pkg/streamhelper/advancer_env.go b/br/pkg/streamhelper/advancer_env.go index 2144e1d497039..6b86dae1c3fa8 100644 --- a/br/pkg/streamhelper/advancer_env.go +++ b/br/pkg/streamhelper/advancer_env.go @@ -58,7 +58,7 @@ func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, e return at, nil } -func (c PDRegionScanner) RemoveGCSafepoint(ctx context.Context) error { +func (c PDRegionScanner) UnblockGC(ctx context.Context) error { // set ttl to 0, means remove the safe point. _, err := c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, 0, math.MaxUint64) return err diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index b33b163468749..82cdb709f11ca 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -272,7 +272,7 @@ func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, erro return at, nil } -func (f *fakeCluster) RemoveGCSafepoint(ctx context.Context) error { +func (f *fakeCluster) UnblockGC(ctx context.Context) error { f.mu.Lock() defer f.mu.Unlock() f.serviceGCSafePointDeleted = true diff --git a/br/pkg/streamhelper/regioniter.go b/br/pkg/streamhelper/regioniter.go index b91025136568d..b6f77b45d574f 100644 --- a/br/pkg/streamhelper/regioniter.go +++ b/br/pkg/streamhelper/regioniter.go @@ -43,8 +43,8 @@ type TiKVClusterMeta interface { // For now, all tasks (exactly one task in fact) use the same checkpoint. 
BlockGCUntil(ctx context.Context, at uint64) (uint64, error) - // RemoveGCSafepoint used to remove the service GC safe point in PD. - RemoveGCSafepoint(ctx context.Context) error + // UnblockGC removes the service GC safe point in PD. + UnblockGC(ctx context.Context) error FetchCurrentTS(ctx context.Context) (uint64, error) } diff --git a/br/pkg/streamhelper/regioniter_test.go b/br/pkg/streamhelper/regioniter_test.go index b60f2bd03480b..367f3ff35884c 100644 --- a/br/pkg/streamhelper/regioniter_test.go +++ b/br/pkg/streamhelper/regioniter_test.go @@ -83,7 +83,7 @@ func (c constantRegions) BlockGCUntil(ctx context.Context, at uint64) (uint64, e return 0, status.Error(codes.Unimplemented, "Unsupported operation") } -func (c constantRegions) RemoveGCSafepoint(ctx context.Context) error { +func (c constantRegions) UnblockGC(ctx context.Context) error { return status.Error(codes.Unimplemented, "Unsupported operation") } From dc5cc2244c9466f9c4fa096bc28bf1b8681f78bc Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 24 Apr 2024 10:56:05 +0800 Subject: [PATCH 6/6] fix build --- br/pkg/streamhelper/advancer_env.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/br/pkg/streamhelper/advancer_env.go b/br/pkg/streamhelper/advancer_env.go index 6b86dae1c3fa8..d1f83d24bff93 100644 --- a/br/pkg/streamhelper/advancer_env.go +++ b/br/pkg/streamhelper/advancer_env.go @@ -48,12 +48,13 @@ type PDRegionScanner struct { // Returns the minimal service GC safe point across all services. // If the arguments is `0`, this would remove the service safe point. 
func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) { - minimalSafePoint, err := c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at) + minimalSafePoint, err := c.UpdateServiceGCSafePoint( + ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at) if err != nil { return 0, errors.Annotate(err, "failed to block gc until") } if minimalSafePoint > at { - return 0, errors.Errorf("failed to block gc until, minimal safe point %d is greater than the target %d", minimalSafePoint, at) + return 0, errors.Errorf("minimal safe point %d is greater than the target %d", minimalSafePoint, at) } return at, nil }