@@ -114,7 +114,8 @@ type hashRowContainer struct {
114
114
memTracker * memory.Tracker
115
115
116
116
// chkBuf buffer the data reads from the disk if rowContainer is spilled.
117
- chkBuf * chunk.Chunk
117
+ chkBuf * chunk.Chunk
118
+ chkBufSizeForOneProbe int64
118
119
}
119
120
120
121
func newHashRowContainer (sCtx sessionctx.Context , hCtx * hashContext , allTypes []* types.FieldType ) * hashRowContainer {
@@ -213,6 +214,15 @@ func (c *hashRowContainer) GetAllMatchedRows(probeHCtx *hashContext, probeSideRo
213
214
return matched , nil
214
215
}
215
216
217
+ // signalCheckpointForJoin indicates the times of row probe that a signal detection will be triggered.
218
+ const signalCheckpointForJoin int = 1 << 14
219
+
220
+ // rowSize is the size of Row.
221
+ const rowSize = int64 (unsafe .Sizeof (chunk.Row {}))
222
+
223
+ // rowPtrSize is the size of RowPtr.
224
+ const rowPtrSize = int64 (unsafe .Sizeof (chunk.RowPtr {}))
225
+
216
226
// GetMatchedRowsAndPtrs get matched rows and Ptrs from probeRow. It can be called
217
227
// in multiple goroutines while each goroutine should keep its own
218
228
// h and buf.
@@ -225,7 +235,19 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
225
235
matched = matched [:0 ]
226
236
var matchedRow chunk.Row
227
237
matchedPtrs = matchedPtrs [:0 ]
228
- for _ , ptr := range innerPtrs {
238
+
239
+ // Some variables used for memTracker.
240
+ var (
241
+ matchedDataSize = int64 (cap (matched ))* rowSize + int64 (cap (matchedPtrs ))* rowPtrSize
242
+ lastChunkBufPointer * chunk.Chunk = nil
243
+ memDelta int64 = 0
244
+ )
245
+ c .chkBuf = nil
246
+ c .memTracker .Consume (- c .chkBufSizeForOneProbe + int64 (cap (innerPtrs ))* rowPtrSize )
247
+ defer c .memTracker .Consume (- int64 (cap (innerPtrs ))* rowPtrSize + memDelta )
248
+ c .chkBufSizeForOneProbe = 0
249
+
250
+ for i , ptr := range innerPtrs {
229
251
matchedRow , c .chkBuf , err = c .rowContainer .GetRowAndAppendToChunk (ptr , c .chkBuf )
230
252
if err != nil {
231
253
return nil , nil , err
@@ -235,6 +257,19 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
235
257
if err != nil {
236
258
return nil , nil , err
237
259
}
260
+ if c .chkBuf != lastChunkBufPointer && lastChunkBufPointer != nil {
261
+ lastChunkSize := lastChunkBufPointer .MemoryUsage ()
262
+ c .chkBufSizeForOneProbe += lastChunkSize
263
+ memDelta += lastChunkSize
264
+ }
265
+ lastChunkBufPointer = c .chkBuf
266
+ if i & signalCheckpointForJoin == 0 {
267
+ // Trigger Consume for checking the OOM Action signal
268
+ memDelta += int64 (cap (matched ))* rowSize + int64 (cap (matchedPtrs ))* rowPtrSize - matchedDataSize
269
+ matchedDataSize = int64 (cap (matched ))* rowSize + int64 (cap (matchedPtrs ))* rowPtrSize
270
+ c .memTracker .Consume (memDelta + 1 )
271
+ memDelta = 0
272
+ }
238
273
if ! ok {
239
274
atomic .AddInt64 (& c .stat .probeCollision , 1 )
240
275
continue
0 commit comments