Skip to content

Commit

Permalink
fix engine load not found bug and add system variables
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Jul 13, 2023
1 parent b03852e commit 3df388e
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 32 deletions.
29 changes: 15 additions & 14 deletions br/pkg/lightning/backend/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,9 @@ var (
splitRetryTimes = 8
)

type ExtStoreConfig struct {
Bucket string
Prefix string
AccessKey string
SecretAccessKey string
Host string
Port string
}

func NewRemoteBackend(ctx context.Context, cfg local.BackendConfig, tls *common.TLS,
extCfg *ExtStoreConfig, jobID int64) (backend.Backend, error) {
uri := fmt.Sprintf("s3://%s/%s?access-key=%s&secret-access-key=%s&endpoint=http://%s:%s&force-path-style=true",
extCfg.Bucket, extCfg.Prefix, extCfg.AccessKey, extCfg.SecretAccessKey, extCfg.Host, extCfg.Port)
bd, err := storage.ParseBackend(uri, nil)
func NewRemoteBackend(ctx context.Context, remoteCfg *Config, cfg local.BackendConfig, tls *common.TLS,
tempStorageURI string, jobID int64) (backend.Backend, error) {
bd, err := storage.ParseBackend(tempStorageURI, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -143,6 +132,7 @@ func NewRemoteBackend(ctx context.Context, cfg local.BackendConfig, tls *common.
bc := &Backend{
jobID: jobID,
externalStorage: extStore,
config: remoteCfg,
mu: struct {
sync.RWMutex
maxWriterID int
Expand Down Expand Up @@ -171,9 +161,20 @@ func NewRemoteBackend(ctx context.Context, cfg local.BackendConfig, tls *common.
return bc, nil
}

type Config struct {
MemQuota uint64
ReadBufferSize uint64
WriteBatchSize int64
StatSampleKeys int64
StatSampleSize uint64
SubtaskCnt int64
S3ChunkSize uint64
}

type Backend struct {
jobID int64
externalStorage storage.ExternalStorage
config *Config
mu struct {
sync.RWMutex
maxWriterID int
Expand Down
2 changes: 1 addition & 1 deletion ddl/disttask_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (h *litBackfillFlowHandle) ProcessNormalFlow(ctx context.Context, taskHandl
switch gTask.Step {
case proto.StepOne:
if bcCtx, ok := ingest.LitBackCtxMgr.Load(job.ID); ok {
if bc := bcCtx.GetBackend().(*remote.Backend); bc != nil {
if bc, ok := bcCtx.GetBackend().(*remote.Backend); ok {
gTask.Step = proto.StepTwo
return h.splitSubtaskRanges(ctx, taskHandle, gTask, bc)
}
Expand Down
7 changes: 7 additions & 0 deletions ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type BackendCtx interface {
GetCheckpointManager() *CheckpointManager

GetBackend() backend.Backend
EngineLoaded(indexID int64) bool
}

// FlushMode is used to control how to flush.
Expand Down Expand Up @@ -295,3 +296,9 @@ func (bc *litBackendCtx) GetCheckpointManager() *CheckpointManager {
func (bc *litBackendCtx) GetBackend() backend.Backend {
return bc.backend
}

// EngineLoaded returns true if the engine is loaded.
func (bc *litBackendCtx) EngineLoaded(indexID int64) bool {
_, ok := bc.Load(indexID)
return ok
}
32 changes: 22 additions & 10 deletions ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (
"fmt"
"math"
"strconv"
"strings"
"time"

"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/backend/remote"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/generic"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -93,7 +95,7 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6
logutil.BgLogger().Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
}
bd, err := createRemoteBackend(ctx, cfg, jobID)
bd, err := createBackend(ctx, cfg, jobID)
if err != nil {
logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
Expand All @@ -112,6 +114,14 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6
return bc, nil
}

func createBackend(ctx context.Context, cfg *Config, jobID int64) (backend.Backend, error) {
tempStorage := variable.TempStorageURI.Load()
if strings.HasPrefix(tempStorage, "s3://") {
return createRemoteBackend(ctx, cfg, jobID, tempStorage)
}
return createLocalBackend(ctx, cfg)
}

func createLocalBackend(ctx context.Context, cfg *Config) (*local.Backend, error) {
tls, err := cfg.Lightning.ToTLS()
if err != nil {
Expand All @@ -127,21 +137,23 @@ func createLocalBackend(ctx context.Context, cfg *Config) (*local.Backend, error
return local.NewBackend(ctx, tls, backendConfig, regionSizeGetter)
}

func createRemoteBackend(ctx context.Context, cfg *Config, jobID int64) (backend.Backend, error) {
func createRemoteBackend(ctx context.Context, cfg *Config, jobID int64, tempStorage string) (backend.Backend, error) {
tls, err := cfg.Lightning.ToTLS()
if err != nil {
logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Error(err))
return nil, err
}
backendConfig := local.NewBackendConfig(cfg.Lightning, int(LitRLimit), cfg.KeyspaceName)
return remote.NewRemoteBackend(ctx, backendConfig, tls, &remote.ExtStoreConfig{
Bucket: "nfs",
Prefix: "tools_test_data/sharedisk",
AccessKey: "minioadmin",
SecretAccessKey: "minioadmin",
Host: "127.0.0.1",
Port: "9000",
}, jobID)
remoteCfg := &remote.Config{
MemQuota: variable.GlobalSortMemQuota.Load(),
ReadBufferSize: variable.GlobalSortReadBufferSize.Load(),
WriteBatchSize: variable.GlobalSortWriteBatchSize.Load(),
StatSampleKeys: variable.GlobalSortStatSampleKeys.Load(),
StatSampleSize: variable.GlobalSortStatSampleSize.Load(),
SubtaskCnt: variable.GlobalSortSubtaskCnt.Load(),
S3ChunkSize: variable.GlobalSortS3ChunkSize.Load(),
}
return remote.NewRemoteBackend(ctx, remoteCfg, backendConfig, tls, tempStorage, jobID)
}

const checkpointUpdateInterval = 10 * time.Minute
Expand Down
5 changes: 5 additions & 0 deletions ddl/ingest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ func (m *MockBackendCtx) GetBackend() backend.Backend {
return nil
}

// EngineLoaded implements BackendCtx.EngineLoaded interface.
func (m *MockBackendCtx) EngineLoaded(_ int64) bool {
return false
}

// MockEngineInfo is a mock engine info.
type MockEngineInfo struct {
sessCtx sessionctx.Context
Expand Down
25 changes: 18 additions & 7 deletions ddl/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ func (b *backfillSchedulerHandle) InitSubtaskExecEnv(ctx context.Context) error
b.bc = bc
if b.stepForImport {
if b.stepForOrderedImport {
return nil
_, err := bc.Register(b.job.ID, b.index.ID, b.job.SchemaName, b.job.TableName)
return err
}
return b.doFlushAndHandleError(ingest.FlushModeForceGlobal)
}
Expand Down Expand Up @@ -326,14 +327,24 @@ func (b *backfillSchedulerHandle) orderedImport(ctx context.Context, subtask []b
logutil.BgLogger().Error("[ddl] unmarshal error", zap.Error(err))
return err
}
if bcCtx, ok := ingest.LitBackCtxMgr.Load(b.job.ID); ok {
if bc, ok := bcCtx.GetBackend().(*remote.Backend); ok {
err := bc.SetRange(ctx, subTaskMeta.StartKey, subTaskMeta.EndKey, subTaskMeta.DataFiles, subTaskMeta.StatsFiles)
if err != nil {
return err
}
bcCtx, ok := ingest.LitBackCtxMgr.Load(b.job.ID)
if !ok {
return errors.New("can not find backend context")
}
bc, ok := bcCtx.GetBackend().(*remote.Backend)
if !ok {
return errors.New("backend is not remote")
}
if !bcCtx.EngineLoaded(b.index.ID) {
_, err = bcCtx.Register(b.job.ID, b.index.ID, b.job.SchemaName, b.job.TableName)
if err != nil {
return err
}
}
err = bc.SetRange(ctx, subTaskMeta.StartKey, subTaskMeta.EndKey, subTaskMeta.DataFiles, subTaskMeta.StatsFiles)
if err != nil {
return err
}
return b.doFlushAndHandleError(ingest.FlushModeForceGlobal)
}

Expand Down
84 changes: 84 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/size"
stmtsummaryv2 "github.com/pingcap/tidb/util/stmtsummary/v2"
"github.com/pingcap/tidb/util/tiflash"
"github.com/pingcap/tidb/util/tiflashcompute"
Expand Down Expand Up @@ -2772,6 +2773,89 @@ var defaultSysVars = []*SysVar{
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return BoolToOnOff(EnableCheckConstraint.Load()), nil
}},
{Scope: ScopeGlobal, Name: TiDBTempStorageURI, Value: "", Type: TypeStr, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
TempStorageURI.Store(s)
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return TempStorageURI.Load(), nil
}},
{Scope: ScopeGlobal, Name: TiDBGlobalSortMemQuota, Value: strconv.Itoa(DefTiDBGlobalSortMemoryQuota), Type: TypeInt, MinValue: int64(256 * size.MB), MaxValue: 64 * size.GB,
SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
GlobalSortMemQuota.Store(uint64(val))
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return strconv.Itoa(int(GlobalSortMemQuota.Load())), nil
}},
{Scope: ScopeGlobal, Name: TiDBGlobalSortReadBufferSize, Value: strconv.Itoa(DefTiDBGlobalSortReadBufferSize), Type: TypeInt, MinValue: int64(8 * size.KB), MaxValue: 1 * size.GB,
SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
GlobalSortReadBufferSize.Store(uint64(val))
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return strconv.Itoa(int(GlobalSortReadBufferSize.Load())), nil
}},
{Scope: ScopeGlobal, Name: TiDBGlobalSortWriteBatchSize, Value: strconv.Itoa(DefTiDBGlobalSortWriteBatchSize), Type: TypeInt, MinValue: 1024, MaxValue: 1 * size.GB,
SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
GlobalSortWriteBatchSize.Store(val)
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return strconv.Itoa(int(GlobalSortWriteBatchSize.Load())), nil
}},
{Scope: ScopeGlobal, Name: TiDBGlobalSortStatSampleKeys, Value: strconv.Itoa(DefTiDBGlobalSortStatSampleKeys), Type: TypeInt, MinValue: 1024, MaxValue: 1 * size.GB,
SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
GlobalSortStatSampleKeys.Store(val)
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return strconv.Itoa(int(GlobalSortStatSampleKeys.Load())), nil
}},
{Scope: ScopeGlobal, Name: TiDBGlobalSortStatSampleSize, Value: strconv.Itoa(DefTiDBGlobalSortStatSampleSize), Type: TypeInt, MinValue: int64(1 * size.KB), MaxValue: 1 * size.GB,
SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
GlobalSortStatSampleSize.Store(uint64(val))
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return strconv.Itoa(int(GlobalSortStatSampleSize.Load())), nil
}},
{Scope: ScopeGlobal, Name: TiDBGlobalSortSubtaskCnt, Value: strconv.Itoa(DefTiDBGlobalSortSubtaskCnt), Type: TypeInt, MinValue: 1, MaxValue: 1000,
SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
GlobalSortSubtaskCnt.Store(val)
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return strconv.Itoa(int(GlobalSortSubtaskCnt.Load())), nil
}},
{Scope: ScopeGlobal, Name: TiDBGlobalSortS3ChunkSize, Value: strconv.Itoa(DefTiDBGlobalSortS3ChunkSize), Type: TypeInt, MinValue: int64(1 * size.MB), MaxValue: 1 * size.GB,
SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
GlobalSortS3ChunkSize.Store(uint64(val))
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return strconv.Itoa(int(GlobalSortS3ChunkSize.Load())), nil
}},
}

func setTiFlashComputeDispatchPolicy(s *SessionVars, val string) error {
Expand Down
28 changes: 28 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,18 @@ const (

// TiDBEnableCheckConstraint indicates whether to enable check constraint feature.
TiDBEnableCheckConstraint = "tidb_enable_check_constraint"

// TiDBTempStorageURI indicates where to create temporary files.
TiDBTempStorageURI = "tidb_temp_storage_uri"

TiDBGlobalSortMemQuota = "tidb_global_sort_mem_quota"

TiDBGlobalSortReadBufferSize = "tidb_global_sort_read_buffer_size"
TiDBGlobalSortWriteBatchSize = "tidb_global_sort_write_batch_size"
TiDBGlobalSortStatSampleKeys = "tidb_global_sort_stat_sample_keys"
TiDBGlobalSortStatSampleSize = "tidb_global_sort_stat_sample_size"
TiDBGlobalSortSubtaskCnt = "tidb_global_sort_subtask_cnt"
TiDBGlobalSortS3ChunkSize = "tidb_global_sort_s3_chunk_size"
)

// TiDB vars that have only global scope
Expand Down Expand Up @@ -1379,6 +1391,13 @@ const (
DefRuntimeFilterMode = "OFF"
DefTiDBLockUnchangedKeys = true
DefTiDBEnableCheckConstraint = false
DefTiDBGlobalSortMemoryQuota = int(1 * size.GB)
DefTiDBGlobalSortReadBufferSize = int(64 * size.KB)
DefTiDBGlobalSortWriteBatchSize = 8 * 1024
DefTiDBGlobalSortStatSampleKeys = 8 * 1024
DefTiDBGlobalSortStatSampleSize = int(1 * size.MB)
DefTiDBGlobalSortSubtaskCnt = -1
DefTiDBGlobalSortS3ChunkSize = int(5 * size.MB)
)

// Process global variables.
Expand Down Expand Up @@ -1473,6 +1492,15 @@ var (
// It will be initialized to the right value after the first call of `rebuildSysVarCache`
EnableResourceControl = atomic.NewBool(false)
EnableCheckConstraint = atomic.NewBool(DefTiDBEnableCheckConstraint)

TempStorageURI = atomic.NewString("")
GlobalSortMemQuota = atomic.NewUint64(uint64(DefTiDBGlobalSortMemoryQuota))
GlobalSortReadBufferSize = atomic.NewUint64(uint64(DefTiDBGlobalSortReadBufferSize))
GlobalSortWriteBatchSize = atomic.NewInt64(DefTiDBGlobalSortWriteBatchSize)
GlobalSortStatSampleKeys = atomic.NewInt64(DefTiDBGlobalSortStatSampleKeys)
GlobalSortStatSampleSize = atomic.NewUint64(uint64(DefTiDBGlobalSortStatSampleSize))
GlobalSortSubtaskCnt = atomic.NewInt64(DefTiDBGlobalSortSubtaskCnt)
GlobalSortS3ChunkSize = atomic.NewUint64(uint64(DefTiDBGlobalSortS3ChunkSize))
)

var (
Expand Down

0 comments on commit 3df388e

Please sign in to comment.