diff --git a/pkg/restore/client.go b/pkg/restore/client.go
index 26873a22e..93cc97567 100644
--- a/pkg/restore/client.go
+++ b/pkg/restore/client.go
@@ -5,21 +5,28 @@ package restore
 import (
     "context"
     "crypto/tls"
+    "encoding/hex"
     "encoding/json"
+    "fmt"
     "math"
     "sort"
+    "strconv"
     "sync"
     "time"
 
     "github.com/pingcap/errors"
     "github.com/pingcap/kvproto/pkg/backup"
     "github.com/pingcap/kvproto/pkg/import_sstpb"
+    "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/log"
     "github.com/pingcap/parser/model"
     pd "github.com/pingcap/pd/v4/client"
+    "github.com/pingcap/pd/v4/server/schedule/placement"
     "github.com/pingcap/tidb/domain"
     "github.com/pingcap/tidb/kv"
     "github.com/pingcap/tidb/store/tikv/oracle"
+    "github.com/pingcap/tidb/tablecodec"
+    "github.com/pingcap/tidb/util/codec"
     "go.uber.org/zap"
     "google.golang.org/grpc"
     "google.golang.org/grpc/backoff"
@@ -42,6 +49,7 @@ type Client struct {
     cancel   context.CancelFunc
 
     pdClient     pd.Client
+    toolClient   SplitClient
     fileImporter FileImporter
     workerPool   *utils.WorkerPool
     tlsConf      *tls.Config
@@ -53,6 +61,8 @@ type Client struct {
     rateLimit       uint64
     isOnline        bool
     hasSpeedLimited bool
+
+    restoreStores []uint64
 }
 
 // NewRestoreClient returns a new RestoreClient
@@ -71,11 +81,12 @@ func NewRestoreClient(
     }
 
     return &Client{
-        ctx:      ctx,
-        cancel:   cancel,
-        pdClient: pdClient,
-        db:       db,
-        tlsConf:  tlsConf,
+        ctx:        ctx,
+        cancel:     cancel,
+        pdClient:   pdClient,
+        toolClient: NewSplitClient(pdClient, tlsConf),
+        db:         db,
+        tlsConf:    tlsConf,
     }, nil
 }
 
@@ -457,6 +468,159 @@ func (rc *Client) ValidateChecksum(
     return nil
 }
 
+const (
+    restoreLabelKey   = "exclusive"
+    restoreLabelValue = "restore"
+)
+
+// LoadRestoreStores loads the stores used to restore data.
+func (rc *Client) LoadRestoreStores(ctx context.Context) error {
+    if !rc.isOnline {
+        return nil
+    }
+
+    stores, err := rc.pdClient.GetAllStores(ctx)
+    if err != nil {
+        return err
+    }
+    for _, s := range stores {
+        if s.GetState() != metapb.StoreState_Up {
+            continue
+        }
+        for _, l := range s.GetLabels() {
+            if l.GetKey() == restoreLabelKey && l.GetValue() == restoreLabelValue {
+                rc.restoreStores = append(rc.restoreStores, s.GetId())
+                break
+            }
+        }
+    }
+    log.Info("load restore stores", zap.Uint64s("store-ids", rc.restoreStores))
+    return nil
+}
+
+// ResetRestoreLabels removes the exclusive labels of the restore stores.
+func (rc *Client) ResetRestoreLabels(ctx context.Context) error {
+    if !rc.isOnline {
+        return nil
+    }
+    log.Info("start resetting store labels")
+    return rc.toolClient.SetStoresLabel(ctx, rc.restoreStores, restoreLabelKey, "")
+}
+
+// SetupPlacementRules sets rules for the tables' regions.
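+// It constrains each table's key range [t.ID, t.ID+1) to stores that carry the
+// exclusive=restore label, so PD schedules the regions being restored onto the
+// dedicated restore stores only.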
+func (rc *Client) SetupPlacementRules(ctx context.Context, tables []*model.TableInfo) error {
+    if !rc.isOnline || len(rc.restoreStores) == 0 {
+        return nil
+    }
+    log.Info("start setting placement rules")
+    rule, err := rc.toolClient.GetPlacementRule(ctx, "pd", "default")
+    if err != nil {
+        return err
+    }
+    rule.Index = 100
+    rule.Override = true
+    rule.LabelConstraints = append(rule.LabelConstraints, placement.LabelConstraint{
+        Key:    restoreLabelKey,
+        Op:     "in",
+        Values: []string{restoreLabelValue},
+    })
+    for _, t := range tables {
+        rule.ID = rc.getRuleID(t.ID)
+        rule.StartKeyHex = hex.EncodeToString(codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(t.ID)))
+        rule.EndKeyHex = hex.EncodeToString(codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(t.ID+1)))
+        err = rc.toolClient.SetPlacementRule(ctx, rule)
+        if err != nil {
+            return err
+        }
+    }
+    log.Info("finish setting placement rules")
+    return nil
+}
+
+// WaitPlacementSchedule waits for PD to move the tables' regions to the restore stores.
+func (rc *Client) WaitPlacementSchedule(ctx context.Context, tables []*model.TableInfo) error {
+    if !rc.isOnline || len(rc.restoreStores) == 0 {
+        return nil
+    }
+    log.Info("start waiting placement schedule")
+    ticker := time.NewTicker(time.Second * 10)
+    defer ticker.Stop()
+    for {
+        select {
+        case <-ticker.C:
+            ok, progress, err := rc.checkRegions(ctx, tables)
+            if err != nil {
+                return err
+            }
+            if ok {
+                log.Info("finish waiting placement schedule")
+                return nil
+            }
+            log.Info("placement schedule progress: " + progress)
+        case <-ctx.Done():
+            return ctx.Err()
+        }
+    }
+}
+
+func (rc *Client) checkRegions(ctx context.Context, tables []*model.TableInfo) (bool, string, error) {
+    for i, t := range tables {
+        start := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(t.ID))
+        end := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(t.ID+1))
+        ok, regionProgress, err := rc.checkRange(ctx, start, end)
+        if err != nil {
+            return false, "", err
+        }
+        if !ok {
+            return false, fmt.Sprintf("table %v/%v, %s", i, len(tables), regionProgress), nil
+        }
+    }
+    return true, "", nil
+}
+
+func (rc *Client) checkRange(ctx context.Context, start, end []byte) (bool, string, error) {
+    regions, err := rc.toolClient.ScanRegions(ctx, start, end, -1)
+    if err != nil {
+        return false, "", err
+    }
+    for i, r := range regions {
+    NEXT_PEER:
+        for _, p := range r.Region.GetPeers() {
+            for _, storeID := range rc.restoreStores {
+                if p.GetStoreId() == storeID {
+                    continue NEXT_PEER
+                }
+            }
+            return false, fmt.Sprintf("region %v/%v", i, len(regions)), nil
+        }
+    }
+    return true, "", nil
+}
+
+// ResetPlacementRules removes placement rules for tables.
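+// It is the counterpart of SetupPlacementRules: once the restore has finished, the
+// rules are deleted so the restored regions are no longer pinned to the restore stores.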
+func (rc *Client) ResetPlacementRules(ctx context.Context, tables []*model.TableInfo) error {
+    if !rc.isOnline || len(rc.restoreStores) == 0 {
+        return nil
+    }
+    log.Info("start resetting placement rules")
+    var failedTables []int64
+    for _, t := range tables {
+        err := rc.toolClient.DeletePlacementRule(ctx, "pd", rc.getRuleID(t.ID))
+        if err != nil {
+            log.Info("failed to delete placement rule for table", zap.Int64("table-id", t.ID))
+            failedTables = append(failedTables, t.ID)
+        }
+    }
+    if len(failedTables) > 0 {
+        return errors.Errorf("failed to delete placement rules for tables %v", failedTables)
+    }
+    return nil
+}
+
+func (rc *Client) getRuleID(tableID int64) string {
+    return "restore-t" + strconv.FormatInt(tableID, 10)
+}
+
 // IsIncremental returns whether this backup is incremental
 func (rc *Client) IsIncremental() bool {
     return !(rc.backupMeta.StartVersion == rc.backupMeta.EndVersion ||
diff --git a/pkg/task/restore.go b/pkg/task/restore.go
index c759fe8d5..a02e49cf1 100644
--- a/pkg/task/restore.go
+++ b/pkg/task/restore.go
@@ -8,6 +8,7 @@ import (
     "github.com/pingcap/errors"
     "github.com/pingcap/kvproto/pkg/backup"
     "github.com/pingcap/log"
+    "github.com/pingcap/parser/model"
     "github.com/pingcap/tidb-tools/pkg/filter"
     "github.com/spf13/pflag"
     "go.uber.org/zap"
@@ -91,6 +92,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
     if cfg.Online {
         client.EnableOnline()
     }
+    err = client.LoadRestoreStores(ctx)
+    if err != nil {
+        return err
+    }
 
     defer summary.Summary(cmdName)
 
@@ -137,6 +142,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
     }
     summary.CollectInt("restore ranges", len(ranges))
 
+    if err = splitPrepareWork(ctx, client, newTables); err != nil {
+        return err
+    }
+
     ranges = restore.AttachFilesToRanges(files, ranges)
 
     // Redirect to log if there is no log file to avoid unreadable output.
@@ -203,6 +212,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
         return err
     }
 
+    if err = splitPostWork(ctx, client, newTables); err != nil {
+        return err
+    }
+
     // Restore has finished.
     close(updateCh)
 
@@ -305,3 +318,31 @@ func addPDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, removedSchedulers
     }
     return nil
 }
+
+func splitPrepareWork(ctx context.Context, client *restore.Client, tables []*model.TableInfo) error {
+    err := client.SetupPlacementRules(ctx, tables)
+    if err != nil {
+        log.Error("setup placement rules failed", zap.Error(err))
+        return errors.Trace(err)
+    }
+
+    err = client.WaitPlacementSchedule(ctx, tables)
+    if err != nil {
+        log.Error("wait placement schedule failed", zap.Error(err))
+        return errors.Trace(err)
+    }
+    return nil
+}
+
+func splitPostWork(ctx context.Context, client *restore.Client, tables []*model.TableInfo) error {
+    err := client.ResetPlacementRules(ctx, tables)
+    if err != nil {
+        return errors.Trace(err)
+    }
+
+    err = client.ResetRestoreLabels(ctx)
+    if err != nil {
+        return errors.Trace(err)
+    }
+    return nil
+}
diff --git a/tests/br_db_online/run.sh b/tests/br_db_online/run.sh
new file mode 100755
index 000000000..95c3121d4
--- /dev/null
+++ b/tests/br_db_online/run.sh
@@ -0,0 +1,54 @@
+#!/bin/sh
+#
+# Copyright 2020 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eu
+DB="$TEST_NAME"
+
+run_sql "CREATE DATABASE $DB;"
+
+run_sql "CREATE TABLE $DB.usertable1 ( \
+    YCSB_KEY varchar(64) NOT NULL, \
+    FIELD0 varchar(1) DEFAULT NULL, \
+    PRIMARY KEY (YCSB_KEY) \
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"
+
+run_sql "INSERT INTO $DB.usertable1 VALUES (\"a\", \"b\");"
+run_sql "INSERT INTO $DB.usertable1 VALUES (\"aa\", \"b\");"
+
+run_sql "CREATE TABLE $DB.usertable2 ( \
+    YCSB_KEY varchar(64) NOT NULL, \
+    FIELD0 varchar(1) DEFAULT NULL, \
+    PRIMARY KEY (YCSB_KEY) \
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"
+
+run_sql "INSERT INTO $DB.usertable2 VALUES (\"c\", \"d\");"
+
+# backup db
+echo "backup start..."
+run_br --pd $PD_ADDR backup db --db "$DB" -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4
+
+run_sql "DROP DATABASE $DB;"
+
+# restore db
+echo "restore start..."
+run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --online
+
+table_count=$(run_sql "use $DB; show tables;" | grep "Tables_in" | wc -l)
+if [ "$table_count" -ne "2" ];then
+    echo "TEST: [$TEST_NAME] failed!"
+    exit 1
+fi
+
+run_sql "DROP DATABASE $DB;"
diff --git a/tests/br_db_online_newkv/run.sh b/tests/br_db_online_newkv/run.sh
new file mode 100755
index 000000000..d8c3f15ff
--- /dev/null
+++ b/tests/br_db_online_newkv/run.sh
@@ -0,0 +1,77 @@
+#!/bin/sh
+#
+# Copyright 2020 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eu
+DB="$TEST_NAME"
+
+run_sql "CREATE DATABASE $DB;"
+
+run_sql "CREATE TABLE $DB.usertable1 ( \
+    YCSB_KEY varchar(64) NOT NULL, \
+    FIELD0 varchar(1) DEFAULT NULL, \
+    PRIMARY KEY (YCSB_KEY) \
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"
+
+run_sql "INSERT INTO $DB.usertable1 VALUES (\"a\", \"b\");"
+run_sql "INSERT INTO $DB.usertable1 VALUES (\"aa\", \"b\");"
+
+run_sql "CREATE TABLE $DB.usertable2 ( \
+    YCSB_KEY varchar(64) NOT NULL, \
+    FIELD0 varchar(1) DEFAULT NULL, \
+    PRIMARY KEY (YCSB_KEY) \
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"
+
+run_sql "INSERT INTO $DB.usertable2 VALUES (\"c\", \"d\");"
+
+# backup db
+echo "backup start..."
+run_br --pd $PD_ADDR backup db --db "$DB" -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4
+
+run_sql "DROP DATABASE $DB;"
+
+# enable placement rules
+echo "config set enable-placement-rules true" | pd-ctl
+
+# add new tikv for restore
+# actual tikv_addr are TIKV_ADDR${i}
+TIKV_ADDR="127.0.0.1:2017"
+TIKV_STATUS_ADDR="127.0.0.1:2019"
+TIKV_COUNT=3
+
+echo "Starting restore TiKV..."
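+# The loop below brings up $TIKV_COUNT extra TiKV instances configured with
+# tests/config/restore-tikv.toml, which carries the `exclusive = "restore"` label,
+# so the --online restore places the restored regions on these stores only.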
+for i in $(seq $TIKV_COUNT); do
+    tikv-server \
+        --pd "$PD_ADDR" \
+        -A "$TIKV_ADDR$i" \
+        --status-addr "$TIKV_STATUS_ADDR$i" \
+        --log-file "$TEST_DIR/restore-tikv${i}.log" \
+        -C "tests/config/restore-tikv.toml" \
+        -s "$TEST_DIR/restore-tikv${i}" &
+done
+sleep 5
+
+# restore db
+echo "restore start..."
+run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --online
+
+table_count=$(run_sql "use $DB; show tables;" | grep "Tables_in" | wc -l)
+if [ "$table_count" -ne "2" ];then
+    echo "TEST: [$TEST_NAME] failed!"
+    exit 1
+fi
+
+echo "config set enable-placement-rules false" | pd-ctl
+
+run_sql "DROP DATABASE $DB;"
diff --git a/tests/config/restore-tikv.toml b/tests/config/restore-tikv.toml
new file mode 100644
index 000000000..010711cd4
--- /dev/null
+++ b/tests/config/restore-tikv.toml
@@ -0,0 +1,17 @@
+# config of tikv
+
+[server]
+labels = { exclusive = "restore" }
+
+[coprocessor]
+region-max-keys = 20
+region-split-keys = 12
+
+[rocksdb]
+max-open-files = 4096
+[raftdb]
+max-open-files = 4096
+[raftstore]
+# sync-log is true by default for high reliability; it prevents data loss on power failure.
+sync-log = false
+capacity = "10GB"
\ No newline at end of file