Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#45486
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lyzx2001 authored and ti-chi-bot committed Aug 7, 2023
1 parent 032726c commit 8b30c96
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 7 deletions.
2 changes: 2 additions & 0 deletions br/pkg/lightning/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ go_library(
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
Expand Down
27 changes: 20 additions & 7 deletions br/pkg/lightning/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// TableImporter is a helper struct to import a table.
Expand Down Expand Up @@ -988,15 +990,26 @@ func (tr *TableImporter) postProcess(

var remoteChecksum *local.RemoteChecksum
remoteChecksum, err = DoChecksum(ctx, tr.tableInfo)
failpoint.Inject("checksum-error", func() {
tr.logger.Info("failpoint checksum-error injected.")
remoteChecksum = nil
err = status.Error(codes.Unknown, "Checksum meets error.")
})
if err != nil {
return false, err
if rc.cfg.PostRestore.Checksum != config.OpLevelOptional {
return false, err
}
tr.logger.Warn("do checksum failed, will skip this error and go on", log.ShortError(err))
err = nil
}
err = tr.compareChecksum(remoteChecksum, localChecksum)
// with post restore level 'optional', we will skip checksum error
if rc.cfg.PostRestore.Checksum == config.OpLevelOptional {
if err != nil {
tr.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err))
err = nil
if remoteChecksum != nil {
err = tr.compareChecksum(remoteChecksum, localChecksum)
// with post restore level 'optional', we will skip checksum error
if rc.cfg.PostRestore.Checksum == config.OpLevelOptional {
if err != nil {
tr.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err))
err = nil
}
}
}
} else {
Expand Down
3 changes: 3 additions & 0 deletions br/tests/lightning_routes/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ schema-pattern = "routes_a*"
table-pattern = "t*"
target-schema = "routes_b"
target-table = "u"

[post-restore]
checksum = "optional"
5 changes: 5 additions & 0 deletions br/tests/lightning_routes/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@

set -eux

echo "testing checksum-error..."
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/importer/checksum-error=1*return()"

run_sql 'DROP DATABASE IF EXISTS routes_a0;'
run_sql 'DROP DATABASE IF EXISTS routes_a1;'
run_sql 'DROP DATABASE IF EXISTS routes_b;'

run_lightning

echo "test checksum-error success!"

run_sql 'SELECT count(1), sum(x) FROM routes_b.u;'
check_contains 'count(1): 4'
check_contains 'sum(x): 259'
Expand Down
245 changes: 245 additions & 0 deletions disttask/importinto/subtask_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importinto

import (
"context"
"strconv"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/scheduler"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/executor/importer"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

// TestSyncChan is used to test.
var TestSyncChan = make(chan struct{})

// ImportMinimalTaskExecutor is a minimal task executor for IMPORT INTO.
type ImportMinimalTaskExecutor struct {
mTtask *importStepMinimalTask
}

// Run implements the SubtaskExecutor.Run interface.
func (e *ImportMinimalTaskExecutor) Run(ctx context.Context) error {
logger := logutil.BgLogger().With(zap.String("type", proto.ImportInto), zap.Int64("table-id", e.mTtask.Plan.TableInfo.ID))
logger.Info("run minimal task")
failpoint.Inject("waitBeforeSortChunk", func() {
time.Sleep(3 * time.Second)
})
failpoint.Inject("errorWhenSortChunk", func() {
failpoint.Return(errors.New("occur an error when sort chunk"))
})
failpoint.Inject("syncBeforeSortChunk", func() {
TestSyncChan <- struct{}{}
<-TestSyncChan
})
chunkCheckpoint := toChunkCheckpoint(e.mTtask.Chunk)
sharedVars := e.mTtask.SharedVars
if err := importer.ProcessChunk(ctx, &chunkCheckpoint, sharedVars.TableImporter, sharedVars.DataEngine, sharedVars.IndexEngine, sharedVars.Progress, logger); err != nil {
return err
}

sharedVars.mu.Lock()
defer sharedVars.mu.Unlock()
sharedVars.Checksum.Add(&chunkCheckpoint.Checksum)
return nil
}

type postProcessMinimalTaskExecutor struct {
mTask *postProcessStepMinimalTask
}

func (e *postProcessMinimalTaskExecutor) Run(ctx context.Context) error {
mTask := e.mTask
failpoint.Inject("waitBeforePostProcess", func() {
time.Sleep(5 * time.Second)
})
return postProcess(ctx, mTask.taskMeta, &mTask.meta, mTask.logger)
}

// postProcess does the post-processing for the task.
func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) {
failpoint.Inject("syncBeforePostProcess", func() {
TestSyncChan <- struct{}{}
<-TestSyncChan
})

logger.Info("post process")

// TODO: create table indexes depends on the option.
// create table indexes even if the post process is failed.
// defer func() {
// err2 := createTableIndexes(ctx, globalTaskManager, taskMeta, logger)
// err = multierr.Append(err, err2)
// }()

return verifyChecksum(ctx, taskMeta, subtaskMeta, logger)
}

func verifyChecksum(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) error {
if taskMeta.Plan.Checksum == config.OpLevelOff {
return nil
}
localChecksum := verify.MakeKVChecksum(subtaskMeta.Checksum.Size, subtaskMeta.Checksum.KVs, subtaskMeta.Checksum.Sum)
logger.Info("local checksum", zap.Object("checksum", &localChecksum))

failpoint.Inject("waitCtxDone", func() {
<-ctx.Done()
})

globalTaskManager, err := storage.GetTaskManager()
if err != nil {
return err
}
remoteChecksum, err := checksumTable(ctx, globalTaskManager, taskMeta, logger)
if err != nil {
if taskMeta.Plan.Checksum != config.OpLevelOptional {
return err
}
logger.Warn("checksumTable failed, will skip this error and go on", zap.Error(err))
}
if remoteChecksum != nil {
if !remoteChecksum.IsEqual(&localChecksum) {
err2 := common.ErrChecksumMismatch.GenWithStackByArgs(
remoteChecksum.Checksum, localChecksum.Sum(),
remoteChecksum.TotalKVs, localChecksum.SumKVS(),
remoteChecksum.TotalBytes, localChecksum.SumSize(),
)
if taskMeta.Plan.Checksum == config.OpLevelOptional {
logger.Warn("verify checksum failed, but checksum is optional, will skip it", zap.Error(err2))
err2 = nil
}
return err2
}
logger.Info("checksum pass", zap.Object("local", &localChecksum))
}
return nil
}

func checksumTable(ctx context.Context, executor storage.SessionExecutor, taskMeta *TaskMeta, logger *zap.Logger) (*local.RemoteChecksum, error) {
var (
tableName = common.UniqueTable(taskMeta.Plan.DBName, taskMeta.Plan.TableInfo.Name.L)
sql = "ADMIN CHECKSUM TABLE " + tableName
maxErrorRetryCount = 3
distSQLScanConcurrencyFactor = 1
remoteChecksum *local.RemoteChecksum
txnErr error
)

ctx = util.WithInternalSourceType(ctx, kv.InternalImportInto)
for i := 0; i < maxErrorRetryCount; i++ {
txnErr = executor.WithNewTxn(ctx, func(se sessionctx.Context) error {
// increase backoff weight
if err := setBackoffWeight(se, taskMeta, logger); err != nil {
logger.Warn("set tidb_backoff_weight failed", zap.Error(err))
}

distSQLScanConcurrency := se.GetSessionVars().DistSQLScanConcurrency()
se.GetSessionVars().SetDistSQLScanConcurrency(mathutil.Max(distSQLScanConcurrency/distSQLScanConcurrencyFactor, local.MinDistSQLScanConcurrency))
defer func() {
se.GetSessionVars().SetDistSQLScanConcurrency(distSQLScanConcurrency)
}()

rs, err := storage.ExecSQL(ctx, se, sql)
if err != nil {
return err
}
if len(rs) < 1 {
return errors.New("empty checksum result")
}

failpoint.Inject("errWhenChecksum", func() {
if i == 0 {
failpoint.Return(errors.New("occur an error when checksum, coprocessor task terminated due to exceeding the deadline"))
}
})

// ADMIN CHECKSUM TABLE <schema>.<table> example.
// mysql> admin checksum table test.t;
// +---------+------------+---------------------+-----------+-------------+
// | Db_name | Table_name | Checksum_crc64_xor | Total_kvs | Total_bytes |
// +---------+------------+---------------------+-----------+-------------+
// | test | t | 8520875019404689597 | 7296873 | 357601387 |
// +---------+------------+-------------
remoteChecksum = &local.RemoteChecksum{
Schema: rs[0].GetString(0),
Table: rs[0].GetString(1),
Checksum: rs[0].GetUint64(2),
TotalKVs: rs[0].GetUint64(3),
TotalBytes: rs[0].GetUint64(4),
}
return nil
})
if !common.IsRetryableError(txnErr) {
break
}
distSQLScanConcurrencyFactor *= 2
logger.Warn("retry checksum table", zap.Int("retry count", i+1), zap.Error(txnErr))
}
return remoteChecksum, txnErr
}

// TestChecksumTable is used to test checksum table in unit test.
func TestChecksumTable(ctx context.Context, executor storage.SessionExecutor, taskMeta *TaskMeta, logger *zap.Logger) (*local.RemoteChecksum, error) {
return checksumTable(ctx, executor, taskMeta, logger)
}

func setBackoffWeight(se sessionctx.Context, taskMeta *TaskMeta, logger *zap.Logger) error {
backoffWeight := local.DefaultBackoffWeight
if val, ok := taskMeta.Plan.ImportantSysVars[variable.TiDBBackOffWeight]; ok {
if weight, err := strconv.Atoi(val); err == nil && weight > backoffWeight {
backoffWeight = weight
}
}
logger.Info("set backoff weight", zap.Int("weight", backoffWeight))
return se.GetSessionVars().SetSystemVar(variable.TiDBBackOffWeight, strconv.Itoa(backoffWeight))
}

func init() {
scheduler.RegisterSubtaskExectorConstructor(proto.ImportInto, StepImport,
// The order of the subtask executors is the same as the order of the subtasks.
func(minimalTask proto.MinimalTask, step int64) (scheduler.SubtaskExecutor, error) {
task, ok := minimalTask.(*importStepMinimalTask)
if !ok {
return nil, errors.Errorf("invalid task type %T", minimalTask)
}
return &ImportMinimalTaskExecutor{mTtask: task}, nil
},
)
scheduler.RegisterSubtaskExectorConstructor(proto.ImportInto, StepPostProcess,
func(minimalTask proto.MinimalTask, step int64) (scheduler.SubtaskExecutor, error) {
mTask, ok := minimalTask.(*postProcessStepMinimalTask)
if !ok {
return nil, errors.Errorf("invalid task type %T", minimalTask)
}
return &postProcessMinimalTaskExecutor{mTask: mTask}, nil
},
)
}

0 comments on commit 8b30c96

Please sign in to comment.