Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importinto: integrate global sort(without merge-sort) part 1 #46998

Merged
merged 22 commits into from
Sep 19, 2023

Conversation

D3Hunter
Copy link
Contributor

@D3Hunter D3Hunter commented Sep 15, 2023

What problem does this PR solve?

Issue Number: ref #46704

Problem Summary:

  • add cloud_storage_uri option
  • when use global sort: job step goes from none -> global-sorting -> importing -> validating -> none.
    • otherwise, it's none -> importing -> validating -> none.
  • integrate global sort(without merge-sort part), but since this pr is large, will add test for it later.
  • add Init to dispatcher, if Init returns error, dispatcher manager will fail the task directly, so the returned error should be a fatal error.
  • some refactor

when using global sort, we write index kv using IndexRouteWriter
we route kvs of different index to different writer in order to make
merge sort easier, else kv data of all subtasks will all be overlapped.

drawback of doing this is that the number of writers need to open will be
index-count * encode-concurrency, when the table has many indexes, and each
writer will take 256MiB buffer on default,
this will take a lot of memory, or even OOM.
we can adjust memory buf size later, but might write too many small files.

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Side effects

  • Performance regression: Consumes more CPU
  • Performance regression: Consumes more Memory
  • Breaking backward compatibility

Documentation

  • Affects user behaviors
  • Contains syntax changes
  • Contains variable changes
  • Contains experimental features
  • Changes MySQL compatibility

Release note

Please refer to Release Notes Language Style Guide to write a quality release note.

None

@ti-chi-bot ti-chi-bot bot added release-note-none Denotes a PR that doesn't merit a release note. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. labels Sep 15, 2023
@tiprow
Copy link

tiprow bot commented Sep 15, 2023

Hi @D3Hunter. Thanks for your PR.

PRs from untrusted users cannot be marked as trusted with /ok-to-test in this repo meaning untrusted PR authors can never trigger tests themselves. Collaborators can still trigger tests on the PR using /test all.

I understand the commands that are listed here.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@D3Hunter
Copy link
Contributor Author

/ok-to-test

@ti-chi-bot ti-chi-bot bot added the ok-to-test Indicates a PR is ready to be tested. label Sep 15, 2023
@codecov
Copy link

codecov bot commented Sep 15, 2023

Codecov Report

Merging #46998 (8b59ff8) into master (59e31f0) will decrease coverage by 0.4573%.
Report is 12 commits behind head on master.
The diff coverage is 20.7496%.

Additional details and impacted files
@@               Coverage Diff                @@
##             master     #46998        +/-   ##
================================================
- Coverage   73.0821%   72.6249%   -0.4573%     
================================================
  Files          1336       1358        +22     
  Lines        398227     405296      +7069     
