@@ -41,16 +41,6 @@ type prefetchMsg struct {
41
41
keys [][]byte
42
42
}
43
43
44
- type usedMsg struct {
45
- root common.Hash
46
- used [][]byte
47
- }
48
-
49
- type trieMsg struct {
50
- root common.Hash
51
- resultChan chan * subfetcher
52
- }
53
-
54
44
// triePrefetcher is an active prefetcher, which receives accounts or storage
55
45
// items and does trie-loading of them. The goal is to get as much useful content
56
46
// into the caches as possible.
@@ -65,11 +55,8 @@ type triePrefetcher struct {
65
55
closed int32
66
56
closeMainChan chan struct {} // it is to inform the mainLoop
67
57
closeMainDoneChan chan struct {}
68
- copyChan chan struct {}
69
- copyDoneChan chan * triePrefetcher
58
+ fetchersMutex sync.RWMutex
70
59
prefetchChan chan * prefetchMsg // no need to wait for return
71
- trieChan chan * trieMsg
72
- usedChan chan * usedMsg // no need to wait for return
73
60
74
61
abortChan chan * subfetcher
75
62
closeAbortChan chan struct {} // it is used to inform abortLoop
@@ -97,11 +84,7 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
97
84
98
85
closeMainChan : make (chan struct {}),
99
86
closeMainDoneChan : make (chan struct {}),
100
- copyChan : make (chan struct {}, concurrentChanSize ),
101
- copyDoneChan : make (chan * triePrefetcher , concurrentChanSize ),
102
87
prefetchChan : make (chan * prefetchMsg , concurrentChanSize ),
103
- trieChan : make (chan * trieMsg , concurrentChanSize ),
104
- usedChan : make (chan * usedMsg , concurrentChanSize ),
105
88
106
89
deliveryMissMeter : metrics .GetOrRegisterMeter (prefix + "/deliverymiss" , nil ),
107
90
accountLoadMeter : metrics .GetOrRegisterMeter (prefix + "/account/load" , nil ),
@@ -121,34 +104,16 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
121
104
func (p * triePrefetcher ) mainLoop () {
122
105
for {
123
106
select {
124
- case <- p .copyChan :
125
- fetcherCopied := & triePrefetcher {
126
- db : p .db ,
127
- root : p .root ,
128
- fetches : make (map [common.Hash ]Trie , len (p .fetchers )),
129
- }
130
- // we're copying an active fetcher, retrieve the current states
131
- for root , fetcher := range p .fetchers {
132
- fetcherCopied .fetches [root ] = fetcher .peek ()
133
- }
134
- p .copyDoneChan <- fetcherCopied
135
-
136
107
case pMsg := <- p .prefetchChan :
137
108
fetcher := p .fetchers [pMsg .root ]
138
109
if fetcher == nil {
139
110
fetcher = newSubfetcher (p .db , pMsg .root , pMsg .accountHash )
111
+ p .fetchersMutex .Lock ()
140
112
p .fetchers [pMsg .root ] = fetcher
113
+ p .fetchersMutex .Unlock ()
141
114
}
142
115
fetcher .schedule (pMsg .keys )
143
116
144
- case tireMsg := <- p .trieChan :
145
- tireMsg .resultChan <- p .fetchers [tireMsg .root ]
146
-
147
- case uMsg := <- p .usedChan :
148
- if fetcher := p .fetchers [uMsg .root ]; fetcher != nil {
149
- fetcher .used = uMsg .used
150
- }
151
-
152
117
case <- p .closeMainChan :
153
118
for _ , fetcher := range p .fetchers {
154
119
p .abortChan <- fetcher // safe to do multiple times
@@ -177,14 +142,14 @@ func (p *triePrefetcher) mainLoop() {
177
142
}
178
143
close (p .closeAbortChan )
179
144
close (p .closeMainDoneChan )
145
+ p .fetchersMutex .Lock ()
180
146
p .fetchers = nil
147
+ p .fetchersMutex .Unlock ()
148
+
181
149
// drain all the channels before quit the loop
182
150
for {
183
151
select {
184
- case <- p .copyChan :
185
152
case <- p .prefetchChan :
186
- case <- p .trieChan :
187
- case <- p .usedChan :
188
153
default :
189
154
return
190
155
}
@@ -238,24 +203,28 @@ func (p *triePrefetcher) copy() *triePrefetcher {
238
203
}
239
204
return fetcherCopied
240
205
}
241
- p . copyChan <- struct {}{}
206
+
242
207
select {
243
208
case <- p .closeMainChan :
244
- select {
245
- case <- p .copyChan : // to discard the message sent
246
- default :
209
+ // for closed trie prefetcher, the fetches should not be nil
210
+ fetcherCopied := & triePrefetcher {
211
+ db : p .db ,
212
+ root : p .root ,
213
+ fetches : make (map [common.Hash ]Trie ),
247
214
}
215
+ return fetcherCopied
216
+ default :
217
+ p .fetchersMutex .RLock ()
248
218
fetcherCopied := & triePrefetcher {
249
219
db : p .db ,
250
220
root : p .root ,
251
- fetches : make (map [common.Hash ]Trie , len (p .fetches )),
221
+ fetches : make (map [common.Hash ]Trie , len (p .fetchers )),
252
222
}
253
- // for closed trie prefetcher , retrieve the current states
223
+ // we're copying an active fetcher , retrieve the current states
254
224
for root , fetcher := range p .fetchers {
255
225
fetcherCopied .fetches [root ] = fetcher .peek ()
256
226
}
257
- return fetcherCopied
258
- case fetcherCopied := <- p .copyDoneChan :
227
+ p .fetchersMutex .RUnlock ()
259
228
return fetcherCopied
260
229
}
261
230
}
@@ -268,7 +237,6 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash c
268
237
}
269
238
select {
270
239
case <- p .closeMainChan : // skip closed trie prefetcher
271
- return
272
240
case p .prefetchChan <- & prefetchMsg {root , accountHash , keys }:
273
241
}
274
242
}
@@ -285,19 +253,9 @@ func (p *triePrefetcher) trie(root common.Hash) Trie {
285
253
return p .db .CopyTrie (trie )
286
254
}
287
255
288
- var fetcher * subfetcher
289
- // currentTrieChan is to make sure we receive root's fetcher in concurrency mode.
290
- currentTrieChan := make (chan * subfetcher )
291
- p .trieChan <- & trieMsg {root , currentTrieChan }
292
- select {
293
- case <- p .closeMainChan :
294
- select {
295
- case <- p .trieChan :
296
- default :
297
- }
298
- fetcher = p .fetchers [root ]
299
- case fetcher = <- currentTrieChan :
300
- }
256
+ p .fetchersMutex .RLock ()
257
+ fetcher := p .fetchers [root ]
258
+ p .fetchersMutex .RUnlock ()
301
259
if fetcher == nil {
302
260
p .deliveryMissMeter .Mark (1 )
303
261
return nil
@@ -329,12 +287,13 @@ func (p *triePrefetcher) used(root common.Hash, used [][]byte) {
329
287
return
330
288
}
331
289
select {
332
- case <- p .closeAbortChan :
333
- select {
334
- case <- p .usedChan :
335
- default :
290
+ case <- p .closeMainChan :
291
+ default :
292
+ p .fetchersMutex .RLock ()
293
+ if fetcher := p .fetchers [root ]; fetcher != nil {
294
+ fetcher .used = used
336
295
}
337
- case p . usedChan <- & usedMsg { root , used }:
296
+ p . fetchersMutex . RUnlock ()
338
297
}
339
298
}
340
299
0 commit comments