@@ -83,6 +83,14 @@ type hashRowContainer struct {
83
83
hashTable baseHashTable
84
84
85
85
rowContainer * chunk.RowContainer
86
+ << << << < HEAD
87
+ == == == =
88
+ memTracker * memory.Tracker
89
+
90
+ // chkBuf buffer the data reads from the disk if rowContainer is spilled.
91
+ chkBuf * chunk.Chunk
92
+ chkBufSizeForOneProbe int64
93
+ >> >> >> > 5 cb84186dc (executor : track the memroy usage in HashJoin probe phase (#41081 ))
86
94
}
87
95
88
96
func newHashRowContainer (sCtx sessionctx.Context , estCount int , hCtx * hashContext , allTypes []* types.FieldType ) * hashRowContainer {
@@ -104,6 +112,88 @@ func (c *hashRowContainer) ShallowCopy() *hashRowContainer {
104
112
return & newHRC
105
113
}
106
114
115
+ << << << < HEAD
116
+ == == == =
117
+ // GetMatchedRows get matched rows from probeRow. It can be called
118
+ // in multiple goroutines while each goroutine should keep its own
119
+ // h and buf.
120
+ func (c * hashRowContainer ) GetMatchedRows (probeKey uint64 , probeRow chunk.Row , hCtx * hashContext , matched []chunk.Row ) ([]chunk.Row , error ) {
121
+ matchedRows , _ , err := c .GetMatchedRowsAndPtrs (probeKey , probeRow , hCtx , matched , nil , false )
122
+ return matchedRows , err
123
+ }
124
+
125
+ func (c * hashRowContainer ) GetAllMatchedRows (probeHCtx * hashContext , probeSideRow chunk.Row ,
126
+ probeKeyNullBits * bitmap.ConcurrentBitmap , matched []chunk.Row , needCheckBuildRowPos , needCheckProbeRowPos []int ) ([]chunk.Row , error ) {
127
+ // for NAAJ probe row with null, we should match them with all build rows.
128
+ var (
129
+ ok bool
130
+ err error
131
+ innerPtrs []chunk.RowPtr
132
+ )
133
+ c .hashTable .Iter (
134
+ func (_ uint64 , e * entry ) {
135
+ entryAddr := e
136
+ for entryAddr != nil {
137
+ innerPtrs = append (innerPtrs , entryAddr .ptr )
138
+ entryAddr = entryAddr .next
139
+ }
140
+ })
141
+ matched = matched [:0 ]
142
+ if len (innerPtrs ) == 0 {
143
+ return matched , nil
144
+ }
145
+ // all built bucket rows come from hash table, their bitmap are all nil (doesn't contain any null). so
146
+ // we could only use the probe null bits to filter valid rows.
147
+ if probeKeyNullBits != nil && len (probeHCtx .naKeyColIdx ) > 1 {
148
+ // if len(probeHCtx.naKeyColIdx)=1
149
+ // that means the NA-Join probe key is directly a (null) <-> (fetch all buckets), nothing to do.
150
+ // else like
151
+ // (null, 1, 2), we should use the not-null probe bit to filter rows. Only fetch rows like
152
+ // ( ? , 1, 2), that exactly with value as 1 and 2 in the second and third join key column.
153
+ needCheckProbeRowPos = needCheckProbeRowPos [:0 ]
154
+ needCheckBuildRowPos = needCheckBuildRowPos [:0 ]
155
+ keyColLen := len (c .hCtx .naKeyColIdx )
156
+ for i := 0 ; i < keyColLen ; i ++ {
157
+ // since all bucket is from hash table (Not Null), so the buildSideNullBits check is eliminated.
158
+ if probeKeyNullBits .UnsafeIsSet (i ) {
159
+ continue
160
+ }
161
+ needCheckBuildRowPos = append (needCheckBuildRowPos , c .hCtx .naKeyColIdx [i ])
162
+ needCheckProbeRowPos = append (needCheckProbeRowPos , probeHCtx .naKeyColIdx [i ])
163
+ }
164
+ }
165
+ var mayMatchedRow chunk.Row
166
+ for _ , ptr := range innerPtrs {
167
+ mayMatchedRow , c .chkBuf , err = c .rowContainer .GetRowAndAppendToChunk (ptr , c .chkBuf )
168
+ if err != nil {
169
+ return nil , err
170
+ }
171
+ if probeKeyNullBits != nil && len (probeHCtx .naKeyColIdx ) > 1 {
172
+ // check the idxs-th value of the join columns.
173
+ ok , err = codec .EqualChunkRow (c .sc , mayMatchedRow , c .hCtx .allTypes , needCheckBuildRowPos , probeSideRow , probeHCtx .allTypes , needCheckProbeRowPos )
174
+ if err != nil {
175
+ return nil , err
176
+ }
177
+ if ! ok {
178
+ continue
179
+ }
180
+ // once ok. just append the (maybe) valid build row for latter other conditions check if any.
181
+ }
182
+ matched = append (matched , mayMatchedRow )
183
+ }
184
+ return matched , nil
185
+ }
186
+
187
+ // signalCheckpointForJoin indicates the times of row probe that a signal detection will be triggered.
188
+ const signalCheckpointForJoin int = 1 << 14
189
+
190
+ // rowSize is the size of Row.
191
+ const rowSize = int64 (unsafe .Sizeof (chunk.Row {}))
192
+
193
+ // rowPtrSize is the size of RowPtr.
194
+ const rowPtrSize = int64 (unsafe .Sizeof (chunk.RowPtr {}))
195
+
196
+ >> >> >> > 5 cb84186dc (executor : track the memroy usage in HashJoin probe phase (#41081 ))
107
197
// GetMatchedRowsAndPtrs get matched rows and Ptrs from probeRow. It can be called
108
198
// in multiple goroutines while each goroutine should keep its own
109
199
// h and buf.
@@ -114,9 +204,27 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
114
204
}
115
205
matched = make ([]chunk.Row , 0 , len (innerPtrs ))
116
206
var matchedRow chunk.Row
207
+ << << << < HEAD
117
208
matchedPtrs = make ([]chunk.RowPtr , 0 , len (innerPtrs ))
118
209
for _ , ptr := range innerPtrs {
119
210
matchedRow , err = c .rowContainer .GetRow (ptr )
211
+ == == == =
212
+ matchedPtrs = matchedPtrs [:0 ]
213
+
214
+ // Some variables used for memTracker.
215
+ var (
216
+ matchedDataSize = int64 (cap (matched ))* rowSize + int64 (cap (matchedPtrs ))* rowPtrSize
217
+ lastChunkBufPointer * chunk.Chunk = nil
218
+ memDelta int64 = 0
219
+ )
220
+ c .chkBuf = nil
221
+ c .memTracker .Consume (- c .chkBufSizeForOneProbe + int64 (cap (innerPtrs ))* rowPtrSize )
222
+ defer c .memTracker .Consume (- int64 (cap (innerPtrs ))* rowPtrSize + memDelta )
223
+ c .chkBufSizeForOneProbe = 0
224
+
225
+ for i , ptr := range innerPtrs {
226
+ matchedRow , c .chkBuf , err = c .rowContainer .GetRowAndAppendToChunk (ptr , c .chkBuf )
227
+ >> >> >> > 5 cb84186dc (executor : track the memroy usage in HashJoin probe phase (#41081 ))
120
228
if err != nil {
121
229
return
122
230
}
@@ -125,6 +233,19 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
125
233
if err != nil {
126
234
return
127
235
}
236
+ if c .chkBuf != lastChunkBufPointer && lastChunkBufPointer != nil {
237
+ lastChunkSize := lastChunkBufPointer .MemoryUsage ()
238
+ c .chkBufSizeForOneProbe += lastChunkSize
239
+ memDelta += lastChunkSize
240
+ }
241
+ lastChunkBufPointer = c .chkBuf
242
+ if i & signalCheckpointForJoin == 0 {
243
+ // Trigger Consume for checking the OOM Action signal
244
+ memDelta += int64 (cap (matched ))* rowSize + int64 (cap (matchedPtrs ))* rowPtrSize - matchedDataSize
245
+ matchedDataSize = int64 (cap (matched ))* rowSize + int64 (cap (matchedPtrs ))* rowPtrSize
246
+ c .memTracker .Consume (memDelta + 1 )
247
+ memDelta = 0
248
+ }
128
249
if ! ok {
129
250
atomic .AddInt64 (& c .stat .probeCollision , 1 )
130
251
continue
0 commit comments