From 135e67c03ef4b1ba8f898e98024f18acabc12bf0 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Wed, 28 Sep 2022 14:26:34 +0800 Subject: [PATCH] update kvproto and client-go --- ddl/cluster.go | 95 +++++++++++++++++++++++++++++++++++++++++++++++++- go.mod | 4 +-- go.sum | 8 ++--- 3 files changed, 100 insertions(+), 7 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 940ff7b092922..fb9f11ff7d8f2 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -282,6 +282,93 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, startKey kv.Key) ([]kv.KeyRa return keyRanges, nil } +func sendPrepareFlashbackToVersionRPC( + ctx context.Context, + s tikv.Storage, + r tikvstore.KeyRange, +) (rangetask.TaskStat, error) { + startKey, rangeEndKey := r.StartKey, r.EndKey + var taskStat rangetask.TaskStat + bo := tikv.NewBackoffer(ctx, flashbackMaxBackoff) + for { + select { + case <-ctx.Done(): + return taskStat, errors.WithStack(ctx.Err()) + default: + } + + if len(rangeEndKey) > 0 && bytes.Compare(startKey, rangeEndKey) >= 0 { + break + } + + loc, err := s.GetRegionCache().LocateKey(bo, startKey) + if err != nil { + return taskStat, err + } + + endKey := loc.EndKey + isLast := len(endKey) == 0 || (len(rangeEndKey) > 0 && bytes.Compare(endKey, rangeEndKey) >= 0) + // If it is the last region + if isLast { + endKey = rangeEndKey + } + + req := tikvrpc.NewRequest(tikvrpc.CmdPrepareFlashbackToVersion, &kvrpcpb.PrepareFlashbackToVersionRequest{ + StartKey: startKey, + EndKey: endKey, + }) + + resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout) + if err != nil { + return taskStat, err + } + regionErr, err := resp.GetRegionError() + if err != nil { + return taskStat, err + } + if regionErr != nil { + err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())) + if err != nil { + return taskStat, err + } + continue + } + if resp.Resp == nil { + return taskStat, errors.Errorf("prepare flashback missing resp body") + } + prepareFlashbackToVersionResp := resp.Resp.(*kvrpcpb.PrepareFlashbackToVersionResponse) + if err := prepareFlashbackToVersionResp.GetError(); err != "" { + return taskStat, errors.Errorf(err) + } + taskStat.CompletedRegions++ + if isLast { + break + } + bo = tikv.NewBackoffer(ctx, flashbackMaxBackoff) + startKey = endKey + } + return taskStat, nil +} + +func prepareFlashbackToVersion( + ctx context.Context, + d *ddlCtx, + startKey []byte, endKey []byte, +) (err error) { + return rangetask.NewRangeTaskRunner( + "flashback-to-version-runner", + d.store.(tikv.Storage), + int(variable.GetDDLFlashbackConcurrency()), + func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { + stats, err := sendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), r) + logutil.BgLogger().Info("prepare flashback to version stats", + zap.Int("complete region", stats.CompletedRegions), + zap.Error(err)) + return stats, err + }, + ).RunOnRange(ctx, startKey, endKey) +} + func sendFlashbackToVersionRPC( ctx context.Context, s tikv.Storage, @@ -466,10 +553,16 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, nil // Stage 3, get key ranges and get locks. case model.StateWriteOnly: - _, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) + keyRanges, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) if err != nil { return ver, errors.Trace(err) } + for _, ranges := range keyRanges { + if err = prepareFlashbackToVersion(d.ctx, d, ranges.StartKey, ranges.EndKey); err != nil { + logutil.BgLogger().Warn("[ddl] Get error when do flashback", zap.Error(err)) + return ver, err + } + } // TODO, lock all regions. job.SchemaState = model.StateWriteReorganization return updateSchemaVersion(d, t, job) diff --git a/go.mod b/go.mod index fea930c747662..f79a093db42fd 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20220913050750-f6d05706948a + github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c github.com/pingcap/log v1.1.0 github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e @@ -84,7 +84,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.1-0.20220923061703-33efe476e022 + github.com/tikv/client-go/v2 v2.0.1-0.20220929111318-51f3bd3944c2 github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 diff --git a/go.sum b/go.sum index 04713ecdd16ee..a31472ac5c3d9 100644 --- a/go.sum +++ b/go.sum @@ -752,8 +752,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= -github.com/pingcap/kvproto v0.0.0-20220913050750-f6d05706948a h1:LCtkOPEzjWk86NclzxdZ42nNtbhuIN1p6cpd/FYUqkU= -github.com/pingcap/kvproto v0.0.0-20220913050750-f6d05706948a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c h1:ceg4xjEEXNgPsScTQ5dtidiltLF4h17Y/jUqfyLAy9E= +github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= @@ -901,8 +901,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= -github.com/tikv/client-go/v2 v2.0.1-0.20220923061703-33efe476e022 h1:TxDSQAmtGdE34BvOaYF35mRrAXePeZEq8quvuAwrKsI= -github.com/tikv/client-go/v2 v2.0.1-0.20220923061703-33efe476e022/go.mod h1:6pedLz7wiINLHXwCT1+yMZmzuG42+ubtBkkfcwoukIo= +github.com/tikv/client-go/v2 v2.0.1-0.20220929111318-51f3bd3944c2 h1:A05tkatkgjqcTq94YteKkXFNPIELaGwAwjEzXsx9t6g= +github.com/tikv/client-go/v2 v2.0.1-0.20220929111318-51f3bd3944c2/go.mod h1:gdXot2ofS2EOGtrXQ2qyESonQX/gFmgtfBCqCOSWg9E= github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db h1:r1eMh9Rny3hfWuBuxOnbsCRrR4FhthiNxLQ5rAUtaww= github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db/go.mod h1:ew8kS0yIcEaSetuuywkTLIUBR+sz3J5XvAYRae11qwc= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=