From 1ea35d5feb79664760038ea0939f1ad2d67debae Mon Sep 17 00:00:00 2001 From: joccau Date: Wed, 7 Dec 2022 17:56:46 +0800 Subject: [PATCH 1/5] set gc disable when restore log Signed-off-by: joccau --- br/pkg/task/stream.go | 21 +++++++++++++++++++++ br/pkg/utils/db.go | 15 +++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index fdcc728a9ce5f..62060367af70f 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -458,6 +458,21 @@ func (s *streamMgr) checkStreamStartEnable(g glue.Glue) error { return nil } +// SetGcStatus sets status of GC enabled or disabled. +func SetGcStatus(g glue.Glue, store kv.Storage, enable bool) error { + se, err := g.CreateSession(store) + if err != nil { + return errors.Trace(err) + } + + execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor) + if err = utils.SetGcEnableStatus(execCtx, enable); err != nil { + log.Warn("failed to set gc status", zap.Bool("target-gc-status", enable)) + return errors.Trace(err) + } + return nil +} + // RunStreamCommand run all kinds of `stream task` func RunStreamCommand( ctx context.Context, @@ -1142,6 +1157,12 @@ func restoreStream( // Always run the post-work even on error, so we don't stuck in the import // mode or emptied schedulers defer restorePostWork(ctx, client, restoreSchedulers) + // It need disable GC in TiKV when piTR. + // because the process of PITR is concurrent and kv events isn't sorted by tso. 
+ if err = SetGcStatus(g, mgr.GetStorage(), false); err != nil { + return errors.Trace(err) + } + defer SetGcStatus(g, mgr.GetStorage(), true) err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS) if err != nil { diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go index 6cc9f82a1229a..2f502e16f1cd6 100644 --- a/br/pkg/utils/db.go +++ b/br/pkg/utils/db.go @@ -8,6 +8,7 @@ import ( "strings" "sync" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" @@ -98,6 +99,20 @@ func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error) { return true, nil } +// SetGcEnableStatus sets the status of GC. +// gc.ratio-threshold = -1.0, which represents disable gc in TiKV. +// gc.ratio-threshold = 1.1 is the default value in TiKV. +func SetGcEnableStatus(ctx sqlexec.RestrictedSQLExecutor, enable bool) error { + ratio := 1.1 + if !enable { + ratio = -1.0 + } + + internalCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR) + _, _, err := ctx.ExecRestrictedSQL(internalCtx, nil, "set config tikv `gc.ratio-threshold`=%?", ratio) + return errors.Trace(err) +} + // LogBackupTaskCountInc increases the count of log backup task. 
func LogBackupTaskCountInc() { LogBackupTaskMutex.Lock() From 1ec57ee29ac1848d9408a64a997d5898fa2bbfdb Mon Sep 17 00:00:00 2001 From: joccau Date: Wed, 7 Dec 2022 19:27:07 +0800 Subject: [PATCH 2/5] make lint Signed-off-by: joccau --- br/pkg/task/stream.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 62060367af70f..c0bdbad438444 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -467,9 +467,11 @@ func SetGcStatus(g glue.Glue, store kv.Storage, enable bool) error { execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor) if err = utils.SetGcEnableStatus(execCtx, enable); err != nil { - log.Warn("failed to set gc status", zap.Bool("target-gc-status", enable)) + log.Warn("failed to set gc status", zap.Bool("target-gc-status", enable), zap.Error(err)) return errors.Trace(err) } + + log.Info("set gc status", zap.Bool("enabled", enable)) return nil } @@ -1162,7 +1164,11 @@ func restoreStream( if err = SetGcStatus(g, mgr.GetStorage(), false); err != nil { return errors.Trace(err) } - defer SetGcStatus(g, mgr.GetStorage(), true) + defer func() { + if err = SetGcStatus(g, mgr.GetStorage(), true); err != nil { + log.Info("failed to set gc enabled", zap.Error(err)) + } + }() err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS) if err != nil { From 4bbacec09173b1eb9eca21f985aed8117c996a24 Mon Sep 17 00:00:00 2001 From: joccau Date: Thu, 8 Dec 2022 12:20:00 +0800 Subject: [PATCH 3/5] deal comments in pr Signed-off-by: joccau --- br/pkg/task/stream.go | 34 +++++++++++++++++++++++----------- br/pkg/utils/db.go | 39 +++++++++++++++++++++++++++++---------- br/pkg/utils/db_test.go | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 86 insertions(+), 21 deletions(-) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index c0bdbad438444..cd0d3677b51f9 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -458,21 +458,31 @@ func (s *streamMgr) 
checkStreamStartEnable(g glue.Glue) error { return nil } -// SetGcStatus sets status of GC enabled or disabled. -func SetGcStatus(g glue.Glue, store kv.Storage, enable bool) error { +type RestoreFunc func() error + +// KeepGcDisabled keeps GC disabled and return a function that used to gc enabled. +// gc.ratio-threshold = "-1.0", which represents disable gc in TiKV. +func KeepGcDisabled(g glue.Glue, store kv.Storage) (RestoreFunc, error) { se, err := g.CreateSession(store) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor) - if err = utils.SetGcEnableStatus(execCtx, enable); err != nil { - log.Warn("failed to set gc status", zap.Bool("target-gc-status", enable), zap.Error(err)) - return errors.Trace(err) + oldRatio, err := utils.GetGcRatio(execCtx) + if err != nil { + return nil, errors.Trace(err) } - log.Info("set gc status", zap.Bool("enabled", enable)) - return nil + newRatio := "-1.0" + err = utils.SetGcRatio(execCtx, newRatio) + if err != nil { + return nil, errors.Trace(err) + } + + return func() error { + return utils.SetGcRatio(execCtx, oldRatio) + }, nil } // RunStreamCommand run all kinds of `stream task` @@ -1159,13 +1169,15 @@ func restoreStream( // Always run the post-work even on error, so we don't stuck in the import // mode or emptied schedulers defer restorePostWork(ctx, client, restoreSchedulers) - // It need disable GC in TiKV when piTR. + + // It need disable GC in TiKV when PiTR. // because the process of PITR is concurrent and kv events isn't sorted by tso. 
- if err = SetGcStatus(g, mgr.GetStorage(), false); err != nil { + restoreGc, err := KeepGcDisabled(g, mgr.GetStorage()) + if err != nil { return errors.Trace(err) } defer func() { - if err = SetGcStatus(g, mgr.GetStorage(), true); err != nil { + if err = restoreGc(); err != nil { log.Info("failed to set gc enabled", zap.Error(err)) } }() diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go index 2f502e16f1cd6..5b8d2ebc7f146 100644 --- a/br/pkg/utils/db.go +++ b/br/pkg/utils/db.go @@ -99,18 +99,37 @@ func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error) { return true, nil } -// SetGcEnableStatus sets the status of GC. -// gc.ratio-threshold = -1.0, which represents disable gc in TiKV. -// gc.ratio-threshold = 1.1 is the default value in TiKV. -func SetGcEnableStatus(ctx sqlexec.RestrictedSQLExecutor, enable bool) error { - ratio := 1.1 - if !enable { - ratio = -1.0 +func GetGcRatio(ctx sqlexec.RestrictedSQLExecutor) (string, error) { + valStr := "show config where name = 'log-backup.enable' and type = 'tikv'" + rows, fields, errSQL := ctx.ExecRestrictedSQL( + kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR), + nil, + valStr, + ) + if errSQL != nil { + return "", errSQL + } + if len(rows) == 0 { + // no rows mean not support log backup. 
+ return "", nil } - internalCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR) - _, _, err := ctx.ExecRestrictedSQL(internalCtx, nil, "set config tikv `gc.ratio-threshold`=%?", ratio) - return errors.Trace(err) + d := rows[0].GetDatum(3, &fields[3].Column.FieldType) + return d.ToString() +} + +func SetGcRatio(ctx sqlexec.RestrictedSQLExecutor, ratio string) error { + _, _, err := ctx.ExecRestrictedSQL( + kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR), + nil, + "set config tikv `gc.ratio-threshold`=%?", + ratio, + ) + if err != nil { + return errors.Trace(err) + } + log.Info("set config tikv gc.ratio-threshold", zap.String("ratio", ratio)) + return nil } // LogBackupTaskCountInc increases the count of log backup task. diff --git a/br/pkg/utils/db_test.go b/br/pkg/utils/db_test.go index 08eac1e82594c..1f7a0a87eafa9 100644 --- a/br/pkg/utils/db_test.go +++ b/br/pkg/utils/db_test.go @@ -115,3 +115,37 @@ func TestCheckLogBackupTaskExist(t *testing.T) { utils.LogBackupTaskCountDec() require.False(t, utils.CheckLogBackupTaskExist()) } + +func TestGc(t *testing.T) { + // config format: + // MySQL [(none)]> show config where name = 'gc.ratio-threshold'; + // +------+-------------------+--------------------+-------+ + // | Type | Instance | Name | Value | + // +------+-------------------+--------------------+-------+ + // | tikv | 172.16.6.46:3460 | gc.ratio-threshold | 1.1 | + // | tikv | 172.16.6.47:3460 | gc.ratio-threshold | 1.1 | + // +------+-------------------+--------------------+-------+ + fields := make([]*ast.ResultField, 4) + tps := []*types.FieldType{ + types.NewFieldType(mysql.TypeString), + types.NewFieldType(mysql.TypeString), + types.NewFieldType(mysql.TypeString), + types.NewFieldType(mysql.TypeFloat), + } + for i := 0; i < len(tps); i++ { + rf := new(ast.ResultField) + rf.Column = new(model.ColumnInfo) + rf.Column.FieldType = *tps[i] + fields[i] = rf + } + rows := make([]chunk.Row, 0, 2) + row := 
chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "log-backup.enable", 1.1).ToRow() + rows = append(rows, row) + row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20162", "log-backup.enable", 1.1).ToRow() + rows = append(rows, row) + + s := &mockRestrictedSQLExecutor{rows: rows, fields: fields} + ratio, err := utils.GetGcRatio(s) + require.Nil(t, err) + require.Equal(t, ratio, 1.1) +} From 80e2e20c20a58c5c760a8e9cb83343a9359aa64a Mon Sep 17 00:00:00 2001 From: joccau Date: Thu, 8 Dec 2022 13:21:48 +0800 Subject: [PATCH 4/5] add test case Signed-off-by: joccau --- br/pkg/task/stream.go | 2 +- br/pkg/utils/db.go | 6 +++--- br/pkg/utils/db_test.go | 29 ++++++++++++++++++++++++----- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index cd0d3677b51f9..b33459bb98e2e 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1178,7 +1178,7 @@ func restoreStream( } defer func() { if err = restoreGc(); err != nil { - log.Info("failed to set gc enabled", zap.Error(err)) + log.Error("failed to set gc enabled", zap.Error(err)) } }() diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go index 5b8d2ebc7f146..9574c06670573 100644 --- a/br/pkg/utils/db.go +++ b/br/pkg/utils/db.go @@ -100,7 +100,7 @@ func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error) { } func GetGcRatio(ctx sqlexec.RestrictedSQLExecutor) (string, error) { - valStr := "show config where name = 'log-backup.enable' and type = 'tikv'" + valStr := "show config where name = 'gc.ratio-threshold' and type = 'tikv'" rows, fields, errSQL := ctx.ExecRestrictedSQL( kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR), nil, @@ -126,9 +126,9 @@ func SetGcRatio(ctx sqlexec.RestrictedSQLExecutor, ratio string) error { ratio, ) if err != nil { - return errors.Trace(err) + return errors.Annotatef(err, "failed to set config `gc.ratio-threshold`=%s", ratio) } - log.Info("set config tikv gc.ratio-threshold", zap.String("ratio", 
ratio)) + log.Warn("set config tikv gc.ratio-threshold", zap.String("ratio", ratio)) return nil } diff --git a/br/pkg/utils/db_test.go b/br/pkg/utils/db_test.go index 1f7a0a87eafa9..1334d868641f0 100644 --- a/br/pkg/utils/db_test.go +++ b/br/pkg/utils/db_test.go @@ -4,6 +4,7 @@ package utils_test import ( "context" + "strings" "testing" "github.com/pingcap/errors" @@ -35,7 +36,19 @@ func (m *mockRestrictedSQLExecutor) ExecRestrictedSQL(ctx context.Context, opts if m.errHappen { return nil, nil, errors.New("injected error") } - return m.rows, m.fields, nil + + if strings.Contains(sql, "show config") { + return m.rows, m.fields, nil + } else if strings.Contains(sql, "set config") && strings.Contains(sql, "gc.ratio-threshold") { + value := args[0].(string) + + for _, r := range m.rows { + d := types.Datum{} + d.SetString(value, "") + chunk.MutRow(r).SetDatum(3, d) + } + } + return nil, nil, nil } func TestIsLogBackupEnabled(t *testing.T) { @@ -130,7 +143,7 @@ func TestGc(t *testing.T) { types.NewFieldType(mysql.TypeString), types.NewFieldType(mysql.TypeString), types.NewFieldType(mysql.TypeString), - types.NewFieldType(mysql.TypeFloat), + types.NewFieldType(mysql.TypeString), } for i := 0; i < len(tps); i++ { rf := new(ast.ResultField) @@ -139,13 +152,19 @@ func TestGc(t *testing.T) { fields[i] = rf } rows := make([]chunk.Row, 0, 2) - row := chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "log-backup.enable", 1.1).ToRow() + row := chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "log-backup.enable", "1.1").ToRow() rows = append(rows, row) - row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20162", "log-backup.enable", 1.1).ToRow() + row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20162", "log-backup.enable", "1.1").ToRow() rows = append(rows, row) s := &mockRestrictedSQLExecutor{rows: rows, fields: fields} ratio, err := utils.GetGcRatio(s) require.Nil(t, err) - require.Equal(t, ratio, 1.1) + require.Equal(t, ratio, "1.1") + + err = utils.SetGcRatio(s, "-1.0") + 
require.Nil(t, err) + ratio, err = utils.GetGcRatio(s) + require.Nil(t, err) + require.Equal(t, ratio, "-1.0") +} From 63a307897ae46e55984f9e45bf9e737e3d64b216 Mon Sep 17 00:00:00 2001 From: joccau Date: Thu, 8 Dec 2022 14:08:20 +0800 Subject: [PATCH 5/5] update context to stop restore when an error occurs Signed-off-by: joccau --- br/pkg/restore/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 9e4e5a389b935..85e17aaad825d 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -2041,9 +2041,9 @@ func (rc *Client) RestoreKVFiles( } if supportBatch { - err = ApplyKVFilesWithBatchMethod(ctx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc) + err = ApplyKVFilesWithBatchMethod(ectx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc) } else { - err = ApplyKVFilesWithSingelMethod(ctx, iter, applyFunc) + err = ApplyKVFilesWithSingelMethod(ectx, iter, applyFunc) } if err != nil { return errors.Trace(err)