diff --git a/rust/lance-core/src/utils.rs b/rust/lance-core/src/utils.rs
index acf215caeb..a67cfad693 100644
--- a/rust/lance-core/src/utils.rs
+++ b/rust/lance-core/src/utils.rs
@@ -8,6 +8,7 @@ pub mod deletion;
 pub mod futures;
 pub mod hash;
 pub mod mask;
+pub mod parse;
 pub mod path;
 pub mod testing;
 pub mod tokio;
diff --git a/rust/lance-core/src/utils/parse.rs b/rust/lance-core/src/utils/parse.rs
new file mode 100644
index 0000000000..7efea7cfc7
--- /dev/null
+++ b/rust/lance-core/src/utils/parse.rs
@@ -0,0 +1,11 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+/// Parse a string into a boolean value.
+pub fn str_is_truthy(val: &str) -> bool {
+    val.eq_ignore_ascii_case("1")
+        | val.eq_ignore_ascii_case("true")
+        | val.eq_ignore_ascii_case("on")
+        | val.eq_ignore_ascii_case("yes")
+        | val.eq_ignore_ascii_case("y")
+}
diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs
index 061ee5b03c..a9e8d60eca 100644
--- a/rust/lance-io/src/object_store.rs
+++ b/rust/lance-io/src/object_store.rs
@@ -17,6 +17,7 @@ use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use deepsize::DeepSizeOf;
 use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
+use lance_core::utils::parse::str_is_truthy;
 use lance_core::utils::tokio::get_num_compute_intensive_cpus;
 use object_store::aws::{
     AmazonS3ConfigKey, AwsCredential as ObjectStoreAwsCredential, AwsCredentialProvider,
@@ -1039,14 +1040,6 @@ fn infer_block_size(scheme: &str) -> usize {
     }
 }
 
-fn str_is_truthy(val: &str) -> bool {
-    val.eq_ignore_ascii_case("1")
-        | val.eq_ignore_ascii_case("true")
-        | val.eq_ignore_ascii_case("on")
-        | val.eq_ignore_ascii_case("yes")
-        | val.eq_ignore_ascii_case("y")
-}
-
 /// Attempt to create a Url from given table location.
 ///
 /// The location could be:
diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index 22f419de36..5d8d0ddf3b 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -4395,10 +4395,13 @@ mod tests {
         let validate_res = dataset.validate().await;
         assert!(validate_res.is_err());
         assert_eq!(dataset.load_indices().await.unwrap()[0].name, "vector_idx");
-        assert!(dataset.index_statistics("vector_idx").await.is_err());
 
-        // Force a migration
-        dataset.delete("false").await.unwrap();
+        // Calling index statistics will force a migration
+        let stats = dataset.index_statistics("vector_idx").await.unwrap();
+        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
+        assert_eq!(stats["num_indexed_fragments"], 2);
+
+        dataset.checkout_latest().await.unwrap();
         dataset.validate().await.unwrap();
 
         let indices = dataset.load_indices().await.unwrap();
@@ -4408,10 +4411,6 @@
         }
         assert_eq!(get_bitmap(&indices[0]), vec![0]);
         assert_eq!(get_bitmap(&indices[1]), vec![1]);
-
-        let stats = dataset.index_statistics("vector_idx").await.unwrap();
-        let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
-        assert_eq!(stats["num_indexed_fragments"], 2);
     }
 
     #[rstest]
diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs
index 51f46ad312..4d15cbb995 100644
--- a/rust/lance/src/index.rs
+++ b/rust/lance/src/index.rs
@@ -5,12 +5,13 @@
 //!
 
 use std::collections::{HashMap, HashSet};
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
 
 use arrow_schema::DataType;
 use async_trait::async_trait;
 use futures::{stream, StreamExt, TryStreamExt};
 use itertools::Itertools;
+use lance_core::utils::parse::str_is_truthy;
 use lance_file::reader::FileReader;
 use lance_file::v2;
 use lance_file::v2::reader::FileReaderOptions;
@@ -68,6 +69,17 @@ use self::append::merge_indices;
 use self::scalar::build_scalar_index;
 use self::vector::{build_vector_index, VectorIndexParams, LANCE_VECTOR_INDEX};
 
+// Whether to auto-migrate a dataset when we encounter corruption.
+fn auto_migrate_corruption() -> bool {
+    static LANCE_AUTO_MIGRATION: OnceLock<bool> = OnceLock::new();
+    *LANCE_AUTO_MIGRATION.get_or_init(|| {
+        std::env::var("LANCE_AUTO_MIGRATION")
+            .ok()
+            .map(|s| str_is_truthy(&s))
+            .unwrap_or(true)
+    })
+}
+
 /// Builds index.
 #[async_trait]
 pub trait IndexBuilder {
@@ -590,7 +602,8 @@ impl DatasetIndexExt for Dataset {
         let index_type = indices[0].index_type().to_string();
 
         let indexed_fragments_per_delta = self.indexed_fragments(index_name).await?;
-        let num_indexed_rows_per_delta = indexed_fragments_per_delta
+
+        let res = indexed_fragments_per_delta
             .iter()
             .map(|frags| {
                 let mut sum = 0;
@@ -604,18 +617,49 @@ impl DatasetIndexExt for Dataset {
                 }
                 Ok(sum)
             })
-            .collect::<Result<Vec<_>>>()?;
+            .collect::<Result<Vec<_>>>();
+
+        async fn migrate_and_recompute(ds: &Dataset, index_name: &str) -> Result<String> {
+            let mut ds = ds.clone();
+            log::warn!(
+                "Detecting out-dated fragment metadata, migrating dataset. \
+                To disable migration, set LANCE_AUTO_MIGRATION=false"
+            );
+            ds.delete("false").await.map_err(|err| {
+                Error::Execution {
+                    message: format!("Failed to migrate dataset while calculating index statistics. \
+                        To disable migration, set LANCE_AUTO_MIGRATION=false. Original error: {}", err),
+                    location: location!(),
+                }
+            })?;
+            ds.index_statistics(index_name).await
+        }
+
+        let num_indexed_rows_per_delta = match res {
+            Ok(rows) => rows,
+            Err(Error::Internal { message, .. })
+                if auto_migrate_corruption() && message.contains("trigger a single write") =>
+            {
+                return migrate_and_recompute(self, index_name).await;
+            }
+            Err(e) => return Err(e),
+        };
 
         let mut fragment_ids = HashSet::new();
         for frags in indexed_fragments_per_delta.iter() {
             for frag in frags.iter() {
                 if !fragment_ids.insert(frag.id) {
-                    return Err(Error::Internal {
-                        message: "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \
-                            and trigger a single write to fix this"
-                            .to_string(),
-                        location: location!(),
-                    });
+                    if auto_migrate_corruption() {
+                        return migrate_and_recompute(self, index_name).await;
+                    } else {
+                        return Err(Error::Internal {
+                            message:
+                                "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \
+                                and trigger a single write to fix this"
+                                    .to_string(),
+                            location: location!(),
+                        });
+                    }
                 }
             }
         }
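For reviewers who want to exercise the new toggle in isolation, here is a minimal, self-contained sketch of the two pieces this patch adds (`str_is_truthy` and the `LANCE_AUTO_MIGRATION` gate). The `main` harness is illustrative only and is not part of the patch:

```rust
use std::sync::OnceLock;

// Copy of lance_core::utils::parse::str_is_truthy from this patch:
// case-insensitive match against the accepted truthy spellings.
fn str_is_truthy(val: &str) -> bool {
    val.eq_ignore_ascii_case("1")
        | val.eq_ignore_ascii_case("true")
        | val.eq_ignore_ascii_case("on")
        | val.eq_ignore_ascii_case("yes")
        | val.eq_ignore_ascii_case("y")
}

// Copy of the gate added to rust/lance/src/index.rs: the env var is read
// once per process and cached; an unset variable defaults to true (migrate).
fn auto_migrate_corruption() -> bool {
    static LANCE_AUTO_MIGRATION: OnceLock<bool> = OnceLock::new();
    *LANCE_AUTO_MIGRATION.get_or_init(|| {
        std::env::var("LANCE_AUTO_MIGRATION")
            .ok()
            .map(|s| str_is_truthy(&s))
            .unwrap_or(true)
    })
}

fn main() {
    // Accepted truthy spellings, any case; everything else parses as false,
    // so LANCE_AUTO_MIGRATION=false (or "0", "off", ...) disables migration.
    assert!(str_is_truthy("YES") && str_is_truthy("on"));
    assert!(!str_is_truthy("false") && !str_is_truthy("0"));

    // With LANCE_AUTO_MIGRATION unset, the gate defaults to migrating.
    println!("auto-migrate: {}", auto_migrate_corruption());
}
```

One consequence of the `OnceLock` cache worth noting: flipping `LANCE_AUTO_MIGRATION` at runtime has no effect after the first call to `auto_migrate_corruption()` in a given process.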