Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement filesystem check #1103

Merged
merged 13 commits into from
Feb 3, 2023
7 changes: 6 additions & 1 deletion rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,11 @@ pub enum DeltaOperation {
predicate: Option<String>,
/// Target optimize size
target_size: DeltaDataTypeLong,
}, // TODO: Add more operations
},
#[serde(rename_all = "camelCase")]
/// Represents a `FileSystemCheck` operation
FileSystemCheck {},
// TODO: Add more operations
}

impl DeltaOperation {
Expand All @@ -502,6 +506,7 @@ impl DeltaOperation {
DeltaOperation::Write { .. } => "delta-rs.Write",
DeltaOperation::StreamingUpdate { .. } => "delta-rs.StreamingUpdate",
DeltaOperation::Optimize { .. } => "delta-rs.Optimize",
DeltaOperation::FileSystemCheck { .. } => "delta-rs.FileSystemCheck",
};
commit_info.insert(
"operation".to_string(),
Expand Down
221 changes: 221 additions & 0 deletions rust/src/operations/filesystem_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
//! Audit the Delta Table for active files that do not exist in the underlying filesystem and remove them.
//!
//! Active files are ones that have an add action in the log, but no corresponding remove action.
//! This operation creates a new transaction containing a remove action for each of the missing files.
//!
//! This can be used to repair tables where a data file has been deleted accidentally or
//! purposefully, if the file was corrupted.
//!
//! # Example
//! ```rust ignore
//! let mut table = open_table("../path/to/table")?;
//! let (table, metrics) = FileSystemCheckBuilder::new(table.object_store(), table.state).await?;
//! ````
use crate::action::{Action, Add, DeltaOperation, Remove};
use crate::operations::transaction::commit;
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::DeltaDataTypeVersion;
use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError};
use futures::future::BoxFuture;
use futures::StreamExt;
pub use object_store::path::Path;
use object_store::ObjectStore;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use url::{ParseError, Url};

/// Audit the Delta Table's active files with the underlying file system.
/// See this module's documentaiton for more information
#[derive(Debug)]
pub struct FileSystemCheckBuilder {
/// A snapshot of the to-be-checked table's state
state: DeltaTableState,
/// Delta object store for handling data files
store: Arc<DeltaObjectStore>,
/// Don't remove actions to the table log. Just determine which files can be removed
dry_run: bool,
}

/// Details of the FSCK operation including which files were removed from the log
#[derive(Debug)]
pub struct FileSystemCheckMetrics {
/// Was this a dry run
pub dry_run: bool,
/// Files that wrere removed successfully
pub files_removed: Vec<String>,
}

struct FileSystemCheckPlan {
/// Version of the snapshot provided
version: DeltaDataTypeVersion,
/// Delta object store for handling data files
store: Arc<DeltaObjectStore>,
/// Files that no longer exists in undlying ObjectStore but have active add actions
pub files_to_remove: Vec<Add>,
}

fn is_absolute_path(path: &str) -> DeltaResult<bool> {
match Url::parse(path) {
Ok(_) => Ok(true),
Err(ParseError::RelativeUrlWithoutBase) => Ok(false),
Err(_) => Err(DeltaTableError::Generic(format!(
"Unable to parse path: {}",
&path
))),
}
}
Comment on lines +61 to +70
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will not work for local paths. One pattern we apply in object_store and here is to try and cannonicalize the path and if it fails proceed with url parsing. e.g.

if let Ok(path) = std::fs::canonicalize(table_uri) {
return Url::from_directory_path(path)
.map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()));
}

Copy link
Collaborator Author

@Blajda Blajda Feb 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow on why it would not work on local paths. Paths in the Delta log must follow rfc2396 which states absolute paths must have a scheme.
Do you have a example that would cause the unit test for this function to fail?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well .. in that case I am wrong! If that paths are required to have a scheme, then url parsing should always work for valid paths ... The windows case is also covered in your tests, so all is well :).


