Skip to content

Commit

Permalink
advancer: fix the incorrect gc safepoint behaviours (#52835)
Browse files Browse the repository at this point in the history
ref #52082
  • Loading branch information
3pointer authored Apr 25, 2024
1 parent 54ba0ed commit 545b4a3
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 11 deletions.
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
}
if _, err := c.env.BlockGCUntil(ctx, 0); err != nil {
if err := c.env.UnblockGC(ctx); err != nil {
log.Warn("failed to remove service GC safepoint", logutil.ShortError(err))
}
metrics.LastCheckpoint.DeleteLabelValues(e.Name)
Expand Down
18 changes: 17 additions & 1 deletion br/pkg/streamhelper/advancer_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ package streamhelper

import (
"context"
"math"
"time"

"github.com/pingcap/errors"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/config"
Expand Down Expand Up @@ -46,7 +48,21 @@ type PDRegionScanner struct {
// BlockGCUntil updates the service GC safe point registered under
// `logBackupServiceID` to `at`, with a TTL of `logBackupSafePointTTL`.
//
// On success it returns `at`. It returns an error when the update request
// itself fails, or when the minimal service safe point reported back by the
// update is already greater than `at`.
//
// Passing `0` is not a removal request here; call UnblockGC to remove the
// service safe point.
func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) {
	minimalSafePoint, err := c.UpdateServiceGCSafePoint(
		ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at)
	if err != nil {
		return 0, errors.Annotate(err, "failed to block gc until")
	}
	// The reported minimum is ahead of what we asked for, so the request
	// could not take effect at `at`; surface that instead of pretending success.
	if minimalSafePoint > at {
		return 0, errors.Errorf("minimal safe point %d is greater than the target %d", minimalSafePoint, at)
	}
	return at, nil
}

// UnblockGC removes the service safe point registered under
// `logBackupServiceID`. Any error from the underlying update call is
// returned unchanged.
func (c PDRegionScanner) UnblockGC(ctx context.Context) error {
	// A TTL of 0 means "remove the safe point"; the safe point value passed
	// along with it is math.MaxUint64.
	if _, err := c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, 0, math.MaxUint64); err != nil {
		return err
	}
	return nil
}

// TODO: It should be able to synchronize the current TS with the PD.
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func TestGCServiceSafePoint(t *testing.T) {
req.Eventually(func() bool {
env.fakeCluster.mu.Lock()
defer env.fakeCluster.mu.Unlock()
return env.serviceGCSafePoint == 0
return env.serviceGCSafePoint != 0 && env.serviceGCSafePointDeleted
}, 3*time.Second, 100*time.Millisecond)
}

Expand Down
20 changes: 12 additions & 8 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ type fakeCluster struct {
regions []*region
testCtx *testing.T

onGetClient func(uint64) error
onClearCache func(uint64) error
serviceGCSafePoint uint64
currentTS uint64
onGetClient func(uint64) error
onClearCache func(uint64) error
serviceGCSafePoint uint64
serviceGCSafePointDeleted bool
currentTS uint64
}

func (r *region) splitAt(newID uint64, k string) *region {
Expand Down Expand Up @@ -264,17 +265,20 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge
// BlockGCUntil records `at` as the fake cluster's service GC safe point.
//
// The recorded safe point only moves forward: when the current value is
// already greater than `at`, it is left untouched and returned instead.
// A value of `0` gets no special treatment; removal of the safe point is
// signalled through UnblockGC.
func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.serviceGCSafePoint > at {
		// Never move backwards; report the value currently held.
		return f.serviceGCSafePoint, nil
	}
	f.serviceGCSafePoint = at
	return at, nil
}

// UnblockGC marks the fake cluster's service GC safe point as deleted by
// setting `serviceGCSafePointDeleted`; the stored safe point value itself is
// not modified. It always returns nil.
func (f *fakeCluster) UnblockGC(ctx context.Context) error {
	f.mu.Lock()
	f.serviceGCSafePointDeleted = true
	f.mu.Unlock()
	return nil
}

// FetchCurrentTS reports the `currentTS` value stored on the fake cluster.
// It never returns an error.
func (f *fakeCluster) FetchCurrentTS(ctx context.Context) (uint64, error) {
	ts := f.currentTS
	return ts, nil
}
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/streamhelper/regioniter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type TiKVClusterMeta interface {
// For now, all tasks (exactly one task in fact) use the same checkpoint.
BlockGCUntil(ctx context.Context, at uint64) (uint64, error)

// UnblockGC is used to remove the service GC safe point in PD.
UnblockGC(ctx context.Context) error

FetchCurrentTS(ctx context.Context) (uint64, error)
}

Expand Down
4 changes: 4 additions & 0 deletions br/pkg/streamhelper/regioniter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func (c constantRegions) BlockGCUntil(ctx context.Context, at uint64) (uint64, e
return 0, status.Error(codes.Unimplemented, "Unsupported operation")
}

// UnblockGC is not supported by constantRegions: it always returns an
// Unimplemented status error.
func (c constantRegions) UnblockGC(ctx context.Context) error {
	unsupported := status.Error(codes.Unimplemented, "Unsupported operation")
	return unsupported
}

// TODO: It should be able to synchronize the current TS with the PD.
func (c constantRegions) FetchCurrentTS(ctx context.Context) (uint64, error) {
return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil
Expand Down

0 comments on commit 545b4a3

Please sign in to comment.