@@ -2,6 +2,7 @@ use std::{
     cmp::{max, min, Ordering},
     collections::{binary_heap::PeekMut, BinaryHeap},
     mem,
+    num::NonZeroUsize,
 };
 
 use bytes::{Buf, Bytes, BytesMut};
@@ -32,9 +33,9 @@ impl Assembler {
             return Err(IllegalOrderedRead);
         } else if !ordered && self.state.is_ordered() {
             // Enter unordered mode
-            let mut recvd = self.deduplicate();
-            recvd.insert(0..self.bytes_read);
-            self.state = State::Unordered { recvd };
+            self.state = State::Unordered {
+                recvd: self.deduplicate(),
+            };
         }
         Ok(())
     }
@@ -54,7 +55,7 @@ impl Assembler {
             } else if (chunk.offset + chunk.bytes.len() as u64) <= self.bytes_read {
                 // Next chunk is useless as the read index is beyond its end
                 self.buffered -= chunk.size;
-                self.allocated -= chunk.allocation_size;
+                self.allocated -= chunk.allocation_size();
                 PeekMut::pop(chunk);
                 continue;
             }
@@ -75,7 +76,7 @@ impl Assembler {
             } else {
                 self.bytes_read += chunk.bytes.len() as u64;
                 self.buffered -= chunk.size;
-                self.allocated -= chunk.allocation_size;
+                self.allocated -= chunk.allocation_size();
                 let chunk = PeekMut::pop(chunk);
                 Chunk::new(chunk.offset, chunk.bytes)
             });
@@ -88,10 +89,13 @@ impl Assembler {
     // counter to the new number of chunks left in the heap so that we can decide
     // when to defragment the queue again if necessary.
     fn defragment(&mut self) {
+        if self.state.is_ordered() {
+            self.deduplicate();
+        }
         let fragmented_buffered = self
             .data
             .iter()
-            .filter(|c| c.is_fragmented())
+            .filter(|c| c.should_defragment())
             .map(|c| c.bytes.len())
             .sum::<usize>();
         let mut buffer = BytesMut::with_capacity(fragmented_buffered);
@@ -104,24 +108,25 @@ impl Assembler {
 
         let new = BinaryHeap::with_capacity(self.data.len());
         let old = mem::replace(&mut self.data, new);
+        self.buffered = fragmented_buffered;
         for chunk in old.into_sorted_vec().into_iter().rev() {
-            if !chunk.is_fragmented() {
+            if chunk.is_defragmented() {
+                self.buffered += chunk.size;
                 self.data.push(chunk);
                 continue;
+            } else if !chunk.should_defragment() {
+                self.buffered += chunk.bytes.len();
+                self.data
+                    .push(Buffer::new_defragmented(chunk.offset, chunk.bytes));
+                continue;
             }
-            let end = offset + (buffer.len() as u64);
-            if let Some(overlap) = end.checked_sub(chunk.offset) {
-                if let Some(bytes) = chunk.bytes.get(overlap as usize..) {
-                    buffer.extend_from_slice(bytes);
-                }
-            } else {
+            if chunk.offset != offset + (buffer.len() as u64) {
                 self.data
                     .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
                 offset = chunk.offset;
-                buffer.extend_from_slice(&chunk.bytes);
             }
+            buffer.extend_from_slice(&chunk.bytes);
         }
-
         self.data
             .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
         self.allocated = self.buffered;
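Note: the rewritten copy loop leans on `BytesMut::split().freeze()` so every contiguous run produced in one defragmentation pass shares a single backing allocation. A minimal sketch of that `bytes`-crate pattern, with illustrative contents:

```rust
use bytes::{BufMut, BytesMut};

fn main() {
    // One backing allocation for the whole pass.
    let mut buffer = BytesMut::with_capacity(8);
    buffer.put_slice(b"abcd");
    // `split()` hands back everything written so far as a new `BytesMut`,
    // leaving `buffer` empty but still writable; `freeze()` turns the run
    // into an immutable `Bytes` suitable for pushing into the heap.
    let first = buffer.split().freeze();
    buffer.put_slice(b"ef");
    let second = buffer.split().freeze();
    assert_eq!((&first[..], &second[..]), (&b"abcd"[..], &b"ef"[..]));
}
```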
@@ -130,9 +135,19 @@ impl Assembler {
     fn deduplicate(&mut self) -> RangeSet {
         debug_assert!(self.state.is_ordered());
         let mut ranges = RangeSet::new();
+        ranges.insert(0..self.bytes_read);
         let new = BinaryHeap::with_capacity(self.data.len());
-        let old = mem::replace(&mut self.data, new);
-        for mut buffer in old.into_sorted_vec().into_iter().rev() {
+        let old = mem::replace(&mut self.data, new).into_vec();
+        for buffer in old.iter() {
+            if buffer.is_defragmented() {
+                ranges.insert(buffer.offset..buffer.offset + buffer.bytes.len() as u64);
+            }
+        }
+        for mut buffer in old.into_iter() {
+            if buffer.is_defragmented() {
+                self.data.push(buffer);
+                continue;
+            }
             for duplicate in
                 ranges.replace(buffer.offset..buffer.offset + buffer.bytes.len() as u64)
             {
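Note: the loop body depends on `RangeSet::replace` recording the incoming range and yielding the sub-ranges that were already present, i.e. the duplicate portions to trim. A toy stand-in for that assumed contract (quinn's real `RangeSet` also merges adjacent ranges and keeps them sorted):

```rust
use std::ops::Range;

/// Hypothetical simplification of `RangeSet::replace`: record `new`,
/// returning the parts of it that were already recorded.
fn replace(recvd: &mut Vec<Range<u64>>, new: Range<u64>) -> Vec<Range<u64>> {
    let mut duplicates = Vec::new();
    for r in recvd.iter() {
        let (start, end) = (r.start.max(new.start), r.end.min(new.end));
        if start < end {
            duplicates.push(start..end);
        }
    }
    recvd.push(new);
    duplicates
}

fn main() {
    let mut recvd = vec![0..4];
    // Bytes 2..6 arrive; 2..4 were already buffered and must be discarded.
    assert_eq!(replace(&mut recvd, 2..6), vec![2..4]);
}
```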
@@ -142,9 +157,10 @@ impl Assembler {
                         buffer
                             .bytes
                             .split_to((duplicate.start - buffer.offset) as usize),
-                        buffer.allocation_size,
+                        buffer.allocation_size(),
+                        false,
                     );
-                    self.allocated += new_buffer.allocation_size;
+                    self.allocated += new_buffer.allocation_size();
                     self.data.push(new_buffer);
                 }
                 let discarded = (duplicate.end - duplicate.start) as usize;
@@ -154,13 +170,15 @@ impl Assembler {
                 buffer.offset = duplicate.end;
             }
             if buffer.bytes.is_empty() {
-                self.allocated -= buffer.allocation_size;
+                self.allocated -= buffer.allocation_size();
                 self.buffered -= buffer.size;
             } else {
+                let allocation_size = buffer.allocation_size();
                 self.data.push(Buffer::new(
                     buffer.offset,
                     buffer.bytes,
-                    buffer.allocation_size,
+                    allocation_size,
+                    false,
                 ));
             }
         }
@@ -185,9 +203,10 @@ impl Assembler {
                         offset,
                         bytes.split_to((duplicate.start - offset) as usize),
                         allocation_size,
+                        false,
                     );
                     self.buffered += buffer.size;
-                    self.allocated += buffer.allocation_size;
+                    self.allocated += buffer.allocation_size();
                     self.data.push(buffer);
                     offset = duplicate.start;
                 }
@@ -207,9 +226,9 @@ impl Assembler {
         if bytes.is_empty() {
             return;
         }
-        let buffer = Buffer::new(offset, bytes, allocation_size);
+        let buffer = Buffer::new(offset, bytes, allocation_size, self.state.is_ordered());
         self.buffered += buffer.size;
-        self.allocated += buffer.allocation_size;
+        self.allocated += buffer.allocation_size();
         self.data.push(buffer);
         // Rationale: on the one hand, we want to defragment rarely, ideally never
         // in non-pathological scenarios. However, a pathological or malicious
@@ -222,9 +241,6 @@ impl Assembler {
         let over_allocation = self.allocated - buffered;
         let threshold = max(buffered * 3 / 2, 32 * 1024);
         if over_allocation > threshold {
-            if self.state.is_ordered() && self.buffered > buffered {
-                self.deduplicate();
-            }
             self.defragment()
         }
     }
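Note: with deduplication now folded into `defragment`, the trigger is just the over-allocation check above. A stand-alone sketch of the condition with illustrative numbers:

```rust
// Mirrors the trigger in `insert`: defragment once the wasted allocation
// exceeds both 150% of the buffered bytes and 32 KiB.
fn should_run_defragment(allocated: usize, buffered: usize) -> bool {
    let over_allocation = allocated - buffered;
    let threshold = (buffered * 3 / 2).max(32 * 1024);
    over_allocation > threshold
}

fn main() {
    // 40 KiB buffered but 200 KiB allocated: 160 KiB wasted > 60 KiB threshold.
    assert!(should_run_defragment(200 * 1024, 40 * 1024));
    // 40 KiB buffered in 64 KiB of allocations stays under the threshold.
    assert!(!should_run_defragment(64 * 1024, 40 * 1024));
}
```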
@@ -266,42 +282,53 @@ struct Buffer {
     offset: u64,
     bytes: Bytes,
     size: usize,
-    allocation_size: usize,
+    state: BufferState,
 }
 
 impl Buffer {
     /// Constructs a new, possibly fragmented Buffer
-    fn new(offset: u64, bytes: Bytes, allocation_size: usize) -> Self {
+    fn new(offset: u64, bytes: Bytes, allocation_size: usize, possibly_duplicate: bool) -> Self {
         let size = bytes.len();
-        // Treat buffers with small over-allocation as defragmented
-        let threshold = size * 6 / 5;
-        let allocation_size = if allocation_size > threshold {
-            allocation_size
-        } else {
-            size
-        };
         Self {
             offset,
             bytes,
             size,
-            allocation_size,
+            state: BufferState::new(size, allocation_size, possibly_duplicate),
         }
     }
 
-    /// Constructs a new Buffer that is not fragmented
+    /// Constructs a new Buffer that is considered to be defragmented
     fn new_defragmented(offset: u64, bytes: Bytes) -> Self {
         let size = bytes.len();
         Self {
             offset,
             bytes,
             size,
-            allocation_size: size,
+            state: BufferState::Defragmented,
+        }
+    }
+
+    /// Returns `true` if the buffer is defragmented
+    fn is_defragmented(&self) -> bool {
+        self.state == BufferState::Defragmented
+    }
+
+    /// Returns the size of the associated allocation
+    fn allocation_size(&self) -> usize {
+        match self.state {
+            BufferState::Fragmented(allocation_size) => allocation_size.get(),
+            BufferState::Defragmented => self.size,
         }
     }
 
-    /// Returns `true` if the buffer is fragmented
-    fn is_fragmented(&self) -> bool {
-        self.size < self.allocation_size
+    /// Returns `true` if the buffer should be defragmented
+    fn should_defragment(&self) -> bool {
+        match self.state {
+            BufferState::Fragmented(allocation_size) => {
+                BufferState::significant_over_allocation(self.size, allocation_size.get())
+            }
+            BufferState::Defragmented => false,
+        }
     }
 }
 
@@ -328,6 +355,30 @@ impl PartialEq for Buffer {
     }
 }
 
+#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
+enum BufferState {
+    Fragmented(NonZeroUsize),
+    Defragmented,
+}
+
+impl BufferState {
+    fn new(size: usize, allocation_size: usize, possibly_duplicate: bool) -> Self {
+        if possibly_duplicate || Self::significant_over_allocation(size, allocation_size) {
+            match NonZeroUsize::new(allocation_size) {
+                Some(value) => Self::Fragmented(value),
+                None => Self::Defragmented,
+            }
+        } else {
+            Self::Defragmented
+        }
+    }
+
+    /// Returns `true` if the buffer size and allocation size warrants a copy
+    fn significant_over_allocation(size: usize, allocation_size: usize) -> bool {
+        size * 6 / 5 < allocation_size
+    }
+}
+
 #[derive(Debug)]
 enum State {
     Ordered,
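Note: carrying the allocation size as `NonZeroUsize` lets zero serve as the niche for `Defragmented`, so on current rustc the two-variant enum takes no more space than the `usize` field it replaces. A quick check of that layout assumption together with the 20% over-allocation rule:

```rust
use std::{mem::size_of, num::NonZeroUsize};

#[allow(dead_code)]
enum BufferState {
    Fragmented(NonZeroUsize),
    Defragmented,
}

fn main() {
    // Niche optimization: 0 is never a valid NonZeroUsize, so it can encode
    // the Defragmented variant (layout observed on current rustc).
    assert_eq!(size_of::<BufferState>(), size_of::<usize>());

    // The 6/5 threshold tolerates up to 20% over-allocation: a 100-byte
    // chunk in a 120-byte allocation is left alone, one in 121 bytes is not.
    let significant = |size: usize, alloc: usize| size * 6 / 5 < alloc;
    assert!(!significant(100, 120));
    assert!(significant(100, 121));
}
```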
@@ -650,12 +701,7 @@ mod test {
         x.insert(2, Bytes::from_static(b"cd"), 2);
         assert_eq!(
             x.data.peek(),
-            Some(&Buffer {
-                offset: 3,
-                bytes: Bytes::from_static(b"d"),
-                size: 1,
-                allocation_size: 2
-            })
+            Some(&Buffer::new(3, Bytes::from_static(b"d"), 2, true))
         );
     }
 