Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: Support parallel sort spill #50747

Merged
merged 232 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 212 commits
Commits
Show all changes
232 commits
Select commit Hold shift + click to select a range
9fc0d6e
add todo
xzhangxian1008 Oct 31, 2023
a10131c
Merge branch 'master' of https://github.com/pingcap/tidb into refine-…
xzhangxian1008 Nov 1, 2023
bad38da
Merge branch 'master' of https://github.com/pingcap/tidb into refine-…
xzhangxian1008 Nov 2, 2023
c7fbaba
Merge branch 'master' of https://github.com/pingcap/tidb into refine-…
xzhangxian1008 Nov 6, 2023
e852eb8
refactor
xzhangxian1008 Nov 7, 2023
585468f
init
xzhangxian1008 Nov 7, 2023
cea5d2c
fix ci
xzhangxian1008 Nov 7, 2023
1be2094
merge
xzhangxian1008 Nov 7, 2023
83ad190
save
xzhangxian1008 Nov 7, 2023
211de79
Merge branch 'master' of https://github.com/pingcap/tidb into refine-…
xzhangxian1008 Nov 13, 2023
5acdfe3
refine
xzhangxian1008 Nov 13, 2023
7ebfbde
reset list.go
xzhangxian1008 Nov 13, 2023
40b2d6a
save
xzhangxian1008 Nov 13, 2023
5bcd7ad
codes done, need tests
xzhangxian1008 Nov 13, 2023
95b71d9
merge
xzhangxian1008 Nov 13, 2023
5964db0
move sth to sort_util.go
xzhangxian1008 Nov 14, 2023
46d43a9
refine code
xzhangxian1008 Nov 14, 2023
be4d08e
refine code
xzhangxian1008 Nov 14, 2023
13d4dec
refine codes
xzhangxian1008 Nov 14, 2023
5affe1d
tweaking
xzhangxian1008 Nov 14, 2023
597f5e2
Merge branch 'master' of https://github.com/pingcap/tidb into refine-…
xzhangxian1008 Nov 14, 2023
1ba2e19
add todo
xzhangxian1008 Nov 14, 2023
085bb87
fix
xzhangxian1008 Nov 14, 2023
416636a
fix crash
xzhangxian1008 Nov 14, 2023
7b816dc
tweaking
xzhangxian1008 Nov 14, 2023
1a51794
tweaking
xzhangxian1008 Nov 14, 2023
b60091b
fix bugs
xzhangxian1008 Nov 15, 2023
915b58c
merge
xzhangxian1008 Nov 17, 2023
3dd65db
add tests
xzhangxian1008 Nov 20, 2023
9481e5b
Merge branch 'master' of https://github.com/pingcap/tidb into refine-…
xzhangxian1008 Nov 20, 2023
5930a4b
fix spill limit
xzhangxian1008 Nov 20, 2023
cace8d9
revoke spill trigger
xzhangxian1008 Nov 20, 2023
25f4e0a
validate correctness for sort test
xzhangxian1008 Nov 20, 2023
1414719
remove useless comment
xzhangxian1008 Nov 20, 2023
171c50e
tweaking
xzhangxian1008 Nov 20, 2023
b12d2f7
remove useless tests
xzhangxian1008 Nov 20, 2023
dac80a0
Merge branch 'master' of https://github.com/pingcap/tidb into refine-…
xzhangxian1008 Nov 21, 2023
be7d37a
remove useless codes
xzhangxian1008 Dec 4, 2023
5fba709
merge
xzhangxian1008 Dec 4, 2023
65d4fdf
bug needs fix
xzhangxian1008 Dec 4, 2023
edc5985
add some tests
xzhangxian1008 Dec 5, 2023
312e1c0
pass error from spill
xzhangxian1008 Dec 5, 2023
a754672
tweaking
xzhangxian1008 Dec 5, 2023
aa12e76
refine
xzhangxian1008 Dec 5, 2023
a871782
fix ci
xzhangxian1008 Dec 6, 2023
bb71717
create file
xzhangxian1008 Dec 7, 2023
285b5fb
address comments
xzhangxian1008 Dec 7, 2023
936ee2a
update bazel
xzhangxian1008 Dec 7, 2023
50269fe
refactor
xzhangxian1008 Dec 7, 2023
656eec1
add todo
xzhangxian1008 Dec 7, 2023
be6b18d
save
xzhangxian1008 Dec 7, 2023
540db7d
tweaking
xzhangxian1008 Dec 8, 2023
403c54c
save
xzhangxian1008 Dec 8, 2023
ae18b03
save
xzhangxian1008 Dec 8, 2023
af1e6c3
add some tests
xzhangxian1008 Dec 11, 2023
4fedd88
save
xzhangxian1008 Dec 11, 2023
20ac1b3
update
xzhangxian1008 Dec 12, 2023
08eb30d
fix bugs
xzhangxian1008 Dec 13, 2023
4c7738f
merge
xzhangxian1008 Dec 13, 2023
7e5b4a2
tweaking
xzhangxian1008 Dec 13, 2023
e216c7f
add todo
xzhangxian1008 Dec 13, 2023
17866cf
delete originOnExceed
xzhangxian1008 Dec 13, 2023
1d3278c
tweaking
xzhangxian1008 Dec 14, 2023
10ae250
unconsume all
xzhangxian1008 Dec 14, 2023
58bd4f6
tweaking
xzhangxian1008 Dec 14, 2023
6246f55
uncomment
xzhangxian1008 Dec 14, 2023
103ba0d
merge
xzhangxian1008 Dec 14, 2023
63a13c9
tweaking
xzhangxian1008 Dec 14, 2023
8291566
fix mpmcqueue and add tests
xzhangxian1008 Dec 15, 2023
1e351e8
fix bugs
xzhangxian1008 Dec 15, 2023
33626e3
address comments
xzhangxian1008 Dec 15, 2023
90720f4
Merge branch 'master' of https://github.com/pingcap/tidb into refine-…
xzhangxian1008 Dec 29, 2023
d901a47
address comment
xzhangxian1008 Dec 29, 2023
349430d
fix ci
xzhangxian1008 Dec 29, 2023
25cb181
put some logic into sort partition
xzhangxian1008 Dec 29, 2023
6d0b1d0
add todo
xzhangxian1008 Dec 29, 2023
729d7af
address comment
xzhangxian1008 Jan 3, 2024
a8c81b4
tweaking
xzhangxian1008 Jan 3, 2024
389f545
remove savedChunks
xzhangxian1008 Jan 3, 2024
85d03d8
remove useless codes
xzhangxian1008 Jan 3, 2024
3681dec
fix bug
xzhangxian1008 Jan 3, 2024
77eae04
remove useless field
xzhangxian1008 Jan 3, 2024
7ccad75
remove spillHelper
xzhangxian1008 Jan 3, 2024
c898a59
add tests
xzhangxian1008 Jan 4, 2024
42e6c4c
tweaking
xzhangxian1008 Jan 4, 2024
e1a590f
address comment
xzhangxian1008 Jan 4, 2024
5b740a2
tweaking
xzhangxian1008 Jan 4, 2024
5991094
fix analyze
xzhangxian1008 Jan 4, 2024
7a97bd2
tweaking
xzhangxian1008 Jan 4, 2024
cfdc9d3
fix ut
xzhangxian1008 Jan 4, 2024
caed9ca
fix ci
xzhangxian1008 Jan 4, 2024
380020b
tweaking
xzhangxian1008 Jan 4, 2024
5293652
tweaking
xzhangxian1008 Jan 4, 2024
6c4d00b
change to chunk iter
xzhangxian1008 Jan 4, 2024
2458060
add comment
xzhangxian1008 Jan 4, 2024
2498c89
aa
xzhangxian1008 Jan 4, 2024
14d9445
address comment
xzhangxian1008 Jan 4, 2024
9cf5935
a
xzhangxian1008 Jan 5, 2024
baab197
tweaking
xzhangxian1008 Jan 5, 2024
13e6079
fix ut
xzhangxian1008 Jan 5, 2024
915c4d0
remove useless function
xzhangxian1008 Jan 5, 2024
bccf964
update bazel
xzhangxian1008 Jan 5, 2024
9b5b872
fix ut
xzhangxian1008 Jan 5, 2024
58ea1c9
a
xzhangxian1008 Jan 5, 2024
fa8f62b
merge master
xzhangxian1008 Jan 5, 2024
dad50fb
merge sort refine
xzhangxian1008 Jan 5, 2024
3ee06b5
refine ut
xzhangxian1008 Jan 5, 2024
0e44add
replace with faster sort function
xzhangxian1008 Jan 8, 2024
3c0830c
refine
xzhangxian1008 Jan 8, 2024
d768973
merge
xzhangxian1008 Jan 9, 2024
8eb2791
switch code position
xzhangxian1008 Jan 9, 2024
1f2dbff
tweaking
xzhangxian1008 Jan 9, 2024
e48899d
update bazel
xzhangxian1008 Jan 9, 2024
f076e5b
save
xzhangxian1008 Jan 9, 2024
e62771d
move sortedRowsList into SortExec
xzhangxian1008 Jan 9, 2024
fcb3d50
Merge branch 'parallel-sort' into parallel-sort-spill
xzhangxian1008 Jan 9, 2024
336a9ba
save
xzhangxian1008 Jan 10, 2024
f6e85a4
remove local queue
xzhangxian1008 Jan 10, 2024
28b4abc
merge
xzhangxian1008 Jan 10, 2024
e61e7fd
refine
xzhangxian1008 Jan 10, 2024
a76d35a
refine
xzhangxian1008 Jan 10, 2024
32171fa
merge
xzhangxian1008 Jan 10, 2024
b55c0f2
replace mpmcqueue with channel
xzhangxian1008 Jan 10, 2024
53afd3f
update bazel
xzhangxian1008 Jan 10, 2024
14df456
merge
xzhangxian1008 Jan 10, 2024
35e5984
fix ci
xzhangxian1008 Jan 11, 2024
22e17e7
add failpoint
xzhangxian1008 Jan 11, 2024
c729a68
add comment and complete todo
xzhangxian1008 Jan 11, 2024
c643fef
fix ci
xzhangxian1008 Jan 11, 2024
21977f8
fix comment
xzhangxian1008 Jan 11, 2024
788c450
merge
xzhangxian1008 Jan 12, 2024
186ed7d
save
xzhangxian1008 Jan 12, 2024
1ce71f4
address comment
xzhangxian1008 Jan 16, 2024
34083c8
tweaking
xzhangxian1008 Jan 16, 2024
cda09d3
tweaking
xzhangxian1008 Jan 16, 2024
c5f6a4f
tweaking
xzhangxian1008 Jan 16, 2024
f8ce53b
merge
xzhangxian1008 Jan 16, 2024
2125940
refine
xzhangxian1008 Jan 16, 2024
d9e6491
merge branch parallel_sort
xzhangxian1008 Jan 16, 2024
14a801b
update
xzhangxian1008 Jan 16, 2024
97147d5
add finishCh
xzhangxian1008 Jan 16, 2024
fc97856
address comment
xzhangxian1008 Jan 17, 2024
8527e90
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Jan 17, 2024
eff836e
merge
xzhangxian1008 Jan 17, 2024
96a62ca
fix bug
xzhangxian1008 Jan 17, 2024
195133f
codes done, need tests
xzhangxian1008 Jan 17, 2024
e3e53fd
remove useless codes and udpate bazel
xzhangxian1008 Jan 17, 2024
ac05f1e
address some comments
xzhangxian1008 Jan 17, 2024
357718f
tweaking
xzhangxian1008 Jan 17, 2024
6058b30
address comment
xzhangxian1008 Jan 18, 2024
7580262
merge parallel-sort branch
xzhangxian1008 Jan 18, 2024
8acd6c0
ready to test
xzhangxian1008 Jan 18, 2024
34eff5e
tweaking
xzhangxian1008 Jan 18, 2024
c67b9c6
save
xzhangxian1008 Jan 18, 2024
fc4b727
refine close
xzhangxian1008 Jan 18, 2024
d894a3a
tweaking
xzhangxian1008 Jan 18, 2024
102d969
merge parallel-sort branch
xzhangxian1008 Jan 18, 2024
ad98c71
fix bug
xzhangxian1008 Jan 19, 2024
afd9a8c
refactor parallel sort
xzhangxian1008 Jan 21, 2024
f6fb1b0
remove useless codes
xzhangxian1008 Jan 21, 2024
5d10251
tweaking
xzhangxian1008 Jan 22, 2024
ef696ff
remove useless comment
xzhangxian1008 Jan 22, 2024
229a4b8
remove todo
xzhangxian1008 Jan 22, 2024
8c258ab
move initKWayMerge
xzhangxian1008 Jan 22, 2024
a369c56
tweaking
xzhangxian1008 Jan 23, 2024
c07f634
fix ci
xzhangxian1008 Jan 23, 2024
65c9a6f
merge
xzhangxian1008 Jan 23, 2024
3b0755a
fix ci
xzhangxian1008 Jan 24, 2024
757f88d
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Jan 24, 2024
4a0c6c8
merge
xzhangxian1008 Jan 24, 2024
307ec06
save
xzhangxian1008 Jan 24, 2024
5b8d58b
save
xzhangxian1008 Jan 24, 2024
44d13fe
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Jan 24, 2024
2408efe
tweaking
xzhangxian1008 Jan 24, 2024
3a1d18b
refine
xzhangxian1008 Jan 24, 2024
46e9e92
refactor and not done
xzhangxian1008 Jan 26, 2024
81e48e1
remove useless codes
xzhangxian1008 Jan 26, 2024
c5538e1
update bazel
xzhangxian1008 Jan 29, 2024
143c7cb
update introduction
xzhangxian1008 Jan 29, 2024
4c036e7
merge master
xzhangxian1008 Jan 29, 2024
bce3684
merge
xzhangxian1008 Jan 31, 2024
7ca7908
fix
xzhangxian1008 Jan 31, 2024
d76d5e2
tweaking
xzhangxian1008 Jan 31, 2024
9ef960a
codes done
xzhangxian1008 Jan 31, 2024
59f39ba
tweaking
xzhangxian1008 Jan 31, 2024
fb85d8b
fixc
xzhangxian1008 Jan 31, 2024
229a53a
fix bugs
xzhangxian1008 Feb 1, 2024
28831d3
tweaking
xzhangxian1008 Feb 1, 2024
741f2e7
set finished
xzhangxian1008 Feb 2, 2024
756cf86
fix bugs and add tests
xzhangxian1008 Feb 2, 2024
9160f7e
tweaking
xzhangxian1008 Feb 4, 2024
59b5e46
tweaking
xzhangxian1008 Feb 4, 2024
b1d9faf
address comment
xzhangxian1008 Feb 4, 2024
8d849c3
merge
xzhangxian1008 Feb 4, 2024
5c9acfc
merge
xzhangxian1008 Feb 4, 2024
45f1add
tweaking
xzhangxian1008 Feb 4, 2024
2ffbff3
ready to add random failpoint tests
xzhangxian1008 Feb 6, 2024
2923d6f
pull remote master
xzhangxian1008 Feb 6, 2024
ed3f9b2
fix bugs and refine tests
xzhangxian1008 Feb 6, 2024
df58e9a
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Feb 6, 2024
83a4e61
refine test
xzhangxian1008 Feb 6, 2024
cb7121e
merge
xzhangxian1008 Feb 6, 2024
840caa3
add random failpoint tests
xzhangxian1008 Feb 6, 2024
325a141
merge
xzhangxian1008 Feb 27, 2024
c72cd32
uncomment tests
xzhangxian1008 Feb 27, 2024
817fd04
tweaking
xzhangxian1008 Feb 27, 2024
9747e8b
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Feb 28, 2024
4519bcf
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Feb 28, 2024
475ebb9
add fallback
xzhangxian1008 Feb 28, 2024
fc6b90b
fix diskTracker's incorrect type
xzhangxian1008 Mar 7, 2024
1006752
tweaking
xzhangxian1008 Mar 7, 2024
09605dc
tweaking
xzhangxian1008 Mar 8, 2024
3393c2d
tweaking
xzhangxian1008 Mar 11, 2024
cc7b2d7
tweaking
xzhangxian1008 Mar 11, 2024
894cb60
refine
xzhangxian1008 Mar 11, 2024
5f25004
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Mar 11, 2024
a66000d
rename
xzhangxian1008 Mar 11, 2024
fef756a
address comments
xzhangxian1008 Mar 12, 2024
e8460aa
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Mar 18, 2024
41c50c7
tweaking
xzhangxian1008 Mar 18, 2024
a59916d
tweaking
xzhangxian1008 Mar 18, 2024
391c4a8
tweaking
xzhangxian1008 Mar 18, 2024
a59454c
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Mar 18, 2024
5b44802
save
xzhangxian1008 Mar 21, 2024
892f158
add implement
xzhangxian1008 Mar 22, 2024
581c1e2
add implementation
xzhangxian1008 Mar 22, 2024
d159e28
Merge branch 'master' of https://github.com/pingcap/tidb into paralle…
xzhangxian1008 Mar 26, 2024
72e38d6
address comments
xzhangxian1008 Mar 26, 2024
3ea5a29
udpate bazel
xzhangxian1008 Mar 27, 2024
50d91f6
fix ci
xzhangxian1008 Mar 27, 2024
6937a99
update bazel
xzhangxian1008 Mar 27, 2024
df433c1
disable some failpoints
xzhangxian1008 Mar 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/executor/sortexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "sortexec",
srcs = [
"parallel_sort_spill_helper.go",
"parallel_sort_worker.go",
"sort.go",
"sort_partition.go",
Expand Down Expand Up @@ -36,7 +37,7 @@ go_test(
timeout = "short",
srcs = ["sort_test.go"],
flaky = True,
shard_count = 8,
shard_count = 10,
deps = [
"//pkg/config",
"//pkg/sessionctx/variable",
Expand All @@ -52,6 +53,7 @@ go_test(
timeout = "short",
srcs = [
"benchmark_test.go",
"parallel_sort_spill_test.go",
"parallel_sort_test.go",
"sort_spill_test.go",
"sort_test.go",
Expand Down
225 changes: 225 additions & 0 deletions pkg/executor/sortexec/parallel_sort_spill_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sortexec

import (
"sync"
"sync/atomic"

"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

type parallelSortSpillHelper struct {
cond *sync.Cond
spillStatus int
sortedRowsInDisk []*chunk.DataInDiskByChunks
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
sortExec *SortExec

lessRowFunc func(chunk.Row, chunk.Row) int
errOutputChan chan rowWithError
finishCh chan struct{}

fieldTypes []*types.FieldType
tmpSpillChunk *chunk.Chunk

bytesConsumed atomic.Int64
bytesLimit atomic.Int64
}

func newParallelSortSpillHelper(sortExec *SortExec, fieldTypes []*types.FieldType, finishCh chan struct{}, lessRowFunc func(chunk.Row, chunk.Row) int, errOutputChan chan rowWithError) *parallelSortSpillHelper {
return &parallelSortSpillHelper{
cond: sync.NewCond(new(sync.Mutex)),
spillStatus: notSpilled,
sortExec: sortExec,
lessRowFunc: lessRowFunc,
errOutputChan: errOutputChan,
finishCh: finishCh,
fieldTypes: fieldTypes,
tmpSpillChunk: chunk.NewChunkWithCapacity(fieldTypes, spillChunkSize),
}
}

func (p *parallelSortSpillHelper) isNotSpilledNoLock() bool {
return p.spillStatus == notSpilled
}

func (p *parallelSortSpillHelper) isInSpillingNoLock() bool {
return p.spillStatus == inSpilling
}

func (p *parallelSortSpillHelper) isSpillNeeded() bool {
p.cond.L.Lock()
defer p.cond.L.Unlock()
return p.spillStatus == needSpill
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
}

func (p *parallelSortSpillHelper) isSpillTriggered() bool {
p.cond.L.Lock()
defer p.cond.L.Unlock()
return len(p.sortedRowsInDisk) > 0
}

func (p *parallelSortSpillHelper) setInSpilling() {
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.spillStatus = inSpilling
}

func (p *parallelSortSpillHelper) setNeedSpillNoLock() {
p.spillStatus = needSpill
}

func (p *parallelSortSpillHelper) setNotSpilled() {
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.spillStatus = notSpilled
}

func (p *parallelSortSpillHelper) spill() (err error) {
defer func() {
if r := recover(); r != nil {
err = util.GetRecoverError(r)
}
}()

select {
case <-p.finishCh:
return nil
default:
}

workerNum := len(p.sortExec.Parallel.workers)
workerWaiter := &sync.WaitGroup{}
workerWaiter.Add(workerNum)
sortedRowsIters := make([]*chunk.Iterator4Slice, workerNum)
for i := 0; i < workerNum; i++ {
go func(idx int) {
defer func() {
if r := recover(); r != nil {
processPanicAndLog(p.errOutputChan, r)
}
workerWaiter.Done()
}()

sortedRowsIters[idx] = chunk.NewIterator4Slice(nil)
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
sortedRowsIters[idx].Reset(p.sortExec.Parallel.workers[idx].sortLocalRows())
injectParallelSortRandomFail(200)
}(i)
}
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved

workerWaiter.Wait()
p.setInSpilling()

// Spill is done, broadcast to wake up all sleep goroutines
defer p.cond.Broadcast()
defer p.setNotSpilled()

totalRows := 0
for i := range sortedRowsIters {
totalRows += sortedRowsIters[i].Len()
}

merger := newMultiWayMerger(sortedRowsIters, p.lessRowFunc)
merger.init()
return p.spillImpl(merger)
}

func (p *parallelSortSpillHelper) releaseMemory() {
totalReleasedMemory := int64(0)
for _, worker := range p.sortExec.Parallel.workers {
totalReleasedMemory += worker.totalMemoryUsage
worker.totalMemoryUsage = 0
}
p.sortExec.memTracker.Consume(-totalReleasedMemory)
}

func (p *parallelSortSpillHelper) spillTmpSpillChunk(inDisk *chunk.DataInDiskByChunks) error {
err := inDisk.Add(p.tmpSpillChunk)
if err != nil {
return err
}
p.tmpSpillChunk.Reset()
return nil
}

func (p *parallelSortSpillHelper) spillImpl(merger *multiWayMerger) error {
logutil.BgLogger().Info(spillInfo, zap.Int64("consumed", p.bytesConsumed.Load()), zap.Int64("quota", p.bytesLimit.Load()))
p.tmpSpillChunk.Reset()
inDisk := chunk.NewDataInDiskByChunks(p.fieldTypes)
inDisk.GetDiskTracker().AttachTo(p.sortExec.diskTracker)

spilledRowChannel := make(chan chunk.Row, 10000)
go func() {
defer func() {
if r := recover(); r != nil {
processPanicAndLog(p.errOutputChan, r)
}
close(spilledRowChannel)
}()

injectParallelSortRandomFail(200)

for {
row := merger.next()
if row.IsEmpty() {
break
}
spilledRowChannel <- row
}
}()
wshwsh12 marked this conversation as resolved.
Show resolved Hide resolved

var (
row chunk.Row
ok bool
)
for {
select {
case <-p.finishCh:
// We must wait the finish of the above goroutine,
// or p.errOutputChan may be closed in advandce.
<-spilledRowChannel
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
return nil
case row, ok = <-spilledRowChannel:
if !ok {
if p.tmpSpillChunk.NumRows() > 0 {
err := p.spillTmpSpillChunk(inDisk)
if err != nil {
return err
}
}

injectParallelSortRandomFail(200)

if inDisk.NumRows() > 0 {
p.sortedRowsInDisk = append(p.sortedRowsInDisk, inDisk)
p.releaseMemory()
}
return nil
}
}

p.tmpSpillChunk.AppendRow(row)
if p.tmpSpillChunk.IsFull() {
err := p.spillTmpSpillChunk(inDisk)
if err != nil {
return err
}
}
}
}
141 changes: 141 additions & 0 deletions pkg/executor/sortexec/parallel_sort_spill_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sortexec_test

import (
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/executor/internal/testutil"
"github.com/pingcap/tidb/pkg/executor/sortexec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/stretchr/testify/require"
)

var hardLimit1 = int64(100000)
var hardLimit2 = hardLimit1 * 10

func oneSpillCase(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) {
if exe == nil {
exe = buildSortExec(ctx, sortCase, dataSource)
}
dataSource.PrepareChunks()
resultChunks := executeSortExecutor(t, exe)

require.True(t, exe.IsSpillTriggeredInParallelSortForTest())
require.Equal(t, int64(sortCase.Rows), exe.GetSpilledRowNumInParallelSortForTest())

err := exe.Close()
require.NoError(t, err)

require.True(t, checkCorrectness(schema, exe, dataSource, resultChunks))
}

func inMemoryThenSpill(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) {
if exe == nil {
exe = buildSortExec(ctx, sortCase, dataSource)
}
dataSource.PrepareChunks()
resultChunks := executeSortExecutorAndManullyTriggerSpill(t, exe, hardLimit2, ctx.GetSessionVars().StmtCtx.MemTracker)

require.True(t, exe.IsSpillTriggeredInParallelSortForTest())
require.Greater(t, int64(sortCase.Rows), exe.GetSpilledRowNumInParallelSortForTest())
err := exe.Close()
require.NoError(t, err)

require.True(t, checkCorrectness(schema, exe, dataSource, resultChunks))
}

func failpointNoMemoryDataTest(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) {
if exe == nil {
exe = buildSortExec(ctx, sortCase, dataSource)
}
dataSource.PrepareChunks()
executeInFailpoint(t, exe, 0, nil)
}

func failpointDataInMemoryThenSpillTest(t *testing.T, ctx *mock.Context, exe *sortexec.SortExec, sortCase *testutil.SortCase, schema *expression.Schema, dataSource *testutil.MockDataSource) {
if exe == nil {
exe = buildSortExec(ctx, sortCase, dataSource)
}
dataSource.PrepareChunks()
executeInFailpoint(t, exe, hardLimit2, ctx.GetSessionVars().MemTracker)
}

func TestParallelSortSpillDisk(t *testing.T) {
sortexec.SetSmallSpillChunkSizeForTest()
ctx := mock.NewContext()
sortCase := &testutil.SortCase{Rows: 10000, OrderByIdx: []int{0, 1}, Ndvs: []int{0, 0}, Ctx: ctx}

failpoint.Enable("github.com/pingcap/tidb/pkg/executor/sortexec/SlowSomeWorkers", `return(true)`)
failpoint.Enable("github.com/pingcap/tidb/pkg/executor/sortexec/SignalCheckpointForSort", `return(true)`)

ctx.GetSessionVars().InitChunkSize = 32
ctx.GetSessionVars().MaxChunkSize = 32
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
ctx.GetSessionVars().EnableParallelSort = true

schema := expression.NewSchema(sortCase.Columns()...)
dataSource := buildDataSource(ctx, sortCase, schema)
exe := buildSortExec(ctx, sortCase, dataSource)
for i := 0; i < 10; i++ {
oneSpillCase(t, ctx, nil, sortCase, schema, dataSource)
oneSpillCase(t, ctx, exe, sortCase, schema, dataSource)
}

ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit2)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
for i := 0; i < 10; i++ {
inMemoryThenSpill(t, ctx, nil, sortCase, schema, dataSource)
inMemoryThenSpill(t, ctx, exe, sortCase, schema, dataSource)
}
}

func TestParallelSortSpillDiskFailpoint(t *testing.T) {
sortexec.SetSmallSpillChunkSizeForTest()
ctx := mock.NewContext()
sortCase := &testutil.SortCase{Rows: 10000, OrderByIdx: []int{0, 1}, Ndvs: []int{0, 0}, Ctx: ctx}

failpoint.Enable("github.com/pingcap/tidb/pkg/executor/sortexec/SlowSomeWorkers", `return(true)`)
failpoint.Enable("github.com/pingcap/tidb/pkg/executor/sortexec/SignalCheckpointForSort", `return(true)`)
failpoint.Enable("github.com/pingcap/tidb/pkg/executor/sortexec/ParallelSortRandomFail", `return(true)`)
failpoint.Enable("github.com/pingcap/tidb/pkg/util/chunk/ChunkInDiskError", `return(false)`)

ctx.GetSessionVars().InitChunkSize = 32
ctx.GetSessionVars().MaxChunkSize = 32
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
ctx.GetSessionVars().EnableParallelSort = true

schema := expression.NewSchema(sortCase.Columns()...)
dataSource := buildDataSource(ctx, sortCase, schema)
exe := buildSortExec(ctx, sortCase, dataSource)
for i := 0; i < 20; i++ {
failpointNoMemoryDataTest(t, ctx, nil, sortCase, schema, dataSource)
failpointNoMemoryDataTest(t, ctx, exe, sortCase, schema, dataSource)
}

ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, hardLimit2)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
for i := 0; i < 20; i++ {
failpointDataInMemoryThenSpillTest(t, ctx, nil, sortCase, schema, dataSource)
failpointDataInMemoryThenSpillTest(t, ctx, exe, sortCase, schema, dataSource)
}
}
Loading