Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: support set explicit task type for ddl request #45789

Merged
merged 4 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6924,13 +6924,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "41aee514dad7b095f70a59843b8db9424b54cb1f11baf4f0608e2120768a0ab9",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20230731032349-719e6456f7d5",
sha256 = "ed4a6bacc74d58cca6eb30c8828a3c138c78895782b407e607dc5c13f3b338e7",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20230809050315-300545a8a3c4",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230731032349-719e6456f7d5.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230731032349-719e6456f7d5.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230731032349-719e6456f7d5.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230731032349-719e6456f7d5.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230809050315-300545a8a3c4.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230809050315-300545a8a3c4.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230809050315-300545a8a3c4.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230809050315-300545a8a3c4.zip",
],
)
go_repository(
Expand Down
67 changes: 42 additions & 25 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
tikverror "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/tikv"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -306,6 +307,8 @@ func getDupDetectClient(
region *split.RegionInfo,
keyRange tidbkv.KeyRange,
importClientFactory ImportClientFactory,
resourceGroupName string,
taskType string,
) (import_sstpb.ImportSST_DuplicateDetectClient, error) {
leader := region.Leader
if leader == nil {
Expand All @@ -319,6 +322,10 @@ func getDupDetectClient(
RegionId: region.Region.GetId(),
RegionEpoch: region.Region.GetRegionEpoch(),
Peer: leader,
ResourceControlContext: &kvrpcpb.ResourceControlContext{
ResourceGroupName: resourceGroupName,
},
RequestSource: kvutil.BuildRequestSource(true, tidbkv.InternalTxnLightning, taskType),
}
req := &import_sstpb.DuplicateDetectRequest{
Context: reqCtx,
Expand All @@ -338,9 +345,11 @@ func NewRemoteDupKVStream(
region *split.RegionInfo,
keyRange tidbkv.KeyRange,
importClientFactory ImportClientFactory,
resourceGroupName string,
taskType string,
) (*RemoteDupKVStream, error) {
subCtx, cancel := context.WithCancel(ctx)
cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory)
cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory, resourceGroupName, taskType)
if err != nil {
cancel()
return nil, errors.Trace(err)
Expand Down Expand Up @@ -398,17 +407,19 @@ func (s *RemoteDupKVStream) Close() error {
// are stored into the errorMgr.
// this object can only be used once, either for local or remote deduplication.
type DupeDetector struct {
tbl table.Table
tableName string
splitCli split.SplitClient
tikvCli *tikv.KVStore
tikvCodec tikv.Codec
errorMgr *errormanager.ErrorManager
decoder *kv.TableKVDecoder
logger log.Logger
concurrency int
hasDupe atomic.Bool
indexID int64
tbl table.Table
tableName string
splitCli split.SplitClient
tikvCli *tikv.KVStore
tikvCodec tikv.Codec
errorMgr *errormanager.ErrorManager
decoder *kv.TableKVDecoder
logger log.Logger
concurrency int
hasDupe atomic.Bool
indexID int64
resourceGroupName string
taskType string
}

// NewDupeDetector creates a new DupeDetector.
Expand All @@ -422,23 +433,27 @@ func NewDupeDetector(
sessOpts *encode.SessionOptions,
concurrency int,
logger log.Logger,
resourceGroupName string,
taskType string,
) (*DupeDetector, error) {
logger = logger.With(zap.String("tableName", tableName))
decoder, err := kv.NewTableKVDecoder(tbl, tableName, sessOpts, logger)
if err != nil {
return nil, errors.Trace(err)
}
return &DupeDetector{
tbl: tbl,
tableName: tableName,
splitCli: splitCli,
tikvCli: tikvCli,
tikvCodec: tikvCodec,
errorMgr: errMgr,
decoder: decoder,
logger: logger,
concurrency: concurrency,
indexID: sessOpts.IndexID,
tbl: tbl,
tableName: tableName,
splitCli: splitCli,
tikvCli: tikvCli,
tikvCodec: tikvCodec,
errorMgr: errMgr,
decoder: decoder,
logger: logger,
concurrency: concurrency,
indexID: sessOpts.IndexID,
resourceGroupName: resourceGroupName,
taskType: taskType,
}, nil
}

Expand Down Expand Up @@ -812,7 +827,7 @@ func (m *DupeDetector) processRemoteDupTaskOnce(
logutil.Key("dupDetectEndKey", kr.EndKey),
)
err := func() error {
stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory)
stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory, m.resourceGroupName, m.taskType)
if err != nil {
return errors.Annotatef(err, "failed to create remote duplicate kv stream")
}
Expand Down Expand Up @@ -937,6 +952,8 @@ type DupeController struct {
duplicateDB *pebble.DB
keyAdapter KeyAdapter
importClientFactory ImportClientFactory
resourceGroupName string
taskType string
}

// CollectLocalDuplicateRows collect duplicate keys from local db. We will store the duplicate keys which
Expand All @@ -948,7 +965,7 @@ func (local *DupeController) CollectLocalDuplicateRows(ctx context.Context, tbl
}()

duplicateManager, err := NewDupeDetector(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec,
local.errorMgr, opts, local.dupeConcurrency, log.FromContext(ctx))
local.errorMgr, opts, local.dupeConcurrency, log.FromContext(ctx), local.resourceGroupName, local.taskType)
if err != nil {
return false, errors.Trace(err)
}
Expand All @@ -967,7 +984,7 @@ func (local *DupeController) CollectRemoteDuplicateRows(ctx context.Context, tbl
}()

duplicateManager, err := NewDupeDetector(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec,
local.errorMgr, opts, local.dupeConcurrency, log.FromContext(ctx))
local.errorMgr, opts, local.dupeConcurrency, log.FromContext(ctx), local.resourceGroupName, local.taskType)
if err != nil {
return false, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/duplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestBuildDupTask(t *testing.T) {
}
for _, tc := range testCases {
dupMgr, err := local.NewDupeDetector(tbl, "t", nil, nil, keyspace.CodecV1, nil,
tc.sessOpt, 4, log.FromContext(context.Background()))
tc.sessOpt, 4, log.FromContext(context.Background()), "test", "lightning")
require.NoError(t, err)
tasks, err := local.BuildDuplicateTaskForTest(dupMgr)
require.NoError(t, err)
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,11 +423,12 @@ type BackendConfig struct {
// the scope when pause PD schedulers.
PausePDSchedulerScope config.PausePDSchedulerScope
ResourceGroupName string
TaskType string
RaftKV2SwitchModeDuration time.Duration
}

// NewBackendConfig creates a new BackendConfig.
func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName, resourceGroupName string, raftKV2SwitchModeDuration time.Duration) BackendConfig {
func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName, resourceGroupName, taskType string, raftKV2SwitchModeDuration time.Duration) BackendConfig {
return BackendConfig{
PDAddr: cfg.TiDB.PdAddr,
LocalStoreDir: cfg.TikvImporter.SortedKVDir,
Expand All @@ -449,6 +450,7 @@ func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName, resour
KeyspaceName: keyspaceName,
PausePDSchedulerScope: cfg.TikvImporter.PausePDSchedulerScope,
ResourceGroupName: resourceGroupName,
TaskType: taskType,
RaftKV2SwitchModeDuration: raftKV2SwitchModeDuration,
}
}
Expand Down Expand Up @@ -1680,6 +1682,8 @@ func (local *Backend) GetDupeController(dupeConcurrency int, errorMgr *errormana
duplicateDB: local.duplicateDB,
keyAdapter: local.keyAdapter,
importClientFactory: local.importClientFactory,
resourceGroupName: local.ResourceGroupName,
taskType: local.TaskType,
}
}

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
ResourceControlContext: &kvrpcpb.ResourceControlContext{
ResourceGroupName: local.ResourceGroupName,
},
RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, util.ExplicitTypeLightning),
RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, local.TaskType),
},
}
for _, peer := range region.GetPeers() {
Expand Down Expand Up @@ -600,7 +600,7 @@ func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestRe
ResourceControlContext: &kvrpcpb.ResourceControlContext{
ResourceGroupName: local.ResourceGroupName,
},
RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, util.ExplicitTypeLightning),
RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, local.TaskType),
}