impl FileSystemCheckBuilder {
/// Create a new [`FileSystemCheckBuilder`]
pub fn new(store: Arc<DeltaObjectStore>, state: DeltaTableState) -> Self {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we validate that the provided state is the latest version of the table? Since I don't think this is a valid operation for earlier states? or maybe that is an overall issue with the operations module?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think dry run of this operation would be helpful for diagnosing issues that arise from a combination of vacuum + time travel. If executed on a older version I do expect it to fail during the commit operation.
I can add a test to demonstrate that.

FileSystemCheckBuilder {
state,
store,
dry_run: false,
}
}

/// Only determine which add actions should be removed. A dry run will not commit actions to the Delta log
pub fn with_dry_run(mut self, dry_run: bool) -> Self {
self.dry_run = dry_run;
self
}

async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> {
let mut files_relative: HashMap<&str, &Add> =
HashMap::with_capacity(self.state.files().len());
let version = self.state.version();
let store = self.store.clone();

for active in self.state.files() {
if is_absolute_path(&active.path)? {
return Err(DeltaTableError::Generic(
"Filesystem check does not support absolute paths".to_string(),
));
} else {
files_relative.insert(&active.path, active);
}
}

let mut files = self.store.list(None).await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delta Tables are allowed to have data files outside the table root. Currently, if that is the case, this operation will remove them all from the log. I see two options:

  1. Call HEAD on each file outside the root to check if they exist.
  2. Error if we detect any files outside the root.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Do any examples of tables that use absolute paths?
I just checked the protocol and noticed absolute paths are encoded as defined in rfc2396

It states the following

An absolute URI contains the name of the scheme being used (<scheme>)
   followed by a colon (":") and then a string (the <scheme-specific-
   part>) whose interpretation depends on the scheme.

So I can segment paths based of it the scheme is set on the path and then use the correct api call for each.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh that's a good point! Checking for the scheme at the beginning makes sense.

We don't have any example tables right now. We don't generally support them in most operations, but we should try to keep them in mind. Hard to have example tables without setting up a public S3 bucket or something.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not comfortable with supporting absolute paths without having some integration tests. Currently the operation will fail when an absolute path is encountered.

while let Some(result) = files.next().await {
let file = result?;
files_relative.remove(file.location.as_ref());

if files_relative.is_empty() {
break;
}
}

let files_to_remove: Vec<Add> = files_relative
.into_values()
.map(|file| file.to_owned())
.collect();

Ok(FileSystemCheckPlan {
files_to_remove,
version,
store,
})
}
}

impl FileSystemCheckPlan {
pub async fn execute(self) -> DeltaResult<FileSystemCheckMetrics> {
if self.files_to_remove.is_empty() {
return Ok(FileSystemCheckMetrics {
dry_run: false,
files_removed: Vec::new(),
});
}

let mut actions = Vec::with_capacity(self.files_to_remove.len());
let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len());
let version = self.version;
let store = &self.store;

for file in self.files_to_remove {
let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let deletion_time = deletion_time.as_millis() as DeltaDataTypeLong;
removed_file_paths.push(file.path.clone());
actions.push(Action::remove(Remove {
path: file.path,
deletion_timestamp: Some(deletion_time),
data_change: true,
extended_file_metadata: None,
partition_values: Some(file.partition_values),
size: Some(file.size),
tags: file.tags,
}));
}

commit(
store,
version + 1,
actions,
DeltaOperation::FileSystemCheck {},
None,
)
.await?;

Ok(FileSystemCheckMetrics {
dry_run: false,
files_removed: removed_file_paths,
})
}
}

