Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

support backupts #172

Merged
merged 4 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 26 additions & 18 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,33 @@ func NewBackupClient(ctx context.Context, mgr ClientMgr) (*Client, error) {
}

// GetTS returns the latest timestamp.
func (bc *Client) GetTS(ctx context.Context, duration time.Duration) (uint64, error) {
p, l, err := bc.mgr.GetPDClient().GetTS(ctx)
if err != nil {
return 0, errors.Trace(err)
}
backupTS := oracle.ComposeTS(p, l)

switch {
case duration < 0:
return 0, errors.New("negative timeago is not allowed")
case duration > 0:
log.Info("backup time ago", zap.Duration("timeago", duration))

backupTime := oracle.GetTimeFromTS(backupTS)
backupAgo := backupTime.Add(-duration)
if backupTS < oracle.ComposeTS(oracle.GetPhysical(backupAgo), l) {
return 0, errors.New("backup ts overflow please choose a smaller timeago")
func (bc *Client) GetTS(ctx context.Context, duration time.Duration, ts uint64) (uint64, error) {
var (
backupTS uint64
err error
)
if ts > 0 {
backupTS = ts
} else {
p, l, err := bc.mgr.GetPDClient().GetTS(ctx)
if err != nil {
return 0, errors.Trace(err)
}
backupTS = oracle.ComposeTS(p, l)

switch {
case duration < 0:
return 0, errors.New("negative timeago is not allowed")
case duration > 0:
log.Info("backup time ago", zap.Duration("timeago", duration))

backupTime := oracle.GetTimeFromTS(backupTS)
backupAgo := backupTime.Add(-duration)
if backupTS < oracle.ComposeTS(oracle.GetPhysical(backupAgo), l) {
return 0, errors.New("backup ts overflow please choose a smaller timeago")
}
backupTS = oracle.ComposeTS(oracle.GetPhysical(backupAgo), l)
}
backupTS = oracle.ComposeTS(oracle.GetPhysical(backupAgo), l)
}

// check backup time do not exceed GCSafePoint
Expand Down
17 changes: 12 additions & 5 deletions pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (r *testBackup) TestGetTS(c *C) {
// timeago not work
expectedDuration := 0
currentTs := time.Now().UnixNano() / int64(time.Millisecond)
ts, err := r.backupClient.GetTS(r.ctx, 0)
ts, err := r.backupClient.GetTS(r.ctx, 0, 0)
c.Assert(err, IsNil)
pdTs := oracle.ExtractPhysical(ts)
duration := int(currentTs - pdTs)
Expand All @@ -65,19 +65,19 @@ func (r *testBackup) TestGetTS(c *C) {
// timeago = "1.5m"
expectedDuration = 90000
currentTs = time.Now().UnixNano() / int64(time.Millisecond)
ts, err = r.backupClient.GetTS(r.ctx, 90*time.Second)
ts, err = r.backupClient.GetTS(r.ctx, 90*time.Second, 0)
c.Assert(err, IsNil)
pdTs = oracle.ExtractPhysical(ts)
duration = int(currentTs - pdTs)
c.Assert(duration, Greater, expectedDuration-deviation)
c.Assert(duration, Less, expectedDuration+deviation)

// timeago = "-1m"
_, err = r.backupClient.GetTS(r.ctx, -time.Minute)
_, err = r.backupClient.GetTS(r.ctx, -time.Minute, 0)
c.Assert(err, ErrorMatches, "negative timeago is not allowed")

// timeago = "1000000h" overflows
_, err = r.backupClient.GetTS(r.ctx, 1000000*time.Hour)
_, err = r.backupClient.GetTS(r.ctx, 1000000*time.Hour, 0)
c.Assert(err, ErrorMatches, "backup ts overflow.*")

// timeago = "10h" exceed GCSafePoint
Expand All @@ -86,8 +86,15 @@ func (r *testBackup) TestGetTS(c *C) {
now := oracle.ComposeTS(p, l)
_, err = r.backupClient.mgr.GetPDClient().UpdateGCSafePoint(r.ctx, now)
c.Assert(err, IsNil)
_, err = r.backupClient.GetTS(r.ctx, 10*time.Hour)
_, err = r.backupClient.GetTS(r.ctx, 10*time.Hour, 0)
c.Assert(err, ErrorMatches, "GC safepoint [0-9]+ exceed TS [0-9]+")

// timeago and backupts both exists, use backupts
backupts := oracle.ComposeTS(p+10, l)
ts, err = r.backupClient.GetTS(r.ctx, time.Minute, backupts)
c.Assert(err, IsNil)
c.Assert(ts, Equals, backupts)

}

func (r *testBackup) TestBuildTableRange(c *C) {
Expand Down
47 changes: 45 additions & 2 deletions pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ package task

import (
"context"
"strconv"
"time"

"github.com/pingcap/errors"
kvproto "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/spf13/pflag"
"go.uber.org/zap"

Expand All @@ -23,6 +28,7 @@ import (

const (
flagBackupTimeago = "timeago"
flagBackupTS = "backupts"
flagLastBackupTS = "lastbackupts"

defaultBackupConcurrency = 4
Expand All @@ -33,6 +39,7 @@ type BackupConfig struct {
Config

TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
}

Expand All @@ -43,7 +50,10 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
"The history version of the backup task, e.g. 1m, 1h. Do not exceed GCSafePoint")

// TODO: remove experimental tag if it's stable
flags.Uint64(flagLastBackupTS, 0, "(experimental) the last time backup ts")
flags.Uint64(flagLastBackupTS, 0, "(experimental) the last time backup ts,"+
" use for incremental backup, support TSO only")
flags.String(flagBackupTS, "", "the backup ts support TSO or datetime,"+
" e.g. '400036290571534337', '2018-05-11 01:42:23'")
}

// ParseFromFlags parses the backup-related flags from the flag set.
Expand All @@ -60,6 +70,15 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
backupTS, err := flags.GetString(flagBackupTS)
if err != nil {
return errors.Trace(err)
}
cfg.BackupTS, err = parseTSString(backupTS)
if err != nil {
return errors.Trace(err)
}

if err = cfg.Config.ParseFromFlags(flags); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -96,7 +115,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
return err
}

backupTS, err := client.GetTS(ctx, cfg.TimeAgo)
backupTS, err := client.GetTS(ctx, cfg.TimeAgo, cfg.BackupTS)
if err != nil {
return err
}
Expand Down Expand Up @@ -198,3 +217,27 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
}
return nil
}

// parseTSString port from tidb setSnapshotTS
func parseTSString(ts string) (uint64, error) {
if len(ts) == 0 {
return 0, nil
}
if tso, err := strconv.ParseUint(ts, 10, 64); err == nil {
return tso, nil
}

loc := time.Local
sc := &stmtctx.StatementContext{
TimeZone: loc,
}
t, err := types.ParseTime(sc, ts, mysql.TypeTimestamp, types.MaxFsp)
if err != nil {
return 0, errors.Trace(err)
}
t1, err := t.GoTime(loc)
if err != nil {
return 0, errors.Trace(err)
}
return variable.GoTimeToTS(t1), nil
}
36 changes: 36 additions & 0 deletions pkg/task/backup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package task

import (
"testing"
"time"

. "github.com/pingcap/check"
)

var _ = Suite(&testBackupSuite{})

func TestT(t *testing.T) {
TestingT(t)
}

type testBackupSuite struct{}

func (s *testBackupSuite) TestParseTSString(c *C) {
var (
ts uint64
err error
)

ts, err = parseTSString("")
c.Assert(err, IsNil)
c.Assert(int(ts), Equals, 0)

ts, err = parseTSString("400036290571534337")
c.Assert(err, IsNil)
c.Assert(int(ts), Equals, 400036290571534337)

_, offset := time.Now().Local().Zone()
ts, err = parseTSString("2018-05-11 01:42:23")
c.Assert(err, IsNil)
c.Assert(int(ts), Equals, 400032515489792000-(offset*1000)<<18)
}
2 changes: 1 addition & 1 deletion pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type Config struct {
// DefineCommonFlags defines the flags common to all BRIE commands.
func DefineCommonFlags(flags *pflag.FlagSet) {
flags.BoolP(flagSendCreds, "c", true, "Whether send credentials to tikv")
flags.StringP(flagStorage, "s", "", `specify the url where backup storage, eg, "local:///path/to/save"`)
flags.StringP(flagStorage, "s", "", `specify the url where backup storage, eg, "s3:///path/to/save"`)
3pointer marked this conversation as resolved.
Show resolved Hide resolved
flags.StringSliceP(flagPD, "u", []string{"127.0.0.1:2379"}, "PD address")
flags.String(flagCA, "", "CA certificate path for TLS connection")
flags.String(flagCert, "", "Certificate path for TLS connection")
Expand Down