Implement filesystem check #1103

Merged
merged 13 commits into from
Feb 3, 2023
7 changes: 6 additions & 1 deletion rust/src/action/mod.rs
@@ -490,7 +490,11 @@ pub enum DeltaOperation {
predicate: Option<String>,
/// Target optimize size
target_size: DeltaDataTypeLong,
},
#[serde(rename_all = "camelCase")]
/// Represents a `FileSystemCheck` operation
FileSystemCheck {},
// TODO: Add more operations
}

impl DeltaOperation {
@@ -502,6 +506,7 @@ impl DeltaOperation {
DeltaOperation::Write { .. } => "delta-rs.Write",
DeltaOperation::StreamingUpdate { .. } => "delta-rs.StreamingUpdate",
DeltaOperation::Optimize { .. } => "delta-rs.Optimize",
DeltaOperation::FileSystemCheck { .. } => "delta-rs.FileSystemCheck",
};
commit_info.insert(
"operation".to_string(),
178 changes: 178 additions & 0 deletions rust/src/operations/filesystem_check.rs
@@ -0,0 +1,178 @@
//! Audit the Delta Table for active files that do not exist in the underlying filesystem and remove them.
//!
//! Active files are ones that have an add action in the log, but no corresponding remove action.
//! This operation creates a new transaction containing a remove action for each of the missing files.
//!
//! This can be used to repair tables where a data file has been deleted, either accidentally or
//! purposefully (for example, because the file was corrupted).
//!
//! # Example
//! ```rust ignore
//! let table = open_table("../path/to/table")?;
//! let (table, metrics) = FileSystemCheckBuilder::new(table.object_store(), table.state).await?;
//! ```
use crate::action::{Action, Add, DeltaOperation, Remove};
use crate::operations::transaction::commit;
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::DeltaDataTypeVersion;
use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable};
use futures::future::BoxFuture;
use futures::StreamExt;
pub use object_store::path::Path;
use object_store::ObjectStore;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

/// Audit the Delta Table's active files with the underlying file system.
/// See this module's documentation for more information.
#[derive(Debug)]
pub struct FileSystemCheckBuilder {
/// A snapshot of the to-be-checked table's state
state: DeltaTableState,
/// Delta object store for handling data files
store: Arc<DeltaObjectStore>,
/// Don't commit remove actions to the table log; only determine which files would be removed
dry_run: bool,
}

/// Details of the FSCK operation including which files were removed from the log
#[derive(Debug)]
pub struct FileSystemCheckMetrics {
/// Was this a dry run
pub dry_run: bool,
/// Files that were removed successfully
pub files_removed: Vec<String>,
}

struct FileSystemCheckPlan {
/// Version of the snapshot provided
version: DeltaDataTypeVersion,
/// Delta object store for handling data files
store: Arc<DeltaObjectStore>,
/// Files that no longer exist in the underlying ObjectStore but have active add actions
pub files_to_remove: Vec<Add>,
}

impl FileSystemCheckBuilder {
/// Create a new [`FileSystemCheckBuilder`]
pub fn new(store: Arc<DeltaObjectStore>, state: DeltaTableState) -> Self {
Collaborator

Should we validate that the provided state is the latest version of the table? I don't think this is a valid operation for earlier states. Or maybe that is an overall issue with the operations module?

Collaborator Author

I think a dry run of this operation would be helpful for diagnosing issues that arise from a combination of vacuum + time travel. If executed on an older version, I do expect it to fail during the commit operation.
I can add a test to demonstrate that.
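
To make the trade-off in this thread concrete, here is a minimal, self-contained sketch. It is not the delta-rs implementation: `Snapshot`, `CheckMode`, `CheckError`, and `check_allowed` are hypothetical names, and the latest table version is passed in as a plain integer. It assumes a policy where a dry run is allowed on any snapshot, while a committing run is rejected up front unless the snapshot is the latest version (rather than waiting for the commit itself to fail).

```rust
/// Hypothetical stand-in for a table snapshot; only the version matters here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Snapshot {
    pub version: i64,
}

/// Whether the check only reports missing files or also commits remove actions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckMode {
    DryRun,
    Commit,
}

/// Hypothetical error returned when a committing run is requested on an old snapshot.
#[derive(Debug, PartialEq, Eq)]
pub enum CheckError {
    StaleSnapshot { snapshot: i64, latest: i64 },
}

/// Dry runs are always allowed (useful for diagnosing vacuum + time travel issues);
/// committing runs require the snapshot to be the latest table version.
pub fn check_allowed(
    snapshot: Snapshot,
    latest_version: i64,
    mode: CheckMode,
) -> Result<(), CheckError> {
    match mode {
        CheckMode::DryRun => Ok(()),
        CheckMode::Commit if snapshot.version == latest_version => Ok(()),
        CheckMode::Commit => Err(CheckError::StaleSnapshot {
            snapshot: snapshot.version,
            latest: latest_version,
        }),
    }
}
```

With this policy, `check_allowed(Snapshot { version: 3 }, 5, CheckMode::DryRun)` succeeds, while the same call with `CheckMode::Commit` returns `StaleSnapshot`.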

FileSystemCheckBuilder {
state,
store,
dry_run: false,
}
}

/// Only determine which add actions should be removed
pub fn with_dry_run(mut self, dry_run: bool) -> Self {
self.dry_run = dry_run;
self
}

async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> {
let mut files_to_remove = Vec::new();
let mut files_to_check: HashMap<String, &Add> = self.state.files().iter()
.map(|active| (active.path.to_owned(), active))
.collect();
let version = self.state.version();
let store = self.store.clone();

let mut files = self.store.list(None).await?;
Collaborator

Delta Tables are allowed to have data files outside the table root. Currently, if that is the case, this operation will remove them all from the log. I see two options:

  1. Call HEAD on each file outside the root to check if they exist.
  2. Error if we detect any files outside the root.

Collaborator Author

Makes sense. Do we have any examples of tables that use absolute paths?
I just checked the protocol and noticed that absolute paths are encoded as defined in RFC 2396.

It states the following:

An absolute URI contains the name of the scheme being used (<scheme>)
   followed by a colon (":") and then a string (the <scheme-specific-
   part>) whose interpretation depends on the scheme.

So I can segment paths based on whether a scheme is set on the path and then use the correct API call for each.

Collaborator

Oh that's a good point! Checking for the scheme at the beginning makes sense.

We don't have any example tables right now. We don't generally support them in most operations, but we should try to keep them in mind. Hard to have example tables without setting up a public S3 bucket or something.

Collaborator Author

I'm not comfortable with supporting absolute paths without having some integration tests. Currently the operation will fail when an absolute path is encountered.
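
For reference, the scheme check discussed in this thread could look roughly like the sketch below. It is an illustration only, not code from this PR: `has_uri_scheme` and `partition_paths` are hypothetical helpers. The scheme grammar follows RFC 2396 (an ASCII letter followed by letters, digits, `+`, `-`, or `.`, terminated by `:`).

```rust
/// Returns true if `path` begins with a URI scheme in the RFC 2396 sense:
/// a letter followed by letters, digits, '+', '-', or '.', and then ':'.
pub fn has_uri_scheme(path: &str) -> bool {
    // Everything before the first ':' is the candidate scheme.
    let Some((scheme, _rest)) = path.split_once(':') else {
        return false;
    };
    let mut chars = scheme.chars();
    match chars.next() {
        // The first character must be a letter.
        Some(first) if first.is_ascii_alphabetic() => {
            chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.'))
        }
        // Empty candidate or non-letter start: not a scheme.
        _ => false,
    }
}

/// Splits paths into (relative, absolute) so each group can be checked
/// with a different API call (for example a listing versus per-file HEAD).
pub fn partition_paths<'a>(paths: &[&'a str]) -> (Vec<&'a str>, Vec<&'a str>) {
    paths.iter().copied().partition(|p| !has_uri_scheme(p))
}
```

One caveat of this purely syntactic check: a Windows-style path such as `C:\data\file.parquet` would be classified as having the scheme `C`, so real code would likely need extra handling for local paths.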

while let Some(result) = files.next().await {
let file = result?;
files_to_check.remove(file.location.as_ref());
}

let files_to_remove: Vec<Add> = files_to_check
.into_iter()
.map(|(_, file)| file.to_owned())
.collect();

Ok(FileSystemCheckPlan {
files_to_remove,
version,
store,
})
}
}

impl FileSystemCheckPlan {
pub async fn execute(self) -> DeltaResult<FileSystemCheckMetrics> {
if self.files_to_remove.is_empty() {
return Ok(FileSystemCheckMetrics {
dry_run: false,
files_removed: Vec::new(),
});
}

let mut actions = Vec::with_capacity(self.files_to_remove.len());
let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len());
let version = self.version;
let store = &self.store;

for file in self.files_to_remove {
let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let deletion_time = deletion_time.as_millis() as DeltaDataTypeLong;
removed_file_paths.push(file.path.clone());
actions.push(Action::remove(Remove {
path: file.path,
deletion_timestamp: Some(deletion_time),
data_change: true,
extended_file_metadata: None,
partition_values: Some(file.partition_values),
size: Some(file.size),
tags: file.tags,
}));
}

if !actions.is_empty() {
commit(
store,
version + 1,
actions,
DeltaOperation::FileSystemCheck {},
None,
)
.await?;
}

Ok(FileSystemCheckMetrics {
dry_run: false,
files_removed: removed_file_paths,
})
}
}

impl std::future::IntoFuture for FileSystemCheckBuilder {
type Output = DeltaResult<(DeltaTable, FileSystemCheckMetrics)>;
type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move {
let plan = this.create_fsck_plan().await?;
if this.dry_run {
return Ok((
DeltaTable::new_with_state(this.store, this.state),
FileSystemCheckMetrics {
files_removed: plan
.files_to_remove
.into_iter()
.map(|f| f.path)
.collect(),
dry_run: true,
},
));
}

let metrics = plan.execute().await?;
let mut table = DeltaTable::new_with_state(this.store, this.state);
table.update().await?;
Ok((table, metrics))
})
}
}
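
The core of `create_fsck_plan` above is a set difference: start from every active path in the log and drop each path that the storage listing returns; whatever remains is missing. A minimal standalone sketch of that idea, using plain string slices instead of `Add` actions and object store metadata (`missing_files` is a hypothetical name, and the sorting is added here only to make the result deterministic):

```rust
use std::collections::HashSet;

/// Returns the active paths that do not appear in the storage listing.
/// The result is sorted so that callers get a deterministic order.
pub fn missing_files(active: &[&str], listed: &[&str]) -> Vec<String> {
    // Start with every active path as a candidate for removal.
    let mut candidates: HashSet<&str> = active.iter().copied().collect();

    // Every path found in storage is no longer a candidate.
    // Listed paths that are not active (log files, orphans) are ignored.
    for path in listed {
        candidates.remove(path);
    }

    let mut missing: Vec<String> = candidates.into_iter().map(str::to_owned).collect();
    missing.sort();
    missing
}
```

Note that this approach only works when the listing and the log use the same path form, which is why absolute paths are treated as a separate case in the review discussion.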
8 changes: 8 additions & 0 deletions rust/src/operations/mod.rs
@@ -8,11 +8,13 @@
//! if the operation returns data as well.

use self::create::CreateBuilder;
use self::filesystem_check::FileSystemCheckBuilder;
use self::vacuum::VacuumBuilder;
use crate::builder::DeltaTableBuilder;
use crate::{DeltaResult, DeltaTable, DeltaTableError};

pub mod create;
pub mod filesystem_check;
pub mod transaction;
pub mod vacuum;

@@ -115,6 +117,12 @@ impl DeltaOps {
pub fn vacuum(self) -> VacuumBuilder {
VacuumBuilder::new(self.0.object_store(), self.0.state)
}

/// Audit active files with files present on the filesystem
#[must_use]
pub fn filesystem_check(self) -> FileSystemCheckBuilder {
FileSystemCheckBuilder::new(self.0.object_store(), self.0.state)
}
}

impl From<DeltaTable> for DeltaOps {
17 changes: 15 additions & 2 deletions rust/src/test_utils.rs
@@ -53,7 +53,6 @@ impl IntegrationContext {
StorageIntegration::Google => format!("gs://{}", &bucket),
StorageIntegration::Local => format!("file://{}", &bucket),
};

// the "storage_backend" will always point to the root of the object store.
// TODO should we provide the store via object_store builders?
let store = match integration {
@@ -89,6 +88,12 @@ impl IntegrationContext {
}
}

pub fn table_builder(&self, table: TestTables) -> DeltaTableBuilder {
let name = table.as_name();
let table_uri = format!("{}/{}", self.root_uri(), &name);
DeltaTableBuilder::from_uri(table_uri).with_allow_http(true)
}

pub fn uri_for_table(&self, table: TestTables) -> String {
format!("{}/{}", self.root_uri(), table.as_name())
}
@@ -186,6 +191,7 @@ pub enum TestTables {
Simple,
SimpleCommit,
Golden,
Delta0_8_0Partitioned,
Custom(String),
}

@@ -204,6 +210,11 @@ impl TestTables {
.to_str()
.unwrap()
.to_owned(),
Self::Delta0_8_0Partitioned => data_path
.join("delta-0.8.0-partitioned")
.to_str()
.unwrap()
.to_owned(),
// the data path for upload does not apply to custom tables.
Self::Custom(_) => todo!(),
}
@@ -214,12 +225,14 @@ impl TestTables {
Self::Simple => "simple".into(),
Self::SimpleCommit => "simple_commit".into(),
Self::Golden => "golden".into(),
Self::Delta0_8_0Partitioned => "delta-0.8.0-partitioned".into(),
Self::Custom(name) => name.to_owned(),
}
}
}

/// Set environment variable if it is not set
pub fn set_env_if_not_set(key: impl AsRef<str>, value: impl AsRef<str>) {
if std::env::var(key.as_ref()).is_err() {
std::env::set_var(key.as_ref(), value.as_ref())
};
112 changes: 112 additions & 0 deletions rust/tests/command_filesystem_check.rs
@@ -0,0 +1,112 @@
#![cfg(all(feature = "integration_test"))]

use deltalake::test_utils::{
set_env_if_not_set, IntegrationContext, StorageIntegration, TestResult, TestTables,
};
use deltalake::DeltaOps;
use deltalake::Path;
use serial_test::serial;

mod common;

#[tokio::test]
#[serial]
async fn test_filesystem_check_local() -> TestResult {
Ok(test_filesystem_check(StorageIntegration::Local).await?)
}

#[cfg(any(feature = "s3", feature = "s3-rustls"))]
#[tokio::test]
#[serial]
async fn test_filesystem_check_aws() -> TestResult {
set_env_if_not_set("AWS_S3_ALLOW_UNSAFE_RENAME", "true");
set_env_if_not_set("AWS_S3_LOCKING_PROVIDER", "none");
Ok(test_filesystem_check(StorageIntegration::Amazon).await?)
}

#[cfg(feature = "azure")]
#[tokio::test]
#[serial]
async fn test_filesystem_check_azure() -> TestResult {
Ok(test_filesystem_check(StorageIntegration::Microsoft).await?)
}

#[cfg(feature = "gcs")]
#[tokio::test]
#[serial]
async fn test_filesystem_check_gcp() -> TestResult {
Ok(test_filesystem_check(StorageIntegration::Google).await?)
}

async fn test_filesystem_check(storage: StorageIntegration) -> TestResult {
let context = IntegrationContext::new(storage)?;
context.load_table(TestTables::Simple).await?;
let file = "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet";
let path = Path::from_iter([&TestTables::Simple.as_name(), file]);

// Delete an active file from underlying storage without an update to the log to simulate an external fault
context.object_store().delete(&path).await?;

let table = context.table_builder(TestTables::Simple).load().await?;
let version = table.state.version();
let active = table.state.files().len();

// Validate that a dry run does not mutate the table log and identifies orphaned add actions
let op = DeltaOps::from(table);
let (table, metrics) = op.filesystem_check().with_dry_run(true).await?;
assert_eq!(version, table.state.version());
assert_eq!(active, table.state.files().len());
assert_eq!(vec![file.to_string()], metrics.files_removed);

// Validate a run updates the table version with proper remove actions
let op = DeltaOps::from(table);
let (table, metrics) = op.filesystem_check().await?;
assert_eq!(version + 1, table.state.version());
assert_eq!(active - 1, table.state.files().len());
assert_eq!(vec![file.to_string()], metrics.files_removed);

let remove = table.state.all_tombstones().get(file).unwrap();
assert_eq!(remove.data_change, true);

// An additional run should return an empty list of orphaned actions
let op = DeltaOps::from(table);
let (table, metrics) = op.filesystem_check().await?;
assert_eq!(version + 1, table.state.version());
assert_eq!(active - 1, table.state.files().len());
assert!(metrics.files_removed.is_empty());

Ok(())
}

#[tokio::test]
#[serial]
async fn test_filesystem_check_partitioned() -> TestResult {
let storage = StorageIntegration::Local;
let context = IntegrationContext::new(storage)?;
context
.load_table(TestTables::Delta0_8_0Partitioned)
.await?;
let file = "year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet";
let path = Path::parse(TestTables::Delta0_8_0Partitioned.as_name() + "/" + file).unwrap();

// Delete an active file from underlying storage without an update to the log to simulate an external fault
context.object_store().delete(&path).await?;

let table = context
.table_builder(TestTables::Delta0_8_0Partitioned)
.load()
.await?;
let version = table.state.version();
let active = table.state.files().len();

// Validate a run updates the table version with proper remove actions
let op = DeltaOps::from(table);
let (table, metrics) = op.filesystem_check().await?;
assert_eq!(version + 1, table.state.version());
assert_eq!(active - 1, table.state.files().len());
assert_eq!(vec![file.to_string()], metrics.files_removed);

let remove = table.state.all_tombstones().get(file).unwrap();
assert_eq!(remove.data_change, true);
Ok(())
}