@@ -94,7 +94,19 @@ impl Assembler {
     // counter to the new number of chunks left in the heap so that we can decide
     // when to defragment the queue again if necessary.
     fn defragment(&mut self) {
-        let buffered = self.data.iter().map(|c| c.bytes.len()).sum::<usize>();
+        let high_utilization_over_allocation = self
+            .data
+            .iter()
+            .filter(|b| b.high_utilization())
+            .map(|b| b.over_allocation)
+            .sum::<usize>();
+        let include_highly_utilized = high_utilization_over_allocation > DEFRAGMENTATION_THRESHOLD;
+        let buffered = self
+            .data
+            .iter()
+            .filter(|b| b.should_defragment(include_highly_utilized))
+            .map(|b| b.bytes.len())
+            .sum::<usize>();
         let mut buffer = BytesMut::with_capacity(buffered);
         let mut offset = self
             .data
@@ -106,20 +118,24 @@ impl Assembler {
         let new = BinaryHeap::with_capacity(self.data.len());
         let old = mem::replace(&mut self.data, new);
         for chunk in old.into_sorted_vec().into_iter().rev() {
-            let end = offset + (buffer.len() as u64);
-            if let Some(overlap) = end.checked_sub(chunk.offset) {
-                if let Some(bytes) = chunk.bytes.get(overlap as usize..) {
-                    buffer.extend_from_slice(bytes);
+            if chunk.should_defragment(include_highly_utilized) {
+                let end = offset + (buffer.len() as u64);
+                if let Some(overlap) = end.checked_sub(chunk.offset) {
+                    if let Some(bytes) = chunk.bytes.get(overlap as usize..) {
+                        buffer.extend_from_slice(bytes);
+                    }
+                } else {
+                    let bytes = buffer.split().freeze();
+                    self.data.push(Buffer {
+                        offset,
+                        bytes,
+                        over_allocation: 0,
+                    });
+                    offset = chunk.offset;
+                    buffer.extend_from_slice(&chunk.bytes);
                 }
             } else {
-                let bytes = buffer.split().freeze();
-                self.data.push(Buffer {
-                    offset,
-                    bytes,
-                    over_allocation: 0,
-                });
-                offset = chunk.offset;
-                buffer.extend_from_slice(&chunk.bytes);
+                self.data.push(chunk);
             }
         }
 
@@ -129,7 +145,11 @@ impl Assembler {
             bytes,
             over_allocation: 0,
         });
-        self.over_allocation = 0;
+        self.over_allocation = if include_highly_utilized {
+            0
+        } else {
+            high_utilization_over_allocation
+        };
     }
 
     pub(crate) fn insert(&mut self, mut offset: u64, mut bytes: Bytes, mut allocation_size: usize) {
@@ -170,7 +190,7 @@ impl Assembler {
         // of memory allocated. In a worst case scenario like 32 1-byte chunks,
         // each one from a ~1000-byte datagram, this limits us to having a
         // maximum pathological over-allocation of about 32k bytes.
-        if self.over_allocation > 32 * 1024 {
+        if self.over_allocation > DEFRAGMENTATION_THRESHOLD {
             self.defragment()
         }
     }
@@ -208,6 +228,8 @@ impl Assembler {
     }
 }
 
+const DEFRAGMENTATION_THRESHOLD: usize = 32 * 1024;
+
 /// A chunk of data from the receive stream
 #[non_exhaustive]
 #[derive(Debug, PartialEq)]
@@ -231,6 +253,20 @@ struct Buffer {
     over_allocation: usize,
 }
 
+impl Buffer {
+    fn high_utilization(&self) -> bool {
+        self.bytes.len() >= self.over_allocation
+    }
+
+    fn should_defragment(&self, include_highly_utilized: bool) -> bool {
+        if include_highly_utilized {
+            self.over_allocation > 0
+        } else {
+            !self.high_utilization()
+        }
+    }
+}
+
 impl Ord for Buffer {
     // Invert ordering based on offset (max-heap, min offset first),
     // prioritize longer chunks at the same offset.
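The following is a small standalone sketch, not part of the commit, showing how the new Buffer predicates and DEFRAGMENTATION_THRESHOLD interact in isolation. The names high_utilization, should_defragment, and over_allocation match the diff; the plain Vec<u8> field, the example chunk sizes, and the main driver are assumptions made only so the snippet compiles without the real Bytes type.

// Standalone sketch: the real Buffer stores a `Bytes` payload plus an offset;
// here a plain Vec<u8> stands in so the example runs without dependencies.
// The predicate bodies mirror the ones added in this commit.
const DEFRAGMENTATION_THRESHOLD: usize = 32 * 1024;

struct Buffer {
    bytes: Vec<u8>,
    over_allocation: usize,
}

impl Buffer {
    // A chunk is "highly utilized" when it carries at least as many payload
    // bytes as it wastes in unused allocation.
    fn high_utilization(&self) -> bool {
        self.bytes.len() >= self.over_allocation
    }

    // Poorly utilized chunks are always copied out during defragmentation;
    // highly utilized ones are only included once the caller has decided that
    // their combined waste alone exceeds the threshold.
    fn should_defragment(&self, include_highly_utilized: bool) -> bool {
        if include_highly_utilized {
            self.over_allocation > 0
        } else {
            !self.high_utilization()
        }
    }
}

fn main() {
    // Hypothetical chunks: a tiny fragment from a large datagram (poorly
    // utilized) and a mostly full allocation (highly utilized).
    let chunks = [
        Buffer { bytes: vec![0; 1], over_allocation: 999 },
        Buffer { bytes: vec![0; 900], over_allocation: 100 },
    ];
    // Mirrors the selection logic at the top of defragment().
    let wasted_by_highly_utilized: usize = chunks
        .iter()
        .filter(|b| b.high_utilization())
        .map(|b| b.over_allocation)
        .sum();
    let include = wasted_by_highly_utilized > DEFRAGMENTATION_THRESHOLD;
    for (i, c) in chunks.iter().enumerate() {
        println!("chunk {}: defragment = {}", i, c.should_defragment(include));
    }
}

The effect of the two-pass selection in the diff is that well-packed chunks stay in place during routine defragmentation, and the copy cost for them is only paid once their spare capacity alone crosses the 32 KiB threshold; otherwise their remaining waste is carried forward in self.over_allocation.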