Skip to content

Commit

Permalink
Small refactoring using async.
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Nov 12, 2021
1 parent 7234bef commit 0de1195
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 33 deletions.
6 changes: 2 additions & 4 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,10 +465,8 @@ impl IndexWriter {
}

/// Detects and removes the files that are not used by the index anymore.
pub fn garbage_collect_files(
&self,
) -> impl Future<Output = crate::Result<GarbageCollectionResult>> {
self.segment_updater.schedule_garbage_collect()
pub async fn garbage_collect_files(&self) -> crate::Result<GarbageCollectionResult> {
self.segment_updater.schedule_garbage_collect().await
}

/// Deletes all documents from the index
Expand Down
60 changes: 31 additions & 29 deletions src/indexer/segment_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,37 +351,39 @@ impl SegmentUpdater {
*self.merge_policy.write().unwrap() = arc_merge_policy;
}

fn schedule_future<T: 'static + Send, F: Future<Output = crate::Result<T>> + 'static + Send>(
async fn schedule_task<
T: 'static + Send,
F: Future<Output = crate::Result<T>> + 'static + Send,
>(
&self,
f: F,
) -> impl Future<Output = crate::Result<T>> {
let (sender, receiver) = oneshot::channel();
if self.is_alive() {
self.pool.spawn_ok(async move {
let _ = sender.send(f.await);
});
} else {
let _ = sender.send(Err(crate::TantivyError::SystemError(
task: F,
) -> crate::Result<T> {
if !self.is_alive() {
return Err(crate::TantivyError::SystemError(
"Segment updater killed".to_string(),
)));
));
}
receiver.unwrap_or_else(|_| {
let (sender, receiver) = oneshot::channel();
self.pool.spawn_ok(async move {
let task_result = task.await;
let _ = sender.send(task_result);
});
let task_result = receiver.await;
task_result.unwrap_or_else(|_| {
let err_msg =
"A segment_updater future did not success. This should never happen.".to_string();
Err(crate::TantivyError::SystemError(err_msg))
})
}

pub fn schedule_add_segment(
&self,
segment_entry: SegmentEntry,
) -> impl Future<Output = crate::Result<()>> {
pub async fn schedule_add_segment(&self, segment_entry: SegmentEntry) -> crate::Result<()> {
let segment_updater = self.clone();
self.schedule_future(async move {
self.schedule_task(async move {
segment_updater.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options().await;
Ok(())
})
.await
}

/// Orders `SegmentManager` to remove all segments
Expand Down Expand Up @@ -448,11 +450,9 @@ impl SegmentUpdater {
Ok(())
}

pub fn schedule_garbage_collect(
&self,
) -> impl Future<Output = crate::Result<GarbageCollectionResult>> {
pub async fn schedule_garbage_collect(&self) -> crate::Result<GarbageCollectionResult> {
let garbage_collect_future = garbage_collect_files(self.clone());
self.schedule_future(garbage_collect_future)
self.schedule_task(garbage_collect_future).await
}

/// List the files that are useful to the index.
Expand All @@ -470,20 +470,21 @@ impl SegmentUpdater {
files
}

pub fn schedule_commit(
pub async fn schedule_commit(
&self,
opstamp: Opstamp,
payload: Option<String>,
) -> impl Future<Output = crate::Result<()>> {
) -> crate::Result<()> {
let segment_updater: SegmentUpdater = self.clone();
self.schedule_future(async move {
self.schedule_task(async move {
let segment_entries = segment_updater.purge_deletes(opstamp)?;
segment_updater.segment_manager.commit(segment_entries);
segment_updater.save_metas(opstamp, payload)?;
let _ = garbage_collect_files(segment_updater.clone()).await;
segment_updater.consider_merge_options().await;
Ok(())
})
.await
}

fn store_meta(&self, index_meta: &IndexMeta) {
Expand Down Expand Up @@ -611,14 +612,14 @@ impl SegmentUpdater {
}
}

fn end_merge(
async fn end_merge(
&self,
merge_operation: MergeOperation,
mut after_merge_segment_entry: SegmentEntry,
) -> impl Future<Output = crate::Result<SegmentMeta>> {
) -> crate::Result<SegmentMeta> {
let segment_updater = self.clone();
let after_merge_segment_meta = after_merge_segment_entry.meta().clone();
let end_merge_future = self.schedule_future(async move {
self.schedule_task(async move {
info!("End merge {:?}", after_merge_segment_entry.meta());
{
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
Expand Down Expand Up @@ -661,8 +662,9 @@ impl SegmentUpdater {

let _ = garbage_collect_files(segment_updater).await;
Ok(())
});
end_merge_future.map_ok(|_| after_merge_segment_meta)
})
.await?;
Ok(after_merge_segment_meta)
}

/// Wait for current merging threads.
Expand Down

0 comments on commit 0de1195

Please sign in to comment.