Skip to content

Commit

Permalink
lightning(dm): stop task on data duplication (#7742)
Browse files Browse the repository at this point in the history
ref #3510
  • Loading branch information
lance6716 authored Dec 8, 2022
1 parent 215bf26 commit a5f6a27
Show file tree
Hide file tree
Showing 17 changed files with 1,323 additions and 1,081 deletions.
3 changes: 2 additions & 1 deletion dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ ErrLoadUnitDuplicateTableFile,[code=34015:class=load-unit:scope=internal:level=h
ErrLoadUnitGenBAList,[code=34016:class=load-unit:scope=internal:level=high], "Message: generate block allow list, Workaround: Please check the `block-allow-list` config in task configuration file."
ErrLoadTaskWorkerNotMatch,[code=34017:class=functional:scope=internal:level=high], "Message: different worker in load stage, previous worker: %s, current worker: %s, Workaround: Please check if the previous worker is online."
ErrLoadTaskCheckPointNotMatch,[code=34018:class=functional:scope=internal:level=high], "Message: inconsistent checkpoints between loader and target database, Workaround: If you want to redo the whole task, please check that you have not forgotten to add -remove-meta flag for start-task command."
ErrLoadLightningRuntime,[code=34019:class=load-unit:scope=not-set:level=high]
ErrLoadLightningRuntime,[code=34019:class=load-unit:scope=internal:level=high]
ErrLoadLightningHasDup,[code=34020:class=load-unit:scope=internal:level=medium], "Message: physical import finished but the data has duplication, please check `%s`.`%s` to see the duplication, Workaround: You can refer to https://docs.pingcap.com/tidb/stable/tidb-lightning-physical-import-mode-usage#conflict-detection to manually insert data and resume the task."
ErrSyncerUnitPanic,[code=36001:class=sync-unit:scope=internal:level=high], "Message: panic error: %v"
ErrSyncUnitInvalidTableName,[code=36002:class=sync-unit:scope=internal:level=high], "Message: extract table name for DML error: %s"
ErrSyncUnitTableNameQuery,[code=36003:class=sync-unit:scope=internal:level=high], "Message: table name parse error: %s"
Expand Down
5 changes: 2 additions & 3 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,8 @@ type LoaderConfig struct {
SQLMode string `yaml:"-" toml:"-" json:"-"` // wrote by dump unit (DM op) or jobmaster (DM in engine)
ImportMode LoadMode `yaml:"import-mode" toml:"import-mode" json:"import-mode"`
// deprecated, use OnDuplicateLogical instead.
OnDuplicate LogicalDuplicateResolveType `yaml:"on-duplicate" toml:"on-duplicate" json:"on-duplicate"`
OnDuplicateLogical LogicalDuplicateResolveType `yaml:"on-duplicate-logical" toml:"on-duplicate-logical" json:"on-duplicate-logical"`
// TODO: OnDuplicatePhysical has no effects now
OnDuplicate LogicalDuplicateResolveType `yaml:"on-duplicate" toml:"on-duplicate" json:"on-duplicate"`
OnDuplicateLogical LogicalDuplicateResolveType `yaml:"on-duplicate-logical" toml:"on-duplicate-logical" json:"on-duplicate-logical"`
OnDuplicatePhysical PhysicalDuplicateResolveType `yaml:"on-duplicate-physical" toml:"on-duplicate-physical" json:"on-duplicate-physical"`
DiskQuotaPhysical config.ByteSize `yaml:"disk-quota-physical" toml:"disk-quota-physical" json:"disk-quota-physical"`
}
Expand Down
8 changes: 7 additions & 1 deletion dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1712,7 +1712,13 @@ tags = ["internal", "high"]
message = ""
description = ""
workaround = ""
tags = ["not-set", "high"]
tags = ["internal", "high"]

[error.DM-load-unit-34020]
message = "physical import finished but the data has duplication, please check `%s`.`%s` to see the duplication"
description = ""
workaround = "You can refer to https://docs.pingcap.com/tidb/stable/tidb-lightning-physical-import-mode-usage#conflict-detection to manually insert data and resume the task."
tags = ["internal", "medium"]

[error.DM-sync-unit-36001]
message = "panic error: %v"
Expand Down
26 changes: 25 additions & 1 deletion dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
lcfg "github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/errormanager"
"github.com/pingcap/tidb/dumpling/export"
tidbpromutil "github.com/pingcap/tidb/util/promutil"
"github.com/pingcap/tiflow/dm/config"
Expand Down Expand Up @@ -267,6 +268,11 @@ func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) er
opts = append(opts, lightning.WithLogger(l.logger.Logger))
}