if supportMultiIngest {
Expand Down
7 changes: 1 addition & 6 deletions br/pkg/lightning/importer/checksum_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,7 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (
backoffWeight = local.DefaultBackoffWeight
}

explicitRequestSourceType, err := common.GetExplicitRequestSourceTypeFromDB(ctx, rc.db)
if err != nil {
log.FromContext(ctx).Warn("get tidb_request_source_type failed", zap.Error(err), zap.String("tidb_request_source_type", explicitRequestSourceType))
return nil, errors.Trace(err)
}
manager = local.NewTiKVChecksumManager(store.GetClient(), rc.pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight, rc.resourceGroupName, explicitRequestSourceType)
manager = local.NewTiKVChecksumManager(store.GetClient(), rc.pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight, rc.resourceGroupName, rc.taskType)
} else {
manager = local.NewTiDBChecksumExecutor(rc.db)
}
Expand Down
17 changes: 16 additions & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/driver"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/collate"
Expand All @@ -64,6 +65,7 @@ import (
"github.com/pingcap/tidb/util/set"
"github.com/prometheus/client_golang/prometheus"
tikvconfig "github.com/tikv/client-go/v2/config"
kvutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
"go.uber.org/multierr"
Expand Down Expand Up @@ -235,6 +237,7 @@ type Controller struct {

keyspaceName string
resourceGroupName string
taskType string
}

// LightningStatus provides the finished bytes and total bytes of the current task.
Expand Down Expand Up @@ -274,6 +277,8 @@ type ControllerParam struct {
KeyspaceName string
// ResourceGroup name for current TiDB user
ResourceGroupName string
// TaskType is the source component name used for background task control.
TaskType string
}

// NewImportController creates a new Controller instance.
Expand Down Expand Up @@ -383,6 +388,15 @@ func NewImportControllerWithPauser(
}
}

