Skip to content

Commit

Permalink
feat: introduce an intermediate version that is compatible with both … (
Browse files Browse the repository at this point in the history
#16354)

* feat: introduce an intermediate version that is compatible with both the old version and vacuum2

* make lint

* add ut
  • Loading branch information
SkyFan2002 authored Sep 5, 2024
1 parent 2c8f5a8 commit 015778c
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ tonic = { version = "0.11.0", features = ["transport", "codegen", "prost", "tls-
tonic-build = { version = "0.11" }
tonic-reflection = { version = "0.11.0" }
typetag = "0.2.3"
uuid = { version = "1.1.2", features = ["serde", "v4"] }
uuid = { version = "1.10.0", features = ["serde", "v4", "v7"] }
walkdir = "2.3.2"
xorfilter-rs = "0.5"

Expand Down
17 changes: 8 additions & 9 deletions src/query/catalog/src/plan/internal_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::path::Path;

use databend_common_arrow::arrow::bitmap::MutableBitmap;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
Expand Down Expand Up @@ -44,6 +42,7 @@ use databend_common_expression::SEARCH_MATCHED_COLUMN_ID;
use databend_common_expression::SEARCH_SCORE_COLUMN_ID;
use databend_common_expression::SEGMENT_NAME_COLUMN_ID;
use databend_common_expression::SNAPSHOT_NAME_COLUMN_ID;
use databend_storages_common_table_meta::meta::try_extract_uuid_str_from_path;
use databend_storages_common_table_meta::meta::NUM_BLOCK_ID_BITS;

// Segment and Block id Bits when generate internal column `_row_id`
Expand Down Expand Up @@ -261,13 +260,13 @@ impl InternalColumn {
)
}
InternalColumnType::BaseRowId => {
let file_stem = Path::new(&meta.block_location).file_stem().unwrap();
let file_strs = file_stem
.to_str()
.unwrap_or("")
.split('_')
.collect::<Vec<&str>>();
let uuid = file_strs[0];
let uuid =
try_extract_uuid_str_from_path(&meta.block_location).unwrap_or_else(|e| {
panic!(
"Internal error: block_location {} should be a valid table object key: {}",
&meta.block_location, e
)
});
let mut row_ids = Vec::with_capacity(num_rows);
if let Some(offsets) = &meta.offsets {
for i in offsets {
Expand Down
22 changes: 5 additions & 17 deletions src/query/catalog/src/plan/stream_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::any::Any;
use std::path::Path;
use std::sync::Arc;

use databend_common_base::base::uuid::Uuid;
Expand All @@ -40,6 +39,7 @@ use databend_common_expression::ORIGIN_BLOCK_ID_COLUMN_ID;
use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COLUMN_ID;
use databend_common_expression::ORIGIN_VERSION_COLUMN_ID;
use databend_common_expression::ROW_VERSION_COLUMN_ID;
use databend_storages_common_table_meta::meta::try_extract_uuid_str_from_path;

use crate::plan::PartInfo;
use crate::plan::PartInfoPtr;
Expand Down Expand Up @@ -222,22 +222,10 @@ impl StreamColumn {
}

/// Derives a block id from a block location path.
///
/// The leading `_`-separated segment of the path's file stem is parsed as a
/// UUID; its `u128` value is returned after an `as i128` cast.
// NOTE(review): this listing is a rendered diff without +/- markers. The
// `if let … else` body (which relies on `std::path::Path`) is the removed
// implementation; the four lines starting at
// `let uuid = try_extract_uuid_str_from_path(path)` are its replacement.
pub fn block_id_from_location(path: &str) -> Result<i128> {
if let Some(file_stem) = Path::new(path).file_stem() {
let file_strs = file_stem
.to_str()
.unwrap_or("")
.split('_')
.collect::<Vec<&str>>();
let block_id = Uuid::parse_str(file_strs[0])
.map_err(|e| e.to_string())?
.as_u128();
Ok(block_id as i128)
} else {
Err(ErrorCode::Internal(format!(
"Illegal meta file format: {}",
path
)))
}
// Replacement body: the shared helper extracts the UUID text from the file
// stem, and the offending path is appended to any error it reports.
let uuid = try_extract_uuid_str_from_path(path)
.map_err(|e| e.add_message(format!("invalid block path {}", path)))?;
let block_id = Uuid::parse_str(uuid).map_err(|e| e.to_string())?.as_u128();
Ok(block_id as i128)
}

pub fn gen_mutation_stream_meta(
Expand Down
3 changes: 3 additions & 0 deletions src/query/storages/common/table_meta/src/meta/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ pub use statistics::*;
// currently, used by versioned readers only
pub(crate) use testing::*;
pub use utils::parse_storage_prefix;
pub use utils::trim_vacuum2_object_prefix;
pub use utils::try_extract_uuid_str_from_path;
pub use utils::TEMP_TABLE_STORAGE_PREFIX;
pub use utils::VACUUM2_OBJECT_KEY_PREFIX;
pub(crate) use utils::*;
pub use versions::testify_version;
pub use versions::SegmentInfoVersion;
Expand Down
67 changes: 67 additions & 0 deletions src/query/storages/common/table_meta/src/meta/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::BTreeMap;
use std::ops::Add;
use std::path::Path;

use chrono::DateTime;
use chrono::Datelike;
Expand All @@ -29,6 +30,7 @@ use crate::table::OPT_KEY_STORAGE_PREFIX;
use crate::table::OPT_KEY_TEMP_PREFIX;

pub const TEMP_TABLE_STORAGE_PREFIX: &str = "_tmp_tbl";
/// Prefix that marks vacuum2-style object keys; it is removed again by
/// `trim_vacuum2_object_prefix`. `g` is not a hexadecimal digit, so it cannot
/// be mistaken for the first character of a UUID-based file name.
pub const VACUUM2_OBJECT_KEY_PREFIX: &str = "g";

pub fn trim_timestamp_to_micro_second(ts: DateTime<Utc>) -> DateTime<Utc> {
Utc.with_ymd_and_hms(
Expand Down Expand Up @@ -78,3 +80,68 @@ pub fn parse_storage_prefix(options: &BTreeMap<String, String>, table_id: u64) -
}
Ok(prefix)
}

/// Returns `key` without a leading `VACUUM2_OBJECT_KEY_PREFIX`.
///
/// Keys that do not start with the prefix are returned unchanged.
#[inline]
pub fn trim_vacuum2_object_prefix(key: &str) -> &str {
    match key.strip_prefix(VACUUM2_OBJECT_KEY_PREFIX) {
        Some(unprefixed) => unprefixed,
        None => key,
    }
}

/// Extracts the UUID part from the object key.
///
/// The file stem of `path` is split on `_`; the first segment, with an optional
/// vacuum2 prefix removed, is returned as a slice borrowed from `path`.
///
/// For example, given either of these paths:
///
/// ```text
/// bucket/root/115/122/_b/g0191114d30fd78b89fae8e5c88327725_v2.parquet
/// bucket/root/115/122/_b/0191114d30fd78b89fae8e5c88327725_v2.parquet
/// ```
///
/// the function returns `0191114d30fd78b89fae8e5c88327725`.
///
/// The returned text is NOT validated as a UUID; callers that need a parsed
/// UUID must parse it themselves.
///
/// # Errors
///
/// Returns `ErrorCode::StorageOther` when `path` has no file stem.
pub fn try_extract_uuid_str_from_path(path: &str) -> databend_common_exception::Result<&str> {
    // `path` is a `&str`, so the stem is always valid UTF-8; routing `to_str`
    // through the same error path simply avoids carrying a panic site.
    let Some(file_stem) = Path::new(path).file_stem().and_then(|stem| stem.to_str()) else {
        return Err(ErrorCode::StorageOther(format!(
            "Illegal object key, no file stem found: {}",
            path
        )));
    };

    // `split` always yields at least one item, so the fallback is never taken;
    // taking the first item directly avoids allocating a `Vec` of segments.
    let leading_segment = file_stem.split('_').next().unwrap_or(file_stem);
    Ok(trim_vacuum2_object_prefix(leading_segment))
}

#[cfg(test)]
mod tests {
    use databend_common_base::base::uuid::Uuid;

    use super::*;

    /// A prefixed key loses its prefix; an unprefixed key is returned as-is.
    #[test]
    fn test_trim_vacuum2_object_prefix() {
        let uuid = Uuid::now_v7();
        let plain = uuid.to_string();
        let prefixed = format!("g{}", uuid);

        assert_eq!(trim_vacuum2_object_prefix(&prefixed), plain);
        assert_eq!(trim_vacuum2_object_prefix(&plain), plain);
    }

    /// Both the prefixed and the unprefixed object key yield the same UUID text.
    #[test]
    fn test_try_extract_uuid_str_from_path() {
        const EXPECTED: &str = "0191114d30fd78b89fae8e5c88327725";
        let paths = [
            "bucket/root/115/122/_b/g0191114d30fd78b89fae8e5c88327725_v2.parquet",
            "bucket/root/115/122/_b/0191114d30fd78b89fae8e5c88327725_v2.parquet",
        ];

        for path in paths {
            assert_eq!(try_extract_uuid_str_from_path(path).unwrap(), EXPECTED);
        }
    }
}
5 changes: 5 additions & 0 deletions src/query/storages/common/table_meta/src/meta/v4/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ pub struct TableSnapshot {
/// The metadata of the cluster keys.
pub cluster_key_meta: Option<ClusterKey>,
pub table_statistics_location: Option<String>,

pub least_visible_timestamp: Option<DateTime<Utc>>,
}

impl TableSnapshot {
Expand Down Expand Up @@ -120,6 +122,7 @@ impl TableSnapshot {
segments,
cluster_key_meta,
table_statistics_location,
least_visible_timestamp: None,
}
}

Expand Down Expand Up @@ -234,6 +237,7 @@ impl From<v2::TableSnapshot> for TableSnapshot {
segments: s.segments,
cluster_key_meta: s.cluster_key_meta,
table_statistics_location: s.table_statistics_location,
least_visible_timestamp: None,
}
}
}
Expand All @@ -256,6 +260,7 @@ where T: Into<v3::TableSnapshot>
segments: s.segments,
cluster_key_meta: s.cluster_key_meta,
table_statistics_location: s.table_statistics_location,
least_visible_timestamp: None,
}
}
}
Expand Down
18 changes: 14 additions & 4 deletions src/query/storages/fuse/src/io/locations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ use std::marker::PhantomData;

use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_storages_common_table_meta::meta::trim_vacuum2_object_prefix;
use databend_storages_common_table_meta::meta::Location;
use databend_storages_common_table_meta::meta::SegmentInfo;
use databend_storages_common_table_meta::meta::SnapshotVersion;
use databend_storages_common_table_meta::meta::TableSnapshotStatisticsVersion;
use databend_storages_common_table_meta::meta::Versioned;
use databend_storages_common_table_meta::meta::VACUUM2_OBJECT_KEY_PREFIX;
use uuid::Uuid;
use uuid::Version;

use crate::constants::FUSE_TBL_BLOCK_PREFIX;
use crate::constants::FUSE_TBL_SEGMENT_PREFIX;
Expand All @@ -34,7 +37,6 @@ use crate::FUSE_TBL_AGG_INDEX_PREFIX;
use crate::FUSE_TBL_INVERTED_INDEX_PREFIX;
use crate::FUSE_TBL_LAST_SNAPSHOT_HINT;
use crate::FUSE_TBL_XOR_BLOOM_INDEX_PREFIX;

static SNAPSHOT_V0: SnapshotVersion = SnapshotVersion::V0(PhantomData);
static SNAPSHOT_V1: SnapshotVersion = SnapshotVersion::V1(PhantomData);
static SNAPSHOT_V2: SnapshotVersion = SnapshotVersion::V2(PhantomData);
Expand Down Expand Up @@ -170,7 +172,7 @@ impl TableMetaLocationGenerator {
let splits = loc.split('/').collect::<Vec<_>>();
let len = splits.len();
let prefix = splits[..len - 2].join("/");
let block_name = splits[len - 1];
let block_name = trim_vacuum2_object_prefix(splits[len - 1]);
format!("{prefix}/{FUSE_TBL_AGG_INDEX_PREFIX}/{index_id}/{block_name}")
}

Expand All @@ -182,7 +184,7 @@ impl TableMetaLocationGenerator {
let splits = loc.split('/').collect::<Vec<_>>();
let len = splits.len();
let prefix = splits[..len - 2].join("/");
let block_name = splits[len - 1];
let block_name = trim_vacuum2_object_prefix(splits[len - 1]);
let id: String = block_name.chars().take(32).collect();
let short_ver: String = index_version.chars().take(7).collect();
format!(
Expand All @@ -204,8 +206,16 @@ trait SnapshotLocationCreator {

impl SnapshotLocationCreator for SnapshotVersion {
fn create(&self, id: &Uuid, prefix: impl AsRef<str>) -> String {
let vacuum_prefix = if id
.get_version()
.is_some_and(|v| matches!(v, Version::SortRand))
{
VACUUM2_OBJECT_KEY_PREFIX
} else {
""
};
format!(
"{}/{}/{}{}",
"{}/{}/{vacuum_prefix}{}{}",
prefix.as_ref(),
FUSE_TBL_SNAPSHOT_PREFIX,
id.simple(),
Expand Down

0 comments on commit 015778c

Please sign in to comment.