Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
restore: support online restore (#114)
Browse files Browse the repository at this point in the history
Signed-off-by: disksing <i@disksing.com>
  • Loading branch information
disksing authored Mar 13, 2020
1 parent e476c82 commit 9c2bf9d
Show file tree
Hide file tree
Showing 5 changed files with 358 additions and 5 deletions.
174 changes: 169 additions & 5 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,28 @@ package restore
import (
"context"
"crypto/tls"
"encoding/hex"
"encoding/json"
"fmt"
"math"
"sort"
"strconv"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/pd/v4/server/schedule/placement"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
Expand All @@ -42,6 +49,7 @@ type Client struct {
cancel context.CancelFunc

pdClient pd.Client
toolClient SplitClient
fileImporter FileImporter
workerPool *utils.WorkerPool
tlsConf *tls.Config
Expand All @@ -53,6 +61,8 @@ type Client struct {
rateLimit uint64
isOnline bool
hasSpeedLimited bool

restoreStores []uint64
}

// NewRestoreClient returns a new RestoreClient
Expand All @@ -71,11 +81,12 @@ func NewRestoreClient(
}

return &Client{
ctx: ctx,
cancel: cancel,
pdClient: pdClient,
db: db,
tlsConf: tlsConf,
ctx: ctx,
cancel: cancel,
pdClient: pdClient,
toolClient: NewSplitClient(pdClient, tlsConf),
db: db,
tlsConf: tlsConf,
}, nil
}

Expand Down Expand Up @@ -457,6 +468,159 @@ func (rc *Client) ValidateChecksum(
return nil
}

// Store label that marks a store as dedicated to online restore.
// LoadRestoreStores matches stores against this key/value pair,
// SetupPlacementRules adds it as a label constraint on the per-table rules,
// and ResetRestoreLabels sets the key back to an empty value.
const (
// restoreLabelKey is the label key looked up on each store.
restoreLabelKey = "exclusive"
// restoreLabelValue is the label value that identifies a restore store.
restoreLabelValue = "restore"
)

// LoadRestoreStores collects the IDs of the stores reserved for restore.
//
// It does nothing unless online restore is enabled. Otherwise it asks PD for
// all stores and appends to rc.restoreStores the ID of every store that is in
// the Up state and carries the restoreLabelKey=restoreLabelValue label.
// An error from PD is returned unchanged.
func (rc *Client) LoadRestoreStores(ctx context.Context) error {
	if !rc.isOnline {
		return nil
	}

	allStores, err := rc.pdClient.GetAllStores(ctx)
	if err != nil {
		return err
	}
	for _, store := range allStores {
		if store.GetState() != metapb.StoreState_Up {
			continue
		}
		labels := store.GetLabels()
		for idx := 0; idx < len(labels); idx++ {
			if labels[idx].GetKey() != restoreLabelKey || labels[idx].GetValue() != restoreLabelValue {
				continue
			}
			// One matching label is enough; record the store once.
			rc.restoreStores = append(rc.restoreStores, store.GetId())
			break
		}
	}
	log.Info("load restore stores", zap.Uint64s("store-ids", rc.restoreStores))
	return nil
}

// ResetRestoreLabels clears the exclusive label on the loaded restore stores
// by setting restoreLabelKey to an empty value on each of them.
// It is a no-op unless online restore is enabled.
func (rc *Client) ResetRestoreLabels(ctx context.Context) error {
	if rc.isOnline {
		log.Info("start reseting store labels")
		const clearedValue = ""
		return rc.toolClient.SetStoresLabel(ctx, rc.restoreStores, restoreLabelKey, clearedValue)
	}
	return nil
}

// SetupPlacementRules installs one placement rule per table so that the
// table's key range is constrained to the restore stores.
//
// Each rule is derived from the "pd"/"default" rule: it gets index 100,
// overrides lower rules, and carries an extra label constraint requiring
// restoreLabelKey to be in {restoreLabelValue}. The rule covers the range
// from the table's encoded prefix to the encoded prefix of table ID + 1.
// Nothing happens when online restore is disabled or no restore store was
// loaded. The first error from the PD tool client aborts the loop.
func (rc *Client) SetupPlacementRules(ctx context.Context, tables []*model.TableInfo) error {
	if len(rc.restoreStores) == 0 || !rc.isOnline {
		return nil
	}
	log.Info("start setting placement rules")

	rule, err := rc.toolClient.GetPlacementRule(ctx, "pd", "default")
	if err != nil {
		return err
	}
	rule.Index, rule.Override = 100, true
	restoreOnly := placement.LabelConstraint{
		Key:    restoreLabelKey,
		Op:     "in",
		Values: []string{restoreLabelValue},
	}
	rule.LabelConstraints = append(rule.LabelConstraints, restoreOnly)

	// hexTablePrefix renders the encoded table prefix as a hex string,
	// the form the rule's StartKeyHex/EndKeyHex fields take.
	hexTablePrefix := func(tableID int64) string {
		return hex.EncodeToString(codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(tableID)))
	}

	for _, tbl := range tables {
		rule.ID = rc.getRuleID(tbl.ID)
		rule.StartKeyHex = hexTablePrefix(tbl.ID)
		rule.EndKeyHex = hexTablePrefix(tbl.ID + 1)
		if err = rc.toolClient.SetPlacementRule(ctx, rule); err != nil {
			return err
		}
	}
	log.Info("finish setting placement rules")
	return nil
}

// WaitPlacementSchedule blocks until the regions of all given tables have
// every peer on a restore store.
//
// It re-checks every 10 seconds (the first check happens after the first
// tick) and logs the progress string in between. It returns nil on success,
// the check error if one occurs, or ctx.Err() when ctx is cancelled.
// Nothing happens when online restore is disabled or no restore store was
// loaded.
func (rc *Client) WaitPlacementSchedule(ctx context.Context, tables []*model.TableInfo) error {
	if len(rc.restoreStores) == 0 || !rc.isOnline {
		return nil
	}
	log.Info("start waiting placement schedule")

	const pollInterval = 10 * time.Second
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		// Wait for either the next tick or cancellation.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}

		done, progress, err := rc.checkRegions(ctx, tables)
		switch {
		case err != nil:
			return err
		case done:
			log.Info("finish waiting placement schedule")
			return nil
		}
		log.Info("placement schedule progress: " + progress)
	}
}