impl std::future::IntoFuture for FileSystemCheckBuilder {
type Output = DeltaResult<(DeltaTable, FileSystemCheckMetrics)>;
type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move {
let plan = this.create_fsck_plan().await?;
if this.dry_run {
return Ok((
DeltaTable::new_with_state(this.store, this.state),
FileSystemCheckMetrics {
files_removed: plan.files_to_remove.into_iter().map(|f| f.path).collect(),
dry_run: true,
},
));
}

let metrics = plan.execute().await?;
let mut table = DeltaTable::new_with_state(this.store, this.state);
table.update().await?;
Ok((table, metrics))
})
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn absolute_path() {
assert!(!is_absolute_path(
"part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet"
)
.unwrap());
assert!(!is_absolute_path(
"x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"
)
.unwrap());

assert!(is_absolute_path("abfss://container@account_name.blob.core.windows.net/full/part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet").unwrap());
assert!(is_absolute_path("file:///C:/my_table/windows.parquet").unwrap());
assert!(is_absolute_path("file:///home/my_table/unix.parquet").unwrap());
assert!(is_absolute_path("s3://container/path/file.parquet").unwrap());
assert!(is_absolute_path("gs://container/path/file.parquet").unwrap());
assert!(is_absolute_path("scheme://table/file.parquet").unwrap());
}
}
8 changes: 8 additions & 0 deletions rust/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
//! if the operation returns data as well.

use self::create::CreateBuilder;
use self::filesystem_check::FileSystemCheckBuilder;
use self::vacuum::VacuumBuilder;
use crate::builder::DeltaTableBuilder;
use crate::{DeltaResult, DeltaTable, DeltaTableError};

pub mod create;
pub mod filesystem_check;
pub mod transaction;
pub mod vacuum;

Expand Down Expand Up @@ -115,6 +117,12 @@ impl DeltaOps {
pub fn vacuum(self) -> VacuumBuilder {
VacuumBuilder::new(self.0.object_store(), self.0.state)
}

/// Audit active files with files present on the filesystem
#[must_use]
pub fn filesystem_check(self) -> FileSystemCheckBuilder {
FileSystemCheckBuilder::new(self.0.object_store(), self.0.state)
}
}

impl From<DeltaTable> for DeltaOps {
Expand Down
17 changes: 15 additions & 2 deletions rust/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ impl IntegrationContext {
StorageIntegration::Google => format!("gs://{}", &bucket),
StorageIntegration::Local => format!("file://{}", &bucket),
};

// the "storage_backend" will always point to the root ofg the object store.
// TODO should we provide the store via object_Store builders?
let store = match integration {
Expand Down Expand Up @@ -89,6 +88,12 @@ impl IntegrationContext {
}
}

pub fn table_builder(&self, table: TestTables) -> DeltaTableBuilder {
let name = table.as_name();
let table_uri = format!("{}/{}", self.root_uri(), &name);
DeltaTableBuilder::from_uri(table_uri).with_allow_http(true)
}

pub fn uri_for_table(&self, table: TestTables) -> String {
format!("{}/{}", self.root_uri(), table.as_name())
}
Expand Down Expand Up @@ -186,6 +191,7 @@ pub enum TestTables {
Simple,
SimpleCommit,
Golden,
Delta0_8_0Partitioned,
Custom(String),
}

Expand All @@ -204,6 +210,11 @@ impl TestTables {
.to_str()
.unwrap()
.to_owned(),
Self::Delta0_8_0Partitioned => data_path
.join("delta-0.8.0-partitioned")
.to_str()
.unwrap()
.to_owned(),
// the data path for upload does not apply to custom tables.
Self::Custom(_) => todo!(),
}
Expand All @@ -214,12 +225,14 @@ impl TestTables {
Self::Simple => "simple".into(),
Self::SimpleCommit => "simple_commit".into(),
Self::Golden => "golden".into(),
Self::Delta0_8_0Partitioned => "delta-0.8.0-partitioned".into(),
Self::Custom(name) => name.to_owned(),
}
}
}

fn set_env_if_not_set(key: impl AsRef<str>, value: impl AsRef<str>) {
/// Set environment variable if it is not set
pub fn set_env_if_not_set(key: impl AsRef<str>, value: impl AsRef<str>) {
if std::env::var(key.as_ref()).is_err() {
std::env::set_var(key.as_ref(), value.as_ref())
};
Expand Down
Loading