Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config(dm): add on-duplicate-logical, on-duplicate-physical #7714

Merged
merged 9 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,15 @@ ErrOpenAPITaskConfigExist,[code=20050:class=config:scope=internal:level=low], "M
ErrOpenAPITaskConfigNotExist,[code=20051:class=config:scope=internal:level=low], "Message: the openapi task config for '%s' does not exist"
ErrConfigCollationCompatibleNotSupport,[code=20052:class=config:scope=internal:level=medium], "Message: collation compatible %s not supported, Workaround: Please check the `collation_compatible` config in task configuration file, which can be set to `loose`/`strict`."
ErrConfigInvalidLoadMode,[code=20053:class=config:scope=internal:level=medium], "Message: invalid load mode '%s', Workaround: Please choose a valid value in ['logical', 'physical']"
ErrConfigInvalidDuplicateResolution,[code=20054:class=config:scope=internal:level=medium], "Message: invalid load on-duplicate '%s', Workaround: Please choose a valid value in ['replace', 'error', 'ignore']"
ErrConfigInvalidDuplicateResolution,[code=20054:class=config:scope=internal:level=medium], "Message: invalid load on-duplicate-logical or on-duplicate option '%s', Workaround: Please choose a valid value in ['replace', 'error', 'ignore'] or leave it empty."
ErrConfigValidationMode,[code=20055:class=config:scope=internal:level=high], "Message: invalid validation mode, Workaround: Please check `validation-mode` config in task configuration file."
ErrContinuousValidatorCfgNotFound,[code=20056:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s continuous validator config %s not exist, Workaround: Please check the `validator-config-name` config in task configuration file."
ErrConfigStartTimeTooLate,[code=20057:class=config:scope=internal:level=high], "Message: start-time %s is too late, no binlog location matches it, Workaround: Please check the `--start-time` is expected or try again later."
ErrConfigLoaderDirInvalid,[code=20058:class=config:scope=internal:level=high], "Message: loader's dir %s is invalid, Workaround: Please check the `dir` config in task configuration file."
ErrConfigLoaderS3NotSupport,[code=20059:class=config:scope=internal:level=high], "Message: loader's dir %s is s3 dir, but s3 is not supported, Workaround: Please check the `dir` config in task configuration file and you can use `Lightning` by set config `import-mode` be `sql` which supports s3 instead."
ErrConfigInvalidSafeModeDuration,[code=20060:class=config:scope=internal:level=medium], "Message: safe-mode-duration '%s' parsed failed: %v, Workaround: Please check the `safe-mode-duration` is correct."
ErrConfigConfictSafeModeDurationAndSafeMode,[code=20061:class=config:scope=internal:level=low], "Message: safe-mode(true) conflicts with safe-mode-duration(0s), Workaround: Please set safe-mode to false or safe-mode-duration to non-zero."
ErrConfigInvalidPhysicalDuplicateResolution,[code=20062:class=config:scope=internal:level=medium], "Message: invalid load on-duplicate-physical option '%s', Workaround: Please choose a valid value in ['none', 'manual'] or leave it empty."
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down
67 changes: 50 additions & 17 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,34 +250,48 @@ const (
LoadModePhysical = "physical"
)

// DuplicateResolveType defines the duplication resolution when meet duplicate rows.
// DuplicateResolveType defines the duplication resolution when meet duplicate rows for logical import.
type DuplicateResolveType string
okJiang marked this conversation as resolved.
Show resolved Hide resolved

const (
// OnDuplicateReplace represents replace the old row with new data.
OnDuplicateReplace DuplicateResolveType = "replace"
// OnDuplicateError represents return an error when meet duplicate row.
OnDuplicateError = "error"
OnDuplicateError DuplicateResolveType = "error"
// OnDuplicateIgnore represents ignore the new data when meet duplicate row.
OnDuplicateIgnore = "ignore"
OnDuplicateIgnore DuplicateResolveType = "ignore"
)

// PhysicalDuplicateResolveType defines the duplication resolution when meet duplicate rows for physical import.
type PhysicalDuplicateResolveType string

const (
// OnDuplicateNone represents do nothing when meet duplicate row and the task will continue.
OnDuplicateNone PhysicalDuplicateResolveType = "none"
// OnDuplicateManual represents that task should be paused when meet duplicate row to let user handle it manually.
OnDuplicateManual PhysicalDuplicateResolveType = "manual"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember we have three duplicate resolutions in lightning:

# Physical Import Mode 设置是否检测和解决重复的记录(唯一键冲突)。
# 目前支持三种解决方法:
#  - record: 仅将重复记录添加到目的 TiDB 中的 `lightning_task_info.conflict_error_v1` 表中。注意,该方法要求目的 TiKV 的版本为 v5.2.0 或更新版本。如果版本过低,则会启用下面的 'none' 模式。
#  - none: 不检测重复记录。该模式是三种模式中性能最佳的,但是可能会导致目的 TiDB 中出现数据不一致的情况。
#  - remove: 记录所有的重复记录,和 'record' 模式相似。但是会删除所有的重复记录,以确保目的 TiDB 中的数据状态保持一致。
# duplicate-resolution = 'none'

Are there any reasons we don't support it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

record is worse than remove in all respects so we delete it in design

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we don't support "remove" in this version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"manual" is in fact "remove", it will remove the conflict KV first and let user query the recording table to decide to insert which row

)

// LoaderConfig represents loader process unit's specific config.
type LoaderConfig struct {
PoolSize int `yaml:"pool-size" toml:"pool-size" json:"pool-size"`
Dir string `yaml:"dir" toml:"dir" json:"dir"`
SQLMode string `yaml:"-" toml:"-" json:"-"` // wrote by dump unit (DM op) or jobmaster (DM in engine)
ImportMode LoadMode `yaml:"import-mode" toml:"import-mode" json:"import-mode"`
OnDuplicate DuplicateResolveType `yaml:"on-duplicate" toml:"on-duplicate" json:"on-duplicate"`
PoolSize int `yaml:"pool-size" toml:"pool-size" json:"pool-size"`
Dir string `yaml:"dir" toml:"dir" json:"dir"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dir seems a directory that maintains the export data. Do we need an extra config field for sst files?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I'll check it with PM later, maybe we need add more fields in future

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And also, lightning will check whether the source-dir and sst-dir are on the same disk (for IO performance sake, I think). I'm wondering if we should check it here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

precheck will be ported in another PR

SQLMode string `yaml:"-" toml:"-" json:"-"` // wrote by dump unit (DM op) or jobmaster (DM in engine)
ImportMode LoadMode `yaml:"import-mode" toml:"import-mode" json:"import-mode"`
// deprecated, use OnDuplicateLogical instead.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have any plan to remove the deprecated configuration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after all development is finished

OnDuplicate DuplicateResolveType `yaml:"on-duplicate" toml:"on-duplicate" json:"on-duplicate"`
OnDuplicateLogical DuplicateResolveType `yaml:"on-duplicate-logical" toml:"on-duplicate-logical" json:"on-duplicate-logical"`
// TODO: no effects now
OnDuplicatePhysical PhysicalDuplicateResolveType `yaml:"on-duplicate-physical" toml:"on-duplicate-physical" json:"on-duplicate-physical"`
}

// DefaultLoaderConfig return default loader config for task.
func DefaultLoaderConfig() LoaderConfig {
return LoaderConfig{
PoolSize: defaultPoolSize,
Dir: defaultDir,
ImportMode: LoadModeLogical,
OnDuplicate: OnDuplicateReplace,
PoolSize: defaultPoolSize,
Dir: defaultDir,
ImportMode: LoadModeLogical,
OnDuplicateLogical: OnDuplicateReplace,
}
}

Expand Down Expand Up @@ -308,12 +322,31 @@ func (m *LoaderConfig) adjust() error {
return terror.ErrConfigInvalidLoadMode.Generate(m.ImportMode)
}

if m.OnDuplicate == "" {
m.OnDuplicate = OnDuplicateReplace
if m.PoolSize == 0 {
m.PoolSize = defaultPoolSize
}

if m.OnDuplicateLogical == "" && m.OnDuplicate != "" {
m.OnDuplicateLogical = m.OnDuplicate
}
m.OnDuplicate = DuplicateResolveType(strings.ToLower(string(m.OnDuplicate)))
if m.OnDuplicate != OnDuplicateReplace && m.OnDuplicate != OnDuplicateError && m.OnDuplicate != OnDuplicateIgnore {
return terror.ErrConfigInvalidDuplicateResolution.Generate(m.OnDuplicate)
if m.OnDuplicateLogical == "" {
m.OnDuplicateLogical = OnDuplicateReplace
}
m.OnDuplicateLogical = DuplicateResolveType(strings.ToLower(string(m.OnDuplicateLogical)))
switch m.OnDuplicateLogical {
case OnDuplicateReplace, OnDuplicateError, OnDuplicateIgnore:
default:
return terror.ErrConfigInvalidDuplicateResolution.Generate(m.OnDuplicateLogical)
}

if m.OnDuplicatePhysical == "" {
m.OnDuplicatePhysical = OnDuplicateNone
}
m.OnDuplicatePhysical = PhysicalDuplicateResolveType(strings.ToLower(string(m.OnDuplicatePhysical)))
switch m.OnDuplicatePhysical {
case OnDuplicateNone, OnDuplicateManual:
default:
return terror.ErrConfigInvalidPhysicalDuplicateResolution.Generate(m.OnDuplicatePhysical)
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
if fullCfg.DataDir != nil {
subTaskCfg.LoaderConfig.Dir = *fullCfg.DataDir
}
subTaskCfg.LoaderConfig.OnDuplicate = DuplicateResolveType(task.OnDuplicate)
subTaskCfg.LoaderConfig.OnDuplicateLogical = DuplicateResolveType(task.OnDuplicate)
}
// set incremental config
subTaskCfg.SyncerConfig = DefaultSyncerConfig()
Expand Down Expand Up @@ -596,7 +596,7 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
TaskMode: openapi.TaskTaskMode(oneSubtaskConfig.Mode),
EnhanceOnlineSchemaChange: oneSubtaskConfig.OnlineDDL,
MetaSchema: &oneSubtaskConfig.MetaSchema,
OnDuplicate: openapi.TaskOnDuplicate(oneSubtaskConfig.LoaderConfig.OnDuplicate),
OnDuplicate: openapi.TaskOnDuplicate(oneSubtaskConfig.LoaderConfig.OnDuplicateLogical),
SourceConfig: taskSourceConfig,
TargetConfig: openapi.TaskTargetDataBase{
Host: oneSubtaskConfig.To.Host,
Expand Down
36 changes: 32 additions & 4 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,10 +695,11 @@ func TestGenAndFromSubTaskConfigs(t *testing.T) {
ExtraArgs: "--escape-backslash",
},
LoaderConfig: LoaderConfig{
PoolSize: 32,
Dir: "./dumpped_data",
ImportMode: LoadModePhysical,
OnDuplicate: OnDuplicateReplace,
PoolSize: 32,
Dir: "./dumpped_data",
ImportMode: LoadModePhysical,
OnDuplicateLogical: OnDuplicateReplace,
OnDuplicatePhysical: OnDuplicateNone,
},
SyncerConfig: SyncerConfig{
WorkerCount: 32,
Expand Down Expand Up @@ -1141,3 +1142,30 @@ func cloneValues(dest, src reflect.Value) {
}
}
}

func TestLoadConfigAdjust(t *testing.T) {
t.Parallel()

cfg := &LoaderConfig{}
require.NoError(t, cfg.adjust())
require.Equal(t, &LoaderConfig{
PoolSize: 16,
Dir: "",
SQLMode: "",
ImportMode: "logical",
OnDuplicate: "",
OnDuplicateLogical: "replace",
OnDuplicatePhysical: "none",
}, cfg)

// test deprecated OnDuplicate will write to OnDuplicateLogical
cfg.OnDuplicate = "replace"
cfg.OnDuplicateLogical = ""
require.NoError(t, cfg.adjust())
require.Equal(t, OnDuplicateReplace, cfg.OnDuplicateLogical)

// test wrong value
cfg.OnDuplicatePhysical = "wrong"
err := cfg.adjust()
require.True(t, terror.ErrConfigInvalidPhysicalDuplicateResolution.Equal(err))
}
10 changes: 8 additions & 2 deletions dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1133,9 +1133,9 @@ workaround = "Please choose a valid value in ['logical', 'physical']"
tags = ["internal", "medium"]

[error.DM-config-20054]
message = "invalid load on-duplicate '%s'"
message = "invalid load on-duplicate-logical or on-duplicate option '%s'"
description = ""
workaround = "Please choose a valid value in ['replace', 'error', 'ignore']"
workaround = "Please choose a valid value in ['replace', 'error', 'ignore'] or leave it empty."
tags = ["internal", "medium"]

[error.DM-config-20055]
Expand Down Expand Up @@ -1180,6 +1180,12 @@ description = ""
workaround = "Please set safe-mode to false or safe-mode-duration to non-zero."
tags = ["internal", "low"]

[error.DM-config-20062]
message = "invalid load on-duplicate-physical option '%s'"
description = ""
workaround = "Please choose a valid value in ['none', 'manual'] or leave it empty."
tags = ["internal", "medium"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down
2 changes: 1 addition & 1 deletion dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func GetLightningConfig(globalCfg *lcfg.GlobalConfig, subtaskCfg *config.SubTask
cfg.Checkpoint.DSN = cpPath
cfg.Checkpoint.KeepAfterSuccess = lcfg.CheckpointOrigin

cfg.TikvImporter.OnDuplicate = string(subtaskCfg.OnDuplicate)
cfg.TikvImporter.OnDuplicate = string(subtaskCfg.OnDuplicateLogical)
cfg.TiDB.Vars = make(map[string]string)
cfg.Routes = subtaskCfg.RouteRules
if subtaskCfg.To.Session != nil {
Expand Down
Loading