// checkRegions reports whether every table's key range is fully placed on
// the restore stores.
//
// It checks tables in order and stops at the first one that is not ready,
// returning false and a progress string of the form
// "table <index>/<count>, <region progress>". When all tables are ready it
// returns true with an empty progress string. Errors from checkRange are
// passed through.
func (rc *Client) checkRegions(ctx context.Context, tables []*model.TableInfo) (bool, string, error) {
	total := len(tables)
	for idx := range tables {
		tableID := tables[idx].ID
		startKey := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(tableID))
		endKey := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(tableID+1))

		placed, regionProgress, err := rc.checkRange(ctx, startKey, endKey)
		if err != nil {
			return false, "", err
		}
		if placed {
			continue
		}
		return false, fmt.Sprintf("table %v/%v, %s", idx, total, regionProgress), nil
	}
	return true, "", nil
}

// checkRange reports whether every peer of every region scanned in
// [start, end) lives on one of the restore stores.
//
// Regions are scanned with a limit of -1. On the first peer found on a
// non-restore store it returns false with a progress string
// "region <index>/<count>". If all peers qualify (or no region is returned)
// it returns true. Scan errors are passed through.
func (rc *Client) checkRange(ctx context.Context, start, end []byte) (bool, string, error) {
	regions, err := rc.toolClient.ScanRegions(ctx, start, end, -1)
	if err != nil {
		return false, "", err
	}

	// isRestoreStore tells whether id is among the loaded restore stores.
	isRestoreStore := func(id uint64) bool {
		for _, restoreID := range rc.restoreStores {
			if restoreID == id {
				return true
			}
		}
		return false
	}

	for idx, region := range regions {
		for _, peer := range region.Region.GetPeers() {
			if !isRestoreStore(peer.GetStoreId()) {
				return false, fmt.Sprintf("region %v/%v", idx, len(regions)), nil
			}
		}
	}
	return true, "", nil
}

// ResetPlacementRules removes the per-table placement rules installed by
// SetupPlacementRules.
//
// Deletion is attempted for every table even if an earlier one fails. Each
// failure is logged as a warning together with its underlying error, and
// the IDs of the affected tables are collected. If any deletion failed, a
// single error listing those table IDs is returned; otherwise nil.
// Nothing happens when online restore is disabled or no restore store was
// loaded.
func (rc *Client) ResetPlacementRules(ctx context.Context, tables []*model.TableInfo) error {
	if !rc.isOnline || len(rc.restoreStores) == 0 {
		return nil
	}
	log.Info("start reseting placement rules")
	var failedTables []int64
	for _, t := range tables {
		err := rc.toolClient.DeletePlacementRule(ctx, "pd", rc.getRuleID(t.ID))
		if err != nil {
			// Keep going so one bad table does not leave the other rules
			// behind, but record why this one failed: the returned error
			// only carries the table IDs.
			log.Warn("failed to delete placement rule for table",
				zap.Int64("table-id", t.ID),
				zap.Error(err))
			failedTables = append(failedTables, t.ID)
		}
	}
	if len(failedTables) > 0 {
		return errors.Errorf("failed to delete placement rules for tables %v", failedTables)
	}
	return nil
}