taskType, err := common.GetExplicitRequestSourceTypeFromDB(ctx, db)
if err != nil {
return nil, errors.Annotatef(err, "get system variable '%s' failed", variable.TiDBExplicitRequestSourceType)
}
if taskType == "" {
taskType = kvutil.ExplicitTypeLightning
}
p.TaskType = taskType

isRaftKV2, err := common.IsRaftKV2(ctx, db)
if err != nil {
log.FromContext(ctx).Warn("check isRaftKV2 failed", zap.Error(err))
Expand All @@ -391,7 +405,7 @@ func NewImportControllerWithPauser(
if isRaftKV2 {
raftKV2SwitchModeDuration = cfg.Cron.SwitchMode.Duration
}
backendConfig := local.NewBackendConfig(cfg, maxOpenFiles, p.KeyspaceName, p.ResourceGroupName, raftKV2SwitchModeDuration)
backendConfig := local.NewBackendConfig(cfg, maxOpenFiles, p.KeyspaceName, p.ResourceGroupName, p.TaskType, raftKV2SwitchModeDuration)
backendObj, err = local.NewBackend(ctx, tls, backendConfig, regionSizeGetter)
if err != nil {
return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err)
Expand Down Expand Up @@ -494,6 +508,7 @@ func NewImportControllerWithPauser(

keyspaceName: p.KeyspaceName,
resourceGroupName: p.ResourceGroupName,
taskType: p.TaskType,
}

return rc, nil
Expand Down
3 changes: 3 additions & 0 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/util/topsql"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/tikv"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -775,6 +776,7 @@ func iterateSnapshotKeys(ctx *JobContext, store kv.Storage, priority int, keyPre
snap.SetOption(kv.Priority, priority)
snap.SetOption(kv.RequestSourceInternal, true)
snap.SetOption(kv.RequestSourceType, ctx.ddlJobSourceType())
snap.SetOption(kv.ExplicitRequestSourceType, kvutil.ExplicitTypeDDL)
if tagger := ctx.getResourceGroupTaggerForTopSQL(); tagger != nil {
snap.SetOption(kv.ResourceGroupTagger, tagger)
}
Expand Down Expand Up @@ -824,6 +826,7 @@ func GetRangeEndKey(ctx *JobContext, store kv.Storage, priority int, keyPrefix k
}
snap.SetOption(kv.RequestSourceInternal, true)
snap.SetOption(kv.RequestSourceType, ctx.ddlJobSourceType())
snap.SetOption(kv.ExplicitRequestSourceType, kvutil.ExplicitTypeDDL)
it, err := snap.IterReverse(endKey, nil)
if err != nil {
return nil, errors.Trace(err)
Expand Down
3 changes: 2 additions & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/sqlexec"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1439,7 +1440,7 @@ func (w *updateColumnWorker) cleanRowMap() {
// BackfillData will backfill the table record in a transaction. A lock corresponds to a rowKey if the value of rowKey is changed.
func (w *updateColumnWorker) BackfillData(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
oprStartTime := time.Now()
ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it cannot use WithInternalSourceType? it should also be ddl?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ddlJobSourceType() returns something like "ddl_add_index", so in our current check logic, this can't be recognized as ddl.

errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
Expand Down
3 changes: 2 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/tikv/client-go/v2/tikvrpc"
kvutil "github.com/tikv/client-go/v2/util"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -700,7 +701,7 @@ func (w *JobContext) setDDLLabelForDiagnosis(jobType model.ActionType) {
return
}
w.tp = getDDLRequestSource(jobType)
w.ddlJobCtx = kv.WithInternalSourceType(w.ddlJobCtx, w.ddlJobSourceType())
w.ddlJobCtx = kv.WithInternalSourceAndTaskType(w.ddlJobCtx, w.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
}

func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error {
Expand Down
5 changes: 3 additions & 2 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -1732,7 +1733,7 @@ func (w *addIndexTxnWorker) BackfillData(handleRange reorgBackfillTask) (taskCtx

oprStartTime := time.Now()
jobID := handleRange.getJobID()
ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) (err error) {
taskCtx.finishTS = txn.StartTS()
taskCtx.addedCount = 0
Expand Down Expand Up @@ -2081,7 +2082,7 @@ func (w *cleanUpIndexWorker) BackfillData(handleRange reorgBackfillTask) (taskCt
})

oprStartTime := time.Now()
ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
Expand Down
Loading