Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log-backup: implement subscription of flush #39445

Merged
merged 49 commits into from
Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
77211fe
use span (tree?) to replace checkpoint heap
YuJuncen Nov 2, 2022
f6b3624
adapt ranges
YuJuncen Nov 11, 2022
3db857c
remove unrelatived code
YuJuncen Nov 11, 2022
0b00cc8
make CI happy
YuJuncen Nov 14, 2022
b3008d2
fixed a bug over subranges
YuJuncen Nov 14, 2022
e592b7e
try make linter happy
YuJuncen Nov 14, 2022
bf27ca8
Merge branch 'master' into span-tree
YuJuncen Nov 15, 2022
a7772ad
updated bazel
YuJuncen Nov 15, 2022
7613d04
Merge branch 'span-tree' of https://github.com/YuJuncen/tidb into spa…
YuJuncen Nov 15, 2022
13eee91
adapt subscribing
YuJuncen Nov 3, 2022
acb5b83
refactoring subscription
YuJuncen Nov 17, 2022
3786634
fix encode
YuJuncen Nov 17, 2022
0b0a322
make code more clear
YuJuncen Nov 17, 2022
0f7a93e
added test for subscriber
YuJuncen Nov 24, 2022
21198ed
fix some memory leakage
YuJuncen Nov 24, 2022
6578dc3
fixed panic
YuJuncen Nov 25, 2022
f718a02
fixed MergeAll
YuJuncen Nov 25, 2022
585d2e1
fix bugs
YuJuncen Nov 25, 2022
8764c03
Merge branch 'span-tree' of https://github.com/YuJuncen/tidb into sub…
YuJuncen Nov 25, 2022
fd4918c
address comments
YuJuncen Nov 28, 2022
102941e
check length before checking
YuJuncen Nov 28, 2022
7ba6318
Merge branch 'master' into span-tree
YuJuncen Nov 29, 2022
3e619d7
Merge branch 'master' into span-tree
ti-chi-bot Nov 29, 2022
79cfe5e
Merge branch 'master' into span-tree
ti-chi-bot Nov 29, 2022
0aec2af
Merge branch 'master' into span-tree
ti-chi-bot Nov 29, 2022
64c307b
Merge branch 'master' into span-tree
ti-chi-bot Nov 29, 2022
92ccbcb
Merge branch 'span-tree' of https://github.com/YuJuncen/tidb into sub…
YuJuncen Nov 29, 2022
7351c04
Merge branch 'master' of https://github.com/pingcap/tidb into subscri…
YuJuncen Nov 29, 2022
38ffb76
added some comments
YuJuncen Nov 29, 2022
c4c4772
fix ci
YuJuncen Nov 29, 2022
ce4e567
Merge branch 'master' of https://github.com/pingcap/tidb into subscri…
YuJuncen Nov 30, 2022
f0ddf2f
run bazel
YuJuncen Nov 30, 2022
e48f8ec
address comments
YuJuncen Nov 30, 2022
998aeaa
Merge branch 'master' of https://github.com/pingcap/tidb into subscri…
YuJuncen Dec 1, 2022
5145361
address comments
YuJuncen Dec 1, 2022
27bf2e1
make bazel happy
YuJuncen Dec 1, 2022
2371944
Merge branch 'master' into subscription-flush
YuJuncen Dec 1, 2022
786a95d
make better test case
YuJuncen Dec 1, 2022
dff87af
Merge branch 'subscription-flush' of https://github.com/YuJuncen/tidb…
YuJuncen Dec 1, 2022
3b16943
bazel
YuJuncen Dec 1, 2022
4e528fc
fix test case
YuJuncen Dec 1, 2022
2ad7f64
Merge branch 'master' into subscription-flush
joccau Dec 1, 2022
09acde5
Merge branch 'master' into subscription-flush
ti-chi-bot Dec 1, 2022
bb46a46
Merge branch 'master' into subscription-flush
ti-chi-bot Dec 1, 2022
764f16c
Merge branch 'master' into subscription-flush
ti-chi-bot Dec 1, 2022
449751e
Merge branch 'master' into subscription-flush
ti-chi-bot Dec 1, 2022
bacf211
Merge branch 'master' into subscription-flush
ti-chi-bot Dec 1, 2022
5b5a7c2
Merge branch 'master' into subscription-flush
ti-chi-bot Dec 1, 2022
34784a5
Merge branch 'master' into subscription-flush
ti-chi-bot Dec 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"advancer_env.go",
"client.go",
"collector.go",
"flush_subscriber.go",
"models.go",
"prefix_scanner.go",
"regioniter.go",
Expand All @@ -26,6 +27,8 @@ go_library(
"//kv",
"//metrics",
"//owner",
"//util/codec",
"//util/engine",
"//util/mathutil",
"@com_github_gogo_protobuf//proto",
"@com_github_golang_protobuf//proto",
Expand All @@ -40,8 +43,11 @@ go_library(
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//status",
"@org_golang_x_sync//errgroup",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -54,6 +60,7 @@ go_test(
"basic_lib_for_test.go",
"integration_test.go",
"regioniter_test.go",
"subscription_test.go",
],
flaky = True,
race = "on",
Expand All @@ -64,11 +71,11 @@ go_test(
"//br/pkg/logutil",
"//br/pkg/redact",
"//br/pkg/storage",
"//br/pkg/streamhelper/config",
"//br/pkg/streamhelper/spans",
"//br/pkg/utils",
"//kv",
"//tablecodec",
"//util/codec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
Expand All @@ -82,6 +89,7 @@ go_test(
"@io_etcd_go_etcd_server_v3//mvcc",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//status",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
Expand Down
75 changes: 68 additions & 7 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type CheckpointAdvancer struct {

checkpoints *spans.ValueSortedFull
checkpointsMu sync.Mutex

subscriber *FlushSubscriber
subscriberMu sync.Mutex
}

// NewCheckpointAdvancer creates a checkpoint advancer with the env.
Expand Down Expand Up @@ -105,7 +108,7 @@ func (c *CheckpointAdvancer) GetCheckpointInRange(ctx context.Context, start, en
}
log.Debug("scan region", zap.Int("len", len(rs)))
for _, r := range rs {
err := collector.collectRegion(r)
err := collector.CollectRegion(r)
if err != nil {
log.Warn("meet error during getting checkpoint", logutil.ShortError(err))
return err
Expand Down Expand Up @@ -135,7 +138,7 @@ func (c *CheckpointAdvancer) tryAdvance(ctx context.Context, length int, getRang
workers := utils.NewWorkerPool(uint(config.DefaultMaxConcurrencyAdvance)*4, "sub ranges")
eg, cx := errgroup.WithContext(ctx)
collector := NewClusterCollector(ctx, c.env)
collector.setOnSuccessHook(func(u uint64, kr kv.KeyRange) {
collector.SetOnSuccessHook(func(u uint64, kr kv.KeyRange) {
c.checkpointsMu.Lock()
defer c.checkpointsMu.Unlock()
c.checkpoints.Merge(spans.Valued{Key: kr, Value: u})
Expand Down Expand Up @@ -166,15 +169,16 @@ func tsoBefore(n time.Duration) uint64 {
return oracle.ComposeTS(now.UnixMilli()-n.Milliseconds(), 0)
}

// CalculateGlobalCheckpointLight tries to advance the global checkpoint by the cache.
func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context) (uint64, error) {
func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context, threshold time.Duration) (uint64, error) {
var targets []spans.Valued
c.checkpoints.TraverseValuesLessThan(tsoBefore(config.DefaultTryAdvanceThreshold), func(v spans.Valued) bool {
c.checkpoints.TraverseValuesLessThan(tsoBefore(threshold), func(v spans.Valued) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether use tso - threshold rather than time.now() - threshold ?
If the system time is different from tso from PD.

Copy link
Contributor Author

@YuJuncen YuJuncen Nov 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot fetch TSO from this context. Fetching TSO needs to establish a long connection from PD server and this ability isn't directly exported by TiDB.

targets = append(targets, v)
return true
})
if len(targets) == 0 {
return 0, nil
c.checkpointsMu.Lock()
defer c.checkpointsMu.Unlock()
return c.checkpoints.MinValue(), nil
}
samples := targets
if len(targets) > 3 {
Expand All @@ -188,7 +192,9 @@ func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context)
if err != nil {
return 0, err
}
c.checkpointsMu.Lock()
ts := c.checkpoints.MinValue()
c.checkpointsMu.Unlock()
return ts, nil
}

Expand Down Expand Up @@ -285,6 +291,8 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
c.task = nil
c.taskRange = nil
c.checkpoints = nil
// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
c.subscriber.Clear()
if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
}
Expand Down Expand Up @@ -323,14 +331,67 @@ func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, getCheckpo
return nil
}

func (c *CheckpointAdvancer) stopSubscriber() {
c.subscriberMu.Lock()
defer c.subscriberMu.Unlock()
c.subscriber.Drop()
c.subscriber = nil
}

func (c *CheckpointAdvancer) spawnSubscriptionHandler(ctx context.Context) {
c.subscriberMu.Lock()
defer c.subscriberMu.Unlock()
c.subscriber = NewSubscriber(c.env, c.env, WithMasterContext(ctx))
es := c.subscriber.Events()

go func() {
for {
select {
case <-ctx.Done():
return
case event, ok := <-es:
if !ok {
return
}
c.checkpointsMu.Lock()
log.Debug("Accepting region flush event.",
zap.Stringer("range", logutil.StringifyRange(event.Key)),
zap.Uint64("checkpoint", event.Value))
c.checkpoints.Merge(event)
c.checkpointsMu.Unlock()
}
}
}()
}

func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {
if c.subscriber == nil {
return nil
}
if err := c.subscriber.UpdateStoreTopology(ctx); err != nil {
log.Warn("[log backup advancer] Error when updating store topology.", logutil.ShortError(err))
}
c.subscriber.HandleErrors(ctx)
return c.subscriber.PendingErrors()
}

func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
if c.task == nil {
log.Debug("No tasks yet, skipping advancing.")
return nil
}
err := c.advanceCheckpointBy(ctx, c.CalculateGlobalCheckpointLight)

threshold := c.Config().GetDefaultStartPollThreshold()
if err := c.subscribeTick(ctx); err != nil {
log.Warn("[log backup advancer] Subscriber meet error, would polling the checkpoint.", logutil.ShortError(err))
threshold = c.Config().GetSubscriberErrorStartPollThreshold()
}

err := c.advanceCheckpointBy(ctx, func(ctx context.Context) (uint64, error) {
return c.CalculateGlobalCheckpointLight(ctx, threshold)
})
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/streamhelper/advancer_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (c *CheckpointAdvancer) OnTick(ctx context.Context) (err error) {
func (c *CheckpointAdvancer) OnStart(ctx context.Context) {
metrics.AdvancerOwner.Set(1.0)
c.StartTaskListener(ctx)
c.spawnSubscriptionHandler(ctx)
go func() {
<-ctx.Done()
c.onStop()
Expand All @@ -43,6 +44,7 @@ func (c *CheckpointAdvancer) Name() string {

func (c *CheckpointAdvancer) onStop() {
metrics.AdvancerOwner.Set(0.0)
c.stopSubscriber()
}

func OwnerManagerForLogBackup(ctx context.Context, etcdCli *clientv3.Client) owner.Manager {
Expand Down
20 changes: 19 additions & 1 deletion br/pkg/streamhelper/advancer_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/util/engine"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
Expand All @@ -18,7 +19,7 @@ import (
// Env is the interface required by the advancer.
type Env interface {
// The region scanner provides the region information.
RegionScanner
TiKVClusterMeta
// LogBackupService connects to the TiKV, so we can collect the region checkpoints.
LogBackupService
// StreamMeta connects to the metadata service (normally PD).
Expand Down Expand Up @@ -48,6 +49,23 @@ func (c PDRegionScanner) RegionScan(ctx context.Context, key []byte, endKey []by
return rls, nil
}

func (c PDRegionScanner) Stores(ctx context.Context) ([]Store, error) {
joccau marked this conversation as resolved.
Show resolved Hide resolved
res, err := c.Client.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return nil, err
}
r := make([]Store, 0, len(res))
for _, re := range res {
if !engine.IsTiFlash(re) {
r = append(r, Store{
BootAt: uint64(re.StartTimestamp),
ID: re.GetId(),
})
}
}
return r, nil
}

// clusterEnv is the environment for running in the real cluster.
type clusterEnv struct {
clis *utils.StoreManager
Expand Down
7 changes: 0 additions & 7 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -51,9 +50,6 @@ func TestTick(t *testing.T) {
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.StartTaskListener(ctx)
adv.UpdateConfigWith(func(cac *config.Config) {
cac.FullScanTick = 0
})
require.NoError(t, adv.OnTick(ctx))
for i := 0; i < 5; i++ {
cp := c.advanceCheckpoints()
Expand All @@ -76,9 +72,6 @@ func TestWithFailure(t *testing.T) {
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.StartTaskListener(ctx)
adv.UpdateConfigWith(func(cac *config.Config) {
cac.FullScanTick = 0
})
require.NoError(t, adv.OnTick(ctx))

cp := c.advanceCheckpoints()
Expand Down
Loading