@@ -66,8 +66,9 @@ type selectResult struct {
66
66
fieldTypes []* types.FieldType
67
67
ctx sessionctx.Context
68
68
69
- selectResp * tipb.SelectResponse
70
- respChkIdx int
69
+ selectResp * tipb.SelectResponse
70
+ selectRespSize int // record the selectResp.Size() when it is initialized.
71
+ respChkIdx int
71
72
72
73
feedback * statistics.QueryFeedback
73
74
partialCount int64 // number of partial results.
@@ -94,25 +95,30 @@ func (r *selectResult) fetch(ctx context.Context) {
94
95
metrics .DistSQLQueryHistgram .WithLabelValues (r .label , r .sqlType ).Observe (duration .Seconds ())
95
96
}()
96
97
for {
98
+ var result resultWithErr
97
99
resultSubset , err := r .resp .Next (ctx )
98
100
if err != nil {
99
- r .results <- resultWithErr {err : errors .Trace (err )}
101
+ result .err = err
102
+ } else if resultSubset == nil {
103
+ // If the result is drained, the resultSubset would be nil
100
104
return
101
- }
102
- if resultSubset == nil {
103
- return
104
- }
105
-
106
- if r .memTracker != nil {
107
- r .memTracker .Consume (int64 (resultSubset .MemSize ()))
105
+ } else {
106
+ result .result = resultSubset
107
+ r .memConsume (int64 (resultSubset .MemSize ()))
108
108
}
109
109
110
110
select {
111
- case r .results <- resultWithErr { result : resultSubset } :
111
+ case r .results <- result :
112
112
case <- r .closed :
113
113
// If selectResult called Close() already, make fetch goroutine exit.
114
+ if resultSubset != nil {
115
+ r .memConsume (- int64 (resultSubset .MemSize ()))
116
+ }
114
117
return
115
118
case <- ctx .Done ():
119
+ if resultSubset != nil {
120
+ r .memConsume (- int64 (resultSubset .MemSize ()))
121
+ }
116
122
return
117
123
}
118
124
}
@@ -157,24 +163,21 @@ func (r *selectResult) getSelectResp() error {
157
163
if re .err != nil {
158
164
return errors .Trace (re .err )
159
165
}
160
- if r .memTracker != nil && r . selectResp != nil {
161
- r .memTracker . Consume (- int64 (r .selectResp . Size () ))
166
+ if r .selectResp != nil {
167
+ r .memConsume (- int64 (r .selectRespSize ))
162
168
}
163
169
if re .result == nil {
164
170
r .selectResp = nil
165
171
return nil
166
172
}
167
- if r .memTracker != nil {
168
- r .memTracker .Consume (- int64 (re .result .MemSize ()))
169
- }
173
+ r .memConsume (- int64 (re .result .MemSize ()))
170
174
r .selectResp = new (tipb.SelectResponse )
171
175
err := r .selectResp .Unmarshal (re .result .GetData ())
172
176
if err != nil {
173
177
return errors .Trace (err )
174
178
}
175
- if r .memTracker != nil && r .selectResp != nil {
176
- r .memTracker .Consume (int64 (r .selectResp .Size ()))
177
- }
179
+ r .selectRespSize = r .selectResp .Size ()
180
+ r .memConsume (int64 (r .selectRespSize ))
178
181
if err := r .selectResp .Error ; err != nil {
179
182
return terror .ClassTiKV .New (terror .ErrCode (err .Code ), err .Msg )
180
183
}
@@ -207,13 +210,27 @@ func (r *selectResult) readRowsData(chk *chunk.Chunk) (err error) {
207
210
return nil
208
211
}
209
212
213
+ func (r * selectResult ) memConsume (bytes int64 ) {
214
+ if r .memTracker != nil {
215
+ r .memTracker .Consume (bytes )
216
+ }
217
+ }
218
+
210
219
// Close closes selectResult.
211
220
func (r * selectResult ) Close () error {
212
- // Close this channel tell fetch goroutine to exit.
213
221
if r .feedback .Actual () >= 0 {
214
222
metrics .DistSQLScanKeysHistogram .Observe (float64 (r .feedback .Actual ()))
215
223
}
216
224
metrics .DistSQLPartialCountHistogram .Observe (float64 (r .partialCount ))
225
+ // Close this channel to tell the fetch goroutine to exit.
217
226
close (r .closed )
227
+ for re := range r .results {
228
+ if re .result != nil {
229
+ r .memConsume (- int64 (re .result .MemSize ()))
230
+ }
231
+ }
232
+ if r .selectResp != nil {
233
+ r .memConsume (- int64 (r .selectRespSize ))
234
+ }
218
235
return r .resp .Close ()
219
236
}
0 commit comments