Skip to content

Commit

Permalink
lightning(dm): add checksum to task configuation (#7896)
Browse files Browse the repository at this point in the history
ref #3510
  • Loading branch information
lance6716 authored Dec 23, 2022
1 parent d94c297 commit cfb75ba
Show file tree
Hide file tree
Showing 16 changed files with 638 additions and 396 deletions.
2 changes: 2 additions & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ ErrConfigLoaderS3NotSupport,[code=20059:class=config:scope=internal:level=high],
ErrConfigInvalidSafeModeDuration,[code=20060:class=config:scope=internal:level=medium], "Message: safe-mode-duration '%s' parsed failed: %v, Workaround: Please check the `safe-mode-duration` is correct."
ErrConfigConfictSafeModeDurationAndSafeMode,[code=20061:class=config:scope=internal:level=low], "Message: safe-mode(true) conflicts with safe-mode-duration(0s), Workaround: Please set safe-mode to false or safe-mode-duration to non-zero."
ErrConfigInvalidPhysicalDuplicateResolution,[code=20062:class=config:scope=internal:level=medium], "Message: invalid load on-duplicate-physical option '%s', Workaround: Please choose a valid value in ['none', 'manual'] or leave it empty."
ErrConfigInvalidPhysicalChecksum,[code=20063:class=config:scope=internal:level=medium], "Message: invalid load checksum-physical option '%s', Workaround: Please choose a valid value in ['required', 'optional', 'off'] or leave it empty."
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down Expand Up @@ -284,6 +285,7 @@ ErrLoadTaskWorkerNotMatch,[code=34017:class=functional:scope=internal:level=high
ErrLoadTaskCheckPointNotMatch,[code=34018:class=functional:scope=internal:level=high], "Message: inconsistent checkpoints between loader and target database, Workaround: If you want to redo the whole task, please check that you have not forgotten to add -remove-meta flag for start-task command."
ErrLoadLightningRuntime,[code=34019:class=load-unit:scope=internal:level=high]
ErrLoadLightningHasDup,[code=34020:class=load-unit:scope=internal:level=medium], "Message: physical import finished but the data has duplication, please check `%s`.`%s` to see the duplication, Workaround: You can refer to https://docs.pingcap.com/tidb/stable/tidb-lightning-physical-import-mode-usage#conflict-detection to manually insert data and resume the task."
ErrLoadLightningChecksum,[code=34021:class=load-unit:scope=internal:level=medium], "Message: checksum mismatched, KV number in source files: %s, KV number in TiDB cluster: %s, Workaround: If TiDB cluster has more KV, please check if the migrated tables are empty before the task. If source files have more KV, please set `on-duplicate-physical` and restart the task to see data duplication. You can resume the task to ignore the error if you want."
ErrSyncerUnitPanic,[code=36001:class=sync-unit:scope=internal:level=high], "Message: panic error: %v"
ErrSyncUnitInvalidTableName,[code=36002:class=sync-unit:scope=internal:level=high], "Message: extract table name for DML error: %s"
ErrSyncUnitTableNameQuery,[code=36003:class=sync-unit:scope=internal:level=high], "Message: table name parse error: %s"
Expand Down
20 changes: 20 additions & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,15 @@ const (
OnDuplicateManual PhysicalDuplicateResolveType = "manual"
)

// PhysicalChecksumType defines the configuration of checksum of physical import.
type PhysicalChecksumType string

const (
ChecksumRequired = "required"
ChecksumOptional = "optional"
ChecksumOff = "off"
)

// LoaderConfig represents loader process unit's specific config.
type LoaderConfig struct {
PoolSize int `yaml:"pool-size" toml:"pool-size" json:"pool-size"`
Expand All @@ -286,6 +295,7 @@ type LoaderConfig struct {
OnDuplicateLogical LogicalDuplicateResolveType `yaml:"on-duplicate-logical" toml:"on-duplicate-logical" json:"on-duplicate-logical"`
OnDuplicatePhysical PhysicalDuplicateResolveType `yaml:"on-duplicate-physical" toml:"on-duplicate-physical" json:"on-duplicate-physical"`
DiskQuotaPhysical config.ByteSize `yaml:"disk-quota-physical" toml:"disk-quota-physical" json:"disk-quota-physical"`
ChecksumPhysical PhysicalChecksumType `yaml:"checksum-physical" toml:"checksum-physical" json:"checksum-physical"`
}

// DefaultLoaderConfig return default loader config for task.
Expand Down Expand Up @@ -352,6 +362,16 @@ func (m *LoaderConfig) adjust() error {
return terror.ErrConfigInvalidPhysicalDuplicateResolution.Generate(m.OnDuplicatePhysical)
}

if m.ChecksumPhysical == "" {
m.ChecksumPhysical = ChecksumRequired
}
m.ChecksumPhysical = PhysicalChecksumType(strings.ToLower(string(m.ChecksumPhysical)))
switch m.ChecksumPhysical {
case ChecksumRequired, ChecksumOptional, ChecksumOff:
default:
return terror.ErrConfigInvalidPhysicalChecksum.Generate(m.ChecksumPhysical)
}

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ func TestGenAndFromSubTaskConfigs(t *testing.T) {
ImportMode: LoadModePhysical,
OnDuplicateLogical: OnDuplicateReplace,
OnDuplicatePhysical: OnDuplicateNone,
ChecksumPhysical: ChecksumRequired,
},
SyncerConfig: SyncerConfig{
WorkerCount: 32,
Expand Down Expand Up @@ -1159,6 +1160,7 @@ func TestLoadConfigAdjust(t *testing.T) {
OnDuplicate: "",
OnDuplicateLogical: "replace",
OnDuplicatePhysical: "none",
ChecksumPhysical: "required",
}, cfg)

// test deprecated OnDuplicate will write to OnDuplicateLogical
Expand Down
31 changes: 31 additions & 0 deletions dm/ctl/common/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"testing"

"github.com/pingcap/tiflow/dm/pb"
"github.com/stretchr/testify/require"
)

func TestHTMLEscape(t *testing.T) {
msg := &pb.ProcessResult{Errors: []*pb.ProcessError{
{Message: "checksum mismatched remote vs local =>"},
}}
output, err := marshResponseToString(msg)
require.NoError(t, err)
// TODO: how can we turn it off? https://github.com/gogo/protobuf/issues/484
require.Contains(t, output, "checksum mismatched remote vs local =\\u003e")
}
12 changes: 12 additions & 0 deletions dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,12 @@ description = ""
workaround = "Please choose a valid value in ['none', 'manual'] or leave it empty."
tags = ["internal", "medium"]

[error.DM-config-20063]
message = "invalid load checksum-physical option '%s'"
description = ""
workaround = "Please choose a valid value in ['required', 'optional', 'off'] or leave it empty."
tags = ["internal", "medium"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down Expand Up @@ -1720,6 +1726,12 @@ description = ""
workaround = "You can refer to https://docs.pingcap.com/tidb/stable/tidb-lightning-physical-import-mode-usage#conflict-detection to manually insert data and resume the task."
tags = ["internal", "medium"]

[error.DM-load-unit-34021]
message = "checksum mismatched, KV number in source files: %s, KV number in TiDB cluster: %s"
description = ""
workaround = "If TiDB cluster has more KV, please check if the migrated tables are empty before the task. If source files have more KV, please set `on-duplicate-physical` and restart the task to see data duplication. You can resume the task to ignore the error if you want."
tags = ["internal", "medium"]

[error.DM-sync-unit-36001]
message = "panic error: %v"
description = ""
Expand Down
52 changes: 49 additions & 3 deletions dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (
"context"
"fmt"
"path/filepath"
"regexp"
"strings"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
lcfg "github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/errormanager"
"github.com/pingcap/tidb/dumpling/export"
Expand Down Expand Up @@ -76,6 +78,7 @@ type LightningLoader struct {
closed atomic.Bool
metaBinlog atomic.String
metaBinlogGTID atomic.String
lastErr error

speedRecorder *export.SpeedRecorder
}
Expand Down Expand Up @@ -212,14 +215,14 @@ func (l *LightningLoader) ignoreCheckpointError(ctx context.Context, cfg *lcfg.C
return errors.Trace(cpdb.IgnoreErrorCheckpoint(ctx, "all"))
}

func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) error {
func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) (err error) {
taskCtx, cancel := context.WithCancel(ctx)
l.Lock()
l.cancel = cancel
l.Unlock()

// always try to skill all checkpoint errors so we can resume this phase.
err := l.ignoreCheckpointError(ctx, cfg)
err = l.ignoreCheckpointError(ctx, cfg)
if err != nil {
l.logger.Warn("check lightning checkpoint status failed, skip this error", log.ShortError(err))
}
Expand Down Expand Up @@ -286,15 +289,32 @@ func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) er
}
}
})
defer func() {
l.lastErr = err
}()
if err != nil {
return terror.ErrLoadLightningRuntime.Delegate(err)
return convertLightningError(err)
}
if hasDup.Load() {
return terror.ErrLoadLightningHasDup.Generate(cfg.App.TaskInfoSchemaName, errormanager.ConflictErrorTableName)
}
return nil
}

