Skip to content

Commit

Permalink
update kvproto and client-go
Browse files Browse the repository at this point in the history
  • Loading branch information
Defined2014 committed Oct 8, 2022
1 parent 20892f0 commit 135e67c
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 7 deletions.
95 changes: 94 additions & 1 deletion ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,93 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, startKey kv.Key) ([]kv.KeyRa
return keyRanges, nil
}

func sendPrepareFlashbackToVersionRPC(
ctx context.Context,
s tikv.Storage,
r tikvstore.KeyRange,
) (rangetask.TaskStat, error) {
startKey, rangeEndKey := r.StartKey, r.EndKey
var taskStat rangetask.TaskStat
bo := tikv.NewBackoffer(ctx, flashbackMaxBackoff)
for {
select {
case <-ctx.Done():
return taskStat, errors.WithStack(ctx.Err())
default:
}

if len(rangeEndKey) > 0 && bytes.Compare(startKey, rangeEndKey) >= 0 {
break
}

loc, err := s.GetRegionCache().LocateKey(bo, startKey)
if err != nil {
return taskStat, err
}

endKey := loc.EndKey
isLast := len(endKey) == 0 || (len(rangeEndKey) > 0 && bytes.Compare(endKey, rangeEndKey) >= 0)
// If it is the last region
if isLast {
endKey = rangeEndKey
}

req := tikvrpc.NewRequest(tikvrpc.CmdPrepareFlashbackToVersion, &kvrpcpb.PrepareFlashbackToVersionRequest{
StartKey: startKey,
EndKey: endKey,
})

resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout)
if err != nil {
return taskStat, err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return taskStat, err
}
if regionErr != nil {
err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String()))
if err != nil {
return taskStat, err
}
continue
}
if resp.Resp == nil {
return taskStat, errors.Errorf("prepare flashback missing resp body")
}
prepareFlashbackToVersionResp := resp.Resp.(*kvrpcpb.PrepareFlashbackToVersionResponse)
if err := prepareFlashbackToVersionResp.GetError(); err != "" {
return taskStat, errors.Errorf(err)
}
taskStat.CompletedRegions++
if isLast {
break
}
bo = tikv.NewBackoffer(ctx, flashbackMaxBackoff)
startKey = endKey
}
return taskStat, nil
}

func prepareFlashbackToVersion(
ctx context.Context,
d *ddlCtx,
startKey []byte, endKey []byte,
) (err error) {
return rangetask.NewRangeTaskRunner(
"flashback-to-version-runner",
d.store.(tikv.Storage),
int(variable.GetDDLFlashbackConcurrency()),
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := sendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), r)
logutil.BgLogger().Info("prepare flashback to version stats",
zap.Int("complete region", stats.CompletedRegions),
zap.Error(err))
return stats, err
},
).RunOnRange(ctx, startKey, endKey)
}

func sendFlashbackToVersionRPC(
ctx context.Context,
s tikv.Storage,
Expand Down Expand Up @@ -466,10 +553,16 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return ver, nil
// Stage 3, get key ranges and get locks.
case model.StateWriteOnly:
_, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0))
keyRanges, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0))
if err != nil {
return ver, errors.Trace(err)
}
for _, ranges := range keyRanges {
if err = prepareFlashbackToVersion(d.ctx, d, ranges.StartKey, ranges.EndKey); err != nil {
logutil.BgLogger().Warn("[ddl] Get error when do flashback", zap.Error(err))
return ver, err
}
}
// TODO, lock all regions.
job.SchemaState = model.StateWriteReorganization
return updateSchemaVersion(d, t, job)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20220913050750-f6d05706948a
github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c
github.com/pingcap/log v1.1.0
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
Expand All @@ -84,7 +84,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.1-0.20220923061703-33efe476e022
github.com/tikv/client-go/v2 v2.0.1-0.20220929111318-51f3bd3944c2
github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -752,8 +752,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220913050750-f6d05706948a h1:LCtkOPEzjWk86NclzxdZ42nNtbhuIN1p6cpd/FYUqkU=
github.com/pingcap/kvproto v0.0.0-20220913050750-f6d05706948a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c h1:ceg4xjEEXNgPsScTQ5dtidiltLF4h17Y/jUqfyLAy9E=
github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down Expand Up @@ -901,8 +901,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.1-0.20220923061703-33efe476e022 h1:TxDSQAmtGdE34BvOaYF35mRrAXePeZEq8quvuAwrKsI=
github.com/tikv/client-go/v2 v2.0.1-0.20220923061703-33efe476e022/go.mod h1:6pedLz7wiINLHXwCT1+yMZmzuG42+ubtBkkfcwoukIo=
github.com/tikv/client-go/v2 v2.0.1-0.20220929111318-51f3bd3944c2 h1:A05tkatkgjqcTq94YteKkXFNPIELaGwAwjEzXsx9t6g=
github.com/tikv/client-go/v2 v2.0.1-0.20220929111318-51f3bd3944c2/go.mod h1:gdXot2ofS2EOGtrXQ2qyESonQX/gFmgtfBCqCOSWg9E=
github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db h1:r1eMh9Rny3hfWuBuxOnbsCRrR4FhthiNxLQ5rAUtaww=
github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db/go.mod h1:ew8kS0yIcEaSetuuywkTLIUBR+sz3J5XvAYRae11qwc=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down

0 comments on commit 135e67c

Please sign in to comment.