Skip to content

Commit

Permalink
fix: vacuum dry run should carry on when target objects are missing (#…
Browse files Browse the repository at this point in the history
…16322)

* fix: vacuum dry run should carry on when target objects are missing

* tweak UT
  • Loading branch information
dantengsky authored Aug 25, 2024
1 parent 380b626 commit b5e6155
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 28 deletions.
34 changes: 24 additions & 10 deletions src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,30 @@ pub async fn do_vacuum_drop_table(
.metakey(Metakey::ContentLength)
.await?;

while let Some(de) = ds.try_next().await? {
let meta = de.metadata();
if EntryMode::FILE == meta.mode() {
list_files.push((
table_info.name.clone(),
de.name().to_string(),
meta.content_length(),
));
if list_files.len() >= dry_run_limit {
break;
// Walk the listing stream and collect FILE entries for the dry-run report,
// stopping once `dry_run_limit` entries have been gathered.
// `NotFound` errors are logged and skipped so the dry run carries on when
// target objects are missing; any other error aborts the operation.
loop {
    match ds.try_next().await {
        Ok(Some(de)) => {
            let meta = de.metadata();
            if meta.mode() != EntryMode::FILE {
                // Only plain files are reported.
                continue;
            }
            list_files.push((
                table_info.name.clone(),
                de.name().to_string(),
                meta.content_length(),
            ));
            if list_files.len() >= dry_run_limit {
                break;
            }
        }
        // Stream exhausted.
        Ok(None) => break,
        Err(e) if e.kind() == opendal::ErrorKind::NotFound => {
            info!("target not found, ignored. {}", e);
        }
        Err(e) => return Err(e.into()),
    }
}
Expand Down
149 changes: 131 additions & 18 deletions src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,13 @@ async fn test_do_vacuum_temporary_files() -> Result<()> {
}

mod test_accessor {
use std::future::Future;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;

use opendal::raw::oio;
use opendal::raw::oio::Entry;
use opendal::raw::MaybeSend;
use opendal::raw::OpDelete;
use opendal::raw::OpList;
use opendal::raw::RpDelete;
Expand All @@ -162,26 +166,64 @@ mod test_accessor {
/// Test accessor that records which operations were invoked and can be
/// configured to fail `delete` or `stat` on purpose.
#[derive(Debug)]
pub(crate) struct AccessorFaultyDeletion {
// Set to `true` when `delete` is called.
hit_delete: AtomicBool,
// Set to `true` when `stat` is called.
hit_stat: AtomicBool,
// When `true`, `delete` fails with `ErrorKind::Unexpected` and `list` yields nothing.
inject_delete_faulty: bool,
// When `true`, `stat` fails with `ErrorKind::NotFound`.
inject_stat_faulty: bool,
}

impl AccessorFaultyDeletion {
pub(crate) fn new() -> Self {
pub(crate) fn with_delete_fault() -> Self {
AccessorFaultyDeletion {
hit_delete: AtomicBool::new(false),
hit_stat: AtomicBool::new(false),
inject_delete_faulty: true,
inject_stat_faulty: false,
}
}

pub(crate) fn with_stat_fault() -> Self {
AccessorFaultyDeletion {
hit_delete: AtomicBool::new(false),
hit_stat: AtomicBool::new(false),
inject_delete_faulty: false,
inject_stat_faulty: true,
}
}

/// Returns the current value of the `hit_delete` flag (loaded with `Acquire` ordering).
pub(crate) fn hit_delete_operation(&self) -> bool {
self.hit_delete.load(Ordering::Acquire)
}

/// Returns the current value of the `hit_stat` flag (loaded with `Acquire` ordering).
pub(crate) fn hit_stat_operation(&self) -> bool {
self.hit_stat.load(Ordering::Acquire)
}
}

/// In-memory lister backed by a vector of paths.
///
/// Paths are handed out from the back of the vector; a path ending with `/`
/// is reported as a directory, anything else as a file.
pub struct VecLister(Vec<String>);

impl oio::List for VecLister {
    fn next(&mut self) -> impl Future<Output = opendal::Result<Option<Entry>>> + MaybeSend {
        let remaining = &mut self.0;
        async move {
            // Popping happens inside the future, so nothing is consumed until it is polled.
            let Some(path) = remaining.pop() else {
                return Ok(None);
            };
            let mode = if path.ends_with('/') {
                EntryMode::DIR
            } else {
                EntryMode::FILE
            };
            Ok(Some(Entry::new(&path, Metadata::new(mode))))
        }
    }
}

impl Access for AccessorFaultyDeletion {
type Reader = ();
type BlockingReader = ();
type Writer = ();
type BlockingWriter = ();
type Lister = ();
type Lister = VecLister;
type BlockingLister = ();

fn info(&self) -> Arc<AccessorInfo> {
Expand All @@ -196,24 +238,49 @@ mod test_accessor {
}

async fn stat(&self, _path: &str, _args: OpStat) -> opendal::Result<RpStat> {
let stat = RpStat::new(Metadata::new(EntryMode::DIR));
Ok(stat)
self.hit_stat.store(true, Ordering::Release);
if self.inject_stat_faulty {
Err(opendal::Error::new(
opendal::ErrorKind::NotFound,
"does not matter (stat)",
))
} else {
let stat = if _path.ends_with('/') {
RpStat::new(Metadata::new(EntryMode::DIR))
} else {
RpStat::new(Metadata::new(EntryMode::FILE))
};
Ok(stat)
}
}

async fn delete(&self, _path: &str, _args: OpDelete) -> opendal::Result<RpDelete> {
self.hit_delete.store(true, Ordering::Release);
Err(opendal::Error::new(
opendal::ErrorKind::Unexpected,
"does not matter (delete)",
))
if self.inject_delete_faulty {
Err(opendal::Error::new(
opendal::ErrorKind::Unexpected,
"does not matter (delete)",
))
} else {
Ok(RpDelete::default())
}
}

async fn list(
&self,
_path: &str,
_args: OpList,
) -> opendal::Result<(RpList, Self::Lister)> {
Ok((RpList::default(), ()))
/// Lists two fixed entries (`"a"` and `"b"`) for directory paths (those ending with `/`).
///
/// While injecting faulty for delete operation, an empty list is returned;
/// otherwise we need to impl other methods. Non-directory paths also yield
/// an empty list.
async fn list(&self, path: &str, _args: OpList) -> opendal::Result<(RpList, Self::Lister)> {
    let entries = if !self.inject_delete_faulty && path.ends_with('/') {
        vec!["a".to_owned(), "b".to_owned()]
    } else {
        vec![]
    };

    Ok((RpList::default(), VecLister(entries)))
}
}
}
Expand All @@ -234,7 +301,7 @@ async fn test_fuse_do_vacuum_drop_table_deletion_error() -> Result<()> {
// Note that:
// In real case, `Accessor::batch` will be called (instead of Accessor::delete)
// but all that we need here is let Operator::remove_all failed
let faulty_accessor = std::sync::Arc::new(AccessorFaultyDeletion::new());
let faulty_accessor = std::sync::Arc::new(AccessorFaultyDeletion::with_delete_fault());
let operator = OperatorBuilder::new(faulty_accessor.clone()).finish();

let tables = vec![(table_info, operator)];
Expand All @@ -259,7 +326,7 @@ async fn test_fuse_vacuum_drop_tables_in_parallel_with_deletion_error() -> Resul

// Case 1: non-parallel vacuum dropped tables
{
let faulty_accessor = std::sync::Arc::new(AccessorFaultyDeletion::new());
let faulty_accessor = std::sync::Arc::new(AccessorFaultyDeletion::with_delete_fault());
let operator = OperatorBuilder::new(faulty_accessor.clone()).finish();

let table = (table_info.clone(), operator);
Expand All @@ -277,7 +344,7 @@ async fn test_fuse_vacuum_drop_tables_in_parallel_with_deletion_error() -> Resul

// Case 2: parallel vacuum dropped tables
{
let faulty_accessor = std::sync::Arc::new(AccessorFaultyDeletion::new());
let faulty_accessor = std::sync::Arc::new(AccessorFaultyDeletion::with_delete_fault());
let operator = OperatorBuilder::new(faulty_accessor.clone()).finish();

let table = (table_info, operator);
Expand All @@ -294,6 +361,52 @@ async fn test_fuse_vacuum_drop_tables_in_parallel_with_deletion_error() -> Resul
Ok(())
}

/// Dry-run vacuum of dropped tables must succeed even when `stat` reports
/// `NotFound` for the target objects, in both the non-parallel and the
/// parallel execution paths.
#[tokio::test(flavor = "multi_thread")]
async fn test_fuse_vacuum_drop_tables_dry_run_with_obj_not_found_error() -> Result<()> {
    use test_accessor::AccessorFaultyDeletion;

    let mut table_info = TableInfo::default();
    table_info
        .meta
        .options
        .insert(OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned());

    // First pass: non-parallel vacuum dry-run dropped tables.
    // Second pass: parallel vacuum dry-run dropped tables.
    for parallel in [false, true] {
        let faulty_accessor = Arc::new(AccessorFaultyDeletion::with_stat_fault());
        let operator = OperatorBuilder::new(faulty_accessor.clone()).finish();
        let table = (table_info.clone(), operator);

        let (num_threads, tables) = if parallel {
            // with 2 tables and 2 threads, `vacuum_drop_tables_by_table_info` will run in
            // parallel (one table per thread)
            (2, vec![table.clone(), table])
        } else {
            // with one table and one thread, `vacuum_drop_tables_by_table_info` will NOT
            // run in parallel
            (1, vec![table])
        };

        let result = vacuum_drop_tables_by_table_info(num_threads, tables, Some(usize::MAX)).await;

        // verify that accessor.stat() was called
        assert!(faulty_accessor.hit_stat_operation());
        // verify that errors of NotFound are swallowed
        assert!(result.is_ok());
    }

    Ok(())
}

// fuse table on external storage is same as internal storage.
#[tokio::test(flavor = "multi_thread")]
async fn test_fuse_do_vacuum_drop_table_external_storage() -> Result<()> {
Expand All @@ -310,7 +423,7 @@ async fn test_fuse_do_vacuum_drop_table_external_storage() -> Result<()> {
// Accessor passed in does NOT matter in this case, `do_vacuum_drop_table` should
// return Ok(None) before accessor is used.
use test_accessor::AccessorFaultyDeletion;
let accessor = std::sync::Arc::new(AccessorFaultyDeletion::new());
let accessor = std::sync::Arc::new(AccessorFaultyDeletion::with_delete_fault());
let operator = OperatorBuilder::new(accessor.clone()).finish();

let tables = vec![(table_info, operator)];
Expand Down

0 comments on commit b5e6155

Please sign in to comment.