Skip to content

Commit be82514

Browse files
authored
executor: track the memroy usage in HashJoin probe phase (#41081) (#41223)
close #40500
1 parent 55671af commit be82514

File tree

2 files changed

+58
-2
lines changed

2 files changed

+58
-2
lines changed

executor/hash_table.go

+41-2
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ type hashRowContainer struct {
114114
memTracker *memory.Tracker
115115

116116
// chkBuf buffer the data reads from the disk if rowContainer is spilled.
117-
chkBuf *chunk.Chunk
117+
chkBuf *chunk.Chunk
118+
chkBufSizeForOneProbe int64
118119
}
119120

120121
func newHashRowContainer(sCtx sessionctx.Context, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer {
@@ -213,6 +214,15 @@ func (c *hashRowContainer) GetAllMatchedRows(probeHCtx *hashContext, probeSideRo
213214
return matched, nil
214215
}
215216

217+
// signalCheckpointForJoinMask indicates the times of row probe that a signal detection will be triggered.
218+
const signalCheckpointForJoinMask int = 1<<14 - 1
219+
220+
// rowSize is the size of Row.
221+
const rowSize = int64(unsafe.Sizeof(chunk.Row{}))
222+
223+
// rowPtrSize is the size of RowPtr.
224+
const rowPtrSize = int64(unsafe.Sizeof(chunk.RowPtr{}))
225+
216226
// GetMatchedRowsAndPtrs get matched rows and Ptrs from probeRow. It can be called
217227
// in multiple goroutines while each goroutine should keep its own
218228
// h and buf.
@@ -225,7 +235,23 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
225235
matched = matched[:0]
226236
var matchedRow chunk.Row
227237
matchedPtrs = matchedPtrs[:0]
228-
for _, ptr := range innerPtrs {
238+
239+
// Some variables used for memTracker.
240+
var (
241+
matchedDataSize = int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize
242+
lastChunkBufPointer *chunk.Chunk = nil
243+
memDelta int64 = 0
244+
needTrackMemUsage = cap(innerPtrs) > signalCheckpointForJoinMask
245+
)
246+
c.chkBuf = nil
247+
c.memTracker.Consume(-c.chkBufSizeForOneProbe)
248+
if needTrackMemUsage {
249+
c.memTracker.Consume(int64(cap(innerPtrs)) * rowPtrSize)
250+
defer c.memTracker.Consume(-int64(cap(innerPtrs))*rowPtrSize + memDelta)
251+
}
252+
c.chkBufSizeForOneProbe = 0
253+
254+
for i, ptr := range innerPtrs {
229255
matchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunk(ptr, c.chkBuf)
230256
if err != nil {
231257
return nil, nil, err
@@ -235,6 +261,19 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
235261
if err != nil {
236262
return nil, nil, err
237263
}
264+
if needTrackMemUsage && c.chkBuf != lastChunkBufPointer && lastChunkBufPointer != nil {
265+
lastChunkSize := lastChunkBufPointer.MemoryUsage()
266+
c.chkBufSizeForOneProbe += lastChunkSize
267+
memDelta += lastChunkSize
268+
}
269+
lastChunkBufPointer = c.chkBuf
270+
if needTrackMemUsage && (i&signalCheckpointForJoinMask == signalCheckpointForJoinMask) {
271+
// Trigger Consume for checking the OOM Action signal
272+
memDelta += int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize - matchedDataSize
273+
matchedDataSize = int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize
274+
c.memTracker.Consume(memDelta + 1)
275+
memDelta = 0
276+
}
238277
if !ok {
239278
atomic.AddInt64(&c.stat.probeCollision, 1)
240279
continue

executor/join_test.go

+17
Original file line numberDiff line numberDiff line change
@@ -2892,3 +2892,20 @@ func TestOuterJoin(t *testing.T) {
28922892
),
28932893
)
28942894
}
2895+
2896+
func TestCartesianJoinPanic(t *testing.T) {
2897+
store := testkit.CreateMockStore(t)
2898+
tk := testkit.NewTestKit(t, store)
2899+
tk.MustExec("use test")
2900+
tk.MustExec("create table t(a int)")
2901+
tk.MustExec("insert into t values(1)")
2902+
tk.MustExec("set tidb_mem_quota_query = 1 << 30")
2903+
tk.MustExec("set global tidb_mem_oom_action = 'CANCEL'")
2904+
tk.MustExec("set global tidb_enable_tmp_storage_on_oom = off;")
2905+
for i := 0; i < 14; i++ {
2906+
tk.MustExec("insert into t select * from t")
2907+
}
2908+
err := tk.QueryToErr("desc analyze select * from t t1, t t2, t t3, t t4, t t5, t t6;")
2909+
require.NotNil(t, err)
2910+
require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!"))
2911+
}

0 commit comments

Comments
 (0)