Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: auto-migrate old index metadata #3428

Merged
merged 2 commits into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/lance-core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod deletion;
pub mod futures;
pub mod hash;
pub mod mask;
pub mod parse;
pub mod path;
pub mod testing;
pub mod tokio;
Expand Down
11 changes: 11 additions & 0 deletions rust/lance-core/src/utils/parse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

/// Parse a string into a boolean value.
pub fn str_is_truthy(val: &str) -> bool {
val.eq_ignore_ascii_case("1")
| val.eq_ignore_ascii_case("true")
| val.eq_ignore_ascii_case("on")
| val.eq_ignore_ascii_case("yes")
| val.eq_ignore_ascii_case("y")
}
9 changes: 1 addition & 8 deletions rust/lance-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use bytes::Bytes;
use chrono::{DateTime, Utc};
use deepsize::DeepSizeOf;
use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
use lance_core::utils::parse::str_is_truthy;
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
use object_store::aws::{
AmazonS3ConfigKey, AwsCredential as ObjectStoreAwsCredential, AwsCredentialProvider,
Expand Down Expand Up @@ -1039,14 +1040,6 @@ fn infer_block_size(scheme: &str) -> usize {
}
}

fn str_is_truthy(val: &str) -> bool {
val.eq_ignore_ascii_case("1")
| val.eq_ignore_ascii_case("true")
| val.eq_ignore_ascii_case("on")
| val.eq_ignore_ascii_case("yes")
| val.eq_ignore_ascii_case("y")
}

/// Attempt to create a Url from given table location.
///
/// The location could be:
Expand Down
13 changes: 6 additions & 7 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4395,10 +4395,13 @@ mod tests {
let validate_res = dataset.validate().await;
assert!(validate_res.is_err());
assert_eq!(dataset.load_indices().await.unwrap()[0].name, "vector_idx");
assert!(dataset.index_statistics("vector_idx").await.is_err());

// Force a migration
dataset.delete("false").await.unwrap();
// Calling index statistics will force a migration
let stats = dataset.index_statistics("vector_idx").await.unwrap();
let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
assert_eq!(stats["num_indexed_fragments"], 2);

dataset.checkout_latest().await.unwrap();
dataset.validate().await.unwrap();

let indices = dataset.load_indices().await.unwrap();
Expand All @@ -4408,10 +4411,6 @@ mod tests {
}
assert_eq!(get_bitmap(&indices[0]), vec![0]);
assert_eq!(get_bitmap(&indices[1]), vec![1]);

let stats = dataset.index_statistics("vector_idx").await.unwrap();
let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
assert_eq!(stats["num_indexed_fragments"], 2);
}

#[rstest]
Expand Down
60 changes: 52 additions & 8 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
//!

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use arrow_schema::DataType;
use async_trait::async_trait;
use futures::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use lance_core::utils::parse::str_is_truthy;
use lance_file::reader::FileReader;
use lance_file::v2;
use lance_file::v2::reader::FileReaderOptions;
Expand Down Expand Up @@ -68,6 +69,17 @@ use self::append::merge_indices;
use self::scalar::build_scalar_index;
use self::vector::{build_vector_index, VectorIndexParams, LANCE_VECTOR_INDEX};

// Whether to auto-migrate a dataset when we encounter corruption.
fn auto_migrate_corruption() -> bool {
static LANCE_AUTO_MIGRATION: OnceLock<bool> = OnceLock::new();
*LANCE_AUTO_MIGRATION.get_or_init(|| {
std::env::var("LANCE_AUTO_MIGRATION")
.ok()
.map(|s| str_is_truthy(&s))
.unwrap_or(true)
})
}

/// Builds index.
#[async_trait]
pub trait IndexBuilder {
Expand Down Expand Up @@ -590,7 +602,8 @@ impl DatasetIndexExt for Dataset {
let index_type = indices[0].index_type().to_string();

let indexed_fragments_per_delta = self.indexed_fragments(index_name).await?;
let num_indexed_rows_per_delta = indexed_fragments_per_delta

let res = indexed_fragments_per_delta
.iter()
.map(|frags| {
let mut sum = 0;
Expand All @@ -604,18 +617,49 @@ impl DatasetIndexExt for Dataset {
}
Ok(sum)
})
.collect::<Result<Vec<_>>>()?;
.collect::<Result<Vec<_>>>();

async fn migrate_and_recompute(ds: &Dataset, index_name: &str) -> Result<String> {
let mut ds = ds.clone();
log::warn!(
"Detecting out-dated fragment metadata, migrating dataset. \
To disable migration, set LANCE_AUTO_MIGRATION=false"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably fine but this warning is a little too late 😆 . I guess users can always restore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well this is for if they hit it in a production system. Hopefully they catch it on the first table it hits.

);
ds.delete("false").await.map_err(|err| {
Error::Execution {
message: format!("Failed to migrate dataset while calculating index statistics. \
To disable migration, set LANCE_AUTO_MIGRATION=false. Original error: {}", err),
location: location!(),
}
})?;
ds.index_statistics(index_name).await
}

let num_indexed_rows_per_delta = match res {
Ok(rows) => rows,
Err(Error::Internal { message, .. })
if auto_migrate_corruption() && message.contains("trigger a single write") =>
{
return migrate_and_recompute(self, index_name).await;
}
Err(e) => return Err(e),
};

let mut fragment_ids = HashSet::new();
for frags in indexed_fragments_per_delta.iter() {
for frag in frags.iter() {
if !fragment_ids.insert(frag.id) {
return Err(Error::Internal {
message: "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \
if auto_migrate_corruption() {
return migrate_and_recompute(self, index_name).await;
} else {
return Err(Error::Internal {
message:
"Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \
and trigger a single write to fix this"
.to_string(),
location: location!(),
});
.to_string(),
location: location!(),
});
}
}
}
}
Expand Down
Loading