-
Notifications
You must be signed in to change notification settings - Fork 403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement filesystem check #1103
Changes from 8 commits
5d32c56
0747602
e944633
18e7974
e90ba53
d42eb78
48a460a
85886a5
3dc4b67
7cbe4fa
558e890
46841cc
9d19ef3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
//! Audit the Delta Table for active files that do not exist in the underlying filesystem and remove them. | ||
//! | ||
//! Active files are ones that have an add action in the log, but no corresponding remove action. | ||
//! This operation creates a new transaction containing a remove action for each of the missing files. | ||
//! | ||
//! This can be used to repair tables where a data file has been deleted accidentally or | ||
//! purposefully, if the file was corrupted. | ||
//! # Example | ||
//! ```rust ignore | ||
//! let mut table = open_table("../path/to/table")?; | ||
//! let (table, metrics) = FileSystemCheckBuilder::new(table.object_store(). table.state).await?; | ||
//! ```` | ||
use crate::action::{Action, Add, DeltaOperation, Remove}; | ||
use crate::operations::transaction::commit; | ||
use crate::storage::DeltaObjectStore; | ||
use crate::table_state::DeltaTableState; | ||
use crate::DeltaDataTypeVersion; | ||
use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable}; | ||
use futures::future::BoxFuture; | ||
use futures::StreamExt; | ||
pub use object_store::path::Path; | ||
use object_store::ObjectStore; | ||
use std::collections::HashMap; | ||
use std::fmt::Debug; | ||
use std::sync::Arc; | ||
use std::time::SystemTime; | ||
use std::time::UNIX_EPOCH; | ||
|
||
/// Audit the Delta Table's active files with the underlying file system. | ||
/// See this module's documentaiton for more information | ||
#[derive(Debug)] | ||
pub struct FileSystemCheckBuilder { | ||
/// A snapshot of the to-be-checked table's state | ||
state: DeltaTableState, | ||
/// Delta object store for handling data files | ||
store: Arc<DeltaObjectStore>, | ||
/// Don't remove actions to the table log. Just determine which files can be removed | ||
dry_run: bool, | ||
} | ||
|
||
/// Details of the FSCK operation including which files were removed from the log | ||
#[derive(Debug)] | ||
pub struct FileSystemCheckMetrics { | ||
/// Was this a dry run | ||
pub dry_run: bool, | ||
/// Files that wrere removed successfully | ||
pub files_removed: Vec<String>, | ||
} | ||
|
||
struct FileSystemCheckPlan { | ||
/// Version of the snapshot provided | ||
version: DeltaDataTypeVersion, | ||
/// Delta object store for handling data files | ||
store: Arc<DeltaObjectStore>, | ||
/// Files that no longer exists in undlying ObjectStore but have active add actions | ||
pub files_to_remove: Vec<Add>, | ||
} | ||
|
||
impl FileSystemCheckBuilder { | ||
/// Create a new [`FileSystemCheckBuilder`] | ||
pub fn new(store: Arc<DeltaObjectStore>, state: DeltaTableState) -> Self { | ||
FileSystemCheckBuilder { | ||
state, | ||
store, | ||
dry_run: false, | ||
} | ||
} | ||
|
||
/// Only determine which add actions should be removed | ||
pub fn with_dry_run(mut self, dry_run: bool) -> Self { | ||
self.dry_run = dry_run; | ||
self | ||
} | ||
|
||
async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> { | ||
let mut files_to_remove = Vec::new(); | ||
wjones127 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let mut files_to_check: HashMap<String, &Add> = self.state.files().iter() | ||
.map(|active| (active.path.to_owned(), active)) | ||
.collect(); | ||
let version = self.state.version(); | ||
let store = self.store.clone(); | ||
|
||
let mut files = self.store.list(None).await?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Delta Tables are allowed to have data files outside the table root. Currently, if that is the case, this operation will remove them all from the log. I see two options:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense. Do any examples of tables that use absolute paths? It states the following
So I can segment paths based of it the scheme is set on the path and then use the correct api call for each. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh that's a good point! Checking for the scheme at the beginning makes sense. We don't have any example tables right now. We don't generally support them in most operations, but we should try to keep them in mind. Hard to have example tables without setting up a public S3 bucket or something. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not comfortable with supporting absolute paths without having some integration tests. Currently the operation will fail when an absolute path is encountered. |
||
while let Some(result) = files.next().await { | ||
let file = result?; | ||
files_to_check.remove(file.location.as_ref()); | ||
} | ||
|
||
let files_to_remove: Vec<Add> = files_to_check | ||
.into_iter() | ||
.map(|(_, file)| file.to_owned()) | ||
.collect(); | ||
|
||
Ok(FileSystemCheckPlan { | ||
files_to_remove, | ||
version, | ||
store, | ||
}) | ||
} | ||
} | ||
|
||
impl FileSystemCheckPlan { | ||
pub async fn execute(self) -> DeltaResult<FileSystemCheckMetrics> { | ||
if self.files_to_remove.is_empty() { | ||
return Ok(FileSystemCheckMetrics { | ||
dry_run: false, | ||
files_removed: Vec::new(), | ||
}); | ||
} | ||
|
||
let mut actions = Vec::with_capacity(self.files_to_remove.len()); | ||
let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len()); | ||
let version = self.version; | ||
let store = &self.store; | ||
|
||
for file in self.files_to_remove { | ||
let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); | ||
let deletion_time = deletion_time.as_millis() as DeltaDataTypeLong; | ||
removed_file_paths.push(file.path.clone()); | ||
actions.push(Action::remove(Remove { | ||
path: file.path, | ||
deletion_timestamp: Some(deletion_time), | ||
data_change: true, | ||
extended_file_metadata: None, | ||
partition_values: Some(file.partition_values), | ||
size: Some(file.size), | ||
tags: file.tags, | ||
})); | ||
} | ||
|
||
if !actions.is_empty() { | ||
Blajda marked this conversation as resolved.
Show resolved
Hide resolved
|
||
commit( | ||
store, | ||
version + 1, | ||
actions, | ||
DeltaOperation::FileSystemCheck {}, | ||
None, | ||
) | ||
.await?; | ||
} | ||
|
||
Ok(FileSystemCheckMetrics { | ||
dry_run: false, | ||
files_removed: removed_file_paths, | ||
}) | ||
} | ||
} | ||
|
||
impl std::future::IntoFuture for FileSystemCheckBuilder { | ||
type Output = DeltaResult<(DeltaTable, FileSystemCheckMetrics)>; | ||
type IntoFuture = BoxFuture<'static, Self::Output>; | ||
|
||
fn into_future(self) -> Self::IntoFuture { | ||
let this = self; | ||
|
||
Box::pin(async move { | ||
let plan = this.create_fsck_plan().await?; | ||
if this.dry_run { | ||
return Ok(( | ||
DeltaTable::new_with_state(this.store, this.state), | ||
FileSystemCheckMetrics { | ||
files_removed: plan | ||
.files_to_remove | ||
.into_iter() | ||
.map(|f| f.path) | ||
.collect(), | ||
dry_run: true, | ||
}, | ||
)); | ||
} | ||
|
||
let metrics = plan.execute().await?; | ||
let mut table = DeltaTable::new_with_state(this.store, this.state); | ||
table.update().await?; | ||
Ok((table, metrics)) | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
#![cfg(all(feature = "integration_test"))] | ||
Blajda marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
use deltalake::test_utils::{ | ||
set_env_if_not_set, IntegrationContext, StorageIntegration, TestResult, TestTables, | ||
}; | ||
use deltalake::DeltaOps; | ||
use deltalake::Path; | ||
use serial_test::serial; | ||
|
||
mod common; | ||
|
||
#[tokio::test] | ||
#[serial] | ||
async fn test_filesystem_check_local() -> TestResult { | ||
Ok(test_filesystem_check(StorageIntegration::Local).await?) | ||
} | ||
|
||
#[cfg(any(feature = "s3", feature = "s3-rustls"))] | ||
#[tokio::test] | ||
#[serial] | ||
async fn test_filesystem_check_aws() -> TestResult { | ||
set_env_if_not_set("AWS_S3_ALLOW_UNSAFE_RENAME", "true"); | ||
set_env_if_not_set("AWS_S3_LOCKING_PROVIDER", "none"); | ||
Blajda marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Ok(test_filesystem_check(StorageIntegration::Amazon).await?) | ||
} | ||
|
||
#[cfg(feature = "azure")] | ||
#[tokio::test] | ||
#[serial] | ||
async fn test_filesystem_check_azure() -> TestResult { | ||
Ok(test_filesystem_check(StorageIntegration::Microsoft).await?) | ||
} | ||
|
||
#[cfg(feature = "gcs")] | ||
#[tokio::test] | ||
#[serial] | ||
async fn test_filesystem_check_gcp() -> TestResult { | ||
Ok(test_filesystem_check(StorageIntegration::Google).await?) | ||
} | ||
|
||
async fn test_filesystem_check(storage: StorageIntegration) -> TestResult { | ||
let context = IntegrationContext::new(storage)?; | ||
context.load_table(TestTables::Simple).await?; | ||
let file = "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet"; | ||
let path = Path::from_iter([&TestTables::Simple.as_name(), file]); | ||
|
||
// Delete an active file from underlying storage without an update to the log to simulate an external fault | ||
context.object_store().delete(&path).await?; | ||
|
||
let table = context.table_builder(TestTables::Simple).load().await?; | ||
let version = table.state.version(); | ||
let active = table.state.files().len(); | ||
|
||
// Validate a Dry run does not mutate the table log and indentifies orphaned add actions | ||
let op = DeltaOps::from(table); | ||
let (table, metrics) = op.filesystem_check().with_dry_run(true).await?; | ||
assert_eq!(version, table.state.version()); | ||
assert_eq!(active, table.state.files().len()); | ||
assert_eq!(vec![file.to_string()], metrics.files_removed); | ||
|
||
// Validate a run updates the table version with proper remove actions | ||
let op = DeltaOps::from(table); | ||
let (table, metrics) = op.filesystem_check().await?; | ||
assert_eq!(version + 1, table.state.version()); | ||
assert_eq!(active - 1, table.state.files().len()); | ||
assert_eq!(vec![file.to_string()], metrics.files_removed); | ||
|
||
let remove = table.state.all_tombstones().get(file).unwrap(); | ||
assert_eq!(remove.data_change, true); | ||
|
||
// An additonal run should return an empty list of orphaned actions | ||
let op = DeltaOps::from(table); | ||
let (table, metrics) = op.filesystem_check().await?; | ||
assert_eq!(version + 1, table.state.version()); | ||
assert_eq!(active - 1, table.state.files().len()); | ||
assert!(metrics.files_removed.is_empty()); | ||
|
||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
#[serial] | ||
async fn test_filesystem_check_partitioned() -> TestResult { | ||
let storage = StorageIntegration::Local; | ||
let context = IntegrationContext::new(storage)?; | ||
context | ||
.load_table(TestTables::Delta0_8_0Partitioned) | ||
.await?; | ||
let file = "year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet"; | ||
let path = Path::parse(TestTables::Delta0_8_0Partitioned.as_name() + "/" + file).unwrap(); | ||
|
||
// Delete an active file from underlying storage without an update to the log to simulate an external fault | ||
context.object_store().delete(&path).await?; | ||
|
||
let table = context | ||
.table_builder(TestTables::Delta0_8_0Partitioned) | ||
.load() | ||
.await?; | ||
let version = table.state.version(); | ||
let active = table.state.files().len(); | ||
|
||
// Validate a run updates the table version with proper remove actions | ||
let op = DeltaOps::from(table); | ||
let (table, metrics) = op.filesystem_check().await?; | ||
assert_eq!(version + 1, table.state.version()); | ||
assert_eq!(active - 1, table.state.files().len()); | ||
assert_eq!(vec![file.to_string()], metrics.files_removed); | ||
|
||
let remove = table.state.all_tombstones().get(file).unwrap(); | ||
assert_eq!(remove.data_change, true); | ||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we validate that the provided state is the latest version of the table? Since I don't think this is a valid operation for earlier states? or maybe that is an overall issue with the operations module?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think dry run of this operation would be helpful for diagnosing issues that arise from a combination of vacuum + time travel. If executed on a older version I do expect it to fail during the commit operation.
I can add a test to demonstrate that.