var hasDup atomic.Bool
if l.cfg.LoaderConfig.ImportMode == config.LoadModePhysical {
opts = append(opts, lightning.WithDupIndicator(&hasDup))
}

err = l.core.RunOnceWithOptions(taskCtx, cfg, opts...)
failpoint.Inject("LoadDataSlowDown", nil)
failpoint.Inject("LoadDataSlowDownByTask", func(val failpoint.Value) {
Expand All @@ -279,7 +285,18 @@ func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) er
}
}
})
return terror.ErrLoadLightningRuntime.Delegate(err)
if err != nil {
return terror.ErrLoadLightningRuntime.Delegate(err)
}
if hasDup.Load() {
return terror.ErrLoadLightningHasDup.Generate(cfg.App.TaskInfoSchemaName, errormanager.ConflictErrorTableName)
}
return nil
}

// GetTaskInfoSchemaName is used to assign to TikvImporter.DuplicateResolution in lightning config.
func GetTaskInfoSchemaName(dmMetaSchema, taskName string) string {
return dmMetaSchema + "_" + taskName
}

// GetLightningConfig returns the lightning task config for the lightning global config and DM subtask config.
Expand All @@ -305,6 +322,13 @@ func GetLightningConfig(globalCfg *lcfg.GlobalConfig, subtaskCfg *config.SubTask
cfg.Checkpoint.KeepAfterSuccess = lcfg.CheckpointOrigin

cfg.TikvImporter.OnDuplicate = string(subtaskCfg.OnDuplicateLogical)
switch subtaskCfg.OnDuplicatePhysical {
case config.OnDuplicateManual:
cfg.TikvImporter.DuplicateResolution = lcfg.DupeResAlgRemove
cfg.App.TaskInfoSchemaName = GetTaskInfoSchemaName(subtaskCfg.MetaSchema, subtaskCfg.Name)
case config.OnDuplicateNone:
cfg.TikvImporter.DuplicateResolution = lcfg.DupeResAlgNone
}
cfg.TiDB.Vars = make(map[string]string)
cfg.Routes = subtaskCfg.RouteRules
if subtaskCfg.To.Session != nil {
Expand Down
83 changes: 80 additions & 3 deletions dm/master/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,98 @@ import (
"path/filepath"
"strconv"
"strings"
"testing"
"time"

"github.com/golang/mock/gomock"
. "github.com/pingcap/check"
filter "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/master/workerrpc"
"github.com/pingcap/tiflow/dm/pb"
"github.com/pingcap/tiflow/dm/pbmock"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/ha"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/integration"
)

type testMaster struct {
workerClients map[string]workerrpc.Client
saveMaxRetryNum int
electionTTLBackup int
testT *testing.T

testEtcdCluster *integration.ClusterV3
etcdTestCli *clientv3.Client
}

var testSuite = SerialSuites(&testMaster{})

func TestMaster(t *testing.T) {
err := log.InitLogger(&log.Config{})
if err != nil {
t.Fatal(err)
}
pwd, err = os.Getwd()
if err != nil {
t.Fatal(err)
}
integration.BeforeTestExternal(t)
// inject *testing.T to testMaster
s := testSuite.(*testMaster)
s.testT = t

TestingT(t)
}

func (t *testMaster) SetUpSuite(c *C) {
c.Assert(log.InitLogger(&log.Config{}), IsNil)
t.workerClients = make(map[string]workerrpc.Client)
t.saveMaxRetryNum = maxRetryNum
t.electionTTLBackup = electionTTL
electionTTL = 3
maxRetryNum = 2
checkAndAdjustSourceConfigForDMCtlFunc = checkAndNoAdjustSourceConfigMock
}

func (t *testMaster) TearDownSuite(c *C) {
maxRetryNum = t.saveMaxRetryNum
electionTTL = t.electionTTLBackup
checkAndAdjustSourceConfigForDMCtlFunc = checkAndAdjustSourceConfig
}

func (t *testMaster) SetUpTest(c *C) {
t.testEtcdCluster = integration.NewClusterV3(t.testT, &integration.ClusterConfig{Size: 1})
t.etcdTestCli = t.testEtcdCluster.RandClient()
t.clearEtcdEnv(c)
}

func (t *testMaster) TearDownTest(c *C) {
t.clearEtcdEnv(c)
t.testEtcdCluster.Terminate(t.testT)
}

func (t *testMaster) clearEtcdEnv(c *C) {
c.Assert(ha.ClearTestInfoOperation(t.etcdTestCli), IsNil)
}

func testDefaultMasterServerWithC(c *C) *Server {
cfg := NewConfig()
err := cfg.FromContent(SampleConfig)
c.Assert(err, IsNil)
cfg.DataDir = c.MkDir()
server := NewServer(cfg)
server.leader.Store(oneselfLeader)
go server.ap.Start(context.Background())

return server
}

func (t *testMaster) TestCollectSourceConfigFilesV1Import(c *C) {
s := testDefaultMasterServer(c)
s := testDefaultMasterServerWithC(c)
defer s.Close()
s.cfg.V1SourcesPath = c.MkDir()

Expand Down Expand Up @@ -113,7 +190,7 @@ func (t *testMaster) TestWaitWorkersReadyV1Import(c *C) {

tctx := tcontext.NewContext(ctx, log.L())

s := testDefaultMasterServer(c)
s := testDefaultMasterServerWithC(c)
defer s.Close()
s.cfg.V1SourcesPath = c.MkDir()
c.Assert(s.scheduler.Start(ctx, t.etcdTestCli), IsNil)
Expand Down Expand Up @@ -210,7 +287,7 @@ func (t *testMaster) TestSubtaskCfgsStagesV1Import(c *C) {
defer cancel()
tctx := tcontext.NewContext(ctx, log.L())

s := testDefaultMasterServer(c)
s := testDefaultMasterServerWithC(c)
defer s.Close()
s.cfg.V1SourcesPath = c.MkDir()
c.Assert(s.scheduler.Start(ctx, t.etcdTestCli), IsNil)
Expand Down
8 changes: 4 additions & 4 deletions dm/master/openapi_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,11 @@ func (s *OpenAPIViewSuite) TestReverseRequestToLeader() {
}

func (s *OpenAPIViewSuite) TestReverseRequestToHttpsLeader() {
pwd, err := os.Getwd()
pwd2, err := os.Getwd()
require.NoError(s.T(), err)
caPath := pwd + "/tls_for_test/ca.pem"
certPath := pwd + "/tls_for_test/dm.pem"
keyPath := pwd + "/tls_for_test/dm.key"
caPath := pwd2 + "/tls_for_test/ca.pem"
certPath := pwd2 + "/tls_for_test/dm.pem"
keyPath := pwd2 + "/tls_for_test/dm.key"

// master1
masterAddr1 := tempurl.Alloc()[len("http://"):]
Expand Down
4 changes: 4 additions & 0 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tiflow/dm/config/dbconfig"
"github.com/pingcap/tiflow/dm/config/security"
ctlcommon "github.com/pingcap/tiflow/dm/ctl/common"
"github.com/pingcap/tiflow/dm/loader"
"github.com/pingcap/tiflow/dm/master/metrics"
"github.com/pingcap/tiflow/dm/master/scheduler"
"github.com/pingcap/tiflow/dm/master/shardddl"
Expand Down Expand Up @@ -1764,6 +1765,9 @@ func (s *Server) removeMetaData(ctx context.Context, taskName, metaSchema string
dbutil.TableName(metaSchema, cputil.ValidatorErrorChange(taskName))))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS %s",
dbutil.TableName(metaSchema, cputil.ValidatorTableStatus(taskName))))
// clear lightning error manager table
sqls = append(sqls, fmt.Sprintf("DROP DATABASE IF EXISTS %s",
dbutil.ColumnName(loader.GetTaskInfoSchemaName(metaSchema, taskName))))

_, err = dbConn.ExecuteSQL(ctctx, nil, taskName, sqls)
if err == nil {
Expand Down
Loading

0 comments on commit a5f6a27

Please sign in to comment.