From 4c09f2e57d29414c7d9651dfd6485cc6564e97e3 Mon Sep 17 00:00:00 2001
From: MoCuishle28 <32541204+MoCuishle28@users.noreply.github.com>
Date: Tue, 22 Nov 2022 19:19:58 +0800
Subject: [PATCH] This is an automated cherry-pick of #39173

Signed-off-by: ti-chi-bot
---
 br/pkg/gluetidb/glue.go | 133 ++++
 br/pkg/restore/BUILD.bazel | 181 ++++++
 br/pkg/restore/client.go | 100 +++
 br/pkg/restore/client_test.go | 1118 +++++++++++++++++++++++++++++++++
 br/pkg/task/backup.go | 4 +-
 br/pkg/task/common.go | 4 +
 br/pkg/task/restore.go | 9 +-
 br/pkg/utils/db.go | 87 +++
 8 files changed, 1633 insertions(+), 3 deletions(-)
 create mode 100644 br/pkg/restore/BUILD.bazel

diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go
index 05709049b360a..997747c296d8f 100644
--- a/br/pkg/gluetidb/glue.go
+++ b/br/pkg/gluetidb/glue.go
@@ -298,3 +298,136 @@ func (gs *tidbSession) showCreateDatabase(db *model.DBInfo) (string, error) {
 func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) string {
 	return executor.ConstructResultOfShowCreatePlacementPolicy(policy)
 }
+<<<<<<< HEAD
+=======
+
+// mockSession is used for tests.
+type mockSession struct {
+	se         session.Session
+	globalVars map[string]string
+}
+
+// GetSessionCtx implements glue.Session.
+func (s *mockSession) GetSessionCtx() sessionctx.Context {
+	return s.se
+}
+
+// Execute implements glue.Session.
+func (s *mockSession) Execute(ctx context.Context, sql string) error {
+	return s.ExecuteInternal(ctx, sql)
+}
+
+func (s *mockSession) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error {
+	ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
+	rs, err := s.se.ExecuteInternal(ctx, sql, args...)
+	if err != nil {
+		return err
+	}
+	// Some SQL statements (like ADMIN RECOVER INDEX) only take effect lazily,
+	// when the result set is polled.
+	// Call `Next` at least once to trigger their side effects.
+	// (Maybe we'd better drain all returned rows?)
+	if rs != nil {
+		//nolint: errcheck
+		defer rs.Close()
+		c := rs.NewChunk(nil)
+		if err := rs.Next(ctx, c); err != nil {
+			return nil
+		}
+	}
+	return nil
+}
+
+// CreateDatabase implements glue.Session.
+func (s *mockSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
+	log.Fatal("unimplemented CreateDatabase for mock session")
+	return nil
+}
+
+// CreatePlacementPolicy implements glue.Session.
+func (s *mockSession) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error {
+	log.Fatal("unimplemented CreatePlacementPolicy for mock session")
+	return nil
+}
+
+// CreateTables implements glue.BatchCreateTableSession.
+func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
+	log.Fatal("unimplemented CreateTables for mock session")
+	return nil
+}
+
+// CreateTable implements glue.Session.
+func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
+	log.Fatal("unimplemented CreateTable for mock session")
+	return nil
+}
+
+// Close implements glue.Session.
+func (s *mockSession) Close() {
+	s.se.Close()
+}
+
+// GetGlobalVariable implements glue.Session.
+func (s *mockSession) GetGlobalVariable(name string) (string, error) { + if ret, ok := s.globalVars[name]; ok { + return ret, nil + } + return "True", nil +} + +// MockGlue only used for test +type MockGlue struct { + se session.Session + GlobalVars map[string]string +} + +func (m *MockGlue) SetSession(se session.Session) { + m.se = se +} + +// GetDomain implements glue.Glue. +func (*MockGlue) GetDomain(store kv.Storage) (*domain.Domain, error) { + return nil, nil +} + +// CreateSession implements glue.Glue. +func (m *MockGlue) CreateSession(store kv.Storage) (glue.Session, error) { + glueSession := &mockSession{ + se: m.se, + globalVars: m.GlobalVars, + } + return glueSession, nil +} + +// Open implements glue.Glue. +func (*MockGlue) Open(path string, option pd.SecurityOption) (kv.Storage, error) { + return nil, nil +} + +// OwnsStorage implements glue.Glue. +func (*MockGlue) OwnsStorage() bool { + return true +} + +// StartProgress implements glue.Glue. +func (*MockGlue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress { + return nil +} + +// Record implements glue.Glue. +func (*MockGlue) Record(name string, value uint64) { +} + +// GetVersion implements glue.Glue. +func (*MockGlue) GetVersion() string { + return "mock glue" +} + +// UseOneShotSession implements glue.Glue. +func (m *MockGlue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error { + glueSession := &mockSession{ + se: m.se, + } + return fn(glueSession) +} +>>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel new file mode 100644 index 0000000000000..77c2fc2976570 --- /dev/null +++ b/br/pkg/restore/BUILD.bazel @@ -0,0 +1,181 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "restore", + srcs = [ + "batcher.go", + "client.go", + "data.go", + "db.go", + "import.go", + "import_retry.go", + "log_client.go", + "merge.go", + "pipeline_items.go", + "range.go", + "rawkv_client.go", + "search.go", + "split.go", + "stream_metas.go", + "systable_restore.go", + "util.go", + ], + importpath = "github.com/pingcap/tidb/br/pkg/restore", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/backup", + "//br/pkg/checksum", + "//br/pkg/common", + "//br/pkg/conn", + "//br/pkg/conn/util", + "//br/pkg/errors", + "//br/pkg/glue", + "//br/pkg/logutil", + "//br/pkg/metautil", + "//br/pkg/pdutil", + "//br/pkg/redact", + "//br/pkg/restore/prealloc_table_id", + "//br/pkg/restore/split", + "//br/pkg/restore/tiflashrec", + "//br/pkg/rtree", + "//br/pkg/storage", + "//br/pkg/stream", + "//br/pkg/summary", + "//br/pkg/utils", + "//br/pkg/utils/iter", + "//br/pkg/version", + "//config", + "//ddl", + "//ddl/util", + "//domain", + "//kv", + "//meta", + "//parser/model", + "//parser/mysql", + "//sessionctx/variable", + "//statistics/handle", + "//store/pdtypes", + "//tablecodec", + "//util", + "//util/codec", + "//util/collate", + "//util/hack", + "//util/mathutil", + "//util/table-filter", + "@com_github_emirpasic_gods//maps/treemap", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_google_uuid//:uuid", + "@com_github_opentracing_opentracing_go//:opentracing-go", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/brpb", + "@com_github_pingcap_kvproto//pkg/errorpb", + "@com_github_pingcap_kvproto//pkg/import_sstpb", + 
"@com_github_pingcap_kvproto//pkg/kvrpcpb", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_kvproto//pkg/pdpb", + "@com_github_pingcap_kvproto//pkg/recoverdatapb", + "@com_github_pingcap_log//:log", + "@com_github_tikv_client_go_v2//config", + "@com_github_tikv_client_go_v2//kv", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//rawkv", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//txnkv/rangetask", + "@com_github_tikv_pd_client//:client", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//backoff", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//credentials", + "@org_golang_google_grpc//keepalive", + "@org_golang_google_grpc//status", + "@org_golang_x_exp//slices", + "@org_golang_x_sync//errgroup", + "@org_uber_go_multierr//:multierr", + "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zapcore", + ], +) + +go_test( + name = "restore_test", + timeout = "short", + srcs = [ + "batcher_test.go", + "client_test.go", + "data_test.go", + "db_test.go", + "import_retry_test.go", + "log_client_test.go", + "main_test.go", + "merge_fuzz_test.go", + "merge_test.go", + "range_test.go", + "rawkv_client_test.go", + "search_test.go", + "split_test.go", + "stream_metas_test.go", + "util_test.go", + ], + embed = [":restore"], + flaky = True, + race = "on", + shard_count = 20, + deps = [ + "//br/pkg/backup", + "//br/pkg/conn", + "//br/pkg/errors", + "//br/pkg/glue", + "//br/pkg/gluetidb", + "//br/pkg/logutil", + "//br/pkg/metautil", + "//br/pkg/mock", + "//br/pkg/pdutil", + "//br/pkg/restore/split", + "//br/pkg/restore/tiflashrec", + "//br/pkg/rtree", + "//br/pkg/storage", + "//br/pkg/stream", + "//br/pkg/utils", + "//br/pkg/utils/iter", + "//infoschema", + "//kv", + "//meta/autoid", + "//parser/model", + "//parser/mysql", + "//parser/types", + "//sessionctx/stmtctx", + "//store/pdtypes", + "//tablecodec", + "//testkit", + "//testkit/testsetup", + "//types", + "//util/codec", + "@com_github_fsouza_fake_gcs_server//fakestorage", + "@com_github_golang_protobuf//proto", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/brpb", + "@com_github_pingcap_kvproto//pkg/encryptionpb", + "@com_github_pingcap_kvproto//pkg/errorpb", + "@com_github_pingcap_kvproto//pkg/import_sstpb", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_kvproto//pkg/pdpb", + "@com_github_pingcap_kvproto//pkg/recoverdatapb", + "@com_github_pingcap_log//:log", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//rawkv", + "@com_github_tikv_client_go_v2//testutils", + "@com_github_tikv_pd_client//:client", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//keepalive", + "@org_golang_google_grpc//status", + "@org_golang_x_exp//slices", + "@org_uber_go_goleak//:goleak", + "@org_uber_go_multierr//:multierr", + "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zapcore", + ], +) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 7a2c04285c09f..2c535974956ad 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -46,6 +46,7 @@ import ( "github.com/pingcap/tidb/store/pdtypes" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/mathutil" filter "github.com/pingcap/tidb/util/table-filter" 
"github.com/tikv/client-go/v2/oracle" @@ -2016,3 +2017,102 @@ func (rc *Client) SaveSchemas( } return nil } +<<<<<<< HEAD +======= + +// InitFullClusterRestore init fullClusterRestore and set SkipGrantTable as needed +func (rc *Client) InitFullClusterRestore(explicitFilter bool) { + rc.fullClusterRestore = !explicitFilter && rc.IsFull() + + log.Info("full cluster restore", zap.Bool("value", rc.fullClusterRestore)) + + if rc.fullClusterRestore { + // have to skip grant table, in order to NotifyUpdatePrivilege + config.GetGlobalConfig().Security.SkipGrantTable = true + } +} + +func (rc *Client) IsFullClusterRestore() bool { + return rc.fullClusterRestore +} + +func (rc *Client) SetWithSysTable(withSysTable bool) { + rc.withSysTable = withSysTable +} + +// MockClient create a fake client used to test. +func MockClient(dbs map[string]*utils.Database) *Client { + return &Client{databases: dbs} +} + +// TidyOldSchemas produces schemas information. +func TidyOldSchemas(sr *stream.SchemasReplace) *backup.Schemas { + var schemaIsEmpty bool + schemas := backup.NewBackupSchemas() + + for _, dr := range sr.DbMap { + if dr.OldDBInfo == nil { + continue + } + + schemaIsEmpty = true + for _, tr := range dr.TableMap { + if tr.OldTableInfo == nil { + continue + } + schemas.AddSchema(dr.OldDBInfo, tr.OldTableInfo) + schemaIsEmpty = false + } + + // backup this empty schema if it has nothing table. + if schemaIsEmpty { + schemas.AddSchema(dr.OldDBInfo, nil) + } + } + return schemas +} + +func CheckNewCollationEnable( + backupNewCollationEnable string, + g glue.Glue, + storage kv.Storage, + CheckRequirements bool, +) error { + if backupNewCollationEnable == "" { + if CheckRequirements { + return errors.Annotatef(berrors.ErrUnknown, + "the config 'new_collations_enabled_on_first_bootstrap' not found in backupmeta. "+ + "you can use \"show config WHERE name='new_collations_enabled_on_first_bootstrap';\" to manually check the config. "+ + "if you ensure the config 'new_collations_enabled_on_first_bootstrap' in backup cluster is as same as restore cluster, "+ + "use --check-requirements=false to skip this check") + } + log.Warn("the config 'new_collations_enabled_on_first_bootstrap' is not in backupmeta") + return nil + } + + se, err := g.CreateSession(storage) + if err != nil { + return errors.Trace(err) + } + + newCollationEnable, err := se.GetGlobalVariable(utils.GetTidbNewCollationEnabled()) + if err != nil { + return errors.Trace(err) + } + + if !strings.EqualFold(backupNewCollationEnable, newCollationEnable) { + return errors.Annotatef(berrors.ErrUnknown, + "the config 'new_collations_enabled_on_first_bootstrap' not match, upstream:%v, downstream: %v", + backupNewCollationEnable, newCollationEnable) + } + + // collate.newCollationEnabled is set to 1 when the collate package is initialized, + // so we need to modify this value according to the config of the cluster + // before using the collate package. 
+ enabled := newCollationEnable == "True" + // modify collate.newCollationEnabled according to the config of the cluster + collate.SetNewCollationEnabledForTest(enabled) + log.Info("set new_collation_enabled", zap.Bool("new_collation_enabled", enabled)) + return nil +} +>>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index 334ac67387f5d..76787c8b0028d 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -240,3 +240,1121 @@ func TestPreCheckTableTiFlashReplicas(t *testing.T) { require.Nil(t, tables[i].Info.TiFlashReplica) } } +<<<<<<< HEAD +======= + +// Mock ImporterClient interface +type FakeImporterClient struct { + restore.ImporterClient +} + +// Record the stores that have communicated +type RecordStores struct { + mu sync.Mutex + stores []uint64 +} + +func NewRecordStores() RecordStores { + return RecordStores{stores: make([]uint64, 0)} +} + +func (r *RecordStores) put(id uint64) { + r.mu.Lock() + defer r.mu.Unlock() + r.stores = append(r.stores, id) +} + +func (r *RecordStores) sort() { + r.mu.Lock() + defer r.mu.Unlock() + slices.Sort(r.stores) +} + +func (r *RecordStores) len() int { + r.mu.Lock() + defer r.mu.Unlock() + return len(r.stores) +} + +func (r *RecordStores) get(i int) uint64 { + r.mu.Lock() + defer r.mu.Unlock() + return r.stores[i] +} + +func (r *RecordStores) toString() string { + r.mu.Lock() + defer r.mu.Unlock() + return fmt.Sprintf("%v", r.stores) +} + +var recordStores RecordStores + +const ( + SET_SPEED_LIMIT_ERROR = 999999 + WORKING_TIME = 100 +) + +func (fakeImportCli FakeImporterClient) SetDownloadSpeedLimit( + ctx context.Context, + storeID uint64, + req *import_sstpb.SetDownloadSpeedLimitRequest, +) (*import_sstpb.SetDownloadSpeedLimitResponse, error) { + if storeID == SET_SPEED_LIMIT_ERROR { + return nil, fmt.Errorf("storeID:%v ERROR", storeID) + } + + time.Sleep(WORKING_TIME * time.Millisecond) // simulate doing 100 ms work + recordStores.put(storeID) + return nil, nil +} + +func TestSetSpeedLimit(t *testing.T) { + mockStores := []*metapb.Store{ + {Id: 1}, + {Id: 2}, + {Id: 3}, + {Id: 4}, + {Id: 5}, + {Id: 6}, + {Id: 7}, + {Id: 8}, + {Id: 9}, + {Id: 10}, + } + + // 1. The cost of concurrent communication is expected to be less than the cost of serial communication. + client := restore.NewRestoreClient(fakePDClient{ + stores: mockStores, + }, nil, defaultKeepaliveCfg, false) + ctx := context.Background() + + recordStores = NewRecordStores() + start := time.Now() + err := restore.MockCallSetSpeedLimit(ctx, FakeImporterClient{}, client, 10) + cost := time.Since(start) + require.NoError(t, err) + + recordStores.sort() + t.Logf("Total Cost: %v\n", cost) + t.Logf("Has Communicated: %v\n", recordStores.toString()) + + serialCost := len(mockStores) * WORKING_TIME + require.Less(t, cost, time.Duration(serialCost)*time.Millisecond) + require.Equal(t, len(mockStores), recordStores.len()) + for i := 0; i < recordStores.len(); i++ { + require.Equal(t, mockStores[i].Id, recordStores.get(i)) + } + + // 2. Expect the number of communicated stores to be less than the length of the mockStore + // Because subsequent unstarted communications are aborted when an error is encountered. 
+ recordStores = NewRecordStores() + mockStores[5].Id = SET_SPEED_LIMIT_ERROR // setting a fault store + client = restore.NewRestoreClient(fakePDClient{ + stores: mockStores, + }, nil, defaultKeepaliveCfg, false) + + // Concurrency needs to be less than the number of stores + err = restore.MockCallSetSpeedLimit(ctx, FakeImporterClient{}, client, 2) + require.Error(t, err) + t.Log(err) + + recordStores.sort() + sort.Slice(mockStores, func(i, j int) bool { return mockStores[i].Id < mockStores[j].Id }) + t.Logf("Has Communicated: %v\n", recordStores.toString()) + require.Less(t, recordStores.len(), len(mockStores)) + for i := 0; i < recordStores.len(); i++ { + require.Equal(t, mockStores[i].Id, recordStores.get(i)) + } +} + +func TestDeleteRangeQuery(t *testing.T) { + ctx := context.Background() + m := mc + mockStores := []*metapb.Store{ + { + Id: 1, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tiflash", + }, + }, + }, + { + Id: 2, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tiflash", + }, + }, + }, + } + + g := gluetidb.New() + client := restore.NewRestoreClient(fakePDClient{ + stores: mockStores, + }, nil, defaultKeepaliveCfg, false) + err := client.Init(g, m.Storage) + require.NoError(t, err) + + client.RunGCRowsLoader(ctx) + + client.InsertDeleteRangeForTable(2, []int64{3}) + client.InsertDeleteRangeForTable(4, []int64{5, 6}) + + elementID := int64(1) + client.InsertDeleteRangeForIndex(7, &elementID, 8, []int64{1}) + client.InsertDeleteRangeForIndex(9, &elementID, 10, []int64{1, 2}) + + querys := client.GetGCRows() + require.Equal(t, querys[0], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (2, 1, '748000000000000003', '748000000000000004', %[1]d)") + require.Equal(t, querys[1], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (4, 1, '748000000000000005', '748000000000000006', %[1]d),(4, 2, '748000000000000006', '748000000000000007', %[1]d)") + require.Equal(t, querys[2], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (7, 1, '7480000000000000085f698000000000000001', '7480000000000000085f698000000000000002', %[1]d)") + require.Equal(t, querys[3], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (9, 2, '74800000000000000a5f698000000000000001', '74800000000000000a5f698000000000000002', %[1]d),(9, 3, '74800000000000000a5f698000000000000002', '74800000000000000a5f698000000000000003', %[1]d)") +} + +func TestRestoreMetaKVFilesWithBatchMethod1(t *testing.T) { + files := []*backuppb.DataFileInfo{} + batchCount := 0 + + client := restore.MockClient(nil) + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + files, + files, + nil, + nil, + nil, + func( + ctx context.Context, + defaultFiles []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + batchCount++ + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, batchCount, 0) +} + +func TestRestoreMetaKVFilesWithBatchMethod2(t *testing.T) { + files := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + }, + } + batchCount := 0 + result := make(map[int][]*backuppb.DataFileInfo) + + client := restore.MockClient(nil) + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + files, + nil, + nil, + nil, + nil, + func( + ctx context.Context, + fs []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + 
filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + if len(fs) > 0 { + result[batchCount] = fs + batchCount++ + } + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, batchCount, 1) + require.Equal(t, len(result), 1) + require.Equal(t, result[0], files) +} + +func TestRestoreMetaKVFilesWithBatchMethod3(t *testing.T) { + defaultFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 120, + }, + { + Path: "f3", + MinTs: 110, + MaxTs: 130, + }, + { + Path: "f4", + MinTs: 140, + MaxTs: 150, + }, + { + Path: "f5", + MinTs: 150, + MaxTs: 160, + }, + } + writeFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 120, + }, + { + Path: "f3", + MinTs: 110, + MaxTs: 130, + }, + { + Path: "f4", + MinTs: 135, + MaxTs: 150, + }, + { + Path: "f5", + MinTs: 150, + MaxTs: 160, + }, + } + + batchCount := 0 + result := make(map[int][]*backuppb.DataFileInfo) + resultKV := make(map[int]int) + + client := restore.MockClient(nil) + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + defaultFiles, + writeFiles, + nil, + nil, + nil, + func( + ctx context.Context, + fs []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + result[batchCount] = fs + t.Log(filterTS) + resultKV[batchCount] = len(entries) + batchCount++ + return make([]*restore.KvEntryWithTS, batchCount), nil + }, + ) + require.Nil(t, err) + require.Equal(t, len(result), 4) + require.Equal(t, result[0], defaultFiles[0:3]) + require.Equal(t, resultKV[0], 0) + require.Equal(t, result[1], writeFiles[0:4]) + require.Equal(t, resultKV[1], 0) + require.Equal(t, result[2], defaultFiles[3:]) + require.Equal(t, resultKV[2], 1) + require.Equal(t, result[3], writeFiles[4:]) + require.Equal(t, resultKV[3], 2) +} + +func TestRestoreMetaKVFilesWithBatchMethod4(t *testing.T) { + defaultFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f3", + MinTs: 110, + MaxTs: 130, + }, + { + Path: "f4", + MinTs: 110, + MaxTs: 150, + }, + } + + writeFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f3", + MinTs: 110, + MaxTs: 130, + }, + { + Path: "f4", + MinTs: 110, + MaxTs: 150, + }, + } + batchCount := 0 + result := make(map[int][]*backuppb.DataFileInfo) + + client := restore.MockClient(nil) + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + defaultFiles, + writeFiles, + nil, + nil, + nil, + func( + ctx context.Context, + fs []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + result[batchCount] = fs + batchCount++ + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, len(result), 4) + require.Equal(t, result[0], defaultFiles[0:2]) + require.Equal(t, result[1], writeFiles[0:2]) + require.Equal(t, result[2], defaultFiles[2:]) + require.Equal(t, result[3], writeFiles[2:]) +} + +func 
TestRestoreMetaKVFilesWithBatchMethod5(t *testing.T) { + defaultFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f3", + MinTs: 110, + MaxTs: 130, + }, + { + Path: "f4", + MinTs: 110, + MaxTs: 150, + }, + } + + writeFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f3", + MinTs: 100, + MaxTs: 130, + }, + { + Path: "f4", + MinTs: 100, + MaxTs: 150, + }, + } + batchCount := 0 + result := make(map[int][]*backuppb.DataFileInfo) + + client := restore.MockClient(nil) + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + defaultFiles, + writeFiles, + nil, + nil, + nil, + func( + ctx context.Context, + fs []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + result[batchCount] = fs + batchCount++ + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, len(result), 4) + require.Equal(t, result[0], defaultFiles[0:2]) + require.Equal(t, result[1], writeFiles[0:]) + require.Equal(t, result[2], defaultFiles[2:]) + require.Equal(t, len(result[3]), 0) +} + +func TestRestoreMetaKVFilesWithBatchMethod6(t *testing.T) { + defaultFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + Length: 1, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 120, + Length: restore.MetaKVBatchSize, + }, + { + Path: "f3", + MinTs: 110, + MaxTs: 130, + Length: 1, + }, + { + Path: "f4", + MinTs: 140, + MaxTs: 150, + Length: 1, + }, + { + Path: "f5", + MinTs: 150, + MaxTs: 160, + Length: 1, + }, + } + + writeFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 120, + }, + { + Path: "f3", + MinTs: 110, + MaxTs: 140, + }, + { + Path: "f4", + MinTs: 120, + MaxTs: 150, + }, + { + Path: "f5", + MinTs: 140, + MaxTs: 160, + }, + } + + batchCount := 0 + result := make(map[int][]*backuppb.DataFileInfo) + resultKV := make(map[int]int) + + client := restore.MockClient(nil) + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + defaultFiles, + writeFiles, + nil, + nil, + nil, + func( + ctx context.Context, + fs []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + result[batchCount] = fs + t.Log(filterTS) + resultKV[batchCount] = len(entries) + batchCount++ + return make([]*restore.KvEntryWithTS, batchCount), nil + }, + ) + require.Nil(t, err) + require.Equal(t, len(result), 6) + require.Equal(t, result[0], defaultFiles[0:2]) + require.Equal(t, resultKV[0], 0) + require.Equal(t, result[1], writeFiles[0:2]) + require.Equal(t, resultKV[1], 0) + require.Equal(t, result[2], defaultFiles[2:3]) + require.Equal(t, resultKV[2], 1) + require.Equal(t, result[3], writeFiles[2:4]) + require.Equal(t, resultKV[3], 2) + require.Equal(t, result[4], defaultFiles[3:]) + require.Equal(t, resultKV[4], 3) + require.Equal(t, result[5], writeFiles[4:]) + require.Equal(t, resultKV[5], 4) +} + +func TestSortMetaKVFiles(t *testing.T) { + files := []*backuppb.DataFileInfo{ + { + Path: "f5", + MinTs: 110, + MaxTs: 150, + ResolvedTs: 120, + 
}, + { + Path: "f1", + MinTs: 100, + MaxTs: 100, + ResolvedTs: 80, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 100, + ResolvedTs: 90, + }, + { + Path: "f4", + MinTs: 110, + MaxTs: 130, + ResolvedTs: 120, + }, + { + Path: "f3", + MinTs: 105, + MaxTs: 130, + ResolvedTs: 100, + }, + } + + files = restore.SortMetaKVFiles(files) + require.Equal(t, len(files), 5) + require.Equal(t, files[0].Path, "f1") + require.Equal(t, files[1].Path, "f2") + require.Equal(t, files[2].Path, "f3") + require.Equal(t, files[3].Path, "f4") + require.Equal(t, files[4].Path, "f5") +} + +func TestApplyKVFilesWithSingelMethod(t *testing.T) { + var ( + totalKVCount int64 = 0 + totalSize uint64 = 0 + logs = make([]string, 0) + ) + ds := []*backuppb.DataFileInfo{ + { + Path: "log3", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Delete, + }, + { + Path: "log1", + NumberOfEntries: 5, + Length: 100, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + }, { + Path: "log2", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + }, + } + applyFunc := func( + files []*backuppb.DataFileInfo, + kvCount int64, + size uint64, + ) { + totalKVCount += kvCount + totalSize += size + for _, f := range files { + logs = append(logs, f.GetPath()) + } + } + + restore.ApplyKVFilesWithSingelMethod( + context.TODO(), + iter.FromSlice(ds), + applyFunc, + ) + + require.Equal(t, totalKVCount, int64(15)) + require.Equal(t, totalSize, uint64(300)) + require.Equal(t, logs, []string{"log1", "log2", "log3"}) +} + +func TestApplyKVFilesWithBatchMethod1(t *testing.T) { + var ( + runCount = 0 + batchCount int = 3 + batchSize uint64 = 1000 + totalKVCount int64 = 0 + logs = make([][]string, 0) + ) + ds := []*backuppb.DataFileInfo{ + { + Path: "log5", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Delete, + RegionId: 1, + }, { + Path: "log3", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, { + Path: "log4", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, { + Path: "log1", + NumberOfEntries: 5, + Length: 800, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, + { + Path: "log2", + NumberOfEntries: 5, + Length: 200, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, + } + applyFunc := func( + files []*backuppb.DataFileInfo, + kvCount int64, + size uint64, + ) { + runCount += 1 + totalKVCount += kvCount + log := make([]string, 0, len(files)) + for _, f := range files { + log = append(log, f.GetPath()) + } + logs = append(logs, log) + } + + restore.ApplyKVFilesWithBatchMethod( + context.TODO(), + iter.FromSlice(ds), + batchCount, + batchSize, + applyFunc, + ) + + require.Equal(t, runCount, 3) + require.Equal(t, totalKVCount, int64(25)) + require.Equal(t, + logs, + [][]string{ + {"log1", "log2"}, + {"log3", "log4"}, + {"log5"}, + }, + ) +} + +func TestApplyKVFilesWithBatchMethod2(t *testing.T) { + var ( + runCount = 0 + batchCount int = 2 + batchSize uint64 = 1500 + totalKVCount int64 = 0 + logs = make([][]string, 0) + ) + ds := []*backuppb.DataFileInfo{ + { + Path: "log1", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Delete, + RegionId: 1, + }, { + Path: "log2", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, { + Path: "log3", + NumberOfEntries: 5, + Length: 100, + Cf: 
stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, { + Path: "log4", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, { + Path: "log5", + NumberOfEntries: 5, + Length: 800, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, + { + Path: "log6", + NumberOfEntries: 5, + Length: 200, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, + } + applyFunc := func( + files []*backuppb.DataFileInfo, + kvCount int64, + size uint64, + ) { + runCount += 1 + totalKVCount += kvCount + log := make([]string, 0, len(files)) + for _, f := range files { + log = append(log, f.GetPath()) + } + logs = append(logs, log) + } + + restore.ApplyKVFilesWithBatchMethod( + context.TODO(), + iter.FromSlice(ds), + batchCount, + batchSize, + applyFunc, + ) + + require.Equal(t, runCount, 4) + require.Equal(t, totalKVCount, int64(30)) + require.Equal(t, + logs, + [][]string{ + {"log2", "log3"}, + {"log5", "log6"}, + {"log4"}, + {"log1"}, + }, + ) +} + +func TestApplyKVFilesWithBatchMethod3(t *testing.T) { + var ( + runCount = 0 + batchCount int = 2 + batchSize uint64 = 1500 + totalKVCount int64 = 0 + logs = make([][]string, 0) + ) + ds := []*backuppb.DataFileInfo{ + { + Path: "log1", + NumberOfEntries: 5, + Length: 2000, + Cf: stream.WriteCF, + Type: backuppb.FileType_Delete, + RegionId: 1, + }, { + Path: "log2", + NumberOfEntries: 5, + Length: 2000, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, { + Path: "log3", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, { + Path: "log4", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 2, + }, { + Path: "log5", + NumberOfEntries: 5, + Length: 800, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + RegionId: 3, + }, + { + Path: "log6", + NumberOfEntries: 5, + Length: 200, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + RegionId: 3, + }, + } + applyFunc := func( + files []*backuppb.DataFileInfo, + kvCount int64, + size uint64, + ) { + runCount += 1 + totalKVCount += kvCount + log := make([]string, 0, len(files)) + for _, f := range files { + log = append(log, f.GetPath()) + } + logs = append(logs, log) + } + + restore.ApplyKVFilesWithBatchMethod( + context.TODO(), + iter.FromSlice(ds), + batchCount, + batchSize, + applyFunc, + ) + + require.Equal(t, runCount, 5) + require.Equal(t, totalKVCount, int64(30)) + require.Equal(t, + logs, + [][]string{ + {"log2"}, + {"log5", "log6"}, + {"log3"}, + {"log4"}, + {"log1"}, + }, + ) +} + +func TestApplyKVFilesWithBatchMethod4(t *testing.T) { + var ( + runCount = 0 + batchCount int = 2 + batchSize uint64 = 1500 + totalKVCount int64 = 0 + logs = make([][]string, 0) + ) + ds := []*backuppb.DataFileInfo{ + { + Path: "log1", + NumberOfEntries: 5, + Length: 2000, + Cf: stream.WriteCF, + Type: backuppb.FileType_Delete, + TableId: 1, + }, { + Path: "log2", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + TableId: 1, + }, { + Path: "log3", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + TableId: 2, + }, { + Path: "log4", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + TableId: 1, + }, { + Path: "log5", + NumberOfEntries: 5, + Length: 100, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + TableId: 2, + }, + } + applyFunc := func( + files 
[]*backuppb.DataFileInfo, + kvCount int64, + size uint64, + ) { + runCount += 1 + totalKVCount += kvCount + log := make([]string, 0, len(files)) + for _, f := range files { + log = append(log, f.GetPath()) + } + logs = append(logs, log) + } + + restore.ApplyKVFilesWithBatchMethod( + context.TODO(), + iter.FromSlice(ds), + batchCount, + batchSize, + applyFunc, + ) + + require.Equal(t, runCount, 4) + require.Equal(t, totalKVCount, int64(25)) + require.Equal(t, + logs, + [][]string{ + {"log2", "log4"}, + {"log5"}, + {"log3"}, + {"log1"}, + }, + ) +} + +func TestCheckNewCollationEnable(t *testing.T) { + caseList := []struct { + backupMeta *backuppb.BackupMeta + newCollationEnableInCluster string + CheckRequirements bool + isErr bool + }{ + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "True"}, + newCollationEnableInCluster: "True", + CheckRequirements: true, + isErr: false, + }, + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "True"}, + newCollationEnableInCluster: "False", + CheckRequirements: true, + isErr: true, + }, + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "False"}, + newCollationEnableInCluster: "True", + CheckRequirements: true, + isErr: true, + }, + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "False"}, + newCollationEnableInCluster: "false", + CheckRequirements: true, + isErr: false, + }, + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "False"}, + newCollationEnableInCluster: "True", + CheckRequirements: false, + isErr: true, + }, + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "True"}, + newCollationEnableInCluster: "False", + CheckRequirements: false, + isErr: true, + }, + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: ""}, + newCollationEnableInCluster: "True", + CheckRequirements: false, + isErr: false, + }, + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: ""}, + newCollationEnableInCluster: "True", + CheckRequirements: true, + isErr: true, + }, + } + + for i, ca := range caseList { + g := &gluetidb.MockGlue{ + GlobalVars: map[string]string{"new_collation_enabled": ca.newCollationEnableInCluster}, + } + err := restore.CheckNewCollationEnable(ca.backupMeta.GetNewCollationsEnabled(), g, nil, ca.CheckRequirements) + + t.Logf("[%d] Got Error: %v\n", i, err) + if ca.isErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + } +} +>>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index d089d50b64fe1..1eabae944b46e 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -264,12 +264,12 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig var newCollationEnable string err = g.UseOneShotSession(mgr.GetStorage(), !needDomain, func(se glue.Session) error { - newCollationEnable, err = se.GetGlobalVariable(tidbNewCollationEnabled) + newCollationEnable, err = se.GetGlobalVariable(utils.GetTidbNewCollationEnabled()) if err != nil { return errors.Trace(err) } log.Info("get new_collations_enabled_on_first_bootstrap config from system table", - zap.String(tidbNewCollationEnabled, newCollationEnable)) + zap.String(utils.GetTidbNewCollationEnabled(), newCollationEnable)) return nil }) if err != nil { diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 7deada25bd562..f774c60ab91de 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -88,7 +88,11 @@ const ( crypterAES192KeyLen = 24 
crypterAES256KeyLen = 32 +<<<<<<< HEAD tidbNewCollationEnabled = "new_collation_enabled" +======= + flagFullBackupType = "type" +>>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) ) // TLSConfig is the common configuration for TLS connection. diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index b4dd4cd0e67b6..209152c59d26a 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -23,7 +23,11 @@ import ( "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/config" +<<<<<<< HEAD "github.com/pingcap/tidb/kv" +======= + "github.com/pingcap/tidb/util" +>>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) "github.com/pingcap/tidb/util/mathutil" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -330,6 +334,7 @@ func CheckRestoreDBAndTable(client *restore.Client, cfg *RestoreConfig) error { return nil } +<<<<<<< HEAD func CheckNewCollationEnable( backupNewCollationEnable string, g glue.Glue, @@ -367,6 +372,8 @@ func CheckNewCollationEnable( return nil } +======= +>>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) func isFullRestore(cmdName string) bool { return cmdName == FullRestoreCmd } @@ -439,7 +446,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf return errors.Trace(versionErr) } } - if err = CheckNewCollationEnable(backupMeta.GetNewCollationsEnabled(), g, mgr.GetStorage(), cfg.CheckRequirements); err != nil { + if err = restore.CheckNewCollationEnable(backupMeta.GetNewCollationsEnabled(), g, mgr.GetStorage(), cfg.CheckRequirements); err != nil { return errors.Trace(err) } diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go index 346aca6157dbb..dfcdea0c5b462 100644 --- a/br/pkg/utils/db.go +++ b/br/pkg/utils/db.go @@ -7,6 +7,10 @@ import ( "database/sql" ) +const ( + tidbNewCollationEnabled = "new_collation_enabled" +) + var ( // check sql.DB and sql.Conn implement QueryExecutor and DBExecutor _ DBExecutor = &sql.DB{} @@ -30,3 +34,86 @@ type DBExecutor interface { StmtExecutor BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) } +<<<<<<< HEAD +======= + +// CheckLogBackupEnabled checks if LogBackup is enabled in cluster. +// this mainly used in three places. +// 1. GC worker resolve locks to scan more locks after safepoint. (every minute) +// 2. Add index skipping use lightning.(every add index ddl) +// 3. Telemetry of log backup feature usage (every 6 hours). +// NOTE: this result shouldn't be cached by caller. because it may change every time in one cluster. +func CheckLogBackupEnabled(ctx sessionctx.Context) bool { + executor, ok := ctx.(sqlexec.RestrictedSQLExecutor) + if !ok { + // shouldn't happen + log.Error("[backup] unable to translate executor from sessionctx") + return false + } + enabled, err := IsLogBackupEnabled(executor) + if err != nil { + // if it failed by any reason. we can simply return true this time. + // for GC worker it will scan more locks in one tick. + // for Add index it will skip using lightning this time. + // for Telemetry it will get a false positive usage count. + log.Warn("[backup] check log backup config failed, ignore it", zap.Error(err)) + return true + } + return enabled +} + +// IsLogBackupEnabled is used for br to check whether tikv has enabled log backup. +// we use `sqlexec.RestrictedSQLExecutor` as parameter because it's easy to mock. 
+// It returns an error when the check fails.
+func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error) {
+	valStr := "show config where name = 'log-backup.enable' and type = 'tikv'"
+	internalCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR)
+	rows, fields, errSQL := ctx.ExecRestrictedSQL(internalCtx, nil, valStr)
+	if errSQL != nil {
+		return false, errSQL
+	}
+	if len(rows) == 0 {
+		// No rows means log backup is not supported.
+		return false, nil
+	}
+	for _, row := range rows {
+		d := row.GetDatum(3, &fields[3].Column.FieldType)
+		value, errField := d.ToString()
+		if errField != nil {
+			return false, errField
+		}
+		if strings.ToLower(value) == "false" {
+			return false, nil
+		}
+	}
+	return true, nil
+}
+
+// LogBackupTaskCountInc increases the count of log backup tasks.
+func LogBackupTaskCountInc() {
+	LogBackupTaskMutex.Lock()
+	logBackupTaskCount++
+	LogBackupTaskMutex.Unlock()
+}
+
+// LogBackupTaskCountDec decreases the count of log backup tasks.
+func LogBackupTaskCountDec() {
+	LogBackupTaskMutex.Lock()
+	logBackupTaskCount--
+	LogBackupTaskMutex.Unlock()
+}
+
+// CheckLogBackupTaskExist checks whether any log backup task exists.
+func CheckLogBackupTaskExist() bool {
+	return logBackupTaskCount > 0
+}
+
+// IsLogBackupInUse checks whether log backup is enabled and a log backup task exists.
+func IsLogBackupInUse(ctx sessionctx.Context) bool {
+	return CheckLogBackupEnabled(ctx) && CheckLogBackupTaskExist()
+}
+
+func GetTidbNewCollationEnabled() string {
+	return tidbNewCollationEnabled
+}
+>>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173))
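
Usage sketch (illustrative, not part of the diff): the MockGlue added in br/pkg/gluetidb serves GetGlobalVariable from an in-memory map, which lets restore.CheckNewCollationEnable be exercised without a live cluster. The condensed, hypothetical test below shows that wiring; the test name and the variable values are made up, while the package paths, signatures, and the nil storage argument follow what the patched client_test.go already does.

package restore_test

import (
	"testing"

	"github.com/pingcap/tidb/br/pkg/gluetidb"
	"github.com/pingcap/tidb/br/pkg/restore"
	"github.com/stretchr/testify/require"
)

func TestNewCollationCheckSketch(t *testing.T) {
	// MockGlue answers GetGlobalVariable from this map instead of querying a cluster.
	g := &gluetidb.MockGlue{
		GlobalVars: map[string]string{"new_collation_enabled": "True"},
	}

	// Backup meta says "True" and the mocked cluster says "True": the check passes
	// and collate.SetNewCollationEnabledForTest is set accordingly.
	require.NoError(t, restore.CheckNewCollationEnable("True", g, nil, true))

	// A mismatch between backup meta and the cluster value fails the check,
	// regardless of --check-requirements.
	g.GlobalVars["new_collation_enabled"] = "False"
	require.Error(t, restore.CheckNewCollationEnable("True", g, nil, true))
}

Note that mockSession.GetGlobalVariable falls back to "True" for names missing from GlobalVars, so an empty map behaves like a cluster with new collation enabled.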