From 3df388eb1c7ef9c840be69f3a83cdda721333c5d Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 13 Jul 2023 21:37:44 +0800 Subject: [PATCH] fix engine load not found bug and add system variables --- br/pkg/lightning/backend/remote/remote.go | 29 ++++---- ddl/disttask_flow.go | 2 +- ddl/ingest/backend.go | 7 ++ ddl/ingest/backend_mgr.go | 32 ++++++--- ddl/ingest/mock.go | 5 ++ ddl/scheduler.go | 25 +++++-- sessionctx/variable/sysvar.go | 84 +++++++++++++++++++++++ sessionctx/variable/tidb_vars.go | 28 ++++++++ 8 files changed, 180 insertions(+), 32 deletions(-) diff --git a/br/pkg/lightning/backend/remote/remote.go b/br/pkg/lightning/backend/remote/remote.go index 8d9b75f77431c..9a6ef0c55658a 100644 --- a/br/pkg/lightning/backend/remote/remote.go +++ b/br/pkg/lightning/backend/remote/remote.go @@ -100,20 +100,9 @@ var ( splitRetryTimes = 8 ) -type ExtStoreConfig struct { - Bucket string - Prefix string - AccessKey string - SecretAccessKey string - Host string - Port string -} - -func NewRemoteBackend(ctx context.Context, cfg local.BackendConfig, tls *common.TLS, - extCfg *ExtStoreConfig, jobID int64) (backend.Backend, error) { - uri := fmt.Sprintf("s3://%s/%s?access-key=%s&secret-access-key=%s&endpoint=http://%s:%s&force-path-style=true", - extCfg.Bucket, extCfg.Prefix, extCfg.AccessKey, extCfg.SecretAccessKey, extCfg.Host, extCfg.Port) - bd, err := storage.ParseBackend(uri, nil) +func NewRemoteBackend(ctx context.Context, remoteCfg *Config, cfg local.BackendConfig, tls *common.TLS, + tempStorageURI string, jobID int64) (backend.Backend, error) { + bd, err := storage.ParseBackend(tempStorageURI, nil) if err != nil { return nil, err } @@ -143,6 +132,7 @@ func NewRemoteBackend(ctx context.Context, cfg local.BackendConfig, tls *common. bc := &Backend{ jobID: jobID, externalStorage: extStore, + config: remoteCfg, mu: struct { sync.RWMutex maxWriterID int @@ -171,9 +161,20 @@ func NewRemoteBackend(ctx context.Context, cfg local.BackendConfig, tls *common. return bc, nil } +type Config struct { + MemQuota uint64 + ReadBufferSize uint64 + WriteBatchSize int64 + StatSampleKeys int64 + StatSampleSize uint64 + SubtaskCnt int64 + S3ChunkSize uint64 +} + type Backend struct { jobID int64 externalStorage storage.ExternalStorage + config *Config mu struct { sync.RWMutex maxWriterID int diff --git a/ddl/disttask_flow.go b/ddl/disttask_flow.go index f5d9136746dd7..aadc6a8b80a82 100644 --- a/ddl/disttask_flow.go +++ b/ddl/disttask_flow.go @@ -80,7 +80,7 @@ func (h *litBackfillFlowHandle) ProcessNormalFlow(ctx context.Context, taskHandl switch gTask.Step { case proto.StepOne: if bcCtx, ok := ingest.LitBackCtxMgr.Load(job.ID); ok { - if bc := bcCtx.GetBackend().(*remote.Backend); bc != nil { + if bc, ok := bcCtx.GetBackend().(*remote.Backend); ok { gTask.Step = proto.StepTwo return h.splitSubtaskRanges(ctx, taskHandle, gTask, bc) } diff --git a/ddl/ingest/backend.go b/ddl/ingest/backend.go index 073da7d3becdf..2083df58a156a 100644 --- a/ddl/ingest/backend.go +++ b/ddl/ingest/backend.go @@ -55,6 +55,7 @@ type BackendCtx interface { GetCheckpointManager() *CheckpointManager GetBackend() backend.Backend + EngineLoaded(indexID int64) bool } // FlushMode is used to control how to flush. @@ -295,3 +296,9 @@ func (bc *litBackendCtx) GetCheckpointManager() *CheckpointManager { func (bc *litBackendCtx) GetBackend() backend.Backend { return bc.backend } + +// EngineLoaded returns true if the engine is loaded. +func (bc *litBackendCtx) EngineLoaded(indexID int64) bool { + _, ok := bc.Load(indexID) + return ok +} diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go index fc1533735d87f..b7e81c42620e1 100644 --- a/ddl/ingest/backend_mgr.go +++ b/ddl/ingest/backend_mgr.go @@ -19,12 +19,14 @@ import ( "fmt" "math" "strconv" + "strings" "time" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/local" "github.com/pingcap/tidb/br/pkg/lightning/backend/remote" "github.com/pingcap/tidb/br/pkg/lightning/config" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/generic" "github.com/pingcap/tidb/util/logutil" clientv3 "go.etcd.io/etcd/client/v3" @@ -93,7 +95,7 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6 logutil.BgLogger().Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err)) return nil, err } - bd, err := createRemoteBackend(ctx, cfg, jobID) + bd, err := createBackend(ctx, cfg, jobID) if err != nil { logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err)) return nil, err @@ -112,6 +114,14 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6 return bc, nil } +func createBackend(ctx context.Context, cfg *Config, jobID int64) (backend.Backend, error) { + tempStorage := variable.TempStorageURI.Load() + if strings.HasPrefix(tempStorage, "s3://") { + return createRemoteBackend(ctx, cfg, jobID, tempStorage) + } + return createLocalBackend(ctx, cfg) +} + func createLocalBackend(ctx context.Context, cfg *Config) (*local.Backend, error) { tls, err := cfg.Lightning.ToTLS() if err != nil { @@ -127,21 +137,23 @@ func createLocalBackend(ctx context.Context, cfg *Config) (*local.Backend, error return local.NewBackend(ctx, tls, backendConfig, regionSizeGetter) } -func createRemoteBackend(ctx context.Context, cfg *Config, jobID int64) (backend.Backend, error) { +func createRemoteBackend(ctx context.Context, cfg *Config, jobID int64, tempStorage string) (backend.Backend, error) { tls, err := cfg.Lightning.ToTLS() if err != nil { logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Error(err)) return nil, err } backendConfig := local.NewBackendConfig(cfg.Lightning, int(LitRLimit), cfg.KeyspaceName) - return remote.NewRemoteBackend(ctx, backendConfig, tls, &remote.ExtStoreConfig{ - Bucket: "nfs", - Prefix: "tools_test_data/sharedisk", - AccessKey: "minioadmin", - SecretAccessKey: "minioadmin", - Host: "127.0.0.1", - Port: "9000", - }, jobID) + remoteCfg := &remote.Config{ + MemQuota: variable.GlobalSortMemQuota.Load(), + ReadBufferSize: variable.GlobalSortReadBufferSize.Load(), + WriteBatchSize: variable.GlobalSortWriteBatchSize.Load(), + StatSampleKeys: variable.GlobalSortStatSampleKeys.Load(), + StatSampleSize: variable.GlobalSortStatSampleSize.Load(), + SubtaskCnt: variable.GlobalSortSubtaskCnt.Load(), + S3ChunkSize: variable.GlobalSortS3ChunkSize.Load(), + } + return remote.NewRemoteBackend(ctx, remoteCfg, backendConfig, tls, tempStorage, jobID) } const checkpointUpdateInterval = 10 * time.Minute diff --git a/ddl/ingest/mock.go b/ddl/ingest/mock.go index b1c5160b81486..70e51ae1a095f 100644 --- a/ddl/ingest/mock.go +++ b/ddl/ingest/mock.go @@ -154,6 +154,11 @@ func (m *MockBackendCtx) GetBackend() backend.Backend { return nil } +// EngineLoaded implements BackendCtx.EngineLoaded interface. +func (m *MockBackendCtx) EngineLoaded(_ int64) bool { + return false +} + // MockEngineInfo is a mock engine info. type MockEngineInfo struct { sessCtx sessionctx.Context diff --git a/ddl/scheduler.go b/ddl/scheduler.go index 27fc42475457b..028fa94b7c4cd 100644 --- a/ddl/scheduler.go +++ b/ddl/scheduler.go @@ -170,7 +170,8 @@ func (b *backfillSchedulerHandle) InitSubtaskExecEnv(ctx context.Context) error b.bc = bc if b.stepForImport { if b.stepForOrderedImport { - return nil + _, err := bc.Register(b.job.ID, b.index.ID, b.job.SchemaName, b.job.TableName) + return err } return b.doFlushAndHandleError(ingest.FlushModeForceGlobal) } @@ -326,14 +327,24 @@ func (b *backfillSchedulerHandle) orderedImport(ctx context.Context, subtask []b logutil.BgLogger().Error("[ddl] unmarshal error", zap.Error(err)) return err } - if bcCtx, ok := ingest.LitBackCtxMgr.Load(b.job.ID); ok { - if bc, ok := bcCtx.GetBackend().(*remote.Backend); ok { - err := bc.SetRange(ctx, subTaskMeta.StartKey, subTaskMeta.EndKey, subTaskMeta.DataFiles, subTaskMeta.StatsFiles) - if err != nil { - return err - } + bcCtx, ok := ingest.LitBackCtxMgr.Load(b.job.ID) + if !ok { + return errors.New("can not find backend context") + } + bc, ok := bcCtx.GetBackend().(*remote.Backend) + if !ok { + return errors.New("backend is not remote") + } + if !bcCtx.EngineLoaded(b.index.ID) { + _, err = bcCtx.Register(b.job.ID, b.index.ID, b.job.SchemaName, b.job.TableName) + if err != nil { + return err } } + err = bc.SetRange(ctx, subTaskMeta.StartKey, subTaskMeta.EndKey, subTaskMeta.DataFiles, subTaskMeta.StatsFiles) + if err != nil { + return err + } return b.doFlushAndHandleError(ingest.FlushModeForceGlobal) } diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 4b40520cc4393..046e92ba1f126 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -43,6 +43,7 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/size" stmtsummaryv2 "github.com/pingcap/tidb/util/stmtsummary/v2" "github.com/pingcap/tidb/util/tiflash" "github.com/pingcap/tidb/util/tiflashcompute" @@ -2772,6 +2773,89 @@ var defaultSysVars = []*SysVar{ }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { return BoolToOnOff(EnableCheckConstraint.Load()), nil }}, + {Scope: ScopeGlobal, Name: TiDBTempStorageURI, Value: "", Type: TypeStr, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { + TempStorageURI.Store(s) + return nil + }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { + return TempStorageURI.Load(), nil + }}, + {Scope: ScopeGlobal, Name: TiDBGlobalSortMemQuota, Value: strconv.Itoa(DefTiDBGlobalSortMemoryQuota), Type: TypeInt, MinValue: int64(256 * size.MB), MaxValue: 64 * size.GB, + SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { + val, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + GlobalSortMemQuota.Store(uint64(val)) + return nil + }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { + return strconv.Itoa(int(GlobalSortMemQuota.Load())), nil + }}, + {Scope: ScopeGlobal, Name: TiDBGlobalSortReadBufferSize, Value: strconv.Itoa(DefTiDBGlobalSortReadBufferSize), Type: TypeInt, MinValue: int64(8 * size.KB), MaxValue: 1 * size.GB, + SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { + val, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + GlobalSortReadBufferSize.Store(uint64(val)) + return nil + }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { + return strconv.Itoa(int(GlobalSortReadBufferSize.Load())), nil + }}, + {Scope: ScopeGlobal, Name: TiDBGlobalSortWriteBatchSize, Value: strconv.Itoa(DefTiDBGlobalSortWriteBatchSize), Type: TypeInt, MinValue: 1024, MaxValue: 1 * size.GB, + SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { + val, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + GlobalSortWriteBatchSize.Store(val) + return nil + }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { + return strconv.Itoa(int(GlobalSortWriteBatchSize.Load())), nil + }}, + {Scope: ScopeGlobal, Name: TiDBGlobalSortStatSampleKeys, Value: strconv.Itoa(DefTiDBGlobalSortStatSampleKeys), Type: TypeInt, MinValue: 1024, MaxValue: 1 * size.GB, + SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { + val, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + GlobalSortStatSampleKeys.Store(val) + return nil + }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { + return strconv.Itoa(int(GlobalSortStatSampleKeys.Load())), nil + }}, + {Scope: ScopeGlobal, Name: TiDBGlobalSortStatSampleSize, Value: strconv.Itoa(DefTiDBGlobalSortStatSampleSize), Type: TypeInt, MinValue: int64(1 * size.KB), MaxValue: 1 * size.GB, + SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { + val, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + GlobalSortStatSampleSize.Store(uint64(val)) + return nil + }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { + return strconv.Itoa(int(GlobalSortStatSampleSize.Load())), nil + }}, + {Scope: ScopeGlobal, Name: TiDBGlobalSortSubtaskCnt, Value: strconv.Itoa(DefTiDBGlobalSortSubtaskCnt), Type: TypeInt, MinValue: 1, MaxValue: 1000, + SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { + val, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + GlobalSortSubtaskCnt.Store(val) + return nil + }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { + return strconv.Itoa(int(GlobalSortSubtaskCnt.Load())), nil + }}, + {Scope: ScopeGlobal, Name: TiDBGlobalSortS3ChunkSize, Value: strconv.Itoa(DefTiDBGlobalSortS3ChunkSize), Type: TypeInt, MinValue: int64(1 * size.MB), MaxValue: 1 * size.GB, + SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { + val, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + GlobalSortS3ChunkSize.Store(uint64(val)) + return nil + }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { + return strconv.Itoa(int(GlobalSortS3ChunkSize.Load())), nil + }}, } func setTiFlashComputeDispatchPolicy(s *SessionVars, val string) error { diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 29e307c520bd8..aeece6564f2bd 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -902,6 +902,18 @@ const ( // TiDBEnableCheckConstraint indicates whether to enable check constraint feature. TiDBEnableCheckConstraint = "tidb_enable_check_constraint" + + // TiDBTempStorageURI indicates where to create temporary files. + TiDBTempStorageURI = "tidb_temp_storage_uri" + + TiDBGlobalSortMemQuota = "tidb_global_sort_mem_quota" + + TiDBGlobalSortReadBufferSize = "tidb_global_sort_read_buffer_size" + TiDBGlobalSortWriteBatchSize = "tidb_global_sort_write_batch_size" + TiDBGlobalSortStatSampleKeys = "tidb_global_sort_stat_sample_keys" + TiDBGlobalSortStatSampleSize = "tidb_global_sort_stat_sample_size" + TiDBGlobalSortSubtaskCnt = "tidb_global_sort_subtask_cnt" + TiDBGlobalSortS3ChunkSize = "tidb_global_sort_s3_chunk_size" ) // TiDB vars that have only global scope @@ -1379,6 +1391,13 @@ const ( DefRuntimeFilterMode = "OFF" DefTiDBLockUnchangedKeys = true DefTiDBEnableCheckConstraint = false + DefTiDBGlobalSortMemoryQuota = int(1 * size.GB) + DefTiDBGlobalSortReadBufferSize = int(64 * size.KB) + DefTiDBGlobalSortWriteBatchSize = 8 * 1024 + DefTiDBGlobalSortStatSampleKeys = 8 * 1024 + DefTiDBGlobalSortStatSampleSize = int(1 * size.MB) + DefTiDBGlobalSortSubtaskCnt = -1 + DefTiDBGlobalSortS3ChunkSize = int(5 * size.MB) ) // Process global variables. @@ -1473,6 +1492,15 @@ var ( // It will be initialized to the right value after the first call of `rebuildSysVarCache` EnableResourceControl = atomic.NewBool(false) EnableCheckConstraint = atomic.NewBool(DefTiDBEnableCheckConstraint) + + TempStorageURI = atomic.NewString("") + GlobalSortMemQuota = atomic.NewUint64(uint64(DefTiDBGlobalSortMemoryQuota)) + GlobalSortReadBufferSize = atomic.NewUint64(uint64(DefTiDBGlobalSortReadBufferSize)) + GlobalSortWriteBatchSize = atomic.NewInt64(DefTiDBGlobalSortWriteBatchSize) + GlobalSortStatSampleKeys = atomic.NewInt64(DefTiDBGlobalSortStatSampleKeys) + GlobalSortStatSampleSize = atomic.NewUint64(uint64(DefTiDBGlobalSortStatSampleSize)) + GlobalSortSubtaskCnt = atomic.NewInt64(DefTiDBGlobalSortSubtaskCnt) + GlobalSortS3ChunkSize = atomic.NewUint64(uint64(DefTiDBGlobalSortS3ChunkSize)) ) var (