var checksumErrorPattern = regexp.MustCompile(`total_kvs: (\d*) vs (\d*)`)

func convertLightningError(err error) error {
if common.ErrChecksumMismatch.Equal(err) {
lErr := errors.Cause(err).(*errors.Error)
msg := lErr.GetMsg()
matches := checksumErrorPattern.FindStringSubmatch(msg)
if len(matches) == 3 {
return terror.ErrLoadLightningChecksum.Generate(matches[2], matches[1])
}
}
return terror.ErrLoadLightningRuntime.Delegate(err)
}

// GetTaskInfoSchemaName is used to assign to TikvImporter.DuplicateResolution in lightning config.
func GetTaskInfoSchemaName(dmMetaSchema, taskName string) string {
return dmMetaSchema + "_" + taskName
Expand Down Expand Up @@ -322,6 +342,7 @@ func GetLightningConfig(globalCfg *lcfg.GlobalConfig, subtaskCfg *config.SubTask
cfg.Checkpoint.DSN = cpPath
cfg.Checkpoint.KeepAfterSuccess = lcfg.CheckpointOrigin

cfg.TikvImporter.DiskQuota = subtaskCfg.LoaderConfig.DiskQuotaPhysical
cfg.TikvImporter.OnDuplicate = string(subtaskCfg.OnDuplicateLogical)
switch subtaskCfg.OnDuplicatePhysical {
case config.OnDuplicateManual:
Expand All @@ -330,6 +351,14 @@ func GetLightningConfig(globalCfg *lcfg.GlobalConfig, subtaskCfg *config.SubTask
case config.OnDuplicateNone:
cfg.TikvImporter.DuplicateResolution = lcfg.DupeResAlgNone
}
switch subtaskCfg.ChecksumPhysical {
case config.ChecksumRequired:
cfg.PostRestore.Checksum = lcfg.OpLevelRequired
case config.ChecksumOptional:
cfg.PostRestore.Checksum = lcfg.OpLevelOptional
case config.ChecksumOff:
cfg.PostRestore.Checksum = lcfg.OpLevelOff
}
cfg.TiDB.Vars = make(map[string]string)
cfg.Routes = subtaskCfg.RouteRules
if subtaskCfg.To.Session != nil {
Expand Down Expand Up @@ -367,6 +396,23 @@ func (l *LightningLoader) restore(ctx context.Context) error {
return err
}

// we have disabled auto-resume for below errors, so if lightning is resuming
// it means user wants to skip this error.
switch {
case terror.ErrLoadLightningHasDup.Equal(l.lastErr),
terror.ErrLoadLightningChecksum.Equal(l.lastErr):
l.logger.Info("manually resume from error, DM will skip the error and continue to next unit",
zap.Error(l.lastErr))

l.finish.Store(true)
err = l.checkPointList.UpdateStatus(ctx, lightningStatusFinished)
if err != nil {
l.logger.Error("failed to update checkpoint status", zap.Error(err))
return err
}
status = lightningStatusFinished
}

if status < lightningStatusFinished {
if err = l.checkPointList.RegisterCheckPoint(ctx); err != nil {
return err
Expand Down
14 changes: 14 additions & 0 deletions dm/loader/lightning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ package loader
import (
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/stretchr/testify/require"
)

func TestSetLightningConfig(t *testing.T) {
t.Parallel()

stCfg := &config.SubTaskConfig{
LoaderConfig: config.LoaderConfig{
PoolSize: 10,
Expand All @@ -31,3 +36,12 @@ func TestSetLightningConfig(t *testing.T) {
require.NoError(t, err)
require.Equal(t, stCfg.LoaderConfig.PoolSize, cfg.App.RegionConcurrency)
}

func TestConvertLightningError(t *testing.T) {
t.Parallel()

err := common.ErrChecksumMismatch.GenWithStackByArgs(1, 2, 3, 4, 5, 6)
converted := convertLightningError(errors.Trace(err))
require.True(t, terror.ErrLoadLightningChecksum.Equal(converted))
require.Contains(t, converted.Error(), "checksum mismatched, KV number in source files: 4, KV number in TiDB cluster: 3")
}
1 change: 1 addition & 0 deletions dm/pkg/retry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ var (
int32(terror.ErrSyncerCancelledDDL.Code()): {},
int32(terror.ErrLoadLightningRuntime.Code()): {},
int32(terror.ErrLoadLightningHasDup.Code()): {},
int32(terror.ErrLoadLightningChecksum.Code()): {},
}

// UnresumableRelayErrCodes is a set of unresumeable relay unit err codes.
Expand Down
Loading

0 comments on commit cfb75ba

Please sign in to comment.