Skip to content

Commit

Permalink
fix: rduplicate move lock files
Browse files Browse the repository at this point in the history
  • Loading branch information
LingyuCoder committed Dec 16, 2024
1 parent b394916 commit b0177cc
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 110 deletions.
53 changes: 13 additions & 40 deletions crates/rspack_storage/src/pack/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,13 @@ impl ScopeManager {
let _ = match res {
Ok(new_scopes) => {
let _ = std::mem::replace(&mut *scopes_lock, new_scopes);
println!("save scopes success");
tx.send(Ok(()))
}
Err(e) => tx.send(Err(e)),
Err(e) => {
println!("save scopes failed {:?}", e);
tx.send(Err(e))
}
};
}));

Expand Down Expand Up @@ -230,21 +234,9 @@ async fn save_scopes(
) -> Result<ScopeMap> {
scopes.retain(|_, scope| scope.loaded());

for (_, scope) in scopes.iter_mut() {
strategy.before_all(scope)?;
}
strategy.before_all(&mut scopes).await?;

join_all(
scopes
.values()
.map(|scope| async move { strategy.before_write(scope).await })
.collect_vec(),
)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

let wrote_results = join_all(
let changed = join_all(
scopes
.values_mut()
.map(|scope| async move {
Expand All @@ -261,33 +253,14 @@ async fn save_scopes(
.into_iter()
.collect::<Result<Vec<WriteScopeResult>>>()?
.into_iter()
.collect_vec();
.fold(WriteScopeResult::default(), |mut acc, res| {
acc.extend(res);
acc
});

strategy.write_root_meta(root_meta).await?;

join_all(
scopes
.values()
.zip(wrote_results)
.map(|(scope, scope_wrote_result)| async move {
strategy
.after_write(
scope,
scope_wrote_result.wrote_files,
scope_wrote_result.removed_files,
)
.await
})
.collect_vec(),
)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

for (_, scope) in scopes.iter_mut() {
strategy.after_all(scope)?;
}

strategy.merge_changed(changed).await?;
strategy.after_all(&mut scopes).await?;
strategy
.clean_unused(root_meta, &scopes, root_options)
.await?;
Expand Down
14 changes: 4 additions & 10 deletions crates/rspack_storage/src/pack/strategy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ pub trait ScopeValidateStrategy {
async fn validate_packs(&self, scope: &mut PackScope) -> Result<ValidateResult>;
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct WriteScopeResult {
pub wrote_files: HashSet<Utf8PathBuf>,
pub removed_files: HashSet<Utf8PathBuf>,
Expand All @@ -154,15 +154,9 @@ pub type ScopeUpdate = HashMap<StorageItemKey, Option<StorageItemValue>>;
#[async_trait]
pub trait ScopeWriteStrategy {
fn update_scope(&self, scope: &mut PackScope, updates: ScopeUpdate) -> Result<()>;
fn before_all(&self, scope: &mut PackScope) -> Result<()>;
async fn before_write(&self, scope: &PackScope) -> Result<()>;
async fn before_all(&self, scopes: &mut HashMap<String, PackScope>) -> Result<()>;
async fn write_packs(&self, scope: &mut PackScope) -> Result<WriteScopeResult>;
async fn write_meta(&self, scope: &mut PackScope) -> Result<WriteScopeResult>;
async fn after_write(
&self,
scope: &PackScope,
wrote_files: HashSet<Utf8PathBuf>,
removed_files: HashSet<Utf8PathBuf>,
) -> Result<()>;
fn after_all(&self, scope: &mut PackScope) -> Result<()>;
async fn merge_changed(&self, changed: WriteScopeResult) -> Result<()>;
async fn after_all(&self, scopes: &mut HashMap<String, PackScope>) -> Result<()>;
}
129 changes: 119 additions & 10 deletions crates/rspack_storage/src/pack/strategy/split/handle_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,56 @@ use crate::{
PackFS,
};

pub async fn prepare_scope(
scope_path: &Utf8Path,
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let temp_path = redirect_to_path(scope_path, root, temp_root)?;
fs.remove_dir(&temp_path).await?;
fs.ensure_dir(&temp_path).await?;
fs.ensure_dir(scope_path).await?;
Ok(())
}

pub async fn prepare_scope_dirs(
scopes: &HashMap<String, PackScope>,
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let tasks = scopes.values().map(|scope| {
let fs = fs.clone();
let scope_path = scope.path.clone();
let root_path = root.to_path_buf();
let temp_root_path = temp_root.to_path_buf();
tokio::spawn(async move { prepare_scope(&scope_path, &root_path, &temp_root_path, fs).await })
.map_err(|e| error!("{e}"))
});

let res = join_all(tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

let mut errors = vec![];
for task_result in res {
if let Err(e) = task_result {
errors.push(format!("- {}", e));
}
}

if errors.is_empty() {
Ok(())
} else {
Err(error!(
"prepare scopes directories failed:\n{}",
errors.join("\n")
))
}
}

pub async fn remove_files(files: HashSet<Utf8PathBuf>, fs: Arc<dyn PackFS>) -> Result<()> {
let tasks = files.into_iter().map(|path| {
let fs = fs.clone();
Expand All @@ -35,7 +85,32 @@ pub async fn remove_files(files: HashSet<Utf8PathBuf>, fs: Arc<dyn PackFS>) -> R
}
}

pub async fn move_temp_files(
pub async fn write_lock(
lock_file: &str,
files: &HashSet<Utf8PathBuf>,
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let lock_file = root.join(lock_file);
let mut lock_writer = fs.write_file(&lock_file).await?;
let mut contents = vec![temp_root.to_string()];
contents.extend(files.iter().map(|path| path.to_string()));

lock_writer
.write_all(contents.join("\n").as_bytes())
.await?;
lock_writer.flush().await?;
Ok(())
}

pub async fn remove_lock(lock_file: &str, root: &Utf8Path, fs: Arc<dyn PackFS>) -> Result<()> {
let lock_file = root.join(lock_file);
fs.remove_file(&lock_file).await?;
Ok(())
}

pub async fn move_files(
files: HashSet<Utf8PathBuf>,
root: &Utf8Path,
temp_root: &Utf8Path,
Expand Down Expand Up @@ -75,27 +150,29 @@ pub async fn move_temp_files(
}

if errors.is_empty() {
fs.remove_file(&lock_file).await?;

Ok(())
} else {
Err(error!("move temp files failed:\n{}", errors.join("\n")))
}
}

pub async fn recovery_move_lock(
async fn recovery_lock(
lock: &str,
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let lock_file = root.join("move.lock");
) -> Result<Vec<String>> {
let lock_file = root.join(lock);
if !fs.exists(&lock_file).await? {
return Ok(());
return Ok(vec![]);
}
let mut lock_reader = fs.read_file(&lock_file).await?;
let lock_file_content = String::from_utf8(lock_reader.read_to_end().await?)
.map_err(|e| error!("parse utf8 failed: {}", e))?;
let files = lock_file_content.split("\n").collect::<Vec<_>>();
let files = lock_file_content
.split("\n")
.map(|i| i.to_owned())
.collect::<Vec<_>>();
fs.remove_file(&lock_file).await?;

if files.is_empty() {
Expand All @@ -106,8 +183,20 @@ pub async fn recovery_move_lock(
"incomplete storage due to `move.lock` from an unexpected directory"
));
}
move_temp_files(
files[1..]
Ok(files[1..].to_vec())
}

pub async fn recovery_move_lock(
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let moving_files = recovery_lock("move.lock", root, temp_root, fs.clone()).await?;
if moving_files.is_empty() {
return Ok(());
}
move_files(
moving_files
.iter()
.map(Utf8PathBuf::from)
.collect::<HashSet<_>>(),
Expand All @@ -119,6 +208,26 @@ pub async fn recovery_move_lock(
Ok(())
}

pub async fn recovery_remove_lock(
root: &Utf8Path,
temp_root: &Utf8Path,
fs: Arc<dyn PackFS>,
) -> Result<()> {
let removing_files = recovery_lock("remove.lock", root, temp_root, fs.clone()).await?;
if removing_files.is_empty() {
return Ok(());
}
remove_files(
removing_files
.iter()
.map(Utf8PathBuf::from)
.collect::<HashSet<_>>(),
fs,
)
.await?;
Ok(())
}

pub async fn walk_dir(root: &Utf8Path, fs: Arc<dyn PackFS>) -> Result<HashSet<Utf8PathBuf>> {
let mut files = HashSet::default();
let mut stack = vec![root.to_owned()];
Expand Down
5 changes: 4 additions & 1 deletion crates/rspack_storage/src/pack/strategy/split/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ mod write_scope;

use std::{hash::Hasher, sync::Arc};

use handle_file::{clean_root, clean_scopes, clean_versions, recovery_move_lock};
use handle_file::{
clean_root, clean_scopes, clean_versions, recovery_move_lock, recovery_remove_lock,
};
use itertools::Itertools;
use rspack_error::{error, Result};
use rspack_paths::{Utf8Path, Utf8PathBuf};
Expand Down Expand Up @@ -58,6 +60,7 @@ impl SplitPackStrategy {
#[async_trait::async_trait]
impl RootStrategy for SplitPackStrategy {
async fn before_load(&self) -> Result<()> {
recovery_remove_lock(&self.root, &self.temp_root, self.fs.clone()).await?;
recovery_move_lock(&self.root, &self.temp_root, self.fs.clone()).await?;
Ok(())
}
Expand Down
39 changes: 29 additions & 10 deletions crates/rspack_storage/src/pack/strategy/split/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ use crate::pack::data::{Pack, PackContents, PackFileMeta, PackKeys, PackScope};
pub type PackIndexList = Vec<(usize, usize)>;
pub type PackInfoList<'a> = Vec<(&'a PackFileMeta, &'a Pack)>;

pub fn flag_scope_wrote(scope: &mut PackScope) {
let scope_meta = scope.meta.expect_value_mut();
for bucket in scope_meta.packs.iter_mut() {
for pack in bucket {
pack.wrote = true;
}
}
}

pub fn get_indexed_packs<'a>(
scope: &'a PackScope,
filter: Option<&dyn Fn(&'a Pack, &'a PackFileMeta) -> bool>,
Expand Down Expand Up @@ -73,11 +82,15 @@ pub mod test_pack_utils {
use rspack_paths::{AssertUtf8, Utf8Path, Utf8PathBuf};
use rustc_hash::FxHashMap as HashMap;

use super::flag_scope_wrote;
use crate::{
pack::{
data::{current_time, PackOptions, PackScope},
fs::PackFS,
strategy::{ScopeUpdate, ScopeWriteStrategy, SplitPackStrategy, WriteScopeResult},
strategy::{
split::handle_file::prepare_scope, ScopeUpdate, ScopeWriteStrategy, SplitPackStrategy,
WriteScopeResult,
},
},
PackBridgeFS,
};
Expand Down Expand Up @@ -226,15 +239,21 @@ pub mod test_pack_utils {
scope: &mut PackScope,
strategy: &SplitPackStrategy,
) -> Result<WriteScopeResult> {
let mut res = WriteScopeResult::default();
strategy.before_write(scope).await?;
res.extend(strategy.write_packs(scope).await?);
res.extend(strategy.write_meta(scope).await?);
strategy
.after_write(scope, res.wrote_files.clone(), res.removed_files.clone())
.await?;
strategy.after_all(scope)?;
Ok(res)
prepare_scope(
&scope.path,
&strategy.root,
&strategy.temp_root,
strategy.fs.clone(),
)
.await?;

let mut changed = WriteScopeResult::default();
changed.extend(strategy.write_packs(scope).await?);
changed.extend(strategy.write_meta(scope).await?);
strategy.merge_changed(changed.clone()).await?;
flag_scope_wrote(scope);

Ok(changed)
}

pub fn get_native_path(p: &str) -> Utf8PathBuf {
Expand Down
Loading

0 comments on commit b0177cc

Please sign in to comment.