================================================
+ Hits         291033     294346      +3313     
- Misses        88451      92244      +3793     
+ Partials      18743      18706        -37     
Flag Coverage Δ
integration 30.5570% <0.0000%> (?)
unit 72.9633% <20.8333%> (-0.1189%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Components Coverage Δ
dumpling 53.9913% <ø> (ø)
parser 84.9728% <ø> (-0.0252%) ⬇️
br 48.5997% <7.5000%> (-4.3472%) ⬇️

Copy link
Contributor

@ywqzzy ywqzzy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Framework PART LGTM

Copy link
Contributor

@lance6716 lance6716 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

15 / 52 files viewed

will review later

br/pkg/lightning/backend/external/util.go Outdated Show resolved Hide resolved
br/pkg/lightning/backend/external/writer.go Show resolved Hide resolved
br/pkg/lightning/backend/local/local.go Outdated Show resolved Hide resolved
Copy link
Contributor

@lance6716 lance6716 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

21 / 52 files viewed

logger := logutil.BgLogger().With(
zap.String("type", gTask.Type),
zap.Int64("task-id", gTask.ID),
zap.String("step", stepStr(gTask.Step)),
zap.String("curr-step", stepStr(gTask.Step)),
Copy link
Contributor

@lance6716 lance6716 Sep 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In master code, I have added nextStep as a parameter of OnNextSubtasksBatch. 😂 My consideration is, GetNextStep can be called only once while OnNextSubtasksBatch maybe called for multiple time for a subtask

Copy link
Contributor Author

@D3Hunter D3Hunter Sep 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see that GetNextStep now has a handle param, we can remove it, and move those logic into OnNextSubtasksBatch, if there's no need to do merge-sort, just return 0 subtask metas, and let dispatcher call it again on next step.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If GetNextStep is heavy to compute, it's better we let framework call it once and send the result to the argument of OnNextSubtasksBatch

if there's no need to do merge-sort, just return 0 subtask metas

Currently I think it's also acceptable. In future we may have a complex step transition and must use non-linear steps.

Copy link
Contributor

@lance6716 lance6716 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

33 / 52 files viewed

// DecodeIndexID decodes indexID from the key.
// this method simply extract index id part, and no other checking.
// Caller should make sure the key is an index key.
func DecodeIndexID(key kv.Key) (int64, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems in below DecodeTableID we should also consider keyspace of APIv2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only that method checks it, seems we don't need to check it here for index kv

	// `V2` use new encoding for RawKV & TxnKV to support more features.
	//
	// Key Encoding:
	//  TiDB: start with `m` or `t`, the same as `V1`.
	//  TxnKV: prefix with `x`, encoded as `MCE( x{keyspace id} + {user key} ) + timestamp`.
	//  RawKV: prefix with `r`, encoded as `MCE( r{keyspace id} + {user key} ) + timestamp`.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, keyspace may be automatically adjusted in client-go, we don't need to care about it here.

@@ -147,6 +147,29 @@ func GetCachedKVStoreFrom(pdAddr string, tls *common.TLS) (tidbkv.Storage, error
return kvStore, nil
}

// GetRegionSplitSizeKeys gets the region split size and keys from PD.
func GetRegionSplitSizeKeys(ctx context.Context) (regionSplitSize int64, regionSplitKeys int64, err error) {
tidbCfg := tidb.GetGlobalConfig()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe in future we can get the singleton PD client of this TiDB instance, and use the latest PD address, in case of PD node is changed.

executor/importer/table_import.go Outdated Show resolved Hide resolved
Copy link
Contributor

@lance6716 lance6716 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

35 / 52 files viewed

switch gTask.Step {
case proto.StepInit:
if metrics, ok := metric.GetCommonMetric(ctx); ok {
metrics.BytesCounter.WithLabelValues(metric.StateTotalRestore).Add(float64(taskMeta.Plan.TotalFileSize))
}
if err := preProcess(ctx, taskHandle, gTask, taskMeta, logger); err != nil {
jobStep := importer.JobStepImporting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and these logic seems more natural as a part of GetNextStep

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, that's import-into job's step, exposed to user and has less steps

Copy link
Contributor

@lance6716 lance6716 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

37 / 52 files viewed

// sorted index kv storage path: /{taskID}/{subtaskID}/index/{indexID}/{workerID}
indexWriterFn := func(indexID int64) *external.Writer {
builder := external.NewWriterBuilder().
SetOnCloseFunc(func(summary *external.WriterSummary) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to let the "WriterSummary" result be returned when Writer.Close, not passing it around using OnClose. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

op.sharedVars.mergeIndexSummary(indexID, summary)
})
prefix := path.Join(strconv.Itoa(int(op.taskID)), strconv.Itoa(int(op.subtaskID)))
writerID := path.Join("index", strconv.Itoa(int(indexID)), strconv.Itoa(int(workerID)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering two nodes are running same subtask (maybe the old one suffers network partition and framework dispatch a new node), we should use random writerID to avoid overwritten. Or maybe WriterBuilder should assign UUID internally 🤔

The result file path will be returned by OnClose, so caller seems don't need to care about the writerID

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering two nodes are running same subtask

makes sense, will change it to a uuid here

so caller seems don't need to care about the writerID

that would be better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

disttask/importinto/encode_and_sort_operator.go Outdated Show resolved Hide resolved
Copy link
Contributor

@lance6716 lance6716 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

41 / 52 files viewed

executor/importer/import.go Show resolved Hide resolved
@tiprow
Copy link

tiprow bot commented Sep 19, 2023

@D3Hunter: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
tiprow_fast_test c7202f7 link true /test tiprow_fast_test

Full PR test history. Your PR dashboard.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here.

Comment on lines 513 to 517
cloudStorageURI, err1 := seCtx.GetSessionVars().GlobalVarsAccessor.
GetGlobalSysVar(variable.TiDBCloudStorageURI)
if err1 != nil {
return errors.Trace(err1)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use variable.CloudStorageURI.Load() instead?


// NotNilMin returns the smallest of a and b, ignoring nil values.
func NotNilMin(a, b []byte) []byte {
if len(a) == 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if both a and b nil?it will return b(nil)?

Copy link
Contributor Author

@D3Hunter D3Hunter Sep 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's moved from add-index, didn't change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Benjamin2037 Yes, returning a nil value is expected.

}

// NotNilMin returns the smallest of a and b, ignoring nil values.
func NotNilMin(a, b []byte) []byte {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved from ddl/backfilling_dispatcher.go

@ti-chi-bot ti-chi-bot bot added needs-1-more-lgtm Indicates a PR needs 1 more LGTM. approved labels Sep 19, 2023
return err
}
} else {
if err := importer.ProcessChunkWith(ctx, &chunkCheckpoint, sharedVars.TableImporter, dataWriter, indexWriter, sharedVars.Progress, logger); err != nil {
Copy link
Collaborator

@Benjamin2037 Benjamin2037 Sep 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why global sort use ProcessChunkWith, but local use ProcessChunk

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's expected

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean the name of function is not consistency.

// If we import the index in `cleanupSubtaskEnv`, the dispatcher will not wait for the import to complete.
// Multiple index engines may suffer performance degradation due to range overlap.
// These issues will be alleviated after we integrate s3 sorter.
// engineID = -1, -2, -3, ...
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this comment used for “// engineID = -1, -2, -3, ...”?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

each subtask will have a index engine, that's how we alloc it's id

m.Merge(NewSortedKVMeta(summary))
}

// NotNilMin returns the smallest of a and b, ignoring nil values.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// NotNilMin returns the smallest of a and b, ignoring nil values.
// NotNilMin returns the smallest of a and b.

@ti-chi-bot
Copy link

ti-chi-bot bot commented Sep 19, 2023

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: lance6716, tangenta

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added lgtm and removed needs-1-more-lgtm Indicates a PR needs 1 more LGTM. labels Sep 19, 2023
@ti-chi-bot
Copy link

ti-chi-bot bot commented Sep 19, 2023

[LGTM Timeline notifier]

Timeline:

  • 2023-09-19 09:17:10.659715636 +0000 UTC m=+595396.627303685: ☑️ agreed by lance6716.
  • 2023-09-19 10:22:41.724814656 +0000 UTC m=+599327.692402706: ☑️ agreed by tangenta.

@D3Hunter
Copy link
Contributor Author

/retest

@ti-chi-bot ti-chi-bot bot merged commit 4450ae4 into pingcap:master Sep 19, 2023
17 of 23 checks passed
@D3Hunter D3Hunter deleted the import-global-sort branch September 19, 2023 11:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved lgtm ok-to-test Indicates a PR is ready to be tested. release-note-none Denotes a PR that doesn't merit a release note. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants