Skip to content

Commit

Permalink
loader(dm): remove loader and ColumnMapping and dump TLS certs (#7940)
Browse files Browse the repository at this point in the history
ref #3510
  • Loading branch information
lance6716 authored Dec 26, 2022
1 parent 189fe5f commit a98f447
Show file tree
Hide file tree
Showing 113 changed files with 637 additions and 6,092 deletions.
1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ ErrConfigInvalidSafeModeDuration,[code=20060:class=config:scope=internal:level=m
ErrConfigConfictSafeModeDurationAndSafeMode,[code=20061:class=config:scope=internal:level=low], "Message: safe-mode(true) conflicts with safe-mode-duration(0s), Workaround: Please set safe-mode to false or safe-mode-duration to non-zero."
ErrConfigInvalidPhysicalDuplicateResolution,[code=20062:class=config:scope=internal:level=medium], "Message: invalid load on-duplicate-physical option '%s', Workaround: Please choose a valid value in ['none', 'manual'] or leave it empty."
ErrConfigInvalidPhysicalChecksum,[code=20063:class=config:scope=internal:level=medium], "Message: invalid load checksum-physical option '%s', Workaround: Please choose a valid value in ['required', 'optional', 'off'] or leave it empty."
ErrConfigColumnMappingDeprecated,[code=20064:class=config:scope=internal:level=high], "Message: column-mapping is not supported since v6.6.0, Workaround: Please use extract-table/extract-schema/extract-source to handle data conflict when merge tables. See https://docs.pingcap.com/tidb/v6.4/task-configuration-file-full#task-configuration-file-template-advanced"
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down
39 changes: 0 additions & 39 deletions dm/config/security/security.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ import (
"encoding/base64"
"fmt"
"os"
"path"

"github.com/pingcap/tiflow/dm/pkg/utils"
)

// Security config.
Expand Down Expand Up @@ -81,42 +78,6 @@ func (s *Security) LoadTLSContent() error {
return firstErr
}

// DumpTLSContent dump tls certs data to file.
// if user specified the path for certs but the cert doesn't exist or user didn't specify the path for certs
// dump certs to dm-worker folder and change the cert path.
// see more here https://github.com/pingcap/tiflow/pull/3260#discussion_r749052994
func (s *Security) DumpTLSContent(baseDirPath string) error {
isSSLCANotExist := s.SSLCA == "" || !utils.IsFileExists(s.SSLCA)
isSSLCertNotExist := s.SSLCert == "" || !utils.IsFileExists(s.SSLCert)
isSSLKeyNotExist := s.SSLKey == "" || !utils.IsFileExists(s.SSLKey)
if isSSLCANotExist || isSSLCertNotExist || isSSLKeyNotExist {
if !utils.IsDirExists(baseDirPath) {
if err := os.MkdirAll(baseDirPath, 0o700); err != nil {
return err
}
}
}
if isSSLCANotExist {
s.SSLCA = path.Join(baseDirPath, "ca.pem")
if err := utils.WriteFileAtomic(s.SSLCA, s.SSLCABytes, 0o600); err != nil {
return err
}
}
if isSSLCertNotExist {
s.SSLCert = path.Join(baseDirPath, "cert.pem")
if err := utils.WriteFileAtomic(s.SSLCert, s.SSLCertBytes, 0o600); err != nil {
return err
}
}
if isSSLKeyNotExist {
s.SSLKey = path.Join(baseDirPath, "key.pem")
if err := utils.WriteFileAtomic(s.SSLKey, s.SSLKeyBytes, 0o600); err != nil {
return err
}
}
return nil
}

// ClearSSLBytesData clear all tls config bytes data.
func (s *Security) ClearSSLBytesData() {
s.SSLCABytes = s.SSLCABytes[:0]
Expand Down
31 changes: 0 additions & 31 deletions dm/config/security_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,34 +171,3 @@ func (c *testTLSConfig) TestClone() {
clone.CertAllowedCN[0] = "g"
c.Require().NotEqual(s, clone)
}

func (c *testTLSConfig) TestLoadDumpTLSContent() {
s := &security.Security{
SSLCA: caFilePath,
SSLCert: certFilePath,
SSLKey: keyFilePath,
}
err := s.LoadTLSContent()
c.Require().NoError(err)
c.Require().Greater(len(s.SSLCABytes), 0)
c.Require().Greater(len(s.SSLCertBytes), 0)
c.Require().Greater(len(s.SSLKeyBytes), 0)

// cert file not exist
s.SSLCA += ".new"
s.SSLCert += ".new"
s.SSLKey += ".new"
c.Require().NoError(s.DumpTLSContent(c.T().TempDir()))
c.Require().FileExists(s.SSLCA)
c.Require().FileExists(s.SSLCert)
c.Require().FileExists(s.SSLKey)

// user not specify cert file
s.SSLCA = ""
s.SSLCert = ""
s.SSLKey = ""
c.Require().NoError(s.DumpTLSContent(c.T().TempDir()))
c.Require().FileExists(s.SSLCA)
c.Require().FileExists(s.SSLCert)
c.Require().FileExists(s.SSLKey)
}
19 changes: 9 additions & 10 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,11 @@ type SubTaskConfig struct {
From dbconfig.DBConfig `toml:"from" json:"from"`
To dbconfig.DBConfig `toml:"to" json:"to"`

RouteRules []*router.TableRule `toml:"route-rules" json:"route-rules"`
FilterRules []*bf.BinlogEventRule `toml:"filter-rules" json:"filter-rules"`
ColumnMappingRules []*column.Rule `toml:"mapping-rule" json:"mapping-rule"`
ExprFilter []*ExpressionFilter `yaml:"expression-filter" toml:"expression-filter" json:"expression-filter"`
RouteRules []*router.TableRule `toml:"route-rules" json:"route-rules"`
FilterRules []*bf.BinlogEventRule `toml:"filter-rules" json:"filter-rules"`
// deprecated
ColumnMappingRules []*column.Rule `toml:"mapping-rule" json:"mapping-rule"`
ExprFilter []*ExpressionFilter `yaml:"expression-filter" toml:"expression-filter" json:"expression-filter"`

// black-white-list is deprecated, use block-allow-list instead
BWList *filter.Rules `toml:"black-white-list" json:"black-white-list"`
Expand Down Expand Up @@ -288,6 +289,10 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
c.ShardMode = ShardPessimistic // use the pessimistic mode as default for back compatible.
}

if len(c.ColumnMappingRules) > 0 {
return terror.ErrConfigColumnMappingDeprecated.Generate()
}

if c.OnlineDDLScheme != "" && c.OnlineDDLScheme != PT && c.OnlineDDLScheme != GHOST {
return terror.ErrConfigOnlineSchemeNotSupport.Generate(c.OnlineDDLScheme)
} else if c.OnlineDDLScheme == PT || c.OnlineDDLScheme == GHOST {
Expand Down Expand Up @@ -481,9 +486,3 @@ func (c *SubTaskConfig) Clone() (*SubTaskConfig, error) {

return clone, nil
}

// NeedUseLightning returns whether need to use lightning loader.
func (c *SubTaskConfig) NeedUseLightning() bool {
// TODO: return true after remove loader
return (c.Mode == ModeAll || c.Mode == ModeFull) && c.ImportMode != LoadModeLoader
}
47 changes: 21 additions & 26 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ func (m *Meta) Verify() error {
// MySQLInstance represents a sync config of a MySQL instance.
type MySQLInstance struct {
// it represents a MySQL/MariaDB instance or a replica group
SourceID string `yaml:"source-id"`
Meta *Meta `yaml:"meta"`
FilterRules []string `yaml:"filter-rules"`
SourceID string `yaml:"source-id"`
Meta *Meta `yaml:"meta"`
FilterRules []string `yaml:"filter-rules"`
// deprecated
ColumnMappingRules []string `yaml:"column-mapping-rules"`
RouteRules []string `yaml:"route-rules"`
ExpressionFilters []string `yaml:"expression-filters"`
Expand Down Expand Up @@ -243,13 +244,13 @@ const (
// LoadModeSQL means write data by sql statements, uses tidb-lightning tidb backend to load data.
// deprecated, use LoadModeLogical instead.
LoadModeSQL LoadMode = "sql"
// LoadModeLoader is the legacy sql mode, use loader to load data. this should be replaced by sql mode in new version.
// deprecated, loader will be removed in future.
LoadModeLoader = "loader"
// LoadModeLoader is the legacy sql mode, use loader to load data. this should be replaced by LoadModeLogical mode.
// deprecated, use LoadModeLogical instead.
LoadModeLoader LoadMode = "loader"
// LoadModeLogical means use tidb backend of lightning to load data, which uses SQL to load data.
LoadModeLogical = "logical"
LoadModeLogical LoadMode = "logical"
// LoadModePhysical means use local backend of lightning to load data, which ingest SST files to load data.
LoadModePhysical = "physical"
LoadModePhysical LoadMode = "physical"
)

// LogicalDuplicateResolveType defines the duplication resolution when meet duplicate rows for logical import.
Expand Down Expand Up @@ -325,7 +326,8 @@ func (m *LoaderConfig) adjust() error {
if m.ImportMode == "" {
m.ImportMode = LoadModeLogical
}
if strings.EqualFold(string(m.ImportMode), string(LoadModeSQL)) {
if strings.EqualFold(string(m.ImportMode), string(LoadModeSQL)) ||
strings.EqualFold(string(m.ImportMode), string(LoadModeLoader)) {
m.ImportMode = LoadModeLogical
}
m.ImportMode = LoadMode(strings.ToLower(string(m.ImportMode)))
Expand Down Expand Up @@ -524,10 +526,11 @@ type TaskConfig struct {
// deprecated
OnlineDDLScheme string `yaml:"online-ddl-scheme" toml:"online-ddl-scheme" json:"online-ddl-scheme"`

Routes map[string]*router.TableRule `yaml:"routes" toml:"routes" json:"routes"`
Filters map[string]*bf.BinlogEventRule `yaml:"filters" toml:"filters" json:"filters"`
ColumnMappings map[string]*column.Rule `yaml:"column-mappings" toml:"column-mappings" json:"column-mappings"`
ExprFilter map[string]*ExpressionFilter `yaml:"expression-filter" toml:"expression-filter" json:"expression-filter"`
Routes map[string]*router.TableRule `yaml:"routes" toml:"routes" json:"routes"`
Filters map[string]*bf.BinlogEventRule `yaml:"filters" toml:"filters" json:"filters"`
// deprecated
ColumnMappings map[string]*column.Rule `yaml:"column-mappings" toml:"column-mappings" json:"column-mappings"`
ExprFilter map[string]*ExpressionFilter `yaml:"expression-filter" toml:"expression-filter" json:"expression-filter"`

// black-white-list is deprecated, use block-allow-list instead
BWList map[string]*filter.Rules `yaml:"black-white-list" toml:"black-white-list" json:"black-white-list"`
Expand Down Expand Up @@ -629,12 +632,11 @@ func (c *TaskConfig) RawDecode(data string) error {
}

// find unused items in config.
var configRefPrefixes = []string{"RouteRules", "FilterRules", "ColumnMappingRules", "Mydumper", "Loader", "Syncer", "ExprFilter", "Validator"}
var configRefPrefixes = []string{"RouteRules", "FilterRules", "Mydumper", "Loader", "Syncer", "ExprFilter", "Validator"}

const (
routeRulesIdx = iota
filterRulesIdx
columnMappingIdx
mydumperIdx
loaderIdx
syncerIdx
Expand Down Expand Up @@ -667,6 +669,10 @@ func (c *TaskConfig) adjust() error {
c.ShardMode = ShardPessimistic // use the pessimistic mode as default for back compatible.
}

if len(c.ColumnMappings) > 0 {
return terror.ErrConfigColumnMappingDeprecated.Generate()
}

if c.CollationCompatible != "" && c.CollationCompatible != LooseCollationCompatible && c.CollationCompatible != StrictCollationCompatible {
return terror.ErrConfigCollationCompatibleNotSupport.Generate(c.CollationCompatible)
} else if c.CollationCompatible == "" {
Expand Down Expand Up @@ -778,12 +784,6 @@ func (c *TaskConfig) adjust() error {
}
globalConfigReferCount[configRefPrefixes[filterRulesIdx]+name]++
}
for _, name := range inst.ColumnMappingRules {
if _, ok := c.ColumnMappings[name]; !ok {
return terror.ErrConfigColumnMappingNotFound.Generate(i, name)
}
globalConfigReferCount[configRefPrefixes[columnMappingIdx]+name]++
}

// only when BAList is empty use BWList
if len(c.BAList) == 0 && len(c.BWList) != 0 {
Expand Down Expand Up @@ -936,11 +936,6 @@ func (c *TaskConfig) adjust() error {
unusedConfigs = append(unusedConfigs, filter)
}
}
for columnMapping := range c.ColumnMappings {
if globalConfigReferCount[configRefPrefixes[columnMappingIdx]+columnMapping] == 0 {
unusedConfigs = append(unusedConfigs, columnMapping)
}
}
for mydumper := range c.Mydumpers {
if globalConfigReferCount[configRefPrefixes[mydumperIdx]+mydumper] == 0 {
unusedConfigs = append(unusedConfigs, mydumper)
Expand Down
40 changes: 1 addition & 39 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,6 @@ filters:
events: ["all dml"]
action: Do
column-mappings:
column-mapping-rule-1:
schema-pattern: "test_*"
table-pattern: "t_*"
expression: "partition id"
source-column: "id"
target-column: "id"
arguments: ["1", "test", "t", "_"]
column-mapping-rule-2:
schema-pattern: "test_*"
table-pattern: "t_*"
expression: "partition id"
source-column: "id"
target-column: "id"
arguments: ["2", "test", "t", "_"]
mydumpers:
global1:
threads: 4
Expand Down Expand Up @@ -122,7 +106,6 @@ mysql-instances:
- source-id: "mysql-replica-01"
route-rules: ["route-rule-2"]
filter-rules: ["filter-rule-2"]
column-mapping-rules: ["column-mapping-rule-2"]
mydumper-config-name: "global1"
loader-config-name: "global1"
syncer-config-name: "global1"
Expand All @@ -131,7 +114,6 @@ mysql-instances:
- source-id: "mysql-replica-02"
route-rules: ["route-rule-1"]
filter-rules: ["filter-rule-1"]
column-mapping-rules: ["column-mapping-rule-1"]
mydumper-config-name: "global2"
loader-config-name: "global2"
syncer-config-name: "global2"
Expand Down Expand Up @@ -177,22 +159,6 @@ filters:
events: ["all dml"]
action: Do
column-mappings:
column-mapping-rule-1:
schema-pattern: "test_*"
table-pattern: "t_*"
expression: "partition id"
source-column: "id"
target-column: "id"
arguments: ["1", "test", "t", "_"]
column-mapping-rule-2:
schema-pattern: "test_*"
table-pattern: "t_*"
expression: "partition id"
source-column: "id"
target-column: "id"
arguments: ["2", "test", "t", "_"]
mydumpers:
global1:
threads: 4
Expand Down Expand Up @@ -235,22 +201,20 @@ mysql-instances:
- source-id: "mysql-replica-01"
route-rules: ["route-rule-1"]
filter-rules: ["filter-rule-1"]
column-mapping-rules: ["column-mapping-rule-1"]
mydumper-config-name: "global1"
loader-config-name: "global1"
syncer-config-name: "global1"
- source-id: "mysql-replica-02"
route-rules: ["route-rule-1"]
filter-rules: ["filter-rule-1"]
column-mapping-rules: ["column-mapping-rule-1"]
mydumper-config-name: "global2"
loader-config-name: "global2"
syncer-config-name: "global2"
`
taskConfig = NewTaskConfig()
err = taskConfig.Decode(errorTaskConfig)
require.ErrorContains(t, err, "The configurations as following [column-mapping-rule-2 expr-1 filter-rule-2 route-rule-2] are set in global configuration")
require.ErrorContains(t, err, "The configurations as following [expr-1 filter-rule-2 route-rule-2] are set in global configuration")
}

func TestName(t *testing.T) {
Expand All @@ -276,7 +240,6 @@ mysql-instances:
server-id: 101
block-allow-list: "instance"
route-rules: ["sharding-route-rules-table", "sharding-route-rules-schema"]
column-mapping-rules: ["instance-1"]
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"
Expand All @@ -301,7 +264,6 @@ mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
route-rules: ["sharding-route-rules-table", "sharding-route-rules-schema"]
column-mapping-rules: ["instance-1"]
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"
Expand Down
6 changes: 6 additions & 0 deletions dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,12 @@ description = ""
workaround = "Please choose a valid value in ['required', 'optional', 'off'] or leave it empty."
tags = ["internal", "medium"]

[error.DM-config-20064]
message = "column-mapping is not supported since v6.6.0"
description = ""
workaround = "Please use extract-table/extract-schema/extract-source to handle data conflict when merge tables. See https://docs.pingcap.com/tidb/v6.4/task-configuration-file-full#task-configuration-file-template-advanced"
tags = ["internal", "high"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down
Loading

0 comments on commit a98f447

Please sign in to comment.