Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loader(dm): remove loader and ColumnMapping and dump TLS certs #7940

Merged
merged 28 commits into from
Dec 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ ErrConfigInvalidSafeModeDuration,[code=20060:class=config:scope=internal:level=m
ErrConfigConfictSafeModeDurationAndSafeMode,[code=20061:class=config:scope=internal:level=low], "Message: safe-mode(true) conflicts with safe-mode-duration(0s), Workaround: Please set safe-mode to false or safe-mode-duration to non-zero."
ErrConfigInvalidPhysicalDuplicateResolution,[code=20062:class=config:scope=internal:level=medium], "Message: invalid load on-duplicate-physical option '%s', Workaround: Please choose a valid value in ['none', 'manual'] or leave it empty."
ErrConfigInvalidPhysicalChecksum,[code=20063:class=config:scope=internal:level=medium], "Message: invalid load checksum-physical option '%s', Workaround: Please choose a valid value in ['required', 'optional', 'off'] or leave it empty."
ErrConfigColumnMappingDeprecated,[code=20064:class=config:scope=internal:level=high], "Message: column-mapping is not supported since v6.6.0, Workaround: Please use extract-table/extract-schema/extract-source to handle data conflict when merge tables. See https://docs.pingcap.com/tidb/v6.4/task-configuration-file-full#task-configuration-file-template-advanced"
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down
39 changes: 0 additions & 39 deletions dm/config/security/security.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ import (
"encoding/base64"
"fmt"
"os"
"path"

"github.com/pingcap/tiflow/dm/pkg/utils"
)

// Security config.
Expand Down Expand Up @@ -81,42 +78,6 @@ func (s *Security) LoadTLSContent() error {
return firstErr
}

// DumpTLSContent dump tls certs data to file.
// if user specified the path for certs but the cert doesn't exist or user didn't specify the path for certs
// dump certs to dm-worker folder and change the cert path.
// see more here https://github.com/pingcap/tiflow/pull/3260#discussion_r749052994
func (s *Security) DumpTLSContent(baseDirPath string) error {
isSSLCANotExist := s.SSLCA == "" || !utils.IsFileExists(s.SSLCA)
isSSLCertNotExist := s.SSLCert == "" || !utils.IsFileExists(s.SSLCert)
isSSLKeyNotExist := s.SSLKey == "" || !utils.IsFileExists(s.SSLKey)
if isSSLCANotExist || isSSLCertNotExist || isSSLKeyNotExist {
if !utils.IsDirExists(baseDirPath) {
if err := os.MkdirAll(baseDirPath, 0o700); err != nil {
return err
}
}
}
if isSSLCANotExist {
s.SSLCA = path.Join(baseDirPath, "ca.pem")
if err := utils.WriteFileAtomic(s.SSLCA, s.SSLCABytes, 0o600); err != nil {
return err
}
}
if isSSLCertNotExist {
s.SSLCert = path.Join(baseDirPath, "cert.pem")
if err := utils.WriteFileAtomic(s.SSLCert, s.SSLCertBytes, 0o600); err != nil {
return err
}
}
if isSSLKeyNotExist {
s.SSLKey = path.Join(baseDirPath, "key.pem")
if err := utils.WriteFileAtomic(s.SSLKey, s.SSLKeyBytes, 0o600); err != nil {
return err
}
}
return nil
}

// ClearSSLBytesData clear all tls config bytes data.
func (s *Security) ClearSSLBytesData() {
s.SSLCABytes = s.SSLCABytes[:0]
Expand Down
31 changes: 0 additions & 31 deletions dm/config/security_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,34 +171,3 @@ func (c *testTLSConfig) TestClone() {
clone.CertAllowedCN[0] = "g"
c.Require().NotEqual(s, clone)
}

func (c *testTLSConfig) TestLoadDumpTLSContent() {
s := &security.Security{
SSLCA: caFilePath,
SSLCert: certFilePath,
SSLKey: keyFilePath,
}
err := s.LoadTLSContent()
c.Require().NoError(err)
c.Require().Greater(len(s.SSLCABytes), 0)
c.Require().Greater(len(s.SSLCertBytes), 0)
c.Require().Greater(len(s.SSLKeyBytes), 0)

// cert file not exist
s.SSLCA += ".new"
s.SSLCert += ".new"
s.SSLKey += ".new"
c.Require().NoError(s.DumpTLSContent(c.T().TempDir()))
c.Require().FileExists(s.SSLCA)
c.Require().FileExists(s.SSLCert)
c.Require().FileExists(s.SSLKey)

// user not specify cert file
s.SSLCA = ""
s.SSLCert = ""
s.SSLKey = ""
c.Require().NoError(s.DumpTLSContent(c.T().TempDir()))
c.Require().FileExists(s.SSLCA)
c.Require().FileExists(s.SSLCert)
c.Require().FileExists(s.SSLKey)
}
19 changes: 9 additions & 10 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,11 @@ type SubTaskConfig struct {
From dbconfig.DBConfig `toml:"from" json:"from"`
To dbconfig.DBConfig `toml:"to" json:"to"`

RouteRules []*router.TableRule `toml:"route-rules" json:"route-rules"`
FilterRules []*bf.BinlogEventRule `toml:"filter-rules" json:"filter-rules"`
ColumnMappingRules []*column.Rule `toml:"mapping-rule" json:"mapping-rule"`
ExprFilter []*ExpressionFilter `yaml:"expression-filter" toml:"expression-filter" json:"expression-filter"`
RouteRules []*router.TableRule `toml:"route-rules" json:"route-rules"`
FilterRules []*bf.BinlogEventRule `toml:"filter-rules" json:"filter-rules"`
// deprecated
ColumnMappingRules []*column.Rule `toml:"mapping-rule" json:"mapping-rule"`
ExprFilter []*ExpressionFilter `yaml:"expression-filter" toml:"expression-filter" json:"expression-filter"`

// black-white-list is deprecated, use block-allow-list instead
BWList *filter.Rules `toml:"black-white-list" json:"black-white-list"`
Expand Down Expand Up @@ -288,6 +289,10 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
c.ShardMode = ShardPessimistic // use the pessimistic mode as default for back compatible.
}

if len(c.ColumnMappingRules) > 0 {
return terror.ErrConfigColumnMappingDeprecated.Generate()
}

if c.OnlineDDLScheme != "" && c.OnlineDDLScheme != PT && c.OnlineDDLScheme != GHOST {
return terror.ErrConfigOnlineSchemeNotSupport.Generate(c.OnlineDDLScheme)
} else if c.OnlineDDLScheme == PT || c.OnlineDDLScheme == GHOST {
Expand Down Expand Up @@ -481,9 +486,3 @@ func (c *SubTaskConfig) Clone() (*SubTaskConfig, error) {

return clone, nil
}

// NeedUseLightning returns whether need to use lightning loader.
func (c *SubTaskConfig) NeedUseLightning() bool {
// TODO: return true after remove loader
return (c.Mode == ModeAll || c.Mode == ModeFull) && c.ImportMode != LoadModeLoader
}
47 changes: 21 additions & 26 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ func (m *Meta) Verify() error {
// MySQLInstance represents a sync config of a MySQL instance.
type MySQLInstance struct {
// it represents a MySQL/MariaDB instance or a replica group
SourceID string `yaml:"source-id"`
Meta *Meta `yaml:"meta"`
FilterRules []string `yaml:"filter-rules"`
SourceID string `yaml:"source-id"`
Meta *Meta `yaml:"meta"`
FilterRules []string `yaml:"filter-rules"`
// deprecated
ColumnMappingRules []string `yaml:"column-mapping-rules"`
RouteRules []string `yaml:"route-rules"`
ExpressionFilters []string `yaml:"expression-filters"`
Expand Down Expand Up @@ -243,13 +244,13 @@ const (
// LoadModeSQL means write data by sql statements, uses tidb-lightning tidb backend to load data.
// deprecated, use LoadModeLogical instead.
LoadModeSQL LoadMode = "sql"
// LoadModeLoader is the legacy sql mode, use loader to load data. this should be replaced by sql mode in new version.
// deprecated, loader will be removed in future.
LoadModeLoader = "loader"
// LoadModeLoader is the legacy sql mode, use loader to load data. this should be replaced by LoadModeLogical mode.
// deprecated, use LoadModeLogical instead.
LoadModeLoader LoadMode = "loader"
// LoadModeLogical means use tidb backend of lightning to load data, which uses SQL to load data.
LoadModeLogical = "logical"
LoadModeLogical LoadMode = "logical"
// LoadModePhysical means use local backend of lightning to load data, which ingest SST files to load data.
LoadModePhysical = "physical"
LoadModePhysical LoadMode = "physical"
)

// LogicalDuplicateResolveType defines the duplication resolution when meet duplicate rows for logical import.
Expand Down Expand Up @@ -325,7 +326,8 @@ func (m *LoaderConfig) adjust() error {
if m.ImportMode == "" {
m.ImportMode = LoadModeLogical
}
if strings.EqualFold(string(m.ImportMode), string(LoadModeSQL)) {
if strings.EqualFold(string(m.ImportMode), string(LoadModeSQL)) ||
strings.EqualFold(string(m.ImportMode), string(LoadModeLoader)) {
m.ImportMode = LoadModeLogical
}
m.ImportMode = LoadMode(strings.ToLower(string(m.ImportMode)))
Expand Down Expand Up @@ -524,10 +526,11 @@ type TaskConfig struct {
// deprecated
OnlineDDLScheme string `yaml:"online-ddl-scheme" toml:"online-ddl-scheme" json:"online-ddl-scheme"`

Routes map[string]*router.TableRule `yaml:"routes" toml:"routes" json:"routes"`
Filters map[string]*bf.BinlogEventRule `yaml:"filters" toml:"filters" json:"filters"`
ColumnMappings map[string]*column.Rule `yaml:"column-mappings" toml:"column-mappings" json:"column-mappings"`
ExprFilter map[string]*ExpressionFilter `yaml:"expression-filter" toml:"expression-filter" json:"expression-filter"`
Routes map[string]*router.TableRule `yaml:"routes" toml:"routes" json:"routes"`
Filters map[string]*bf.BinlogEventRule `yaml:"filters" toml:"filters" json:"filters"`
// deprecated
ColumnMappings map[string]*column.Rule `yaml:"column-mappings" toml:"column-mappings" json:"column-mappings"`
ExprFilter map[string]*ExpressionFilter `yaml:"expression-filter" toml:"expression-filter" json:"expression-filter"`

// black-white-list is deprecated, use block-allow-list instead
BWList map[string]*filter.Rules `yaml:"black-white-list" toml:"black-white-list" json:"black-white-list"`
Expand Down Expand Up @@ -629,12 +632,11 @@ func (c *TaskConfig) RawDecode(data string) error {
}

// find unused items in config.
var configRefPrefixes = []string{"RouteRules", "FilterRules", "ColumnMappingRules", "Mydumper", "Loader", "Syncer", "ExprFilter", "Validator"}
var configRefPrefixes = []string{"RouteRules", "FilterRules", "Mydumper", "Loader", "Syncer", "ExprFilter", "Validator"}

const (
routeRulesIdx = iota
filterRulesIdx
columnMappingIdx
mydumperIdx
loaderIdx
syncerIdx
Expand Down Expand Up @@ -667,6 +669,10 @@ func (c *TaskConfig) adjust() error {
c.ShardMode = ShardPessimistic // use the pessimistic mode as default for back compatible.
}

if len(c.ColumnMappings) > 0 {
return terror.ErrConfigColumnMappingDeprecated.Generate()
}

if c.CollationCompatible != "" && c.CollationCompatible != LooseCollationCompatible && c.CollationCompatible != StrictCollationCompatible {
return terror.ErrConfigCollationCompatibleNotSupport.Generate(c.CollationCompatible)
} else if c.CollationCompatible == "" {
Expand Down Expand Up @@ -778,12 +784,6 @@ func (c *TaskConfig) adjust() error {
}
globalConfigReferCount[configRefPrefixes[filterRulesIdx]+name]++
}
for _, name := range inst.ColumnMappingRules {
if _, ok := c.ColumnMappings[name]; !ok {
return terror.ErrConfigColumnMappingNotFound.Generate(i, name)
}
globalConfigReferCount[configRefPrefixes[columnMappingIdx]+name]++
}

// only when BAList is empty use BWList
if len(c.BAList) == 0 && len(c.BWList) != 0 {
Expand Down Expand Up @@ -936,11 +936,6 @@ func (c *TaskConfig) adjust() error {
unusedConfigs = append(unusedConfigs, filter)
}
}
for columnMapping := range c.ColumnMappings {
if globalConfigReferCount[configRefPrefixes[columnMappingIdx]+columnMapping] == 0 {
unusedConfigs = append(unusedConfigs, columnMapping)
}
}
for mydumper := range c.Mydumpers {
if globalConfigReferCount[configRefPrefixes[mydumperIdx]+mydumper] == 0 {
unusedConfigs = append(unusedConfigs, mydumper)
Expand Down
40 changes: 1 addition & 39 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,6 @@ filters:
events: ["all dml"]
action: Do

column-mappings:
column-mapping-rule-1:
schema-pattern: "test_*"
table-pattern: "t_*"
expression: "partition id"
source-column: "id"
target-column: "id"
arguments: ["1", "test", "t", "_"]
column-mapping-rule-2:
schema-pattern: "test_*"
table-pattern: "t_*"
expression: "partition id"
source-column: "id"
target-column: "id"
arguments: ["2", "test", "t", "_"]

mydumpers:
global1:
threads: 4
Expand Down Expand Up @@ -122,7 +106,6 @@ mysql-instances:
- source-id: "mysql-replica-01"
route-rules: ["route-rule-2"]
filter-rules: ["filter-rule-2"]
column-mapping-rules: ["column-mapping-rule-2"]
mydumper-config-name: "global1"
loader-config-name: "global1"
syncer-config-name: "global1"
Expand All @@ -131,7 +114,6 @@ mysql-instances:
- source-id: "mysql-replica-02"
route-rules: ["route-rule-1"]
filter-rules: ["filter-rule-1"]
column-mapping-rules: ["column-mapping-rule-1"]
mydumper-config-name: "global2"
loader-config-name: "global2"
syncer-config-name: "global2"
Expand Down Expand Up @@ -177,22 +159,6 @@ filters:
events: ["all dml"]
action: Do

column-mappings:
column-mapping-rule-1:
schema-pattern: "test_*"
table-pattern: "t_*"
expression: "partition id"
source-column: "id"
target-column: "id"
arguments: ["1", "test", "t", "_"]
column-mapping-rule-2:
schema-pattern: "test_*"
table-pattern: "t_*"
expression: "partition id"
source-column: "id"
target-column: "id"
arguments: ["2", "test", "t", "_"]

mydumpers:
global1:
threads: 4
Expand Down Expand Up @@ -235,22 +201,20 @@ mysql-instances:
- source-id: "mysql-replica-01"
route-rules: ["route-rule-1"]
filter-rules: ["filter-rule-1"]
column-mapping-rules: ["column-mapping-rule-1"]
mydumper-config-name: "global1"
loader-config-name: "global1"
syncer-config-name: "global1"

- source-id: "mysql-replica-02"
route-rules: ["route-rule-1"]
filter-rules: ["filter-rule-1"]
column-mapping-rules: ["column-mapping-rule-1"]
mydumper-config-name: "global2"
loader-config-name: "global2"
syncer-config-name: "global2"
`
taskConfig = NewTaskConfig()
err = taskConfig.Decode(errorTaskConfig)
require.ErrorContains(t, err, "The configurations as following [column-mapping-rule-2 expr-1 filter-rule-2 route-rule-2] are set in global configuration")
require.ErrorContains(t, err, "The configurations as following [expr-1 filter-rule-2 route-rule-2] are set in global configuration")
}

func TestName(t *testing.T) {
Expand All @@ -276,7 +240,6 @@ mysql-instances:
server-id: 101
block-allow-list: "instance"
route-rules: ["sharding-route-rules-table", "sharding-route-rules-schema"]
column-mapping-rules: ["instance-1"]
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"
Expand All @@ -301,7 +264,6 @@ mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
route-rules: ["sharding-route-rules-table", "sharding-route-rules-schema"]
column-mapping-rules: ["instance-1"]
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"
Expand Down
6 changes: 6 additions & 0 deletions dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,12 @@ description = ""
workaround = "Please choose a valid value in ['required', 'optional', 'off'] or leave it empty."
tags = ["internal", "medium"]

[error.DM-config-20064]
message = "column-mapping is not supported since v6.6.0"
description = ""
workaround = "Please use extract-table/extract-schema/extract-source to handle data conflict when merge tables. See https://docs.pingcap.com/tidb/v6.4/task-configuration-file-full#task-configuration-file-template-advanced"
tags = ["internal", "high"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down
Loading