diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index 2ffffe690ffe5..ed929a12895a4 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -218,10 +218,9 @@ func (push *pushDown) pushBackup( if len(errMsg) <= 0 { errMsg = errPb.Msg } - return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s %s", + return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s", store.GetId(), redact.String(store.GetAddress()), - req.StorageBackend.String(), errMsg, ) } diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 527e07f28ea27..601897883e727 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -474,10 +474,17 @@ func IsStreamRestore(cmdName string) bool { // RunRestore starts a restore task inside the current goroutine. func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { + if err := checkTaskExists(c, cfg); err != nil { + return errors.Annotate(err, "failed to check task exits") + } + if IsStreamRestore(cmdName) { return RunStreamRestore(c, g, cmdName, cfg) } + return runRestore(c, g, cmdName, cfg) +} +func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { cfg.Adjust() defer summary.Summary(cmdName) ctx, cancel := context.WithCancel(c) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 8596eb89d6974..6a0eaada78679 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -862,8 +862,8 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre return nil } -func checkConfigForStatus(cfg *StreamConfig) error { - if len(cfg.PD) == 0 { +func checkConfigForStatus(pd []string) error { + if len(pd) == 0 { return errors.Annotatef(berrors.ErrInvalidArgument, "the command needs access to PD, please specify `-u` or `--pd`") } @@ -913,7 +913,7 @@ func RunStreamStatus( ctx = opentracing.ContextWithSpan(ctx, span1) } - if err := checkConfigForStatus(cfg); err != nil { + if err := checkConfigForStatus(cfg.PD); err != nil { return err } ctl, err := makeStatusController(ctx, cfg, g) @@ -1028,6 +1028,32 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre return nil } +// checkTaskExists checks whether there is a log backup task running. +// If so, return an error. +func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error { + if err := checkConfigForStatus(cfg.PD); err != nil { + return err + } + etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config) + if err != nil { + return err + } + cli := streamhelper.NewMetaDataClient(etcdCLI) + defer func() { + if err := cli.Close(); err != nil { + log.Error("failed to close the etcd client", zap.Error(err)) + } + }() + tasks, err := cli.GetAllTasks(ctx) + if err != nil { + return err + } + if len(tasks) > 0 { + return errors.Errorf("log backup task is running: %s, please stop the task before restore, and after PITR operation finished, create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name) + } + return nil +} + // RunStreamRestore restores stream log. func RunStreamRestore( c context.Context, @@ -1089,7 +1115,7 @@ func RunStreamRestore( logStorage := cfg.Config.Storage cfg.Config.Storage = cfg.FullBackupStorage // TiFlash replica is restored to down-stream on 'pitr' currently. - if err = RunRestore(ctx, g, FullRestoreCmd, cfg); err != nil { + if err = runRestore(ctx, g, FullRestoreCmd, cfg); err != nil { return errors.Trace(err) } cfg.Config.Storage = logStorage diff --git a/br/tests/br_restore_log_task_enable/run.sh b/br/tests/br_restore_log_task_enable/run.sh new file mode 100644 index 0000000000000..923f8fe7c2b33 --- /dev/null +++ b/br/tests/br_restore_log_task_enable/run.sh @@ -0,0 +1,56 @@ +#!/bin/sh +# +# Copyright 2022 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux +DB="$TEST_NAME" +TABLE="usertable" + +# start log task +run_br log start --task-name 1234 -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR + +run_sql "CREATE DATABASE $DB;" +run_sql "CREATE TABLE $DB.$TABLE (id int);" +run_sql "INSERT INTO $DB.$TABLE VALUES (1), (2), (3);" + +# backup full +run_br backup full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR + +# clean db +run_sql "DROP DATABASE $DB;" + +# restore full (should be failed) +run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR && exit 1 + +# restore point (should be failed) +run_br restore point -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR && exit 1 + +# pause log task +run_br log pause --task-name 1234 --pd $PD_ADDR + +# restore full (should be failed) +run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR && exit 1 + +# restore point (should be failed) +run_br restore point -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR && exit 1 + +# stop log task +run_br log stop --task-name 1234 --pd $PD_ADDR + +# restore full (should be success) +run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR + +# clean db +run_sql "DROP DATABASE $DB"