From 6e06cdc00158f2e71fcd785bffb698c68df3f04f Mon Sep 17 00:00:00 2001 From: Luo Yangzhixin Date: Mon, 7 Aug 2023 18:30:09 +0800 Subject: [PATCH] This is an automated cherry-pick of #45486 Signed-off-by: ti-chi-bot --- br/pkg/lightning/importer/BUILD.bazel | 189 +++ br/pkg/lightning/importer/table_import.go | 1700 +++++++++++++++++++++ br/tests/lightning_routes/config.toml | 3 + br/tests/lightning_routes/run.sh | 5 + disttask/importinto/subtask_executor.go | 245 +++ 5 files changed, 2142 insertions(+) create mode 100644 br/pkg/lightning/importer/BUILD.bazel create mode 100644 br/pkg/lightning/importer/table_import.go create mode 100644 disttask/importinto/subtask_executor.go diff --git a/br/pkg/lightning/importer/BUILD.bazel b/br/pkg/lightning/importer/BUILD.bazel new file mode 100644 index 0000000000000..ecc31bc3c7354 --- /dev/null +++ b/br/pkg/lightning/importer/BUILD.bazel @@ -0,0 +1,189 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "importer", + srcs = [ + "check_info.go", + "check_template.go", + "checksum_helper.go", + "chunk_process.go", + "dup_detect.go", + "get_pre_info.go", + "import.go", + "meta_manager.go", + "precheck.go", + "precheck_impl.go", + "table_import.go", + "tidb.go", + ], + importpath = "github.com/pingcap/tidb/br/pkg/lightning/importer", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/backup", + "//br/pkg/errors", + "//br/pkg/lightning/backend", + "//br/pkg/lightning/backend/encode", + "//br/pkg/lightning/backend/kv", + "//br/pkg/lightning/backend/local", + "//br/pkg/lightning/backend/tidb", + "//br/pkg/lightning/checkpoints", + "//br/pkg/lightning/common", + "//br/pkg/lightning/config", + "//br/pkg/lightning/duplicate", + "//br/pkg/lightning/errormanager", + "//br/pkg/lightning/importer/opts", + "//br/pkg/lightning/log", + "//br/pkg/lightning/metric", + "//br/pkg/lightning/mydump", + "//br/pkg/lightning/precheck", + "//br/pkg/lightning/tikv", + "//br/pkg/lightning/verification", + "//br/pkg/lightning/web", + "//br/pkg/lightning/worker", + "//br/pkg/pdutil", + "//br/pkg/redact", + "//br/pkg/storage", + "//br/pkg/streamhelper", + "//br/pkg/utils", + "//br/pkg/version", + "//br/pkg/version/build", + "//config", + "//ddl", + "//errno", + "//keyspace", + "//kv", + "//meta/autoid", + "//parser", + "//parser/ast", + "//parser/format", + "//parser/model", + "//parser/mysql", + "//planner/core", + "//sessionctx/variable", + "//store/driver", + "//store/driver/txn", + "//store/pdtypes", + "//table", + "//table/tables", + "//tablecodec", + "//types", + "//util/codec", + "//util/collate", + "//util/dbterror", + "//util/engine", + "//util/extsort", + "//util/mathutil", + "//util/mock", + "//util/regexpr-router", + "//util/set", + "@com_github_coreos_go_semver//semver", + "@com_github_docker_go_units//:go-units", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_google_uuid//:uuid", + "@com_github_jedib0t_go_pretty_v6//table", + "@com_github_jedib0t_go_pretty_v6//text", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_prometheus_client_golang//prometheus", + "@com_github_tikv_client_go_v2//config", + "@com_github_tikv_client_go_v2//util", + "@com_github_tikv_pd_client//:client", + "@io_etcd_go_etcd_client_v3//:client", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", + "@org_golang_x_exp//maps", + "@org_golang_x_exp//slices", + 
"@org_golang_x_sync//errgroup", + "@org_uber_go_atomic//:atomic", + "@org_uber_go_multierr//:multierr", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "importer_test", + timeout = "short", + srcs = [ + "check_info_test.go", + "chunk_process_test.go", + "dup_detect_test.go", + "get_pre_info_test.go", + "import_test.go", + "meta_manager_test.go", + "precheck_impl_test.go", + "precheck_test.go", + "restore_schema_test.go", + "table_import_test.go", + "tidb_test.go", + ], + embed = [":importer"], + flaky = True, + shard_count = 50, + deps = [ + "//br/pkg/lightning/backend", + "//br/pkg/lightning/backend/encode", + "//br/pkg/lightning/backend/kv", + "//br/pkg/lightning/backend/local", + "//br/pkg/lightning/backend/tidb", + "//br/pkg/lightning/checkpoints", + "//br/pkg/lightning/common", + "//br/pkg/lightning/config", + "//br/pkg/lightning/duplicate", + "//br/pkg/lightning/errormanager", + "//br/pkg/lightning/importer/mock", + "//br/pkg/lightning/importer/opts", + "//br/pkg/lightning/log", + "//br/pkg/lightning/metric", + "//br/pkg/lightning/mydump", + "//br/pkg/lightning/precheck", + "//br/pkg/lightning/verification", + "//br/pkg/lightning/web", + "//br/pkg/lightning/worker", + "//br/pkg/mock", + "//br/pkg/storage", + "//br/pkg/streamhelper", + "//br/pkg/version/build", + "//ddl", + "//errno", + "//kv", + "//meta", + "//meta/autoid", + "//parser", + "//parser/ast", + "//parser/model", + "//parser/mysql", + "//store/mockstore", + "//store/pdtypes", + "//table/tables", + "//tablecodec", + "//types", + "//util/codec", + "//util/dbutil", + "//util/extsort", + "//util/mock", + "//util/promutil", + "//util/table-filter", + "//util/table-router", + "@com_github_data_dog_go_sqlmock//:go-sqlmock", + "@com_github_docker_go_units//:go-units", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_golang_mock//gomock", + "@com_github_google_uuid//:uuid", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_stretchr_testify//require", + "@com_github_stretchr_testify//suite", + "@com_github_tikv_client_go_v2//config", + "@com_github_tikv_client_go_v2//testutils", + "@com_github_tikv_pd_client//:client", + "@com_github_xitongsys_parquet_go//writer", + "@com_github_xitongsys_parquet_go_source//buffer", + "@io_etcd_go_etcd_client_v3//:client", + "@io_etcd_go_etcd_tests_v3//integration", + "@org_golang_x_exp//slices", + "@org_uber_go_zap//:zap", + ], +) diff --git a/br/pkg/lightning/importer/table_import.go b/br/pkg/lightning/importer/table_import.go new file mode 100644 index 0000000000000..0c0b21dfb28d8 --- /dev/null +++ b/br/pkg/lightning/importer/table_import.go @@ -0,0 +1,1700 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package importer + +import ( + "context" + "database/sql" + "encoding/hex" + "fmt" + "path/filepath" + "strings" + "sync" + "time" + + dmysql "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/br/pkg/lightning/backend" + "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" + "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" + "github.com/pingcap/tidb/br/pkg/lightning/backend/local" + "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/config" + "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/br/pkg/lightning/metric" + "github.com/pingcap/tidb/br/pkg/lightning/mydump" + verify "github.com/pingcap/tidb/br/pkg/lightning/verification" + "github.com/pingcap/tidb/br/pkg/lightning/web" + "github.com/pingcap/tidb/br/pkg/lightning/worker" + "github.com/pingcap/tidb/br/pkg/version" + "github.com/pingcap/tidb/errno" + tidbkv "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/extsort" + "github.com/pingcap/tidb/util/mathutil" + "go.uber.org/multierr" + "go.uber.org/zap" + "golang.org/x/exp/slices" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// TableImporter is a helper struct to import a table. +type TableImporter struct { + // The unique table name in the form "`db`.`tbl`". + tableName string + dbInfo *checkpoints.TidbDBInfo + tableInfo *checkpoints.TidbTableInfo + tableMeta *mydump.MDTableMeta + encTable table.Table + alloc autoid.Allocators + logger log.Logger + kvStore tidbkv.Storage + + // dupIgnoreRows tracks the rowIDs of rows that are duplicated and should be ignored. + dupIgnoreRows extsort.ExternalSorter + + ignoreColumns map[string]struct{} +} + +// NewTableImporter creates a new TableImporter. +func NewTableImporter( + tableName string, + tableMeta *mydump.MDTableMeta, + dbInfo *checkpoints.TidbDBInfo, + tableInfo *checkpoints.TidbTableInfo, + cp *checkpoints.TableCheckpoint, + ignoreColumns map[string]struct{}, + kvStore tidbkv.Storage, + logger log.Logger, +) (*TableImporter, error) { + idAlloc := kv.NewPanickingAllocators(cp.AllocBase) + tbl, err := tables.TableFromMeta(idAlloc, tableInfo.Core) + if err != nil { + return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName) + } + + return &TableImporter{ + tableName: tableName, + dbInfo: dbInfo, + tableInfo: tableInfo, + tableMeta: tableMeta, + encTable: tbl, + alloc: idAlloc, + kvStore: kvStore, + logger: logger.With(zap.String("table", tableName)), + ignoreColumns: ignoreColumns, + }, nil +} + +func (tr *TableImporter) importTable( + ctx context.Context, + rc *Controller, + cp *checkpoints.TableCheckpoint, +) (bool, error) { + // 1. Load the table info. 
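+	// Bail out early if the overall import context has already been canceled.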
+ select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + + metaMgr := rc.metaMgrBuilder.TableMetaMgr(tr) + // no need to do anything if the chunks are already populated + if len(cp.Engines) > 0 { + tr.logger.Info("reusing engines and files info from checkpoint", + zap.Int("enginesCnt", len(cp.Engines)), + zap.Int("filesCnt", cp.CountChunks()), + ) + err := addExtendDataForCheckpoint(ctx, rc.cfg, cp) + if err != nil { + return false, errors.Trace(err) + } + } else if cp.Status < checkpoints.CheckpointStatusAllWritten { + if err := tr.populateChunks(ctx, rc, cp); err != nil { + return false, errors.Trace(err) + } + + // fetch the max chunk row_id max value as the global max row_id + rowIDMax := int64(0) + for _, engine := range cp.Engines { + if len(engine.Chunks) > 0 && engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax > rowIDMax { + rowIDMax = engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax + } + } + versionStr, err := version.FetchVersion(ctx, rc.db) + if err != nil { + return false, errors.Trace(err) + } + + versionInfo := version.ParseServerInfo(versionStr) + + // "show table next_row_id" is only available after tidb v4.0.0 + if versionInfo.ServerVersion.Major >= 4 && isLocalBackend(rc.cfg) { + // first, insert a new-line into meta table + if err = metaMgr.InitTableMeta(ctx); err != nil { + return false, err + } + + checksum, rowIDBase, err := metaMgr.AllocTableRowIDs(ctx, rowIDMax) + if err != nil { + return false, err + } + tr.RebaseChunkRowIDs(cp, rowIDBase) + + if checksum != nil { + if cp.Checksum != *checksum { + cp.Checksum = *checksum + rc.saveCpCh <- saveCp{ + tableName: tr.tableName, + merger: &checkpoints.TableChecksumMerger{ + Checksum: cp.Checksum, + }, + } + } + tr.logger.Info("checksum before restore table", zap.Object("checksum", &cp.Checksum)) + } + } + if err := rc.checkpointsDB.InsertEngineCheckpoints(ctx, tr.tableName, cp.Engines); err != nil { + return false, errors.Trace(err) + } + web.BroadcastTableCheckpoint(tr.tableName, cp) + + // rebase the allocator so it exceeds the number of rows. + if tr.tableInfo.Core.ContainsAutoRandomBits() { + cp.AllocBase = mathutil.Max(cp.AllocBase, tr.tableInfo.Core.AutoRandID) + if err := tr.alloc.Get(autoid.AutoRandomType).Rebase(context.Background(), cp.AllocBase, false); err != nil { + return false, err + } + } else { + cp.AllocBase = mathutil.Max(cp.AllocBase, tr.tableInfo.Core.AutoIncID) + if err := tr.alloc.Get(autoid.RowIDAllocType).Rebase(context.Background(), cp.AllocBase, false); err != nil { + return false, err + } + } + rc.saveCpCh <- saveCp{ + tableName: tr.tableName, + merger: &checkpoints.RebaseCheckpointMerger{ + AllocBase: cp.AllocBase, + }, + } + } + + // 2. 
Do duplicate detection if needed + if isLocalBackend(rc.cfg) && rc.cfg.Conflict.Strategy != "" { + _, uuid := backend.MakeUUID(tr.tableName, common.IndexEngineID) + workingDir := filepath.Join(rc.cfg.TikvImporter.SortedKVDir, uuid.String()+local.DupDetectDirSuffix) + resultDir := filepath.Join(rc.cfg.TikvImporter.SortedKVDir, uuid.String()+local.DupResultDirSuffix) + + dupIgnoreRows, err := extsort.OpenDiskSorter(resultDir, &extsort.DiskSorterOptions{ + Concurrency: rc.cfg.App.RegionConcurrency, + }) + if err != nil { + return false, errors.Trace(err) + } + tr.dupIgnoreRows = dupIgnoreRows + + if cp.Status < checkpoints.CheckpointStatusDupDetected { + err := tr.preDeduplicate(ctx, rc, cp, workingDir) + saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusDupDetected) + if err := firstErr(err, saveCpErr); err != nil { + return false, errors.Trace(err) + } + } + + if !dupIgnoreRows.IsSorted() { + if err := dupIgnoreRows.Sort(ctx); err != nil { + return false, errors.Trace(err) + } + } + + failpoint.Inject("FailAfterDuplicateDetection", func() { + panic("forcing failure after duplicate detection") + }) + } + + // 3. Drop indexes if add-index-by-sql is enabled + if cp.Status < checkpoints.CheckpointStatusIndexDropped && isLocalBackend(rc.cfg) && rc.cfg.TikvImporter.AddIndexBySQL { + err := tr.dropIndexes(ctx, rc.db) + saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusIndexDropped) + if err := firstErr(err, saveCpErr); err != nil { + return false, errors.Trace(err) + } + } + + // 4. Restore engines (if still needed) + err := tr.importEngines(ctx, rc, cp) + if err != nil { + return false, errors.Trace(err) + } + + err = metaMgr.UpdateTableStatus(ctx, metaStatusRestoreFinished) + if err != nil { + return false, errors.Trace(err) + } + + // 5. Post-process. With the last parameter set to false, we can allow delay analyze execute latter + return tr.postProcess(ctx, rc, cp, false /* force-analyze */, metaMgr) +} + +// Close implements the Importer interface. 
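+// It drops the reference to the encoded table and closes the external sorter used for duplicate detection, if one was opened.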
+func (tr *TableImporter) Close() { + tr.encTable = nil + if tr.dupIgnoreRows != nil { + _ = tr.dupIgnoreRows.Close() + } + tr.logger.Info("restore done") +} + +func (tr *TableImporter) populateChunks(ctx context.Context, rc *Controller, cp *checkpoints.TableCheckpoint) error { + task := tr.logger.Begin(zap.InfoLevel, "load engines and files") + divideConfig := mydump.NewDataDivideConfig(rc.cfg, len(tr.tableInfo.Core.Columns), rc.ioWorkers, rc.store, tr.tableMeta) + tableRegions, err := mydump.MakeTableRegions(ctx, divideConfig) + if err == nil { + timestamp := time.Now().Unix() + failpoint.Inject("PopulateChunkTimestamp", func(v failpoint.Value) { + timestamp = int64(v.(int)) + }) + for _, region := range tableRegions { + engine, found := cp.Engines[region.EngineID] + if !found { + engine = &checkpoints.EngineCheckpoint{ + Status: checkpoints.CheckpointStatusLoaded, + } + cp.Engines[region.EngineID] = engine + } + ccp := &checkpoints.ChunkCheckpoint{ + Key: checkpoints.ChunkCheckpointKey{ + Path: region.FileMeta.Path, + Offset: region.Chunk.Offset, + }, + FileMeta: region.FileMeta, + ColumnPermutation: nil, + Chunk: region.Chunk, + Timestamp: timestamp, + } + if len(region.Chunk.Columns) > 0 { + perms, err := parseColumnPermutations( + tr.tableInfo.Core, + region.Chunk.Columns, + tr.ignoreColumns, + log.FromContext(ctx)) + if err != nil { + return errors.Trace(err) + } + ccp.ColumnPermutation = perms + } + engine.Chunks = append(engine.Chunks, ccp) + } + + // Add index engine checkpoint + cp.Engines[common.IndexEngineID] = &checkpoints.EngineCheckpoint{Status: checkpoints.CheckpointStatusLoaded} + } + task.End(zap.ErrorLevel, err, + zap.Int("enginesCnt", len(cp.Engines)), + zap.Int("filesCnt", len(tableRegions)), + ) + return err +} + +// RebaseChunkRowIDs rebase the row id of the chunks. +func (*TableImporter) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) { + if rowIDBase == 0 { + return + } + for _, engine := range cp.Engines { + for _, chunk := range engine.Chunks { + chunk.Chunk.PrevRowIDMax += rowIDBase + chunk.Chunk.RowIDMax += rowIDBase + } + } +} + +// initializeColumns computes the "column permutation" for an INSERT INTO +// statement. Suppose a table has columns (a, b, c, d) in canonical order, and +// we execute `INSERT INTO (d, b, a) VALUES ...`, we will need to remap the +// columns as: +// +// - column `a` is at position 2 +// - column `b` is at position 1 +// - column `c` is missing +// - column `d` is at position 0 +// +// The column permutation of (d, b, a) is set to be [2, 1, -1, 0]. +// +// The argument `columns` _must_ be in lower case. +func (tr *TableImporter) initializeColumns(columns []string, ccp *checkpoints.ChunkCheckpoint) error { + colPerm, err := createColumnPermutation(columns, tr.ignoreColumns, tr.tableInfo.Core, tr.logger) + if err != nil { + return err + } + ccp.ColumnPermutation = colPerm + return nil +} + +func createColumnPermutation( + columns []string, + ignoreColumns map[string]struct{}, + tableInfo *model.TableInfo, + logger log.Logger, +) ([]int, error) { + var colPerm []int + if len(columns) == 0 { + colPerm = make([]int, 0, len(tableInfo.Columns)+1) + shouldIncludeRowID := common.TableHasAutoRowID(tableInfo) + + // no provided columns, so use identity permutation. 
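+		// Columns the user chose to ignore and generated columns are mapped to -1, i.e. treated as absent from the data file.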
+ for i, col := range tableInfo.Columns { + idx := i + if _, ok := ignoreColumns[col.Name.L]; ok { + idx = -1 + } else if col.IsGenerated() { + idx = -1 + } + colPerm = append(colPerm, idx) + } + if shouldIncludeRowID { + colPerm = append(colPerm, -1) + } + } else { + var err error + colPerm, err = parseColumnPermutations(tableInfo, columns, ignoreColumns, logger) + if err != nil { + return nil, errors.Trace(err) + } + } + return colPerm, nil +} + +func (tr *TableImporter) importEngines(pCtx context.Context, rc *Controller, cp *checkpoints.TableCheckpoint) error { + indexEngineCp := cp.Engines[common.IndexEngineID] + if indexEngineCp == nil { + tr.logger.Error("fail to importEngines because indexengine is nil") + return common.ErrCheckpointNotFound.GenWithStack("table %v index engine checkpoint not found", tr.tableName) + } + + ctx, cancel := context.WithCancel(pCtx) + defer cancel() + + // The table checkpoint status set to `CheckpointStatusIndexImported` only if + // both all data engines and the index engine had been imported to TiKV. + // But persist index engine checkpoint status and table checkpoint status are + // not an atomic operation, so `cp.Status < CheckpointStatusIndexImported` + // but `indexEngineCp.Status == CheckpointStatusImported` could happen + // when kill lightning after saving index engine checkpoint status before saving + // table checkpoint status. + var closedIndexEngine *backend.ClosedEngine + var restoreErr error + // if index-engine checkpoint is lower than `CheckpointStatusClosed`, there must be + // data-engines that need to be restore or import. Otherwise, all data-engines should + // be finished already. + + handleDataEngineThisRun := false + idxEngineCfg := &backend.EngineConfig{ + TableInfo: tr.tableInfo, + } + if indexEngineCp.Status < checkpoints.CheckpointStatusClosed { + handleDataEngineThisRun = true + indexWorker := rc.indexWorkers.Apply() + defer rc.indexWorkers.Recycle(indexWorker) + + if rc.cfg.TikvImporter.Backend == config.BackendLocal { + // for index engine, the estimate factor is non-clustered index count + idxCnt := len(tr.tableInfo.Core.Indices) + if !common.TableHasAutoRowID(tr.tableInfo.Core) { + idxCnt-- + } + threshold := local.EstimateCompactionThreshold(tr.tableMeta.DataFiles, cp, int64(idxCnt)) + idxEngineCfg.Local = backend.LocalEngineConfig{ + Compact: threshold > 0, + CompactConcurrency: 4, + CompactThreshold: threshold, + } + } + // import backend can't reopen engine if engine is closed, so + // only open index engine if any data engines don't finish writing. 
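+		// Scan the data-engine checkpoints and open the shared index engine lazily, only when at least one data engine still has unwritten data.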
+ var indexEngine *backend.OpenedEngine + var err error + for engineID, engine := range cp.Engines { + if engineID == common.IndexEngineID { + continue + } + if engine.Status < checkpoints.CheckpointStatusAllWritten { + indexEngine, err = rc.engineMgr.OpenEngine(ctx, idxEngineCfg, tr.tableName, common.IndexEngineID) + if err != nil { + return errors.Trace(err) + } + break + } + } + + logTask := tr.logger.Begin(zap.InfoLevel, "import whole table") + var wg sync.WaitGroup + var engineErr common.OnceError + setError := func(err error) { + engineErr.Set(err) + // cancel this context to fail fast + cancel() + } + + type engineCheckpoint struct { + engineID int32 + checkpoint *checkpoints.EngineCheckpoint + } + allEngines := make([]engineCheckpoint, 0, len(cp.Engines)) + for engineID, engine := range cp.Engines { + allEngines = append(allEngines, engineCheckpoint{engineID: engineID, checkpoint: engine}) + } + slices.SortFunc(allEngines, func(i, j engineCheckpoint) bool { return i.engineID < j.engineID }) + + for _, ecp := range allEngines { + engineID := ecp.engineID + engine := ecp.checkpoint + select { + case <-ctx.Done(): + // Set engineErr and break this for loop to wait all the sub-routines done before return. + // Directly return may cause panic because caller will close the pebble db but some sub routines + // are still reading from or writing to the pebble db. + engineErr.Set(ctx.Err()) + default: + } + if engineErr.Get() != nil { + break + } + + // Should skip index engine + if engineID < 0 { + continue + } + + if engine.Status < checkpoints.CheckpointStatusImported { + wg.Add(1) + + // If the number of chunks is small, it means that this engine may be finished in a few times. + // We do not limit it in TableConcurrency + restoreWorker := rc.tableWorkers.Apply() + go func(w *worker.Worker, eid int32, ecp *checkpoints.EngineCheckpoint) { + defer wg.Done() + engineLogTask := tr.logger.With(zap.Int32("engineNumber", eid)).Begin(zap.InfoLevel, "restore engine") + dataClosedEngine, err := tr.preprocessEngine(ctx, rc, indexEngine, eid, ecp) + engineLogTask.End(zap.ErrorLevel, err) + rc.tableWorkers.Recycle(w) + if err == nil { + dataWorker := rc.closedEngineLimit.Apply() + defer rc.closedEngineLimit.Recycle(dataWorker) + err = tr.importEngine(ctx, dataClosedEngine, rc, ecp) + if rc.status != nil && rc.status.backend == config.BackendLocal { + for _, chunk := range ecp.Chunks { + rc.status.FinishedFileSize.Add(chunk.TotalSize()) + } + } + } + if err != nil { + setError(err) + } + }(restoreWorker, engineID, engine) + } else { + for _, chunk := range engine.Chunks { + rc.status.FinishedFileSize.Add(chunk.TotalSize()) + } + } + } + + wg.Wait() + + restoreErr = engineErr.Get() + logTask.End(zap.ErrorLevel, restoreErr) + if restoreErr != nil { + return errors.Trace(restoreErr) + } + + if indexEngine != nil { + closedIndexEngine, restoreErr = indexEngine.Close(ctx) + } else { + closedIndexEngine, restoreErr = rc.engineMgr.UnsafeCloseEngine(ctx, idxEngineCfg, tr.tableName, common.IndexEngineID) + } + + if err = rc.saveStatusCheckpoint(ctx, tr.tableName, common.IndexEngineID, restoreErr, checkpoints.CheckpointStatusClosed); err != nil { + return errors.Trace(firstErr(restoreErr, err)) + } + } else if indexEngineCp.Status == checkpoints.CheckpointStatusClosed { + // If index engine file has been closed but not imported only if context cancel occurred + // when `importKV()` execution, so `UnsafeCloseEngine` and continue import it. 
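+		// Reopening a closed engine is not supported here, so recover the already-closed engine handle via UnsafeCloseEngine and resume importing it.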
+ closedIndexEngine, restoreErr = rc.engineMgr.UnsafeCloseEngine(ctx, idxEngineCfg, tr.tableName, common.IndexEngineID) + } + if restoreErr != nil { + return errors.Trace(restoreErr) + } + + // if data engine is handled in previous run and we continue importing from checkpoint + if !handleDataEngineThisRun { + for _, engine := range cp.Engines { + for _, chunk := range engine.Chunks { + rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset) + } + } + } + + if cp.Status < checkpoints.CheckpointStatusIndexImported { + var err error + if indexEngineCp.Status < checkpoints.CheckpointStatusImported { + failpoint.Inject("FailBeforeStartImportingIndexEngine", func() { + errMsg := "fail before importing index KV data" + tr.logger.Warn(errMsg) + failpoint.Return(errors.New(errMsg)) + }) + err = tr.importKV(ctx, closedIndexEngine, rc) + failpoint.Inject("FailBeforeIndexEngineImported", func() { + finished := rc.status.FinishedFileSize.Load() + total := rc.status.TotalFileSize.Load() + tr.logger.Warn("print lightning status", + zap.Int64("finished", finished), + zap.Int64("total", total), + zap.Bool("equal", finished == total)) + panic("forcing failure due to FailBeforeIndexEngineImported") + }) + } + + saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusIndexImported) + if err = firstErr(err, saveCpErr); err != nil { + return errors.Trace(err) + } + } + return nil +} + +// preprocessEngine do some preprocess work +// for local backend, it do local sort, for tidb backend it transforms data into sql and execute +// TODO: it's not a correct name for tidb backend, since there's no post-process for it +// TODO: after separate local/tidb backend more clearly, rename it. +func (tr *TableImporter) preprocessEngine( + pCtx context.Context, + rc *Controller, + indexEngine *backend.OpenedEngine, + engineID int32, + cp *checkpoints.EngineCheckpoint, +) (*backend.ClosedEngine, error) { + ctx, cancel := context.WithCancel(pCtx) + defer cancel() + // all data has finished written, we can close the engine directly. + if cp.Status >= checkpoints.CheckpointStatusAllWritten { + engineCfg := &backend.EngineConfig{ + TableInfo: tr.tableInfo, + } + closedEngine, err := rc.engineMgr.UnsafeCloseEngine(ctx, engineCfg, tr.tableName, engineID) + // If any error occurred, recycle worker immediately + if err != nil { + return closedEngine, errors.Trace(err) + } + if rc.status != nil && rc.status.backend == config.BackendTiDB { + for _, chunk := range cp.Chunks { + rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset) + } + } + return closedEngine, nil + } + + // if the key are ordered, LocalWrite can optimize the writing. + // table has auto-incremented _tidb_rowid must satisfy following restrictions: + // - clustered index disable and primary key is not number + // - no auto random bits (auto random or shard row id) + // - no partition table + // - no explicit _tidb_rowid field (At this time we can't determine if the source file contains _tidb_rowid field, + // so we will do this check in LocalWriter when the first row is received.) 
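+	// When these restrictions hold, the implicit _tidb_rowid values are assigned in increasing order, so the encoded KV pairs arrive already sorted and the local writer can skip extra sorting work.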
+ hasAutoIncrementAutoID := common.TableHasAutoRowID(tr.tableInfo.Core) && + tr.tableInfo.Core.AutoRandomBits == 0 && tr.tableInfo.Core.ShardRowIDBits == 0 && + tr.tableInfo.Core.Partition == nil + dataWriterCfg := &backend.LocalWriterConfig{ + IsKVSorted: hasAutoIncrementAutoID, + TableName: tr.tableName, + } + + logTask := tr.logger.With(zap.Int32("engineNumber", engineID)).Begin(zap.InfoLevel, "encode kv data and write") + dataEngineCfg := &backend.EngineConfig{ + TableInfo: tr.tableInfo, + } + if !tr.tableMeta.IsRowOrdered { + dataEngineCfg.Local.Compact = true + dataEngineCfg.Local.CompactConcurrency = 4 + dataEngineCfg.Local.CompactThreshold = local.CompactionUpperThreshold + } + dataEngine, err := rc.engineMgr.OpenEngine(ctx, dataEngineCfg, tr.tableName, engineID) + if err != nil { + return nil, errors.Trace(err) + } + + var wg sync.WaitGroup + var chunkErr common.OnceError + + type chunkFlushStatus struct { + dataStatus backend.ChunkFlushStatus + indexStatus backend.ChunkFlushStatus + chunkCp *checkpoints.ChunkCheckpoint + } + + // chunks that are finished writing, but checkpoints are not finished due to flush not finished. + var checkFlushLock sync.Mutex + flushPendingChunks := make([]chunkFlushStatus, 0, 16) + + chunkCpChan := make(chan *checkpoints.ChunkCheckpoint, 16) + go func() { + for { + select { + case cp, ok := <-chunkCpChan: + if !ok { + return + } + saveCheckpoint(rc, tr, engineID, cp) + case <-ctx.Done(): + return + } + } + }() + + setError := func(err error) { + chunkErr.Set(err) + cancel() + } + + metrics, _ := metric.FromContext(ctx) + + // Restore table data +ChunkLoop: + for chunkIndex, chunk := range cp.Chunks { + if rc.status != nil && rc.status.backend == config.BackendTiDB { + rc.status.FinishedFileSize.Add(chunk.Chunk.Offset - chunk.Key.Offset) + } + if chunk.Chunk.Offset >= chunk.Chunk.EndOffset { + continue + } + + checkFlushLock.Lock() + finished := 0 + for _, c := range flushPendingChunks { + if !(c.indexStatus.Flushed() && c.dataStatus.Flushed()) { + break + } + chunkCpChan <- c.chunkCp + finished++ + } + if finished > 0 { + flushPendingChunks = flushPendingChunks[finished:] + } + checkFlushLock.Unlock() + + failpoint.Inject("orphanWriterGoRoutine", func() { + if chunkIndex > 0 { + <-pCtx.Done() + } + }) + + select { + case <-pCtx.Done(): + break ChunkLoop + default: + } + + if chunkErr.Get() != nil { + break + } + + // Flows : + // 1. read mydump file + // 2. sql -> kvs + // 3. load kvs data (into kv deliver server) + // 4. flush kvs data (into tikv node) + var remainChunkCnt float64 + if chunk.Chunk.Offset < chunk.Chunk.EndOffset { + remainChunkCnt = float64(chunk.UnfinishedSize()) / float64(chunk.TotalSize()) + if metrics != nil { + metrics.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Add(remainChunkCnt) + } + } + + dataWriter, err := dataEngine.LocalWriter(ctx, dataWriterCfg) + if err != nil { + setError(err) + break + } + + indexWriter, err := indexEngine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: tr.tableName}) + if err != nil { + _, _ = dataWriter.Close(ctx) + setError(err) + break + } + cr, err := newChunkProcessor(ctx, chunkIndex, rc.cfg, chunk, rc.ioWorkers, rc.store, tr.tableInfo.Core) + if err != nil { + setError(err) + break + } + + restoreWorker := rc.regionWorkers.Apply() + wg.Add(1) + go func(w *worker.Worker, cr *chunkProcessor) { + // Restore a chunk. 
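+			// Always release the chunk processor and return the region worker to the pool, even when processing fails.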
+ defer func() { + cr.close() + wg.Done() + rc.regionWorkers.Recycle(w) + }() + if metrics != nil { + metrics.ChunkCounter.WithLabelValues(metric.ChunkStateRunning).Add(remainChunkCnt) + } + err := cr.process(ctx, tr, engineID, dataWriter, indexWriter, rc) + var dataFlushStatus, indexFlushStaus backend.ChunkFlushStatus + if err == nil { + dataFlushStatus, err = dataWriter.Close(ctx) + } + if err == nil { + indexFlushStaus, err = indexWriter.Close(ctx) + } + if err == nil { + if metrics != nil { + metrics.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Add(remainChunkCnt) + metrics.BytesCounter.WithLabelValues(metric.StateRestoreWritten).Add(float64(cr.chunk.Checksum.SumSize())) + } + if dataFlushStatus != nil && indexFlushStaus != nil { + if dataFlushStatus.Flushed() && indexFlushStaus.Flushed() { + saveCheckpoint(rc, tr, engineID, cr.chunk) + } else { + checkFlushLock.Lock() + flushPendingChunks = append(flushPendingChunks, chunkFlushStatus{ + dataStatus: dataFlushStatus, + indexStatus: indexFlushStaus, + chunkCp: cr.chunk, + }) + checkFlushLock.Unlock() + } + } + } else { + if metrics != nil { + metrics.ChunkCounter.WithLabelValues(metric.ChunkStateFailed).Add(remainChunkCnt) + } + setError(err) + } + }(restoreWorker, cr) + } + + wg.Wait() + select { + case <-pCtx.Done(): + return nil, pCtx.Err() + default: + } + + // Report some statistics into the log for debugging. + totalKVSize := uint64(0) + totalSQLSize := int64(0) + logKeyName := "read(bytes)" + for _, chunk := range cp.Chunks { + totalKVSize += chunk.Checksum.SumSize() + totalSQLSize += chunk.UnfinishedSize() + if chunk.FileMeta.Type == mydump.SourceTypeParquet { + logKeyName = "read(rows)" + } + } + + err = chunkErr.Get() + logTask.End(zap.ErrorLevel, err, + zap.Int64(logKeyName, totalSQLSize), + zap.Uint64("written", totalKVSize), + ) + + trySavePendingChunks := func(flushCtx context.Context) error { + checkFlushLock.Lock() + cnt := 0 + for _, chunk := range flushPendingChunks { + if !(chunk.dataStatus.Flushed() && chunk.indexStatus.Flushed()) { + break + } + saveCheckpoint(rc, tr, engineID, chunk.chunkCp) + cnt++ + } + flushPendingChunks = flushPendingChunks[cnt:] + checkFlushLock.Unlock() + return nil + } + + // in local mode, this check-point make no sense, because we don't do flush now, + // so there may be data lose if exit at here. So we don't write this checkpoint + // here like other mode. + if !isLocalBackend(rc.cfg) { + if saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, engineID, err, checkpoints.CheckpointStatusAllWritten); saveCpErr != nil { + return nil, errors.Trace(firstErr(err, saveCpErr)) + } + } + if err != nil { + // if process is canceled, we should flush all chunk checkpoints for local backend + if isLocalBackend(rc.cfg) && common.IsContextCanceledError(err) { + // ctx is canceled, so to avoid Close engine failed, we use `context.Background()` here + if _, err2 := dataEngine.Close(context.Background()); err2 != nil { + log.FromContext(ctx).Warn("flush all chunk checkpoints failed before manually exits", zap.Error(err2)) + return nil, errors.Trace(err) + } + if err2 := trySavePendingChunks(context.Background()); err2 != nil { + log.FromContext(ctx).Warn("flush all chunk checkpoints failed before manually exits", zap.Error(err2)) + } + } + return nil, errors.Trace(err) + } + + closedDataEngine, err := dataEngine.Close(ctx) + // For local backend, if checkpoint is enabled, we must flush index engine to avoid data loss. 
+ // this flush action impact up to 10% of the performance, so we only do it if necessary. + if err == nil && rc.cfg.Checkpoint.Enable && isLocalBackend(rc.cfg) { + if err = indexEngine.Flush(ctx); err != nil { + return nil, errors.Trace(err) + } + if err = trySavePendingChunks(ctx); err != nil { + return nil, errors.Trace(err) + } + } + saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, engineID, err, checkpoints.CheckpointStatusClosed) + if err = firstErr(err, saveCpErr); err != nil { + // If any error occurred, recycle worker immediately + return nil, errors.Trace(err) + } + return closedDataEngine, nil +} + +func (tr *TableImporter) importEngine( + ctx context.Context, + closedEngine *backend.ClosedEngine, + rc *Controller, + cp *checkpoints.EngineCheckpoint, +) error { + if cp.Status >= checkpoints.CheckpointStatusImported { + return nil + } + + // 1. calling import + if err := tr.importKV(ctx, closedEngine, rc); err != nil { + return errors.Trace(err) + } + + // 2. perform a level-1 compact if idling. + if rc.cfg.PostRestore.Level1Compact && rc.compactState.CompareAndSwap(compactStateIdle, compactStateDoing) { + go func() { + // we ignore level-1 compact failure since it is not fatal. + // no need log the error, it is done in (*Importer).Compact already. + _ = rc.doCompact(ctx, Level1Compact) + rc.compactState.Store(compactStateIdle) + }() + } + + return nil +} + +// postProcess execute rebase-auto-id/checksum/analyze according to the task config. +// +// if the parameter forcePostProcess to true, postProcess force run checksum and analyze even if the +// post-process-at-last config is true. And if this two phases are skipped, the first return value will be true. +func (tr *TableImporter) postProcess( + ctx context.Context, + rc *Controller, + cp *checkpoints.TableCheckpoint, + forcePostProcess bool, + metaMgr tableMetaMgr, +) (bool, error) { + if !rc.backend.ShouldPostProcess() { + return false, nil + } + + // alter table set auto_increment + if cp.Status < checkpoints.CheckpointStatusAlteredAutoInc { + rc.alterTableLock.Lock() + tblInfo := tr.tableInfo.Core + var err error + if tblInfo.ContainsAutoRandomBits() { + ft := &common.GetAutoRandomColumn(tblInfo).FieldType + shardFmt := autoid.NewShardIDFormat(ft, tblInfo.AutoRandomBits, tblInfo.AutoRandomRangeBits) + maxCap := shardFmt.IncrementalBitsCapacity() + err = AlterAutoRandom(ctx, rc.db, tr.tableName, uint64(tr.alloc.Get(autoid.AutoRandomType).Base())+1, maxCap) + } else if common.TableHasAutoRowID(tblInfo) || tblInfo.GetAutoIncrementColInfo() != nil { + // only alter auto increment id iff table contains auto-increment column or generated handle + err = AlterAutoIncrement(ctx, rc.db, tr.tableName, uint64(tr.alloc.Get(autoid.RowIDAllocType).Base())+1) + } + rc.alterTableLock.Unlock() + saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusAlteredAutoInc) + if err = firstErr(err, saveCpErr); err != nil { + return false, err + } + cp.Status = checkpoints.CheckpointStatusAlteredAutoInc + } + + // tidb backend don't need checksum & analyze + if rc.cfg.PostRestore.Checksum == config.OpLevelOff && rc.cfg.PostRestore.Analyze == config.OpLevelOff { + tr.logger.Debug("skip checksum & analyze, either because not supported by this backend or manually disabled") + err := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, nil, checkpoints.CheckpointStatusAnalyzeSkipped) + return false, errors.Trace(err) + } + + if !forcePostProcess && 
rc.cfg.PostRestore.PostProcessAtLast { + return true, nil + } + + w := rc.checksumWorks.Apply() + defer rc.checksumWorks.Recycle(w) + + shouldSkipAnalyze := false + if cp.Status < checkpoints.CheckpointStatusChecksumSkipped { + // 4. do table checksum + var localChecksum verify.KVChecksum + for _, engine := range cp.Engines { + for _, chunk := range engine.Chunks { + localChecksum.Add(&chunk.Checksum) + } + } + tr.logger.Info("local checksum", zap.Object("checksum", &localChecksum)) + + // 4.5. do duplicate detection. + // if we came here, it must be a local backend. + // todo: remove this cast after we refactor the backend interface. Physical mode is so different, we shouldn't + // try to abstract it with logical mode. + localBackend := rc.backend.(*local.Backend) + dupeController := localBackend.GetDupeController(rc.cfg.TikvImporter.RangeConcurrency*2, rc.errorMgr) + hasDupe := false + if rc.cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone { + opts := &encode.SessionOptions{ + SQLMode: mysql.ModeStrictAllTables, + SysVars: rc.sysVars, + } + var err error + hasLocalDupe, err := dupeController.CollectLocalDuplicateRows(ctx, tr.encTable, tr.tableName, opts) + if err != nil { + tr.logger.Error("collect local duplicate keys failed", log.ShortError(err)) + return false, err + } + hasDupe = hasLocalDupe + } + failpoint.Inject("SlowDownCheckDupe", func(v failpoint.Value) { + sec := v.(int) + tr.logger.Warn("start to sleep several seconds before checking other dupe", + zap.Int("seconds", sec)) + time.Sleep(time.Duration(sec) * time.Second) + }) + + otherHasDupe, needRemoteDupe, baseTotalChecksum, err := metaMgr.CheckAndUpdateLocalChecksum(ctx, &localChecksum, hasDupe) + if err != nil { + return false, err + } + needChecksum := !otherHasDupe && needRemoteDupe + hasDupe = hasDupe || otherHasDupe + + if needRemoteDupe && rc.cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone { + opts := &encode.SessionOptions{ + SQLMode: mysql.ModeStrictAllTables, + SysVars: rc.sysVars, + } + hasRemoteDupe, e := dupeController.CollectRemoteDuplicateRows(ctx, tr.encTable, tr.tableName, opts) + if e != nil { + tr.logger.Error("collect remote duplicate keys failed", log.ShortError(e)) + return false, e + } + hasDupe = hasDupe || hasRemoteDupe + + if hasDupe { + if err = dupeController.ResolveDuplicateRows(ctx, tr.encTable, tr.tableName, rc.cfg.TikvImporter.DuplicateResolution); err != nil { + tr.logger.Error("resolve remote duplicate keys failed", log.ShortError(err)) + return false, err + } + } + } + + if rc.dupIndicator != nil { + tr.logger.Debug("set dupIndicator", zap.Bool("has-duplicate", hasDupe)) + rc.dupIndicator.CompareAndSwap(false, hasDupe) + } + + nextStage := checkpoints.CheckpointStatusChecksummed + if rc.cfg.PostRestore.Checksum != config.OpLevelOff && !hasDupe && needChecksum { + if cp.Checksum.SumKVS() > 0 || baseTotalChecksum.SumKVS() > 0 { + localChecksum.Add(&cp.Checksum) + localChecksum.Add(baseTotalChecksum) + tr.logger.Info("merged local checksum", zap.Object("checksum", &localChecksum)) + } + + var remoteChecksum *local.RemoteChecksum + remoteChecksum, err = DoChecksum(ctx, tr.tableInfo) + failpoint.Inject("checksum-error", func() { + tr.logger.Info("failpoint checksum-error injected.") + remoteChecksum = nil + err = status.Error(codes.Unknown, "Checksum meets error.") + }) + if err != nil { + if rc.cfg.PostRestore.Checksum != config.OpLevelOptional { + return false, err + } + tr.logger.Warn("do checksum failed, will skip this error and go on", log.ShortError(err)) + err = 
nil + } + if remoteChecksum != nil { + err = tr.compareChecksum(remoteChecksum, localChecksum) + // with post restore level 'optional', we will skip checksum error + if rc.cfg.PostRestore.Checksum == config.OpLevelOptional { + if err != nil { + tr.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err)) + err = nil + } + } + } + } else { + switch { + case rc.cfg.PostRestore.Checksum == config.OpLevelOff: + tr.logger.Info("skip checksum because the checksum option is off") + case hasDupe: + tr.logger.Info("skip checksum&analyze because duplicates were detected") + shouldSkipAnalyze = true + case !needChecksum: + tr.logger.Info("skip checksum&analyze because other lightning instance will do this") + shouldSkipAnalyze = true + } + err = nil + nextStage = checkpoints.CheckpointStatusChecksumSkipped + } + + // Don't call FinishTable when other lightning will calculate checksum. + if err == nil && needChecksum { + err = metaMgr.FinishTable(ctx) + } + + saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, nextStage) + if err = firstErr(err, saveCpErr); err != nil { + return false, errors.Trace(err) + } + cp.Status = nextStage + } + + if cp.Status < checkpoints.CheckpointStatusIndexAdded { + var err error + if rc.cfg.TikvImporter.AddIndexBySQL { + w := rc.addIndexLimit.Apply() + err = tr.addIndexes(ctx, rc.db) + rc.addIndexLimit.Recycle(w) + // Analyze will be automatically triggered after indexes are added by SQL. We can skip manual analyze. + shouldSkipAnalyze = true + } + saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusIndexAdded) + if err = firstErr(err, saveCpErr); err != nil { + return false, errors.Trace(err) + } + cp.Status = checkpoints.CheckpointStatusIndexAdded + } + + // do table analyze + if cp.Status < checkpoints.CheckpointStatusAnalyzeSkipped { + switch { + case shouldSkipAnalyze || rc.cfg.PostRestore.Analyze == config.OpLevelOff: + tr.logger.Info("skip analyze") + if err := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, nil, checkpoints.CheckpointStatusAnalyzeSkipped); err != nil { + return false, errors.Trace(err) + } + cp.Status = checkpoints.CheckpointStatusAnalyzeSkipped + case forcePostProcess || !rc.cfg.PostRestore.PostProcessAtLast: + err := tr.analyzeTable(ctx, rc.db) + // witch post restore level 'optional', we will skip analyze error + if rc.cfg.PostRestore.Analyze == config.OpLevelOptional { + if err != nil { + tr.logger.Warn("analyze table failed, will skip this error and go on", log.ShortError(err)) + err = nil + } + } + saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusAnalyzed) + if err = firstErr(err, saveCpErr); err != nil { + return false, errors.Trace(err) + } + cp.Status = checkpoints.CheckpointStatusAnalyzed + } + } + + return true, nil +} + +func parseColumnPermutations( + tableInfo *model.TableInfo, + columns []string, + ignoreColumns map[string]struct{}, + logger log.Logger, +) ([]int, error) { + colPerm := make([]int, 0, len(tableInfo.Columns)+1) + + columnMap := make(map[string]int) + for i, column := range columns { + columnMap[column] = i + } + + tableColumnMap := make(map[string]int) + for i, col := range tableInfo.Columns { + tableColumnMap[col.Name.L] = i + } + + // check if there are some unknown columns + var unknownCols []string + for _, c := range columns { + if _, ok := tableColumnMap[c]; !ok && c != 
model.ExtraHandleName.L { + if _, ignore := ignoreColumns[c]; !ignore { + unknownCols = append(unknownCols, c) + } + } + } + + if len(unknownCols) > 0 { + return colPerm, common.ErrUnknownColumns.GenWithStackByArgs(strings.Join(unknownCols, ","), tableInfo.Name) + } + + for _, colInfo := range tableInfo.Columns { + if i, ok := columnMap[colInfo.Name.L]; ok { + if _, ignore := ignoreColumns[colInfo.Name.L]; !ignore { + colPerm = append(colPerm, i) + } else { + logger.Debug("column ignored by user requirements", + zap.Stringer("table", tableInfo.Name), + zap.String("colName", colInfo.Name.O), + zap.Stringer("colType", &colInfo.FieldType), + ) + colPerm = append(colPerm, -1) + } + } else { + if len(colInfo.GeneratedExprString) == 0 { + logger.Warn("column missing from data file, going to fill with default value", + zap.Stringer("table", tableInfo.Name), + zap.String("colName", colInfo.Name.O), + zap.Stringer("colType", &colInfo.FieldType), + ) + } + colPerm = append(colPerm, -1) + } + } + // append _tidb_rowid column + rowIDIdx := -1 + if i, ok := columnMap[model.ExtraHandleName.L]; ok { + if _, ignored := ignoreColumns[model.ExtraHandleName.L]; !ignored { + rowIDIdx = i + } + } + // FIXME: the schema info for tidb backend is not complete, so always add the _tidb_rowid field. + // Other logic should ignore this extra field if not needed. + colPerm = append(colPerm, rowIDIdx) + + return colPerm, nil +} + +func (tr *TableImporter) importKV( + ctx context.Context, + closedEngine *backend.ClosedEngine, + rc *Controller, +) error { + task := closedEngine.Logger().Begin(zap.InfoLevel, "import and cleanup engine") + regionSplitSize := int64(rc.cfg.TikvImporter.RegionSplitSize) + regionSplitKeys := int64(rc.cfg.TikvImporter.RegionSplitKeys) + + if regionSplitSize == 0 && rc.taskMgr != nil { + regionSplitSize = int64(config.SplitRegionSize) + if err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) { + if len(tasks) > 0 { + regionSplitSize = int64(config.SplitRegionSize) * int64(mathutil.Min(len(tasks), config.MaxSplitRegionSizeRatio)) + } + return nil, nil + }); err != nil { + return errors.Trace(err) + } + } + if regionSplitKeys == 0 { + if regionSplitSize > int64(config.SplitRegionSize) { + regionSplitKeys = int64(float64(regionSplitSize) / float64(config.SplitRegionSize) * float64(config.SplitRegionKeys)) + } else { + regionSplitKeys = int64(config.SplitRegionKeys) + } + } + err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys) + saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, closedEngine.GetID(), err, checkpoints.CheckpointStatusImported) + // Don't clean up when save checkpoint failed, because we will verifyLocalFile and import engine again after restart. + if err == nil && saveCpErr == nil { + err = multierr.Append(err, closedEngine.Cleanup(ctx)) + } + err = firstErr(err, saveCpErr) + + dur := task.End(zap.ErrorLevel, err) + + if err != nil { + return errors.Trace(err) + } + + if m, ok := metric.FromContext(ctx); ok { + m.ImportSecondsHistogram.Observe(dur.Seconds()) + } + + failpoint.Inject("SlowDownImport", func() {}) + + return nil +} + +// do checksum for each table. 
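+// compareChecksum returns an error when the checksum, KV count, or total bytes reported by TiDB differ from the locally accumulated values.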
+func (tr *TableImporter) compareChecksum(remoteChecksum *local.RemoteChecksum, localChecksum verify.KVChecksum) error { + if remoteChecksum.Checksum != localChecksum.Sum() || + remoteChecksum.TotalKVs != localChecksum.SumKVS() || + remoteChecksum.TotalBytes != localChecksum.SumSize() { + return common.ErrChecksumMismatch.GenWithStackByArgs( + remoteChecksum.Checksum, localChecksum.Sum(), + remoteChecksum.TotalKVs, localChecksum.SumKVS(), + remoteChecksum.TotalBytes, localChecksum.SumSize(), + ) + } + + tr.logger.Info("checksum pass", zap.Object("local", &localChecksum)) + return nil +} + +func (tr *TableImporter) analyzeTable(ctx context.Context, db *sql.DB) error { + task := tr.logger.Begin(zap.InfoLevel, "analyze") + exec := common.SQLWithRetry{ + DB: db, + Logger: tr.logger, + } + err := exec.Exec(ctx, "analyze table", "ANALYZE TABLE "+tr.tableName) + task.End(zap.ErrorLevel, err) + return err +} + +func (tr *TableImporter) dropIndexes(ctx context.Context, db *sql.DB) error { + logger := log.FromContext(ctx).With(zap.String("table", tr.tableName)) + + tblInfo := tr.tableInfo + tableName := common.UniqueTable(tblInfo.DB, tblInfo.Name) + remainIndexes, dropIndexes := common.GetDropIndexInfos(tblInfo.Core) + for _, idxInfo := range dropIndexes { + sqlStr := common.BuildDropIndexSQL(tableName, idxInfo) + + logger.Info("drop index", zap.String("sql", sqlStr)) + + s := common.SQLWithRetry{ + DB: db, + Logger: logger, + } + if err := s.Exec(ctx, "drop index", sqlStr); err != nil { + if merr, ok := errors.Cause(err).(*dmysql.MySQLError); ok { + switch merr.Number { + case errno.ErrCantDropFieldOrKey, errno.ErrDropIndexNeededInForeignKey: + remainIndexes = append(remainIndexes, idxInfo) + logger.Info("can't drop index, skip", zap.String("index", idxInfo.Name.O), zap.Error(err)) + continue + } + } + return common.ErrDropIndexFailed.Wrap(err).GenWithStackByArgs(common.EscapeIdentifier(idxInfo.Name.O), tr.tableName) + } + } + if len(remainIndexes) < len(tblInfo.Core.Indices) { + // Must clone (*model.TableInfo) before modifying it, since it may be referenced in other place. + tblInfo.Core = tblInfo.Core.Clone() + tblInfo.Core.Indices = remainIndexes + + // Rebuild encTable. + encTable, err := tables.TableFromMeta(tr.alloc, tblInfo.Core) + if err != nil { + return errors.Trace(err) + } + tr.encTable = encTable + } + return nil +} + +func (tr *TableImporter) addIndexes(ctx context.Context, db *sql.DB) (retErr error) { + const progressStep = "add-index" + task := tr.logger.Begin(zap.InfoLevel, "add indexes") + defer func() { + task.End(zap.ErrorLevel, retErr) + }() + + tblInfo := tr.tableInfo + tableName := tr.tableName + + singleSQL, multiSQLs := common.BuildAddIndexSQL(tableName, tblInfo.Core, tblInfo.Desired) + if len(multiSQLs) == 0 { + return nil + } + + logger := log.FromContext(ctx).With(zap.String("table", tableName)) + + defer func() { + if retErr == nil { + web.BroadcastTableProgress(tr.tableName, progressStep, 1) + } else if !log.IsContextCanceledError(retErr) { + // Try to strip the prefix of the error message. + // e.g "add index failed: Error 1062 ..." -> "Error 1062 ..." 
+ cause := errors.Cause(retErr) + if cause == nil { + cause = retErr + } + retErr = common.ErrAddIndexFailed.GenWithStack( + "add index failed on table %s: %v, you can add index manually by the following SQL: %s", + tableName, cause, singleSQL) + } + }() + + var totalRows int + if m, ok := metric.FromContext(ctx); ok { + totalRows = int(metric.ReadCounter(m.RowsCounter.WithLabelValues(metric.StateRestored, tableName))) + } + + // Try to add all indexes in one statement. + err := tr.executeDDL(ctx, db, singleSQL, func(status *ddlStatus) { + if totalRows > 0 { + progress := float64(status.rowCount) / float64(totalRows*len(multiSQLs)) + if progress > 1 { + progress = 1 + } + web.BroadcastTableProgress(tableName, progressStep, progress) + logger.Info("add index progress", zap.String("progress", fmt.Sprintf("%.1f%%", progress*100))) + } + }) + if err == nil { + return nil + } + if !common.IsDupKeyError(err) { + return err + } + if len(multiSQLs) == 1 { + return nil + } + logger.Warn("cannot add all indexes in one statement, try to add them one by one", zap.Strings("sqls", multiSQLs), zap.Error(err)) + + baseProgress := float64(0) + for _, ddl := range multiSQLs { + err := tr.executeDDL(ctx, db, ddl, func(status *ddlStatus) { + if totalRows > 0 { + p := float64(status.rowCount) / float64(totalRows) + progress := baseProgress + p/float64(len(multiSQLs)) + web.BroadcastTableProgress(tableName, progressStep, progress) + logger.Info("add index progress", zap.String("progress", fmt.Sprintf("%.1f%%", progress*100))) + } + }) + if err != nil && !common.IsDupKeyError(err) { + return err + } + baseProgress += 1.0 / float64(len(multiSQLs)) + web.BroadcastTableProgress(tableName, progressStep, baseProgress) + } + return nil +} + +func (*TableImporter) executeDDL( + ctx context.Context, + db *sql.DB, + ddl string, + updateProgress func(status *ddlStatus), +) error { + logger := log.FromContext(ctx).With(zap.String("ddl", ddl)) + logger.Info("execute ddl") + + s := common.SQLWithRetry{ + DB: db, + Logger: logger, + } + + var currentTS int64 + if err := s.QueryRow(ctx, "", "SELECT UNIX_TIMESTAMP()", ¤tTS); err != nil { + currentTS = time.Now().Unix() + logger.Warn("failed to query current timestamp, use current time instead", zap.Int64("currentTS", currentTS), zap.Error(err)) + } + + resultCh := make(chan error, 1) + go func() { + resultCh <- s.Exec(ctx, "add index", ddl) + }() + + failpoint.Inject("AddIndexCrash", func() { + _ = common.KillMySelf() + }) + + var ddlErr error + for { + select { + case ddlErr = <-resultCh: + failpoint.Inject("AddIndexFail", func() { + ddlErr = errors.New("injected error") + }) + if ddlErr == nil { + return nil + } + if log.IsContextCanceledError(ddlErr) { + return ddlErr + } + if isDeterminedError(ddlErr) { + return ddlErr + } + logger.Warn("failed to execute ddl, try to query ddl status", zap.Error(ddlErr)) + case <-time.After(getDDLStatusInterval): + } + + var status *ddlStatus + err := common.Retry("query ddl status", logger, func() error { + var err error + status, err = getDDLStatus(ctx, db, ddl, time.Unix(currentTS, 0)) + return err + }) + if err != nil || status == nil { + logger.Warn("failed to query ddl status", zap.Error(err)) + if ddlErr != nil { + return ddlErr + } + continue + } + updateProgress(status) + + if ddlErr != nil { + switch state := status.state; state { + case model.JobStateDone, model.JobStateSynced: + logger.Info("ddl job is finished", zap.Stringer("state", state)) + return nil + case model.JobStateRunning, model.JobStateQueueing, model.JobStateNone: 
+ logger.Info("ddl job is running", zap.Stringer("state", state)) + default: + logger.Warn("ddl job is canceled or rollbacked", zap.Stringer("state", state)) + return ddlErr + } + } + } +} + +func isDeterminedError(err error) bool { + if merr, ok := errors.Cause(err).(*dmysql.MySQLError); ok { + switch merr.Number { + case errno.ErrDupKeyName, errno.ErrMultiplePriKey, errno.ErrDupUnique, errno.ErrDupEntry: + return true + } + } + return false +} + +const ( + getDDLStatusInterval = time.Minute + // Limit the number of jobs to query. Large limit may result in empty result. See https://github.com/pingcap/tidb/issues/42298. + // A new TiDB cluster has at least 40 jobs in the history queue, so 30 is a reasonable value. + getDDLStatusMaxJobs = 30 +) + +type ddlStatus struct { + state model.JobState + rowCount int64 +} + +func getDDLStatus( + ctx context.Context, + db *sql.DB, + query string, + minCreateTime time.Time, +) (*ddlStatus, error) { + jobID, err := getDDLJobIDByQuery(ctx, db, query) + if err != nil || jobID == 0 { + return nil, err + } + rows, err := db.QueryContext(ctx, fmt.Sprintf("ADMIN SHOW DDL JOBS %d WHERE job_id = %d", getDDLStatusMaxJobs, jobID)) + if err != nil { + return nil, errors.Trace(err) + } + defer rows.Close() + + cols, err := rows.Columns() + if err != nil { + return nil, errors.Trace(err) + } + + var ( + rowCount int64 + state string + createTimeStr sql.NullString + ) + dest := make([]any, len(cols)) + for i, col := range cols { + switch strings.ToLower(col) { + case "row_count": + dest[i] = &rowCount + case "state": + dest[i] = &state + case "create_time": + dest[i] = &createTimeStr + default: + var anyStr sql.NullString + dest[i] = &anyStr + } + } + status := &ddlStatus{} + + for rows.Next() { + if err := rows.Scan(dest...); err != nil { + return nil, errors.Trace(err) + } + status.rowCount += rowCount + // subjob doesn't have create_time, ignore it. + if !createTimeStr.Valid || createTimeStr.String == "" { + continue + } + createTime, err := time.Parse(time.DateTime, createTimeStr.String) + if err != nil { + return nil, errors.Trace(err) + } + // The job is not created by the current task, ignore it. 
+ if createTime.Before(minCreateTime) { + return nil, nil + } + status.state = model.StrToJobState(state) + } + return status, errors.Trace(rows.Err()) +} + +func getDDLJobIDByQuery(ctx context.Context, db *sql.DB, wantQuery string) (int64, error) { + rows, err := db.QueryContext(ctx, fmt.Sprintf("ADMIN SHOW DDL JOB QUERIES LIMIT %d", getDDLStatusMaxJobs)) + if err != nil { + return 0, errors.Trace(err) + } + defer rows.Close() + + for rows.Next() { + var ( + jobID int64 + query string + ) + if err := rows.Scan(&jobID, &query); err != nil { + return 0, errors.Trace(err) + } + if query == wantQuery { + return jobID, errors.Trace(rows.Err()) + } + } + return 0, errors.Trace(rows.Err()) +} + +func (tr *TableImporter) preDeduplicate( + ctx context.Context, + rc *Controller, + cp *checkpoints.TableCheckpoint, + workingDir string, +) error { + d := &dupDetector{ + tr: tr, + rc: rc, + cp: cp, + logger: tr.logger, + } + originalErr := d.run(ctx, workingDir, tr.dupIgnoreRows) + if originalErr == nil { + return nil + } + + if !ErrDuplicateKey.Equal(originalErr) { + return errors.Trace(originalErr) + } + + var ( + idxName string + oneConflictMsg, otherConflictMsg string + ) + + // provide a more friendly error message + + dupErr := errors.Cause(originalErr).(*errors.Error) + conflictIdxID := dupErr.Args()[0].(int64) + if conflictIdxID == conflictOnHandle { + idxName = "PRIMARY" + } else { + for _, idxInfo := range tr.tableInfo.Core.Indices { + if idxInfo.ID == conflictIdxID { + idxName = idxInfo.Name.O + break + } + } + } + if idxName == "" { + tr.logger.Error("cannot find index name", zap.Int64("conflictIdxID", conflictIdxID)) + return errors.Trace(originalErr) + } + if !rc.cfg.Checkpoint.Enable { + err := errors.Errorf("duplicate key in table %s caused by index `%s`, but because checkpoint is off we can't have more details", + tr.tableName, idxName) + rc.errorMgr.RecordDuplicateOnce( + ctx, tr.logger, tr.tableName, "", -1, err.Error(), -1, "", + ) + return err + } + conflictEncodedRowIDs := dupErr.Args()[1].([][]byte) + if len(conflictEncodedRowIDs) < 2 { + tr.logger.Error("invalid conflictEncodedRowIDs", zap.Int("len", len(conflictEncodedRowIDs))) + return errors.Trace(originalErr) + } + rowID := make([]int64, 2) + var err error + _, rowID[0], err = codec.DecodeComparableVarint(conflictEncodedRowIDs[0]) + if err != nil { + rowIDHex := hex.EncodeToString(conflictEncodedRowIDs[0]) + tr.logger.Error("failed to decode rowID", + zap.String("rowID", rowIDHex), + zap.Error(err)) + return errors.Trace(originalErr) + } + _, rowID[1], err = codec.DecodeComparableVarint(conflictEncodedRowIDs[1]) + if err != nil { + rowIDHex := hex.EncodeToString(conflictEncodedRowIDs[1]) + tr.logger.Error("failed to decode rowID", + zap.String("rowID", rowIDHex), + zap.Error(err)) + return errors.Trace(originalErr) + } + + tableCp, err := rc.checkpointsDB.Get(ctx, tr.tableName) + if err != nil { + tr.logger.Error("failed to get table checkpoint", zap.Error(err)) + return errors.Trace(err) + } + var ( + secondConflictPath string + ) + for _, engineCp := range tableCp.Engines { + for _, chunkCp := range engineCp.Chunks { + if chunkCp.Chunk.PrevRowIDMax <= rowID[0] && rowID[0] < chunkCp.Chunk.RowIDMax { + oneConflictMsg = fmt.Sprintf("row %d counting from offset %d in file %s", + rowID[0]-chunkCp.Chunk.PrevRowIDMax, + chunkCp.Chunk.Offset, + chunkCp.FileMeta.Path) + } + if chunkCp.Chunk.PrevRowIDMax <= rowID[1] && rowID[1] < chunkCp.Chunk.RowIDMax { + secondConflictPath = chunkCp.FileMeta.Path + otherConflictMsg = 
fmt.Sprintf("row %d counting from offset %d in file %s", + rowID[1]-chunkCp.Chunk.PrevRowIDMax, + chunkCp.Chunk.Offset, + chunkCp.FileMeta.Path) + } + } + } + if oneConflictMsg == "" || otherConflictMsg == "" { + tr.logger.Error("cannot find conflict rows by rowID", + zap.Int64("rowID[0]", rowID[0]), + zap.Int64("rowID[1]", rowID[1])) + return errors.Trace(originalErr) + } + err = errors.Errorf("duplicate entry for key '%s', a pair of conflicting rows are (%s, %s)", + idxName, oneConflictMsg, otherConflictMsg) + rc.errorMgr.RecordDuplicateOnce( + ctx, tr.logger, tr.tableName, secondConflictPath, -1, err.Error(), rowID[1], "", + ) + return err +} diff --git a/br/tests/lightning_routes/config.toml b/br/tests/lightning_routes/config.toml index bb54609dd03b1..74913091c5916 100644 --- a/br/tests/lightning_routes/config.toml +++ b/br/tests/lightning_routes/config.toml @@ -8,3 +8,6 @@ schema-pattern = "routes_a*" table-pattern = "t*" target-schema = "routes_b" target-table = "u" + +[post-restore] +checksum = "optional" diff --git a/br/tests/lightning_routes/run.sh b/br/tests/lightning_routes/run.sh index 1db0ce2035021..5ae757eb0bd43 100755 --- a/br/tests/lightning_routes/run.sh +++ b/br/tests/lightning_routes/run.sh @@ -4,12 +4,17 @@ set -eux +echo "testing checksum-error..." +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/importer/checksum-error=1*return()" + run_sql 'DROP DATABASE IF EXISTS routes_a0;' run_sql 'DROP DATABASE IF EXISTS routes_a1;' run_sql 'DROP DATABASE IF EXISTS routes_b;' run_lightning +echo "test checksum-error success!" + run_sql 'SELECT count(1), sum(x) FROM routes_b.u;' check_contains 'count(1): 4' check_contains 'sum(x): 259' diff --git a/disttask/importinto/subtask_executor.go b/disttask/importinto/subtask_executor.go new file mode 100644 index 0000000000000..ebf3c37ab26ac --- /dev/null +++ b/disttask/importinto/subtask_executor.go @@ -0,0 +1,245 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importinto + +import ( + "context" + "strconv" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/br/pkg/lightning/backend/local" + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/config" + verify "github.com/pingcap/tidb/br/pkg/lightning/verification" + "github.com/pingcap/tidb/disttask/framework/proto" + "github.com/pingcap/tidb/disttask/framework/scheduler" + "github.com/pingcap/tidb/disttask/framework/storage" + "github.com/pingcap/tidb/executor/importer" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/mathutil" + "github.com/tikv/client-go/v2/util" + "go.uber.org/zap" +) + +// TestSyncChan is used to test. +var TestSyncChan = make(chan struct{}) + +// ImportMinimalTaskExecutor is a minimal task executor for IMPORT INTO. 
+type ImportMinimalTaskExecutor struct { + mTtask *importStepMinimalTask +} + +// Run implements the SubtaskExecutor.Run interface. +func (e *ImportMinimalTaskExecutor) Run(ctx context.Context) error { + logger := logutil.BgLogger().With(zap.String("type", proto.ImportInto), zap.Int64("table-id", e.mTtask.Plan.TableInfo.ID)) + logger.Info("run minimal task") + failpoint.Inject("waitBeforeSortChunk", func() { + time.Sleep(3 * time.Second) + }) + failpoint.Inject("errorWhenSortChunk", func() { + failpoint.Return(errors.New("occur an error when sort chunk")) + }) + failpoint.Inject("syncBeforeSortChunk", func() { + TestSyncChan <- struct{}{} + <-TestSyncChan + }) + chunkCheckpoint := toChunkCheckpoint(e.mTtask.Chunk) + sharedVars := e.mTtask.SharedVars + if err := importer.ProcessChunk(ctx, &chunkCheckpoint, sharedVars.TableImporter, sharedVars.DataEngine, sharedVars.IndexEngine, sharedVars.Progress, logger); err != nil { + return err + } + + sharedVars.mu.Lock() + defer sharedVars.mu.Unlock() + sharedVars.Checksum.Add(&chunkCheckpoint.Checksum) + return nil +} + +type postProcessMinimalTaskExecutor struct { + mTask *postProcessStepMinimalTask +} + +func (e *postProcessMinimalTaskExecutor) Run(ctx context.Context) error { + mTask := e.mTask + failpoint.Inject("waitBeforePostProcess", func() { + time.Sleep(5 * time.Second) + }) + return postProcess(ctx, mTask.taskMeta, &mTask.meta, mTask.logger) +} + +// postProcess does the post-processing for the task. +func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) { + failpoint.Inject("syncBeforePostProcess", func() { + TestSyncChan <- struct{}{} + <-TestSyncChan + }) + + logger.Info("post process") + + // TODO: create table indexes depends on the option. + // create table indexes even if the post process is failed. 
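+ // The commented-out defer below sketches that: it would call createTableIndexes and merge its error with the checksum error via multierr.Append.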
+ // defer func() { + // err2 := createTableIndexes(ctx, globalTaskManager, taskMeta, logger) + // err = multierr.Append(err, err2) + // }() + + return verifyChecksum(ctx, taskMeta, subtaskMeta, logger) +} + +func verifyChecksum(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) error { + if taskMeta.Plan.Checksum == config.OpLevelOff { + return nil + } + localChecksum := verify.MakeKVChecksum(subtaskMeta.Checksum.Size, subtaskMeta.Checksum.KVs, subtaskMeta.Checksum.Sum) + logger.Info("local checksum", zap.Object("checksum", &localChecksum)) + + failpoint.Inject("waitCtxDone", func() { + <-ctx.Done() + }) + + globalTaskManager, err := storage.GetTaskManager() + if err != nil { + return err + } + remoteChecksum, err := checksumTable(ctx, globalTaskManager, taskMeta, logger) + if err != nil { + if taskMeta.Plan.Checksum != config.OpLevelOptional { + return err + } + logger.Warn("checksumTable failed, will skip this error and go on", zap.Error(err)) + } + if remoteChecksum != nil { + if !remoteChecksum.IsEqual(&localChecksum) { + err2 := common.ErrChecksumMismatch.GenWithStackByArgs( + remoteChecksum.Checksum, localChecksum.Sum(), + remoteChecksum.TotalKVs, localChecksum.SumKVS(), + remoteChecksum.TotalBytes, localChecksum.SumSize(), + ) + if taskMeta.Plan.Checksum == config.OpLevelOptional { + logger.Warn("verify checksum failed, but checksum is optional, will skip it", zap.Error(err2)) + err2 = nil + } + return err2 + } + logger.Info("checksum pass", zap.Object("local", &localChecksum)) + } + return nil +} + +func checksumTable(ctx context.Context, executor storage.SessionExecutor, taskMeta *TaskMeta, logger *zap.Logger) (*local.RemoteChecksum, error) { + var ( + tableName = common.UniqueTable(taskMeta.Plan.DBName, taskMeta.Plan.TableInfo.Name.L) + sql = "ADMIN CHECKSUM TABLE " + tableName + maxErrorRetryCount = 3 + distSQLScanConcurrencyFactor = 1 + remoteChecksum *local.RemoteChecksum + txnErr error + ) + + ctx = util.WithInternalSourceType(ctx, kv.InternalImportInto) + for i := 0; i < maxErrorRetryCount; i++ { + txnErr = executor.WithNewTxn(ctx, func(se sessionctx.Context) error { + // increase backoff weight + if err := setBackoffWeight(se, taskMeta, logger); err != nil { + logger.Warn("set tidb_backoff_weight failed", zap.Error(err)) + } + + distSQLScanConcurrency := se.GetSessionVars().DistSQLScanConcurrency() + se.GetSessionVars().SetDistSQLScanConcurrency(mathutil.Max(distSQLScanConcurrency/distSQLScanConcurrencyFactor, local.MinDistSQLScanConcurrency)) + defer func() { + se.GetSessionVars().SetDistSQLScanConcurrency(distSQLScanConcurrency) + }() + + rs, err := storage.ExecSQL(ctx, se, sql) + if err != nil { + return err + } + if len(rs) < 1 { + return errors.New("empty checksum result") + } + + failpoint.Inject("errWhenChecksum", func() { + if i == 0 { + failpoint.Return(errors.New("occur an error when checksum, coprocessor task terminated due to exceeding the deadline")) + } + }) + + // ADMIN CHECKSUM TABLE . example. 
+ // mysql> admin checksum table test.t;
+ // +---------+------------+---------------------+-----------+-------------+
+ // | Db_name | Table_name | Checksum_crc64_xor | Total_kvs | Total_bytes |
+ // +---------+------------+---------------------+-----------+-------------+
+ // | test | t | 8520875019404689597 | 7296873 | 357601387 |
+ // +---------+------------+---------------------+-----------+-------------+
+ remoteChecksum = &local.RemoteChecksum{
+ Schema: rs[0].GetString(0),
+ Table: rs[0].GetString(1),
+ Checksum: rs[0].GetUint64(2),
+ TotalKVs: rs[0].GetUint64(3),
+ TotalBytes: rs[0].GetUint64(4),
+ }
+ return nil
+ })
+ if !common.IsRetryableError(txnErr) {
+ break
+ }
+ distSQLScanConcurrencyFactor *= 2
+ logger.Warn("retry checksum table", zap.Int("retry count", i+1), zap.Error(txnErr))
+ }
+ return remoteChecksum, txnErr
+}
+
+// TestChecksumTable is used to test checksum table in unit test.
+func TestChecksumTable(ctx context.Context, executor storage.SessionExecutor, taskMeta *TaskMeta, logger *zap.Logger) (*local.RemoteChecksum, error) {
+ return checksumTable(ctx, executor, taskMeta, logger)
+}
+
+func setBackoffWeight(se sessionctx.Context, taskMeta *TaskMeta, logger *zap.Logger) error {
+ backoffWeight := local.DefaultBackoffWeight
+ if val, ok := taskMeta.Plan.ImportantSysVars[variable.TiDBBackOffWeight]; ok {
+ if weight, err := strconv.Atoi(val); err == nil && weight > backoffWeight {
+ backoffWeight = weight
+ }
+ }
+ logger.Info("set backoff weight", zap.Int("weight", backoffWeight))
+ return se.GetSessionVars().SetSystemVar(variable.TiDBBackOffWeight, strconv.Itoa(backoffWeight))
+}
+
+func init() {
+ scheduler.RegisterSubtaskExectorConstructor(proto.ImportInto, StepImport,
+ // The order of the subtask executors is the same as the order of the subtasks.
+ func(minimalTask proto.MinimalTask, step int64) (scheduler.SubtaskExecutor, error) {
+ task, ok := minimalTask.(*importStepMinimalTask)
+ if !ok {
+ return nil, errors.Errorf("invalid task type %T", minimalTask)
+ }
+ return &ImportMinimalTaskExecutor{mTtask: task}, nil
+ },
+ )
+ scheduler.RegisterSubtaskExectorConstructor(proto.ImportInto, StepPostProcess,
+ func(minimalTask proto.MinimalTask, step int64) (scheduler.SubtaskExecutor, error) {
+ mTask, ok := minimalTask.(*postProcessStepMinimalTask)
+ if !ok {
+ return nil, errors.Errorf("invalid task type %T", minimalTask)
+ }
+ return &postProcessMinimalTaskExecutor{mTask: mTask}, nil
+ },
+ )
+}
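For reference, a minimal standalone sketch of the retry pattern used by checksumTable above: each retryable failure doubles a reduction factor, so the next ADMIN CHECKSUM attempt runs with a lower DistSQL scan concurrency, never dropping below a floor. All names below are illustrative, the snippet is not part of the cherry-picked code, and it uses only the Go standard library.

package main

import (
	"errors"
	"fmt"
)

const (
	maxRetries     = 3
	minConcurrency = 4
)

// runChecksum stands in for the real ADMIN CHECKSUM execution; it only
// illustrates the control flow by failing when asked to run with a
// concurrency above 8, mimicking a retryable "exceeding the deadline" error.
func runChecksum(concurrency int) error {
	if concurrency > 8 {
		return errors.New("coprocessor task terminated due to exceeding the deadline")
	}
	return nil
}

// checksumWithRetry mirrors the control flow of checksumTable: after each
// retryable failure the reduction factor doubles, so the next attempt uses a
// lower scan concurrency, never dropping below minConcurrency.
func checksumWithRetry(baseConcurrency int) error {
	factor := 1
	var err error
	for i := 0; i < maxRetries; i++ {
		concurrency := baseConcurrency / factor
		if concurrency < minConcurrency {
			concurrency = minConcurrency
		}
		if err = runChecksum(concurrency); err == nil {
			return nil
		}
		// In the real code only retryable errors reach this point.
		factor *= 2
		fmt.Printf("retry %d: concurrency %d failed: %v\n", i+1, concurrency, err)
	}
	return err
}

func main() {
	// The first attempt with concurrency 16 fails, the second with 8 succeeds.
	fmt.Println(checksumWithRetry(16))
}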