Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
support backupts (#172)
Browse files Browse the repository at this point in the history
* support backupts

* address comment

* address comment

* fix space
  • Loading branch information
3pointer authored Mar 12, 2020
1 parent 0e25496 commit 3419d8a
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 26 deletions.
44 changes: 26 additions & 18 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,33 @@ func NewBackupClient(ctx context.Context, mgr ClientMgr) (*Client, error) {
}

// GetTS returns the latest timestamp.
func (bc *Client) GetTS(ctx context.Context, duration time.Duration) (uint64, error) {
p, l, err := bc.mgr.GetPDClient().GetTS(ctx)
if err != nil {
return 0, errors.Trace(err)
}
backupTS := oracle.ComposeTS(p, l)

switch {
case duration < 0:
return 0, errors.New("negative timeago is not allowed")
case duration > 0:
log.Info("backup time ago", zap.Duration("timeago", duration))

backupTime := oracle.GetTimeFromTS(backupTS)
backupAgo := backupTime.Add(-duration)
if backupTS < oracle.ComposeTS(oracle.GetPhysical(backupAgo), l) {
return 0, errors.New("backup ts overflow please choose a smaller timeago")
func (bc *Client) GetTS(ctx context.Context, duration time.Duration, ts uint64) (uint64, error) {
var (
backupTS uint64
err error
)
if ts > 0 {
backupTS = ts
} else {
p, l, err := bc.mgr.GetPDClient().GetTS(ctx)
if err != nil {
return 0, errors.Trace(err)
}
backupTS = oracle.ComposeTS(p, l)

switch {
case duration < 0:
return 0, errors.New("negative timeago is not allowed")
case duration > 0:
log.Info("backup time ago", zap.Duration("timeago", duration))

backupTime := oracle.GetTimeFromTS(backupTS)
backupAgo := backupTime.Add(-duration)
if backupTS < oracle.ComposeTS(oracle.GetPhysical(backupAgo), l) {
return 0, errors.New("backup ts overflow please choose a smaller timeago")
}
backupTS = oracle.ComposeTS(oracle.GetPhysical(backupAgo), l)
}
backupTS = oracle.ComposeTS(oracle.GetPhysical(backupAgo), l)
}

// check backup time do not exceed GCSafePoint
Expand Down
17 changes: 12 additions & 5 deletions pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (r *testBackup) TestGetTS(c *C) {
// timeago not work
expectedDuration := 0
currentTs := time.Now().UnixNano() / int64(time.Millisecond)
ts, err := r.backupClient.GetTS(r.ctx, 0)
ts, err := r.backupClient.GetTS(r.ctx, 0, 0)
c.Assert(err, IsNil)
pdTs := oracle.ExtractPhysical(ts)
duration := int(currentTs - pdTs)
Expand All @@ -65,19 +65,19 @@ func (r *testBackup) TestGetTS(c *C) {
// timeago = "1.5m"
expectedDuration = 90000
currentTs = time.Now().UnixNano() / int64(time.Millisecond)
ts, err = r.backupClient.GetTS(r.ctx, 90*time.Second)
ts, err = r.backupClient.GetTS(r.ctx, 90*time.Second, 0)
c.Assert(err, IsNil)
pdTs = oracle.ExtractPhysical(ts)
duration = int(currentTs - pdTs)
c.Assert(duration, Greater, expectedDuration-deviation)
c.Assert(duration, Less, expectedDuration+deviation)

// timeago = "-1m"
_, err = r.backupClient.GetTS(r.ctx, -time.Minute)
_, err = r.backupClient.GetTS(r.ctx, -time.Minute, 0)
c.Assert(err, ErrorMatches, "negative timeago is not allowed")

// timeago = "1000000h" overflows
_, err = r.backupClient.GetTS(r.ctx, 1000000*time.Hour)
_, err = r.backupClient.GetTS(r.ctx, 1000000*time.Hour, 0)
c.Assert(err, ErrorMatches, "backup ts overflow.*")

// timeago = "10h" exceed GCSafePoint
Expand All @@ -86,8 +86,15 @@ func (r *testBackup) TestGetTS(c *C) {
now := oracle.ComposeTS(p, l)
_, err = r.backupClient.mgr.GetPDClient().UpdateGCSafePoint(r.ctx, now)
c.Assert(err, IsNil)
_, err = r.backupClient.GetTS(r.ctx, 10*time.Hour)
_, err = r.backupClient.GetTS(r.ctx, 10*time.Hour, 0)
c.Assert(err, ErrorMatches, "GC safepoint [0-9]+ exceed TS [0-9]+")

// timeago and backupts both exists, use backupts
backupts := oracle.ComposeTS(p+10, l)
ts, err = r.backupClient.GetTS(r.ctx, time.Minute, backupts)
c.Assert(err, IsNil)
c.Assert(ts, Equals, backupts)

}

func (r *testBackup) TestBuildTableRange(c *C) {
Expand Down
47 changes: 45 additions & 2 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ package task

import (
"context"
"strconv"
"time"

"github.com/pingcap/errors"
kvproto "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/spf13/pflag"
"go.uber.org/zap"

Expand All @@ -23,6 +28,7 @@ import (

const (
flagBackupTimeago = "timeago"
flagBackupTS = "backupts"
flagLastBackupTS = "lastbackupts"

defaultBackupConcurrency = 4
Expand All @@ -33,6 +39,7 @@ type BackupConfig struct {
Config

TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
}

Expand All @@ -43,7 +50,10 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
"The history version of the backup task, e.g. 1m, 1h. Do not exceed GCSafePoint")

// TODO: remove experimental tag if it's stable
flags.Uint64(flagLastBackupTS, 0, "(experimental) the last time backup ts")
flags.Uint64(flagLastBackupTS, 0, "(experimental) the last time backup ts,"+
" use for incremental backup, support TSO only")
flags.String(flagBackupTS, "", "the backup ts support TSO or datetime,"+
" e.g. '400036290571534337', '2018-05-11 01:42:23'")
}

// ParseFromFlags parses the backup-related flags from the flag set.
Expand All @@ -60,6 +70,15 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
backupTS, err := flags.GetString(flagBackupTS)
if err != nil {
return errors.Trace(err)
}
cfg.BackupTS, err = parseTSString(backupTS)
if err != nil {
return errors.Trace(err)
}

if err = cfg.Config.ParseFromFlags(flags); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -96,7 +115,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
return err
}

backupTS, err := client.GetTS(ctx, cfg.TimeAgo)
backupTS, err := client.GetTS(ctx, cfg.TimeAgo, cfg.BackupTS)
if err != nil {
return err
}
Expand Down Expand Up @@ -198,3 +217,27 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
}
return nil
}

// parseTSString port from tidb setSnapshotTS
func parseTSString(ts string) (uint64, error) {
if len(ts) == 0 {
return 0, nil
}
if tso, err := strconv.ParseUint(ts, 10, 64); err == nil {
return tso, nil
}

loc := time.Local
sc := &stmtctx.StatementContext{
TimeZone: loc,
}
t, err := types.ParseTime(sc, ts, mysql.TypeTimestamp, types.MaxFsp)
if err != nil {
return 0, errors.Trace(err)
}
t1, err := t.GoTime(loc)
if err != nil {
return 0, errors.Trace(err)
}
return variable.GoTimeToTS(t1), nil
}
36 changes: 36 additions & 0 deletions pkg/task/backup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package task

import (
"testing"
"time"

. "github.com/pingcap/check"
)

var _ = Suite(&testBackupSuite{})

func TestT(t *testing.T) {
TestingT(t)
}

type testBackupSuite struct{}

func (s *testBackupSuite) TestParseTSString(c *C) {
var (
ts uint64
err error
)

ts, err = parseTSString("")
c.Assert(err, IsNil)
c.Assert(int(ts), Equals, 0)

ts, err = parseTSString("400036290571534337")
c.Assert(err, IsNil)
c.Assert(int(ts), Equals, 400036290571534337)

_, offset := time.Now().Local().Zone()
ts, err = parseTSString("2018-05-11 01:42:23")
c.Assert(err, IsNil)
c.Assert(int(ts), Equals, 400032515489792000-(offset*1000)<<18)
}
2 changes: 1 addition & 1 deletion pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type Config struct {
// DefineCommonFlags defines the flags common to all BRIE commands.
func DefineCommonFlags(flags *pflag.FlagSet) {
flags.BoolP(flagSendCreds, "c", true, "Whether send credentials to tikv")
flags.StringP(flagStorage, "s", "", `specify the url where backup storage, eg, "local:///path/to/save"`)
flags.StringP(flagStorage, "s", "", `specify the url where backup storage, eg, "s3:///path/to/save"`)
flags.StringSliceP(flagPD, "u", []string{"127.0.0.1:2379"}, "PD address")
flags.String(flagCA, "", "CA certificate path for TLS connection")
flags.String(flagCert, "", "Certificate path for TLS connection")
Expand Down

0 comments on commit 3419d8a

Please sign in to comment.