// getRuleID returns the placement rule ID used for a table during restore:
// the prefix "restore-t" followed by the decimal table ID.
func (rc *Client) getRuleID(tableID int64) string {
	const rulePrefix = "restore-t"
	decimalID := strconv.FormatInt(tableID, 10)
	return rulePrefix + decimalID
}

// IsIncremental returns whether this backup is incremental
func (rc *Client) IsIncremental() bool {
return !(rc.backupMeta.StartVersion == rc.backupMeta.EndVersion ||
Expand Down
41 changes: 41 additions & 0 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/spf13/pflag"
"go.uber.org/zap"
Expand Down Expand Up @@ -91,6 +92,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if cfg.Online {
client.EnableOnline()
}
err = client.LoadRestoreStores(ctx)
if err != nil {
return err
}

defer summary.Summary(cmdName)

Expand Down Expand Up @@ -137,6 +142,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
}
summary.CollectInt("restore ranges", len(ranges))

if err = splitPrepareWork(ctx, client, newTables); err != nil {
return err
}

ranges = restore.AttachFilesToRanges(files, ranges)

// Redirect to log if there is no log file to avoid unreadable output.
Expand Down Expand Up @@ -203,6 +212,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return err
}

if err = splitPostWork(ctx, client, newTables); err != nil {
return err
}

// Restore has finished.
close(updateCh)

Expand Down Expand Up @@ -305,3 +318,31 @@ func addPDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, removedSchedulers
}
return nil
}

// splitPrepareWork prepares the cluster before data is restored: it sets up
// the placement rules for the tables and then waits for the placement
// schedule to finish. A failure in either step is logged and returned with
// a stack trace attached.
func splitPrepareWork(ctx context.Context, client *restore.Client, tables []*model.TableInfo) error {
	if err := client.SetupPlacementRules(ctx, tables); err != nil {
		log.Error("setup placement rules failed", zap.Error(err))
		return errors.Trace(err)
	}

	if err := client.WaitPlacementSchedule(ctx, tables); err != nil {
		log.Error("wait placement schedule failed", zap.Error(err))
		return errors.Trace(err)
	}
	return nil
}

// splitPostWork undoes splitPrepareWork after the restore: it removes the
// per-table placement rules and then resets the restore store labels.
// The first failing step stops the sequence and its error is returned with
// a stack trace attached.
func splitPostWork(ctx context.Context, client *restore.Client, tables []*model.TableInfo) error {
	if err := client.ResetPlacementRules(ctx, tables); err != nil {
		return errors.Trace(err)
	}

	if err := client.ResetRestoreLabels(ctx); err != nil {
		return errors.Trace(err)
	}
	return nil
}
54 changes: 54 additions & 0 deletions tests/br_db_online/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/bin/sh
#
# Copyright 2020 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

# Abort on the first failing command and on use of an unset variable.
set -eu
# The database used by this test takes its name from $TEST_NAME.
DB="$TEST_NAME"

run_sql "CREATE DATABASE $DB;"

# Create and populate two small tables so the restore can be checked
# by counting tables afterwards.
run_sql "CREATE TABLE $DB.usertable1 ( \
  YCSB_KEY varchar(64) NOT NULL, \
  FIELD0 varchar(1) DEFAULT NULL, \
  PRIMARY KEY (YCSB_KEY) \
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"

run_sql "INSERT INTO $DB.usertable1 VALUES (\"a\", \"b\");"
run_sql "INSERT INTO $DB.usertable1 VALUES (\"aa\", \"b\");"

run_sql "CREATE TABLE $DB.usertable2 ( \
  YCSB_KEY varchar(64) NOT NULL, \
  FIELD0 varchar(1) DEFAULT NULL, \
  PRIMARY KEY (YCSB_KEY) \
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"

run_sql "INSERT INTO $DB.usertable2 VALUES (\"c\", \"d\");"

# backup db
echo "backup start..."
# Variables are quoted so that values containing spaces or glob characters
# are passed to br as single arguments.
run_br --pd "$PD_ADDR" backup db --db "$DB" -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4

# Drop the database so the restore below has to recreate it.
run_sql "DROP DATABASE $DB;"

# restore db
echo "restore start..."
run_br restore db --db "$DB" -s "local://$TEST_DIR/$DB" --pd "$PD_ADDR" --online

# Both tables must be present again after the online restore.
table_count=$(run_sql "use $DB; show tables;" | grep "Tables_in" | wc -l)
if [ "$table_count" -ne "2" ]; then
    echo "TEST: [$TEST_NAME] failed!"
    exit 1
fi

# Clean up.
run_sql "DROP DATABASE $DB;"
Loading

0 comments on commit 9c2bf9d

Please sign in to comment.