@@ -55,7 +55,7 @@ use crate::{
55
55
large:: indexer:: { EntryAddress , HashedEntryAddress } ,
56
56
region:: { GetCleanRegionHandle , RegionManager } ,
57
57
runtime:: Runtime ,
58
- Compression , Dev , Statistics ,
58
+ Compression , Dev , SharedIoSlice , Statistics ,
59
59
} ;
60
60
61
61
pub enum Submission < K , V >
@@ -157,7 +157,11 @@ where
157
157
let ( tx, rx) = flume:: unbounded ( ) ;
158
158
159
159
let buffer_size = config. buffer_pool_size / config. flushers ;
160
- let writer = create_writer ( buffer_size, device. region_size ( ) , device. region_size ( ) , metrics. clone ( ) ) ;
160
+
161
+ let buffer = IoBuffer :: new ( buffer_size) ;
162
+ let rotate_buffer = Some ( IoBuffer :: new ( buffer_size) ) ;
163
+
164
+ let writer = create_writer ( buffer, device. region_size ( ) , device. region_size ( ) , metrics. clone ( ) ) ;
161
165
let writer = Some ( writer) ;
162
166
163
167
let current_region_handle = region_manager. get_clean_region ( ) ;
@@ -168,8 +172,8 @@ where
168
172
writer,
169
173
tombstone_infos : vec ! [ ] ,
170
174
waiters : vec ! [ ] ,
175
+ rotate_buffer,
171
176
queue_init : None ,
172
- buffer_size,
173
177
submit_queue_size : submit_queue_size. clone ( ) ,
174
178
region_manager,
175
179
device,
@@ -220,13 +224,13 @@ where
220
224
}
221
225
222
226
fn create_writer (
223
- capacity : usize ,
227
+ buffer : IoBuffer ,
224
228
region_size : usize ,
225
229
current_region_remain : usize ,
226
230
metrics : Arc < Metrics > ,
227
231
) -> BatchWriter {
228
232
// TODO(MrCroxx): optimize buffer allocation.
229
- BatchWriter :: new ( IoBuffer :: new ( capacity ) , region_size, current_region_remain, metrics)
233
+ BatchWriter :: new ( buffer , region_size, current_region_remain, metrics)
230
234
}
231
235
232
236
#[ derive( Debug ) ]
@@ -245,6 +249,7 @@ struct IoTaskCtx {
245
249
handle : Option < GetCleanRegionHandle > ,
246
250
waiters : Vec < oneshot:: Sender < ( ) > > ,
247
251
init : Instant ,
252
+ io_slice : SharedIoSlice ,
248
253
}
249
254
250
255
struct Runner < K , V >
@@ -260,10 +265,14 @@ where
260
265
waiters : Vec < oneshot:: Sender < ( ) > > ,
261
266
queue_init : Option < Instant > ,
262
267
268
+ /// IoBuffer rotates between writer and inflight io task.
269
+ ///
270
+ /// Use this field to avoid allocation.
271
+ rotate_buffer : Option < IoBuffer > ,
272
+
263
273
submit_queue_size : Arc < AtomicUsize > ,
264
274
265
275
current_region_handle : GetCleanRegionHandle ,
266
- buffer_size : usize ,
267
276
268
277
region_manager : RegionManager ,
269
278
indexer : Indexer ,
@@ -333,8 +342,9 @@ where
333
342
let io_task = self . submit_io_task ( buf, batch, tombstone_infos, waiters, init) ;
334
343
self . io_tasks . push_back ( io_task) ;
335
344
345
+ let buffer = self . rotate_buffer . take ( ) . unwrap ( ) ;
336
346
let writer = create_writer (
337
- self . buffer_size ,
347
+ buffer ,
338
348
self . device . region_size ( ) ,
339
349
remain,
340
350
// self.current_region_state.remain,
@@ -346,11 +356,13 @@ where
346
356
347
357
tokio:: select! {
348
358
biased;
349
- IoTaskCtx { handle, waiters, init } = self . next_io_task_finish( ) => {
359
+ IoTaskCtx { handle, waiters, init, io_slice } = self . next_io_task_finish( ) => {
350
360
if let Some ( handle) = handle {
351
361
self . current_region_handle = handle;
352
362
}
353
363
self . handle_io_complete( waiters, init) ;
364
+ // `try_into_io_buffer` must return `Some(..)` here.
365
+ self . rotate_buffer = io_slice. try_into_io_buffer( ) ;
354
366
}
355
367
Ok ( submission) = rx. recv_async( ) => {
356
368
self . recv( submission) ;
@@ -554,7 +566,6 @@ where
554
566
}
555
567
} ;
556
568
557
- // let f: BoxFuture<'_, Result<Vec<CleanRegionState>>> = try_join_all(futures).boxed();
558
569
let f: BoxFuture < ' _ , Result < ( Vec < GetCleanRegionHandle > , ( ) ) > > = try_join ( try_join_all ( futures) , future) . boxed ( ) ;
559
570
let handle = self
560
571
. runtime
@@ -565,13 +576,15 @@ where
565
576
handle : states. pop ( ) ,
566
577
waiters,
567
578
init,
579
+ io_slice : shared,
568
580
} ,
569
581
Ok ( Err ( e) ) => {
570
582
tracing:: error!( ?e, "[lodc flusher]: io task error" ) ;
571
583
IoTaskCtx {
572
584
handle : None ,
573
585
waiters,
574
586
init,
587
+ io_slice : shared,
575
588
}
576
589
}
577
590
Err ( e) => {
@@ -580,13 +593,12 @@ where
580
593
handle : None ,
581
594
waiters,
582
595
init,
596
+ io_slice : shared,
583
597
}
584
598
}
585
599
} )
586
600
. boxed ( ) ;
587
601
588
- // self.io_tasks.push_back(handle);
589
-
590
602
handle
591
603
}
592
604
0 commit comments